1 module mmutils.thread_pool;
3 import std.algorithm : map;
5 version = MM_NO_LOGS; // Disable log creation
//version = MM_USE_POSIX_THREADS; // Use POSIX threads instead of the standard library, required for betterC
// WebAssembly builds disable logs, select the pthread-based backend and declare
// an empty stand-in for C's FILE; every other target imports core.stdc.stdio.
version (WebAssembly)
{
	version = MM_NO_LOGS;
	version = MM_USE_POSIX_THREADS;
	// Empty placeholder type so that FILE* declarations in this module still compile.
	extern(C) struct FILE
	{
	}
}
else 
{
	import core.stdc.stdio;
}
22 //////////////////////////////////////////////
23 /////////////// BetterC Support //////////////
24 //////////////////////////////////////////////
// In betterC mode the C allocation/copy routines are declared by hand and, on
// Posix, the pthread-based backend is selected. Otherwise the same routines
// come from core.stdc.stdlib / core.stdc.string.
version (D_BetterC)
{
	version (Posix) version = MM_USE_POSIX_THREADS;
    // Hand-written prototypes of the C runtime functions used by this module.
    extern (C) void free(void*) @nogc nothrow @system;
    extern (C) void* malloc(size_t size) @nogc nothrow @system;
    extern (C) void* realloc(void*, size_t size) @nogc nothrow @system;
    extern (C) void* memcpy(return void*, scope const void*, size_t size) @nogc nothrow @system;
	// Hacks for LDC (currently disabled): stubs for runtime symbols.
	/*extern (C) __gshared int _d_eh_personality(int, int, size_t, void*, void*)
	{
		return 0;
	}
	extern (C) __gshared void _d_eh_resume_unwind(void*)
	{
		return;
	}
	extern (C) void* _d_allocmemory(size_t sz)
	{
		return malloc(sz);
	}*/
}
else
{
	import core.stdc.stdlib;
	import core.stdc.string;
}
57 //////////////////////////////////////////////
58 /////////////// Atomics //////////////////////
59 //////////////////////////////////////////////
61 version (ECSEmscripten)
62 {
63     import std.traits;
    /// Memory ordering names accepted as template arguments by the atomic
    /// helpers of this version block. The helpers below take the value as a
    /// template parameter but never read it.
    /// NOTE(review): member names presumably mirror core.atomic.MemoryOrder — confirm.
    enum MemoryOrder
    {
        acq,
        acq_rel,
        raw,
        rel,
        seq
    }
    // Externally defined emscripten atomic primitives for 8/16/32-bit values.
    // NOTE(review): exact return-value semantics are defined by emscripten's
    // threading API, not by this file — confirm against its documentation.

    // Compare-and-swap; cas() below treats a return value equal to `oldVal` as success.
    extern (C) ubyte emscripten_atomic_cas_u8(void* addr, ubyte oldVal, ubyte newVal) @nogc nothrow pure;
    extern (C) ushort emscripten_atomic_cas_u16(void* addr, ushort oldVal, ushort newVal) @nogc nothrow pure;
    extern (C) uint emscripten_atomic_cas_u32(void* addr, uint oldVal, uint newVal) @nogc nothrow pure;
    // Atomic loads.
    extern (C) ubyte emscripten_atomic_load_u8(const void* addr) @nogc nothrow pure;
    extern (C) ushort emscripten_atomic_load_u16(const void* addr) @nogc nothrow pure;
    extern (C) uint emscripten_atomic_load_u32(const void* addr) @nogc nothrow pure;
    // Atomic stores.
    extern (C) ubyte emscripten_atomic_store_u8(void* addr, ubyte val) @nogc nothrow pure;
    extern (C) ushort emscripten_atomic_store_u16(void* addr, ushort val) @nogc nothrow pure;
    extern (C) uint emscripten_atomic_store_u32(void* addr, uint val) @nogc nothrow pure;
    // Atomic additions.
    extern (C) ubyte emscripten_atomic_add_u8(void* addr, ubyte val) @nogc nothrow pure;
    extern (C) ushort emscripten_atomic_add_u16(void* addr, ushort val) @nogc nothrow pure;
    extern (C) uint emscripten_atomic_add_u32(void* addr, uint val) @nogc nothrow pure;
    // Atomic subtractions.
    extern (C) ubyte emscripten_atomic_sub_u8(void* addr, ubyte val) @nogc nothrow pure;
    extern (C) ushort emscripten_atomic_sub_u16(void* addr, ushort val) @nogc nothrow pure;
    extern (C) uint emscripten_atomic_sub_u32(void* addr, uint val) @nogc nothrow pure;
94     public pure nothrow @nogc Unqual!T atomicOp(string op, T, V1)(ref shared T val, V1 mod)
95     {
96         static if (op == "+=")
97         {
98             static if (is(T == byte) || is(T == ubyte))
99                 return cast(Unqual!T)(emscripten_atomic_add_u8(cast(void*)&val,
100                         cast(Unqual!T) mod) + 1);
101             else static if (is(T == short) || is(T == ushort))
102                 return cast(Unqual!T)(emscripten_atomic_add_u16(cast(void*)&val,
103                         cast(Unqual!T) mod) + 1);
104             else static if (is(T == int) || is(T == uint))
105                 return cast(Unqual!T)(emscripten_atomic_add_u32(cast(void*)&val,
106                         cast(Unqual!T) mod) + 1);
107             else
108                 static assert(0);
109         }
110         else static if (op == "-=")
111         {
112             static if (is(T == byte) || is(T == ubyte))
113                 return cast(Unqual!T)(emscripten_atomic_sub_u8(cast(void*)&val,
114                         cast(Unqual!T) mod) - 1);
115             else static if (is(T == short) || is(T == ushort))
116                 return cast(Unqual!T)(emscripten_atomic_sub_u16(cast(void*)&val,
117                         cast(Unqual!T) mod) - 1);
118             else static if (is(T == int) || is(T == uint))
119                 return cast(Unqual!T)(emscripten_atomic_sub_u32(cast(void*)&val,
120                         cast(Unqual!T) mod) - 1);
121             else
122                 static assert(0);
123         }
124     }
126     public pure nothrow @nogc @trusted void atomicStore(MemoryOrder ms = MemoryOrder.seq, T, V)(ref T val,
127             V newval)
128     {
129         alias UT = Unqual!T;
130         static if (is(UT == bool) || is(UT == byte) || is(UT == ubyte))
131             emscripten_atomic_store_u8(cast(void*)&val, cast(UT) newval);
132         else static if (is(UT == short) || is(UT == ushort))
133             emscripten_atomic_store_u16(cast(void*)&val, cast(UT) newval);
134         else static if (is(UT == int) || is(UT == uint))
135             emscripten_atomic_store_u32(cast(void*)&val, cast(UT) newval);
136         else
137             static assert(0);
138     }
140     public pure nothrow @nogc @trusted T atomicLoad(MemoryOrder ms = MemoryOrder.seq, T)(
141             ref const T val)
142     {
143         alias UT = Unqual!T;
144         static if (is(UT == bool))
145             return emscripten_atomic_load_u8(cast(const void*)&val) != 0;
146         else static if (is(UT == byte) || is(UT == ubyte))
147             return emscripten_atomic_load_u8(cast(const void*)&val);
148         else static if (is(UT == short) || is(UT == ushort))
149             return emscripten_atomic_load_u16(cast(const void*)&val);
150         else static if (is(UT == int) || is(UT == uint))
151             return emscripten_atomic_load_u32(cast(const void*)&val);
152         else
153             static assert(0);
154     }
156     public pure nothrow @nogc @trusted bool cas(MemoryOrder succ = MemoryOrder.seq,
157             MemoryOrder fail = MemoryOrder.seq, T, V1, V2)(T* here, V1 ifThis, V2 writeThis)
158     {
159         alias UT = Unqual!T;
160         static if (is(UT == bool))
161             return emscripten_atomic_cas_u8(cast(void*) here,
162                     cast(Unqual!T) ifThis, cast(Unqual!T) writeThis) == ifThis;
163         else static if (is(UT == byte) || is(UT == ubyte))
164             return emscripten_atomic_cas_u8(cast(void*) here,
165                     cast(Unqual!T) ifThis, cast(Unqual!T) writeThis) == ifThis;
166         else static if (is(UT == short) || is(UT == ushort))
167             return emscripten_atomic_cas_u16(cast(void*) here,
168                     cast(Unqual!T) ifThis, cast(Unqual!T) writeThis) == ifThis;
169         else static if (is(UT == int) || is(UT == uint))
170             return emscripten_atomic_cas_u32(cast(void*) here,
171                     cast(Unqual!T) ifThis, cast(Unqual!T) writeThis) == ifThis;
172         else
173             static assert(0);
174     }
175 }
176 else
177 {
178     public import core.atomic;
179 }
181 //////////////////////////////////////////////////
182 //////////////////// Allocator ///////////////////
183 //////////////////////////////////////////////////
/// Allocates room for one T with malloc and fills it with a bitwise copy of `init`.
/// The result has to be released with disposeVar (free).
T* makeVar(T)(T init)
{
	enum bytes = T.sizeof;
	auto slot = cast(T*) malloc(bytes);
	memcpy(slot, &init, bytes);
	return slot;
}
/// Allocates room for one T with malloc and fills it with a bitwise copy of a
/// default-initialized T.
/// The result has to be released with disposeVar (free).
T* makeVar(T)()
{
	enum bytes = T.sizeof;
	T blank;
	auto slot = cast(T*) malloc(bytes);
	memcpy(slot, &blank, bytes);
	return slot;
}
/// Allocates an array of `num` elements with malloc and fills every element
/// with a bitwise copy of `init`.
/// Params:
///   num = number of elements; zero or a negative value yields an empty (null) slice
///   init = value copied into every element
/// Returns: slice over the allocated memory (release with disposeArray), or
/// null when nothing was requested or the allocation failed.
T[] makeVarArray(T)(int num, T init = T.init)
{
	// A negative count used to be converted to a huge unsigned length
	if (num <= 0)
		return null;

	// T.sizeof is already a multiple of T.alignof, so no extra padding term is needed
	T* ptr = cast(T*) malloc(cast(size_t) num * T.sizeof);
	assert(ptr !is null, "makeVarArray: out of memory");
	if (ptr is null)
		return null;

	T[] arr = ptr[0 .. num];
	foreach (ref el; arr)
	{
		memcpy(&el, &init, T.sizeof);
	}
	return arr;
}
/// Releases memory of a single variable by passing the pointer to free.
void disposeVar(T)(T* var)
{
	void* raw = var;
	free(raw);
}
/// Releases memory of an array by passing the slice's pointer to free.
void disposeArray(T)(T[] var)
{
	void* raw = var.ptr;
	free(raw);
}
219 //////////////////////////////////////////////
220 //////////////////// Timer ///////////////////
221 //////////////////////////////////////////////
// Hand-written time declarations for WebAssembly builds.
version (WebAssembly)
{
	alias int time_t;
	alias int clockid_t;
	enum CLOCK_REALTIME = 0;
	// NOTE(review): field widths presumably match the target's C timespec — confirm.
	struct timespec
	{
		time_t  tv_sec;
		int     tv_nsec;
	}
	extern(C) int clock_gettime(clockid_t, timespec*) @nogc nothrow @system;
	// Result is multiplied by 1000 in useconds(); presumably milliseconds — confirm.
	extern(C) double emscripten_get_now() @nogc nothrow @system;
}
/// High precision timer.
/// Returns: a timestamp in microseconds. On Posix it is derived from
/// gettimeofday, on WebAssembly from emscripten_get_now() scaled by 1000.
/// On Windows the timer is not implemented yet and 0 is returned.
long useconds()
{
	version (WebAssembly)
	{
		return cast(long)(emscripten_get_now() * 1000.0); 
	}
	else version (Posix)
	{
		import core.sys.posix.sys.time : gettimeofday, timeval;

		timeval t;
		gettimeofday(&t, null);
		// Widen before multiplying: tv_sec can be a 32-bit type and would overflow
		return cast(long) t.tv_sec * 1_000_000 + t.tv_usec;
	}
	else version (Windows)
	{
		//TODO: implement timer on windows
		/*import core.sys.windows.windows : QueryPerformanceFrequency;
		__gshared double mul = -1;
		if (mul < 0)
		{
			long frequency;
			int ok = QueryPerformanceFrequency(&frequency);
			assert(ok);
			mul = 1_000_000.0 / frequency;
		}
		long ticks;
		int ok = QueryPerformanceCounter(&ticks);
		assert(ok);
		return cast(long)(ticks * mul);*/
		return 0;
	}
	else
	{
		// A bare string literal is always "true"; the condition has to be 0
		static assert(0, "OS not supported.");
	}
}
281 //////////////////////////////////////////////
282 //////////////////// Pause ///////////////////
283 //////////////////////////////////////////////
/// Short pause used inside spin loops (called while waiting for a lock).
/// On x86_64 a pause instruction is emitted through the compiler's intrinsic
/// (LDC, GDC) or inline asm (DMD). On Android and WebAssembly (LDC only) a
/// tiny non-optimized function is called instead.
/// Any other architecture/compiler combination fails to compile.
void instructionPause()
{
	version (X86_64)
	{
		version (LDC)
		{
			import ldc.gccbuiltins_x86 : __builtin_ia32_pause;
			__builtin_ia32_pause();
		}
		else version(GNU)
		{
			import gcc.builtins;
			__builtin_ia32_pause();
		}
		else version (DigitalMars)
		{
			// "rep; nop" is the encoding of the x86 pause instruction
			asm
			{
				rep;
				nop;
			}
		}
		else
		{
			static assert(0);
		}
	}
	else version (Android)
	{
		version(LDC)
		{
			import ldc.attributes;
			// Optimizations are disabled so the call is not removed
			@optStrategy("none")
			static void nop()
			{
				int i;
				i++;
			}
			nop();
		}
		else static assert(0);
	}
	else version(WebAssembly)
	{
		version(LDC)
		{
			import ldc.attributes;
			// Optimizations are disabled so the call is not removed
			@optStrategy("none")
			static void nop()
			{
				int i;
				i++;
}
			nop();
		}
		else static assert(0);
	}
	else static assert(0);
}
347 //////////////////////////////////////////////
348 ///////////// Semaphore + Thread /////////////
349 //////////////////////////////////////////////
351 version (MM_USE_POSIX_THREADS)
352 {
	// Platform declarations of the pthread and semaphore API used below.
	// WebAssembly and Windows get hand-written prototypes, Posix imports druntime's bindings.
	version (WebAssembly)
	{
		extern(C):
		struct pthread_attr_t 
		{
			union 
			{
				int[10] __i;
				uint[10] __s;
		}
		}
		// NOTE(review): this layout equals the one declared for Windows below;
		// confirm it matches the pthread_t of the WebAssembly C library in use.
		struct pthread_t
		{
			void* p;
			uint x;
		}
		// pthread
		int pthread_create(pthread_t*, in pthread_attr_t*, void* function(void*), void*);
		int pthread_join(pthread_t, void**);
		void pthread_exit(void *retval);
		// semaphore.h
		struct sem_t
		{
			shared int[4] __val;
		}
		int sem_init(sem_t*, int, uint);
		int sem_wait(sem_t*);
		int sem_trywait(sem_t*);
		int sem_post(sem_t*);
		int sem_destroy(sem_t*);
		int sem_timedwait(sem_t* sem, const timespec* abstime);
	}
	else version (Posix)
	{
		import core.sys.posix.pthread;
		import core.sys.posix.semaphore;
	}
	else version (Windows)
	{
	extern (C):
		// NOTE(review): these prototypes presumably target a pthread
		// implementation for Windows linked by the user — confirm.
		// clock_gettime / CLOCK_REALTIME used by Semaphore.timedWait are not declared here.
		alias uint time_t;
		struct pthread_attr_t 
		{
		}
		struct pthread_t
		{
			void* p;
			uint x;
		}
		struct timespec
		{
			time_t tv_sec;
			int tv_nsec;
		}
		// pthread
		int pthread_create(pthread_t*, in pthread_attr_t*, void* function(void*), void*);
		int pthread_join(pthread_t, void**);
		void pthread_exit(void *retval);
		// semaphore.h
		alias sem_t = void*;
		int sem_init(sem_t*, int, uint);
		int sem_wait(sem_t*);
		int sem_trywait(sem_t*);
		int sem_post(sem_t*);
		int sem_destroy(sem_t*);
		int sem_timedwait(sem_t* sem, const timespec* abstime);
	}
	else
	{
		static assert(false);
	}
433 	struct Semaphore
434 	{
435 		sem_t mutex;
437 		void initialize()
438 		{
439 			sem_init(&mutex, 0, 0);
440 		}
442 		void wait()
443 		{
444 			int ret = sem_wait(&mutex);
445 			assert(ret == 0);
446 		}
448 		bool tryWait()
449 		{
450 			int ret = sem_trywait(&mutex);
451 			return (ret == 0);
452 		}
454 		bool timedWait(int usecs)
455 		{
456 			timespec tv;
457 			// if there is no such a function look at it: https://stackoverflow.com/questions/5404277/porting-clock-gettime-to-windows
458 			clock_gettime(CLOCK_REALTIME, &tv);
459 			tv.tv_sec += usecs / 1_000_000;
460 			tv.tv_nsec += (usecs % 1_000_000) * 1_000;
462 			int ret = sem_timedwait(&mutex, &tv);
463 			return (ret == 0);
464 		}
466 		void post()
467 		{
468 			int ret = sem_post(&mutex);
469 			assert(ret >= 0);
470 		}
472 		void destroy()
473 		{
474 			sem_destroy(&mutex);
475 		}
476 	}
478 	private extern (C) void* threadRunFunc(void* threadVoid)
479 	{
480 		Thread* th = cast(Thread*) threadVoid;
482 		th.threadStart();
484 		pthread_exit(null);
485 		return null;
486 	}
488 	struct Thread
489 	{
490 		alias DG = void delegate();
492 		DG threadStart;
493 		pthread_t handle;
495 		void start(DG dg)
496 		{
497 			threadStart = dg;
498 			int err = pthread_create(&handle, null, &threadRunFunc, cast(void*)&this);
499 			if(err)handle = pthread_t();
500 			//assert(ok == 0);
501 		}
503 		void join()
504 		{
505 			pthread_join(handle, null);
506 			handle = handle.init;
507 			threadStart = null;
508 		}
509 	}
510 }
511 else version(D_BetterC)
512 {
513 	version(Windows)
514 	{
515 		import core.stdc.stdint : uintptr_t;
516 		import core.sys.windows.windows;
517 		extern (Windows) alias btex_fptr = uint function(void*);
518 		extern (C) uintptr_t _beginthreadex(void*, uint, btex_fptr, void*, uint, uint*) nothrow @nogc;
520 		struct Semaphore
521 		{
522 			HANDLE handle;
524 			void initialize()
525 			{
526 				handle = CreateSemaphoreA( null, 0, int.max, null );
527             	assert ( handle != handle.init );
528                 //throw new SyncError( "Unable to create semaphore" );
529 			}
531 			void wait()
532 			{
533 				DWORD rc = WaitForSingleObject( handle, INFINITE );
534 				//int ret = sem_wait(&mutex);
535 				assert(rc == WAIT_OBJECT_0);
536 			}
538 			bool tryWait()
539 			{
540 				switch ( WaitForSingleObject( handle, 0 ) )
541 				{
542 				case WAIT_OBJECT_0:
543 					return true;
544 				case WAIT_TIMEOUT:
545 					return false;
546 				default:
547 					assert(0);//throw new SyncError( "Unable to wait for semaphore" );
548 				}
549 			}
551 			bool timedWait(int usecs)
552 			{
553 				/*timespec tv;
554 				// if there is no such a function look at it: https://stackoverflow.com/questions/5404277/porting-clock-gettime-to-windows
555 				clock_gettime(CLOCK_REALTIME, &tv);
556 				tv.tv_sec += usecs / 1_000_000;
557 				tv.tv_nsec += (usecs % 1_000_000) * 1_000;
559 				int ret = sem_timedwait(&mutex, &tv);
560 				return (ret == 0);*/
562 				switch ( WaitForSingleObject( handle, cast(uint) usecs / 1000 ) )
563 				{
564 				case WAIT_OBJECT_0:
565 					return true;
566 				case WAIT_TIMEOUT:
567 					return false;
568 				default:
569 					assert(0, "Unable to wait for semaphore" );
570 				}
571 			}
573 			void post()
574 			{
575 				assert(ReleaseSemaphore( handle, 1, null ));
576 			}
578 			void destroy()
579 			{
580 				BOOL rc = CloseHandle( handle );
581             	assert( rc, "Unable to destroy semaphore" );
582 			}
583 		}
585 		private extern (Windows) uint threadRunFunc(void* threadVoid)
586 		{
587 			Thread* th = cast(Thread*) threadVoid;
589 			th.threadStart();
591 			ExitThread(0);
592 			return 0;
593 		}
595 		struct Thread
596 		{
597 			alias DG = void delegate();
599 			DG threadStart;
600 			HANDLE handle;
602 			void start(DG dg)
603 			{
604 				threadStart = dg;
605 				handle = cast(HANDLE) _beginthreadex( null, 0, &threadRunFunc, cast(void*)&this, 0, null );
606 			}
608 			void join()
609 			{
610 				if ( WaitForSingleObject( handle, INFINITE ) == WAIT_OBJECT_0 )assert(0);
611 				CloseHandle( handle );
613 				handle = handle.init;
614 				threadStart = null;
615 			}
616 		}
617 	}
618 	else 
619 	{
620 		static assert(0, "Platform is unsupported in betterC mode!");
621 	}
622 }
623 else
624 {
625 	import core.thread : D_Thread = Thread;
626 	import core.sync.semaphore : D_Semaphore = Semaphore;
627 	import core.time : dur;
628 	import std.experimental.allocator;
629 	import std.experimental.allocator.mallocator;
631 	struct Semaphore
632 	{
633 		D_Semaphore sem;
635 		void initialize()
636 		{
637 			sem = Mallocator.instance.make!D_Semaphore();
638 		}
640 		void wait()
641 		{
642 			sem.wait();
643 		}
645 		bool tryWait()
646 		{
647 			return sem.tryWait();
648 		}
650 		bool timedWait(int usecs)
651 		{
652 			return sem.wait(dur!"usecs"(usecs));
653 		}
655 		void post()
656 		{
657 			sem.notify();
658 		}
660 		void destroy()
661 		{
662 			Mallocator.instance.dispose(sem);
663 		}
664 	}
666 	struct Thread
667 	{
668 		alias DG = void delegate();
670 		DG threadStart;
671 		D_Thread thread;
673 		void start(DG dg)
674 		{
675 			thread = Mallocator.instance.make!D_Thread(dg);
676 			thread.start();
677 		}
679 		void join()
680 		{
681 			thread.join();
682 		}
683 	}
684 }
686 //////////////////////////////////////////////
687 ///////////////// ThreadPool /////////////////
688 //////////////////////////////////////////////
690 private enum gMaxThreadsNum = 64;
692 alias JobDelegate = void delegate(ThreadData*, JobData*);
/// Structure to store the start time and the duration of a job
struct JobLog
{
	string name; /// Name of job
	ulong time; /// Time started (us)
	ulong duration; /// Took time (us)
}
/// Singly linked list of jobs guarded by an atomic spin lock.
/// Jobs are inserted at the head and removed from the head, so pop() returns the most recently added job first.
struct JobQueue
{
	alias LockType = int;
	align(64) shared LockType lock; /// Spin lock guarding the list of jobs
	align(64) JobData* first; /// First element in the list of jobs

