Intel® OpenMP* Runtime Library
kmp_taskdeps.cpp
1 /*
2  * kmp_taskdeps.cpp
3  */
4 
5 /* <copyright>
6  Copyright (c) 1997-2015 Intel Corporation. All Rights Reserved.
7 
8  Redistribution and use in source and binary forms, with or without
9  modification, are permitted provided that the following conditions
10  are met:
11 
12  * Redistributions of source code must retain the above copyright
13  notice, this list of conditions and the following disclaimer.
14  * Redistributions in binary form must reproduce the above copyright
15  notice, this list of conditions and the following disclaimer in the
16  documentation and/or other materials provided with the distribution.
17  * Neither the name of Intel Corporation nor the names of its
18  contributors may be used to endorse or promote products derived
19  from this software without specific prior written permission.
20 
21  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32 
33 </copyright> */
34 
35 //#define KMP_SUPPORT_GRAPH_OUTPUT 1
36 
37 #include "kmp.h"
38 #include "kmp_io.h"
39 #include "kmp_wait_release.h"
40 
41 #if OMP_40_ENABLED
42 
43 //TODO: Improve memory allocation? keep a list of pre-allocated structures? allocate in blocks? re-use finished list entries?
44 //TODO: don't use atomic ref counters for stack-allocated nodes.
45 //TODO: find an alternative to atomic refs for heap-allocated nodes?
46 //TODO: Finish graph output support
47 //TODO: kmp_lock_t seems a tad too big (and heavyweight) for this. Check other runtime locks
48 //TODO: Any ITT support needed?
49 
50 #ifdef KMP_SUPPORT_GRAPH_OUTPUT
51 static kmp_int32 kmp_node_id_seed = 0;
52 #endif
53 
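// __kmp_init_node: reset a freshly allocated dependence node. dn.task stays NULL until
// all of the task's dependences have been processed (see __kmp_check_deps), and dn.nrefs
// starts at 1 to account for the creating task's own reference.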
54 static void
55 __kmp_init_node ( kmp_depnode_t *node )
56 {
57  node->dn.task = NULL; // set to null initially, it will point to the right task once dependences have been processed
58  node->dn.successors = NULL;
59  __kmp_init_lock(&node->dn.lock);
60  node->dn.nrefs = 1; // init creates the first reference to the node
61 #ifdef KMP_SUPPORT_GRAPH_OUTPUT
62  node->dn.id = KMP_TEST_THEN_INC32(&kmp_node_id_seed);
63 #endif
64 }
65 
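// __kmp_node_ref / __kmp_node_deref manage the node's atomic reference count: ref
// increments dn.nrefs, deref decrements it and frees the node (with the fast or regular
// thread allocator, matching how it was allocated) once the count reaches zero.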
66 static inline kmp_depnode_t *
67 __kmp_node_ref ( kmp_depnode_t *node )
68 {
69  KMP_TEST_THEN_INC32(&node->dn.nrefs);
70  return node;
71 }
72 
73 static inline void
74 __kmp_node_deref ( kmp_info_t *thread, kmp_depnode_t *node )
75 {
76  if (!node) return;
77 
78  kmp_int32 n = KMP_TEST_THEN_DEC32(&node->dn.nrefs) - 1;
79  if ( n == 0 ) {
80  KMP_ASSERT(node->dn.nrefs == 0);
81 #if USE_FAST_MEMORY
82  __kmp_fast_free(thread,node);
83 #else
84  __kmp_thread_free(thread,node);
85 #endif
86  }
87 }
88 
89 #define KMP_ACQUIRE_DEPNODE(gtid,n) __kmp_acquire_lock(&(n)->dn.lock,(gtid))
90 #define KMP_RELEASE_DEPNODE(gtid,n) __kmp_release_lock(&(n)->dn.lock,(gtid))
91 
92 static void
93 __kmp_depnode_list_free ( kmp_info_t *thread, kmp_depnode_list *list );
94 
95 static const kmp_int32 kmp_dephash_log2 = 6;
96 static const kmp_int32 kmp_dephash_size = (1 << kmp_dephash_log2);
97 
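// Per-task dependence hash: kmp_dephash_size (64) buckets keyed by the dependence base
// address; the hash folds the low bits of the address into the bucket index.
// For illustration (hypothetical address): addr = 0x1040 gives
// ((0x1040 >> 6) ^ 0x1040) % 64 = (0x41 ^ 0x1040) % 64 = 0x1001 % 64 = bucket 1.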
98 static inline kmp_int32
99 __kmp_dephash_hash ( kmp_intptr_t addr )
100 {
101  //TODO: alternative to try: set = (((Addr64)(addrUsefulBits * 9.618)) % m_num_sets );
102  return ((addr >> kmp_dephash_log2) ^ addr) % kmp_dephash_size;
103 }
104 
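// __kmp_dephash_create: the table header and its bucket array are carved out of a single
// allocation; h->buckets points just past the kmp_dephash_t header and every bucket
// starts out empty.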
105 static kmp_dephash_t *
106 __kmp_dephash_create ( kmp_info_t *thread )
107 {
108  kmp_dephash_t *h;
109 
110  kmp_int32 size = kmp_dephash_size * sizeof(kmp_dephash_entry_t) + sizeof(kmp_dephash_t);
111 
112 #if USE_FAST_MEMORY
113  h = (kmp_dephash_t *) __kmp_fast_allocate( thread, size );
114 #else
115  h = (kmp_dephash_t *) __kmp_thread_malloc( thread, size );
116 #endif
117 
118 #ifdef KMP_DEBUG
119  h->nelements = 0;
120 #endif
121  h->buckets = (kmp_dephash_entry **)(h+1);
122 
123  for ( kmp_int32 i = 0; i < kmp_dephash_size; i++ )
124  h->buckets[i] = 0;
125 
126  return h;
127 }
128 
129 static void
130 __kmp_dephash_free ( kmp_info_t *thread, kmp_dephash_t *h )
131 {
132  for ( kmp_int32 i=0; i < kmp_dephash_size; i++ ) {
133  if ( h->buckets[i] ) {
134  kmp_dephash_entry_t *next;
135  for ( kmp_dephash_entry_t *entry = h->buckets[i]; entry; entry = next ) {
136  next = entry->next_in_bucket;
137  __kmp_depnode_list_free(thread,entry->last_ins);
138  __kmp_node_deref(thread,entry->last_out);
139 #if USE_FAST_MEMORY
140  __kmp_fast_free(thread,entry);
141 #else
142  __kmp_thread_free(thread,entry);
143 #endif
144  }
145  }
146  }
147 #if USE_FAST_MEMORY
148  __kmp_fast_free(thread,h);
149 #else
150  __kmp_thread_free(thread,h);
151 #endif
152 }
153 
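// __kmp_dephash_find: look up the entry for a dependence address, creating it on the fly
// if it does not exist yet. Only the thread that owns the encountering task walks this
// hash, so no locking is needed here.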
154 static kmp_dephash_entry *
155 __kmp_dephash_find ( kmp_info_t *thread, kmp_dephash_t *h, kmp_intptr_t addr )
156 {
157  kmp_int32 bucket = __kmp_dephash_hash(addr);
158 
159  kmp_dephash_entry_t *entry;
160  for ( entry = h->buckets[bucket]; entry; entry = entry->next_in_bucket )
161  if ( entry->addr == addr ) break;
162 
163  if ( entry == NULL ) {
164  // create entry. This is only done by one thread so no locking required
165 #if USE_FAST_MEMORY
166  entry = (kmp_dephash_entry_t *) __kmp_fast_allocate( thread, sizeof(kmp_dephash_entry_t) );
167 #else
168  entry = (kmp_dephash_entry_t *) __kmp_thread_malloc( thread, sizeof(kmp_dephash_entry_t) );
169 #endif
170  entry->addr = addr;
171  entry->last_out = NULL;
172  entry->last_ins = NULL;
173  entry->next_in_bucket = h->buckets[bucket];
174  h->buckets[bucket] = entry;
175 #ifdef KMP_DEBUG
176  h->nelements++;
177  if ( entry->next_in_bucket ) h->nconflicts++;
178 #endif
179  }
180  return entry;
181 }
182 
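// __kmp_add_node: push a node onto the front of a successors/last_ins list, taking a
// reference on the node so it stays alive for as long as the list points to it.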
183 static kmp_depnode_list_t *
184 __kmp_add_node ( kmp_info_t *thread, kmp_depnode_list_t *list, kmp_depnode_t *node )
185 {
186  kmp_depnode_list_t *new_head;
187 
188 #if USE_FAST_MEMORY
189  new_head = (kmp_depnode_list_t *) __kmp_fast_allocate(thread,sizeof(kmp_depnode_list_t));
190 #else
191  new_head = (kmp_depnode_list_t *) __kmp_thread_malloc(thread,sizeof(kmp_depnode_list_t));
192 #endif
193 
194  new_head->node = __kmp_node_ref(node);
195  new_head->next = list;
196 
197  return new_head;
198 }
199 
200 static void
201 __kmp_depnode_list_free ( kmp_info_t *thread, kmp_depnode_list *list )
202 {
203  kmp_depnode_list *next;
204 
205  for ( ; list ; list = next ) {
206  next = list->next;
207 
208  __kmp_node_deref(thread,list->node);
209 #if USE_FAST_MEMORY
210  __kmp_fast_free(thread,list);
211 #else
212  __kmp_thread_free(thread,list);
213 #endif
214  }
215 }
216 
217 static inline void
218 __kmp_track_dependence ( kmp_depnode_t *source, kmp_depnode_t *sink )
219 {
220 #ifdef KMP_SUPPORT_GRAPH_OUTPUT
221  kmp_taskdata_t * task_source = KMP_TASK_TO_TASKDATA(source->dn.task);
222  kmp_taskdata_t * task_sink = KMP_TASK_TO_TASKDATA(sink->dn.task); // this can be NULL when the task was created with if(0) ...
223 
224  __kmp_printf("%d(%s) -> %d(%s)\n", source->dn.id, task_source->td_ident->psource, sink->dn.id, task_sink->td_ident->psource);
225 #endif
226 }
227 
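// __kmp_process_deps walks one dependence list and wires the current task's node into
// the graph:
//  - an out (or inout) dependence waits for every task in last_ins, or for last_out
//    when no in dependences are pending;
//  - an in dependence only waits for last_out.
// The filter template parameter is true for the possibly-aliased dep_list, whose
// duplicate entries were voided (base_addr == 0) by __kmp_check_deps, and false for the
// noalias list. The return value is the number of predecessor edges created.
//
// Illustrative sequence (hypothetical tasks) on a single address x:
//   T1 out(x); T2 in(x); T3 in(x); T4 out(x)
//   => T2 and T3 each get an edge from T1 (last_out), and T4 gets edges from both T2 and
//      T3 (the accumulated last_ins), which are then cleared.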
228 template< bool filter >
229 static inline kmp_int32
230 __kmp_process_deps ( kmp_int32 gtid, kmp_depnode_t *node, kmp_dephash_t *hash,
231  bool dep_barrier,kmp_int32 ndeps, kmp_depend_info_t *dep_list)
232 {
233  KA_TRACE(30, ("__kmp_process_deps<%d>: T#%d processing %d dependencies : dep_barrier = %d\n", filter, gtid, ndeps, dep_barrier ) );
234 
235  kmp_info_t *thread = __kmp_threads[ gtid ];
236  kmp_int32 npredecessors=0;
237  for ( kmp_int32 i = 0; i < ndeps ; i++ ) {
238  const kmp_depend_info_t * dep = &dep_list[i];
239 
240  KMP_DEBUG_ASSERT(dep->flags.in);
241 
242  if ( filter && dep->base_addr == 0 ) continue; // skip filtered entries
243 
244  kmp_dephash_entry_t *info = __kmp_dephash_find(thread,hash,dep->base_addr);
245  kmp_depnode_t *last_out = info->last_out;
246 
247  if ( dep->flags.out && info->last_ins ) {
248  for ( kmp_depnode_list_t * p = info->last_ins; p; p = p->next ) {
249  kmp_depnode_t * indep = p->node;
250  if ( indep->dn.task ) {
251  KMP_ACQUIRE_DEPNODE(gtid,indep);
252  if ( indep->dn.task ) {
253  __kmp_track_dependence(indep,node);
254  indep->dn.successors = __kmp_add_node(thread, indep->dn.successors, node);
255  KA_TRACE(40,("__kmp_process_deps<%d>: T#%d adding dependence from %p to %p",
256  filter,gtid, KMP_TASK_TO_TASKDATA(indep->dn.task), KMP_TASK_TO_TASKDATA(node->dn.task)));
257  npredecessors++;
258  }
259  KMP_RELEASE_DEPNODE(gtid,indep);
260  }
261  }
262 
263  __kmp_depnode_list_free(thread,info->last_ins);
264  info->last_ins = NULL;
265 
266  } else if ( last_out && last_out->dn.task ) {
267  KMP_ACQUIRE_DEPNODE(gtid,last_out);
268  if ( last_out->dn.task ) {
269  __kmp_track_dependence(last_out,node);
270  last_out->dn.successors = __kmp_add_node(thread, last_out->dn.successors, node);
271  KA_TRACE(40,("__kmp_process_deps<%d>: T#%d adding dependence from %p to %p",
272  filter,gtid, KMP_TASK_TO_TASKDATA(last_out->dn.task), KMP_TASK_TO_TASKDATA(node->dn.task)));
273 
274  npredecessors++;
275  }
276  KMP_RELEASE_DEPNODE(gtid,last_out);
277  }
278 
279  if ( dep_barrier ) {
280  // if this is a sync point in the serial sequence, then the previous outputs are guaranteed to be completed after
281  // the execution of this task so the previous output nodes can be cleared.
282  __kmp_node_deref(thread,last_out);
283  info->last_out = NULL;
284  } else {
285  if ( dep->flags.out ) {
286  __kmp_node_deref(thread,last_out);
287  info->last_out = __kmp_node_ref(node);
288  } else
289  info->last_ins = __kmp_add_node(thread, info->last_ins, node);
290  }
291 
292  }
293 
294  KA_TRACE(30, ("__kmp_process_deps<%d>: T#%d found %d predecessors\n", filter, gtid, npredecessors ) );
295 
296  return npredecessors;
297 }
298 
299 #define NO_DEP_BARRIER (false)
300 #define DEP_BARRIER (true)
301 
302 // returns true if the task has any outstanding dependence
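// __kmp_check_deps collapses duplicate addresses in dep_list (ORing their in/out flags
// and voiding the duplicates), then processes both dependence lists. dn.npredecessors
// starts at -1 so that a predecessor finishing while the lists are still being processed
// cannot release the task prematurely; the discovered count plus one (cancelling the -1)
// is added atomically at the end. A positive result means predecessors are still
// outstanding, so the task will be queued by the last of them to finish rather than by
// the caller.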
303 static bool
304 __kmp_check_deps ( kmp_int32 gtid, kmp_depnode_t *node, kmp_task_t *task, kmp_dephash_t *hash, bool dep_barrier,
305  kmp_int32 ndeps, kmp_depend_info_t *dep_list,
306  kmp_int32 ndeps_noalias, kmp_depend_info_t *noalias_dep_list )
307 {
308  int i;
309 
310  kmp_taskdata_t * taskdata = KMP_TASK_TO_TASKDATA(task);
311  KA_TRACE(20, ("__kmp_check_deps: T#%d checking dependencies for task %p : %d possibly aliased dependencies, %d non-aliased dependencies : dep_barrier=%d.\n", gtid, taskdata, ndeps, ndeps_noalias, dep_barrier ) );
312 
313  // Filter deps in dep_list
314  // TODO: Different algorithm for large dep_list ( > 10 ? )
315  for ( i = 0; i < ndeps; i ++ ) {
316  if ( dep_list[i].base_addr != 0 )
317  for ( int j = i+1; j < ndeps; j++ )
318  if ( dep_list[i].base_addr == dep_list[j].base_addr ) {
319  dep_list[i].flags.in |= dep_list[j].flags.in;
320  dep_list[i].flags.out |= dep_list[j].flags.out;
321  dep_list[j].base_addr = 0; // Mark j element as void
322  }
323  }
324 
325  // doesn't need to be atomic as no other thread is going to be accessing this node just yet
326  // npredecessors is set to -1 to ensure that none of the releasing tasks queues this task before we have finished processing all the dependencies
327  node->dn.npredecessors = -1;
328 
329  // used to pack all npredecessors additions into a single atomic operation at the end
330  int npredecessors;
331 
332  npredecessors = __kmp_process_deps<true>(gtid, node, hash, dep_barrier, ndeps, dep_list);
333  npredecessors += __kmp_process_deps<false>(gtid, node, hash, dep_barrier, ndeps_noalias, noalias_dep_list);
334 
335  node->dn.task = task;
336  KMP_MB();
337 
338  // Account for our initial fake value
339  npredecessors++;
340 
341  // Update predecessors and obtain current value to check if there are still any outstanding dependences (some tasks may have finished while we processed the dependences)
342  npredecessors = KMP_TEST_THEN_ADD32(&node->dn.npredecessors, npredecessors) + npredecessors;
343 
344  KA_TRACE(20, ("__kmp_check_deps: T#%d found %d predecessors for task %p \n", gtid, npredecessors, taskdata ) );
345 
346  // beyond this point the task could be queued (and executed) by a releasing task...
347  return npredecessors > 0;
348 }
349 
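// __kmp_release_deps runs when a task with dependences completes: it frees the task's
// dependence hash, clears dn.task under the node lock so no new edges are added, then
// decrements npredecessors on every successor, scheduling any successor that reaches
// zero and dropping the references held by the successor list.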
350 void
351 __kmp_release_deps ( kmp_int32 gtid, kmp_taskdata_t *task )
352 {
353  kmp_info_t *thread = __kmp_threads[ gtid ];
354  kmp_depnode_t *node = task->td_depnode;
355 
356  if ( task->td_dephash ) {
357  KA_TRACE(40, ("__kmp_release_deps: T#%d freeing dependencies hash of task %p.\n", gtid, task ) );
358  __kmp_dephash_free(thread,task->td_dephash);
359  }
360 
361  if ( !node ) return;
362 
363  KA_TRACE(20, ("__kmp_release_deps: T#%d notifying successors of task %p.\n", gtid, task ) );
364 
365  KMP_ACQUIRE_DEPNODE(gtid,node);
366  node->dn.task = NULL; // mark this task as finished, so no new dependencies are generated
367  KMP_RELEASE_DEPNODE(gtid,node);
368 
369  kmp_depnode_list_t *next;
370  for ( kmp_depnode_list_t *p = node->dn.successors; p; p = next ) {
371  kmp_depnode_t *successor = p->node;
372  kmp_int32 npredecessors = KMP_TEST_THEN_DEC32(&successor->dn.npredecessors) - 1;
373 
374  // successor task can be NULL for wait_deps (the dummy node used by __kmpc_omp_wait_deps) or because deps are still being processed
375  if ( npredecessors == 0 ) {
376  KMP_MB();
377  if ( successor->dn.task ) {
378  KA_TRACE(20, ("__kmp_release_deps: T#%d successor %p of %p scheduled for execution.\n", gtid, successor->dn.task, task ) );
379  __kmp_omp_task(gtid,successor->dn.task,false);
380  }
381  }
382 
383  next = p->next;
384  __kmp_node_deref(thread,p->node);
385 #if USE_FAST_MEMORY
386  __kmp_fast_free(thread,p);
387 #else
388  __kmp_thread_free(thread,p);
389 #endif
390  }
391 
392  __kmp_node_deref(thread,node);
393 
394  KA_TRACE(20, ("__kmp_release_deps: T#%d all successors of %p notified of completion\n", gtid, task ) );
395 }
396 
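// __kmpc_omp_task_with_deps is the entry point compilers use for "#pragma omp task" with
// a depend clause. Sketch of a typical lowering (illustrative only; variable names, loc
// and the exact dep encoding are assumptions, not taken from this file):
//
//   // #pragma omp task depend(in: x) depend(out: y)
//   kmp_depend_info_t deps[2];
//   deps[0].base_addr = (kmp_intptr_t)&x; deps[0].len = sizeof(x);
//   deps[0].flags.in = 1; deps[0].flags.out = 0;
//   deps[1].base_addr = (kmp_intptr_t)&y; deps[1].len = sizeof(y);
//   deps[1].flags.in = 1; deps[1].flags.out = 1;   // out is encoded as inout
//   __kmpc_omp_task_with_deps(&loc, gtid, new_task, 2, deps, 0, NULL);
//
// If the task still has unresolved predecessors the call returns TASK_CURRENT_NOT_QUEUED
// and the task is queued later by __kmp_release_deps; otherwise it falls through to
// __kmpc_omp_task.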
411 kmp_int32
412 __kmpc_omp_task_with_deps( ident_t *loc_ref, kmp_int32 gtid, kmp_task_t * new_task,
413  kmp_int32 ndeps, kmp_depend_info_t *dep_list,
414  kmp_int32 ndeps_noalias, kmp_depend_info_t *noalias_dep_list )
415 {
416 
417  kmp_taskdata_t * new_taskdata = KMP_TASK_TO_TASKDATA(new_task);
418  KA_TRACE(10, ("__kmpc_omp_task_with_deps(enter): T#%d loc=%p task=%p\n",
419  gtid, loc_ref, new_taskdata ) );
420 
421  kmp_info_t *thread = __kmp_threads[ gtid ];
422  kmp_taskdata_t * current_task = thread->th.th_current_task;
423 
424  bool serial = current_task->td_flags.team_serial || current_task->td_flags.tasking_ser || current_task->td_flags.final;
425 #if OMP_41_ENABLED
426  serial = serial && !(new_taskdata->td_flags.proxy == TASK_PROXY);
427 #endif
428 
429  if ( !serial && ( ndeps > 0 || ndeps_noalias > 0 )) {
430  /* if no dependencies have been tracked yet, create the dependence hash */
431  if ( current_task->td_dephash == NULL )
432  current_task->td_dephash = __kmp_dephash_create(thread);
433 
434 #if USE_FAST_MEMORY
435  kmp_depnode_t *node = (kmp_depnode_t *) __kmp_fast_allocate(thread,sizeof(kmp_depnode_t));
436 #else
437  kmp_depnode_t *node = (kmp_depnode_t *) __kmp_thread_malloc(thread,sizeof(kmp_depnode_t));
438 #endif
439 
440  __kmp_init_node(node);
441  new_taskdata->td_depnode = node;
442 
443  if ( __kmp_check_deps( gtid, node, new_task, current_task->td_dephash, NO_DEP_BARRIER,
444  ndeps, dep_list, ndeps_noalias,noalias_dep_list ) ) {
445  KA_TRACE(10, ("__kmpc_omp_task_with_deps(exit): T#%d task had blocking dependencies: "
446  "loc=%p task=%p, return: TASK_CURRENT_NOT_QUEUED\n", gtid, loc_ref,
447  new_taskdata ) );
448  return TASK_CURRENT_NOT_QUEUED;
449  }
450  } else {
451 #if OMP_41_ENABLED
452  kmp_task_team_t * task_team = thread->th.th_task_team;
453  if ( task_team && task_team->tt.tt_found_proxy_tasks )
454  __kmpc_omp_wait_deps ( loc_ref, gtid, ndeps, dep_list, ndeps_noalias, noalias_dep_list );
455  else
456 #endif
457  KA_TRACE(10, ("__kmpc_omp_task_with_deps(exit): T#%d ignored dependencies for task (serialized) "
458  "loc=%p task=%p\n", gtid, loc_ref, new_taskdata ) );
459  }
460 
461  KA_TRACE(10, ("__kmpc_omp_task_with_deps(exit): T#%d task had no blocking dependencies : "
462  "loc=%p task=%p, transferring to __kmpc_omp_task\n", gtid, loc_ref,
463  new_taskdata ) );
464 
465  return __kmpc_omp_task(loc_ref,gtid,new_task);
466 }
467 
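// __kmpc_omp_wait_deps blocks the encountering thread until the listed dependences are
// satisfied; compilers typically emit it for undeferred tasks with depend clauses (for
// example "#pragma omp task depend(...) if(0)"), though that usage is a convention
// assumed here rather than stated in this file. It registers a temporary stack node with
// a NULL task against the current dephash using DEP_BARRIER, then executes other tasks
// until the node's npredecessors count drops to zero.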
479 void
480 __kmpc_omp_wait_deps ( ident_t *loc_ref, kmp_int32 gtid, kmp_int32 ndeps, kmp_depend_info_t *dep_list,
481  kmp_int32 ndeps_noalias, kmp_depend_info_t *noalias_dep_list )
482 {
483  KA_TRACE(10, ("__kmpc_omp_wait_deps(enter): T#%d loc=%p\n", gtid, loc_ref) );
484 
485  if ( ndeps == 0 && ndeps_noalias == 0 ) {
486  KA_TRACE(10, ("__kmpc_omp_wait_deps(exit): T#%d has no dependencies to wait upon : loc=%p\n", gtid, loc_ref) );
487  return;
488  }
489 
490  kmp_info_t *thread = __kmp_threads[ gtid ];
491  kmp_taskdata_t * current_task = thread->th.th_current_task;
492 
493  // We can return immediately as:
494  // - dependences are not computed in serial teams (except if we have proxy tasks)
495  // - if the dephash is not yet created it means we have nothing to wait for
496  bool ignore = current_task->td_flags.team_serial || current_task->td_flags.tasking_ser || current_task->td_flags.final;
497 #if OMP_41_ENABLED
498  ignore = ignore && thread->th.th_task_team != NULL && thread->th.th_task_team->tt.tt_found_proxy_tasks == FALSE; // no task team yet implies no proxy tasks
499 #endif
500  ignore = ignore || current_task->td_dephash == NULL;
501 
502  if ( ignore ) {
503  KA_TRACE(10, ("__kmpc_omp_wait_deps(exit): T#%d has no blocking dependencies : loc=%p\n", gtid, loc_ref) );
504  return;
505  }
506 
507  kmp_depnode_t node;
508  __kmp_init_node(&node);
509 
510  if (!__kmp_check_deps( gtid, &node, NULL, current_task->td_dephash, DEP_BARRIER,
511  ndeps, dep_list, ndeps_noalias, noalias_dep_list )) {
512  KA_TRACE(10, ("__kmpc_omp_wait_deps(exit): T#%d has no blocking dependencies : loc=%p\n", gtid, loc_ref) );
513  return;
514  }
515 
516  int thread_finished = FALSE;
517  kmp_flag_32 flag((volatile kmp_uint32 *)&(node.dn.npredecessors), 0U);
518  while ( node.dn.npredecessors > 0 ) {
519  flag.execute_tasks(thread, gtid, FALSE, &thread_finished,
520 #if USE_ITT_BUILD
521  NULL,
522 #endif
523  __kmp_task_stealing_constraint );
524  }
525 
526  KA_TRACE(10, ("__kmpc_omp_wait_deps(exit): T#%d finished waiting : loc=%p\n", gtid, loc_ref) );
527 }
528 
529 #endif /* OMP_40_ENABLED */
530 