module mmutils.thread_pool;

import std.algorithm : map;

version = MM_NO_LOGS; // Disable log creation
//version = MM_USE_POSIX_THREADS; // Use POSIX threads instead of the standard library, required for betterC

version (WebAssembly)
{
    version = MM_NO_LOGS;
    version = MM_USE_POSIX_THREADS;
    extern (C) struct FILE
    {
    }
}
else
{
    import core.stdc.stdio;
}

//////////////////////////////////////////////
/////////////// BetterC Support //////////////
//////////////////////////////////////////////

version (D_BetterC)
{
    version (Posix) version = MM_USE_POSIX_THREADS;

    extern (C) void free(void*) @nogc nothrow @system;
    extern (C) void* malloc(size_t size) @nogc nothrow @system;
    extern (C) void* realloc(void*, size_t size) @nogc nothrow @system;
    extern (C) void* memcpy(return void*, scope const void*, size_t size) @nogc nothrow @system;

    // Hacks for LDC
    /*extern (C) __gshared int _d_eh_personality(int, int, size_t, void*, void*)
    {
        return 0;
    }

    extern (C) __gshared void _d_eh_resume_unwind(void*)
    {
        return;
    }

    extern (C) void* _d_allocmemory(size_t sz)
    {
        return malloc(sz);
    }*/
}
else
{
    import core.stdc.stdlib;
    import core.stdc.string;
}

//////////////////////////////////////////////
/////////////// Atomics //////////////////////
//////////////////////////////////////////////

version (ECSEmscripten)
{
    import std.traits;

    enum MemoryOrder
    {
        acq,
        acq_rel,
        raw,
        rel,
        seq
    }

    extern (C) ubyte emscripten_atomic_cas_u8(void* addr, ubyte oldVal, ubyte newVal) @nogc nothrow pure;
    extern (C) ushort emscripten_atomic_cas_u16(void* addr, ushort oldVal, ushort newVal) @nogc nothrow pure;
    extern (C) uint emscripten_atomic_cas_u32(void* addr, uint oldVal, uint newVal) @nogc nothrow pure;

    extern (C) ubyte emscripten_atomic_load_u8(const void* addr) @nogc nothrow pure;
    extern (C) ushort emscripten_atomic_load_u16(const void* addr) @nogc nothrow pure;
    extern (C) uint emscripten_atomic_load_u32(const void* addr) @nogc nothrow pure;

    extern (C) ubyte emscripten_atomic_store_u8(void* addr, ubyte val) @nogc nothrow pure;
    extern (C) ushort emscripten_atomic_store_u16(void* addr, ushort val) @nogc nothrow pure;
    extern (C) uint emscripten_atomic_store_u32(void* addr, uint val) @nogc nothrow pure;

    extern (C) ubyte emscripten_atomic_add_u8(void* addr, ubyte val) @nogc nothrow pure;
    extern (C) ushort emscripten_atomic_add_u16(void* addr, ushort val) @nogc nothrow pure;
    extern (C) uint emscripten_atomic_add_u32(void* addr, uint val) @nogc nothrow pure;

    extern (C) ubyte emscripten_atomic_sub_u8(void* addr, ubyte val) @nogc nothrow pure;
    extern (C) ushort emscripten_atomic_sub_u16(void* addr, ushort val) @nogc nothrow pure;
    extern (C) uint emscripten_atomic_sub_u32(void* addr, uint val) @nogc nothrow pure;

    public pure nothrow @nogc Unqual!T atomicOp(string op, T, V1)(ref shared T val, V1 mod)
    {
        // The emscripten intrinsics return the previous value, while
        // core.atomic.atomicOp returns the new one, so apply mod again.
        static if (op == "+=")
        {
            static if (is(T == byte) || is(T == ubyte))
                return cast(Unqual!T)(emscripten_atomic_add_u8(cast(void*)&val,
                        cast(Unqual!T) mod) + cast(Unqual!T) mod);
            else static if (is(T == short) || is(T == ushort))
                return cast(Unqual!T)(emscripten_atomic_add_u16(cast(void*)&val,
                        cast(Unqual!T) mod) + cast(Unqual!T) mod);
            else static if (is(T == int) || is(T == uint))
                return cast(Unqual!T)(emscripten_atomic_add_u32(cast(void*)&val,
                        cast(Unqual!T) mod) + cast(Unqual!T) mod);
            else
                static assert(0);
        }
        else static if (op == "-=")
        {
            static if (is(T == byte) || is(T == ubyte))
                return cast(Unqual!T)(emscripten_atomic_sub_u8(cast(void*)&val,
                        cast(Unqual!T) mod) - cast(Unqual!T) mod);
            else static if (is(T == short) || is(T == ushort))
                return cast(Unqual!T)(emscripten_atomic_sub_u16(cast(void*)&val,
                        cast(Unqual!T) mod) - cast(Unqual!T) mod);
            else static if (is(T == int) || is(T == uint))
                return cast(Unqual!T)(emscripten_atomic_sub_u32(cast(void*)&val,
                        cast(Unqual!T) mod) - cast(Unqual!T) mod);
            else
                static assert(0);
        }
        else
            static assert(0, "Unsupported operation: " ~ op);
    }

    public pure nothrow @nogc @trusted void atomicStore(MemoryOrder ms = MemoryOrder.seq, T, V)(
            ref T val, V newval)
    {
        alias UT = Unqual!T;
        static if (is(UT == bool) || is(UT == byte) || is(UT == ubyte))
            emscripten_atomic_store_u8(cast(void*)&val, cast(UT) newval);
        else static if (is(UT == short) || is(UT == ushort))
            emscripten_atomic_store_u16(cast(void*)&val, cast(UT) newval);
        else static if (is(UT == int) || is(UT == uint))
            emscripten_atomic_store_u32(cast(void*)&val, cast(UT) newval);
        else
            static assert(0);
    }

    public pure nothrow @nogc @trusted T atomicLoad(MemoryOrder ms = MemoryOrder.seq, T)(
            ref const T val)
    {
        alias UT = Unqual!T;
        static if (is(UT == bool))
            return emscripten_atomic_load_u8(cast(const void*)&val) != 0;
        else static if (is(UT == byte) || is(UT == ubyte))
            return emscripten_atomic_load_u8(cast(const void*)&val);
        else static if (is(UT == short) || is(UT == ushort))
            return emscripten_atomic_load_u16(cast(const void*)&val);
        else static if (is(UT == int) || is(UT == uint))
            return emscripten_atomic_load_u32(cast(const void*)&val);
        else
            static assert(0);
    }

    public pure nothrow @nogc @trusted bool cas(MemoryOrder succ = MemoryOrder.seq,
            MemoryOrder fail = MemoryOrder.seq, T, V1, V2)(T* here, V1 ifThis, V2 writeThis)
    {
        alias UT = Unqual!T;
        static if (is(UT == bool))
            return emscripten_atomic_cas_u8(cast(void*) here,
                    cast(Unqual!T) ifThis, cast(Unqual!T) writeThis) == ifThis;
        else static if (is(UT == byte) || is(UT == ubyte))
            return emscripten_atomic_cas_u8(cast(void*) here,
                    cast(Unqual!T) ifThis, cast(Unqual!T) writeThis) == ifThis;
        else static if (is(UT == short) || is(UT == ushort))
            return emscripten_atomic_cas_u16(cast(void*) here,
                    cast(Unqual!T) ifThis, cast(Unqual!T) writeThis) == ifThis;
        else static if (is(UT == int) || is(UT == uint))
            return emscripten_atomic_cas_u32(cast(void*) here,
                    cast(Unqual!T) ifThis, cast(Unqual!T) writeThis) == ifThis;
        else
            static assert(0);
    }
}
else
{
    public import core.atomic;
}
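// Minimal usage sketch of the atomic primitives selected above; the calls are
// identical for the Emscripten shim and core.atomic. This mirrors the spin
// lock pattern JobQueue uses later. Guarded by version (none), so it is
// parsed but never compiled in.
version (none) unittest
{
    shared int lock;
    while (!cas(&lock, 0, 1)) // Spin until we atomically swap 0 -> 1
        instructionPause();
    atomicStore!(MemoryOrder.rel)(lock, 0); // Release the lock

    shared int counter;
    assert(atomicOp!"+="(counter, 1) == 1); // atomicOp returns the new value
}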
//////////////////////////////////////////////////
//////////////////// Allocator ///////////////////
//////////////////////////////////////////////////

T* makeVar(T)(T init)
{
    T* el = cast(T*) malloc(T.sizeof);
    memcpy(el, &init, T.sizeof);
    return el;
}

T* makeVar(T)()
{
    T init;
    T* el = cast(T*) malloc(T.sizeof);
    memcpy(el, &init, T.sizeof);
    return el;
}

T[] makeVarArray(T)(int num, T init = T.init)
{
    // Elements are laid out with a stride of T.sizeof, which in D is always a
    // multiple of T.alignof, so no extra per-element padding is needed
    T* ptr = cast(T*) malloc(num * T.sizeof);
    T[] arr = ptr[0 .. num];
    foreach (ref el; arr)
    {
        memcpy(&el, &init, T.sizeof);
    }
    return arr;
}

void disposeVar(T)(T* var)
{
    free(var);
}

void disposeArray(T)(T[] var)
{
    free(var.ptr);
}
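// Sketch of the allocator helpers above; they are malloc-backed, so every
// makeVar/makeVarArray call must be paired with disposeVar/disposeArray.
// Guarded by version (none), so it is parsed but never compiled in.
version (none) unittest
{
    int* counter = makeVar!int(42);
    assert(*counter == 42);
    disposeVar(counter);

    int[] arr = makeVarArray!int(8, -1); // Eight elements, all initialized to -1
    assert(arr.length == 8 && arr[0] == -1);
    disposeArray(arr);
}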
//////////////////////////////////////////////
//////////////////// Timer ///////////////////
//////////////////////////////////////////////

version (WebAssembly)
{
    alias time_t = int;
    alias clockid_t = int;
    enum CLOCK_REALTIME = 0;

    struct timespec
    {
        time_t tv_sec;
        int tv_nsec;
    }

    extern (C) int clock_gettime(clockid_t, timespec*) @nogc nothrow @system;

    extern (C) double emscripten_get_now() @nogc nothrow @system;
}

/// High precision timer, returns microseconds
long useconds()
{
    version (WebAssembly)
    {
        return cast(long)(emscripten_get_now() * 1000.0);
    }
    else version (Posix)
    {
        import core.sys.posix.sys.time : gettimeofday, timeval;

        timeval t;
        gettimeofday(&t, null);
        return t.tv_sec * 1_000_000 + t.tv_usec;
    }
    else version (Windows)
    {
        //TODO: implement timer on Windows
        /*import core.sys.windows.windows : QueryPerformanceFrequency;

        __gshared double mul = -1;
        if (mul < 0)
        {
            long frequency;
            int ok = QueryPerformanceFrequency(&frequency);
            assert(ok);
            mul = 1_000_000.0 / frequency;
        }
        long ticks;
        int ok = QueryPerformanceCounter(&ticks);
        assert(ok);
        return cast(long)(ticks * mul);*/
        return 0;
    }
    else
    {
        static assert(0, "OS not supported.");
    }
}
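// Sketch: useconds() is the only clock the pool uses; the job logs below
// store its value at job start and the difference at job end. Guarded by
// version (none), so it is parsed but never compiled in.
version (none) unittest
{
    long start = useconds();
    // ... some work ...
    long durationUs = useconds() - start; // Elapsed time in microseconds
}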
//////////////////////////////////////////////
//////////////////// Pause ///////////////////
//////////////////////////////////////////////

void instructionPause()
{
    version (X86_64)
    {
        version (LDC)
        {
            import ldc.gccbuiltins_x86 : __builtin_ia32_pause;

            __builtin_ia32_pause();
        }
        else version (GNU)
        {
            import gcc.builtins;

            __builtin_ia32_pause();
        }
        else version (DigitalMars)
        {
            asm
            {
                rep;
                nop;
            }
        }
        else
        {
            static assert(0);
        }
    }
    else version (Android)
    {
        version (LDC)
        {
            import ldc.attributes;

            @optStrategy("none")
            static void nop()
            {
                int i;
                i++;
            }

            nop();
        }
        else
            static assert(0);
    }
    else version (WebAssembly)
    {
        version (LDC)
        {
            import ldc.attributes;

            @optStrategy("none")
            static void nop()
            {
                int i;
                i++;
            }

            nop();
        }
        else
            static assert(0);
    }
    else
        static assert(0);
}

//////////////////////////////////////////////
///////////// Semaphore + Thread /////////////
//////////////////////////////////////////////

version (MM_USE_POSIX_THREADS)
{
    version (WebAssembly)
    {
    extern (C):

        struct pthread_attr_t
        {
            union
            {
                int[10] __i;
                uint[10] __s;
            }
        }

        struct pthread_t
        {
            void* p;
            uint x;
        }

        // pthread
        int pthread_create(pthread_t*, in pthread_attr_t*, void* function(void*), void*);
        int pthread_join(pthread_t, void**);
        void pthread_exit(void* retval);

        struct sem_t
        {
            shared int[4] __val;
        }

        int sem_init(sem_t*, int, uint);
        int sem_wait(sem_t*);
        int sem_trywait(sem_t*);
        int sem_post(sem_t*);
        int sem_destroy(sem_t*);
        int sem_timedwait(sem_t* sem, const timespec* abstime);
    }
    else version (Posix)
    {
        import core.sys.posix.pthread;
        import core.sys.posix.semaphore;
    }
    else version (Windows)
    {
    extern (C):
        alias time_t = uint;

        struct pthread_attr_t
        {
        }

        struct pthread_t
        {
            void* p;
            uint x;
        }

        struct timespec
        {
            time_t tv_sec;
            int tv_nsec;
        }

        // pthread
        int pthread_create(pthread_t*, in pthread_attr_t*, void* function(void*), void*);
        int pthread_join(pthread_t, void**);
        void pthread_exit(void* retval);

        // semaphore.h
        alias sem_t = void*;
        int sem_init(sem_t*, int, uint);
        int sem_wait(sem_t*);
        int sem_trywait(sem_t*);
        int sem_post(sem_t*);
        int sem_destroy(sem_t*);
        int sem_timedwait(sem_t* sem, const timespec* abstime);
    }
    else
    {
        static assert(false);
    }

    struct Semaphore
    {
        sem_t mutex;

        void initialize()
        {
            sem_init(&mutex, 0, 0);
        }

        void wait()
        {
            int ret = sem_wait(&mutex);
            assert(ret == 0);
        }

        bool tryWait()
        {
            int ret = sem_trywait(&mutex);
            return (ret == 0);
        }

        bool timedWait(int usecs)
        {
            timespec tv;
            // If clock_gettime is missing on some platform, see: https://stackoverflow.com/questions/5404277/porting-clock-gettime-to-windows
            clock_gettime(CLOCK_REALTIME, &tv);
            tv.tv_sec += usecs / 1_000_000;
            tv.tv_nsec += (usecs % 1_000_000) * 1_000;

            int ret = sem_timedwait(&mutex, &tv);
            return (ret == 0);
        }

        void post()
        {
            int ret = sem_post(&mutex);
            assert(ret >= 0);
        }

        void destroy()
        {
            sem_destroy(&mutex);
        }
    }

    private extern (C) void* threadRunFunc(void* threadVoid)
    {
        Thread* th = cast(Thread*) threadVoid;

        th.threadStart();

        pthread_exit(null);
        return null;
    }

    struct Thread
    {
        alias DG = void delegate();

        DG threadStart;
        pthread_t handle;

        void start(DG dg)
        {
            threadStart = dg;
            int err = pthread_create(&handle, null, &threadRunFunc, cast(void*)&this);
            if (err)
                handle = pthread_t();
            //assert(err == 0);
        }

        void join()
        {
            pthread_join(handle, null);
            handle = handle.init;
            threadStart = null;
        }
    }
}
else version (D_BetterC)
{
    version (Windows)
    {
        import core.stdc.stdint : uintptr_t;
        import core.sys.windows.windows;

        extern (Windows) alias btex_fptr = uint function(void*);
        extern (C) uintptr_t _beginthreadex(void*, uint, btex_fptr, void*, uint, uint*) nothrow @nogc;

        struct Semaphore
        {
            HANDLE handle;

            void initialize()
            {
                handle = CreateSemaphoreA(null, 0, int.max, null);
                assert(handle != handle.init, "Unable to create semaphore");
            }

            void wait()
            {
                DWORD rc = WaitForSingleObject(handle, INFINITE);
                assert(rc == WAIT_OBJECT_0);
            }

            bool tryWait()
            {
                switch (WaitForSingleObject(handle, 0))
                {
                case WAIT_OBJECT_0:
                    return true;
                case WAIT_TIMEOUT:
                    return false;
                default:
                    assert(0, "Unable to wait for semaphore");
                }
            }

            bool timedWait(int usecs)
            {
                // For a clock_gettime-based implementation, see the POSIX Semaphore above
                // and: https://stackoverflow.com/questions/5404277/porting-clock-gettime-to-windows
                switch (WaitForSingleObject(handle, cast(uint) usecs / 1000))
                {
                case WAIT_OBJECT_0:
                    return true;
                case WAIT_TIMEOUT:
                    return false;
                default:
                    assert(0, "Unable to wait for semaphore");
                }
            }

            void post()
            {
                assert(ReleaseSemaphore(handle, 1, null));
            }

            void destroy()
            {
                BOOL rc = CloseHandle(handle);
                assert(rc, "Unable to destroy semaphore");
            }
        }
        private extern (Windows) uint threadRunFunc(void* threadVoid)
        {
            Thread* th = cast(Thread*) threadVoid;

            th.threadStart();

            ExitThread(0);
            return 0;
        }

        struct Thread
        {
            alias DG = void delegate();

            DG threadStart;
            HANDLE handle;

            void start(DG dg)
            {
                threadStart = dg;
                handle = cast(HANDLE) _beginthreadex(null, 0, &threadRunFunc, cast(void*)&this, 0, null);
            }

            void join()
            {
                if (WaitForSingleObject(handle, INFINITE) != WAIT_OBJECT_0)
                    assert(0);
                CloseHandle(handle);

                handle = handle.init;
                threadStart = null;
            }
        }
    }
    else
    {
        static assert(0, "Platform is unsupported in betterC mode!");
    }
}
else
{
    import core.thread : D_Thread = Thread;
    import core.sync.semaphore : D_Semaphore = Semaphore;
    import core.time : dur;
    import std.experimental.allocator;
    import std.experimental.allocator.mallocator;

    struct Semaphore
    {
        D_Semaphore sem;

        void initialize()
        {
            sem = Mallocator.instance.make!D_Semaphore();
        }

        void wait()
        {
            sem.wait();
        }

        bool tryWait()
        {
            return sem.tryWait();
        }

        bool timedWait(int usecs)
        {
            return sem.wait(dur!"usecs"(usecs));
        }

        void post()
        {
            sem.notify();
        }

        void destroy()
        {
            Mallocator.instance.dispose(sem);
        }
    }

    struct Thread
    {
        alias DG = void delegate();

        DG threadStart;
        D_Thread thread;

        void start(DG dg)
        {
            thread = Mallocator.instance.make!D_Thread(dg);
            thread.start();
        }

        void join()
        {
            thread.join();
        }
    }
}
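// Sketch of the portable Semaphore/Thread facade defined above; the same code
// works with the POSIX, WinAPI and druntime backends. Guarded by
// version (none), so it is parsed but never compiled in.
version (none) unittest
{
    __gshared Semaphore sem;
    sem.initialize();

    Thread worker;
    worker.start({ sem.post(); }); // Signal from the worker thread

    sem.wait(); // Block until the worker posts
    worker.join();
    sem.destroy();
}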
//////////////////////////////////////////////
///////////////// ThreadPool /////////////////
//////////////////////////////////////////////

private enum gMaxThreadsNum = 64;

alias JobDelegate = void delegate(ThreadData*, JobData*);

/// Structure to store job start and end time
struct JobLog
{
    string name; /// Name of the job
    ulong time; /// Start time (us)
    ulong duration; /// Time taken (us)
}

/// Intrusive last-in-first-out list of jobs guarded by an atomic spin lock
struct JobQueue
{
    alias LockType = int;
    align(64) shared LockType lock; /// Lock guarding the list of jobs
    align(64) JobData* first; /// First element in the list of jobs

    /// Check if empty without locking; gives no guarantee that the list is truly empty
    bool emptyRaw()
    {
        bool isEmpty = first == null;
        return isEmpty;
    }

    /// Check if empty
    bool empty()
    {
        while (!cas(&lock, cast(LockType) false, cast(LockType) true))
            instructionPause();

        bool isEmpty = first == null;
        atomicStore!(MemoryOrder.rel)(lock, cast(LockType) false);
        return isEmpty;
    }

    /// Add a job to the queue
    void add(JobData* t)
    {
        while (!cas(&lock, cast(LockType) false, cast(LockType) true))
            instructionPause();

        t.next = first;
        first = t;

        atomicStore!(MemoryOrder.rel)(lock, cast(LockType) false);
    }

    /// Add a range of jobs to the queue
    void addRange(Range)(Range arr)
    {
        if (arr.length == 0)
            return;

        JobData* start = arr[0];
        JobData* last = start;

        foreach (t; arr[1 .. $])
        {
            last.next = t;
            last = t;
        }

        while (!cas(&lock, cast(LockType) false, cast(LockType) true))
            instructionPause();
        last.next = first;
        first = start;
        atomicStore!(MemoryOrder.rel)(lock, cast(LockType) false);
    }

    /// Pop a job from the queue
    JobData* pop()
    {
        while (!cas(&lock, cast(LockType) false, cast(LockType) true))
            instructionPause();

        if (first == null)
        {
            atomicStore!(MemoryOrder.rel)(lock, cast(LockType) false);
            return null;
        }

        JobData* result = first;
        first = first.next;

        atomicStore!(MemoryOrder.rel)(lock, cast(LockType) false);
        return result;
    }
}
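// Sketch of JobQueue behaviour: nodes are linked through JobData.next, and
// both add() and pop() work at the head, so the order is LIFO. Guarded by
// version (none), so it is parsed but never compiled in.
version (none) unittest
{
    JobQueue queue;
    JobData a, b;
    queue.add(&a);
    queue.add(&b);
    assert(queue.pop() is &b); // The most recently added job comes out first
    assert(queue.pop() is &a);
    assert(queue.empty());
}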
/// Structure containing job data
/// JobData memory is allocated by the user
/// JobData lifetime is managed by the user
/// JobData has to live as long as its group, or until the end of job execution
/// JobData fields can be changed in the del delegate, and the job can be added to the thread pool again to continue execution (calling the same function again, or another one if del was changed)
struct JobData
{
    JobDelegate del; /// Delegate to execute
    string name; /// Name of the job
    private JobsGroup* group; /// Group to which this job belongs
    private align(64) JobData* next; /// JobData forms a list of jobs to be done by a thread
}

/// Structure responsible for one thread in the thread pool
/// Stores jobs to be executed by this thread (jobs can be stolen by other threads)
/// Stores a cache for logs
struct ThreadData
{
public:
    ThreadPool* threadPool; /// Pool this thread belongs to
    int threadId; /// Thread id, valid only within this thread pool

    /// Starts execution of the thread's main loop
    /// External threads can call this function to start executing jobs
    void threadStartFunc()
    {
        //end = false;
        threadFunc(&this);
    }

private:
    JobQueue jobsQueue; /// Queue of jobs to be done; jobs can be stolen by other threads
    JobQueue jobsExclusiveQueue; /// Queue of jobs to be done; jobs can't be stolen
    align(64) Semaphore semaphore; /// Semaphore to wake/sleep this thread
    align(64) Thread thread; /// System thread handle
    JobLog[] logs; /// Logs cache
    int lastLogIndex = -1; /// Index of the last created log
    int jobsDoneCount; /// Number of jobs done by this thread

    shared bool end; /// The thread exits only when end is true and its queues are empty
    shared bool acceptJobs; /// Whether this thread should accept new jobs; if false the thread won't steal jobs from other threads and will sleep longer when its queue is empty
    bool externalThread; /// Thread not allocated by the thread pool
}
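// Sketch of the continuation pattern described above: a job rewrites its own
// JobData inside its delegate and re-adds itself, so the next stage runs as a
// different function. Names are illustrative. Guarded by version (none), so
// it is parsed but never compiled in.
version (none) unittest
{
    static struct App
    {
        ThreadPool* thPool;

        void stepOne(ThreadData* threadData, JobData* data)
        {
            data.del = &stepTwo; // Reuse the same JobData for the next stage
            data.name = "stepTwo";
            thPool.addJobAsynchronous(data);
        }

        void stepTwo(ThreadData* threadData, JobData* data)
        {
        }
    }
}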
/// Thread Pool
/// Manages a bunch of threads to execute given jobs as quickly as possible
/// There are no priorities between jobs. Jobs are added to queues in the same order as they appear in slices, but due to job stealing and uneven execution speed between threads, job execution order is unspecified.
/// The number of threads executing jobs can be changed dynamically at any time. Threads removed from execution will keep working until the end of the program, but shouldn't accept new jobs.
struct ThreadPool
{
    alias FlushLogsDelegaste = void delegate(ThreadData* threadData, JobLog[] logs); /// Type of the delegate used to flush logs
    FlushLogsDelegaste onFlushLogs; /// Custom user delegate to flush logs; if not overridden, defaultFlushLogs is used. Can be set after the initialize() call
    int logsCacheNum; /// Number of log cache entries. Should be set before setThreadsNum is called
    int tryWaitCount = 2000; /// Number of tryWait calls before a timedWait call. A higher value gives better response times but takes CPU time even when there are no jobs.
private:
    ThreadData*[gMaxThreadsNum] threadsData; /// Data for the threads
    align(64) shared int threadsNum; /// Number of threads currently accepting jobs
    align(64) shared bool threadsDataLock; /// Any modification of the threadsData array (change in size or pointer modification) has to be locked
    align(64) shared int threadSelector; /// Index of the thread to which the next job will be added
    FILE* logFile; /// File handle for the defaultFlushLogs log file
    JobData[4] resumeJobs; /// Dummy jobs used to resume threads

public:

    static int getCPUCoresCount()
    {
        version (Windows)
        {
            import core.sys.windows.winbase : SYSTEM_INFO, GetSystemInfo;

            SYSTEM_INFO sysinfo;
            GetSystemInfo(&sysinfo);
            return cast(int) sysinfo.dwNumberOfProcessors;
        }
        else version (linux)
        {
            version (D_BetterC)
            {
                import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;

                return cast(int) sysconf(_SC_NPROCESSORS_ONLN);
            }
            else
            {
                import core.sys.linux.sched : CPU_COUNT, cpu_set_t, sched_getaffinity;
                import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;

                cpu_set_t set = void;
                if (sched_getaffinity(0, cpu_set_t.sizeof, &set) == 0)
                {
                    int count = CPU_COUNT(&set);
                    if (count > 0)
                        return count;
                }
                return cast(int) sysconf(_SC_NPROCESSORS_ONLN);
            }
        }
        else version (Posix)
        {
            import core.sys.posix.unistd;

            return cast(int) sysconf(_SC_NPROCESSORS_ONLN);
        }
        else
            return -1;
    }

    int jobsDoneCount()
    {
        int sum;
        foreach (i, ref ThreadData* th; threadsData)
        {
            if (th is null)
                continue;

            sum += th.jobsDoneCount;
        }
        return sum;
    }

    void jobsDoneCountReset()
    {
        foreach (i, ref ThreadData* th; threadsData)
        {
            if (th is null)
                continue;
            th.jobsDoneCount = 0;
        }
    }

    /// Initialize the thread pool
    void initialize()
    {
        foreach (ref JobData j; resumeJobs)
            j = JobData(&dummyJob, "Dummy-Resume");

        version (MM_NO_LOGS)
        {
            logsCacheNum = 0;
        }
        else
        {
            onFlushLogs = &defaultFlushLogs;
            logsCacheNum = 1024;

            logFile = fopen("trace.json", "w");
            fprintf(logFile, "[");
            fclose(logFile);
            logFile = fopen("trace.json", "a");
            assert(logFile !is null);
        }
    }

    /// Cleans up the ThreadPool
    ~this()
    {
        version (MM_NO_LOGS)
        {
        }
        else if (logFile)
        {
            fclose(logFile);
            logFile = null;
        }
    }

    /// Registers an external thread in the thread pool array. Data is allocated for this thread and it gets the assigned id
    /// External threads are not joined at the end of thread pool execution
    /// Returns the ThreadData corresponding to the external thread. To actually start executing, the external thread has to call threadStartFunc() on the returned variable
    ThreadData* registerExternalThread()
    {
        lockThreadsData();

        ThreadData* threadData = makeThreadData();
        threadData.threadPool = &this;
        threadData.semaphore.initialize();
        threadData.externalThread = true;
        atomicStore(threadData.acceptJobs, true);

        int threadNum = atomicOp!"+="(threadsNum, 1) - 1;

        threadData.threadId = threadNum;

        threadsData[threadNum] = threadData;

        unlockThreadsData();

        return threadData;
    }

    /// Unregisters an external thread. Can be called only after the external thread has left the thread pool
    void unregistExternalThread(ThreadData* threadData)
    {
        lockThreadsData();

        disposeThreadData(threadData);
        unlockThreadsData();
    }

    /// Allows external threads to return from threadStartFunc
    void releaseExternalThreads()
    {
        lockThreadsData();

        // Release external threads (including the main thread)
        foreach (i, ref ThreadData* th; threadsData)
        {
            if (th is null)
                continue;
            if (!th.externalThread)
                continue;

            auto rng = resumeJobs[].map!((ref a) => &a);
            addJobsRange(rng, cast(int) i);
            atomicStore(th.end, true);
        }
        unlockThreadsData();
    }

    /// Waits for all threads to finish and joins them (excluding external threads)
    void waitThreads()
    {
        lockThreadsData();
        foreach (i, ref ThreadData* th; threadsData)
        {
            if (th is null)
                continue;

            atomicStore(th.acceptJobs, false);
            atomicStore(th.end, true);
        }
        foreach (i, ref ThreadData* th; threadsData)
        {
            if (th is null || th.externalThread)
                continue;

            th.thread.join();
            disposeThreadData(th);
        }
        unlockThreadsData();
    }

    /// Sets the number of threads accepting new jobs
    /// If that many threads have never existed, the missing ones are created
    /// If the new number is smaller than before, the extra threads are not joined; they stop getting new jobs, stop stealing jobs and sleep longer
    /// Locking operation
    void setThreadsNum(int num)
    {
        assert(num <= gMaxThreadsNum);
        assert(num > 0);

        lockThreadsData();

        foreach (i, ref ThreadData* th; threadsData)
        {
            if (th)
            {
                // Exists; enable or disable depending on the new thread count
                atomicStore(th.acceptJobs, i < num);
                continue;
            }
            else if (i >= num)
            {
                // Doesn't exist and is not required
                continue;
            }
            // Doesn't exist and is required
            th = makeThreadData();
            th.threadPool = &this;
            th.threadId = cast(int) i;
            atomicStore(th.acceptJobs, true);
            th.semaphore.initialize();

            th.thread.start(&th.threadStartFunc);
        }

        atomicStore(threadsNum, num);
        unlockThreadsData();
    }
    /// Adds a job to be executed by the thread pool; such a job won't be synchronized with any group or job
    /// If threadNum is different from -1, only the thread with that number will be able to execute the given job
    /// It is advised to use a synchronized group of jobs instead
    void addJobAsynchronous(JobData* data, int threadNum = -1)
    {
        if (threadNum == -1)
        {
            ThreadData* threadData = getThreadDataToAddJobTo();
            threadData.jobsQueue.add(data);
            threadData.semaphore.post();
            return;
        }
        ThreadData* threadData = threadsData[threadNum];
        assert(threadData !is null);
        threadData.jobsExclusiveQueue.add(data);
        threadData.semaphore.post();
    }

    /// Adds a job to be executed by the thread pool; the group specified in the job data won't finish until this job ends
    /// If threadNum is different from -1, only the thread with that number will be able to execute the given job
    void addJob(JobData* data, int threadNum = -1)
    {
        assert(data.group);
        atomicOp!"+="(data.group.jobsToBeDoneCount, 1);
        addJobAsynchronous(data, threadNum);
    }

    /// Adds multiple jobs at once
    /// The range has to return JobData*
    /// The range has to have a length property
    /// A range is used so there is no need to allocate a JobData*[]
    /// All jobs have to belong to one group
    /// If threadNum is different from -1, only the thread with that number will be able to execute the given jobs
    void addJobsRange(Range)(Range rng, int threadNum = -1)
    {
        if (threadNum != -1)
        {
            ThreadData* threadData = threadsData[threadNum];
            assert(threadData !is null);
            threadData.jobsExclusiveQueue.addRange(rng);
            foreach (sInc; 0 .. rng.length)
                threadData.semaphore.post();

            return;
        }

        if (rng.length == 0)
        {
            return;
        }

        foreach (JobData* job; rng)
        {
            assert(rng[0].group == job.group);
        }

        atomicOp!"+="(rng[0].group.jobsToBeDoneCount, cast(int) rng.length);
        int threadsNumLocal = atomicLoad(threadsNum);
        int part = cast(int) rng.length / threadsNumLocal;
        if (part > 0)
        {
            foreach (i, ThreadData* threadData; threadsData[0 .. threadsNumLocal])
            {
                auto slice = rng[i * part .. (i + 1) * part];
                threadData.jobsQueue.addRange(slice);

                foreach (sInc; 0 .. part)
                    threadData.semaphore.post();
            }
            rng = rng[part * threadsNumLocal .. $];
        }
        foreach (i, ThreadData* threadData; threadsData[0 .. rng.length])
        {
            threadData.jobsQueue.add(rng[i]);
            threadData.semaphore.post();
        }
    }

    /// Adds a group of jobs to the thread pool; the group won't be synchronized
    void addGroupAsynchronous(JobsGroup* group)
    {
        group.thPool = &this;

        if (group.jobs.length == 0)
        {
            // Immediately call group end
            group.onGroupFinish();
            return;
        }
        group.setUpJobs();
        auto rng = group.jobs[].map!((ref a) => &a);
        addJobsRange(rng, group.executeOnThreadNum);
    }

    /// Adds a group of jobs to the thread pool
    /// The spawning group will finish only after this group has finished
    void addGroup(JobsGroup* group, JobsGroup* spawnedByGroup)
    {
        assert(spawnedByGroup);
        group.spawnedByGroup = spawnedByGroup;
        atomicOp!"+="(spawnedByGroup.jobsToBeDoneCount, 1); // Increase by one so the spawning group will wait for the newly added group to finish
        addGroupAsynchronous(group); // Synchronized by the jobsToBeDoneCount atomic variable
    }
    /// Explicitly calls onFlushLogs for all threads
    void flushAllLogs()
    {
        lockThreadsData();

        foreach (i, ref ThreadData* th; threadsData)
        {
            if (th is null)
                continue;

            onThreadFlushLogs(th);
        }
        unlockThreadsData();
    }

    /// Default implementation of log flushing
    /// Saves logs to the trace.json file in a format readable by the Google Chrome tracing tool chrome://tracing/
    /// Logs can be inspected even if the application crashed, but that might require removing the last log entry from trace.json
    void defaultFlushLogs(ThreadData* threadData, JobLog[] logs)
    {
        version (MM_NO_LOGS)
        {
        }
        else
        {
            long start = useconds();
            // (number of log rows) * (static json length + time length + duration length)
            size_t size = (logs.length + 1) * (128 + 20 + 20);
            size_t used = 0;

            foreach (ref log; logs)
            {
                size += log.name.length + 1; // Size of the name
            }

            char* buffer = cast(char*) malloc(size);

            foreach (ref log; logs)
            {
                char[100] name_buffer;
                assert(log.name.length < name_buffer.length);
                name_buffer[0 .. log.name.length] = log.name;
                name_buffer[log.name.length] = 0;
                size_t charWritten = snprintf(buffer + used, size - used,
                        `{"name":"%s", "pid":1, "tid":%lld, "ph":"X", "ts":%lld, "dur":%lld }, %s`,
                        name_buffer.ptr, cast(long)(threadData.threadId + 1),
                        log.time, log.duration, "\n".ptr);
                used += charWritten;
            }

            long end = useconds();
            size_t charWritten = snprintf(buffer + used, size - used,
                    `{"name":"logFlush", "pid":1, "tid":%lld, "ph":"X", "ts":%lld, "dur":%lld }, %s`,
                    cast(long)(threadData.threadId + 1), start, end - start, "\n".ptr);
            used += charWritten;
            fwrite(buffer, 1, used, logFile);
            free(buffer);
        }
    }
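    // For reference, defaultFlushLogs above emits one JSON object per job,
    // e.g. (values illustrative):
    //   {"name":"jobName", "pid":1, "tid":2, "ph":"X", "ts":1200, "dur":40 },
    // which chrome://tracing/ renders as a 40 us slice on thread 2.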
private:
    /// Atomic lock
    void lockThreadsData()
    {
        // Only one thread at a time can change the number of threads in the thread pool
        while (!cas(&threadsDataLock, false, true))
        {
        }
    }

    /// Atomic unlock
    void unlockThreadsData()
    {
        atomicStore(threadsDataLock, false);
    }

    /// Allocate ThreadData
    ThreadData* makeThreadData()
    {
        ThreadData* threadData = makeVar!ThreadData();
        threadData.logs = makeVarArray!(JobLog)(logsCacheNum);
        return threadData;
    }

    /// Dispose ThreadData
    void disposeThreadData(ThreadData* threadData)
    {
        disposeArray(threadData.logs);
        return disposeVar(threadData);
    }

    /// Get the most suitable thread to add a job to
    ThreadData* getThreadDataToAddJobTo()
    {
        int threadNum = atomicOp!"+="(threadSelector, 1);

        foreach (i; 0 .. 1_000)
        {
            if (threadNum >= atomicLoad(threadsNum))
            {
                threadNum = 0;
                atomicStore(threadSelector, 0);
            }
            ThreadData* threadData = threadsData[threadNum];
            if (threadData != null)
            {
                return threadData;
            }
            threadNum++;
        }
        assert(0);
    }

    /// Create a log entry at the start of a job
    void onStartJob(JobData* data, ThreadData* threadData)
    {
        threadData.jobsDoneCount++;
        version (MM_NO_LOGS)
        {
        }
        else
        {
            if (cast(int) threadData.logs.length <= 0)
            {
                return;
            }
            if (threadData.lastLogIndex >= cast(int) threadData.logs.length - 1)
            {
                onThreadFlushLogs(threadData);
            }

            threadData.lastLogIndex++;

            JobLog log;
            log.name = data.name;
            log.time = useconds();
            threadData.logs[threadData.lastLogIndex] = log;
        }
    }

    /// Set the log finish time at the end of a job
    void onEndJob(JobData* data, ThreadData* threadData)
    {
        version (MM_NO_LOGS)
        {
        }
        else
        {
            if (cast(int) threadData.logs.length <= 0)
            {
                return;
            }
            assert(threadData.lastLogIndex < threadData.logs.length);
            JobLog* log = &threadData.logs[threadData.lastLogIndex];
            log.duration = useconds() - log.time;
        }
    }

    /// Flush logs
    void onThreadFlushLogs(ThreadData* threadData)
    {
        assert(threadData);

        if (threadData.lastLogIndex < 0 || onFlushLogs is null)
        {
            return;
        }

        onFlushLogs(threadData, threadData.logs[0 .. threadData.lastLogIndex + 1]);

        threadData.lastLogIndex = -1;
    }

    /// Does nothing
    void dummyJob(ThreadData* threadData, JobData* data)
    {
    }

    /// Steal a job from another thread
    JobData* stealJob(int threadNum)
    {
        foreach (thSteal; 0 .. atomicLoad(threadsNum))
        {
            if (thSteal == threadNum)
                continue; // Do not steal from the requesting thread

            ThreadData* threadData = threadsData[thSteal];

            if (threadData is null || !threadData.semaphore.tryWait())
                continue;

            JobData* data = threadData.jobsQueue.pop();

            if (data is null)
                threadData.semaphore.post();

            return data;
        }
        return null;
    }
}
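// End-to-end sketch of driving the pool (names illustrative): the calling
// thread registers itself, adds a group whose last job releases external
// threads, and then joins the pool. The commented-out test at the bottom of
// this file is the full version. Guarded by version (none), so it is parsed
// but never compiled in.
version (none) unittest
{
    __gshared ThreadPool thPool;
    thPool.initialize();
    thPool.setThreadsNum(ThreadPool.getCPUCoresCount());

    ThreadData* mainThread = thPool.registerExternalThread();

    void work(ThreadData* threadData, JobData* data)
    {
    }

    JobData[4] jobs;
    foreach (ref j; jobs)
        j = JobData(&work, "work");

    JobsGroup group = JobsGroup("example", jobs[]);
    group.onFinish = delegate(JobsGroup* g) { g.thPool.releaseExternalThreads(); };
    thPool.addGroupAsynchronous(&group);

    mainThread.threadStartFunc(); // Executes jobs until releaseExternalThreads()
    thPool.waitThreads();
    thPool.unregistExternalThread(mainThread);
}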
/// Adding groups of jobs is faster than adding individual jobs, and groups can have dependencies between each other
struct JobsGroup
{
public:
    string name; /// Name of the group
    JobData[] jobs; /// Jobs to be executed by this group; the jobs have to live as long as the group lives
    void delegate(JobsGroup* group) onFinish; /// Delegate called when the group finishes; can be used to free memory
    ThreadPool* thPool; /// Thread pool of this group
    int executeOnThreadNum = -1; /// Thread number to execute the jobs on

    this(string name, JobData[] jobs = [], int executeOnThreadNum = -1)
    {
        this.name = name;
        this.jobs = jobs;
        this.executeOnThreadNum = executeOnThreadNum;
        atomicStore(jobsToBeDoneCount, 0);
        atomicStore(dependenciesWaitCount, 0);
    }

    ~this() nothrow
    {
        free(children.ptr);
        children = null;
    }

    /// Makes this group dependent on another group
    /// A dependent group won't start until its dependencies are fulfilled
    void dependantOn(JobsGroup* parent)
    {
        size_t newLen = parent.children.length + 1;
        JobsGroup** ptr = cast(JobsGroup**) realloc(parent.children.ptr,
                newLen * (JobsGroup*).sizeof);
        parent.children = ptr[0 .. newLen];
        parent.children[$ - 1] = &this;
        atomicOp!"+="(dependenciesWaitCount, 1);
    }

    /// Returns the number of dependencies this group is waiting for
    int getDependenciesWaitCount()
    {
        return atomicLoad(dependenciesWaitCount);
    }

private:
    JobsGroup* spawnedByGroup; /// Group which spawned this group; if present, the spawning group waits for this group to finish
    JobsGroup*[] children; /// Groups depending on this group
    align(64) shared int dependenciesWaitCount; /// Number of dependencies this group waits for
    align(64) shared int jobsToBeDoneCount; /// Number of this group's jobs still executing

    /// Checks if depending groups or the spawning group have to be started
    /// Executes the user onFinish function
    void onGroupFinish()
    {
        decrementChildrendependencies();
        if (spawnedByGroup)
        {
            auto num = atomicOp!"-="(spawnedByGroup.jobsToBeDoneCount, 1);
            assert(num >= 0);
            if (num == 0)
            {
                spawnedByGroup.onGroupFinish();
            }
        }
        if (onFinish)
            onFinish(&this);
    }

    /// Decrements the children's dependency counters and starts them once all their dependencies are fulfilled
    void decrementChildrendependencies()
    {
        foreach (JobsGroup* group; children)
        {
            auto num = atomicOp!"-="(group.dependenciesWaitCount, 1);
            assert(num >= 0);
            if (num == 0)
            {
                thPool.addGroupAsynchronous(group); // All dependencies of this group are fulfilled, so it is already synchronized
            }
        }
    }

    /// Prepares the job data before adding it to the thread pool
    void setUpJobs()
    {
        foreach (i; 0 .. jobs.length)
        {
            jobs[i].group = &this;
        }
    }
}
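// Sketch of group dependencies (names illustrative): 'last' starts only after
// both 'first' and 'second' have finished, mirroring the dependency graph in
// the commented-out test below. Guarded by version (none), so it is parsed
// but never compiled in.
version (none) unittest
{
    __gshared ThreadPool thPool;

    JobsGroup first = JobsGroup("first");
    JobsGroup second = JobsGroup("second");
    JobsGroup last = JobsGroup("last");

    last.dependantOn(&first);
    last.dependantOn(&second);

    // Only the roots are added; 'last' is started by the pool itself once its
    // dependency counter drops to zero.
    thPool.addGroupAsynchronous(&first);
    thPool.addGroupAsynchronous(&second);
}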
/// Main function executed by every thread in the thread pool
/// Executes jobs from its own queue
/// If there are no jobs in its queue, it steals jobs from another thread
/// If there is nothing to steal, it sleeps on its semaphore for a while (the stage when the CPU is not used)
/// Sleep time is longer for threads not accepting jobs; they don't exit because it is hard to guarantee that nobody will add a job to them (the thread might exit but a job could be added anyway and the application would malfunction)
/// A thread ends only when its queue is empty. Jobs shouldn't be added to the queue after the ThreadPool.waitThreads() call
private void threadFunc(ThreadData* threadData)
{
    ThreadPool* threadPool = threadData.threadPool;
    int threadNum = threadData.threadId;

    while (!atomicLoad!(MemoryOrder.raw)(threadData.end)
            || !threadData.jobsQueue.empty() || !threadData.jobsExclusiveQueue.empty())
    {
        JobData* data;
        if (threadData.semaphore.tryWait())
        {
            if (!threadData.jobsExclusiveQueue.emptyRaw())
                data = threadData.jobsExclusiveQueue.pop();

            if (data is null)
                data = threadData.jobsQueue.pop();

            if (data is null)
                threadData.semaphore.post();

            assert(data !is null);
        }
        else
        {
            bool acceptJobs = atomicLoad!(MemoryOrder.raw)(threadData.acceptJobs);
            if (acceptJobs)
            {
                data = threadPool.stealJob(threadNum);
            }

            if (data is null)
            {
                // The thread does not have its own job and cannot steal one, so wait for a job:
                // spin with tryWait first for responsiveness, then fall back to timedWait
                int tryWait = 0;
                bool ok = true;
                while (!threadData.semaphore.tryWait())
                {
                    tryWait++;
                    if (tryWait > threadPool.tryWaitCount)
                    {
                        ok = false;
                        break;
                    }
                    static foreach (i; 0 .. 10)
                        instructionPause();
                }
                if (!ok)
                    ok = threadData.semaphore.timedWait(1_000 + !acceptJobs * 10_000);

                if (ok)
                {
                    if (!threadData.jobsExclusiveQueue.emptyRaw())
                        data = threadData.jobsExclusiveQueue.pop();

                    if (data is null)
                        data = threadData.jobsQueue.pop();

                    if (data is null)
                        threadData.semaphore.post();
                }
            }
        }

        // Nothing to do
        if (data is null)
        {
            continue;
        }

        // Do the job
        threadPool.onStartJob(data, threadData);
        data.del(threadData, data);
        threadPool.onEndJob(data, threadData);
        if (data.group)
        {
            auto num = atomicOp!"-="(data.group.jobsToBeDoneCount, 1);
            if (num == 0)
            {
                data.group.onGroupFinish();
            }
        }
    }
    atomicStore(threadData.end, false);
    assert(threadData.jobsQueue.empty());
}

//////////////////////////////////////////////
//////////////////// Test ////////////////////
//////////////////////////////////////////////
/*
void testThreadPool()
{
    enum jobsNum = 1024 * 4;

    ThreadPool thPool;
    thPool.initialize();

    ThreadData* mainThread = thPool.registerExternalThread(); // Register the main thread as thread 0
    JobData startFrameJobData; // Variable storing the job which starts the TestApp

    JobData[jobsNum] frameJobs; // Array storing jobs created in TestApp.continueFrameInOtherJob
    shared int frameNum; // Simulate a game loop: run jobs for a few frames and exit
    // The app starts as one job: &startFrame
    // Then, using its own JobData memory, it spawns the 'continueFrameInOtherJob' job
    // 'continueFrameInOtherJob' fills the frameJobs array with &importantTask jobs. The group created to run these jobs is freed using the JobsGroup.onFinish delegate
    // 'importantTask' allocates new jobs (&importantTaskSubTask) and a group; they are all deallocated using the JobsGroup.onFinish delegate
    // 'continueFrameInOtherJob' waits for all 'importantTask' jobs. All 'importantTask' jobs wait for all 'importantTaskSubTask' jobs.
    // So after all 'importantTaskSubTask' and 'importantTask' jobs are done, 'continueFrameInOtherJob' ends and spawns &finishFrame
    // 'finishFrame' spawns a new frame or exits the application
    struct TestApp
    {
        // First job in a frame
        // Does some stuff and spawns another job
        // 1 - number of jobs of this kind per frame
        void startFrame(ThreadData* threadData, JobData* startFrameJobData)
        {
            startFrameJobData.del = &continueFrameInOtherJobAAA;
            startFrameJobData.name = "cont frmAAA";
            thPool.addJobAsynchronous(startFrameJobData, thPool.threadsNum - 1); // startFrame is the only job in the thread pool, no synchronization is required
        }

        void continueFrameInOtherJobAAA(ThreadData* threadData, JobData* startFrameJobData)
        {
            static struct JobGroupMemory
            {
                JobsGroup[6] groups;
                JobData[1][6] groupsJobs;
                TestApp* app;
                JobData* startFrameJobData;

                void spawnCont(JobsGroup* group)
                {
                    // startFrameJobData.del = &continueFrameInOtherJob;
                    startFrameJobData.del = &app.finishFrame;
                    startFrameJobData.name = "cont frm";
                    group.thPool.addJobAsynchronous(startFrameJobData); // startFrame is the only job in the thread pool, no synchronization is required
                }
            }

            JobGroupMemory* memory = makeVar!JobGroupMemory();
            memory.app = &this;
            memory.startFrameJobData = startFrameJobData;

            with (memory)
            {
                groups[0] = JobsGroup("dependant 0", groupsJobs[0]);
                groups[1] = JobsGroup("dependant 1", groupsJobs[1]);
                groups[2] = JobsGroup("dependant 2", groupsJobs[2]);
                groups[3] = JobsGroup("dependant 3", groupsJobs[3]);
                groups[4] = JobsGroup("dependant 4", groupsJobs[4]);
                groups[5] = JobsGroup("dependant 5", groupsJobs[5]);
                groups[5].onFinish = &spawnCont;

                groups[2].dependantOn(&groups[0]);
                groups[2].dependantOn(&groups[1]);

                groups[3].dependantOn(&groups[0]);
                groups[3].dependantOn(&groups[1]);
                groups[3].dependantOn(&groups[2]);

                groups[4].dependantOn(&groups[1]);
                groups[4].dependantOn(&groups[3]);

                groups[5].dependantOn(&groups[0]);
                groups[5].dependantOn(&groups[1]);
                groups[5].dependantOn(&groups[2]);
                groups[5].dependantOn(&groups[3]);
                groups[5].dependantOn(&groups[4]);

                foreach (ref jobs; groupsJobs)
                    foreach (ref j; jobs)
                        j = JobData(&this.importantTaskSubTask, "n");

                thPool.addGroupAsynchronous(&groups[0]);
                thPool.addGroupAsynchronous(&groups[1]);
            }
        }
        // Job for some big system
        // Spawns some jobs, and when they are done spawns the finishFrame job
        // 1 - number of jobs of this kind per frame
        void continueFrameInOtherJob(ThreadData* threadData, JobData* startFrameJobData)
        {
            static struct JobGroupMemory
            {
                JobsGroup group;
                TestApp* app;
                JobData* startFrameJobData;

                void freeAndContinue(JobsGroup* group)
                {
                    startFrameJobData.del = &app.finishFrame;
                    startFrameJobData.name = "finishFrame";
                    group.thPool.addJobAsynchronous(startFrameJobData, group.thPool.threadsNum - 1); // startFrameJobData is the continuation of the 'startFrame' data; all important jobs have finished, so it is the only job and no synchronization is required. Always spawn on the last thread
                    disposeVar!(JobGroupMemory)(&this);
                }
            }

            JobGroupMemory* important = makeVar!JobGroupMemory();
            important.app = &this;
            important.startFrameJobData = startFrameJobData;

            foreach (ref j; frameJobs)
                j = JobData(&this.importantTask, "vip");

            important.group = JobsGroup("a lot of jobs", frameJobs[]);
            important.group.onFinish = &important.freeAndContinue;

            thPool.addGroupAsynchronous(&important.group); // No synchronization required, continueFrameInOtherJob is the only job
        }

        // A task which by itself does a lot of computation, so it spawns a few more jobs
        // jobsNum - number of jobs of this kind per frame
        void importantTask(ThreadData* threadData, JobData* data)
        {
            // All tasks created here will make 'importantTask' wait with finishing until these jobs finish

            // Add 128 tasks in a group
            static struct JobGroupMemory
            {
                JobsGroup group;
                JobData[128] jobs;

                void freeMee(JobsGroup* group)
                {
                    disposeVar!(JobGroupMemory)(&this);
                }
            }

            JobGroupMemory* subGroup = makeVar!JobGroupMemory();

            foreach (ref j; subGroup.jobs)
                j = JobData(&this.importantTaskSubTask, "vip sub");

            subGroup.group = JobsGroup("128 jobs", subGroup.jobs[]);
            subGroup.group.onFinish = &subGroup.freeMee;
            thPool.addGroup(&subGroup.group, data.group);

            // Add a single task
            data.del = &importantTaskSubTask;
            data.name = "sub";
            thPool.addJob(data);
        }

        // Job which simply does some work
        // jobsNum * 128 - number of jobs of this kind per frame
        void importantTaskSubTask(ThreadData* threadData, JobData* data)
        {
        }

        // Finish the frame
        // 1 - number of jobs of this kind per frame
        void finishFrame(ThreadData* threadData, JobData* startFrameJobData)
        {
            auto num = atomicOp!"+="(frameNum, 1);
            if (num == 10)
            {
                thPool.releaseExternalThreads(); // After 10 frames exit the application
                return;
            }
            *startFrameJobData = JobData(&startFrame, "StartFrame");
            thPool.addJobAsynchronous(startFrameJobData); // Start the next frame; there shouldn't be any other tasks except this one, so no synchronization is required
        }

        // Function to test if dynamically changing the number of threads works
        // void changeThreadsNum()
        // {
        //     import std.random : uniform;

        //     bool change = uniform(0, 100) == 3;
        //     if (!change)
        //         return;

        //     int threadsNum = uniform(3, 5);
        //     thPool.setThreadsNum(threadsNum);
        // }
    }

    void testThreadsNum(int threadsNum)
    {
        frameNum = 0;
        thPool.jobsDoneCountReset();
        thPool.setThreadsNum(threadsNum);

        TestApp testApp = TestApp();
        startFrameJobData = JobData(&testApp.startFrame, "StartFrame"); // Start the first frame; lives as long as the main thread doesn't exit from threadStartFunc()

        ulong start = useconds();
        thPool.addJobAsynchronous(&startFrameJobData); // Synchronization is done by the last job in the pool, which calls thPool.releaseExternalThreads()
        mainThread.threadStartFunc();
        ulong end = useconds();
printf("Threads Num: %2d. Jobs: %d. Time: %5.2f ms. jobs/ms: %5.2f\n", threadsNum, thPool.jobsDoneCount, 1793 (end - start) / 1000.0f, thPool.jobsDoneCount / ((end - start) / 1000.0f)); 1794 } 1795 1796 while (1) 1797 { 1798 // foreach (i; 1 .. 32) 1799 // testThreadsNum(i); 1800 1801 testThreadsNum(1); 1802 testThreadsNum(4); 1803 testThreadsNum(16); 1804 } 1805 thPool.flushAllLogs(); 1806 thPool.waitThreads(); 1807 thPool.unregistExternalThread(mainThread); 1808 } 1809 1810 version (D_BetterC) 1811 { 1812 1813 extern (C) int main(int argc, char*[] argv) // for betterC 1814 { 1815 testThreadPool(); 1816 return 0; 1817 } 1818 } 1819 else 1820 { 1821 int main() 1822 { 1823 testThreadPool(); 1824 return 0; 1825 } 1826 }//*/ 1827 // Compile 1828 // -fsanitize=address 1829 // rdmd -g -of=thread_pool src/mmutils/thread_pool.d && ./thread_pool 1830 // ldmd2 -release -inline -checkaction=C -g -of=thread_pool src/mmutils/thread_pool.d && ./thread_pool 1831 // ldmd2 -checkaction=C -g -of=thread_pool src/mmutils/thread_pool.d && ./thread_pool