1 module mmutils.thread_pool;
2 
3 import std.algorithm : map;
4 
5 version = MM_NO_LOGS; // Disable log creation
//version = MM_USE_POSIX_THREADS; // Use posix threads instead of the standard library; required for betterC
7 
8 version (WebAssembly)
9 {
10 	version = MM_NO_LOGS;
11 	version = MM_USE_POSIX_THREADS;
12 	extern(C) struct FILE
13 	{
14 		
15 	}
16 }
17 else 
18 {
19 	import core.stdc.stdio;
20 }
21 
22 //////////////////////////////////////////////
23 /////////////// BetterC Support //////////////
24 //////////////////////////////////////////////
25 
26 version (D_BetterC)
27 {
28 	version (Posix) version = MM_USE_POSIX_THREADS;
29 
30     extern (C) void free(void*) @nogc nothrow @system;
31     extern (C) void* malloc(size_t size) @nogc nothrow @system;
32     extern (C) void* realloc(void*, size_t size) @nogc nothrow @system;
33     extern (C) void* memcpy(return void*, scope const void*, size_t size) @nogc nothrow @system;
34 
35 	//hacks for LDC
36 	/*extern (C) __gshared int _d_eh_personality(int, int, size_t, void*, void*)
37 	{
38 		return 0;
39 	}
40 
41 	extern (C) __gshared void _d_eh_resume_unwind(void*)
42 	{
43 		return;
44 	}
45 
46 	extern (C) void* _d_allocmemory(size_t sz)
47 	{
48 		return malloc(sz);
49 	}*/
50 }
51 else
52 {
53 	import core.stdc.stdlib;
54 	import core.stdc.string;
55 }
56 
57 //////////////////////////////////////////////
58 /////////////// Atomics //////////////////////
59 //////////////////////////////////////////////
60 
61 version (ECSEmscripten)
62 {
63     import std.traits;
64 
65     enum MemoryOrder
66     {
67         acq,
68         acq_rel,
69         raw,
70         rel,
71         seq
72     }
73 
74     extern (C) ubyte emscripten_atomic_cas_u8(void* addr, ubyte oldVal, ubyte newVal) @nogc nothrow pure;
75     extern (C) ushort emscripten_atomic_cas_u16(void* addr, ushort oldVal, ushort newVal) @nogc nothrow pure;
76     extern (C) uint emscripten_atomic_cas_u32(void* addr, uint oldVal, uint newVal) @nogc nothrow pure;
77 
78     extern (C) ubyte emscripten_atomic_load_u8(const void* addr) @nogc nothrow pure;
79     extern (C) ushort emscripten_atomic_load_u16(const void* addr) @nogc nothrow pure;
80     extern (C) uint emscripten_atomic_load_u32(const void* addr) @nogc nothrow pure;
81 
82     extern (C) ubyte emscripten_atomic_store_u8(void* addr, ubyte val) @nogc nothrow pure;
83     extern (C) ushort emscripten_atomic_store_u16(void* addr, ushort val) @nogc nothrow pure;
84     extern (C) uint emscripten_atomic_store_u32(void* addr, uint val) @nogc nothrow pure;
85 
86     extern (C) ubyte emscripten_atomic_add_u8(void* addr, ubyte val) @nogc nothrow pure;
87     extern (C) ushort emscripten_atomic_add_u16(void* addr, ushort val) @nogc nothrow pure;
88     extern (C) uint emscripten_atomic_add_u32(void* addr, uint val) @nogc nothrow pure;
89 
90     extern (C) ubyte emscripten_atomic_sub_u8(void* addr, ubyte val) @nogc nothrow pure;
91     extern (C) ushort emscripten_atomic_sub_u16(void* addr, ushort val) @nogc nothrow pure;
92     extern (C) uint emscripten_atomic_sub_u32(void* addr, uint val) @nogc nothrow pure;
93 
94     public pure nothrow @nogc Unqual!T atomicOp(string op, T, V1)(ref shared T val, V1 mod)
95     {
96         static if (op == "+=")
97         {
98             static if (is(T == byte) || is(T == ubyte))
99                 return cast(Unqual!T)(emscripten_atomic_add_u8(cast(void*)&val,
100                         cast(Unqual!T) mod) + 1);
101             else static if (is(T == short) || is(T == ushort))
102                 return cast(Unqual!T)(emscripten_atomic_add_u16(cast(void*)&val,
103                         cast(Unqual!T) mod) + 1);
104             else static if (is(T == int) || is(T == uint))
105                 return cast(Unqual!T)(emscripten_atomic_add_u32(cast(void*)&val,
106                         cast(Unqual!T) mod) + 1);
107             else
108                 static assert(0);
109         }
110         else static if (op == "-=")
111         {
112             static if (is(T == byte) || is(T == ubyte))
113                 return cast(Unqual!T)(emscripten_atomic_sub_u8(cast(void*)&val,
114                         cast(Unqual!T) mod) - 1);
115             else static if (is(T == short) || is(T == ushort))
116                 return cast(Unqual!T)(emscripten_atomic_sub_u16(cast(void*)&val,
117                         cast(Unqual!T) mod) - 1);
118             else static if (is(T == int) || is(T == uint))
119                 return cast(Unqual!T)(emscripten_atomic_sub_u32(cast(void*)&val,
120                         cast(Unqual!T) mod) - 1);
121             else
122                 static assert(0);
123         }
124     }
125 
    /// Atomically stores `newval` into `val` (core.atomic-compatible signature).
    /// NOTE(review): the memory-order parameter `ms` is accepted for API
    /// compatibility but not forwarded — the emscripten wrappers declared above
    /// take no ordering argument.
    public pure nothrow @nogc @trusted void atomicStore(MemoryOrder ms = MemoryOrder.seq, T, V)(ref T val,
            V newval)
    {
        alias UT = Unqual!T;
        // Dispatch on the unqualified type width; bool shares the 8-bit path.
        static if (is(UT == bool) || is(UT == byte) || is(UT == ubyte))
            emscripten_atomic_store_u8(cast(void*)&val, cast(UT) newval);
        else static if (is(UT == short) || is(UT == ushort))
            emscripten_atomic_store_u16(cast(void*)&val, cast(UT) newval);
        else static if (is(UT == int) || is(UT == uint))
            emscripten_atomic_store_u32(cast(void*)&val, cast(UT) newval);
        else
            static assert(0);
    }
139 
    /// Atomically loads and returns the value of `val` (core.atomic-compatible
    /// signature). NOTE(review): the memory-order parameter is accepted for API
    /// compatibility but not forwarded to the emscripten wrappers.
    public pure nothrow @nogc @trusted T atomicLoad(MemoryOrder ms = MemoryOrder.seq, T)(
            ref const T val)
    {
        alias UT = Unqual!T;
        // bool is normalized to false/true by comparing the raw byte with zero.
        static if (is(UT == bool))
            return emscripten_atomic_load_u8(cast(const void*)&val) != 0;
        else static if (is(UT == byte) || is(UT == ubyte))
            return emscripten_atomic_load_u8(cast(const void*)&val);
        else static if (is(UT == short) || is(UT == ushort))
            return emscripten_atomic_load_u16(cast(const void*)&val);
        else static if (is(UT == int) || is(UT == uint))
            return emscripten_atomic_load_u32(cast(const void*)&val);
        else
            static assert(0);
    }
155 
156     public pure nothrow @nogc @trusted bool cas(MemoryOrder succ = MemoryOrder.seq,
157             MemoryOrder fail = MemoryOrder.seq, T, V1, V2)(T* here, V1 ifThis, V2 writeThis)
158     {
159         alias UT = Unqual!T;
160         static if (is(UT == bool))
161             return emscripten_atomic_cas_u8(cast(void*) here,
162                     cast(Unqual!T) ifThis, cast(Unqual!T) writeThis) == ifThis;
163         else static if (is(UT == byte) || is(UT == ubyte))
164             return emscripten_atomic_cas_u8(cast(void*) here,
165                     cast(Unqual!T) ifThis, cast(Unqual!T) writeThis) == ifThis;
166         else static if (is(UT == short) || is(UT == ushort))
167             return emscripten_atomic_cas_u16(cast(void*) here,
168                     cast(Unqual!T) ifThis, cast(Unqual!T) writeThis) == ifThis;
169         else static if (is(UT == int) || is(UT == uint))
170             return emscripten_atomic_cas_u32(cast(void*) here,
171                     cast(Unqual!T) ifThis, cast(Unqual!T) writeThis) == ifThis;
172         else
173             static assert(0);
174     }
175 }
176 else
177 {
178     public import core.atomic;
179 }
180 
181 //////////////////////////////////////////////////
182 //////////////////// Allocator ///////////////////
183 //////////////////////////////////////////////////
/// Allocates a single T on the C heap and bit-copies `init` into it.
/// memcpy (not assignment) is used deliberately: no postblit/opAssign runs,
/// which keeps this usable under betterC.
/// Returns: pointer owned by the caller; release with disposeVar.
T* makeVar(T)(T init)
{
	T* el = cast(T*) malloc(T.sizeof);
	assert(el !is null, "malloc failed"); // fail fast instead of memcpy to null
	memcpy(el, &init, T.sizeof);
	return el;
}
190 
/// Allocates a single T on the C heap holding T's default value (T.init).
/// The default value is produced by declaring a local and bit-copying it,
/// so no constructor/postblit machinery is required (betterC friendly).
/// Returns: pointer owned by the caller; release with disposeVar.
T* makeVar(T)()
{
	T defaultValue;
	auto mem = cast(T*) malloc(T.sizeof);
	memcpy(mem, &defaultValue, T.sizeof);
	return mem;
}
198 
/// Allocates an array of `num` elements on the C heap, each bit-copied from `init`.
/// Returns: slice over the malloc'ed memory; release with disposeArray.
T[] makeVarArray(T)(int num, T init = T.init)
{
	// T.sizeof already includes trailing alignment padding, so num * T.sizeof
	// is the correct allocation size. (The original added a redundant
	// "T.sizeof % T.alignof" term, which is always zero and only obscured this.)
	T* ptr = cast(T*) malloc(num * T.sizeof);
	assert(ptr !is null || num == 0, "malloc failed");
	T[] arr = ptr[0 .. num];
	foreach (ref el; arr)
	{
		memcpy(&el, &init, T.sizeof);
	}
	return arr;
}
209 
/// Releases memory previously allocated with makeVar.
/// Passing null is safe: C free(null) is a no-op.
void disposeVar(T)(T* var)
{
	free(var);
}
214 
/// Releases memory previously allocated with makeVarArray.
/// Only the backing pointer is freed; the caller's slice becomes dangling.
void disposeArray(T)(T[] var)
{
	free(var.ptr);
}
219 //////////////////////////////////////////////
220 //////////////////// Timer ///////////////////
221 //////////////////////////////////////////////
222 
223 version (WebAssembly)
224 {
225 	alias int time_t;
226 	alias int clockid_t;
227 	enum CLOCK_REALTIME = 0;
228 
229 	struct timespec
230 	{
231 		time_t  tv_sec;
232 		int     tv_nsec;
233 	}
234 
235 	extern(C) int clock_gettime(clockid_t, timespec*) @nogc nothrow @system;
236 
237 	extern(C) double emscripten_get_now() @nogc nothrow @system;
238 
239 }
240 
/// High precision timer.
/// Returns: a timestamp in microseconds relative to an OS-dependent epoch
/// (Unix epoch via gettimeofday on Posix, page/runtime start on WebAssembly,
/// boot on Windows). Only differences between two calls are meaningful.
long useconds()
{
	version (WebAssembly)
	{
		// emscripten_get_now() returns milliseconds as a double.
		return cast(long)(emscripten_get_now() * 1000.0);
	}
	else version (Posix)
	{
		import core.sys.posix.sys.time : gettimeofday, timeval;

		timeval t;
		gettimeofday(&t, null);
		return t.tv_sec * 1_000_000 + t.tv_usec;
	}
	else version (Windows)
	{
		import core.sys.windows.winbase : QueryPerformanceCounter, QueryPerformanceFrequency;
		import core.sys.windows.winnt : LARGE_INTEGER;

		LARGE_INTEGER freq = void;
		LARGE_INTEGER ticks = void;
		int ok = QueryPerformanceFrequency(&freq);
		assert(ok);
		ok = QueryPerformanceCounter(&ticks);
		assert(ok);
		long f = freq.QuadPart;
		long t = ticks.QuadPart;
		// Split the conversion so ticks * 1_000_000 cannot overflow 64 bits.
		return (t / f) * 1_000_000 + (t % f) * 1_000_000 / f;
	}
	else
	{
		// BUG FIX: the original used static assert("OS not supported."),
		// which always PASSES (a non-empty string literal is truthy).
		static assert(0, "OS not supported.");
	}
}
280 
281 //////////////////////////////////////////////
282 //////////////////// Pause ///////////////////
283 //////////////////////////////////////////////
284 
/// Emits a CPU spin-wait hint so busy-wait loops (see JobQueue's CAS spins)
/// are friendlier to hyper-threaded siblings and power usage.
/// On x86-64 this is the PAUSE instruction (encoded as rep;nop for DMD's
/// inline asm); on Android/WebAssembly an un-optimizable no-op function call
/// is used as a stand-in since no pause instruction is available.
void instructionPause()
{
	version (X86_64)
	{
		version (LDC)
		{
			import ldc.gccbuiltins_x86 : __builtin_ia32_pause;

			__builtin_ia32_pause();
		}
		else version(GNU)
		{
			import gcc.builtins;

			__builtin_ia32_pause();
		}
		else version (DigitalMars)
		{
			// rep;nop is the legacy encoding of PAUSE (DMD asm lacks a mnemonic).
			asm
			{
				rep;
				nop;
			}
		}
		else
		{
			static assert(0);
		}
	}
	else version (Android)
	{
		version(LDC)
		{
			import ldc.attributes;
			// @optStrategy("none") prevents LDC from optimizing the call away,
			// so the spin loop still contains real work.
			@optStrategy("none")
			static void nop()
			{
				int i;
				i++;
			}
			nop();
		}
		else static assert(0);
	}
	else version(WebAssembly)
	{
		version(LDC)
		{
			import ldc.attributes;
			@optStrategy("none")
			static void nop()
			{
				int i;
				i++;
}
			nop();
		}
		else static assert(0);
	}
	else static assert(0);
}
346 
347 //////////////////////////////////////////////
348 ///////////// Semaphore + Thread /////////////
349 //////////////////////////////////////////////
350 
351 version (MM_USE_POSIX_THREADS)
352 {
353 	version (WebAssembly)
354 	{
355 		extern(C):
356 		
357 		struct pthread_attr_t 
358 		{
359 			union 
360 			{
361 				int[10] __i;
362 				uint[10] __s;
363 		}
364 		}
365 	
366 		struct pthread_t
367 		{
368 			void* p;
369 			uint x;
370 		}
371 
372 		// pthread
373 		int pthread_create(pthread_t*, in pthread_attr_t*, void* function(void*), void*);
374 		int pthread_join(pthread_t, void**);
375 		void pthread_exit(void *retval);
376 
377 		struct sem_t
378 		{
379 			shared int[4] __val;
380 		}
381 		int sem_init(sem_t*, int, uint);
382 		int sem_wait(sem_t*);
383 		int sem_trywait(sem_t*);
384 		int sem_post(sem_t*);
385 		int sem_destroy(sem_t*);
386 		int sem_timedwait(sem_t* sem, const timespec* abstime);
387 	}
388 	else version (Posix)
389 	{
390 		import core.sys.posix.pthread;
391 		import core.sys.posix.semaphore;
392 	}
393 	else version (Windows)
394 	{
395 	extern (C):
396 		alias uint time_t;
397 		struct pthread_attr_t 
398 		{
399 
400 		}
401 	
402 		struct pthread_t
403 		{
404 			void* p;
405 			uint x;
406 		}
407 
408 		struct timespec
409 		{
410 			time_t tv_sec;
411 			int tv_nsec;
412 		}
413 
414 		// pthread
415 		int pthread_create(pthread_t*, in pthread_attr_t*, void* function(void*), void*);
416 		int pthread_join(pthread_t, void**);
417 		void pthread_exit(void *retval);
418 
419 		// semaphore.h
420 		alias sem_t = void*;
421 		int sem_init(sem_t*, int, uint);
422 		int sem_wait(sem_t*);
423 		int sem_trywait(sem_t*);
424 		int sem_post(sem_t*);
425 		int sem_destroy(sem_t*);
426 		int sem_timedwait(sem_t* sem, const timespec* abstime);
427 	}
428 	else
429 	{
430 		static assert(false);
431 	}
432 
433 	struct Semaphore
434 	{
435 		sem_t mutex;
436 
437 		void initialize()
438 		{
439 			sem_init(&mutex, 0, 0);
440 		}
441 
442 		void wait()
443 		{
444 			int ret = sem_wait(&mutex);
445 			assert(ret == 0);
446 		}
447 
448 		bool tryWait()
449 		{
450 			int ret = sem_trywait(&mutex);
451 			return (ret == 0);
452 		}
453 
454 		bool timedWait(int usecs)
455 		{
456 			timespec tv;
457 			// if there is no such a function look at it: https://stackoverflow.com/questions/5404277/porting-clock-gettime-to-windows
458 			clock_gettime(CLOCK_REALTIME, &tv);
459 			tv.tv_sec += usecs / 1_000_000;
460 			tv.tv_nsec += (usecs % 1_000_000) * 1_000;
461 
462 			int ret = sem_timedwait(&mutex, &tv);
463 			return (ret == 0);
464 		}
465 
466 		void post()
467 		{
468 			int ret = sem_post(&mutex);
469 			assert(ret >= 0);
470 		}
471 
472 		void destroy()
473 		{
474 			sem_destroy(&mutex);
475 		}
476 	}
477 
478 	private extern (C) void* threadRunFunc(void* threadVoid)
479 	{
480 		Thread* th = cast(Thread*) threadVoid;
481 
482 		th.threadStart();
483 
484 		pthread_exit(null);
485 		return null;
486 	}
487 
	/// Minimal thread wrapper over pthreads for betterC/WebAssembly builds.
	struct Thread
	{
		alias DG = void delegate();

		DG threadStart; /// Delegate executed by the spawned thread (read by threadRunFunc)
		pthread_t handle; /// pthread handle; reset to .init on create failure and after join

		/// Spawns a thread running dg.
		/// `this` must stay alive and at a stable address until join():
		/// threadRunFunc dereferences the pointer passed below.
		void start(DG dg)
		{
			threadStart = dg;
			int err = pthread_create(&handle, null, &threadRunFunc, cast(void*)&this);
			if(err)handle = pthread_t();
			// NOTE(review): creation failure is only recorded by resetting the
			// handle; the caller is not notified and join() on a failed start
			// would pass a default handle to pthread_join — verify callers.
		}

		/// Waits for the thread to finish, then clears the handle and delegate.
		void join()
		{
			pthread_join(handle, null);
			handle = handle.init;
			threadStart = null;
		}
	}
510 }
511 else version(D_BetterC)
512 {
513 	version(Windows)
514 	{
515 		import core.stdc.stdint : uintptr_t;
516 		import core.sys.windows.windows;
517 		extern (Windows) alias btex_fptr = uint function(void*);
518 		extern (C) uintptr_t _beginthreadex(void*, uint, btex_fptr, void*, uint, uint*) nothrow @nogc;
519 
520 		struct Semaphore
521 		{
522 			HANDLE handle;
523 
524 			void initialize()
525 			{
526 				handle = CreateSemaphoreA( null, 0, int.max, null );
527             	assert ( handle != handle.init );
528                 //throw new SyncError( "Unable to create semaphore" );
529 			}
530 
531 			void wait()
532 			{
533 				DWORD rc = WaitForSingleObject( handle, INFINITE );
534 				//int ret = sem_wait(&mutex);
535 				assert(rc == WAIT_OBJECT_0);
536 			}
537 
538 			bool tryWait()
539 			{
540 				switch ( WaitForSingleObject( handle, 0 ) )
541 				{
542 				case WAIT_OBJECT_0:
543 					return true;
544 				case WAIT_TIMEOUT:
545 					return false;
546 				default:
547 					assert(0);//throw new SyncError( "Unable to wait for semaphore" );
548 				}
549 			}
550 
551 			bool timedWait(int usecs)
552 			{
553 				/*timespec tv;
554 				// if there is no such a function look at it: https://stackoverflow.com/questions/5404277/porting-clock-gettime-to-windows
555 				clock_gettime(CLOCK_REALTIME, &tv);
556 				tv.tv_sec += usecs / 1_000_000;
557 				tv.tv_nsec += (usecs % 1_000_000) * 1_000;
558 
559 				int ret = sem_timedwait(&mutex, &tv);
560 				return (ret == 0);*/
561 
562 				switch ( WaitForSingleObject( handle, cast(uint) usecs / 1000 ) )
563 				{
564 				case WAIT_OBJECT_0:
565 					return true;
566 				case WAIT_TIMEOUT:
567 					return false;
568 				default:
569 					assert(0, "Unable to wait for semaphore" );
570 				}
571 			}
572 
573 			void post()
574 			{
575 				assert(ReleaseSemaphore( handle, 1, null ));
576 			}
577 
578 			void destroy()
579 			{
580 				BOOL rc = CloseHandle( handle );
581             	assert( rc, "Unable to destroy semaphore" );
582 			}
583 		}
584 
585 		private extern (Windows) uint threadRunFunc(void* threadVoid)
586 		{
587 			Thread* th = cast(Thread*) threadVoid;
588 
589 			th.threadStart();
590 
591 			ExitThread(0);
592 			return 0;
593 		}
594 
595 		struct Thread
596 		{
597 			alias DG = void delegate();
598 
599 			DG threadStart;
600 			HANDLE handle;
601 
602 			void start(DG dg)
603 			{
604 				threadStart = dg;
605 				handle = cast(HANDLE) _beginthreadex( null, 0, &threadRunFunc, cast(void*)&this, 0, null );
606 			}
607 
608 			void join()
609 			{
610 				if ( WaitForSingleObject( handle, INFINITE ) == WAIT_OBJECT_0 )assert(0);
611 				CloseHandle( handle );
612 
613 				handle = handle.init;
614 				threadStart = null;
615 			}
616 		}
617 	}
618 	else 
619 	{
620 		static assert(0, "Platform is unsupported in betterC mode!");
621 	}
622 }
623 else
624 {
625 	import core.thread : D_Thread = Thread;
626 	import core.sync.semaphore : D_Semaphore = Semaphore;
627 	import core.time : dur;
628 	import std.experimental.allocator;
629 	import std.experimental.allocator.mallocator;
630 
631 	struct Semaphore
632 	{
633 		D_Semaphore sem;
634 
635 		void initialize()
636 		{
637 			sem = Mallocator.instance.make!D_Semaphore();
638 		}
639 
640 		void wait()
641 		{
642 			sem.wait();
643 		}
644 
645 		bool tryWait()
646 		{
647 			return sem.tryWait();
648 		}
649 
650 		bool timedWait(int usecs)
651 		{
652 			return sem.wait(dur!"usecs"(usecs));
653 		}
654 
655 		void post()
656 		{
657 			sem.notify();
658 		}
659 
660 		void destroy()
661 		{
662 			Mallocator.instance.dispose(sem);
663 		}
664 	}
665 
666 	struct Thread
667 	{
668 		alias DG = void delegate();
669 
670 		DG threadStart;
671 		D_Thread thread;
672 
673 		void start(DG dg)
674 		{
675 			thread = Mallocator.instance.make!D_Thread(dg);
676 			thread.start();
677 		}
678 
679 		void join()
680 		{
681 			thread.join();
682 		}
683 	}
684 }
685 
686 //////////////////////////////////////////////
687 ///////////////// ThreadPool /////////////////
688 //////////////////////////////////////////////
689 
690 private enum gMaxThreadsNum = 64;
691 
692 alias JobDelegate = void delegate(ThreadData*, JobData*);
693 
// Structure to store job start and end time (one profiling/trace entry)
struct JobLog
{
	string name; /// Name of job
	ulong time; /// Time started (us)
	ulong duration; /// Took time (us)
}
701 
/// Intrusive list of jobs guarded by a CAS-based spin lock.
/// NOTE(review): despite the original "first in first out" description, both
/// add() and pop() operate on the list head, so this behaves as LIFO (a stack).
struct JobQueue
{
	alias LockType = int;
	align(64) shared LockType lock; /// Spin lock guarding `first` (0 = free, 1 = held); cache-line aligned to avoid false sharing
	align(64) JobData* first; /// Head of the intrusive singly linked job list

