module game_core.job_updater;

import bubel.ecs.std;
import bubel.ecs.vector;
import bubel.ecs.atomic;
import bubel.ecs.manager;

import mmutils.thread_pool;

version(LDC)
{
    import ldc.attributes;
}
else
{
    //stand-in for ldc.attributes.optStrategy so the attribute can be used with other compilers
    struct optStrategy {
        string strategy;
    }
}

//pthread TLS bindings, declared by hand for platforms where D thread-local storage isn't used
version(Android)
{
    alias pthread_key_t = uint;

    extern (C) int pthread_key_create(pthread_key_t *, void* function(void *)) @nogc nothrow;
    extern (C) int pthread_key_delete(pthread_key_t) @nogc nothrow;
    extern (C) void* pthread_getspecific(pthread_key_t) @nogc nothrow;
    extern (C) int pthread_setspecific(pthread_key_t, const void *) @nogc nothrow;
}
else version(WebAssembly)
{
    alias pthread_key_t = uint;

    extern (C) int pthread_key_create(pthread_key_t *, void* function(void *)) @nogc nothrow;
    extern (C) int pthread_key_delete(pthread_key_t) @nogc nothrow;
    extern (C) void* pthread_getspecific(pthread_key_t) @nogc nothrow;
    extern (C) int pthread_setspecific(pthread_key_t, const void *) @nogc nothrow;

    extern (C) void emscripten_main_thread_process_queued_calls();
}

//Bridges BubelECS job groups to mmutils.ThreadPool.
//Usage per frame: begin() -> EntityManager calls dispatch() for every job group -> call().
struct ECSJobUpdater
{
    //maximum number of job groups that can be dispatched (size of the `jobs` array)
    enum uint max_groups = 256;

    //create updater and initialize thread pool with given number of threads
    this(uint threads)
    {
        onCreate(threads);
    }

    ~this()
    {
        //wait for end of jobs
        pool.waitThreads();
        //dispose jobs array
        if(jobs)
        {
            Mallocator.dispose(jobs);
            jobs = null;
        }
        //free TLS data. The key is deleted only if this instance created it successfully,
        //so a default-initialized updater never deletes a key it doesn't own.
        version(WebAssembly)
        {
            if(tls_key_created)
            {
                pthread_key_delete(tls_key);
                tls_key_created = false;
            }
        }
        else version(Android)
        {
            if(tls_key_created)
            {
                pthread_key_delete(tls_key);
                tls_key_created = false;
            }
        }
    }

    //Single thread pool group together with storage for its jobs.
    struct Group
    {
        //each group can have up to max_jobs jobs
        enum uint max_jobs = 128;

        ~this() nothrow
        {

        }

        JobsGroup group;
        //storage for jobs and their callers; JobData holds a delegate to the caller with the same index
        JobData[max_jobs] jobs;
        JobCaller[max_jobs] callers;
        //number of used entries in `jobs` and `callers`
        uint count = 0;
        //name passed to every JobData added to this group
        string name;

        //mmutils.ThreadPool uses system of dependency where dependencies are added for child groups.
        //Parent group has atomic counter and after completion it will add job groups dependent on it.
        void dependantOn(Group* dependency)
        {
            group.dependantOn(&dependency.group);
        }

        //add group to pool
        void start()
        {
            group.thPool.addGroupAsynchronous(&group);
        }

        //add jobs slice to group structure
        void build(ThreadPool* pool)
        {
            group.thPool = pool;
            group.jobs = jobs[0..count];
        }

        //clear jobs
        void clear()
        {
            group = JobsGroup("name",null);
            count = 0;
        }

        //add single job to group
        void add(JobCaller caller)
        {
            //storage is fixed size; fail with clear message instead of writing past the arrays
            assert(count < max_jobs, "ECSJobUpdater.Group: too many jobs in a single group");
            callers[count] = caller;
            //delegate points to the stored copy of caller, not to the function parameter
            jobs[count] = JobData(&callers[count].callJob,name);
            count++;
        }
    }

    //initialize thread pool and data
    void onCreate(uint threads_count)
    {
        //create TLS for Android and WebAssembly; remember if it succeeded so destructor knows whether to delete the key
        version(WebAssembly)tls_key_created = pthread_key_create(&tls_key, null) == 0;
        else version(Android)tls_key_created = pthread_key_create(&tls_key, null) == 0;

        pool.initialize();
        thread_data = pool.registerExternalThread();
        pool.setThreadsNum(threads_count);

        jobs = Mallocator.makeArray!Group(max_groups);
    }

    //this function is providing ThreadID to ECS. BubelECS is expecting ThreadID to be linear ID in range (0;ThreadsCount)
    uint getThreadID() @nogc nothrow
    {
        //ID is stored directly in TLS pointer value (see JobCaller.callJob); go through size_t to narrow it explicitly
        version(WebAssembly)return cast(uint)cast(size_t)pthread_getspecific(tls_key);
        else version(Android)return cast(uint)cast(size_t)pthread_getspecific(tls_key);
        else return thread_id;
    }

