vspline 1.1.0
Generic C++11 Code for Uniform B-Splines
multithread.h
/************************************************************************/
/* */
/* vspline - a set of generic tools for creation and evaluation */
/* of uniform b-splines */
/* */
/* Copyright 2015 - 2023 by Kay F. Jahnke */
/* */
/* The git repository for this software is at */
/* */
/* https://bitbucket.org/kfj/vspline */
/* */
/* Please direct questions, bug reports, and contributions to */
/* */
/* kfjahnke+vspline@gmail.com */
/* */
/* Permission is hereby granted, free of charge, to any person */
/* obtaining a copy of this software and associated documentation */
/* files (the "Software"), to deal in the Software without */
/* restriction, including without limitation the rights to use, */
/* copy, modify, merge, publish, distribute, sublicense, and/or */
/* sell copies of the Software, and to permit persons to whom the */
/* Software is furnished to do so, subject to the following */
/* conditions: */
/* */
/* The above copyright notice and this permission notice shall be */
/* included in all copies or substantial portions of the */
/* Software. */
/* */
/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, */
/* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES */
/* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND */
/* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT */
/* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, */
/* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING */
/* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR */
/* OTHER DEALINGS IN THE SOFTWARE. */
/* */
/************************************************************************/

/// \file multithread.h
///
/// \brief code to distribute the processing of bulk data to several threads
///
/// The code in this header provides a reasonably general method to perform
/// processing of manifolds of data with several threads in parallel. In vspline,
/// there are several areas where potentially large numbers of individual values
/// have to be processed independently of each other or in a dependence which
/// can be preserved in partitioning. To process such 'bulk' data effectively,
/// vspline employs two strategies: multithreading and vectorization.
/// This file handles the multithreading.
///
/// Multithreading, the use of related headers and linkage with pthread are
/// optional in vspline and can be switched off by #defining VSPLINE_SINGLETHREAD.
/// This is the reason for the template vspline::atomic, which normally uses
/// std::atomic, but instead uses a stand-in type providing 'mock' functionality
/// if VSPLINE_SINGLETHREAD is defined, to allow the use of the same logic.
///
/// As of March 2019, the multithreading code has been simplified: The routine
/// 'multithread' takes a std::function<void()> as 'payload'. The payload code
/// is wrapped in an outer function keeping track of worker threads terminating,
/// and when the last of a set of worker threads which have been assigned a job
/// terminates, control is returned to the caller of 'multithread'. This logic
/// ensures that a multithreaded task is complete when control returns.
///
/// This logic implies that the caller and the payload code cooperate to
/// split up the work load, since 'multithread' itself does not concern itself
/// with this aspect. In vspline, this is done using a vspline::atomic instance
/// in the caller, which is passed into the workers and used by them to obtain
/// 'joblet numbers', which they interpret as signifying a (small-ish) share of
/// some larger data set. The passing-to-the-workers is done by per-reference
/// lambda capture, which conveniently transports the caller's context into
/// the workers. The payload code keeps taking 'joblet numbers' from the atomic
/// until they are finished - then it terminates.
///
/// This strategy separates the granularity of the workload distribution from
/// the number of worker threads, resulting in even load distribution and little
/// tail-end idling (when most jobs are complete and only a few aren't) - and
/// it also makes any data partitioning code unnecessary: jobs which are laid
/// to rest by the OS may, on re-awakening, have some data left to process, but
/// if the remainder of the job is done (joblet numbers are finished) that's all
/// they have to do, taking much less time than having to complete some previously
/// scheduled fixed work load. It also allows running some task 'on the back
/// burner', employing only a small number of workers: since load distribution
/// is automatic, the job will only take longer to execute.
///
/// I like this technique, and I've dubbed it 'atomic surfing' ;)
///
/// So now it should be clear why a stand-in type is needed if VSPLINE_SINGLETHREAD
/// is #defined: using the atomic to share joblet numbers between caller and
/// workers is part of the logic, and it's easier to just code using them and
/// using the stand-in type for the single-threaded case, which preserves the
/// logic, but gets by without any reference to multithreading-related headers
/// or libraries.
///
/// In vspline, the granularity of 'joblets' is single 1D subarrays of a larger
/// data set. If the data are images, joblets process lines. The number of threads
/// used to process an entire data set is fixed in thread_pool.h, and is some small
/// multiple of the number of available physical cores. This seems strange, because
/// one might assume that it should be best to have as many threads as cores, but
/// I found that using more (up to a point) increases performance, and since one of
/// my main concerns in vspline is speed, I've coded so that per default a number
/// of threads is chosen which runs best *on my system* - this may not be optimal
/// for other hardware. To change the number, see the definition of default_njobs
/// in thread_pool.h.
///
/// The multithreading code in this header is used both by vspline's digital filter
/// code and by 'wielding.h', which provides the data flow logic for vspline's
/// transform-like routines.