	/// Check if empty without locking, doesn't give guarantee that list is truly empty
	bool emptyRaw()
	{
		bool isEmpty = first == null;
		return isEmpty;
	}

	/// Check if empty while holding the lock
	bool empty()
	{
		// Spin until the false -> true transition succeeds (lock acquired)
		while (!cas(&lock, cast(LockType) false, cast(LockType) true))
			instructionPause();

		bool isEmpty = first == null;
		atomicStore!(MemoryOrder.rel)(lock, cast(LockType) false); // release lock
		return isEmpty;
	}

	/// Push a job onto the head of the list
	void add(JobData* t)
	{
		while (!cas(&lock, cast(LockType) false, cast(LockType) true))
			instructionPause();

		t.next = first;
		first = t;

		atomicStore!(MemoryOrder.rel)(lock, cast(LockType) false);
	}

	/// Push a range of jobs at once; the range is linked together outside the
	/// lock and spliced onto the head in one critical section.
	/// Range must be random-access with length and yield JobData*.
	void addRange(Range)(Range arr)
	{
		if (arr.length == 0)
			return;

		JobData* start = arr[0];
		JobData* last = start;

		// Build the chain before taking the lock to keep the critical section short
		foreach (t; arr[1 .. $])
		{
			last.next = t;
			last = t;
		}

		while (!cas(&lock, cast(LockType) false, cast(LockType) true))
			instructionPause();
		last.next = first;
		first = start;
		atomicStore!(MemoryOrder.rel)(lock, cast(LockType) false);
	}

