1 module game_core.job_updater;
2 
3 import bubel.ecs.std;
4 import bubel.ecs.vector;
5 import bubel.ecs.atomic;
6 import bubel.ecs.manager;
7 
8 import mmutils.thread_pool;
9 
// On LDC the optStrategy attribute comes from ldc.attributes.
// On every other compiler a struct with the same name and a single string field
// is declared instead (presumably so that code annotated with @optStrategy(...)
// still compiles there - confirm against the users of this attribute).
version(LDC) import ldc.attributes;
else
{
    struct optStrategy
    {
        string strategy;
    }
}
20 
// Android and WebAssembly builds need the same pthread thread-specific-data
// declarations, so they are declared once behind a shared version identifier
// instead of being duplicated per platform.
version(Android)
{
    version = ECSJobUpdaterPthreadTLS;
}
else version(WebAssembly)
{
    version = ECSJobUpdaterPthreadTLS;
}

version(ECSJobUpdaterPthreadTLS)
{
    alias pthread_key_t = uint;

    // POSIX declares the key destructor as `void (*)(void*)`: it receives the value
    // stored in the key and returns nothing. This file only ever passes null.
    extern (C) int pthread_key_create(pthread_key_t *, void function(void *)) @nogc nothrow;
    extern (C) int pthread_key_delete(pthread_key_t) @nogc nothrow;
    extern (C) void* pthread_getspecific(pthread_key_t) @nogc nothrow;
    extern (C) int pthread_setspecific(pthread_key_t, const void *) @nogc nothrow;
}

version(WebAssembly)
{
    // Emscripten runtime call used around jobs that execute on thread 0.
    extern (C) void emscripten_main_thread_process_queued_calls();
}
41 
// Connects BubelECS job dispatching with the mmutils thread pool.
//
// Per-frame usage as visible in this struct:
//   1. begin()    - resets every group and the list of root groups,
//   2. dispatch() - called (by EntityManager) once per ECS job group; wraps the ECS
//                   jobs in thread pool jobs and wires up dependencies,
//   3. call()     - starts all groups without dependencies and lends the calling
//                   thread to the pool until the final "Stop Threads" job runs.
//
// NOTE(review): the instance stores pointers into itself (JobCaller.updater,
// JobsGroup.thPool, the releaseMainThread delegate) and frees `jobs` in its
// destructor, so it should not be copied or moved while in use - confirm callers
// keep it at a stable address.
struct ECSJobUpdater
{

    // Creates the updater with a pool of `threads` threads (see onCreate).
    this(uint threads)
    {
        onCreate(threads);
    }

    ~this()
    {
        //wait for end of jobs
        pool.waitThreads();
        //dispose jobs array
        if(jobs)Mallocator.dispose(jobs);
        //free TLS data
        version(WebAssembly)pthread_key_delete(tls_key);
        else version(Android)pthread_key_delete(tls_key);
    }

    // One thread pool jobs group together with the storage for its jobs.
    struct Group
    {
        ~this() nothrow
        {

        }

        JobsGroup group;
        //each group can have up to 128 jobs
        JobData[128] jobs;
        JobCaller[128] callers;
        //number of used slots in `jobs` and `callers`
        uint count = 0;
        //name passed to every JobData created by add(). Used for traces.
        string name;

        //mmutils.ThreadPool uses a dependency system where dependencies are added to child groups.
        //The parent group has an atomic counter and after completion it adds the job groups dependent on it.
        void dependantOn(Group* dependency)
        {
            group.dependantOn(&dependency.group);
        }

        //add group to pool
        void start()
        {
            group.thPool.addGroupAsynchronous(&group);
        }

        //attach the pool and the used part of the jobs array to the group structure
        void build(ThreadPool* pool)
        {
            group.thPool = pool;
            group.jobs = jobs[0..count];
        }

        //clear jobs
        //NOTE(review): the group is recreated with the literal "name", not the `name` field - confirm this is intended.
        void clear()
        {
            group = JobsGroup("name",null);
            count = 0;
        }

        //add single job to group
        void add(JobCaller caller)
        {
            //The storage is fixed-size. Fail loudly in debug builds and drop the job in
            //release builds instead of writing past the end of the arrays.
            assert(count < jobs.length, "ECSJobUpdater.Group: too many jobs in a single group");
            if(count >= jobs.length)return;

            callers[count] = caller;
            //the job delegate points at the copy stored in `callers`, which lives as long as the group
            jobs[count] = JobData(&callers[count].callJob,name);
            count++;
        }
    }

    //initialize thread pool and data
    void onCreate(uint threads_count)
    {
        //create TLS for Android and WebAssembly
        version(WebAssembly)pthread_key_create(&tls_key, null);
        else version(Android)pthread_key_create(&tls_key, null);

        pool.initialize();
        //the calling thread is registered as an external thread; call() lends it to the pool
        thread_data = pool.registerExternalThread();
        pool.setThreadsNum(threads_count);

        //one Group slot per ECS job group id; dispatch() indexes this array by group.id
        jobs = Mallocator.makeArray!Group(256);
    }