#ifndef VSPLINE_MULTITHREAD_H
#define VSPLINE_MULTITHREAD_H

#include <assert.h>

#ifndef VSPLINE_SINGLETHREAD

// only include multithreading-related headers if VSPLINE_SINGLETHREAD
// is *not* defined

#include <thread>
#include <mutex>
#include <queue>
#include <condition_variable>
#include <atomic>
#include "thread_pool.h"

#endif // #ifndef VSPLINE_SINGLETHREAD

#include "common.h"

namespace vspline
{

// if VSPLINE_SINGLETHREAD is defined, we provide some fallback code which
// allows the remainder of vspline to remain unaware of the fact and use the
// same logic that's used for multithreaded operation

#ifdef VSPLINE_SINGLETHREAD

const int ncores = 1 ;

const int default_njobs = 1 ;

// 'multithread' itself collapses to merely executing the payload code in the
// current thread:

template < bool dummy = true >
int multithread ( std::function < void() > payload ,
                  std::size_t nr_workers = 1 )
{
  // guard against a zero worker count (nr_workers is unsigned)

  if ( nr_workers == 0 )
  {
    return 0 ;
  }

  payload() ;
  return 1 ;
}

// we need a stand-in for vspline::atomic since atomics are used where
// multithreading normally takes place, but with VSPLINE_SINGLETHREAD
// <atomic> is not #included. The stand-in type only provides a minimal
// set of functions, namely those which are used inside vspline.

template < typename T >
struct atomic
{
  typedef T value_type ;

  value_type value ;

  atomic ( const value_type & _value )
  : value ( _value ) { } ;

  value_type load()
  {
    return value ;
  }

  value_type operator++()
  {
    return ++value ;
  }

  value_type operator--()
  {
    return --value ;
  }

  value_type operator++ ( int )
  {
    return value++ ;
  }

  value_type operator-- ( int )
  {
    return value-- ;
  }

  value_type fetch_sub ( value_type arg )
  {
    value_type v = value ;
    value -= arg ;
    return v ;
  }