	/// Pop the head job. Returns: the job, or null when the list is empty.
	JobData* pop()
	{
		while (!cas(&lock, cast(LockType) false, cast(LockType) true))
			instructionPause();

		if (first == null)
		{
			atomicStore!(MemoryOrder.rel)(lock, cast(LockType) false);
			return null;
		}

		JobData* result = first;
		first = first.next;

		atomicStore!(MemoryOrder.rel)(lock, cast(LockType) false);
		return result;
	}

}
781 
/// Structure containing job data
/// JobData memory is allocated by the user
/// JobData lifetime is managed by the user
/// JobData has to live as long as its group, or until the end of job execution
/// JobData fields can be changed inside the del delegate and the job can be added
/// to the thread pool again to continue execution (calling the same function
/// again, or another one if del was changed)
struct JobData
{
	JobDelegate del; /// Delegate to execute
	string name; /// Name of job
	private JobsGroup* group; /// Group to which this job belongs
	private align(64) JobData* next; /// Intrusive link: jobs form a per-thread list (see JobQueue)
}
794 
/// Structure responsible for one thread in the thread pool
/// Stores jobs to be executed by this thread (jobs can be stolen by another thread)
/// Stores a cache for logs
struct ThreadData
{
public:
	ThreadPool* threadPool; /// Pool this thread belongs to
	int threadId; /// Thread id. Valid only for this thread pool