	/// Spins (with instructionPause) until the lock is taken
	private void acquire()
	{
		while (!cas(&lock, cast(LockType) false, cast(LockType) true))
			instructionPause();
	}

	/// Releases the lock taken by acquire()
	private void release()
	{
		atomicStore!(MemoryOrder.rel)(lock, cast(LockType) false);
	}

	/// Check if empty without locking, doesn't give guarantee that list is truly empty
	bool emptyRaw()
	{
		return first == null;
	}

	/// Check if empty, under the lock
	bool empty()
	{
		acquire();
		const bool nothingQueued = first == null;
		release();
		return nothingQueued;
	}

	/// Add job to queue; the job becomes the new head of the list
	void add(JobData* t)
	{
		acquire();
		t.next = first;
		first = t;
		release();
	}

	/// Add range of jobs to queue.
	/// The jobs are chained in range order outside of the lock, then the whole chain is placed in front of the list.
	void addRange(Range)(Range arr)
	{
		if (arr.length == 0)
			return;

		JobData* head = arr[0];
		JobData* tail = head;
		foreach (idx; 1 .. arr.length)
		{
			JobData* job = arr[idx];
			tail.next = job;
			tail = job;
		}

		acquire();
		tail.next = first;
		first = head;
		release();
	}

	/// Pop job from queue.
	/// Returns: the head of the list, or null when the list is empty
	JobData* pop()
	{
		acquire();
		JobData* taken = first;
		if (taken != null)
			first = taken.next;
		release();
		return taken;
	}

}
/// Structure containing job data
/// JobData memory is allocated by the user
/// JobData lifetime is managed by the user
/// JobData has to live as long as its group, or until the end of the job's execution
/// JobData fields can be changed inside the del delegate and the job can be added to the thread pool again to continue execution (calling the same function again, or another one if del was changed)
struct JobData
{
	JobDelegate del; /// Delegate to execute
	string name; /// Name of job
	private JobsGroup* group; /// Group to which this job belongs
	private align(64) JobData* next; /// Next job in the list; JobData forms a singly linked list of jobs (see JobQueue)
}
/// Structure responsible for a thread in the thread pool
/// Stores jobs to be executed by this thread (jobs can be stolen by another thread)
/// Stores cache for logs
struct ThreadData
{
public:
	ThreadPool* threadPool; /// Pool this thread belongs to
	int threadId; /// Thread id. Valid only for this thread pool
	/// Function starting execution of the thread main loop (threadFunc)
	/// External threads can call this function to start executing jobs
	void threadStartFunc()
	{
		//end = false;
		threadFunc(&this);
	}
private:
	JobQueue jobsQueue; /// Queue of jobs to be done, jobs can be stolen from another thread
	JobQueue jobsExclusiveQueue; /// Queue of jobs to be done, jobs can't be stolen
	align(64) Semaphore semaphore; /// Semaphore to wake/sleep this thread
	align(64) Thread thread; /// System thread handle
	JobLog[] logs; /// Logs cache
	int lastLogIndex = -1; /// Last created log index
	int jobsDoneCount; /// Counter summed by ThreadPool.jobsDoneCount() and zeroed by jobsDoneCountReset()
	shared bool end; /// Check if thread has to exit. Thread will exit only if end is true and jobsToDo is empty
	shared bool acceptJobs; /// Check if thread should accept new jobs. If false, the thread won't steal jobs from other threads and will sleep longer if its queue is empty
	bool externalThread; /// Thread not allocated by thread pool
}
/// Thread Pool
/// Manages a bunch of threads to execute given jobs as quickly as possible
/// There are no priorities between jobs. Jobs are added to queues in the same order as they are in slices, but due to job stealing and uneven speed of execution between threads the job execution order is unspecified.
/// The number of threads executing jobs can be changed dynamically at any time. Threads removed from execution will work until the end of the program but shouldn't accept new jobs.
831 struct ThreadPool
832 {
	alias FlushLogsDelegaste = void delegate(ThreadData* threadData, JobLog[] logs); /// Type of delegate to flush logs
	FlushLogsDelegaste onFlushLogs; /// User custom delegate to flush logs. initialize() installs defaultFlushLogs when logs are enabled, so a custom delegate has to be set after the initialize() call
	int logsCacheNum; /// Number of log cache entries. Note that initialize() overwrites this value; set it after initialize() and before setThreadsNum is called
	int tryWaitCount = 2000; /// Number of times tryWait is called before the timedWait call. A higher value gives better response but takes CPU time even if there are no jobs.
private:
	ThreadData*[gMaxThreadsNum] threadsData; /// Data for threads
	align(64) shared int threadsNum; /// Number of threads currently accepting jobs
	align(64) shared bool threadsDataLock; /// Any modification of threadsData array (change in size or pointer modification) has to be locked
	align(64) shared int threadSelector; /// Index of thread to which add next job
	FILE* logFile; /// File handle for defaultFlushLogs log file
	JobData[4] resumeJobs; /// Dummy jobs to resume some thread
845 public:
847 	static int getCPUCoresCount()
848 	{
849 		version(Windows)
850 		{
851 			import core.sys.windows.winbase : SYSTEM_INFO, GetSystemInfo;
852 			SYSTEM_INFO sysinfo;
853 			GetSystemInfo(&sysinfo);
854 			return sysinfo.dwNumberOfProcessors;
855 		}
856 		else version (linux)
857 		{
858 			version(D_BetterC)
859 			{
860 				import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;
861 				return cast(int)sysconf(_SC_NPROCESSORS_ONLN);
862 			}
863 			else
864 			{
865 				import core.sys.linux.sched : CPU_COUNT, cpu_set_t, sched_getaffinity;
866 				import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;
868 				cpu_set_t set = void;
869 				if (sched_getaffinity(0, cpu_set_t.sizeof, &set) == 0)
870 				{
871 					int count = CPU_COUNT(&set);
872 					if (count > 0)
873 						return cast(uint) count;
874 				}
875 				return cast(int)sysconf(_SC_NPROCESSORS_ONLN);
876 			}
877 		}
878 		else version(Posix)
879 		{
880 			import core.sys.posix.unistd;
881     		return cast(int)sysconf(_SC_NPROCESSORS_ONLN);
882 		}
883 		else return -1;
884 	}
886 	int jobsDoneCount()
887 	{
888 		int sum;
889 		foreach (i, ref ThreadData* th; threadsData)
890 		{
891 			if (th is null)
892 				continue;
894 			sum += th.jobsDoneCount;
895 		}
896 		return sum;
897 	}
899 	void jobsDoneCountReset()
900 	{
901 		foreach (i, ref ThreadData* th; threadsData)
902 		{
903 			if (th is null)
904 				continue;
905 			th.jobsDoneCount = 0;
906 		}
907 	}
908 	/// Initialize thread pool
909 	void initialize()
910 	{
912 		foreach (ref JobData j; resumeJobs)
913 			j = JobData(&dummyJob, "Dummy-Resume");
915 		version (MM_NO_LOGS)
916 		{
917 			logsCacheNum = 0;
918 		}
919 		else
920 		{
921 			onFlushLogs = &defaultFlushLogs;
922 			logsCacheNum = 1024;
924 			logFile = fopen("trace.json", "w");
925 			fprintf(logFile, "[");
926 			fclose(logFile);
927 			logFile = fopen("trace.json", "a");
928 			assert(logFile !is null);
929 		}
930 	}
932 	/// Clean ups ThreadPool
933 	~this()
934 	{
935 		version (MM_NO_LOGS)
936 		{
938 		}
939 		else if (logFile)
940 		{
941 			fclose(logFile);
942 			logFile = null;
943 		}
946 	}
948 	/// Registers external thread to thread pool array. There will be allocated data for this thread and it will have specified id
949 	/// External threads are not joined at the end of thread pool execution
950 	/// Returns ThreadData corresponding to external thread. To acually start executing, external thread had to call threadStartFunc() from returned variable
951 	ThreadData* registerExternalThread()
952 	{
953 		lockThreadsData();
954 		//scope (exit)
957 		ThreadData* threadData = makeThreadData();
958 		threadData.threadPool = &this;
959 		threadData.semaphore.initialize();
960 		threadData.externalThread = true;
961 		atomicStore(threadData.acceptJobs, true);
962 		//threadData.acceptJobs = true;
964 		int threadNum = atomicOp!"+="(threadsNum, 1) - 1;
966 		threadData.threadId = threadNum;
968 		threadsData[threadNum] = threadData;
970 		unlockThreadsData();
972 		return threadData;
973 	}
975 	/// Unregisters external thread. Can be called only when external thread have left the thread pool
976 	void unregistExternalThread(ThreadData* threadData)
977 	{
978 		lockThreadsData();
979 		//scope (exit)
980 		//	unlockThreadsData();
982 		disposeThreadData(threadData);
983 		unlockThreadsData();
984 	}
986 	/// Allows external threads to return from threadStartFunc
987 	void releaseExternalThreads()
988 	{
989 		lockThreadsData();
990 		//scope (exit)
991 		//	unlockThreadsData();
993 		// Release external threads (including main thread)
994 		foreach (i, ref ThreadData* th; threadsData)
995 		{
996 			if (th is null)
997 				continue;
998 			if (!th.externalThread)
999 				continue;
1001 			auto rng = resumeJobs[].map!((ref a) => &a);
1002 			addJobsRange(rng, cast(int) i);
1003 			atomicStore(th.end, true);
1004 		}
1005 		unlockThreadsData();
1006 	}
1008 	/// Waits for all threads to finish and joins them (excluding external threads)
1009 	void waitThreads()
1010 	{
1011 		lockThreadsData();
1012 		//scope (exit)
1013 		//	unlockThreadsData();
1014 		foreach (i, ref ThreadData* th; threadsData)
1015 		{
1016 			if (th is null)
1017 				continue;
1019 			atomicStore(th.acceptJobs, false);
1020 			atomicStore(th.end, true);
1021 		}
1022 		foreach (i, ref ThreadData* th; threadsData)
1023 		{
1024 			if (th is null || th.externalThread)
1025 				continue;
1027 			th.thread.join();
1028 			disposeThreadData(th);
1029 		}
1030 		unlockThreadsData();
1031 	}
1033 	/// Sets number of threads to accept new jobs
1034 	/// If there were never so much threads created, they will be created
1035 	/// If number of threads set is smaller than there was threads before, they are not joined but they stop getting new jobs, they stop stealing jobs and they sleep longer
1036 	/// Locking operation
1037 	void setThreadsNum(int num)
1038 	{
1039 		assert(num <= gMaxThreadsNum);
1040 		assert(num > 0);
1042 		lockThreadsData();
1043 		//scope (exit)
1044 		//	unlockThreadsData();
1046 		foreach (i, ref ThreadData* th; threadsData)
1047 		{
1048 			if (th)
1049 			{
1050 				// Exists but has to be disabled
1051 				atomicStore(th.acceptJobs, i < num);
1052 				continue;
1053 			}
1054 			else if (i >= num)
1055 			{
1056 				// Doesn't exist and is not required
1057 				continue;
1058 			}
1059 			// Doesn't exist and is required
1060 			th = makeThreadData();
1061 			th.threadPool = &this;
1062 			th.threadId = cast(int) i;
1063 			atomicStore(th.acceptJobs, true);
1064 			//th.acceptJobs = true;
1065 			th.semaphore.initialize();
1067 			th.thread.start(&th.threadStartFunc);
1068 		}
1070 		atomicStore(threadsNum, num);
1071 		unlockThreadsData();
1072 	}
1074 	/// Adds job to be executed by thread pool, such a job won't be synchronized with any group or job
1075 	/// If threadNum is different than -1 only thread with threadNum will be able to execute given job
1076 	/// It is advised to use synchronized group of jobs
1077 	void addJobAsynchronous(JobData* data, int threadNum = -1)
1078 	{
1079 		if (threadNum == -1)
1080 		{
1081 			ThreadData* threadData = getThreadDataToAddJobTo();
1082 			threadData.jobsQueue.add(data);
1083 			threadData.semaphore.post();
1084 			return;
1085 		}
1086 		ThreadData* threadData = threadsData[threadNum];
1087 		assert(threadData !is null);
1088 		threadData.jobsExclusiveQueue.add(data);
1089 		threadData.semaphore.post();
1090 	}
1092 	/// Adds job to be executed by thread pool, group specified in group data won't be finished until this job ends
1093 	/// If threadNum is different than -1 only thread with threadNum will be able to execute given job
1094 	void addJob(JobData* data, int threadNum = -1)
1095 	{
1096 		assert(data.group);
1097 		atomicOp!"+="(data.group.jobsToBeDoneCount, 1);
1098 		addJobAsynchronous(data, threadNum);
1099 	}
	/// Adds multiple jobs at once
	/// Range has to return JobData*
	/// Range has to have length property (indexing and slicing are used as well)
	/// Range is used so there is no need to allocate JobData*[]
	/// All jobs have to belong to one group
	/// If threadNum is different than -1 only thread with threadNum will be able to execute given jobs
	/// NOTE(review): on the threadNum != -1 path the group's jobsToBeDoneCount is not increased, unlike addJob — confirm this is intended.
	/// NOTE(review): threadsNum is used as a divisor; presumably setThreadsNum has to be called before — confirm.
	void addJobsRange(Range)(Range rng, int threadNum = -1)
	{
		if (threadNum != -1)
		{
			// Exclusive path: everything goes to one thread's non-stealable queue
			ThreadData* threadData = threadsData[threadNum];
			assert(threadData !is null);
			threadData.jobsExclusiveQueue.addRange(rng);
			// One post per job
			foreach (sInc; 0 .. rng.length)
				threadData.semaphore.post();
			return;
		}
		if (rng.length == 0)
		{
			return;
		}
		// Debug check: every job has to share the group of the first one
		foreach (JobData* threadData; rng)
		{
			assert(rng[0].group == threadData.group);
		}
		atomicOp!"+="(rng[0].group.jobsToBeDoneCount, cast(int) rng.length);
		int threadsNumLocal = atomicLoad(threadsNum);
		// Number of jobs every active thread receives in the even split
		int part = cast(int) rng.length / threadsNumLocal;
		if (part > 0)
		{
			foreach (i, ThreadData* threadData; threadsData[0 .. threadsNumLocal])
			{
				auto slice = rng[i * part .. (i + 1) * part];
				threadData.jobsQueue.addRange(slice);
				foreach (sInc; 0 .. part)
					threadData.semaphore.post();
			}
			// Keep only the remainder (fewer jobs than active threads)
			rng = rng[part * threadsNumLocal .. $];
		}
		// Remaining jobs: one per thread, starting from thread 0
		foreach (i, ThreadData* threadData; threadsData[0 .. rng.length])
		{
			threadData.jobsQueue.add(rng[i]);
			threadData.semaphore.post();
		}
	}
1154 	/// Adds group of jobs to threadPool, group won't be synchronized
1155 	void addGroupAsynchronous(JobsGroup* group)
1156 	{
1157 		group.thPool = &this;
1159 		if (group.jobs.length == 0)
1160 		{
1161 			// Immediately call group end
1162 			group.onGroupFinish();
1163 			return;
1164 		}
1165 		group.setUpJobs();
1166 		auto rng = group.jobs[].map!((ref a) => &a);
1167 		addJobsRange(rng, group.executeOnThreadNum);
1168 	}
1170 	/// Adds group of jobs to threadPool
1171 	/// Spwaning group will finish after this group have finished
1172 	void addGroup(JobsGroup* group, JobsGroup* spawnedByGroup)
1173 	{
1174 		assert(spawnedByGroup);
1175 		group.spawnedByGroup = spawnedByGroup;
1176 		atomicOp!"+="(spawnedByGroup.jobsToBeDoneCount, 1); // Increase by one, so 'spawning group' will wait for 'newly added group' to finish
1177 		addGroupAsynchronous(group); // Synchronized by jobsToBeDoneCount atomic variable
1178 	}
1180 	/// Explicitly calls onFlushLogs on all threads
1181 	void flushAllLogs()
1182 	{
1183 		lockThreadsData();
1184 		//scope (exit)
1185 		//	unlockThreadsData();
1186 		foreach (thNum; 0 .. atomicLoad(threadsNum))
1187 		{
1188 			ThreadData* th = threadsData[thNum];
1189 			onThreadFlushLogs(th);
1190 		}
1192 		foreach (i, ref ThreadData* th; threadsData)
1193 		{
1194 			if (th is null)
1195 				continue;
1197 			onThreadFlushLogs(th);
1198 		}
1199 		unlockThreadsData();
1200 	}
1202 	/// Default implementation of flushing logs
1203 	/// Saves logs to trace.json file in format acceptable by Google Chrome tracking tool chrome://tracing/ 
1204 	/// Logs can be watched even if apllication crashed, but might require removing last log entry from trace.json
1205 	void defaultFlushLogs(ThreadData* threadData, JobLog[] logs)
1206 	{
1207 		version (MM_NO_LOGS)
1208 		{
1209 		}
1210 		else
1211 		{
1212 			// (log rows num) * (static json length * time length * duration length)
1213 			long start = useconds();
1214 			size_t size = (logs.length + 1) * (128 + 20 + 20);
1215 			size_t used = 0;
1217 			foreach (ref log; logs)
1218 			{
1219 				size += log.name.length + 1; // size of name
1220 			}
1222 			char* buffer = cast(char*) malloc(size);
1224 			foreach (ref log; logs)
1225 			{
1226 				char[100] name_buffer;
1227 				name_buffer[0 .. log.name.length] = log.name;
1228 				name_buffer[log.name.length] = 0;
1229 				size_t charWritten = snprintf(buffer + used, size - used,
1230 						`{"name":"%s", "pid":1, "tid":%lld, "ph":"X", "ts":%lld,  "dur":%lld }, %s`,
1231 						name_buffer.ptr, threadData.threadId + 1, log.time, log.duration, "\n".ptr);
1232 				used += charWritten;
1233 			}
1235 			long end = useconds();
1236 			size_t charWritten = snprintf(buffer + used, size - used,
1237 					`{"name":"logFlush", "pid":1, "tid":%lld, "ph":"X", "ts":%lld,  "dur":%lld }, %s`,
1238 					threadData.threadId + 1, start, end - start, "\n".ptr);
1239 			used += charWritten;
1240 			fwrite(buffer, 1, used, logFile);
1241 		}
1242 	}
1244 private:
1245 	/// Atomic lock
1246 	void lockThreadsData()
1247 	{
1248 		// Only one thread at a time can change threads number in a threadpool
1249 		while (!cas(&threadsDataLock, false, true))
1250 		{
1251 		}
1252 	}
1254 	/// Atomic unlock
1255 	void unlockThreadsData()
1256 	{
1257 		atomicStore(threadsDataLock, false);
1258 	}
1260 	/// Allocate ThreadData
1261 	ThreadData* makeThreadData()
1262 	{
1263 		ThreadData* threadData = makeVar!ThreadData();
1264 		threadData.logs = makeVarArray!(JobLog)(logsCacheNum);
1265 		return threadData;
1266 	}
1268 	/// Dispose ThreadData
1269 	void disposeThreadData(ThreadData* threadData)
1270 	{
1271 		disposeArray(threadData.logs);
1272 		return disposeVar(threadData);
1273 	}
1275 	/// Get thread most suiting to add job to
1276 	ThreadData* getThreadDataToAddJobTo()
1277 	{
1278 		int threadNum = atomicOp!"+="(threadSelector, 1);
1280 		foreach (i; 0 .. 1_000)
1281 		{
1282 			if (threadNum >= atomicLoad(threadsNum))
1283 			{
1284 				threadNum = 0;
1285 				atomicStore(threadSelector, 0);
1286 			}
1287 			ThreadData* threadData = threadsData[threadNum];
1288 			if (threadData != null)
1289 			{
1290 				return threadData;
1291 			}
1292 			threadNum++;
1293 		}
1294 		assert(0);
1295 	}
1297 	/// Create log on start of job
1298 	void onStartJob(JobData* data, ThreadData* threadData)
1299 	{
1301 		threadData.jobsDoneCount++;
1302 		version (MM_NO_LOGS)
1303 		{
1304 		}
1305 		else
1306 		{
1307 			if (cast(int) threadData.logs.length <= 0)
1308 			{
1309 				return;
1310 			}
1311 			if (threadData.lastLogIndex >= cast(int) threadData.logs.length - 1)
1312 			{
1313 				onThreadFlushLogs(threadData);
1314 			}
1316 			threadData.lastLogIndex++;
1318 			JobLog log;
1319 			log.name = data.name;
1320 			log.time = useconds();
1321 			threadData.logs[threadData.lastLogIndex] = log;
1322 		}
1323 	}
1325 	/// Set log finish time on end of job
1326 	void onEndJob(JobData* data, ThreadData* threadData)
1327 	{
1328 		version (MM_NO_LOGS)
1329 		{
1330 		}
1331 		else
1332 		{
1333 			if (cast(int) threadData.logs.length <= 0)
1334 			{
1335 				return;
1336 			}
1337 			assert(threadData.lastLogIndex < threadData.logs.length);
1338 			JobLog* log = &threadData.logs[threadData.lastLogIndex];
1339 			log.duration = useconds() - log.time;
1340 		}
1341 	}
1343 	/// Flush logs
1344 	void onThreadFlushLogs(ThreadData* threadData)
1345 	{
1346 		/*scope (exit)
1347 		{
1348 			threadData.lastLogIndex = -1;
1349 		}*/
1351 		assert(threadData);
1353 		if (threadData.lastLogIndex < 0 || onFlushLogs is null)
1354 		{
1355 			return;
1356 		}
1358 		onFlushLogs(threadData, threadData.logs[0 .. threadData.lastLogIndex + 1]);
1360 		threadData.lastLogIndex = -1;
1361 	}
1363 	/// Does nothing
1364 	void dummyJob(ThreadData* threadData, JobData* data)
1365 	{
1367 	}
1369 	/// Steal job from another thread
1370 	JobData* stealJob(int threadNum)
1371 	{
1372 		foreach (thSteal; 0 .. atomicLoad(threadsNum))
1373 		{
1374 			if (thSteal == threadNum)
1375 				continue; // Do not steal from requesting thread
1377 			ThreadData* threadData = threadsData[thSteal];
1379 			if (threadData is null || !threadData.semaphore.tryWait())
1380 				continue;
1382 			JobData* data = threadData.jobsQueue.pop();
1384 			if (data is null)
1385 				threadData.semaphore.post();
1387 			return data;
1388 		}
1389 		return null;
1390 	}
1391 }
/// Adding groups of jobs is faster and groups can have dependencies between each other
struct JobsGroup
{
public:
	string name; /// Name of group
	JobData[] jobs; /// Jobs to be executed by this group, jobs have to live as long as group lives
	void delegate(JobsGroup* group) onFinish; /// Delegate called when group will finish, can be used to free memory
	ThreadPool* thPool; /// Thread pool of this group
	int executeOnThreadNum = -1; /// Thread num to execute jobs on