  value_type fetch_add ( value_type arg )
  {
    value_type v = value ;
    value += arg ;
    return v ;
  }
} ;

#else // VSPLINE_SINGLETHREAD

// with multithreading, ncores and default_njobs come from thread_pool.h

const int ncores = vspline_threadpool::ncores ;
const int default_njobs = vspline_threadpool::default_njobs ;

// when using multithreading, vspline::atomic is an alias for std::atomic

template < typename T > using atomic = std::atomic < T > ;

#endif // VSPLINE_SINGLETHREAD

// we start out with a bit of collateral code. I have changed the
// multithreading code to not use ranges anymore. This simplifies
// the code greatly, and calling code will now follow a specific
// pattern: it will set up a vspline::atomic initialized to some total
// number of 'joblets', which are taken to mean indexes which can be
// applied either directly to pointers and C++ arrays, or to iterators.
// The payload code receives a pointer or reference to this atomic.
// With the new multithreading logic, all workers get precisely the
// same payload routine and are responsible for obtaining shares of the
// total work load autonomously. This is realized by obtaining
// indexes from the atomic. There are two typical cases: the payload
// code may want to process the indexes singly, or in batches of
// a certain size. The fetch_XXX routines below are utility code to
// obtain such indexes. I provide two variants for each case, one
// variant counting the indexes up from zero, the other counting
// down to zero. The payload code runs a loop repeatedly calling
// fetch_XXX. If fetch_XXX returns false, there are no more indexes
// to be had, the loop is exited, and the payload routine finishes.
// If fetch_XXX returns true, the index(es) were set and the payload
// code can use them in whichever way is appropriate, and then
// proceed to the next iteration.
//
// If VSPLINE_SINGLETHREAD is defined, the stand-in type above
// is used instead of a std::atomic, so the logic can remain the same:
// the joblet numbers are now consumed by the caller's thread one after
// the other, just like in an ordinary loop, and with minimal overhead,
// which may even be optimized away completely.

/// fetch_descending fetches the next index from an atomic,
/// counting down. Indexes will range from one less than the value
/// the atomic was initialized with, down to zero. If all indexes were
/// distributed already, false is returned, true otherwise. Like the
/// other fetch_XXX routines, the return of a boolean makes these
/// functions good candidates to be used as the conditional in a loop.

template < typename index_t >
bool fetch_descending ( vspline::atomic < index_t > & source ,
                        index_t & index )
{
  index_t _index = --source ;

  if ( _index < 0 )
    return false ;

  index = _index ;
  return true ;
}

/// fetch_ascending counts up from zero to total-1, which is more
/// efficient if the indexes are used to address memory. This is due
/// to the side effects of accessing memory: if memory is accessed at
/// an address x, the OS will typically fetch a chunk of data starting
/// at or shortly before x. If the next fetch requires data just after
/// x, there is a good chance that they are already in cache.

template < typename index_t >
bool fetch_ascending ( vspline::atomic < index_t > & source ,
                       const index_t & total ,
                       index_t & index )
{
  index_t _index = --source ;

  if ( _index < 0 )
    return false ;

  index = total - 1 - _index ;
  return true ;
}

/// fetch_range_descending fetches the beginning and end of a range of
/// indexes (in iterator semantics, low <= i < high) from a
/// vspline::atomic which has been initialized with the total number
/// of indexes that are to be processed. If the vspline::atomic, when
/// accessed, already holds a value of or below zero, fetch_range_descending
/// returns false and leaves low and high unchanged. Otherwise it
/// returns true, and low and high will be set to the values gleaned
/// from the atomic, raising 'low' to zero if it would come out below.
/// This function (and the next) enables calling code to process batches
/// of adjacent indexes without any index artistry: the code is perfectly
/// general and simple, and the use of the atomic and fetch_sub guarantees
/// that each fetch provides a distinct batch of indexes.

template < typename index_t >
bool fetch_range_descending ( vspline::atomic < index_t > & source ,
                              const index_t & count ,
                              index_t & low ,
                              index_t & high )
{
  index_t high_index = source.fetch_sub ( count ) ;
  index_t low_index = high_index - count ;

  if ( high_index <= 0 )
    return false ;

  if ( low_index < 0 )
    low_index = 0 ;

  low = low_index ;
  high = high_index ;
  return true ;
}

/// fetch_range_ascending also uses an atomic initialized to the total
/// number of indexes to be distributed, but the successive ranges are
/// handed out in ascending order, which is more efficient if the indexes
/// are used to address memory.

template < typename index_t >
bool fetch_range_ascending ( vspline::atomic < index_t > & source ,
                             const index_t & count ,
                             const index_t & total ,
                             index_t & low ,
                             index_t & high )
{
  index_t high_index = source.fetch_sub ( count ) ;
  index_t low_index = high_index - count ;

  if ( high_index <= 0 )
    return false ;

  if ( low_index < 0 )
    low_index = 0 ;

  high = total - low_index ;
  low = total - high_index ;

  return true ;
}

#ifndef VSPLINE_SINGLETHREAD

/// multithread uses a thread pool of worker threads to perform
/// a multithreaded operation. It receives a functor (a single-threaded
/// function used for all individual tasks), and, optionally, the
/// desired number of worker instances to be used.
/// These tasks are wrapped with a wrapper which takes care of
/// signalling when the last task has completed.
///
/// This, in a way, is the purest implementation of 'multithread': this
/// implementation is not involved with any property of the jobs at hand, it
/// merely forwards the arguments for the payload function to a specified
/// number of workers. The code invoking 'multithread' has to set up
/// whatever scheme it deems appropriate to convey, via the arguments,
/// what the worker threads should do. With this mechanism in place, the
/// calling code has the very efficient option of setting up a vspline::atomic
/// holding some sort of 'joblet number' and passing a reference to this atomic
/// to the payload code. The workers executing the payload code all get
/// equal load until the job numbers are exhausted, which is when they
/// terminate one by one. Since there is no inter-thread communication
/// during the active phase, there is no signalling overhead at all,
/// which allows fine granularity. The fine granularity ensures little
/// tail-end idling (when the caller has to wait for the last worker to
/// finish) and also makes it possible to choose some sort of partitioning
/// which insinuates itself from the structure of the data at hand, rather
/// than some preconceived parcel of the total job. When there are many more
/// joblet numbers than workers, intermittent inactivity of a worker simply
/// makes it consume fewer job numbers, rather than delaying it on the way
/// to a preset goal.
///
/// Another effect is that, if job numbers relate to memory worked on
/// (think of lines of an image), all activity is focussed in a narrow
/// band of the memory, because the currently processed job numbers are
/// usually all in sequence (unless the OS throws a spanner in the works
/// by halting threads). This may or may not help - in some situations
/// having several threads access adjacent memory locations may make it
/// harder for the system to synchronize access. But payload code is
/// free to use any interpretation of job numbers anyway, so that's an
/// issue on the payload side.
///
/// Since the number of threads in the pool is static, requesting more
/// workers than there are threads is futile (but still works). Requesting
/// fewer may be useful to have some task 'on the back burner' while some
/// critical task receives more workers to complete faster.
///
/// As I've pointed out in thread_pool.h, it seems beneficial (at least for
/// vspline) to have a good deal more threads than physical cores. See there
/// for reasons why this may be so.
///
/// Last but not least: the code is very simple :)
///
/// Why is multithread a template? So that several TUs can #include
/// vspline without linker errors.

template < bool dummy = true >
int multithread ( std::function < void() > payload ,
                  std::size_t nr_workers = default_njobs )
{
  // guard against a zero worker count (nr_workers is unsigned)

  if ( nr_workers == 0 )
  {
    return 0 ;
  }

  if ( nr_workers == 1 )
  {
    // if only one worker is to be used, we take a shortcut
    // and execute the payload function right here:

    payload() ;
    return 1 ;
  }

  // TODO: I'd rather use the code where the count is kept in an atomic,
  // but I get failure to join in the thread pool's d'tor.
  // This variant seems to terminate reliably.

  int count = nr_workers ;          // number of tasks
  std::mutex pool_mutex ;           // mutex guarding count and pool_cv
  std::condition_variable pool_cv ; // cv for signalling completion

  // first we create the callable which is passed to the worker threads.
  // this wrapper around the 'payload' takes care of signalling when the
  // last worker thread has finished with its current job.

  auto action = [&]
  {
    // execute the 'payload'

    payload() ;

    {
      // under pool_mutex, check if this was the last missing worker to
      // terminate. The lock is released with the closing scope; the
      // notify call to the condition variable does not need the lock,
      // the docu says that that would even be a pessimization.
      // but - here the notify is back under the lock_guard, had
      // random crashes again, see if this fixes it.

      std::lock_guard<std::mutex> lk ( pool_mutex ) ;

      if ( ( -- count ) == 0 )
        pool_cv.notify_one() ;
    }
  } ;

  {
    // acquire a lock on pool_mutex to stop any action finishing early
    // from modifying 'count'

    std::unique_lock<std::mutex> lk_pool ( pool_mutex ) ;

    vspline_threadpool::common_thread_pool.launch ( action , nr_workers ) ;

    // now wait for the last task to complete. This is signalled by
    // the action code by notifying on pool_cv.
    // the predicate count == 0 rejects spurious wakes

    pool_cv.wait ( lk_pool , [&] { return count == 0 ; } ) ;
  }

  // all jobs are done

  return nr_workers ;
}

#endif // VSPLINE_SINGLETHREAD

} ; // end of namespace vspline

#endif // #ifndef VSPLINE_MULTITHREAD_H