	/// Function starting execution of thread main loop
	/// External threads can call this function to start executing jobs
	/// (threadFunc is the worker main loop defined elsewhere in this module)
	void threadStartFunc()
	{
		//end = false;
		threadFunc(&this);
	}

private:
	JobQueue jobsQueue; /// Queue of jobs to be done, jobs can be stolen from another thread
	JobQueue jobsExclusiveQueue; /// Queue of jobs to be done, jobs can't be stolen
	align(64) Semaphore semaphore; /// Semaphore to wake/sleep this thread (cache-line aligned)
	align(64) Thread thread; /// System thread handle
	JobLog[] logs; /// Logs cache
	int lastLogIndex = -1; /// Last created log index
	int jobsDoneCount; /// Number of jobs executed by this thread (see ThreadPool.jobsDoneCount)

	shared bool end; /// Check if thread has to exit. Thread will exit only if end is true and jobsToDo is empty
	shared bool acceptJobs; /// Check if thread should accept new jobs. If false the thread won't steal jobs from other threads and will sleep longer if its queue is empty
	bool externalThread; /// Thread not allocated by the thread pool

}
826 
/// Thread Pool
/// Manages a group of threads to execute given jobs as quickly as possible
/// There are no priorities between jobs. Jobs are added to the queues in the same order as they appear in the given slices, but due to job stealing and uneven execution speed between threads, the job execution order is unspecified.
/// The number of threads executing jobs can be changed dynamically at any time. Threads removed from execution keep working until the end of the program but shouldn't accept new jobs.
831 struct ThreadPool
832 {
833 	alias FlushLogsDelegaste = void delegate(ThreadData* threadData, JobLog[] logs); /// Type of delegate to flush logs
	FlushLogsDelegaste onFlushLogs; /// User custom delegate to flush logs; if not overridden, defaultFlushLogs is used. Can be set after the initialize() call
835 	int logsCacheNum; /// Number of log cache entries. Should be set before setThreadsNum is called
836 	int tryWaitCount = 2000; ///Number of times which tryWait are called before timedWait call. Higher value sets better response but takes CPU time even if there are no jobs.
837 private:
838 	ThreadData*[gMaxThreadsNum] threadsData; /// Data for threads
	align(64) shared int threadsNum; /// Number of threads currently accepting jobs
840 	align(64) shared bool threadsDataLock; /// Any modification of threadsData array (change in size or pointer modification) had to be locked
841 	align(64) shared int threadSelector; /// Index of thread to which add next job
842 	FILE* logFile; /// File handle for defaultFlushLogs log file
	JobData[4] resumeJobs; /// Dummy jobs used to wake up threads
844 
845 public:
846 
847 	static int getCPUCoresCount()
848 	{
849 		version(Windows)
850 		{
851 			import core.sys.windows.winbase : SYSTEM_INFO, GetSystemInfo;
852 			SYSTEM_INFO sysinfo;
853 			GetSystemInfo(&sysinfo);
854 			return sysinfo.dwNumberOfProcessors;
855 		}
856 		else version (linux)
857 		{
858 			version(D_BetterC)
859 			{
860 				import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;
861 				return cast(int)sysconf(_SC_NPROCESSORS_ONLN);
862 			}
863 			else
864 			{
865 				import core.sys.linux.sched : CPU_COUNT, cpu_set_t, sched_getaffinity;
866 				import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;
867 
868 				cpu_set_t set = void;
869 				if (sched_getaffinity(0, cpu_set_t.sizeof, &set) == 0)
870 				{
871 					int count = CPU_COUNT(&set);
872 					if (count > 0)
873 						return cast(uint) count;
874 				}
875 				return cast(int)sysconf(_SC_NPROCESSORS_ONLN);
876 			}
877 		}
878 		else version(Posix)
879 		{
880 			import core.sys.posix.unistd;
881     		return cast(int)sysconf(_SC_NPROCESSORS_ONLN);
882 		}
883 		else return -1;
884 	}
885 
886 	int jobsDoneCount()
887 	{
888 		int sum;
889 		foreach (i, ref ThreadData* th; threadsData)
890 		{
891 			if (th is null)
892 				continue;
893 
894 			sum += th.jobsDoneCount;
895 		}
896 		return sum;
897 	}
898 
899 	void jobsDoneCountReset()
900 	{
901 		foreach (i, ref ThreadData* th; threadsData)
902 		{
903 			if (th is null)
904 				continue;
905 			th.jobsDoneCount = 0;
906 		}
907 	}
908 	/// Initialize thread pool
909 	void initialize()
910 	{
911 
912 		foreach (ref JobData j; resumeJobs)
913 			j = JobData(&dummyJob, "Dummy-Resume");
914 
915 		version (MM_NO_LOGS)
916 		{
917 			logsCacheNum = 0;
918 		}
919 		else
920 		{
921 			onFlushLogs = &defaultFlushLogs;
922 			logsCacheNum = 1024;
923 
924 			logFile = fopen("trace.json", "w");
925 			fprintf(logFile, "[");
926 			fclose(logFile);
927 			logFile = fopen("trace.json", "a");
928 			assert(logFile !is null);
929 		}
930 	}
931 
932 	/// Clean ups ThreadPool
933 	~this()
934 	{
935 		version (MM_NO_LOGS)
936 		{
937 
938 		}
939 		else if (logFile)
940 		{
941 			fclose(logFile);
942 			logFile = null;
943 		}
944 		
945 
946 	}
947 
	/// Registers an external thread in the thread pool array. Data is allocated
	/// for the thread and it receives the next free thread id.
	/// External threads are not joined at the end of thread pool execution.
	/// Returns: the ThreadData for the external thread. To actually start
	/// executing jobs, the external thread has to call threadStartFunc() on it.
	ThreadData* registerExternalThread()
	{
		lockThreadsData();
		// NOTE(review): scope(exit) unlockThreadsData() was deliberately
		// commented out (unlock is done manually before return) — presumably
		// a betterC limitation; confirm before restoring it.

		ThreadData* threadData = makeThreadData();
		threadData.threadPool = &this;
		threadData.semaphore.initialize();
		threadData.externalThread = true;
		atomicStore(threadData.acceptJobs, true);

		// Reserve a slot index atomically (atomicOp returns the new value)
		int threadNum = atomicOp!"+="(threadsNum, 1) - 1;

		threadData.threadId = threadNum;

		threadsData[threadNum] = threadData;

		unlockThreadsData();

		return threadData;
	}
974 
	/// Unregisters an external thread. Must be called only after the external
	/// thread has left the thread pool (returned from threadStartFunc).
	void unregistExternalThread(ThreadData* threadData)
	{
		lockThreadsData();
		// Manual unlock instead of scope(exit) — see registerExternalThread

		disposeThreadData(threadData);
		unlockThreadsData();
	}
985 
	/// Allows external threads to return from threadStartFunc:
	/// marks each external thread as ending and feeds it dummy resume jobs so
	/// its worker loop wakes up and observes the end flag.
	void releaseExternalThreads()
	{
		lockThreadsData();

		// Release external threads (including main thread)
		foreach (i, ref ThreadData* th; threadsData)
		{
			if (th is null)
				continue;
			if (!th.externalThread)
				continue;

			// Wake the thread with no-op jobs pinned to its exclusive queue
			auto rng = resumeJobs[].map!((ref a) => &a);
			addJobsRange(rng, cast(int) i);
			atomicStore(th.end, true);
		}
		unlockThreadsData();
	}
1007 
	/// Waits for all threads to finish and joins them (excluding external threads).
	/// All threads are first told to stop accepting jobs and to end; only
	/// pool-owned threads are then joined and their data disposed.
	void waitThreads()
	{
		lockThreadsData();
		// Phase 1: signal every thread (including external) to wind down
		foreach (i, ref ThreadData* th; threadsData)
		{
			if (th is null)
				continue;

			atomicStore(th.acceptJobs, false);
			atomicStore(th.end, true);
		}
		// Phase 2: join and dispose only the threads this pool created
		foreach (i, ref ThreadData* th; threadsData)
		{
			if (th is null || th.externalThread)
				continue;

			th.thread.join();
			disposeThreadData(th);
		}
		unlockThreadsData();
	}
1032 
	/// Sets the number of threads accepting new jobs.
	/// Threads that never existed are created and started.
	/// If the new number is smaller than before, surplus threads are not joined;
	/// they stop getting new jobs, stop stealing, and sleep longer.
	/// Locking operation.
	void setThreadsNum(int num)
	{
		assert(num <= gMaxThreadsNum);
		assert(num > 0); // also makes the signed/unsigned i < num compare below safe

		lockThreadsData();

		foreach (i, ref ThreadData* th; threadsData)
		{
			if (th)
			{
				// Exists: enable the first `num` slots, disable the rest
				atomicStore(th.acceptJobs, i < num);
				continue;
			}
			else if (i >= num)
			{
				// Doesn't exist and is not required
				continue;
			}
			// Doesn't exist and is required: create and start it
			th = makeThreadData();
			th.threadPool = &this;
			th.threadId = cast(int) i;
			atomicStore(th.acceptJobs, true);
			th.semaphore.initialize();

			th.thread.start(&th.threadStartFunc);
		}

		atomicStore(threadsNum, num);
		unlockThreadsData();
	}
1073 
1074 	/// Adds job to be executed by thread pool, such a job won't be synchronized with any group or job
1075 	/// If threadNum is different than -1 only thread with threadNum will be able to execute given job
1076 	/// It is advised to use synchronized group of jobs
1077 	void addJobAsynchronous(JobData* data, int threadNum = -1)
1078 	{
1079 		if (threadNum == -1)
1080 		{
1081 			ThreadData* threadData = getThreadDataToAddJobTo();
1082 			threadData.jobsQueue.add(data);
1083 			threadData.semaphore.post();
1084 			return;
1085 		}
1086 		ThreadData* threadData = threadsData[threadNum];
1087 		assert(threadData !is null);
1088 		threadData.jobsExclusiveQueue.add(data);
1089 		threadData.semaphore.post();
1090 	}
1091 
	/// Adds a job to be executed by the thread pool; the group referenced by
	/// data.group won't finish until this job ends.
	/// If threadNum is different than -1, only the thread with that id may
	/// execute the job.
	void addJob(JobData* data, int threadNum = -1)
	{
		assert(data.group);
		// Register the job with its group before enqueueing, so the group
		// cannot be considered finished while this job is pending
		atomicOp!"+="(data.group.jobsToBeDoneCount, 1);
		addJobAsynchronous(data, threadNum);
	}
1100 
	/// Adds multiple jobs at once.
	/// Range has to return JobData* and have a length property (a range is used
	/// so no JobData*[] allocation is needed). All jobs must belong to one group.
	/// If threadNum is different than -1, only the thread with that id may
	/// execute the given jobs.
	/// Jobs are distributed evenly: `length / threadsNum` jobs per thread, with
	/// the remainder spread one-per-thread over the first threads.
	void addJobsRange(Range)(Range rng, int threadNum = -1)
	{
		if (threadNum != -1)
		{
			// Pinned: everything goes to one thread's exclusive (non-stealable) queue
			ThreadData* threadData = threadsData[threadNum];
			assert(threadData !is null);
			threadData.jobsExclusiveQueue.addRange(rng);
			foreach (sInc; 0 .. rng.length)
				threadData.semaphore.post();

			return;
		}

		if (rng.length == 0)
		{
			return;
		}

		// All jobs must share one group (checked in debug builds only)
		// NOTE(review): the loop variable is a JobData*, not a ThreadData*,
		// despite its name
		foreach (JobData* threadData; rng)
		{
			assert(rng[0].group == threadData.group);
		}
		
		// Count all jobs against the group up front
		atomicOp!"+="(rng[0].group.jobsToBeDoneCount, cast(int) rng.length);
		int threadsNumLocal = atomicLoad(threadsNum);
		int part = cast(int) rng.length / threadsNumLocal;
		if (part > 0)
		{
			// Even share: give each active thread `part` consecutive jobs
			foreach (i, ThreadData* threadData; threadsData[0 .. threadsNumLocal])
			{
				auto slice = rng[i * part .. (i + 1) * part];
				threadData.jobsQueue.addRange(slice);

				foreach (sInc; 0 .. part)
					threadData.semaphore.post();

			}
			rng = rng[part * threadsNumLocal .. $];
		}
		// Remainder (< threadsNumLocal jobs): one job per thread
		foreach (i, ThreadData* threadData; threadsData[0 .. rng.length])
		{
			threadData.jobsQueue.add(rng[i]);
			threadData.semaphore.post();
		}

	}
1153 
1154 	/// Adds group of jobs to threadPool, group won't be synchronized
1155 	void addGroupAsynchronous(JobsGroup* group)
1156 	{
1157 		group.thPool = &this;
1158 		
1159 		if (group.jobs.length == 0)
1160 		{
1161 			// Immediately call group end
1162 			group.onGroupFinish();
1163 			return;
1164 		}
1165 		group.setUpJobs();
1166 		auto rng = group.jobs[].map!((ref a) => &a);
1167 		addJobsRange(rng, group.executeOnThreadNum);
1168 	}
1169 
1170 	/// Adds group of jobs to threadPool
1171 	/// Spwaning group will finish after this group have finished
1172 	void addGroup(JobsGroup* group, JobsGroup* spawnedByGroup)
1173 	{
1174 		assert(spawnedByGroup);
1175 		group.spawnedByGroup = spawnedByGroup;
1176 		atomicOp!"+="(spawnedByGroup.jobsToBeDoneCount, 1); // Increase by one, so 'spawning group' will wait for 'newly added group' to finish
1177 		addGroupAsynchronous(group); // Synchronized by jobsToBeDoneCount atomic variable
1178 	}
1179 
1180 	/// Explicitly calls onFlushLogs on all threads
1181 	void flushAllLogs()
1182 	{
1183 		lockThreadsData();
1184 		//scope (exit)
1185 		//	unlockThreadsData();
1186 		foreach (thNum; 0 .. atomicLoad(threadsNum))
1187 		{
1188 			ThreadData* th = threadsData[thNum];
1189 			onThreadFlushLogs(th);
1190 		}
1191 
1192 		foreach (i, ref ThreadData* th; threadsData)
1193 		{
1194 			if (th is null)
1195 				continue;
1196 
1197 			onThreadFlushLogs(th);
1198 		}
1199 		unlockThreadsData();
1200 	}
1201 
1202 	/// Default implementation of flushing logs
1203 	/// Saves logs to trace.json file in format acceptable by Google Chrome tracking tool chrome://tracing/ 
1204 	/// Logs can be watched even if apllication crashed, but might require removing last log entry from trace.json
1205 	void defaultFlushLogs(ThreadData* threadData, JobLog[] logs)
1206 	{
1207 		version (MM_NO_LOGS)
1208 		{
1209 		}
1210 		else
1211 		{
1212 			// (log rows num) * (static json length * time length * duration length)
1213 			long start = useconds();
1214 			size_t size = (logs.length + 1) * (128 + 20 + 20);
1215 			size_t used = 0;
1216 
1217 			foreach (ref log; logs)
1218 			{
1219 				size += log.name.length + 1; // size of name
1220 			}
1221 
1222 			char* buffer = cast(char*) malloc(size);
1223 
1224 			foreach (ref log; logs)
1225 			{
1226 				char[100] name_buffer;
1227 				name_buffer[0 .. log.name.length] = log.name;
1228 				name_buffer[log.name.length] = 0;
1229 				size_t charWritten = snprintf(buffer + used, size - used,
1230 						`{"name":"%s", "pid":1, "tid":%lld, "ph":"X", "ts":%lld,  "dur":%lld }, %s`,
1231 						name_buffer.ptr, threadData.threadId + 1, log.time, log.duration, "\n".ptr);
1232 				used += charWritten;
1233 			}
1234 
1235 			long end = useconds();
1236 			size_t charWritten = snprintf(buffer + used, size - used,
1237 					`{"name":"logFlush", "pid":1, "tid":%lld, "ph":"X", "ts":%lld,  "dur":%lld }, %s`,
1238 					threadData.threadId + 1, start, end - start, "\n".ptr);
1239 			used += charWritten;
1240 			fwrite(buffer, 1, used, logFile);
1241 		}
1242 	}
1243 
1244 private:
1245 	/// Atomic lock
1246 	void lockThreadsData()
1247 	{
1248 		// Only one thread at a time can change threads number in a threadpool
1249 		while (!cas(&threadsDataLock, false, true))
1250 		{
1251 		}
1252 	}
1253 
	/// Atomic unlock
	/// Releases the lock taken by lockThreadsData() by clearing the flag.
	void unlockThreadsData()
	{
		atomicStore(threadsDataLock, false);
	}
1259 
1260 	/// Allocate ThreadData
1261 	ThreadData* makeThreadData()
1262 	{
1263 		ThreadData* threadData = makeVar!ThreadData();
1264 		threadData.logs = makeVarArray!(JobLog)(logsCacheNum);
1265 		return threadData;
1266 	}
1267 
1268 	/// Dispose ThreadData
1269 	void disposeThreadData(ThreadData* threadData)
1270 	{
1271 		disposeArray(threadData.logs);
1272 		return disposeVar(threadData);
1273 	}
1274 
1275 	/// Get thread most suiting to add job to
1276 	ThreadData* getThreadDataToAddJobTo()
1277 	{
1278 		int threadNum = atomicOp!"+="(threadSelector, 1);
1279 
1280 		foreach (i; 0 .. 1_000)
1281 		{
1282 			if (threadNum >= atomicLoad(threadsNum))
1283 			{
1284 				threadNum = 0;
1285 				atomicStore(threadSelector, 0);
1286 			}
1287 			ThreadData* threadData = threadsData[threadNum];
1288 			if (threadData != null)
1289 			{
1290 				return threadData;
1291 			}
1292 			threadNum++;
1293 		}
1294 		assert(0);
1295 	}
1296 
	/// Create log on start of job
	/// Bumps the per-thread job counter and, unless logging is disabled,
	/// opens a new log entry (flushing the log cache first when it is full).
	void onStartJob(JobData* data, ThreadData* threadData)
	{

		threadData.jobsDoneCount++;
		version (MM_NO_LOGS)
		{
		}
		else
		{
			// Zero-length cache means logging is disabled for this thread
			if (cast(int) threadData.logs.length <= 0)
			{
				return;
			}
			// Cache full: hand the collected entries to onFlushLogs (resets lastLogIndex)
			if (threadData.lastLogIndex >= cast(int) threadData.logs.length - 1)
			{
				onThreadFlushLogs(threadData);
			}

			threadData.lastLogIndex++;

			// Duration is filled in later by onEndJob
			JobLog log;
			log.name = data.name;
			log.time = useconds();
			threadData.logs[threadData.lastLogIndex] = log;
		}
	}
1324 
	/// Set log finish time on end of job
	/// Completes the log entry opened by onStartJob by computing its duration.
	void onEndJob(JobData* data, ThreadData* threadData)
	{
		version (MM_NO_LOGS)
		{
		}
		else
		{
			// Zero-length cache means logging is disabled for this thread
			if (cast(int) threadData.logs.length <= 0)
			{
				return;
			}
			assert(threadData.lastLogIndex < threadData.logs.length);
			// lastLogIndex still points at the entry onStartJob created for this job
			JobLog* log = &threadData.logs[threadData.lastLogIndex];
			log.duration = useconds() - log.time;
		}
	}
1342 
	/// Flush logs
	/// Passes one thread's collected log entries to the user's onFlushLogs
	/// callback and resets the cache. No-op when there is nothing to flush
	/// or no callback is installed.
	void onThreadFlushLogs(ThreadData* threadData)
	{
		/*scope (exit)
		{
			threadData.lastLogIndex = -1;
		}*/

		assert(threadData);

		if (threadData.lastLogIndex < 0 || onFlushLogs is null)
		{
			return;
		}

		onFlushLogs(threadData, threadData.logs[0 .. threadData.lastLogIndex + 1]);

		// Mark the cache as empty only after the callback has consumed it
		threadData.lastLogIndex = -1;
	}
1362 
	/// Does nothing
	/// Presumably a no-op placeholder for spots that need a valid job delegate
	/// — confirm against callers outside this chunk.
	void dummyJob(ThreadData* threadData, JobData* data)
	{

	}
1368 
	/// Steal job from another thread
	/// Scans the other threads once. A successful semaphore.tryWait reserves one
	/// queued item, which is then popped from the victim's shared queue (the
	/// exclusive queue is never stolen from). Returns the stolen job, or null.
	JobData* stealJob(int threadNum)
	{
		foreach (thSteal; 0 .. atomicLoad(threadsNum))
		{
			if (thSteal == threadNum)
				continue; // Do not steal from requesting thread

			ThreadData* threadData = threadsData[thSteal];

			if (threadData is null || !threadData.semaphore.tryWait())
				continue;

			JobData* data = threadData.jobsQueue.pop();

			// Semaphore was decremented but the pop raced to null: give the
			// count back so the owning thread still wakes up for its job
			if (data is null)
				threadData.semaphore.post();

			// NOTE(review): returns (possibly null) after the first successful
			// tryWait instead of continuing the scan — confirm this is intended
			return data;
		}
		return null;
	}
1391 }
1392 
/// Adding groups of jobs is faster and groups can have dependencies between each other
struct JobsGroup
{
public:
	string name; /// Name of group
	JobData[] jobs; /// Jobs to be executed by this group, jobs have to live as long as group lives
	void delegate(JobsGroup* group) onFinish; /// Delegate called when the group finishes, can be used to free memory
	ThreadPool* thPool; /// Thread pool of this group
	int executeOnThreadNum = -1; /// Thread num to execute jobs on, -1 means any thread