    //clear jobs data
    void begin()
    {
        call_jobs.clear();

        foreach(ref job;jobs)
        {
            job.clear();
        }

        last_job.clear();
    }

    //execute jobs
    void call()
    {
        //if there is no work return
        if(last_job.group.getDependenciesWaitCount() == 0)return;
        if(call_jobs.length == 0)return;

        //set last job
        groupEndJobs[0] = JobData(&releaseMainThread, "Stop Threads", null, null);

        //add job to group
        last_job.group.jobs = groupEndJobs;
        //set thread pool pointer
        last_job.group.thPool = &pool;
        //last job should be called on main thread. It prevents some deadlock/endless-loop issues.
        last_job.group.executeOnThreadNum = 0;

        //start jobs without dependencies
        foreach(job;call_jobs)
        {
            job.start();
        }

        //add main thread to pool. It will be released in last job.
        thread_data.threadStartFunc();
    }

    //callback that will release main thread
    void releaseMainThread(ThreadData* th_data, JobData* data)
    {
        pool.releaseExternalThreads();
    }

    //Adapter between thread pool job callback and single ECS job.
    static struct JobCaller
    {
        //ECS job
        EntityManager.Job* job;
        //pointer to parent
        ECSJobUpdater* updater;
        //job ID
        uint id;

        //called by external thread
        void callJob(ThreadData* th_data, JobData* data)
        {
            version(WebAssembly)
            {
                //store thread id in pthread TLS so getThreadID() can read it
                pthread_setspecific(tls_key, cast(void*)th_data.threadId);
                if(th_data.threadId == 0)
                {
                    //this emscripten call is required to make multithreading work
                    emscripten_main_thread_process_queued_calls();
                    job.execute();
                    emscripten_main_thread_process_queued_calls();
                }
                else job.execute();
            }
            else version(Android)
            {
                //store thread id in pthread TLS so getThreadID() can read it
                pthread_setspecific(tls_key, cast(void*)th_data.threadId);
                job.execute();
            }
            else
            {
                //set thread id
                updater.thread_id = th_data.threadId;
                //execute job. It's the function from BubelECS
                job.execute();
            }
        }
    }

    //this is callback passed to EntityManager. EntityManager will call this for every jobs group. Every system will generate one group.
    void dispatch(EntityManager.JobGroup group)
    {
        //check if group isn't empty
        if(group.jobs.length == 0)
        {
            return;
        }

        //group ID is used as index into fixed size groups array
        assert(group.id < jobs.length, "ECSJobUpdater: job group ID out of range (onCreate not called or too many groups)");

        //add name for job. Used for traces.
        jobs[group.id].name = cast(string)group.caller.system.name;

        //add jobs to group
        foreach(ref job;group.jobs)
        {
            uint index = 0;
            if(job.callers.length)index = job.callers[0].system_id;
            JobCaller caller;
            caller.updater = &this;
            caller.job = &job;
            caller.id = index;
            jobs[group.id].add(caller);
        }

        //build group
        jobs[group.id].build(&pool);

        uint deps = cast(uint)group.dependencies.length;

        //add dependencies; dependencies which have no jobs or whose system won't run are not counted
        foreach(dep;group.dependencies)
        {
            if(jobs[dep.id].count && dep.caller.system.willExecute && dep.caller.system.enabled)jobs[group.id].dependantOn(&jobs[dep.id]);
            else deps--;
        }

        //set as job without dependencies if it hasn't any
        if(deps == 0)
        {
            call_jobs.add(&jobs[group.id]);
        }

        //last job is dependent on all jobs so it will be called after everything is finished
        last_job.dependantOn(&jobs[group.id]);
    }

    //WebAssembly version works properly only when there is no thread local data (static variables).
    //Because of that I'm using pthread TLS instead of D. TLS is used only for storing ThreadID
    version(WebAssembly)
    {
        __gshared pthread_key_t tls_key;
        //true when this instance successfully created tls_key in onCreate
        bool tls_key_created = false;
    }
    else version(Android)
    {
        __gshared pthread_key_t tls_key;
        //true when this instance successfully created tls_key in onCreate
        bool tls_key_created = false;
    }
    else static uint thread_id = 0;

    //thread pool
    ThreadPool pool;
    //thread data used for main thread
    ThreadData* thread_data;

    //array of jobs
    Group[] jobs;
    //list of jobs which should be called on frame start as they have no dependencies
    Vector!(Group*) call_jobs;
    //last job group is used for releasing main thread from pool
    Group last_job;
    //last_job group has one job
    JobData[1] groupEndJobs;
}