vspline 1.1.0
Generic C++11 Code for Uniform B-Splines
thread_pool.h
/************************************************************************/
/* */
/* vspline - a set of generic tools for creation and evaluation */
/* of uniform b-splines */
/* */
/* Copyright 2015 - 2023 by Kay F. Jahnke */
/* */
/* The git repository for this software is at */
/* */
/* https://bitbucket.org/kfj/vspline */
/* */
/* Please direct questions, bug reports, and contributions to */
/* */
/* kfjahnke+vspline@gmail.com */
/* */
/* Permission is hereby granted, free of charge, to any person */
/* obtaining a copy of this software and associated documentation */
/* files (the "Software"), to deal in the Software without */
/* restriction, including without limitation the rights to use, */
/* copy, modify, merge, publish, distribute, sublicense, and/or */
/* sell copies of the Software, and to permit persons to whom the */
/* Software is furnished to do so, subject to the following */
/* conditions: */
/* */
/* The above copyright notice and this permission notice shall be */
/* included in all copies or substantial portions of the */
/* Software. */
/* */
/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND */
/* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES */
/* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND */
/* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT */
/* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, */
/* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING */
/* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR */
/* OTHER DEALINGS IN THE SOFTWARE. */
/* */
/************************************************************************/

/// \file thread_pool.h
///
/// \brief provides a thread pool for vspline's multithread() routine
///
/// class thread_pool aims to provide a simple and straightforward
/// implementation of a thread pool for multithread() in multithread.h,
/// but the class might find use elsewhere. The operation is simple - I
/// think of it as 'piranha mode' ;)
///
/// A set of worker threads is launched which wait for 'tasks' - in the
/// shape of std::function<void()> - coming in via a queue. When woken,
/// a worker thread tries to obtain a task. If it succeeds, the task is
/// executed, and the worker thread tries to get another task. If none
/// is to be had, it goes to sleep, waiting to be woken once there are
/// new tasks.

#include <thread>
#include <mutex>
#include <queue>
#include <vector>
#include <functional>
#include <condition_variable>

// KFJ 2019-09-02: new namespace vspline_threadpool
// I am now keeping class thread_pool and common_thread_pool in this
// separate namespace to facilitate 'dubbing' of vspline. By 'dubbing'
// I mean using preprocessor manoeuvres like "#define vspline NS_AVX",
// used to create independent, ISA-specific compiles of vspline in
// several TUs, to be linked together into a 'monolithic' binary.
// While this worked with the thread pool code in namespace vspline,
// it created a separate thread pool for each ISA-specific TU. This
// is wasteful and unnecessary; the thread pool code is the same and
// it's ISA-independent. With the new namespace, the ISA-specific TUs
// are compiled with -D VSPLINE_EXTERN_THREAD_POOL, and one TU has to
// provide the pool by containing
//
//   namespace vspline_threadpool
//   {
//     thread_pool common_thread_pool ;
//   } ;
//
// Less complex scenarios, where only one TU #includes vspline,
// get a static thread pool, as before - only that it now lives in
// a separate namespace.

namespace vspline_threadpool
{

/// number of CPU cores in the system

const int ncores = std::thread::hardware_concurrency() ;

/// when multithreading, use this number of jobs by default.
/// This looks like overkill and unnecessary signalling overhead,
/// but it improves performance over just having as many threads as
/// there are physical cores. Why is this so? There are several
/// possibilities I've considered:
/// - while one task waits, e.g. for memory, another task can perform
///   computations
/// - the scheduler might assign time slices to each thread, so
///   having more threads yields more time slices

const int default_njobs = 2 * ncores ;

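// For illustration (hypothetical figures): on a system where
// std::thread::hardware_concurrency() reports four cores, ncores is
// four and default_njobs is eight, so a thread_pool constructed with
// the default argument holds eight worker threads.
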
class thread_pool
{
  // used to switch off the worker threads at program termination.
  // access under task_mutex.

  bool stay_alive = true ;

  // the thread pool itself is held in this variable. The pool
  // does not change after construction.

  std::vector < std::thread * > pool ;

public:

  // mutex and condition variable for interaction with the task queue
  // and stay_alive

  std::mutex task_mutex ;
  std::condition_variable task_cv ;

  // queue to hold tasks. access under task_mutex

  std::queue < std::function < void() > > task_queue ;

private:

  /// code to run a worker thread
  /// We use a thread pool of worker threads. These threads have a very
  /// simple cycle: they try to obtain a task (a std::function<void()>).
  /// If there is one to be had, it is invoked; otherwise they wait on
  /// task_cv. When woken up, the flag stay_alive is checked, and if it
  /// is found to be false, the worker thread ends.

  void worker_thread()
  {
    while ( true )
    {
      // under task_mutex, check stay_alive and try to obtain a task

      std::unique_lock<std::mutex> task_lock ( task_mutex ) ;

      if ( ! stay_alive )
      {
        task_lock.unlock() ;
        break ; // die
      }

      if ( task_queue.size() )
      {
        // there are tasks in the queue, take one, unlock

        auto task = task_queue.front() ;
        task_queue.pop() ;
        task_lock.unlock() ;

        // got a task, perform it, then try for another one

        task() ;
      }
      else
      {
        // no luck. wait until alerted

        task_cv.wait ( task_lock ) ; // spurious alert is okay
      }

      // now start the next cycle, either after having completed a job
      // or after having been woken by an alert
    }
  }

public:

  // Only as many threads as there are physical cores can run at the same
  // time, so one might assume that having more threads is futile.
  // Surprisingly - at least on my system - this is not so: I get the
  // best performance with a significantly larger number of threads. I'm
  // not sure why this is so; see above for some possible reasons.

  thread_pool ( int nthreads = default_njobs )
  {
    // to launch a thread with a method, we need to bind it to the object:

    std::function < void() > wf
      = std::bind ( &thread_pool::worker_thread , this ) ;

    // now we can fill the pool with worker threads

    for ( int t = 0 ; t < nthreads ; t++ )
      pool.push_back ( new std::thread ( wf ) ) ;
  }
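
  // Note: an equivalent way to set up the worker function - instead of
  // std::bind - would be a lambda capturing 'this', e.g.
  //
  //   std::function < void() > wf = [this] { worker_thread() ; } ;
  //
  // both forms behave the same; std::bind is what the constructor above uses.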

  int get_nthreads() const
  {
    return pool.size() ;
  }

  /// launch simply enqueues a job and calls notify_one. Such a job
  /// will run to its completion and end silently - any communication
  /// of its state has to be managed by the job itself. See
  /// multithread.h for code which takes care of managing the life cycle
  /// of a group of jobs by wrapping them in an additional outer function.

  void launch ( std::function < void() > job )
  {
    {
      std::lock_guard<std::mutex> lk_task ( task_mutex ) ;
      task_queue.push ( job ) ;
    }

    task_cv.notify_one() ;
  }

  /// overload of launch invoking the payload on several worker threads

  void launch ( std::function < void() > job , int njobs )
  {
    if ( njobs <= 0 )
      return ;

    {
      std::lock_guard<std::mutex> lk_task ( task_mutex ) ;
      for ( int i = 0 ; i < njobs ; i++ )
        task_queue.push ( job ) ;
    }

    task_cv.notify_all() ;
  }
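
  // A minimal usage sketch (hypothetical caller code, not part of this
  // header): launch() only enqueues jobs, so the caller has to arrange
  // for completion tracking itself, e.g. with a counter guarded by a
  // mutex and a condition variable. multithread.h implements a more
  // elaborate version of this pattern; the names 'payload', 'njobs',
  // 'count', 'done_mutex' and 'done_cv' below are made up for the
  // illustration, and common_thread_pool is the pool instance defined
  // further down in this header.
  //
  //   std::mutex done_mutex ;
  //   std::condition_variable done_cv ;
  //   int count = njobs ;
  //
  //   auto wrapped = [&]
  //   {
  //     payload() ;                      // do the actual work
  //     std::lock_guard<std::mutex> lk ( done_mutex ) ;
  //     --count ;                        // record completion of one job
  //     done_cv.notify_one() ;           // notify while still holding the
  //                                      // lock, so the waiter cannot
  //                                      // destroy done_cv while this
  //                                      // thread is still using it
  //   } ;
  //
  //   common_thread_pool.launch ( wrapped , njobs ) ;
  //
  //   std::unique_lock<std::mutex> lk ( done_mutex ) ;
  //   done_cv.wait ( lk , [&] { return count == 0 ; } ) ;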

  ~thread_pool()
  {
    {
      // under task_mutex, set stay_alive to false

      std::lock_guard<std::mutex> task_lock ( task_mutex ) ;
      stay_alive = false ;
    }

    // wake all inactive worker threads,
    // join all worker threads once they are finished

    task_cv.notify_all() ;

    for ( auto threadp : pool )
    {
      threadp->join() ;
    }

    // once all are joined, delete their std::thread objects

    for ( auto threadp : pool )
    {
      delete threadp ;
    }
  }
} ;

#ifndef VSPLINE_SINGLETHREAD

// If VSPLINE_EXTERN_THREAD_POOL is #defined, we rely on some other TU
// providing the common thread pool. If it is not #defined, we use a static
// thread pool for the TU #including this header. This should be safe
// because the include guards should guarantee that each TU includes
// this section here precisely once.

#ifdef VSPLINE_EXTERN_THREAD_POOL

extern thread_pool common_thread_pool ;

#else

static thread_pool common_thread_pool ;

#endif // VSPLINE_EXTERN_THREAD_POOL

#endif // VSPLINE_SINGLETHREAD

} ; // end of namespace vspline_threadpool