	this(string name, JobData[] jobs = [], int executeOnThreadNum = -1)
	{
		this.name = name;
		this.jobs = jobs;
		this.executeOnThreadNum = executeOnThreadNum;
		atomicStore(jobsToBeDoneCount, 0);
		atomicStore(dependenciesWaitCount, 0);
	}

	~this() nothrow
	{
		// children is allocated with realloc in dependantOn; free(null) is a no-op
		free(children.ptr);
		children = null;
	}

	/// Make this group dependant from another group
	/// Dependant group won't start until its dependencies are fulfilled
	void dependantOn(JobsGroup* parent)
	{
		// Grow parent's children array by one slot; manual realloc keeps this
		// betterC/@nogc friendly (realloc(null, ...) acts as malloc)
		size_t newLen = parent.children.length + 1;
		JobsGroup** ptr = cast(JobsGroup**) realloc(parent.children.ptr,
				newLen * (JobsGroup*).sizeof);
		assert(ptr !is null); // Previously unchecked: a failed realloc would null-deref below
		parent.children = ptr[0 .. newLen];
		parent.children[$ - 1] = &this;
		atomicOp!"+="(dependenciesWaitCount, 1);
	}

	/// Returns number of dependencies this group is waiting for
	int getDependenciesWaitCount()
	{
		return atomicLoad(dependenciesWaitCount);
	}

private:
	JobsGroup* spawnedByGroup; /// Group which spawned this group; if present the spawning group waits for this group to finish
	JobsGroup*[] children; /// Groups depending on this group
	align(64) shared int dependenciesWaitCount; /// Count of dependencies this group waits for
	align(64) shared int jobsToBeDoneCount; /// Number of this group's jobs still executing