	/// Creates a group with zeroed job and dependency counters
	/// Params:
	///   name = name of group
	///   jobs = jobs to be executed by this group
	///   executeOnThreadNum = thread num to execute jobs on, -1 by default
	this(string name, JobData[] jobs = [], int executeOnThreadNum = -1)
	{
		this.name = name;
		this.jobs = jobs;
		this.executeOnThreadNum = executeOnThreadNum;
		atomicStore(jobsToBeDoneCount,0);
		atomicStore(dependenciesWaitCount,0);
	}

	/// Releases the array of depending groups (the groups themselves are not owned)
	~this() nothrow
	{
		free(children.ptr);
		children = null;
	}

	/// Make this group dependant from another group
	/// Dependant group won't start until its dependencies are fulfilled
	/// Params:
	///   parent = group which has to finish before this group is started
	void dependantOn(JobsGroup* parent)
	{
		size_t newLen = parent.children.length + 1;
		JobsGroup** ptr = cast(JobsGroup**) realloc(parent.children.ptr,
				newLen * (JobsGroup*).sizeof);
		if (ptr is null)
		{
			// Allocation failed: stop here instead of writing through a null slice
			assert(0);
		}
		parent.children = ptr[0 .. newLen];
		parent.children[$ - 1] = &this;
		atomicOp!"+="(dependenciesWaitCount, 1);
	}

