OpenWalnut  1.4.0
WThreadedFunction.h
1 //---------------------------------------------------------------------------
2 //
3 // Project: OpenWalnut ( http://www.openwalnut.org )
4 //
5 // Copyright 2009 OpenWalnut Community, BSV@Uni-Leipzig and CNCF@MPI-CBS
6 // For more information see http://www.openwalnut.org/copying
7 //
8 // This file is part of OpenWalnut.
9 //
10 // OpenWalnut is free software: you can redistribute it and/or modify
11 // it under the terms of the GNU Lesser General Public License as published by
12 // the Free Software Foundation, either version 3 of the License, or
13 // (at your option) any later version.
14 //
15 // OpenWalnut is distributed in the hope that it will be useful,
16 // but WITHOUT ANY WARRANTY; without even the implied warranty of
17 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 // GNU Lesser General Public License for more details.
19 //
20 // You should have received a copy of the GNU Lesser General Public License
21 // along with OpenWalnut. If not, see <http://www.gnu.org/licenses/>.
22 //
23 //---------------------------------------------------------------------------
24 
25 #ifndef WTHREADEDFUNCTION_H
26 #define WTHREADEDFUNCTION_H
27 
28 #include <memory.h>
29 #include <iostream>
30 
31 #include <string>
32 #include <vector>
33 #ifndef Q_MOC_RUN
34 #include <boost/thread.hpp>
35 #endif
36 
37 #include "WAssert.h"
38 #include "WWorkerThread.h"
39 #include "WSharedObject.h"
40 
41 
42 /**
43  * An enum indicating the status of a multithreaded computation
44  */
45 enum WThreadedFunctionStatus
46 {
47  W_THREADS_INITIALIZED, //! the status after constructing the function
48  W_THREADS_RUNNING, //! the threads were started
49  W_THREADS_STOP_REQUESTED, //! a stop was requested and not all threads have stopped yet
50  W_THREADS_ABORTED, //! at least one thread was aborted due to a stop request or an exception
51  W_THREADS_FINISHED //! all threads completed their work successfully
52 };
53 
54 /**
55  * An enum indicating the number of threads used
56  */
57 enum WThreadedFunctionNbThreads
58 {
59  W_AUTOMATIC_NB_THREADS = 0 //!< Use half the available cores as number of threads
60 };
61 
62 /**
63  * \class WThreadedFunctionBase
64  *
65  * A virtual base class for threaded functions (see below).
66  */
67 class WThreadedFunctionBase // NOLINT
68 {
69  //! a type for exception signals
70  typedef boost::signals2::signal< void ( WException const& ) > ExceptionSignal;
71 
72 public:
73  //! a type for exception callbacks
74  typedef boost::function< void ( WException const& ) > ExceptionFunction;
75 
76  /**
77  * Standard constructor.
78  */
80 
81  /**
82  * Destroys the thread pool and stops all threads, if any one of them is still running.
83  *
84  * \note Of course, the client has to make sure the threads do not work endlessly on a single job.
85  */
86  virtual ~WThreadedFunctionBase();
87 
88  /**
89  * Starts the threads.
90  */
91  virtual void run() = 0;
92 
93  /**
94  * Request all threads to stop. Returns immediately, so you might
95  * have to wait() for the threads to actually finish.
96  */
97  virtual void stop() = 0;
98 
99  /**
100  * Wait for all threads to stop.
101  */
102  virtual void wait() = 0;
103 
104  /**
105  * Get the status of the threads.
106  *
107  * \return The current status.
108  */
109  WThreadedFunctionStatus status();
110 
111  /**
112  * Returns a condition that gets fired when all threads have finished.
113  *
114  * \return The condition indicating all threads are done.
115  */
116  boost::shared_ptr< WCondition > getThreadsDoneCondition();
117 
118  /**
119  * Subscribe a function to an exception signal.
120  *
121  * \param func The function to subscribe.
122  */
123  void subscribeExceptionSignal( ExceptionFunction func );
124 
125 protected:
126  /**
127  * WThreadedFunctionBase is non-copyable, so the copy constructor is not implemented.
128  */
129  WThreadedFunctionBase( WThreadedFunctionBase const& ); // NOLINT
130 
131  /**
132  * WThreadedFunctionBase is non-copyable, so the copy operator is not implemented.
133  *
134  * \return this function
135  */
137 
138  //! a condition that gets notified when the work is complete
139  boost::shared_ptr< WCondition > m_doneCondition;
140 
141  //! a signal for exceptions
142  ExceptionSignal m_exceptionSignal;
143 
144  //! the current status
146 };
147 
148 /**
149  * \class WThreadedFunction
150  *
151  * Creates threads that computes a function in a multithreaded fashion. The template parameter
152  * is an object that provides a function to execute. The following function needs to be implemented:
153  *
154  * void operator ( std::size_t id, std::size_t mx, WBoolFlag const& s );
155  *
156  * Here, 'id' is the number of the thread currently executing the function, ranging from
157  * 0 to mx - 1, where 'mx' is the number of threads running. 's' is a flag that indicates
158  * if the execution should be stopped. Make sure to check the flag often, so that the threads
159  * can be stopped when needed.
160  *
161  * This class itself is NOT thread-safe, do not access it from different threads simultaneously.
162  * Also, make sure any resources used by your function are accessed in a threadsafe manner,
163  * as all threads share the same function object.
164  *
165  * Any exception thrown by your function will be caught and forwarded via the exception
166  * signal. Beware that the signal function will be called in the executing threads, as opposed
167  * to in your module thread. This means that the exception handler bound to the exception
168  * signal must be threadsafe too.
169  *
170  * The status of the execution can be checked via the status() function. Also, when all threads
171  * finish (due to throwing exceptions or actually successfully finishing computation ), a condition
172  * will be notified.
173  *
174  * \ingroup common
175  */
176 template< class Function_T >
178 {
179  //! a type for exception signals
180  typedef boost::signals2::signal< void ( WException const& ) > ExceptionSignal;
181 
182 public:
183  //! a type for exception callbacks
184  typedef boost::function< void ( WException const& ) > ExceptionFunction;
185 
186  /**
187  * Creates the thread pool with a given number of threads.
188  *
189  * \param numThreads The number of threads to create.
190  * \param function The function object.
191  *
192  * \note If the number of threads equals 0, a good number of threads will be determined by the threadpool.
193  */
194  WThreadedFunction( std::size_t numThreads, boost::shared_ptr< Function_T > function );
195 
196  /**
197  * Destroys the thread pool and stops all threads, if any one of them is still running.
198  *
199  * \note Of course, the client has to make sure the threads do not work endlessly on a single job.
200  */
201  virtual ~WThreadedFunction();
202 
203  /**
204  * Starts the threads.
205  */
206  virtual void run();
207 
208  /**
209  * Request all threads to stop. Returns immediately, so you might
210  * have to wait() for the threads to actually finish.
211  */
212  virtual void stop();
213 
214  /**
215  * Wait for all threads to stop.
216  */
217  virtual void wait();
218 
219 private:
220  /**
221  * WThreadedFunction is non-copyable, so the copy constructor is not implemented.
222  */
223  WThreadedFunction( WThreadedFunction const& ); // NOLINT
224 
225  /**
226  * WThreadedFunction is non-copyable, so the copy operator is not implemented.
227  *
228  * \return this function
229  */
231 
232  /**
233  * This function gets subscribed to the threads' stop signals.
234  */
235  void handleThreadDone();
236 
237  /**
238  * This function handles exceptions thrown in the worker threads.
239  *
240  * \param e The exception that was thrown.
241  */
242  void handleThreadException( WException const& e );
243 
244  //! the number of threads to manage
245  std::size_t m_numThreads;
246 
247  //! the threads
248  // use shared_ptr here, because WWorkerThread is non-copyable
249  std::vector< boost::shared_ptr< WWorkerThread< Function_T > > > m_threads;
250 
251  //! the function object
252  boost::shared_ptr< Function_T > m_func;
253 
254  //! a counter that keeps track of how many threads have finished
256 };
257 
258 template< class Function_T >
259 WThreadedFunction< Function_T >::WThreadedFunction( std::size_t numThreads, boost::shared_ptr< Function_T > function )
261  m_numThreads( numThreads ),
262  m_threads(),
263  m_func( function ),
264  m_threadsDone()
265 {
266  if( !m_func )
267  {
268  throw WException( std::string( "No valid thread function pointer." ) );
269  }
270 
271  // find a suitable number of threads
272  if( m_numThreads == W_AUTOMATIC_NB_THREADS )
273  {
274  m_numThreads = 1;
275  while( m_numThreads < boost::thread::hardware_concurrency() / 2 && m_numThreads < 1024 )
276  {
277  m_numThreads *= 2;
278  }
279  }
280 
281  // set number of finished threads to 0
282  m_threadsDone.getWriteTicket()->get() = 0;
283 
284  // create threads
285  for( std::size_t k = 0; k < m_numThreads; ++k )
286  {
287  boost::shared_ptr< WWorkerThread< Function_T > > t( new WWorkerThread< Function_T >( m_func, k, m_numThreads ) );
288  t->subscribeStopSignal( boost::bind( &WThreadedFunction::handleThreadDone, this ) );
289  t->subscribeExceptionSignal( boost::bind( &WThreadedFunction::handleThreadException, this, _1 ) );
290  m_threads.push_back( t );
291  }
292 }
293 
294 template< class Function_T >
296 {
297  stop();
298 }
299 
300 template< class Function_T >
302 {
303  // set the number of finished threads to 0
304  m_threadsDone.getWriteTicket()->get() = 0;
305  // change status
306  m_status.getWriteTicket()->get() = W_THREADS_RUNNING;
307  // start threads
308  for( std::size_t k = 0; k < m_numThreads; ++k )
309  {
310  m_threads[ k ]->run();
311  }
312 }
313 
314 template< class Function_T >
316 {
317  // change status
318  m_status.getWriteTicket()->get() = W_THREADS_STOP_REQUESTED;
319 
320  typename std::vector< boost::shared_ptr< WWorkerThread< Function_T > > >::iterator it;
321  // tell the threads to stop
322  for( it = m_threads.begin(); it != m_threads.end(); ++it )
323  {
324  ( *it )->requestStop();
325  }
326 }
327 
328 template< class Function_T >
330 {
331  typename std::vector< boost::shared_ptr< WWorkerThread< Function_T > > >::iterator it;
332  // wait for the threads to stop
333  for( it = m_threads.begin(); it != m_threads.end(); ++it )
334  {
335  ( *it )->wait();
336  }
337 }
338 
339 template< class Function_T >
341 {
342  typedef typename WSharedObject< std::size_t >::WriteTicket WT;
343 
344  WT t = m_threadsDone.getWriteTicket();
345  WAssert( t->get() < m_numThreads, "" );
346  ++t->get();
347  std::size_t k = t->get();
348  t = WT();
349 
350  if( m_numThreads == k )
351  {
353  ST s = m_status.getWriteTicket();
354  if( s->get() == W_THREADS_RUNNING )
355  {
356  s->get() = W_THREADS_FINISHED;
357  }
358  else if( s->get() == W_THREADS_STOP_REQUESTED )
359  {
360  s->get() = W_THREADS_ABORTED;
361  }
362  else
363  {
364  throw WException( std::string( "Invalid status change." ) );
365  }
366  m_doneCondition->notify();
367  }
368 }
369 
370 template< class Function_T >
372 {
373  // change status
375  WT w = m_status.getWriteTicket();
376  WAssert( w->get() != W_THREADS_FINISHED &&
377  w->get() != W_THREADS_ABORTED, "" );
378  if( w->get() == W_THREADS_RUNNING )
379  {
380  w->get() = W_THREADS_STOP_REQUESTED;
381  }
382  // force destruction of the write ticket
383  w = WT();
384  // update the number of finished threads
385  handleThreadDone();
386 
387  m_exceptionSignal( e );
388 }
389 
390 #endif // WTHREADEDFUNCTION_H
Creates threads that computes a function in a multithreaded fashion.
A worker thread that belongs to a.
Definition: WWorkerThread.h:46
boost::signals2::signal< void(WException const &) > ExceptionSignal
a type for exception signals
std::vector< boost::shared_ptr< WWorkerThread< Function_T > > > m_threads
the threads
virtual void run()
Starts the threads.
A virtual base class for threaded functions (see below).
boost::shared_ptr< Function_T > m_func
the function object
ExceptionSignal m_exceptionSignal
a signal for exceptions
void handleThreadException(WException const &e)
This function handles exceptions thrown in the worker threads.
void handleThreadDone()
This function gets subscribed to the threads' stop signals.
virtual ~WThreadedFunctionBase()
Destroys the thread pool and stops all threads, if any one of them is still running.
virtual ~WThreadedFunction()
Destroys the thread pool and stops all threads, if any one of them is still running.
WThreadedFunction(std::size_t numThreads, boost::shared_ptr< Function_T > function)
Creates the thread pool with a given number of threads.
WriteTicket getWriteTicket(bool suppressNotify=false) const
Returns a ticket to get write access to the contained data.
WThreadedFunctionBase & operator=(WThreadedFunctionBase const &)
WThreadedFunctionBase is non-copyable, so the copy operator is not implemented.
virtual void stop()=0
Request all threads to stop.
WSharedObject< std::size_t > m_threadsDone
a counter that keeps track of how many threads have finished
boost::shared_ptr< WCondition > getThreadsDoneCondition()
Returns a condition that gets fired when all threads have finished.
WThreadedFunctionBase()
Standard constructor.
virtual void wait()=0
Wait for all threads to stop.
WThreadedFunctionStatus status()
Get the status of the threads.
boost::signals2::signal< void(WException const &) > ExceptionSignal
a type for exception signals
boost::function< void(WException const &) > ExceptionFunction
a type for exception callbacks
boost::shared_ptr< WCondition > m_doneCondition
a condition that gets notified when the work is complete
Basic exception handler.
Definition: WException.h:38
WSharedObject< WThreadedFunctionStatus > m_status
the current status
void subscribeExceptionSignal(ExceptionFunction func)
Subscribe a function to an exception signal.
std::size_t m_numThreads
the number of threads to manage
virtual void wait()
Wait for all threads to stop.
virtual void stop()
Request all threads to stop.
WThreadedFunction & operator=(WThreadedFunction const &)
WThreadedFunction is non-copyable, so the copy operator is not implemented.
boost::function< void(WException const &) > ExceptionFunction
a type for exception callbacks
virtual void run()=0
Starts the threads.