	/// Checks if depending groups or the spawning group have to be started
	/// Executes user onFinish function
	void onGroupFinish()
	{
		decrementChildrenDependencies();
		if (spawnedByGroup)
		{
			// This group counted as one pending job of its spawner (see ThreadPool.addGroup)
			auto num = atomicOp!"-="(spawnedByGroup.jobsToBeDoneCount, 1);
			assert(num >= 0);
			if (num == 0)
			{
				spawnedByGroup.onGroupFinish();
			}
		}
		if (onFinish)
			onFinish(&this);
	}

	/// Decrements children's dependency counters and starts those whose dependencies are all fulfilled
	void decrementChildrenDependencies()
	{
		foreach (JobsGroup* group; children)
		{
			auto num = atomicOp!"-="(group.dependenciesWaitCount, 1);
			assert(num >= 0);
			if (num == 0)
			{
				thPool.addGroupAsynchronous(group); // All dependencies of this group are fulfilled, so it is already synchronized
			}
		}
	}

	/// Prepare jobs data for adding to thread pool
	void setUpJobs()
	{
		foreach (i; 0 .. jobs.length)
		{
			jobs[i].group = &this;
		}
	}

}
1487 
/// Main function executed by every thread present in the thread pool
/// Executes jobs from its own queues (exclusive queue has priority over the shared one)
/// If there are no jobs in its queues, steals from another thread
/// If there is nothing to steal, spins briefly and then sleeps on its semaphore for a while (stage when cpu is not used)
/// Sleep time is longer for threads not accepting jobs; they don't exit because it is hard to guarantee that nobody is adding a job to them (the thread might exit but a job would be added anyway and the application would malfunction)
/// Thread ends only when its queue is empty. Jobs shouldn't be added to the queue after the ThreadPool.waitThreads() call
private void threadFunc(ThreadData* threadData)
{
	ThreadPool* threadPool = threadData.threadPool;
	int threadNum = threadData.threadId;

	// Keep running until asked to end AND both queues are drained
	while (!atomicLoad!(MemoryOrder.raw)(threadData.end)
			|| !threadData.jobsQueue.empty() || !threadData.jobsExclusiveQueue.empty())
	{
		JobData* data;
		if (threadData.semaphore.tryWait())
		{
			// Semaphore reserved one queued item: exclusive jobs take priority
			if (!threadData.jobsExclusiveQueue.emptyRaw())
				data = threadData.jobsExclusiveQueue.pop();

			if (data is null)
				data = threadData.jobsQueue.pop();

			// Both pops raced to null: return the semaphore count so no wake-up is lost
			if (data is null)
				threadData.semaphore.post();

			assert(data !is null);
		}
		else
		{
			bool acceptJobs = atomicLoad!(MemoryOrder.raw)(threadData.acceptJobs);
			if (acceptJobs)
			{
				data = threadPool.stealJob(threadNum);
			}

			if (data is null)
			{
				// Thread has no job of its own and cannot steal one, so wait:
				// first spin-poll the semaphore (cheap, low latency)...
				int tryWait = 0;
				//bool ok = threadData.semaphore.timedWait(1_000 + !acceptJobs * 10_000);
				bool ok = true;
				while(!threadData.semaphore.tryWait())
				{
					tryWait++;
					if(tryWait>threadPool.tryWaitCount)
					{
						ok = false;
						break;
					}
					static foreach(i;0..10)instructionPause();
				}
				// ...then fall back to a timed blocking wait (longer for non-accepting threads)
				if(!ok)ok = threadData.semaphore.timedWait(1_000 + !acceptJobs * 10_000);
				
				if (ok)
				{
					// Same pop protocol as above: exclusive queue first, restore count on miss
					if (!threadData.jobsExclusiveQueue.emptyRaw())
						data = threadData.jobsExclusiveQueue.pop();

					if (data is null)
						data = threadData.jobsQueue.pop();

					if (data is null)
						threadData.semaphore.post();
				}
			}
		}

		// Nothing to do
		if (data is null)
		{
			continue;
		}

		// Do the job
		threadPool.onStartJob(data, threadData);
		data.del(threadData, data);
		threadPool.onEndJob(data, threadData);
		// The executed job may have been the last one of its group
		if (data.group)
		{
			auto num = atomicOp!"-="(data.group.jobsToBeDoneCount, 1);
			if (num == 0)
			{
				data.group.onGroupFinish();
			}
		}

	}
	// Clear the end flag, presumably so this ThreadData can be reused — TODO confirm against setThreadsNum
	atomicStore(threadData.end, false);
	assert(threadData.jobsQueue.empty());
}
1581 
1582 //////////////////////////////////////////////
1583 //////////////////// Test ////////////////////
1584 //////////////////////////////////////////////
1585 /*
1586 void testThreadPool()
1587 {
1588 	enum jobsNum = 1024 * 4;
1589 
1590 	ThreadPool thPool;
1591 	thPool.initialize();
1592 
1593 	ThreadData* mainThread = thPool.registerExternalThread(); // Register main thread as thread 0
1594 	JobData startFrameJobData; // Variable to store job starting the TestApp
1595 
1596 	JobData[jobsNum] frameJobs; // Array to store jobs created in TestApp.continueFrameInOtherJob
1597 	shared int frameNum; // Simulate game loop, run jobs for few frames and exit
1598 
1599 	// App starts as one job &startFrame
1600 	// Then using own JobData memory spawns 'continueFrameInOtherJob' job
1601 	// 'continueFrameInOtherJob' job fills frameJobs array with &importantTask jobs. Group created to run this jobs is freed using JobsGroup.onFinish delegate
1602 	// 'importantTask' allocate new jobs (&importantTaskSubTask) and group, they all deallocated using JobsGroup.onFinish delegate
1603 	// 'continueFrameInOtherJob' waits for all 'importantTask'.  All 'importantTask' wait for all 'importantTaskSubTask'. 
1604 	// So after all 'importantTaskSubTask' and 'importantTask' are done 'continueFrameInOtherJob' ends and spawns &finishFrame
1605 	// 'finishFrame' spawn new frame or exits application
1606 	struct TestApp
1607 	{
1608 		// First job in frame
1609 		// Do some stuff and spawn some other job
1610 		// 1 - Number of jobs of this kind in frame
1611 		void startFrame(ThreadData* threadData, JobData* startFrameJobData)
1612 		{
1613 			startFrameJobData.del = &continueFrameInOtherJobAAA;
1614 			startFrameJobData.name = "cont frmAAA";
1615 			thPool.addJobAsynchronous(startFrameJobData, thPool.threadsNum - 1); /// startFrame is the only job in thread pool no synchronization is required
1616 		}
1617 
1618 		void continueFrameInOtherJobAAA(ThreadData* threadData, JobData* startFrameJobData)
1619 		{
1620 
1621 			static struct JobGroupMemory
1622 			{
1623 				JobsGroup[6] groups;
1624 				JobData[1][6] groupsJobs;
1625 				TestApp* app;
1626 				JobData* startFrameJobData;
1627 
1628 				void spawnCont(JobsGroup* group)
1629 				{
1630 					// startFrameJobData.del = &continueFrameInOtherJob;
1631 					startFrameJobData.del = &app.finishFrame;
1632 					startFrameJobData.name = "cont frm";
1633 					group.thPool.addJobAsynchronous(startFrameJobData); /// startFrame is the only job in thread pool no synchronization is required
1634 
1635 				}
1636 			}
1637 
1638 			JobGroupMemory* memory = makeVar!JobGroupMemory();
1639 			memory.app = &this;
1640 			memory.startFrameJobData = startFrameJobData;
1641 
1642 			with (memory)
1643 			{
1644 				groups[0] = JobsGroup("dependant 0", groupsJobs[0]);
1645 				groups[1] = JobsGroup("dependant 1", groupsJobs[1]);
1646 				groups[2] = JobsGroup("dependant 2", groupsJobs[2]);
1647 				groups[3] = JobsGroup("dependant 3", groupsJobs[3]);
1648 				groups[4] = JobsGroup("dependant 4", groupsJobs[4]);
1649 				groups[5] = JobsGroup("dependant 5", groupsJobs[5]);
1650 				groups[5].onFinish = &spawnCont;
1651 
1652 				groups[2].dependantOn(&groups[0]);
1653 				groups[2].dependantOn(&groups[1]);
1654 
1655 				groups[3].dependantOn(&groups[0]);
1656 				groups[3].dependantOn(&groups[1]);
1657 				groups[3].dependantOn(&groups[2]);
1658 
1659 				groups[4].dependantOn(&groups[1]);
1660 				groups[4].dependantOn(&groups[3]);
1661 
1662 				groups[5].dependantOn(&groups[0]);
1663 				groups[5].dependantOn(&groups[1]);
1664 				groups[5].dependantOn(&groups[2]);
1665 				groups[5].dependantOn(&groups[3]);
1666 				groups[5].dependantOn(&groups[4]);
1667 
1668 				foreach (ref jobs; groupsJobs)
1669 					foreach (ref j; jobs)
1670 						j = JobData(&this.importantTaskSubTask, "n");
1671 
1672 				thPool.addGroupAsynchronous(&groups[0]);
1673 				thPool.addGroupAsynchronous(&groups[1]);
1674 			}
1675 		}
1676 
1677 		// Job for some big system
1678 		// Spawns some jobs and when they are done spawns finishFrame job
1679 		// 1 - Number of jobs of this kind in frame
1680 		void continueFrameInOtherJob(ThreadData* threadData, JobData* startFrameJobData)
1681 		{
1682 			static struct JobGroupMemory
1683 			{
1684 				JobsGroup group;
1685 				TestApp* app;
1686 				JobData* startFrameJobData;
1687 
1688 				void freeAndContinue(JobsGroup* group)
1689 				{
1690 					startFrameJobData.del = &app.finishFrame;
1691 					startFrameJobData.name = "finishFrame";
1692 					group.thPool.addJobAsynchronous(startFrameJobData, group.thPool.threadsNum - 1); /// startFrameJobData is continuation of 'startFrame data', all important jobs finished so it is the only job, no synchronization required. Always spawn on last thread
1693 					disposeVar!(JobGroupMemory)(&this);
1694 				}
1695 			}
1696 
1697 			JobGroupMemory* important = makeVar!JobGroupMemory();
1698 			important.app = &this;
1699 			important.startFrameJobData = startFrameJobData;
1700 
1701 			foreach (ref j; frameJobs)
1702 				j = JobData(&this.importantTask, "vip");
1703 
1704 			important.group = JobsGroup("a lot of jobs", frameJobs[]);
1705 			important.group.onFinish = &important.freeAndContinue;
1706 
1707 			thPool.addGroupAsynchronous(&important.group); // No Synchronization required continueFrameInOtherJob is the only job
1708 		}
1709 
1710 		// Some task which by itself does a lot of computation so it spawns few more jobs
1711 		// jobsNum - Number of jobs of this kind in frame
1712 		void importantTask(ThreadData* threadData, JobData* data)
1713 		{
1714 			// All tasks created here will, make 'importantTask' wait with finish untill this jobs will finish
1715 
1716 			/// Add 10 tasks in group
1717 			static struct JobGroupMemory
1718 			{
1719 				JobsGroup group;
1720 				JobData[128] jobs;
1721 
1722 				void freeMee(JobsGroup* group)
1723 				{
1724 					disposeVar!(JobGroupMemory)(&this);
1725 				}
1726 			}
1727 
1728 			JobGroupMemory* subGroup = makeVar!JobGroupMemory();
1729 
1730 			foreach (ref j; subGroup.jobs)
1731 				j = JobData(&this.importantTaskSubTask, "vip sub");
1732 
1733 			subGroup.group = JobsGroup("128 jobs", subGroup.jobs[]);
1734 			subGroup.group.onFinish = &subGroup.freeMee;
1735 			thPool.addGroup(&subGroup.group, data.group);
1736 
1737 			/// Add single tasks
1738 			data.del = &importantTaskSubTask;
1739 			data.name = "sub";
1740 			thPool.addJob(data);
1741 		}
1742 
1743 		// Job which simply does some work
1744 		// jobsNum * 128 - Number of jobs of this kind in frame
1745 		void importantTaskSubTask(ThreadData* threadData, JobData* data)
1746 		{
1747 		}
1748 
1749 		// Finish frame
1750 		// 1 - Number of jobs of this kind in frame
1751 		void finishFrame(ThreadData* threadData, JobData* startFrameJobData)
1752 		{
1753 			auto num = atomicOp!"+="(frameNum, 1);
1754 			if (num == 10)
1755 			{
1756 				thPool.releaseExternalThreads(); // After 10 frames exit application
1757 				return;
1758 			}
1759 			*startFrameJobData = JobData(&startFrame, "StartFrame"); //
			thPool.addJobAsynchronous(startFrameJobData); // Start next frame, there shouldn't be any other tasks except this one, so no synchronization is required
1761 
1762 		}
1763 
1764 		// Func to test if dynamic changing of threads number works
1765 		// void changeThreadsNum()
1766 		// {
1767 		// 	import std.random : uniform;
1768 
1769 		// 	bool change = uniform(0, 100) == 3;
1770 		// 	if (!change)
1771 		// 		return;
1772 
1773 		// 	int threadsNum = uniform(3, 5);
1774 		// 	thPool.setThreadsNum(threadsNum);
1775 
1776 		// }
1777 	}
1778 
1779 	void testThreadsNum(int threadsNum)
1780 	{
1781 		frameNum = 0;
1782 		thPool.jobsDoneCountReset();
1783 		thPool.setThreadsNum(threadsNum);
1784 
1785 		TestApp testApp = TestApp();
1786 		startFrameJobData = JobData(&testApp.startFrame, "StartFrame"); // Start first frame, will live as long as main thread won't exit from threadStartFunc()
1787 
1788 		ulong start = useconds();
1789 		thPool.addJobAsynchronous(&startFrameJobData); // Synchronization is made by groupEnd (last job in pool) which calls thPool.releaseExternalThreads();
1790 		mainThread.threadStartFunc();
1791 		ulong end = useconds();
1792 		printf("Threads Num: %2d. Jobs: %d. Time: %5.2f ms. jobs/ms: %5.2f\n", threadsNum, thPool.jobsDoneCount,
1793 				(end - start) / 1000.0f, thPool.jobsDoneCount / ((end - start) / 1000.0f));
1794 	}
1795 
1796 	while (1)
1797 	{
1798 		// foreach (i; 1 .. 32)
1799 		// 	testThreadsNum(i);
1800 
1801 		testThreadsNum(1);
1802 		testThreadsNum(4);
1803 		testThreadsNum(16);
1804 	}
1805 	thPool.flushAllLogs();
1806 	thPool.waitThreads();
1807 	thPool.unregistExternalThread(mainThread);
1808 }
1809 
1810 version (D_BetterC)
1811 {
1812 
1813 	extern (C) int main(int argc, char*[] argv) // for betterC
1814 	{
1815 		testThreadPool();
1816 		return 0;
1817 	}
1818 }
1819 else
1820 {
1821 	int main()
1822 	{
1823 		testThreadPool();
1824 		return 0;
1825 	}
1826 }//*/
1827 // Compile
1828 // -fsanitize=address
1829 // rdmd -g -of=thread_pool src/mmutils/thread_pool.d && ./thread_pool
1830 // ldmd2 -release -inline -checkaction=C -g -of=thread_pool src/mmutils/thread_pool.d && ./thread_pool
1831 // ldmd2 -checkaction=C -g -of=thread_pool src/mmutils/thread_pool.d && ./thread_pool