	/// Returns number of dependencies this group is waiting for
	int getDependenciesWaitCount()
	{
		return atomicLoad(dependenciesWaitCount);
	}

private:
	JobsGroup* spawnedByGroup; /// Group which spawned this group, if present spawning group is waiting for this group to finish
	JobsGroup*[] children; /// Groups depending on this group
	align(64) shared int dependenciesWaitCount; /// Count of dependencies this group waits for
	align(64) shared int jobsToBeDoneCount; /// Number of this group jobs still executing

	/// Checks if depending groups or spawning group have to be started
	/// Executes user onFinish function
	void onGroupFinish()
	{

		decrementChildrendependencies();
		if (spawnedByGroup)
		{
			// This group was one pending unit of the spawning group
			auto num = atomicOp!"-="(spawnedByGroup.jobsToBeDoneCount, 1);
			assert(num >= 0);
			if (num == 0)
			{
				spawnedByGroup.onGroupFinish();
			}
		}
		// User callback goes last
		if (onFinish)
			onFinish(&this);
	}

	/// Decrements dependencies counter of depending groups and starts those whose dependencies are fulfilled
	void decrementChildrendependencies()
	{
		foreach (JobsGroup* group; children)
		{
			auto num = atomicOp!"-="(group.dependenciesWaitCount, 1);
			assert(num >= 0);
			if (num == 0)
			{
				thPool.addGroupAsynchronous(group); // All dependencies of this group are fulfilled, so is already synchronized
			}
		}
	}