    //This function provides the ThreadID to ECS. BubelECS expects ThreadID to be a linear ID in range (0;ThreadsCount)
    uint getThreadID() @nogc nothrow
    {
        //the id is stored directly in the TLS pointer value, so convert pointer -> size_t -> uint
        version(WebAssembly)return cast(uint)cast(size_t)pthread_getspecific(tls_key);
        else version(Android)return cast(uint)cast(size_t)pthread_getspecific(tls_key);
        else return thread_id;
    }

    //clear jobs data
    void begin()
    {
        call_jobs.clear();

        foreach(ref job;jobs)
        {
            job.clear();
        }

        last_job.clear();
    }

    //execute jobs
    void call()
    {
        //if there is no work return
        if(last_job.group.getDependenciesWaitCount() == 0)return;
        if(call_jobs.length == 0)return;

        //set last job
        groupEndJobs[0] = JobData(&releaseMainThread, "Stop Threads", null, null);

        //add job to group
        last_job.group.jobs = groupEndJobs;
        //set thread pool pointer
        last_job.group.thPool = &pool;
        //last job should be called on main thread. It prevents some issues with dead loops.
        last_job.group.executeOnThreadNum = 0;

        //start jobs without dependencies
        foreach(job;call_jobs)
        {
            job.start();
        }

        //add main thread to pool. It will be released in last job.
        thread_data.threadStartFunc();
    }

    //callback that will release main thread
    void releaseMainThread(ThreadData* th_data, JobData* data)
    {
        pool.releaseExternalThreads();
    }

    //Adapter between a thread pool job and a single ECS job.
    static struct JobCaller
    {
        //ECS job
        EntityManager.Job* job;
        //pointer to parent
        ECSJobUpdater* updater;
        //job ID
        uint id;

        //called by the thread pool: publishes the executing thread id, then runs the ECS job
        void callJob(ThreadData* th_data, JobData* data)
        {
            version(WebAssembly)
            {
                pthread_setspecific(tls_key, cast(void*)th_data.threadId);
                if(th_data.threadId == 0)
                {
                    //this emscripten call is required to make multithreading work
                    emscripten_main_thread_process_queued_calls();
                    job.execute();
                    emscripten_main_thread_process_queued_calls();
                }
                else job.execute();
            }
            else version(Android)
            {
                pthread_setspecific(tls_key, cast(void*)th_data.threadId);
                job.execute();
            }
            else
            {
                //set thread id
                updater.thread_id = th_data.threadId;
                //execute job. It's the function from BubelECS
                job.execute();
            }
        }
    }

    //This is the callback passed to EntityManager. EntityManager will call it for every jobs group. Every system generates one group.
    void dispatch(EntityManager.JobGroup group)
    {
        //check if group isn't empty
        if(group.jobs.length == 0)
        {
            return;
        }

        //`jobs` is indexed by group id. Fail loudly in debug builds and ignore the
        //group in release builds instead of indexing outside the allocated array.
        assert(group.id < jobs.length, "ECSJobUpdater: job group id exceeds allocated groups");
        if(group.id >= jobs.length)return;

        //add name for job. Used for traces.
        jobs[group.id].name = cast(string)group.caller.system.name;

        //add jobs to group
        foreach(ref job;group.jobs)
        {
            //the caller id is the system id of the job's first caller, or 0 when there is none
            uint index = 0;
            if(job.callers.length)index = job.callers[0].system_id;
            JobCaller caller;
            caller.updater = &this;
            caller.job = &job;
            caller.id = index;
            jobs[group.id].add(caller);
        }

        //build group
        jobs[group.id].build(&pool);

        //number of dependencies that will actually run this frame
        uint deps = cast(uint)group.dependencies.length;

        //add dependencies
        foreach(dep;group.dependencies)
        {
            //only wait for dependencies that have jobs and whose system will run;
            //an out-of-range dependency id is treated like a dependency that does not run
            if(dep.id < jobs.length && jobs[dep.id].count && dep.caller.system.willExecute && dep.caller.system.enabled)jobs[group.id].dependantOn(&jobs[dep.id]);
            else deps--;
        }

        //set as job without dependencies if it hasn't any
        if(deps == 0)
        {
            call_jobs.add(&jobs[group.id]);
        }

        //last job is dependent on all jobs so it will be called after everything is finished
        last_job.dependantOn(&jobs[group.id]);
    }

    //The WebAssembly version works properly only when there is no thread local data (static variables).
    //Because of that pthread TLS is used instead of D TLS. TLS is used only for storing ThreadID
    version(WebAssembly)
    {
        __gshared pthread_key_t tls_key;
    }
    else version(Android)
    {
        __gshared pthread_key_t tls_key;
    }
    else static uint thread_id = 0;

    //thread pool
    ThreadPool pool;
    //thread data used for main thread
    ThreadData* thread_data;

    //array of job groups, indexed by ECS job group id
    Group[] jobs;
    //list of jobs which should be called on frame start as they have no dependencies
    Vector!(Group*) call_jobs;
    //last job group is used for releasing main thread from pool
    Group last_job;
    //last_job group has one job
    JobData[1] groupEndJobs;
}