	/// Prepare jobs data for adding to thread pool: every job gets this group assigned
	void setUpJobs()
	{
		foreach (i; 0 .. jobs.length)
		{
			jobs[i].group = &this;
		}
	}

}
1488 /// Main function executed by thread present in thread pool
1489 /// Executes functions from its own queue
1490 /// If there are no jobs in its queue, steals from another thread
1491 /// If there is nothing to steal, sleeps on its semaphore for a while (stage when cpu is not used)
1492 /// Sleep time is longer for jobs not accepting jobs, they don't exit because it is hard to guarantee that nobody is adding to them some job (thread might exit but job will be added anyway and application will malfunctio
1493 /// Thread end only when it's queue is empty. Jobs shouldn't be added to queue after ThreadPool.waitThreads() call
private void threadFunc(ThreadData* threadData)
{
	ThreadPool* threadPool = threadData.threadPool;
	int threadNum = threadData.threadId;

	// Keep running until the end flag is set and both own queues are empty
	while (!atomicLoad!(MemoryOrder.raw)(threadData.end)
			|| !threadData.jobsQueue.empty() || !threadData.jobsExclusiveQueue.empty())
	{
		JobData* data;
		if (threadData.semaphore.tryWait())
		{
			// Own semaphore was signalled: exclusive jobs first, then the shared queue
			if (!threadData.jobsExclusiveQueue.emptyRaw())
				data = threadData.jobsExclusiveQueue.pop();

			if (data is null)
				data = threadData.jobsQueue.pop();

			// No job was found for the token taken above, give it back
			if (data is null)
				threadData.semaphore.post();

			assert(data !is null);
		}
		else
		{
			bool acceptJobs = atomicLoad!(MemoryOrder.raw)(threadData.acceptJobs);
			if (acceptJobs)
			{
				data = threadPool.stealJob(threadNum);
			}

			if (data is null)
			{
				// Thread does not have own job and can not steal it, so wait for a job
				// First spin on tryWait up to threadPool.tryWaitCount times
				int tryWait = 0;
				bool ok = true;
				while(!threadData.semaphore.tryWait())
				{
					tryWait++;
					if(tryWait>threadPool.tryWaitCount)
					{
						ok = false;
						break;
					}
					static foreach(i;0..10)instructionPause();
				}
				// Spinning did not help, block on the semaphore with a timeout
				// (timeout is longer for threads which do not accept jobs)
				if(!ok)ok = threadData.semaphore.timedWait(1_000 + !acceptJobs * 10_000);

				if (ok)
				{

					if (!threadData.jobsExclusiveQueue.emptyRaw())
						data = threadData.jobsExclusiveQueue.pop();

					if (data is null)
						data = threadData.jobsQueue.pop();

					if (data is null)
						threadData.semaphore.post();
				}
			}
		}

		// Nothing to do
		if (data is null)
		{
			continue;
		}

		// Remember the group before running the job. The job may re-queue its own JobData,
		// move it to another group or release it, so data.group must not be read once the
		// job has run; the group counted at submission is the one to be decremented.
		auto group = data.group;

		// Do the job
		threadPool.onStartJob(data, threadData);
		data.del(threadData, data);
		threadPool.onEndJob(data, threadData);
		if (group)
		{
			auto num = atomicOp!"-="(group.jobsToBeDoneCount, 1);
			if (num == 0)
			{
				// Last job of the group has just ended
				group.onGroupFinish();
			}
		}

	}
	// Reset the end flag after leaving the loop
	atomicStore(threadData.end, false);
	assert(threadData.jobsQueue.empty());
}
1582 //////////////////////////////////////////////
1583 //////////////////// Test ////////////////////
1584 //////////////////////////////////////////////
1585 /*
1586 void testThreadPool()
1587 {
1588 	enum jobsNum = 1024 * 4;
1590 	ThreadPool thPool;
1591 	thPool.initialize();
1593 	ThreadData* mainThread = thPool.registerExternalThread(); // Register main thread as thread 0
1594 	JobData startFrameJobData; // Variable to store job starting the TestApp
1596 	JobData[jobsNum] frameJobs; // Array to store jobs created in TestApp.continueFrameInOtherJob
1597 	shared int frameNum; // Simulate game loop, run jobs for few frames and exit
1599 	// App starts as one job &startFrame
1600 	// Then using own JobData memory spawns 'continueFrameInOtherJob' job
1601 	// 'continueFrameInOtherJob' job fills frameJobs array with &importantTask jobs. Group created to run this jobs is freed using JobsGroup.onFinish delegate
1602 	// 'importantTask' allocate new jobs (&importantTaskSubTask) and group, they all deallocated using JobsGroup.onFinish delegate
1603 	// 'continueFrameInOtherJob' waits for all 'importantTask'.  All 'importantTask' wait for all 'importantTaskSubTask'. 
1604 	// So after all 'importantTaskSubTask' and 'importantTask' are done 'continueFrameInOtherJob' ends and spawns &finishFrame
1605 	// 'finishFrame' spawn new frame or exits application
1606 	struct TestApp
1607 	{
1608 		// First job in frame
1609 		// Do some stuff and spawn some other job
1610 		// 1 - Number of jobs of this kind in frame
1611 		void startFrame(ThreadData* threadData, JobData* startFrameJobData)
1612 		{
1613 			startFrameJobData.del = &continueFrameInOtherJobAAA;
1614 			startFrameJobData.name = "cont frmAAA";
1615 			thPool.addJobAsynchronous(startFrameJobData, thPool.threadsNum - 1); /// startFrame is the only job in thread pool no synchronization is required
1616 		}
1618 		void continueFrameInOtherJobAAA(ThreadData* threadData, JobData* startFrameJobData)
1619 		{
1621 			static struct JobGroupMemory
1622 			{
1623 				JobsGroup[6] groups;
1624 				JobData[1][6] groupsJobs;
1625 				TestApp* app;
1626 				JobData* startFrameJobData;
1628 				void spawnCont(JobsGroup* group)
1629 				{
1630 					// startFrameJobData.del = &continueFrameInOtherJob;
1631 					startFrameJobData.del = &app.finishFrame;
1632 					startFrameJobData.name = "cont frm";
1633 					group.thPool.addJobAsynchronous(startFrameJobData); /// startFrame is the only job in thread pool no synchronization is required
1635 				}
1636 			}
1638 			JobGroupMemory* memory = makeVar!JobGroupMemory();
1639 			memory.app = &this;
1640 			memory.startFrameJobData = startFrameJobData;
1642 			with (memory)
1643 			{
1644 				groups[0] = JobsGroup("dependant 0", groupsJobs[0]);
1645 				groups[1] = JobsGroup("dependant 1", groupsJobs[1]);
1646 				groups[2] = JobsGroup("dependant 2", groupsJobs[2]);
1647 				groups[3] = JobsGroup("dependant 3", groupsJobs[3]);
1648 				groups[4] = JobsGroup("dependant 4", groupsJobs[4]);
1649 				groups[5] = JobsGroup("dependant 5", groupsJobs[5]);
1650 				groups[5].onFinish = &spawnCont;
1652 				groups[2].dependantOn(&groups[0]);
1653 				groups[2].dependantOn(&groups[1]);
1655 				groups[3].dependantOn(&groups[0]);
1656 				groups[3].dependantOn(&groups[1]);
1657 				groups[3].dependantOn(&groups[2]);
1659 				groups[4].dependantOn(&groups[1]);
1660 				groups[4].dependantOn(&groups[3]);
1662 				groups[5].dependantOn(&groups[0]);
1663 				groups[5].dependantOn(&groups[1]);
1664 				groups[5].dependantOn(&groups[2]);
1665 				groups[5].dependantOn(&groups[3]);
1666 				groups[5].dependantOn(&groups[4]);
1668 				foreach (ref jobs; groupsJobs)
1669 					foreach (ref j; jobs)
1670 						j = JobData(&this.importantTaskSubTask, "n");
1672 				thPool.addGroupAsynchronous(&groups[0]);
1673 				thPool.addGroupAsynchronous(&groups[1]);
1674 			}
1675 		}
1677 		// Job for some big system
1678 		// Spawns some jobs and when they are done spawns finishFrame job
1679 		// 1 - Number of jobs of this kind in frame
1680 		void continueFrameInOtherJob(ThreadData* threadData, JobData* startFrameJobData)
1681 		{
1682 			static struct JobGroupMemory
1683 			{
1684 				JobsGroup group;
1685 				TestApp* app;
1686 				JobData* startFrameJobData;
1688 				void freeAndContinue(JobsGroup* group)
1689 				{
1690 					startFrameJobData.del = &app.finishFrame;
1691 					startFrameJobData.name = "finishFrame";
1692 					group.thPool.addJobAsynchronous(startFrameJobData, group.thPool.threadsNum - 1); /// startFrameJobData is continuation of 'startFrame data', all important jobs finished so it is the only job, no synchronization required. Always spawn on last thread
1693 					disposeVar!(JobGroupMemory)(&this);
1694 				}
1695 			}
1697 			JobGroupMemory* important = makeVar!JobGroupMemory();
1698 			important.app = &this;
1699 			important.startFrameJobData = startFrameJobData;
1701 			foreach (ref j; frameJobs)
1702 				j = JobData(&this.importantTask, "vip");
1704 			important.group = JobsGroup("a lot of jobs", frameJobs[]);
1705 			important.group.onFinish = &important.freeAndContinue;
1707 			thPool.addGroupAsynchronous(&important.group); // No Synchronization required continueFrameInOtherJob is the only job
1708 		}
1710 		// Some task which by itself does a lot of computation so it spawns few more jobs
1711 		// jobsNum - Number of jobs of this kind in frame
1712 		void importantTask(ThreadData* threadData, JobData* data)
1713 		{
1714 			// All tasks created here will, make 'importantTask' wait with finish untill this jobs will finish
1716 			/// Add 10 tasks in group
1717 			static struct JobGroupMemory
1718 			{
1719 				JobsGroup group;
1720 				JobData[128] jobs;
1722 				void freeMee(JobsGroup* group)
1723 				{
1724 					disposeVar!(JobGroupMemory)(&this);
1725 				}
1726 			}
1728 			JobGroupMemory* subGroup = makeVar!JobGroupMemory();
1730 			foreach (ref j; subGroup.jobs)
1731 				j = JobData(&this.importantTaskSubTask, "vip sub");
1733 			subGroup.group = JobsGroup("128 jobs", subGroup.jobs[]);
1734 			subGroup.group.onFinish = &subGroup.freeMee;
1735 			thPool.addGroup(&subGroup.group, data.group);
1737 			/// Add single tasks
1738 			data.del = &importantTaskSubTask;
1739 			data.name = "sub";
1740 			thPool.addJob(data);
1741 		}
1743 		// Job which simply does some work
1744 		// jobsNum * 128 - Number of jobs of this kind in frame
1745 		void importantTaskSubTask(ThreadData* threadData, JobData* data)
1746 		{
1747 		}
1749 		// Finish frame
1750 		// 1 - Number of jobs of this kind in frame
1751 		void finishFrame(ThreadData* threadData, JobData* startFrameJobData)
1752 		{
1753 			auto num = atomicOp!"+="(frameNum, 1);
1754 			if (num == 10)
1755 			{
1756 				thPool.releaseExternalThreads(); // After 10 frames exit application
1757 				return;
1758 			}
1759 			*startFrameJobData = JobData(&startFrame, "StartFrame"); //
1760 			thPool.addJobAsynchronous(startFrameJobData); // Start next frame, there should't be any other tasks execept of this one, so no synchronization is required
1762 		}
1764 		// Func to test if dynamic changing of threads number works
1765 		// void changeThreadsNum()
1766 		// {
1767 		// 	import std.random : uniform;
1769 		// 	bool change = uniform(0, 100) == 3;
1770 		// 	if (!change)
1771 		// 		return;
1773 		// 	int threadsNum = uniform(3, 5);
1774 		// 	thPool.setThreadsNum(threadsNum);
1776 		// }
1777 	}
1779 	void testThreadsNum(int threadsNum)
1780 	{
1781 		frameNum = 0;
1782 		thPool.jobsDoneCountReset();
1783 		thPool.setThreadsNum(threadsNum);
1785 		TestApp testApp = TestApp();
1786 		startFrameJobData = JobData(&testApp.startFrame, "StartFrame"); // Start first frame, will live as long as main thread won't exit from threadStartFunc()
1788 		ulong start = useconds();
1789 		thPool.addJobAsynchronous(&startFrameJobData); // Synchronization is made by groupEnd (last job in pool) which calls thPool.releaseExternalThreads();
1790 		mainThread.threadStartFunc();
1791 		ulong end = useconds();
1792 		printf("Threads Num: %2d. Jobs: %d. Time: %5.2f ms. jobs/ms: %5.2f\n", threadsNum, thPool.jobsDoneCount,
1793 				(end - start) / 1000.0f, thPool.jobsDoneCount / ((end - start) / 1000.0f));
1794 	}
1796 	while (1)
1797 	{
1798 		// foreach (i; 1 .. 32)
1799 		// 	testThreadsNum(i);
1801 		testThreadsNum(1);
1802 		testThreadsNum(4);
1803 		testThreadsNum(16);
1804 	}
1805 	thPool.flushAllLogs();
1806 	thPool.waitThreads();
1807 	thPool.unregistExternalThread(mainThread);
1808 }
1810 version (D_BetterC)
1811 {
1813 	extern (C) int main(int argc, char*[] argv) // for betterC
1814 	{
1815 		testThreadPool();
1816 		return 0;
1817 	}
1818 }
1819 else
1820 {
1821 	int main()
1822 	{
1823 		testThreadPool();
1824 		return 0;
1825 	}
1826 }//*/
1827 // Compile
1828 // -fsanitize=address
1829 // rdmd -g -of=thread_pool src/mmutils/thread_pool.d && ./thread_pool
1830 // ldmd2 -release -inline -checkaction=C -g -of=thread_pool src/mmutils/thread_pool.d && ./thread_pool
1831 // ldmd2 -checkaction=C -g -of=thread_pool src/mmutils/thread_pool.d && ./thread_pool