Intel(R) Threading Building Blocks Doxygen Documentation  version 4.2.3
flow_graph.h
1 /*
2  Copyright (c) 2005-2020 Intel Corporation
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15 */
16 
17 #ifndef __TBB_flow_graph_H
18 #define __TBB_flow_graph_H
19 
20 #define __TBB_flow_graph_H_include_area
22 
23 #include "tbb_stddef.h"
24 #include "atomic.h"
25 #include "spin_mutex.h"
26 #include "null_mutex.h"
27 #include "spin_rw_mutex.h"
28 #include "null_rw_mutex.h"
29 #include "task.h"
31 #include "tbb_exception.h"
32 #include "pipeline.h"
36 #include "tbb_profiling.h"
37 #include "task_arena.h"
38 
39 #if TBB_USE_THREADING_TOOLS && TBB_PREVIEW_FLOW_GRAPH_TRACE && ( __linux__ || __APPLE__ )
40  #if __INTEL_COMPILER
41  // Disabled warning "routine is both inline and noinline"
42  #pragma warning (push)
43  #pragma warning( disable: 2196 )
44  #endif
45  #define __TBB_NOINLINE_SYM __attribute__((noinline))
46 #else
47  #define __TBB_NOINLINE_SYM
48 #endif
49 
50 #if __TBB_PREVIEW_ASYNC_MSG
51 #include <vector> // std::vector in internal::async_storage
52 #include <memory> // std::shared_ptr in async_msg
53 #endif
54 
55 #if __TBB_PREVIEW_STREAMING_NODE
56 // For streaming_node
57 #include <array> // std::array
58 #include <unordered_map> // std::unordered_map
59 #include <type_traits> // std::decay, std::true_type, std::false_type
60 #endif // __TBB_PREVIEW_STREAMING_NODE
61 
62 #if TBB_DEPRECATED_FLOW_ENQUEUE
63 #define FLOW_SPAWN(a) tbb::task::enqueue((a))
64 #else
65 #define FLOW_SPAWN(a) tbb::task::spawn((a))
66 #endif
67 
68 #if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
69 #define __TBB_DEFAULT_NODE_ALLOCATOR(T) cache_aligned_allocator<T>
70 #else
71 #define __TBB_DEFAULT_NODE_ALLOCATOR(T) null_type
72 #endif
73 
74 // use the VC10 or gcc version of tuple if it is available.
75 #if __TBB_CPP11_TUPLE_PRESENT
76  #include <tuple>
77 namespace tbb {
78  namespace flow {
79  using std::tuple;
80  using std::tuple_size;
81  using std::tuple_element;
82  using std::get;
83  }
84 }
85 #else
86  #include "compat/tuple"
87 #endif
88 
89 #include<list>
90 #include<queue>
91 
102 namespace tbb {
103 namespace flow {
104 
106 enum concurrency { unlimited = 0, serial = 1 };
107 
108 namespace interface11 {
109 
111 struct null_type {};
112 
114 class continue_msg {};
115 
117 template< typename T > class sender;
118 template< typename T > class receiver;
119 class continue_receiver;
120 
121 template< typename T, typename U > class limiter_node; // needed for resetting decrementer
122 
123 template< typename R, typename B > class run_and_put_task;
124 
125 namespace internal {
126 
127 template<typename T, typename M> class successor_cache;
128 template<typename T, typename M> class broadcast_cache;
129 template<typename T, typename M> class round_robin_cache;
130 template<typename T, typename M> class predecessor_cache;
131 template<typename T, typename M> class reservable_predecessor_cache;
132 
133 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
134 namespace order {
135 struct following;
136 struct preceding;
137 }
138 template<typename Order, typename... Args> struct node_set;
139 #endif
140 
141 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
142 // Holder of edges both for caches and for those nodes which do not have predecessor caches.
143 // C == receiver< ... > or sender< ... >, depending.
144 template<typename C>
145 class edge_container {
146 
147 public:
148  typedef std::list<C *, tbb::tbb_allocator<C *> > edge_list_type;
149 
150  void add_edge(C &s) {
151  built_edges.push_back(&s);
152  }
153 
154  void delete_edge(C &s) {
155  for (typename edge_list_type::iterator i = built_edges.begin(); i != built_edges.end(); ++i) {
156  if (*i == &s) {
157  (void)built_edges.erase(i);
158  return; // only remove one predecessor per request
159  }
160  }
161  }
162 
163  void copy_edges(edge_list_type &v) {
164  v = built_edges;
165  }
166 
167  size_t edge_count() {
168  return (size_t)(built_edges.size());
169  }
170 
171  void clear() {
172  built_edges.clear();
173  }
174 
175  // these methods remove the given node from all predecessors/successors listed in the edge
176  // container.
177  template< typename S > void sender_extract(S &s);
178  template< typename R > void receiver_extract(R &r);
179 
180 private:
181  edge_list_type built_edges;
182 }; // class edge_container
183 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
184 
185 } // namespace internal
186 
187 } // namespace interfaceX
188 } // namespace flow
189 } // namespace tbb
190 
193 
194 namespace tbb {
195 namespace flow {
196 namespace interface11 {
197 
198 // enqueue left task if necessary. Returns the non-enqueued task if there is one.
199 static inline tbb::task *combine_tasks(graph& g, tbb::task * left, tbb::task * right) {
200  // if no RHS task, don't change left.
201  if (right == NULL) return left;
202  // right != NULL
203  if (left == NULL) return right;
204  if (left == SUCCESSFULLY_ENQUEUED) return right;
205  // left contains a task
206  if (right != SUCCESSFULLY_ENQUEUED) {
207  // both are valid tasks
208  internal::spawn_in_graph_arena(g, *left);
209  return right;
210  }
211  return left;
212 }
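// Illustrative sketch (editor's addition, not part of the original header): how a node
// implementation might fold the tasks returned by two successor puts into the single task it is
// allowed to hand back to its caller. The names g, s1, s2 and msg are hypothetical.
//
//   tbb::task* last = NULL;
//   last = combine_tasks( g, last, s1.try_put_task(msg) );  // at most one task survives;
//   last = combine_tasks( g, last, s2.try_put_task(msg) );  // the other is spawned in g's arena
//   return last;  // NULL, SUCCESSFULLY_ENQUEUED, or a task for the caller to execute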
213 
214 #if __TBB_PREVIEW_ASYNC_MSG
215 
216 template < typename T > class __TBB_DEPRECATED async_msg;
217 
218 namespace internal {
219 
220 template < typename T > class async_storage;
221 
222 template< typename T, typename = void >
223 struct async_helpers {
224  typedef async_msg<T> async_type;
225  typedef T filtered_type;
226 
227  static const bool is_async_type = false;
228 
229  static const void* to_void_ptr(const T& t) {
230  return static_cast<const void*>(&t);
231  }
232 
233  static void* to_void_ptr(T& t) {
234  return static_cast<void*>(&t);
235  }
236 
237  static const T& from_void_ptr(const void* p) {
238  return *static_cast<const T*>(p);
239  }
240 
241  static T& from_void_ptr(void* p) {
242  return *static_cast<T*>(p);
243  }
244 
245  static task* try_put_task_wrapper_impl(receiver<T>* const this_recv, const void *p, bool is_async) {
246  if (is_async) {
247  // This (T) is NOT async and incoming 'A<X> t' IS async
248  // Get data from async_msg
249  const async_msg<filtered_type>& msg = async_helpers< async_msg<filtered_type> >::from_void_ptr(p);
250  task* const new_task = msg.my_storage->subscribe(*this_recv, this_recv->graph_reference());
251  // finalize() must be called after subscribe() because set() can be called in finalize()
252  // and 'this_recv' client must be subscribed by this moment
253  msg.finalize();
254  return new_task;
255  }
256  else {
257  // Incoming 't' is NOT async
258  return this_recv->try_put_task(from_void_ptr(p));
259  }
260  }
261 };
262 
263 template< typename T >
264 struct async_helpers< T, typename std::enable_if< std::is_base_of<async_msg<typename T::async_msg_data_type>, T>::value >::type > {
265  typedef T async_type;
266  typedef typename T::async_msg_data_type filtered_type;
267 
268  static const bool is_async_type = true;
269 
270  // Receiver-classes use const interfaces
271  static const void* to_void_ptr(const T& t) {
272  return static_cast<const void*>(&static_cast<const async_msg<filtered_type>&>(t));
273  }
274 
275  static void* to_void_ptr(T& t) {
276  return static_cast<void*>(&static_cast<async_msg<filtered_type>&>(t));
277  }
278 
279  // Sender-classes use non-const interfaces
280  static const T& from_void_ptr(const void* p) {
281  return *static_cast<const T*>(static_cast<const async_msg<filtered_type>*>(p));
282  }
283 
284  static T& from_void_ptr(void* p) {
285  return *static_cast<T*>(static_cast<async_msg<filtered_type>*>(p));
286  }
287 
288  // Used in receiver<T> class
289  static task* try_put_task_wrapper_impl(receiver<T>* const this_recv, const void *p, bool is_async) {
290  if (is_async) {
291  // Both are async
292  return this_recv->try_put_task(from_void_ptr(p));
293  }
294  else {
295  // This (T) is async and incoming 'X t' is NOT async
296  // Create async_msg for X
297  const filtered_type& t = async_helpers<filtered_type>::from_void_ptr(p);
298  const T msg(t);
299  return this_recv->try_put_task(msg);
300  }
301  }
302 };
303 
304 class untyped_receiver;
306 class untyped_sender {
305 
307  template< typename, typename > friend class internal::predecessor_cache;
308  template< typename, typename > friend class internal::reservable_predecessor_cache;
309 public:
311  typedef untyped_receiver successor_type;
312 
313  virtual ~untyped_sender() {}
314 
315  // NOTE: Following part of PUBLIC section is copy-paste from original sender<T> class
316 
317  // TODO: Prevent untyped successor registration
318 
320  virtual bool register_successor( successor_type &r ) = 0;
321 
323  virtual bool remove_successor( successor_type &r ) = 0;
324 
326  virtual bool try_release( ) { return false; }
327 
329  virtual bool try_consume( ) { return false; }
330 
331 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
333  typedef internal::edge_container<successor_type> built_successors_type;
334  typedef built_successors_type::edge_list_type successor_list_type;
335  virtual built_successors_type &built_successors() = 0;
336  virtual void internal_add_built_successor( successor_type & ) = 0;
337  virtual void internal_delete_built_successor( successor_type & ) = 0;
338  virtual void copy_successors( successor_list_type &) = 0;
339  virtual size_t successor_count() = 0;
340 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
341 protected:
343  template< typename X >
344  bool try_get( X &t ) {
345  return try_get_wrapper( async_helpers<X>::to_void_ptr(t), async_helpers<X>::is_async_type );
346  }
347 
349  template< typename X >
350  bool try_reserve( X &t ) {
351  return try_reserve_wrapper( async_helpers<X>::to_void_ptr(t), async_helpers<X>::is_async_type );
352  }
353 
354  virtual bool try_get_wrapper( void* p, bool is_async ) = 0;
355  virtual bool try_reserve_wrapper( void* p, bool is_async ) = 0;
356 };
357 
358 class untyped_receiver {
359  template< typename, typename > friend class run_and_put_task;
360 
361  template< typename, typename > friend class internal::broadcast_cache;
362  template< typename, typename > friend class internal::round_robin_cache;
363  template< typename, typename > friend class internal::successor_cache;
364 
365 #if __TBB_PREVIEW_OPENCL_NODE
366  template< typename, typename > friend class proxy_dependency_receiver;
367 #endif /* __TBB_PREVIEW_OPENCL_NODE */
368 public:
370  typedef untyped_sender predecessor_type;
371 
373  virtual ~untyped_receiver() {}
374 
376  template<typename X>
377  bool try_put(const X& t) {
378  task *res = try_put_task(t);
379  if (!res) return false;
380  else if (res != SUCCESSFULLY_ENQUEUED) internal::spawn_in_graph_arena(graph_reference(), *res);
381  return true;
382  }
383 
384  // NOTE: Following part of PUBLIC section is copy-paste from original receiver<T> class
385 
386  // TODO: Prevent untyped predecessor registration
387 
389  virtual bool register_predecessor( predecessor_type & ) { return false; }
390 
392  virtual bool remove_predecessor( predecessor_type & ) { return false; }
393 
394 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
395  typedef internal::edge_container<predecessor_type> built_predecessors_type;
396  typedef built_predecessors_type::edge_list_type predecessor_list_type;
397  virtual built_predecessors_type &built_predecessors() = 0;
398  virtual void internal_add_built_predecessor( predecessor_type & ) = 0;
399  virtual void internal_delete_built_predecessor( predecessor_type & ) = 0;
400  virtual void copy_predecessors( predecessor_list_type & ) = 0;
401  virtual size_t predecessor_count() = 0;
402 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
403 protected:
404  template<typename X>
405  task *try_put_task(const X& t) {
406  return try_put_task_wrapper( async_helpers<X>::to_void_ptr(t), async_helpers<X>::is_async_type );
407  }
408 
409  virtual task* try_put_task_wrapper( const void* p, bool is_async ) = 0;
410 
411  virtual graph& graph_reference() const = 0;
412 
413  // NOTE: Following part of PROTECTED and PRIVATE sections is copy-paste from original receiver<T> class
414 
417 
418  virtual bool is_continue_receiver() { return false; }
419 };
420 
421 } // namespace internal
422 
424 template< typename T >
426 public:
429 
431 
433  virtual bool try_get( T & ) { return false; }
434 
436  virtual bool try_reserve( T & ) { return false; }
437 
438 protected:
439  virtual bool try_get_wrapper( void* p, bool is_async ) __TBB_override {
440  // Both async OR both are NOT async
441  if ( internal::async_helpers<T>::is_async_type == is_async ) {
442  return try_get( internal::async_helpers<T>::from_void_ptr(p) );
443  }
444  // Else: this (T) is async OR incoming 't' is async
445  __TBB_ASSERT(false, "async_msg interface does not support 'pull' protocol in try_get()");
446  return false;
447  }
448 
449  virtual bool try_reserve_wrapper( void* p, bool is_async ) __TBB_override {
450  // Both async OR both are NOT async
451  if ( internal::async_helpers<T>::is_async_type == is_async ) {
452  return try_reserve( internal::async_helpers<T>::from_void_ptr(p) );
453  }
454  // Else: this (T) is async OR incoming 't' is async
455  __TBB_ASSERT(false, "async_msg interface does not support 'pull' protocol in try_reserve()");
456  return false;
457  }
458 }; // class sender<T>
459 
461 template< typename T >
463  template< typename > friend class internal::async_storage;
464  template< typename, typename > friend struct internal::async_helpers;
465 public:
468 
470 
474  }
475 
478  }
479 
480 protected:
481  virtual task* try_put_task_wrapper( const void *p, bool is_async ) __TBB_override {
482  return internal::async_helpers<T>::try_put_task_wrapper_impl(this, p, is_async);
483  }
484 
486  virtual task *try_put_task(const T& t) = 0;
487 
488 }; // class receiver<T>
489 
490 #else // __TBB_PREVIEW_ASYNC_MSG
491 
493 template< typename T >
494 class sender {
495 public:
497  __TBB_DEPRECATED typedef T output_type;
498 
501 
502  virtual ~sender() {}
503 
504  // NOTE: Following part of PUBLIC section is partly copy-pasted in sender<T> under #if __TBB_PREVIEW_ASYNC_MSG
505 
507  __TBB_DEPRECATED virtual bool register_successor( successor_type &r ) = 0;
508 
510  __TBB_DEPRECATED virtual bool remove_successor( successor_type &r ) = 0;
511 
513  virtual bool try_get( T & ) { return false; }
514 
516  virtual bool try_reserve( T & ) { return false; }
517 
519  virtual bool try_release( ) { return false; }
520 
522  virtual bool try_consume( ) { return false; }
523 
524 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
526  __TBB_DEPRECATED typedef typename internal::edge_container<successor_type> built_successors_type;
527  __TBB_DEPRECATED typedef typename built_successors_type::edge_list_type successor_list_type;
528  __TBB_DEPRECATED virtual built_successors_type &built_successors() = 0;
529  __TBB_DEPRECATED virtual void internal_add_built_successor( successor_type & ) = 0;
530  __TBB_DEPRECATED virtual void internal_delete_built_successor( successor_type & ) = 0;
531  __TBB_DEPRECATED virtual void copy_successors( successor_list_type &) = 0;
532  __TBB_DEPRECATED virtual size_t successor_count() = 0;
533 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
534 }; // class sender<T>
535 
537 template< typename T >
538 class receiver {
539 public:
541  __TBB_DEPRECATED typedef T input_type;
542 
544  __TBB_DEPRECATED typedef sender<T> predecessor_type;
545 
547  virtual ~receiver() {}
548 
550  bool try_put( const T& t ) {
551  task *res = try_put_task(t);
552  if (!res) return false;
553  else if (res != SUCCESSFULLY_ENQUEUED) internal::spawn_in_graph_arena(graph_reference(), *res);
554  return true;
555  }
556 
558 protected:
559  template< typename R, typename B > friend class run_and_put_task;
560  template< typename X, typename Y > friend class internal::broadcast_cache;
561  template< typename X, typename Y > friend class internal::round_robin_cache;
562  virtual task *try_put_task(const T& t) = 0;
563  virtual graph& graph_reference() const = 0;
564 public:
565  // NOTE: Following part of PUBLIC and PROTECTED sections is copy-pasted in receiver<T> under #if __TBB_PREVIEW_ASYNC_MSG
566 
568  __TBB_DEPRECATED virtual bool register_predecessor( predecessor_type & ) { return false; }
569 
571  __TBB_DEPRECATED virtual bool remove_predecessor( predecessor_type & ) { return false; }
572 
573 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
574  __TBB_DEPRECATED typedef typename internal::edge_container<predecessor_type> built_predecessors_type;
575  __TBB_DEPRECATED typedef typename built_predecessors_type::edge_list_type predecessor_list_type;
576  __TBB_DEPRECATED virtual built_predecessors_type &built_predecessors() = 0;
577  __TBB_DEPRECATED virtual void internal_add_built_predecessor( predecessor_type & ) = 0;
578  __TBB_DEPRECATED virtual void internal_delete_built_predecessor( predecessor_type & ) = 0;
579  __TBB_DEPRECATED virtual void copy_predecessors( predecessor_list_type & ) = 0;
580  __TBB_DEPRECATED virtual size_t predecessor_count() = 0;
581 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
582 
583 protected:
585  virtual void reset_receiver(reset_flags f = rf_reset_protocol) = 0;
586 
587  template<typename TT, typename M> friend class internal::successor_cache;
588  virtual bool is_continue_receiver() { return false; }
589 
590 #if __TBB_PREVIEW_OPENCL_NODE
591  template< typename, typename > friend class proxy_dependency_receiver;
592 #endif /* __TBB_PREVIEW_OPENCL_NODE */
593 }; // class receiver<T>
594 
595 #endif // __TBB_PREVIEW_ASYNC_MSG
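// Illustrative sketch (editor's addition): receiver<T>::try_put() defined above is what user code
// calls to push a message into a node; any task returned by try_put_task() is spawned internally.
// broadcast_node is declared later in this header; the names g and b are hypothetical.
//
//   tbb::flow::graph g;
//   tbb::flow::broadcast_node<int> b(g);
//   b.try_put(42);        // returns true; spawns follow-up work if a successor produced a task
//   g.wait_for_all();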
596 
598 
599 class continue_receiver : public receiver< continue_msg > {
600 public:
601 
604 
607 
610  __TBB_FLOW_GRAPH_PRIORITY_ARG1(int number_of_predecessors, node_priority_t priority)) {
611  my_predecessor_count = my_initial_predecessor_count = number_of_predecessors;
612  my_current_count = 0;
613  __TBB_FLOW_GRAPH_PRIORITY_EXPR( my_priority = priority; )
614  }
615 
619  my_current_count = 0;
620  __TBB_FLOW_GRAPH_PRIORITY_EXPR( my_priority = src.my_priority; )
621  }
622 
627  return true;
628  }
629 
631 
637  return true;
638  }
639 
640 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
641  __TBB_DEPRECATED typedef internal::edge_container<predecessor_type> built_predecessors_type;
642  __TBB_DEPRECATED typedef built_predecessors_type::edge_list_type predecessor_list_type;
643  built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
644 
645  __TBB_DEPRECATED void internal_add_built_predecessor( predecessor_type &s) __TBB_override {
647  my_built_predecessors.add_edge( s );
648  }
649 
650  __TBB_DEPRECATED void internal_delete_built_predecessor( predecessor_type &s) __TBB_override {
652  my_built_predecessors.delete_edge(s);
653  }
654 
655  __TBB_DEPRECATED void copy_predecessors( predecessor_list_type &v) __TBB_override {
657  my_built_predecessors.copy_edges(v);
658  }
659 
660  __TBB_DEPRECATED size_t predecessor_count() __TBB_override {
662  return my_built_predecessors.edge_count();
663  }
664 
665 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
666 
667 protected:
668  template< typename R, typename B > friend class run_and_put_task;
669  template<typename X, typename Y> friend class internal::broadcast_cache;
670  template<typename X, typename Y> friend class internal::round_robin_cache;
671  // the execute() body is assumed to be too small to be worth creating a task for.
672  task * try_put_task( const input_type & ) __TBB_override {
673  {
674  spin_mutex::scoped_lock l(my_mutex);
675  if ( ++my_current_count < my_predecessor_count )
676  return SUCCESSFULLY_ENQUEUED;
677  else
678  my_current_count = 0;
679  }
680  task * res = execute();
681  return res? res : SUCCESSFULLY_ENQUEUED;
682  }
683 
684 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
685  // continue_receiver must contain its own built_predecessors because it does
686  // not have a node_cache.
687  built_predecessors_type my_built_predecessors;
688 #endif
694  // the friend declaration in the base class did not eliminate the "protected class"
695  // error in gcc 4.1.2
696  template<typename U, typename V> friend class tbb::flow::interface11::limiter_node;
697 
699  my_current_count = 0;
700  if (f & rf_clear_edges) {
701 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
702  my_built_predecessors.clear();
703 #endif
705  }
706  }
707 
709 
711  virtual task * execute() = 0;
712  template<typename TT, typename M> friend class internal::successor_cache;
713  bool is_continue_receiver() __TBB_override { return true; }
714 
715 }; // class continue_receiver
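// Illustrative sketch (editor's addition): continue_receiver counts one signal per registered
// predecessor and only runs execute() once the count is reached, as in try_put_task() above.
// make_edge, broadcast_node and continue_node appear later in this header; a, b, c are hypothetical.
//
//   tbb::flow::graph g;
//   tbb::flow::broadcast_node<tbb::flow::continue_msg> a(g), b(g);
//   tbb::flow::continue_node<tbb::flow::continue_msg> c( g,
//       []( const tbb::flow::continue_msg& ) { return tbb::flow::continue_msg(); } );
//   tbb::flow::make_edge(a, c);                  // each edge bumps the predecessor count
//   tbb::flow::make_edge(b, c);
//   a.try_put(tbb::flow::continue_msg());        // 1 of 2 signals: body does not run yet
//   b.try_put(tbb::flow::continue_msg());        // 2 of 2 signals: body executes
//   g.wait_for_all();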
716 
717 } // interfaceX
718 
719 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
720  template <typename K, typename T>
721  K key_from_message( const T &t ) {
722  return t.key();
723  }
724 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
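// Illustrative sketch (editor's addition): with message-based key matching enabled, a key-matching
// join_node obtains keys through key_from_message<K>(), which by default forwards to the message's
// key() member. The record type below is hypothetical.
//
//   struct record { int id; int key() const { return id; } };
//   // key_from_message<int>( record{7} ) == 7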
725 
726  using interface11::sender;
727  using interface11::receiver;
728  using interface11::continue_receiver;
729 } // flow
730 } // tbb
731 
734 
735 namespace tbb {
736 namespace flow {
737 namespace interface11 {
738 
742 #if __TBB_PREVIEW_ASYNC_MSG
744 #endif
745 using namespace internal::graph_policy_namespace;
746 
747 template <typename C, typename N>
748 graph_iterator<C,N>::graph_iterator(C *g, bool begin) : my_graph(g), current_node(NULL)
749 {
750  if (begin) current_node = my_graph->my_nodes;
751  //else it is an end iterator by default
752 }
753 
754 template <typename C, typename N>
756  __TBB_ASSERT(current_node, "graph_iterator at end");
757  return *operator->();
758 }
759 
760 template <typename C, typename N>
762  return current_node;
763 }
764 
765 template <typename C, typename N>
767  if (current_node) current_node = current_node->next;
768 }
769 
770 } // namespace interfaceX
771 
772 namespace interface10 {
774 inline graph::graph() : my_nodes(NULL), my_nodes_last(NULL), my_task_arena(NULL) {
776  own_context = true;
777  cancelled = false;
778  caught_exception = false;
779  my_context = new task_group_context(tbb::internal::FLOW_TASKS);
783  my_is_active = true;
784 }
785 
786 inline graph::graph(task_group_context& use_this_context) :
787  my_context(&use_this_context), my_nodes(NULL), my_nodes_last(NULL), my_task_arena(NULL) {
789  own_context = false;
790  cancelled = false;
791  caught_exception = false;
795  my_is_active = true;
796 }
797 
798 inline graph::~graph() {
799  wait_for_all();
801  tbb::task::destroy(*my_root_task);
802  if (own_context) delete my_context;
803  delete my_task_arena;
804 }
805 
806 inline void graph::reserve_wait() {
807  if (my_root_task) {
810  }
811 }
812 
813 inline void graph::release_wait() {
814  if (my_root_task) {
817  }
818 }
819 
821  n->next = NULL;
822  {
824  n->prev = my_nodes_last;
825  if (my_nodes_last) my_nodes_last->next = n;
826  my_nodes_last = n;
827  if (!my_nodes) my_nodes = n;
828  }
829 }
830 
832  {
834  __TBB_ASSERT(my_nodes && my_nodes_last, "graph::remove_node: Error: no registered nodes");
835  if (n->prev) n->prev->next = n->next;
836  if (n->next) n->next->prev = n->prev;
837  if (my_nodes_last == n) my_nodes_last = n->prev;
838  if (my_nodes == n) my_nodes = n->next;
839  }
840  n->prev = n->next = NULL;
841 }
842 
844  // reset context
846 
847  if(my_context) my_context->reset();
848  cancelled = false;
849  caught_exception = false;
850  // reset all the nodes comprising the graph
851  for(iterator ii = begin(); ii != end(); ++ii) {
852  tbb::flow::interface11::graph_node *my_p = &(*ii);
853  my_p->reset_node(f);
854  }
855  // Reattach the arena. Might be useful to run the graph in a particular task_arena
856  // while not limiting graph lifetime to a single task_arena::execute() call.
857  prepare_task_arena( /*reinit=*/true );
859  // now spawn the tasks necessary to start the graph
860  for(task_list_type::iterator rti = my_reset_task_list.begin(); rti != my_reset_task_list.end(); ++rti) {
862  }
863  my_reset_task_list.clear();
864 }
865 
866 inline graph::iterator graph::begin() { return iterator(this, true); }
867 
868 inline graph::iterator graph::end() { return iterator(this, false); }
869 
870 inline graph::const_iterator graph::begin() const { return const_iterator(this, true); }
871 
872 inline graph::const_iterator graph::end() const { return const_iterator(this, false); }
873 
874 inline graph::const_iterator graph::cbegin() const { return const_iterator(this, true); }
875 
876 inline graph::const_iterator graph::cend() const { return const_iterator(this, false); }
877 
878 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
879 inline void graph::set_name(const char *name) {
881 }
882 #endif
883 
884 } // namespace interface10
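// Illustrative sketch (editor's addition): a typical graph lifecycle using the members defined
// above; wait_for_all() is also invoked from ~graph(). The node n and its body are hypothetical.
//
//   tbb::flow::graph g;                          // owns its task_group_context (own_context == true)
//   tbb::flow::continue_node<tbb::flow::continue_msg> n( g,
//       []( const tbb::flow::continue_msg& ) { return tbb::flow::continue_msg(); } );
//   n.try_put(tbb::flow::continue_msg());
//   g.wait_for_all();                            // block until all spawned graph tasks finish
//   g.reset();                                   // rf_reset_protocol by default; see reset_node() above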
885 
886 namespace interface11 {
887 
888 inline graph_node::graph_node(graph& g) : my_graph(g) {
889  my_graph.register_node(this);
890 }
891 
893  my_graph.remove_node(this);
894 }
895 
897 
898 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
899 using internal::node_set;
900 #endif
901 
903 template < typename Output >
904 class input_node : public graph_node, public sender< Output > {
905 public:
907  typedef Output output_type;
908 
911 
912  //Source node has no input type
914 
915 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
916  typedef typename sender<output_type>::built_successors_type built_successors_type;
917  typedef typename sender<output_type>::successor_list_type successor_list_type;
918 #endif
919 
921  template< typename Body >
923  : graph_node(g), my_active(false),
924  my_body( new internal::input_body_leaf< output_type, Body>(body) ),
925  my_init_body( new internal::input_body_leaf< output_type, Body>(body) ),
926  my_reserved(false), my_has_cached_item(false)
927  {
928  my_successors.set_owner(this);
929  tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_SOURCE_NODE, &this->my_graph,
930  static_cast<sender<output_type> *>(this), this->my_body );
931  }
932 
933 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
934  template <typename Body, typename... Successors>
935  input_node( const node_set<internal::order::preceding, Successors...>& successors, Body body )
936  : input_node(successors.graph_reference(), body) {
937  make_edges(*this, successors);
938  }
939 #endif
940 
943  graph_node(src.my_graph), sender<Output>(),
944  my_active(false),
945  my_body( src.my_init_body->clone() ), my_init_body(src.my_init_body->clone() ),
946  my_reserved(false), my_has_cached_item(false)
947  {
948  my_successors.set_owner(this);
949  tbb::internal::fgt_node_with_body(CODEPTR(), tbb::internal::FLOW_SOURCE_NODE, &this->my_graph,
950  static_cast<sender<output_type> *>(this), this->my_body );
951  }
952 
954  ~input_node() { delete my_body; delete my_init_body; }
955 
956 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
957  void set_name( const char *name ) __TBB_override {
959  }
960 #endif
961 
966  if ( my_active )
967  spawn_put();
968  return true;
969  }
970 
975  return true;
976  }
977 
978 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
979 
980  built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
981 
982  void internal_add_built_successor( successor_type &r) __TBB_override {
984  my_successors.internal_add_built_successor(r);
985  }
986 
987  void internal_delete_built_successor( successor_type &r) __TBB_override {
989  my_successors.internal_delete_built_successor(r);
990  }
991 
992  size_t successor_count() __TBB_override {
994  return my_successors.successor_count();
995  }
996 
997  void copy_successors(successor_list_type &v) __TBB_override {
999  my_successors.copy_successors(v);
1000  }
1001 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
1002 
1006  if ( my_reserved )
1007  return false;
1008 
1009  if ( my_has_cached_item ) {
1010  v = my_cached_item;
1011  my_has_cached_item = false;
1012  return true;
1013  }
1014  // we've been asked to provide an item, but we have none. enqueue a task to
1015  // provide one.
1016  if ( my_active )
1017  spawn_put();
1018  return false;
1019  }
1020 
1024  if ( my_reserved ) {
1025  return false;
1026  }
1027 
1028  if ( my_has_cached_item ) {
1029  v = my_cached_item;
1030  my_reserved = true;
1031  return true;
1032  } else {
1033  return false;
1034  }
1035  }
1036 
1038 
1041  __TBB_ASSERT( my_reserved && my_has_cached_item, "releasing non-existent reservation" );
1042  my_reserved = false;
1043  if(!my_successors.empty())
1044  spawn_put();
1045  return true;
1046  }
1047 
1051  __TBB_ASSERT( my_reserved && my_has_cached_item, "consuming non-existent reservation" );
1052  my_reserved = false;
1053  my_has_cached_item = false;
1054  if ( !my_successors.empty() ) {
1055  spawn_put();
1056  }
1057  return true;
1058  }
1059 
1061  void activate() {
1063  my_active = true;
1064  if (!my_successors.empty())
1065  spawn_put();
1066  }
1067 
1068  template<typename Body>
1070  internal::input_body<output_type> &body_ref = *this->my_body;
1071  return dynamic_cast< internal::input_body_leaf<output_type, Body> & >(body_ref).get_body();
1072  }
1073 
1074 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1075  void extract( ) __TBB_override {
1076  my_successors.built_successors().sender_extract(*this); // removes "my_owner" == this from each successor
1077  my_active = false;
1078  my_reserved = false;
1080  }
1081 #endif
1082 
1083 protected:
1084 
1087  my_active = false;
1088  my_reserved = false;
1089  my_has_cached_item = false;
1090 
1092  if(f & rf_reset_bodies) {
1094  delete my_body;
1095  my_body = tmp;
1096  }
1097  }
1098 
1099 private:
1108 
1109  // used by apply_body_bypass, can invoke body of node.
1112  if ( my_reserved ) {
1113  return false;
1114  }
1115  if ( !my_has_cached_item ) {
1117 
1118 #if TBB_DEPRECATED_INPUT_NODE_BODY
1119  bool r = (*my_body)(my_cached_item);
1120  if (r) {
1121  my_has_cached_item = true;
1122  }
1123 #else
1124  flow_control control;
1125  my_cached_item = (*my_body)(control);
1127 #endif
1129  }
1130  if ( my_has_cached_item ) {
1131  v = my_cached_item;
1132  my_reserved = true;
1133  return true;
1134  } else {
1135  return false;
1136  }
1137  }
1138 
1140  return ( new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
1142  }
1143 
1145  void spawn_put( ) {
1146  if(internal::is_graph_active(this->my_graph)) {
1148  }
1149  }
1150 
1154  output_type v;
1155  if ( !try_reserve_apply_body(v) )
1156  return NULL;
1157 
1158  task *last_task = my_successors.try_put_task(v);
1159  if ( last_task )
1160  try_consume();
1161  else
1162  try_release();
1163  return last_task;
1164  }
1165 }; // class input_node
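// Illustrative sketch (editor's addition): input_node is constructed inactive and starts emitting
// only after activate(), matching my_active above. The lambda uses the flow_control-based body
// form that is compiled when TBB_DEPRECATED_INPUT_NODE_BODY is off; names g, src are hypothetical.
//
//   tbb::flow::graph g;
//   int i = 0;
//   tbb::flow::input_node<int> src( g, [&]( tbb::flow_control& fc ) -> int {
//       if ( i >= 10 ) { fc.stop(); return 0; }  // stop() ends the input stream
//       return i++;
//   } );
//   src.activate();                              // begin spawning puts to successors
//   g.wait_for_all();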
1166 
1167 #if TBB_USE_SOURCE_NODE_AS_ALIAS
1168 template < typename Output >
1169 class source_node : public input_node <Output> {
1170 public:
1172  template< typename Body >
1173  __TBB_NOINLINE_SYM source_node( graph &g, Body body )
1174  : input_node<Output>(g, body)
1175  {
1176  }
1177 
1178 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1179  template <typename Body, typename... Successors>
1180  source_node( const node_set<internal::order::preceding, Successors...>& successors, Body body )
1181  : input_node<Output>(successors, body) {
1182  }
1183 #endif
1184 };
1185 #else // TBB_USE_SOURCE_NODE_AS_ALIAS
1187 template < typename Output > class
1188 __TBB_DEPRECATED_MSG("TBB Warning: tbb::flow::source_node is deprecated, use tbb::flow::input_node." )
1189 source_node : public graph_node, public sender< Output > {
1190 public:
1192  typedef Output output_type;
1193 
1196 
1197  //Source node has no input type
1199 
1200 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1201  typedef typename sender<output_type>::built_successors_type built_successors_type;
1202  typedef typename sender<output_type>::successor_list_type successor_list_type;
1203 #endif
1204 
1206  template< typename Body >
1207  __TBB_NOINLINE_SYM source_node( graph &g, Body body, bool is_active = true )
1208  : graph_node(g), my_active(is_active), init_my_active(is_active),
1209  my_body( new internal::source_body_leaf< output_type, Body>(body) ),
1210  my_init_body( new internal::source_body_leaf< output_type, Body>(body) ),
1211  my_reserved(false), my_has_cached_item(false)
1212  {
1213  my_successors.set_owner(this);
1214  tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_SOURCE_NODE, &this->my_graph,
1215  static_cast<sender<output_type> *>(this), this->my_body );
1216  }
1217 
1218 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1219  template <typename Body, typename... Successors>
1220  source_node( const node_set<internal::order::preceding, Successors...>& successors, Body body, bool is_active = true )
1221  : source_node(successors.graph_reference(), body, is_active) {
1222  make_edges(*this, successors);
1223  }
1224 #endif
1225 
1228  graph_node(src.my_graph), sender<Output>(),
1229  my_active(src.init_my_active),
1230  init_my_active(src.init_my_active), my_body( src.my_init_body->clone() ), my_init_body(src.my_init_body->clone() ),
1231  my_reserved(false), my_has_cached_item(false)
1232  {
1233  my_successors.set_owner(this);
1234  tbb::internal::fgt_node_with_body(CODEPTR(), tbb::internal::FLOW_SOURCE_NODE, &this->my_graph,
1235  static_cast<sender<output_type> *>(this), this->my_body );
1236  }
1237 
1239  ~source_node() { delete my_body; delete my_init_body; }
1240 
1241 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1242  void set_name( const char *name ) __TBB_override {
1244  }
1245 #endif
1246 
1249  spin_mutex::scoped_lock lock(my_mutex);
1250  my_successors.register_successor(r);
1251  if ( my_active )
1252  spawn_put();
1253  return true;
1254  }
1255 
1258  spin_mutex::scoped_lock lock(my_mutex);
1259  my_successors.remove_successor(r);
1260  return true;
1261  }
1262 
1263 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1264 
1265  built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
1266 
1267  void internal_add_built_successor( successor_type &r) __TBB_override {
1268  spin_mutex::scoped_lock lock(my_mutex);
1269  my_successors.internal_add_built_successor(r);
1270  }
1271 
1272  void internal_delete_built_successor( successor_type &r) __TBB_override {
1273  spin_mutex::scoped_lock lock(my_mutex);
1274  my_successors.internal_delete_built_successor(r);
1275  }
1276 
1277  size_t successor_count() __TBB_override {
1278  spin_mutex::scoped_lock lock(my_mutex);
1279  return my_successors.successor_count();
1280  }
1281 
1282  void copy_successors(successor_list_type &v) __TBB_override {
1283  spin_mutex::scoped_lock l(my_mutex);
1284  my_successors.copy_successors(v);
1285  }
1286 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
1287 
1290  spin_mutex::scoped_lock lock(my_mutex);
1291  if ( my_reserved )
1292  return false;
1293 
1294  if ( my_has_cached_item ) {
1295  v = my_cached_item;
1296  my_has_cached_item = false;
1297  return true;
1298  }
1299  // we've been asked to provide an item, but we have none. enqueue a task to
1300  // provide one.
1301  spawn_put();
1302  return false;
1303  }
1304 
1307  spin_mutex::scoped_lock lock(my_mutex);
1308  if ( my_reserved ) {
1309  return false;
1310  }
1311 
1312  if ( my_has_cached_item ) {
1313  v = my_cached_item;
1314  my_reserved = true;
1315  return true;
1316  } else {
1317  return false;
1318  }
1319  }
1320 
1322 
1324  spin_mutex::scoped_lock lock(my_mutex);
1325  __TBB_ASSERT( my_reserved && my_has_cached_item, "releasing non-existent reservation" );
1326  my_reserved = false;
1327  if(!my_successors.empty())
1328  spawn_put();
1329  return true;
1330  }
1331 
1334  spin_mutex::scoped_lock lock(my_mutex);
1335  __TBB_ASSERT( my_reserved && my_has_cached_item, "consuming non-existent reservation" );
1336  my_reserved = false;
1337  my_has_cached_item = false;
1338  if ( !my_successors.empty() ) {
1339  spawn_put();
1340  }
1341  return true;
1342  }
1343 
1345  void activate() {
1346  spin_mutex::scoped_lock lock(my_mutex);
1347  my_active = true;
1348  if (!my_successors.empty())
1349  spawn_put();
1350  }
1351 
1352  template<typename Body>
1354  internal::source_body<output_type> &body_ref = *this->my_body;
1355  return dynamic_cast< internal::source_body_leaf<output_type, Body> & >(body_ref).get_body();
1356  }
1357 
1358 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1359  void extract( ) __TBB_override {
1360  my_successors.built_successors().sender_extract(*this); // removes "my_owner" == this from each successor
1361  my_active = init_my_active;
1362  my_reserved = false;
1363  if(my_has_cached_item) my_has_cached_item = false;
1364  }
1365 #endif
1366 
1367 protected:
1368 
1371  my_active = init_my_active;
1372  my_reserved =false;
1373  if(my_has_cached_item) {
1374  my_has_cached_item = false;
1375  }
1376  if(f & rf_clear_edges) my_successors.clear();
1377  if(f & rf_reset_bodies) {
1378  internal::source_body<output_type> *tmp = my_init_body->clone();
1379  delete my_body;
1380  my_body = tmp;
1381  }
1382  if(my_active)
1383  internal::add_task_to_graph_reset_list(this->my_graph, create_put_task());
1384  }
1385 
1386 private:
1396 
1397  // used by apply_body_bypass, can invoke body of node.
1399  spin_mutex::scoped_lock lock(my_mutex);
1400  if ( my_reserved ) {
1401  return false;
1402  }
1403  if ( !my_has_cached_item ) {
1404  tbb::internal::fgt_begin_body( my_body );
1405  bool r = (*my_body)(my_cached_item);
1406  tbb::internal::fgt_end_body( my_body );
1407  if (r) {
1408  my_has_cached_item = true;
1409  }
1410  }
1411  if ( my_has_cached_item ) {
1412  v = my_cached_item;
1413  my_reserved = true;
1414  return true;
1415  } else {
1416  return false;
1417  }
1418  }
1419 
1420  // when resetting, and if the source_node was created with my_active == true, then
1421  // when we reset the node we must store a task to run the node, and spawn it only
1422  // after the reset is complete and is_active() is again true. This is why we don't
1423  // test for is_active() here.
1425  return ( new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
1427  }
1428 
1430  void spawn_put( ) {
1431  if(internal::is_graph_active(this->my_graph)) {
1432  internal::spawn_in_graph_arena(this->my_graph, *create_put_task());
1433  }
1434  }
1435 
1436  friend class internal::source_task_bypass< source_node< output_type > >;
1439  output_type v;
1440  if ( !try_reserve_apply_body(v) )
1441  return NULL;
1442 
1443  task *last_task = my_successors.try_put_task(v);
1444  if ( last_task )
1445  try_consume();
1446  else
1447  try_release();
1448  return last_task;
1449  }
1450 }; // class source_node
1451 #endif // TBB_USE_SOURCE_NODE_AS_ALIAS
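// Illustrative sketch (editor's addition): the deprecated source_node body, shown for comparison
// with input_node above, returns bool and writes its output through the reference parameter.
// The graph g is assumed to exist.
//
//   tbb::flow::source_node<int> src( g, []( int& v ) -> bool {
//       static int i = 0;
//       if ( i >= 10 ) return false;             // false: no more items
//       v = i++;
//       return true;                             // true: v holds the next item
//   }, /*is_active=*/false );
//   src.activate();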
1452 
1454 template<typename Input, typename Output = continue_msg, typename Policy = queueing,
1455  typename Allocator=__TBB_DEFAULT_NODE_ALLOCATOR(Input)>
1457  : public graph_node
1458 #if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
1459  , public internal::function_input< Input, Output, Policy, Allocator >
1460 #else
1461  , public internal::function_input< Input, Output, Policy, cache_aligned_allocator<Input> >
1462 #endif
1463  , public internal::function_output<Output> {
1464 
1465 #if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
1466  typedef Allocator internals_allocator;
1467 #else
1469 
1472  "Allocator template parameter for flow graph nodes is deprecated and will be removed. "
1473  "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporarily enable the deprecated interface."
1474  );
1475 #endif
1476 
1477 public:
1478  typedef Input input_type;
1479  typedef Output output_type;
1485 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1486  typedef typename input_impl_type::predecessor_list_type predecessor_list_type;
1487  typedef typename fOutput_type::successor_list_type successor_list_type;
1488 #endif
1490 
1492  // input_queue_type is allocated here, but destroyed in the function_input_base.
1493  // TODO: pass the graph_buffer_policy to the function_input_base so it can all
1494  // be done in one place. This would be an interface-breaking change.
1495  template< typename Body >
1498  Body body, __TBB_FLOW_GRAPH_PRIORITY_ARG1( Policy = Policy(), node_priority_t priority = tbb::flow::internal::no_priority ))
1499 #else
1501 #endif
1503  fOutput_type(g) {
1504  tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_FUNCTION_NODE, &this->my_graph,
1505  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this), this->my_body );
1506  }
1507 
1508 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
1509  template <typename Body>
1510  function_node( graph& g, size_t concurrency, Body body, node_priority_t priority )
1511  : function_node(g, concurrency, body, Policy(), priority) {}
1512 #endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
1513 
1514 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1515  template <typename Body, typename... Args>
1516  function_node( const node_set<Args...>& nodes, size_t concurrency, Body body,
1519  make_edges_in_order(nodes, *this);
1520  }
1521 
1522 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
1523  template <typename Body, typename... Args>
1524  function_node( const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t priority )
1525  : function_node(nodes, concurrency, body, Policy(), priority) {}
1526 #endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
1527 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1528 
1531  graph_node(src.my_graph),
1532  input_impl_type(src),
1533  fOutput_type(src.my_graph) {
1534  tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_FUNCTION_NODE, &this->my_graph,
1535  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this), this->my_body );
1536  }
1537 
1538 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1539  void set_name( const char *name ) __TBB_override {
1541  }
1542 #endif
1543 
1544 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1545  void extract( ) __TBB_override {
1546  my_predecessors.built_predecessors().receiver_extract(*this);
1547  successors().built_successors().sender_extract(*this);
1548  }
1549 #endif
1550 
1551 protected:
1552  template< typename R, typename B > friend class run_and_put_task;
1553  template<typename X, typename Y> friend class internal::broadcast_cache;
1554  template<typename X, typename Y> friend class internal::round_robin_cache;
1556 
1558 
1561  // TODO: use clear() instead.
1562  if(f & rf_clear_edges) {
1563  successors().clear();
1565  }
1566  __TBB_ASSERT(!(f & rf_clear_edges) || successors().empty(), "function_node successors not empty");
1567  __TBB_ASSERT(this->my_predecessors.empty(), "function_node predecessors not empty");
1568  }
1569 
1570 }; // class function_node
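// Illustrative sketch (editor's addition): a minimal function_node with unlimited concurrency;
// the concurrency enum, try_put() and wait_for_all() are all defined in this header.
//
//   tbb::flow::graph g;
//   tbb::flow::function_node<int, int> square( g, tbb::flow::unlimited,
//       []( int v ) { return v * v; } );
//   square.try_put(3);
//   g.wait_for_all();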
1571 
1573 // Output is a tuple of output types.
1574 template<typename Input, typename Output, typename Policy = queueing,
1575  typename Allocator=__TBB_DEFAULT_NODE_ALLOCATOR(Input)>
1577  public graph_node,
1579  <
1580  Input,
1581  typename internal::wrap_tuple_elements<
1582  tbb::flow::tuple_size<Output>::value, // #elements in tuple
1583  internal::multifunction_output, // wrap this around each element
1584  Output // the tuple providing the types
1585  >::type,
1586  Policy,
1587 #if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
1588  Allocator
1589 #else
1590  cache_aligned_allocator<Input>
1591 #endif
1592  > {
1593 #if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
1594  typedef Allocator internals_allocator;
1595 #else
1597 
1600  "Allocator template parameter for flow graph nodes is deprecated and will be removed. "
1601  "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporarily enable the deprecated interface."
1602  );
1603 #endif
1604 
1605 protected:
1607 public:
1608  typedef Input input_type;
1614 private:
1616 public:
1617  template<typename Body>
1619  graph &g, size_t concurrency,
1621  Body body, __TBB_FLOW_GRAPH_PRIORITY_ARG1( Policy = Policy(), node_priority_t priority = tbb::flow::internal::no_priority )
1622 #else
1624 #endif
1626  tbb::internal::fgt_multioutput_node_with_body<N>(
1627  CODEPTR(), tbb::internal::FLOW_MULTIFUNCTION_NODE,
1628  &this->my_graph, static_cast<receiver<input_type> *>(this),
1629  this->output_ports(), this->my_body
1630  );
1631  }
1632 
1633 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
1634  template <typename Body>
1635  __TBB_NOINLINE_SYM multifunction_node(graph& g, size_t concurrency, Body body, node_priority_t priority)
1636  : multifunction_node(g, concurrency, body, Policy(), priority) {}
1637 #endif // TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
1638 
1639 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1640  template <typename Body, typename... Args>
1641  __TBB_NOINLINE_SYM multifunction_node(const node_set<Args...>& nodes, size_t concurrency, Body body,
1644  make_edges_in_order(nodes, *this);
1645  }
1646 
1647 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
1648  template <typename Body, typename... Args>
1649  __TBB_NOINLINE_SYM multifunction_node(const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t priority)
1650  : multifunction_node(nodes, concurrency, body, Policy(), priority) {}
1651 #endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
1652 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1653 
1655  graph_node(other.my_graph), input_impl_type(other) {
1656  tbb::internal::fgt_multioutput_node_with_body<N>( CODEPTR(), tbb::internal::FLOW_MULTIFUNCTION_NODE,
1657  &this->my_graph, static_cast<receiver<input_type> *>(this),
1658  this->output_ports(), this->my_body );
1659  }
1660 
1661 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1662  void set_name( const char *name ) __TBB_override {
1664  }
1665 #endif
1666 
1667 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1668  void extract( ) __TBB_override {
1669  my_predecessors.built_predecessors().receiver_extract(*this);
1670  input_impl_type::extract();
1671  }
1672 #endif
1673  // all the guts are in multifunction_input...
1674 protected:
1676 }; // multifunction_node
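// Illustrative sketch (editor's addition): a multifunction_node body receives the input plus a
// reference to its tuple of output ports and may put to any subset of them. Names are hypothetical.
//
//   typedef tbb::flow::multifunction_node< int, tbb::flow::tuple<int, int> > mf_node_t;
//   tbb::flow::graph g;
//   mf_node_t split_even_odd( g, tbb::flow::unlimited,
//       []( const int& v, mf_node_t::output_ports_type& ports ) {
//           if ( v % 2 == 0 ) tbb::flow::get<0>(ports).try_put(v);   // even values to port 0
//           else              tbb::flow::get<1>(ports).try_put(v);   // odd values to port 1
//       } );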
1677 
1679 // successors. The node has unlimited concurrency, so it does not reject inputs.
1680 template<typename TupleType, typename Allocator=__TBB_DEFAULT_NODE_ALLOCATOR(TupleType)>
1681 class split_node : public graph_node, public receiver<TupleType> {
1682  static const int N = tbb::flow::tuple_size<TupleType>::value;
1683  typedef receiver<TupleType> base_type;
1684 public:
1685  typedef TupleType input_type;
1686 #if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
1687  typedef Allocator allocator_type;
1688 #else
1691  "Allocator template parameter for flow graph nodes is deprecated and will be removed. "
1692  "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporarily enable the deprecated interface."
1693  );
1694 #endif
1695 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1697  typedef typename base_type::predecessor_list_type predecessor_list_type;
1698  typedef internal::predecessor_cache<input_type, null_mutex > predecessor_cache_type;
1699  typedef typename predecessor_cache_type::built_predecessors_type built_predecessors_type;
1700 #endif
1701 
1702  typedef typename internal::wrap_tuple_elements<
1703  N, // #elements in tuple
1704  internal::multifunction_output, // wrap this around each element
1705  TupleType // the tuple providing the types
1707 
1709  : graph_node(g),
1711  {
1712  tbb::internal::fgt_multioutput_node<N>(CODEPTR(), tbb::internal::FLOW_SPLIT_NODE, &this->my_graph,
1713  static_cast<receiver<input_type> *>(this), this->output_ports());
1714  }
1715 
1716 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1717  template <typename... Args>
1718  __TBB_NOINLINE_SYM split_node(const node_set<Args...>& nodes) : split_node(nodes.graph_reference()) {
1719  make_edges_in_order(nodes, *this);
1720  }
1721 #endif
1722 
1724  : graph_node(other.my_graph), base_type(other),
1726  {
1727  tbb::internal::fgt_multioutput_node<N>(CODEPTR(), tbb::internal::FLOW_SPLIT_NODE, &this->my_graph,
1728  static_cast<receiver<input_type> *>(this), this->output_ports());
1729  }
1730 
1731 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1732  void set_name( const char *name ) __TBB_override {
1734  }
1735 #endif
1736 
1738 
1739 protected:
1740  task *try_put_task(const TupleType& t) __TBB_override {
1741  // Sending split messages in parallel is not justified, as overheads would prevail.
1742  // Also, we do not have successors here, so we simply report the returned task as successfully enqueued.
1744  }
1746  if (f & rf_clear_edges)
1748 
1750  }
1753  return my_graph;
1754  }
1755 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1756 private:
1757  void extract() __TBB_override {}
1758 
1760  void internal_add_built_predecessor(predecessor_type&) __TBB_override {}
1761 
1763  void internal_delete_built_predecessor(predecessor_type&) __TBB_override {}
1764 
1765  size_t predecessor_count() __TBB_override { return 0; }
1766 
1767  void copy_predecessors(predecessor_list_type&) __TBB_override {}
1768 
1769  built_predecessors_type &built_predecessors() __TBB_override { return my_predessors; }
1770 
1772  built_predecessors_type my_predessors;
1773 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
1774 
1775 private:
1777 };
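// Illustrative sketch (editor's addition): split_node unpacks a tuple and forwards each element
// through the matching output port. queue_node, output_port and make_edge are defined later in the
// flow graph headers; names are hypothetical.
//
//   tbb::flow::graph g;
//   tbb::flow::split_node< tbb::flow::tuple<int, float> > s(g);
//   tbb::flow::queue_node<int>   ints(g);
//   tbb::flow::queue_node<float> floats(g);
//   tbb::flow::make_edge( tbb::flow::output_port<0>(s), ints );
//   tbb::flow::make_edge( tbb::flow::output_port<1>(s), floats );
//   s.try_put( tbb::flow::tuple<int, float>(1, 2.0f) );
//   g.wait_for_all();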
1778 
1780 template <typename Output, typename Policy = internal::Policy<void> >
1781 class continue_node : public graph_node, public internal::continue_input<Output, Policy>,
1782  public internal::function_output<Output> {
1783 public:
1785  typedef Output output_type;
1788  typedef typename input_impl_type::predecessor_type predecessor_type;
1790 
1792  template <typename Body >
1794  graph &g,
1796  Body body, __TBB_FLOW_GRAPH_PRIORITY_ARG1( Policy = Policy(), node_priority_t priority = tbb::flow::internal::no_priority )
1797 #else
1799 #endif
1800  ) : graph_node(g), input_impl_type( g, __TBB_FLOW_GRAPH_PRIORITY_ARG1(body, priority) ),
1801  fOutput_type(g) {
1802  tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_CONTINUE_NODE, &this->my_graph,
1803 
1804  static_cast<receiver<input_type> *>(this),
1805  static_cast<sender<output_type> *>(this), this->my_body );
1806  }
1807 
1808 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
1809  template <typename Body>
1810  continue_node( graph& g, Body body, node_priority_t priority )
1811  : continue_node(g, body, Policy(), priority) {}
1812 #endif
1813 
1814 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1815  template <typename Body, typename... Args>
1816  continue_node( const node_set<Args...>& nodes, Body body,
1818  : continue_node(nodes.graph_reference(), body, __TBB_FLOW_GRAPH_PRIORITY_ARG1(p, priority) ) {
1819  make_edges_in_order(nodes, *this);
1820  }
1821 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
1822  template <typename Body, typename... Args>
1823  continue_node( const node_set<Args...>& nodes, Body body, node_priority_t priority)
1824  : continue_node(nodes, body, Policy(), priority) {}
1825 #endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
1826 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1827 
1829  template <typename Body >
1831  graph &g, int number_of_predecessors,
1833  Body body, __TBB_FLOW_GRAPH_PRIORITY_ARG1( Policy = Policy(), node_priority_t priority = tbb::flow::internal::no_priority )
1834 #else
1836 #endif
1837  ) : graph_node(g)
1838  , input_impl_type(g, number_of_predecessors, __TBB_FLOW_GRAPH_PRIORITY_ARG1(body, priority)),
1839  fOutput_type(g) {
1840  tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_CONTINUE_NODE, &this->my_graph,
1841  static_cast<receiver<input_type> *>(this),
1842  static_cast<sender<output_type> *>(this), this->my_body );
1843  }
1844 
1845 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
1846  template <typename Body>
1847  continue_node( graph& g, int number_of_predecessors, Body body, node_priority_t priority)
1848  : continue_node(g, number_of_predecessors, body, Policy(), priority) {}
1849 #endif
1850 
1851 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1852  template <typename Body, typename... Args>
1853  continue_node( const node_set<Args...>& nodes, int number_of_predecessors,
1854  Body body, __TBB_FLOW_GRAPH_PRIORITY_ARG1( Policy p = Policy(), node_priority_t priority = tbb::flow::internal::no_priority ))
1855  : continue_node(nodes.graph_reference(), number_of_predecessors, body, __TBB_FLOW_GRAPH_PRIORITY_ARG1(p, priority)) {
1856  make_edges_in_order(nodes, *this);
1857  }
1858 
1859 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
1860  template <typename Body, typename... Args>
1861  continue_node( const node_set<Args...>& nodes, int number_of_predecessors,
1862  Body body, node_priority_t priority )
1863  : continue_node(nodes, number_of_predecessors, body, Policy(), priority) {}
1864 #endif
1865 #endif
1866 
1869  graph_node(src.my_graph), input_impl_type(src),
1870  internal::function_output<Output>(src.my_graph) {
1871  tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_CONTINUE_NODE, &this->my_graph,
1872  static_cast<receiver<input_type> *>(this),
1873  static_cast<sender<output_type> *>(this), this->my_body );
1874  }
1875 
1876 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1877  void set_name( const char *name ) __TBB_override {
1879  }
1880 #endif
1881 
1882 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1883  void extract() __TBB_override {
1884  input_impl_type::my_built_predecessors.receiver_extract(*this);
1885  successors().built_successors().sender_extract(*this);
1886  }
1887 #endif
1888 
1889 protected:
1890  template< typename R, typename B > friend class run_and_put_task;
1891  template<typename X, typename Y> friend class internal::broadcast_cache;
1892  template<typename X, typename Y> friend class internal::round_robin_cache;
1893  using input_impl_type::try_put_task;
1895 
1898  if(f & rf_clear_edges)successors().clear();
1899  __TBB_ASSERT(!(f & rf_clear_edges) || successors().empty(), "continue_node not reset");
1900  }
1901 }; // continue_node
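// Illustrative sketch (editor's addition): the explicit-predecessor-count constructor above lets a
// continue_node fire without registered edges; here the count is satisfied by two manual puts.
//
//   tbb::flow::graph g;
//   tbb::flow::continue_node<int> when_ready( g, /*number_of_predecessors=*/2,
//       []( const tbb::flow::continue_msg& ) { return 42; } );
//   when_ready.try_put(tbb::flow::continue_msg());   // 1 of 2
//   when_ready.try_put(tbb::flow::continue_msg());   // 2 of 2: body runs
//   g.wait_for_all();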
1902 
1904 template <typename T>
1905 class broadcast_node : public graph_node, public receiver<T>, public sender<T> {
1906 public:
1907  typedef T input_type;
1908  typedef T output_type;
1911 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1912  typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
1913  typedef typename sender<output_type>::successor_list_type successor_list_type;
1914 #endif
1915 private:
1917 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1918  internal::edge_container<predecessor_type> my_built_predecessors;
1919  spin_mutex pred_mutex; // serialize accesses on edge_container
1920 #endif
1921 public:
1922 
1924  my_successors.set_owner( this );
1925  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_BROADCAST_NODE, &this->my_graph,
1926  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1927  }
1928 
1929 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1930  template <typename... Args>
1931  broadcast_node(const node_set<Args...>& nodes) : broadcast_node(nodes.graph_reference()) {
1932  make_edges_in_order(nodes, *this);
1933  }
1934 #endif
1935 
1936  // Copy constructor
1938  graph_node(src.my_graph), receiver<T>(), sender<T>()
1939  {
1940  my_successors.set_owner( this );
1941  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_BROADCAST_NODE, &this->my_graph,
1942  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1943  }
1944 
1945 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1946  void set_name( const char *name ) __TBB_override {
1948  }
1949 #endif
1950 
1953  my_successors.register_successor( r );
1954  return true;
1955  }
1956 
1959  my_successors.remove_successor( r );
1960  return true;
1961  }
1962 
1963 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1964  typedef typename sender<T>::built_successors_type built_successors_type;
1965 
1966  built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
1967 
1968  void internal_add_built_successor(successor_type &r) __TBB_override {
1969  my_successors.internal_add_built_successor(r);
1970  }
1971 
1972  void internal_delete_built_successor(successor_type &r) __TBB_override {
1973  my_successors.internal_delete_built_successor(r);
1974  }
1975 
1976  size_t successor_count() __TBB_override {
1977  return my_successors.successor_count();
1978  }
1979 
1980  void copy_successors(successor_list_type &v) __TBB_override {
1981  my_successors.copy_successors(v);
1982  }
1983 
1984  typedef typename receiver<T>::built_predecessors_type built_predecessors_type;
1985 
1986  built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
1987 
1988  void internal_add_built_predecessor( predecessor_type &p) __TBB_override {
1989  spin_mutex::scoped_lock l(pred_mutex);
1990  my_built_predecessors.add_edge(p);
1991  }
1992 
1993  void internal_delete_built_predecessor( predecessor_type &p) __TBB_override {
1994  spin_mutex::scoped_lock l(pred_mutex);
1995  my_built_predecessors.delete_edge(p);
1996  }
1997 
1998  size_t predecessor_count() __TBB_override {
1999  spin_mutex::scoped_lock l(pred_mutex);
2000  return my_built_predecessors.edge_count();
2001  }
2002 
2003  void copy_predecessors(predecessor_list_type &v) __TBB_override {
2004  spin_mutex::scoped_lock l(pred_mutex);
2005  my_built_predecessors.copy_edges(v);
2006  }
2007 
2008  void extract() __TBB_override {
2009  my_built_predecessors.receiver_extract(*this);
2010  my_successors.built_successors().sender_extract(*this);
2011  }
2012 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
2013 
2014 protected:
2015  template< typename R, typename B > friend class run_and_put_task;
2016  template<typename X, typename Y> friend class internal::broadcast_cache;
2017  template<typename X, typename Y> friend class internal::round_robin_cache;
2020  task *new_task = my_successors.try_put_task(t);
2021  if (!new_task) new_task = SUCCESSFULLY_ENQUEUED;
2022  return new_task;
2023  }
2024 
2026  return my_graph;
2027  }
2028 
2030 
2032  if (f&rf_clear_edges) {
2033  my_successors.clear();
2034 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2035  my_built_predecessors.clear();
2036 #endif
2037  }
2038  __TBB_ASSERT(!(f & rf_clear_edges) || my_successors.empty(), "Error resetting broadcast_node");
2039  }
2040 }; // broadcast_node
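// Editor's note: a user-side usage sketch for broadcast_node (not part of the original
// header), assuming #include "tbb/flow_graph.h" and the public tbb::flow API exported by
// this file. A broadcast_node forwards every message it receives to all registered
// successors and buffers nothing itself.
//
//     tbb::flow::graph g;
//     tbb::flow::broadcast_node<int> b(g);
//     tbb::flow::queue_node<int> q1(g), q2(g);
//     tbb::flow::make_edge(b, q1);
//     tbb::flow::make_edge(b, q2);
//     b.try_put(42);                 // both q1 and q2 receive a copy of 42
//     g.wait_for_all();
//     int v1 = 0, v2 = 0;
//     q1.try_get(v1);                // v1 == 42
//     q2.try_get(v2);                // v2 == 42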
2041 
2043 template <typename T, typename Allocator=__TBB_DEFAULT_NODE_ALLOCATOR(T) >
2045  : public graph_node
2046 #if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
2047  , public internal::reservable_item_buffer< T, Allocator >
2048 #else
2049  , public internal::reservable_item_buffer< T, cache_aligned_allocator<T> >
2050 #endif
2051  , public receiver<T>, public sender<T> {
2052 #if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
2053  typedef Allocator internals_allocator;
2054 #else
2056 #endif
2057 public:
2058  typedef T input_type;
2059  typedef T output_type;
2063 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2064  typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
2065  typedef typename sender<output_type>::successor_list_type successor_list_type;
2066 #endif
2067 #if !TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
2070  "Allocator template parameter for flow graph nodes is deprecated and will be removed. "
2071  "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface."
2072  );
2073 #endif
2074 
2075 protected:
2076  typedef size_t size_type;
2078 
2079 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2080  internal::edge_container<predecessor_type> my_built_predecessors;
2081 #endif
2082 
2083  friend class internal::forward_task_bypass< class_type >;
2084 
2086 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2087  , add_blt_succ, del_blt_succ,
2088  add_blt_pred, del_blt_pred,
2089  blt_succ_cnt, blt_pred_cnt,
2090  blt_succ_cpy, blt_pred_cpy // create vector copies of preds and succs
2091 #endif
2092  };
2093 
2094  // implements the aggregator_operation concept
2095  class buffer_operation : public internal::aggregated_operation< buffer_operation > {
2096  public:
2097  char type;
2098 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2099  task * ltask;
2100  union {
2101  input_type *elem;
2102  successor_type *r;
2104  size_t cnt_val;
2105  successor_list_type *svec;
2106  predecessor_list_type *pvec;
2107  };
2108 #else
2109  T *elem;
2112 #endif
2113  buffer_operation(const T& e, op_type t) : type(char(t))
2114 
2115 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2116  , ltask(NULL), elem(const_cast<T*>(&e))
2117 #else
2118  , elem(const_cast<T*>(&e)) , ltask(NULL)
2119 #endif
2120  {}
2121  buffer_operation(op_type t) : type(char(t)), ltask(NULL) {}
2122  };
2123 
2128 
2129  virtual void handle_operations(buffer_operation *op_list) {
2130  handle_operations_impl(op_list, this);
2131  }
2132 
2133  template<typename derived_type>
2134  void handle_operations_impl(buffer_operation *op_list, derived_type* derived) {
2135  __TBB_ASSERT(static_cast<class_type*>(derived) == this, "'this' is not a base class for derived");
2136 
2137  buffer_operation *tmp = NULL;
2138  bool try_forwarding = false;
2139  while (op_list) {
2140  tmp = op_list;
2141  op_list = op_list->next;
2142  switch (tmp->type) {
2143  case reg_succ: internal_reg_succ(tmp); try_forwarding = true; break;
2144  case rem_succ: internal_rem_succ(tmp); break;
2145  case req_item: internal_pop(tmp); break;
2146  case res_item: internal_reserve(tmp); break;
2147  case rel_res: internal_release(tmp); try_forwarding = true; break;
2148  case con_res: internal_consume(tmp); try_forwarding = true; break;
2149  case put_item: try_forwarding = internal_push(tmp); break;
2150  case try_fwd_task: internal_forward_task(tmp); break;
2151 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2152  // edge recording
2153  case add_blt_succ: internal_add_built_succ(tmp); break;
2154  case del_blt_succ: internal_del_built_succ(tmp); break;
2155  case add_blt_pred: internal_add_built_pred(tmp); break;
2156  case del_blt_pred: internal_del_built_pred(tmp); break;
2157  case blt_succ_cnt: internal_succ_cnt(tmp); break;
2158  case blt_pred_cnt: internal_pred_cnt(tmp); break;
2159  case blt_succ_cpy: internal_copy_succs(tmp); break;
2160  case blt_pred_cpy: internal_copy_preds(tmp); break;
2161 #endif
2162  }
2163  }
2164 
2165  derived->order();
2166 
2167  if (try_forwarding && !forwarder_busy) {
2168  if(internal::is_graph_active(this->my_graph)) {
2169  forwarder_busy = true;
2170  task *new_task = new(task::allocate_additional_child_of(*(this->my_graph.root_task()))) internal::
2171  forward_task_bypass<class_type>(*this);
2172  // tmp should point to the last item handled by the aggregator. This is the operation
2173  // the handling thread enqueued. So modifying that record will be okay.
2174  // workaround for icc bug
2175  tbb::task *z = tmp->ltask;
2176  graph &g = this->my_graph;
2177  tmp->ltask = combine_tasks(g, z, new_task); // in case the op generated a task
2178  }
2179  }
2180  } // handle_operations
2181 
2183  return op_data.ltask;
2184  }
2185 
2187  task *ft = grab_forwarding_task(op_data);
2188  if(ft) {
2190  return true;
2191  }
2192  return false;
2193  }
2194 
2196  virtual task *forward_task() {
2197  buffer_operation op_data(try_fwd_task);
2198  task *last_task = NULL;
2199  do {
2200  op_data.status = internal::WAIT;
2201  op_data.ltask = NULL;
2202  my_aggregator.execute(&op_data);
2203 
2204  // workaround for icc bug
2205  tbb::task *xtask = op_data.ltask;
2206  graph& g = this->my_graph;
2207  last_task = combine_tasks(g, last_task, xtask);
2208  } while (op_data.status ==internal::SUCCEEDED);
2209  return last_task;
2210  }
2211 
2214  my_successors.register_successor(*(op->r));
2216  }
2217 
2220  my_successors.remove_successor(*(op->r));
2222  }
2223 
2224 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2225  typedef typename sender<T>::built_successors_type built_successors_type;
2226 
2227  built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
2228 
2229  virtual void internal_add_built_succ(buffer_operation *op) {
2230  my_successors.internal_add_built_successor(*(op->r));
2232  }
2233 
2234  virtual void internal_del_built_succ(buffer_operation *op) {
2235  my_successors.internal_delete_built_successor(*(op->r));
2237  }
2238 
2239  typedef typename receiver<T>::built_predecessors_type built_predecessors_type;
2240 
2241  built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
2242 
2243  virtual void internal_add_built_pred(buffer_operation *op) {
2244  my_built_predecessors.add_edge(*(op->p));
2246  }
2247 
2248  virtual void internal_del_built_pred(buffer_operation *op) {
2249  my_built_predecessors.delete_edge(*(op->p));
2251  }
2252 
2253  virtual void internal_succ_cnt(buffer_operation *op) {
2254  op->cnt_val = my_successors.successor_count();
2256  }
2257 
2258  virtual void internal_pred_cnt(buffer_operation *op) {
2259  op->cnt_val = my_built_predecessors.edge_count();
2261  }
2262 
2263  virtual void internal_copy_succs(buffer_operation *op) {
2264  my_successors.copy_successors(*(op->svec));
2266  }
2267 
2268  virtual void internal_copy_preds(buffer_operation *op) {
2269  my_built_predecessors.copy_edges(*(op->pvec));
2271  }
2272 
2273 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
2274 
2275 private:
2276  void order() {}
2277 
2278  bool is_item_valid() {
2279  return this->my_item_valid(this->my_tail - 1);
2280  }
2281 
2282  void try_put_and_add_task(task*& last_task) {
2283  task *new_task = my_successors.try_put_task(this->back());
2284  if (new_task) {
2285  // workaround for icc bug
2286  graph& g = this->my_graph;
2287  last_task = combine_tasks(g, last_task, new_task);
2288  this->destroy_back();
2289  }
2290  }
2291 
2292 protected:
2295  internal_forward_task_impl(op, this);
2296  }
2297 
2298  template<typename derived_type>
2299  void internal_forward_task_impl(buffer_operation *op, derived_type* derived) {
2300  __TBB_ASSERT(static_cast<class_type*>(derived) == this, "'this' is not a base class for derived");
2301 
2302  if (this->my_reserved || !derived->is_item_valid()) {
2304  this->forwarder_busy = false;
2305  return;
2306  }
2307  // Try forwarding, giving each successor a chance
2308  task * last_task = NULL;
2309  size_type counter = my_successors.size();
2310  for (; counter > 0 && derived->is_item_valid(); --counter)
2311  derived->try_put_and_add_task(last_task);
2312 
2313  op->ltask = last_task; // return task
2314  if (last_task && !counter) {
2316  }
2317  else {
2319  forwarder_busy = false;
2320  }
2321  }
2322 
2323  virtual bool internal_push(buffer_operation *op) {
2324  this->push_back(*(op->elem));
2326  return true;
2327  }
2328 
2329  virtual void internal_pop(buffer_operation *op) {
2330  if(this->pop_back(*(op->elem))) {
2332  }
2333  else {
2335  }
2336  }
2337 
2339  if(this->reserve_front(*(op->elem))) {
2341  }
2342  else {
2344  }
2345  }
2346 
2348  this->consume_front();
2350  }
2351 
2353  this->release_front();
2355  }
2356 
2357 public:
2361  sender<T>(), forwarder_busy(false)
2362  {
2363  my_successors.set_owner(this);
2364  my_aggregator.initialize_handler(handler_type(this));
2365  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_BUFFER_NODE, &this->my_graph,
2366  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
2367  }
2368 
2369 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2370  template <typename... Args>
2371  buffer_node(const node_set<Args...>& nodes) : buffer_node(nodes.graph_reference()) {
2372  make_edges_in_order(nodes, *this);
2373  }
2374 #endif
2375 
2379  receiver<T>(), sender<T>(), forwarder_busy(false)
2380  {
2381  my_successors.set_owner(this);
2382  my_aggregator.initialize_handler(handler_type(this));
2383  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_BUFFER_NODE, &this->my_graph,
2384  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
2385  }
2386 
2387 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2388  void set_name( const char *name ) __TBB_override {
2390  }
2391 #endif
2392 
2393  //
2394  // message sender implementation
2395  //
2396 
2398 
2400  buffer_operation op_data(reg_succ);
2401  op_data.r = &r;
2402  my_aggregator.execute(&op_data);
2403  (void)enqueue_forwarding_task(op_data);
2404  return true;
2405  }
2406 
2407 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2408  void internal_add_built_successor( successor_type &r) __TBB_override {
2409  buffer_operation op_data(add_blt_succ);
2410  op_data.r = &r;
2411  my_aggregator.execute(&op_data);
2412  }
2413 
2414  void internal_delete_built_successor( successor_type &r) __TBB_override {
2415  buffer_operation op_data(del_blt_succ);
2416  op_data.r = &r;
2417  my_aggregator.execute(&op_data);
2418  }
2419 
2420  void internal_add_built_predecessor( predecessor_type &p) __TBB_override {
2421  buffer_operation op_data(add_blt_pred);
2422  op_data.p = &p;
2423  my_aggregator.execute(&op_data);
2424  }
2425 
2426  void internal_delete_built_predecessor( predecessor_type &p) __TBB_override {
2427  buffer_operation op_data(del_blt_pred);
2428  op_data.p = &p;
2429  my_aggregator.execute(&op_data);
2430  }
2431 
2432  size_t predecessor_count() __TBB_override {
2433  buffer_operation op_data(blt_pred_cnt);
2434  my_aggregator.execute(&op_data);
2435  return op_data.cnt_val;
2436  }
2437 
2438  size_t successor_count() __TBB_override {
2439  buffer_operation op_data(blt_succ_cnt);
2440  my_aggregator.execute(&op_data);
2441  return op_data.cnt_val;
2442  }
2443 
2444  void copy_predecessors( predecessor_list_type &v ) __TBB_override {
2445  buffer_operation op_data(blt_pred_cpy);
2446  op_data.pvec = &v;
2447  my_aggregator.execute(&op_data);
2448  }
2449 
2450  void copy_successors( successor_list_type &v ) __TBB_override {
2451  buffer_operation op_data(blt_succ_cpy);
2452  op_data.svec = &v;
2453  my_aggregator.execute(&op_data);
2454  }
2455 
2456 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
2457 
2459 
2462  r.remove_predecessor(*this);
2463  buffer_operation op_data(rem_succ);
2464  op_data.r = &r;
2465  my_aggregator.execute(&op_data);
2466  // even though this operation does not cause a forward, if we are the handler, and
2467  // a forward is scheduled, we may be the first to reach this point after the aggregator,
2468  // and so should check for the task.
2469  (void)enqueue_forwarding_task(op_data);
2470  return true;
2471  }
2472 
2474 
2476  bool try_get( T &v ) __TBB_override {
2477  buffer_operation op_data(req_item);
2478  op_data.elem = &v;
2479  my_aggregator.execute(&op_data);
2480  (void)enqueue_forwarding_task(op_data);
2481  return (op_data.status==internal::SUCCEEDED);
2482  }
2483 
2485 
2488  buffer_operation op_data(res_item);
2489  op_data.elem = &v;
2490  my_aggregator.execute(&op_data);
2491  (void)enqueue_forwarding_task(op_data);
2492  return (op_data.status==internal::SUCCEEDED);
2493  }
2494 
2496 
2498  buffer_operation op_data(rel_res);
2499  my_aggregator.execute(&op_data);
2500  (void)enqueue_forwarding_task(op_data);
2501  return true;
2502  }
2503 
2505 
2507  buffer_operation op_data(con_res);
2508  my_aggregator.execute(&op_data);
2509  (void)enqueue_forwarding_task(op_data);
2510  return true;
2511  }
2512 
2513 protected:
2514 
2515  template< typename R, typename B > friend class run_and_put_task;
2516  template<typename X, typename Y> friend class internal::broadcast_cache;
2517  template<typename X, typename Y> friend class internal::round_robin_cache;
2520  buffer_operation op_data(t, put_item);
2521  my_aggregator.execute(&op_data);
2522  task *ft = grab_forwarding_task(op_data);
2523  // sequencer_nodes can return failure (if an item has been previously inserted)
2524  // We have to spawn the returned task if our own operation fails.
2525 
2526  if(ft && op_data.status ==internal::FAILED) {
2527  // we haven't succeeded in queueing the item, but for some reason the
2528  // call returned a task (this can happen if another request resulted in a
2529  // successful forward). Queue the task and reset the pointer.
2531  }
2532  else if(!ft && op_data.status ==internal::SUCCEEDED) {
2533  ft = SUCCESSFULLY_ENQUEUED;
2534  }
2535  return ft;
2536  }
2537 
2539  return my_graph;
2540  }
2541 
2543 
2544 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2545 public:
2546  void extract() __TBB_override {
2547  my_built_predecessors.receiver_extract(*this);
2548  my_successors.built_successors().sender_extract(*this);
2549  }
2550 #endif
2551 
2552 protected:
2555  // TODO: just clear structures
2556  if (f&rf_clear_edges) {
2557  my_successors.clear();
2558 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2559  my_built_predecessors.clear();
2560 #endif
2561  }
2562  forwarder_busy = false;
2563  }
2564 }; // buffer_node
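// Editor's note: a user-side usage sketch for buffer_node (not part of the original
// header), assuming the public tbb::flow API. A buffer_node stores items and forwards
// them to a successor, or hands them out via try_get, in no particular order.
//
//     tbb::flow::graph g;
//     tbb::flow::buffer_node<int> buf(g);
//     buf.try_put(1);
//     buf.try_put(2);
//     g.wait_for_all();
//     int v = 0;
//     while (buf.try_get(v)) {
//         // items come back in no particular order
//     }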
2565 
2567 template <typename T, typename Allocator=__TBB_DEFAULT_NODE_ALLOCATOR(T) >
2568 class queue_node : public buffer_node<T, Allocator> {
2569 #if !TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
2572  "Allocator template parameter for flow graph nodes is deprecated and will be removed. "
2573  "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface."
2574  );
2575 #endif
2576 protected:
2581 
2582 private:
2583  template<typename, typename> friend class buffer_node;
2584 
2585  bool is_item_valid() {
2586  return this->my_item_valid(this->my_head);
2587  }
2588 
2589  void try_put_and_add_task(task*& last_task) {
2590  task *new_task = this->my_successors.try_put_task(this->front());
2591  if (new_task) {
2592  // workaround for icc bug
2593  graph& graph_ref = this->graph_reference();
2594  last_task = combine_tasks(graph_ref, last_task, new_task);
2595  this->destroy_front();
2596  }
2597  }
2598 
2599 protected:
2600  void internal_forward_task(queue_operation *op) __TBB_override {
2601  this->internal_forward_task_impl(op, this);
2602  }
2603 
2605  if ( this->my_reserved || !this->my_item_valid(this->my_head)){
2607  }
2608  else {
2609  this->pop_front(*(op->elem));
2611  }
2612  }
2614  if (this->my_reserved || !this->my_item_valid(this->my_head)) {
2616  }
2617  else {
2618  this->reserve_front(*(op->elem));
2620  }
2621  }
2623  this->consume_front();
2625  }
2626 
2627 public:
2628  typedef T input_type;
2629  typedef T output_type;
2632 
2635  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_QUEUE_NODE, &(this->my_graph),
2636  static_cast<receiver<input_type> *>(this),
2637  static_cast<sender<output_type> *>(this) );
2638  }
2639 
2640 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2641  template <typename... Args>
2642  queue_node( const node_set<Args...>& nodes) : queue_node(nodes.graph_reference()) {
2643  make_edges_in_order(nodes, *this);
2644  }
2645 #endif
2646 
2649  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_QUEUE_NODE, &(this->my_graph),
2650  static_cast<receiver<input_type> *>(this),
2651  static_cast<sender<output_type> *>(this) );
2652  }
2653 
2654 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2655  void set_name( const char *name ) __TBB_override {
2657  }
2658 #endif
2659 
2660 protected:
2663  }
2664 }; // queue_node
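// Editor's note: a user-side usage sketch for queue_node (not part of the original
// header), assuming the public tbb::flow API. Unlike buffer_node, queue_node delivers
// buffered items in first-in first-out order.
//
//     tbb::flow::graph g;
//     tbb::flow::queue_node<int> q(g);
//     for (int i = 0; i < 3; ++i) q.try_put(i);
//     g.wait_for_all();
//     int v = 0;
//     q.try_get(v);                  // v == 0: items are delivered in FIFO order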
2665 
2667 template< typename T, typename Allocator=__TBB_DEFAULT_NODE_ALLOCATOR(T) >
2668 class sequencer_node : public queue_node<T, Allocator> {
2670  // my_sequencer should be a benign function and must be callable
2671  // from a parallel context. Does this mean it needn't be reset?
2672 public:
2673 #if !TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
2676  "Allocator template parameter for flow graph nodes is deprecated and will be removed. "
2677  "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface."
2678  );
2679 #endif
2680  typedef T input_type;
2681  typedef T output_type;
2684 
2686  template< typename Sequencer >
2687  __TBB_NOINLINE_SYM sequencer_node( graph &g, const Sequencer& s ) : queue_node<T, Allocator>(g),
2688  my_sequencer(new internal::function_body_leaf< T, size_t, Sequencer>(s) ) {
2689  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_SEQUENCER_NODE, &(this->my_graph),
2690  static_cast<receiver<input_type> *>(this),
2691  static_cast<sender<output_type> *>(this) );
2692  }
2693 
2694 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2695  template <typename Sequencer, typename... Args>
2696  sequencer_node( const node_set<Args...>& nodes, const Sequencer& s)
2697  : sequencer_node(nodes.graph_reference(), s) {
2698  make_edges_in_order(nodes, *this);
2699  }
2700 #endif
2701 
2703  __TBB_NOINLINE_SYM sequencer_node( const sequencer_node& src ) : queue_node<T, Allocator>(src),
2704  my_sequencer( src.my_sequencer->clone() ) {
2705  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_SEQUENCER_NODE, &(this->my_graph),
2706  static_cast<receiver<input_type> *>(this),
2707  static_cast<sender<output_type> *>(this) );
2708  }
2709 
2712 
2713 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2714  void set_name( const char *name ) __TBB_override {
2716  }
2717 #endif
2718 
2719 protected:
2722 
2723 private:
2725  size_type tag = (*my_sequencer)(*(op->elem));
2726 #if !TBB_DEPRECATED_SEQUENCER_DUPLICATES
2727  if (tag < this->my_head) {
2728  // have already emitted a message with this tag
2730  return false;
2731  }
2732 #endif
2733  // cannot modify this->my_tail now; the buffer would be inconsistent.
2734  size_t new_tail = (tag+1 > this->my_tail) ? tag+1 : this->my_tail;
2735 
2736  if (this->size(new_tail) > this->capacity()) {
2737  this->grow_my_array(this->size(new_tail));
2738  }
2739  this->my_tail = new_tail;
2740 
2741  const internal::op_stat res = this->place_item(tag, *(op->elem)) ? internal::SUCCEEDED : internal::FAILED;
2742  __TBB_store_with_release(op->status, res);
2743  return res ==internal::SUCCEEDED;
2744  }
2745 }; // sequencer_node
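// Editor's note: a user-side usage sketch for sequencer_node (not part of the original
// header), assuming the public tbb::flow API; the msg type and its seq/payload fields are
// illustrative. The Sequencer body maps each item to its sequence number, and items are
// forwarded strictly in increasing sequence order starting from 0.
//
//     struct msg { size_t seq; int payload; };
//     tbb::flow::graph g;
//     tbb::flow::sequencer_node<msg> s(g, [](const msg& m) -> size_t { return m.seq; });
//     tbb::flow::queue_node<msg> out(g);
//     tbb::flow::make_edge(s, out);
//     msg a = { 1, 10 };
//     msg b = { 0, 20 };
//     s.try_put(a);                  // held back until the item with sequence number 0 arrives
//     s.try_put(b);
//     g.wait_for_all();
//     msg r;
//     out.try_get(r);                // r.seq == 0: output is emitted in sequence order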
2746 
2748 template<typename T, typename Compare = std::less<T>, typename Allocator=__TBB_DEFAULT_NODE_ALLOCATOR(T)>
2749 class priority_queue_node : public buffer_node<T, Allocator> {
2750 public:
2751 #if !TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
2754  "Allocator template parameter for flow graph nodes is deprecated and will removed in the future. "
2755  "To temporary enable the deprecated interface specify TBB_ENABLE_DEPRECATED_NODE_ALLOCATOR."
2756  );
2757 #endif
2758  typedef T input_type;
2759  typedef T output_type;
2764 
2766  __TBB_NOINLINE_SYM explicit priority_queue_node( graph &g, const Compare& comp = Compare() )
2767  : buffer_node<T, Allocator>(g), compare(comp), mark(0) {
2768  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph),
2769  static_cast<receiver<input_type> *>(this),
2770  static_cast<sender<output_type> *>(this) );
2771  }
2772 
2773 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2774  template <typename... Args>
2775  priority_queue_node(const node_set<Args...>& nodes, const Compare& comp = Compare())
2776  : priority_queue_node(nodes.graph_reference(), comp) {
2777  make_edges_in_order(nodes, *this);
2778  }
2779 #endif
2780 
2783  : buffer_node<T, Allocator>(src), mark(0)
2784  {
2785  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph),
2786  static_cast<receiver<input_type> *>(this),
2787  static_cast<sender<output_type> *>(this) );
2788  }
2789 
2790 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2791  void set_name( const char *name ) __TBB_override {
2793  }
2794 #endif
2795 
2796 protected:
2797 
2799  mark = 0;
2801  }
2802 
2806 
2809  this->internal_forward_task_impl(op, this);
2810  }
2811 
2813  this->handle_operations_impl(op_list, this);
2814  }
2815 
2817  prio_push(*(op->elem));
2819  return true;
2820  }
2821 
2823  // if empty or already reserved, don't pop
2824  if ( this->my_reserved == true || this->my_tail == 0 ) {
2826  return;
2827  }
2828 
2829  *(op->elem) = prio();
2831  prio_pop();
2832 
2833  }
2834 
2835  // reserves the highest-priority item: pops it and saves a copy in reserved_item
2837  if (this->my_reserved == true || this->my_tail == 0) {
2839  return;
2840  }
2841  this->my_reserved = true;
2842  *(op->elem) = prio();
2843  reserved_item = *(op->elem);
2845  prio_pop();
2846  }
2847 
2850  this->my_reserved = false;
2852  }
2853 
2854  void internal_release(prio_operation *op) __TBB_override {
2857  this->my_reserved = false;
2859  }
2860 
2861 private:
2862  template<typename, typename> friend class buffer_node;
2863 
2864  void order() {
2865  if (mark < this->my_tail) heapify();
2866  __TBB_ASSERT(mark == this->my_tail, "mark unequal after heapify");
2867  }
2868 
2869  bool is_item_valid() {
2870  return this->my_tail > 0;
2871  }
2872 
2873  void try_put_and_add_task(task*& last_task) {
2874  task * new_task = this->my_successors.try_put_task(this->prio());
2875  if (new_task) {
2876  // workaround for icc bug
2877  graph& graph_ref = this->graph_reference();
2878  last_task = combine_tasks(graph_ref, last_task, new_task);
2879  prio_pop();
2880  }
2881  }
2882 
2883 private:
2884  Compare compare;
2886 
2888 
2889  // in case a heapify has not been done after a push, check if the most recently pushed item (at my_tail-1) has higher priority than the 0'th item
2890  bool prio_use_tail() {
2891  __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds before test");
2892  return mark < this->my_tail && compare(this->get_my_item(0), this->get_my_item(this->my_tail - 1));
2893  }
2894 
2895  // prio_push: checks that the item will fit, expand array if necessary, put at end
2896  void prio_push(const T &src) {
2897  if ( this->my_tail >= this->my_array_size )
2898  this->grow_my_array( this->my_tail + 1 );
2899  (void) this->place_item(this->my_tail, src);
2900  ++(this->my_tail);
2901  __TBB_ASSERT(mark < this->my_tail, "mark outside bounds after push");
2902  }
2903 
2904  // prio_pop: deletes the highest-priority item from the array. If that item is item 0,
2905  // moves the last item to slot 0 and reheaps; if it is the newest (tail) item, just destroys it
2906  // and decrements my_tail and mark. Assumes the array has already been tested for emptiness; cannot fail.
2907  void prio_pop() {
2908  if (prio_use_tail()) {
2909  // there are newly pushed elements; last one higher than top
2910  // copy the data
2911  this->destroy_item(this->my_tail-1);
2912  --(this->my_tail);
2913  __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds after pop");
2914  return;
2915  }
2916  this->destroy_item(0);
2917  if(this->my_tail > 1) {
2918  // push the last element down heap
2919  __TBB_ASSERT(this->my_item_valid(this->my_tail - 1), NULL);
2920  this->move_item(0,this->my_tail - 1);
2921  }
2922  --(this->my_tail);
2923  if(mark > this->my_tail) --mark;
2924  if (this->my_tail > 1) // don't reheap for heap of size 1
2925  reheap();
2926  __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds after pop");
2927  }
2928 
2929  const T& prio() {
2930  return this->get_my_item(prio_use_tail() ? this->my_tail-1 : 0);
2931  }
2932 
2933  // turn array into heap
2934  void heapify() {
2935  if(this->my_tail == 0) {
2936  mark = 0;
2937  return;
2938  }
2939  if (!mark) mark = 1;
2940  for (; mark<this->my_tail; ++mark) { // for each unheaped element
2941  size_type cur_pos = mark;
2942  input_type to_place;
2943  this->fetch_item(mark,to_place);
2944  do { // push to_place up the heap
2945  size_type parent = (cur_pos-1)>>1;
2946  if (!compare(this->get_my_item(parent), to_place))
2947  break;
2948  this->move_item(cur_pos, parent);
2949  cur_pos = parent;
2950  } while( cur_pos );
2951  (void) this->place_item(cur_pos, to_place);
2952  }
2953  }
2954 
2955  // reheap: the array is a heap except for a new element at the root; sift it down to restore the heap property
2956  void reheap() {
2957  size_type cur_pos=0, child=1;
2958  while (child < mark) {
2959  size_type target = child;
2960  if (child+1<mark &&
2961  compare(this->get_my_item(child),
2962  this->get_my_item(child+1)))
2963  ++target;
2964  // target now has the higher priority child
2965  if (compare(this->get_my_item(target),
2966  this->get_my_item(cur_pos)))
2967  break;
2968  // swap
2969  this->swap_items(cur_pos, target);
2970  cur_pos = target;
2971  child = (cur_pos<<1)+1;
2972  }
2973  }
2974 }; // priority_queue_node
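// Editor's note: a user-side usage sketch for priority_queue_node (not part of the
// original header), assuming the public tbb::flow API. With the default Compare of
// std::less<T>, the largest buffered value has the highest priority and is delivered first.
//
//     tbb::flow::graph g;
//     tbb::flow::priority_queue_node<int> pq(g);
//     pq.try_put(3);
//     pq.try_put(7);
//     pq.try_put(5);
//     g.wait_for_all();
//     int v = 0;
//     pq.try_get(v);                 // v == 7: the highest-priority item comes out first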
2975 
2976 } // interfaceX
2977 
2978 namespace interface11 {
2979 
2981 
2984 template< typename T, typename DecrementType=continue_msg >
2985 class limiter_node : public graph_node, public receiver< T >, public sender< T > {
2986 public:
2987  typedef T input_type;
2988  typedef T output_type;
2991 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2992  typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
2993  typedef typename sender<output_type>::built_successors_type built_successors_type;
2994  typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
2995  typedef typename sender<output_type>::successor_list_type successor_list_type;
2996 #endif
2997  //TODO: there are no predefined types for the controlling "decrementer" port; this should be fixed later.
2998 
2999 private:
3001  size_t my_count; //number of successful puts
3002  size_t my_tries; //number of active put attempts
3006  __TBB_DEPRECATED_LIMITER_EXPR( int init_decrement_predecessors; )
3007 
3009 
3010  // Let decrementer call decrement_counter()
3011  friend class internal::decrementer< limiter_node<T,DecrementType>, DecrementType >;
3012 
3013  bool check_conditions() { // always called under lock
3014  return ( my_count + my_tries < my_threshold && !my_predecessors.empty() && !my_successors.empty() );
3015  }
3016 
3017  // only returns a valid task pointer or NULL, never SUCCESSFULLY_ENQUEUED
3019  input_type v;
3020  task *rval = NULL;
3021  bool reserved = false;
3022  {
3024  if ( check_conditions() )
3025  ++my_tries;
3026  else
3027  return NULL;
3028  }
3029 
3030  //SUCCESS
3031  // if we can reserve and can put, we consume the reservation
3032  // we increment the count and decrement the tries
3033  if ( (my_predecessors.try_reserve(v)) == true ){
3034  reserved=true;
3035  if ( (rval = my_successors.try_put_task(v)) != NULL ){
3036  {
3038  ++my_count;
3039  --my_tries;
3040  my_predecessors.try_consume();
3041  if ( check_conditions() ) {
3042  if ( internal::is_graph_active(this->my_graph) ) {
3043  task *rtask = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
3046  }
3047  }
3048  }
3049  return rval;
3050  }
3051  }
3052  //FAILURE
3053  //if we can't reserve, we decrement the tries
3054  //if we can reserve but can't put, we decrement the tries and release the reservation
3055  {
3057  --my_tries;
3058  if (reserved) my_predecessors.try_release();
3059  if ( check_conditions() ) {
3060  if ( internal::is_graph_active(this->my_graph) ) {
3061  task *rtask = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
3063  __TBB_ASSERT(!rval, "Have two tasks to handle");
3064  return rtask;
3065  }
3066  }
3067  return rval;
3068  }
3069  }
3070 
3071  void forward() {
3072  __TBB_ASSERT(false, "Should never be called");
3073  return;
3074  }
3075 
3076  task* decrement_counter( long long delta ) {
3077  {
3079  if( delta > 0 && size_t(delta) > my_count )
3080  my_count = 0;
3081  else if( delta < 0 && size_t(delta) > my_threshold - my_count )
3083  else
3084  my_count -= size_t(delta); // absolute value of delta is sufficiently small
3085  }
3086  return forward_task();
3087  }
3088 
3089  void initialize() {
3090  my_predecessors.set_owner(this);
3091  my_successors.set_owner(this);
3092  decrement.set_owner(this);
3094  CODEPTR(), tbb::internal::FLOW_LIMITER_NODE, &this->my_graph,
3095  static_cast<receiver<input_type> *>(this), static_cast<receiver<DecrementType> *>(&decrement),
3096  static_cast<sender<output_type> *>(this)
3097  );
3098  }
3099 public:
3102 
3103 #if TBB_DEPRECATED_LIMITER_NODE_CONSTRUCTOR
3105  "Deprecated interface of the limiter node can be used only in conjunction "
3106  "with continue_msg as the type of DecrementType template parameter." );
3107 #endif // Check for incompatible interface
3108 
3111  __TBB_DEPRECATED_LIMITER_ARG2(size_t threshold, int num_decrement_predecessors=0))
3112  : graph_node(g), my_threshold(threshold), my_count(0),
3114  my_tries(0), decrement(),
3115  init_decrement_predecessors(num_decrement_predecessors),
3116  decrement(num_decrement_predecessors)) {
3117  initialize();
3118  }
3119 
3120 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3121  template <typename... Args>
3122  limiter_node(const node_set<Args...>& nodes, size_t threshold)
3123  : limiter_node(nodes.graph_reference(), threshold) {
3124  make_edges_in_order(nodes, *this);
3125  }
3126 #endif
3127 
3129  limiter_node( const limiter_node& src ) :
3130  graph_node(src.my_graph), receiver<T>(), sender<T>(),
3133  my_tries(0), decrement(),
3134  init_decrement_predecessors(src.init_decrement_predecessors),
3135  decrement(src.init_decrement_predecessors)) {
3136  initialize();
3137  }
3138 
3139 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3140  void set_name( const char *name ) __TBB_override {
3142  }
3143 #endif
3144 
3148  bool was_empty = my_successors.empty();
3149  my_successors.register_successor(r);
3150  //spawn a forward task if this is the only successor
3151  if ( was_empty && !my_predecessors.empty() && my_count + my_tries < my_threshold ) {
3152  if ( internal::is_graph_active(this->my_graph) ) {
3153  task* task = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
3156  }
3157  }
3158  return true;
3159  }
3160 
3162 
3164  r.remove_predecessor(*this);
3165  my_successors.remove_successor(r);
3166  return true;
3167  }
3168 
3169 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
3170  built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
3171  built_predecessors_type &built_predecessors() __TBB_override { return my_predecessors.built_predecessors(); }
3172 
3173  void internal_add_built_successor(successor_type &src) __TBB_override {
3174  my_successors.internal_add_built_successor(src);
3175  }
3176 
3177  void internal_delete_built_successor(successor_type &src) __TBB_override {
3178  my_successors.internal_delete_built_successor(src);
3179  }
3180 
3181  size_t successor_count() __TBB_override { return my_successors.successor_count(); }
3182 
3183  void copy_successors(successor_list_type &v) __TBB_override {
3184  my_successors.copy_successors(v);
3185  }
3186 
3187  void internal_add_built_predecessor(predecessor_type &src) __TBB_override {
3188  my_predecessors.internal_add_built_predecessor(src);
3189  }
3190 
3191  void internal_delete_built_predecessor(predecessor_type &src) __TBB_override {
3192  my_predecessors.internal_delete_built_predecessor(src);
3193  }
3194 
3195  size_t predecessor_count() __TBB_override { return my_predecessors.predecessor_count(); }
3196 
3197  void copy_predecessors(predecessor_list_type &v) __TBB_override {
3198  my_predecessors.copy_predecessors(v);
3199  }
3200 
3201  void extract() __TBB_override {
3202  my_count = 0;
3203  my_successors.built_successors().sender_extract(*this);
3204  my_predecessors.built_predecessors().receiver_extract(*this);
3205  decrement.built_predecessors().receiver_extract(decrement);
3206  }
3207 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
3208 
3212  my_predecessors.add( src );
3213  if ( my_count + my_tries < my_threshold && !my_successors.empty() && internal::is_graph_active(this->my_graph) ) {
3214  task* task = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
3217  }
3218  return true;
3219  }
3220 
3223  my_predecessors.remove( src );
3224  return true;
3225  }
3226 
3227 protected:
3228 
3229  template< typename R, typename B > friend class run_and_put_task;
3230  template<typename X, typename Y> friend class internal::broadcast_cache;
3231  template<typename X, typename Y> friend class internal::round_robin_cache;
3234  {
3236  if ( my_count + my_tries >= my_threshold )
3237  return NULL;
3238  else
3239  ++my_tries;
3240  }
3241 
3242  task * rtask = my_successors.try_put_task(t);
3243 
3244  if ( !rtask ) { // try_put_task failed.
3246  --my_tries;
3248  rtask = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
3250  }
3251  }
3252  else {
3254  ++my_count;
3255  --my_tries;
3256  }
3257  return rtask;
3258  }
3259 
3261 
3263  __TBB_ASSERT(false,NULL); // should never be called
3264  }
3265 
3267  my_count = 0;
3268  if(f & rf_clear_edges) {
3269  my_predecessors.clear();
3270  my_successors.clear();
3271  }
3272  else
3273  {
3274  my_predecessors.reset( );
3275  }
3276  decrement.reset_receiver(f);
3277  }
3278 }; // limiter_node
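// Editor's note: a user-side usage sketch for limiter_node (not part of the original
// header). It assumes the public tbb::flow API and, since the declaration is elided in
// this listing, that the decrement port is exposed as the public member 'decrement'
// (a receiver of the default DecrementType, continue_msg), as in the documented classic
// TBB interface. The limiter forwards at most 'threshold' messages until decrement is signalled.
//
//     tbb::flow::graph g;
//     tbb::flow::limiter_node<int> lim(g, 2);      // at most 2 messages in flight
//     tbb::flow::function_node<int, tbb::flow::continue_msg> worker(
//         g, tbb::flow::unlimited,
//         [](int) { return tbb::flow::continue_msg(); });
//     tbb::flow::make_edge(lim, worker);
//     tbb::flow::make_edge(worker, lim.decrement); // each completion re-opens the limiter
//     for (int i = 0; i < 10; ++i) lim.try_put(i);
//     g.wait_for_all();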
3279 
3281 
3285 using internal::input_port;
3286 using internal::tag_value;
3287 
3288 template<typename OutputTuple, typename JP=queueing> class join_node;
3289 
3290 template<typename OutputTuple>
3291 class join_node<OutputTuple,reserving>: public internal::unfolded_join_node<tbb::flow::tuple_size<OutputTuple>::value, reserving_port, OutputTuple, reserving> {
3292 private:
3295 public:
3296  typedef OutputTuple output_type;
3299  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_RESERVING, &this->my_graph,
3300  this->input_ports(), static_cast< sender< output_type > *>(this) );
3301  }
3302 
3303 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3304  template <typename... Args>
3305  __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, reserving = reserving()) : join_node(nodes.graph_reference()) {
3306  make_edges_in_order(nodes, *this);
3307  }
3308 #endif
3309 
3311  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_RESERVING, &this->my_graph,
3312  this->input_ports(), static_cast< sender< output_type > *>(this) );
3313  }
3314 
3315 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3316  void set_name( const char *name ) __TBB_override {
3318  }
3319 #endif
3320 
3321 };
3322 
3323 template<typename OutputTuple>
3324 class join_node<OutputTuple,queueing>: public internal::unfolded_join_node<tbb::flow::tuple_size<OutputTuple>::value, queueing_port, OutputTuple, queueing> {
3325 private:
3328 public:
3329  typedef OutputTuple output_type;
3332  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
3333  this->input_ports(), static_cast< sender< output_type > *>(this) );
3334  }
3335 
3336 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3337  template <typename... Args>
3338  __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, queueing = queueing()) : join_node(nodes.graph_reference()) {
3339  make_edges_in_order(nodes, *this);
3340  }
3341 #endif
3342 
3344  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
3345  this->input_ports(), static_cast< sender< output_type > *>(this) );
3346  }
3347 
3348 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3349  void set_name( const char *name ) __TBB_override {
3351  }
3352 #endif
3353 
3354 };
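// Editor's note: a user-side usage sketch for the queueing join_node (not part of the
// original header), assuming the public tbb::flow API (input_port<>, tuple, get<>). Each
// input port queues its messages, and one tuple is emitted whenever every port has an item.
//
//     tbb::flow::graph g;
//     tbb::flow::queue_node<int> qi(g);
//     tbb::flow::queue_node<float> qf(g);
//     tbb::flow::join_node< tbb::flow::tuple<int, float>, tbb::flow::queueing > j(g);
//     tbb::flow::queue_node< tbb::flow::tuple<int, float> > out(g);
//     tbb::flow::make_edge(qi, tbb::flow::input_port<0>(j));
//     tbb::flow::make_edge(qf, tbb::flow::input_port<1>(j));
//     tbb::flow::make_edge(j, out);
//     qi.try_put(1);
//     qf.try_put(2.0f);
//     g.wait_for_all();
//     tbb::flow::tuple<int, float> t;
//     out.try_get(t);                // tbb::flow::get<0>(t) == 1, tbb::flow::get<1>(t) == 2.0f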
3355 
3356 // template for key_matching join_node
3357 // tag_matching join_node is a specialization of key_matching, and is source-compatible.
3358 template<typename OutputTuple, typename K, typename KHash>
3359 class join_node<OutputTuple, key_matching<K, KHash> > : public internal::unfolded_join_node<tbb::flow::tuple_size<OutputTuple>::value,
3360  key_matching_port, OutputTuple, key_matching<K,KHash> > {
3361 private:
3364 public:
3365  typedef OutputTuple output_type;
3367 
3368 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
3370 
3371 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3372  template <typename... Args>
3373  join_node(const node_set<Args...>& nodes, key_matching<K, KHash> = key_matching<K, KHash>())
3374  : join_node(nodes.graph_reference()) {
3375  make_edges_in_order(nodes, *this);
3376  }
3377 #endif
3378 
3379 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
3380 
3381  template<typename __TBB_B0, typename __TBB_B1>
3382  __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1) : unfolded_type(g, b0, b1) {
3383  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3384  this->input_ports(), static_cast< sender< output_type > *>(this) );
3385  }
3386  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2>
3387  __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2) : unfolded_type(g, b0, b1, b2) {
3388  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3389  this->input_ports(), static_cast< sender< output_type > *>(this) );
3390  }
3391  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3>
3392  __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3) : unfolded_type(g, b0, b1, b2, b3) {
3393  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3394  this->input_ports(), static_cast< sender< output_type > *>(this) );
3395  }
3396  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4>
3397  __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4) :
3398  unfolded_type(g, b0, b1, b2, b3, b4) {
3399  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3400  this->input_ports(), static_cast< sender< output_type > *>(this) );
3401  }
3402 #if __TBB_VARIADIC_MAX >= 6
3403  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
3404  typename __TBB_B5>
3405  __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5) :
3406  unfolded_type(g, b0, b1, b2, b3, b4, b5) {
3407  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3408  this->input_ports(), static_cast< sender< output_type > *>(this) );
3409  }
3410 #endif
3411 #if __TBB_VARIADIC_MAX >= 7
3412  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
3413  typename __TBB_B5, typename __TBB_B6>
3414  __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6) :
3415  unfolded_type(g, b0, b1, b2, b3, b4, b5, b6) {
3416  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3417  this->input_ports(), static_cast< sender< output_type > *>(this) );
3418  }
3419 #endif
3420 #if __TBB_VARIADIC_MAX >= 8
3421  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
3422  typename __TBB_B5, typename __TBB_B6, typename __TBB_B7>
3423  __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
3424  __TBB_B7 b7) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7) {
3425  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3426  this->input_ports(), static_cast< sender< output_type > *>(this) );
3427  }
3428 #endif
3429 #if __TBB_VARIADIC_MAX >= 9
3430  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
3431  typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename __TBB_B8>
3432  __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
3433  __TBB_B7 b7, __TBB_B8 b8) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8) {
3434  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3435  this->input_ports(), static_cast< sender< output_type > *>(this) );
3436  }
3437 #endif
3438 #if __TBB_VARIADIC_MAX >= 10
3439  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
3440  typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename __TBB_B8, typename __TBB_B9>
3441  __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
3442  __TBB_B7 b7, __TBB_B8 b8, __TBB_B9 b9) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8, b9) {
3443  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3444  this->input_ports(), static_cast< sender< output_type > *>(this) );
3445  }
3446 #endif
3447 
3448 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3449  template <typename... Args, typename... Bodies>
3450  __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, Bodies... bodies)
3451  : join_node(nodes.graph_reference(), bodies...) {
3452  make_edges_in_order(nodes, *this);
3453  }
3454 #endif
3455 
3457  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3458  this->input_ports(), static_cast< sender< output_type > *>(this) );
3459  }
3460 
3461 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3462  void set_name( const char *name ) __TBB_override {
3464  }
3465 #endif
3466 
3467 };
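// Editor's note: a user-side usage sketch for the key_matching join_node (not part of the
// original header), assuming the public tbb::flow API; the rec type and its key/val fields
// are illustrative. Each port is given a key-extraction body, and messages whose keys match
// across all ports are combined into one output tuple.
//
//     struct rec { int key; double val; };
//     tbb::flow::graph g;
//     tbb::flow::join_node< tbb::flow::tuple<rec, rec>, tbb::flow::key_matching<int> > j(g,
//         [](const rec& r) { return r.key; },      // key body for port 0
//         [](const rec& r) { return r.key; });     // key body for port 1
//     tbb::flow::queue_node< tbb::flow::tuple<rec, rec> > out(g);
//     tbb::flow::make_edge(j, out);
//     rec a = { 7, 1.0 };
//     rec b = { 7, 2.0 };
//     tbb::flow::input_port<0>(j).try_put(a);
//     tbb::flow::input_port<1>(j).try_put(b);      // keys match, so a tuple is emitted
//     g.wait_for_all();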
3468 
3469 // indexer node
3471 
3472 // TODO: Implement interface with variadic template or tuple
3473 template<typename T0, typename T1=null_type, typename T2=null_type, typename T3=null_type,
3474  typename T4=null_type, typename T5=null_type, typename T6=null_type,
3475  typename T7=null_type, typename T8=null_type, typename T9=null_type> class indexer_node;
3476 
3477 //indexer node specializations
3478 template<typename T0>
3479 class indexer_node<T0> : public internal::unfolded_indexer_node<tuple<T0> > {
3480 private:
3481  static const int N = 1;
3482 public:
3483  typedef tuple<T0> InputTuple;
3487  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3488  this->input_ports(), static_cast< sender< output_type > *>(this) );
3489  }
3490 
3491 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3492  template <typename... Args>
3493  indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3494  make_edges_in_order(nodes, *this);
3495  }
3496 #endif
3497 
3498  // Copy constructor
3500  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3501  this->input_ports(), static_cast< sender< output_type > *>(this) );
3502  }
3503 
3504 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3505  void set_name( const char *name ) __TBB_override {
3507  }
3508 #endif
3509 };
3510 
3511 template<typename T0, typename T1>
3512 class indexer_node<T0, T1> : public internal::unfolded_indexer_node<tuple<T0, T1> > {
3513 private:
3514  static const int N = 2;
3515 public:
3516  typedef tuple<T0, T1> InputTuple;
3520  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3521  this->input_ports(), static_cast< sender< output_type > *>(this) );
3522  }
3523 
3524 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3525  template <typename... Args>
3526  indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3527  make_edges_in_order(nodes, *this);
3528  }
3529 #endif
3530 
3531  // Copy constructor
3533  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3534  this->input_ports(), static_cast< sender< output_type > *>(this) );
3535  }
3536 
3537 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3538  void set_name( const char *name ) __TBB_override {
3540  }
3541 #endif
3542 };
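// Editor's note: a user-side usage sketch for the two-input indexer_node specialization
// above (not part of the original header). It assumes the public tbb::flow API, including
// the tagged-message accessors tag() and tbb::flow::cast_to<> declared elsewhere in this
// header. The indexer broadcasts a tagged message whenever any input port receives an item;
// the tag identifies the originating port.
//
//     typedef tbb::flow::indexer_node<int, float> indexer_type;
//     tbb::flow::graph g;
//     indexer_type idx(g);
//     tbb::flow::function_node<indexer_type::output_type, int> sink(
//         g, tbb::flow::serial,
//         [](const indexer_type::output_type& v) -> int {
//             if (v.tag() == 0)
//                 return tbb::flow::cast_to<int>(v);                     // arrived on port 0
//             return static_cast<int>(tbb::flow::cast_to<float>(v));     // arrived on port 1
//         });
//     tbb::flow::make_edge(idx, sink);
//     tbb::flow::input_port<0>(idx).try_put(3);
//     tbb::flow::input_port<1>(idx).try_put(2.5f);
//     g.wait_for_all();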
3543 
3544 template<typename T0, typename T1, typename T2>
3545 class indexer_node<T0, T1, T2> : public internal::unfolded_indexer_node<tuple<T0, T1, T2> > {
3546 private:
3547  static const int N = 3;
3548 public:
3549  typedef tuple<T0, T1, T2> InputTuple;
3553  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3554  this->input_ports(), static_cast< sender< output_type > *>(this) );
3555  }
3556 
3557 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3558  template <typename... Args>
3559  indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3560  make_edges_in_order(nodes, *this);
3561  }
3562 #endif
3563 
3564  // Copy constructor
3566  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3567  this->input_ports(), static_cast< sender< output_type > *>(this) );
3568  }
3569 
3570 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3571  void set_name( const char *name ) __TBB_override {
3573  }
3574 #endif
3575 };
3576 
3577 template<typename T0, typename T1, typename T2, typename T3>
3578 class indexer_node<T0, T1, T2, T3> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3> > {
3579 private:
3580  static const int N = 4;
3581 public:
3582  typedef tuple<T0, T1, T2, T3> InputTuple;
3586  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3587  this->input_ports(), static_cast< sender< output_type > *>(this) );
3588  }
3589 
3590 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3591  template <typename... Args>
3592  indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3593  make_edges_in_order(nodes, *this);
3594  }
3595 #endif
3596 
3597  // Copy constructor
3599  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3600  this->input_ports(), static_cast< sender< output_type > *>(this) );
3601  }
3602 
3603 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3604  void set_name( const char *name ) __TBB_override {
3606  }
3607 #endif
3608 };
3609 
3610 template<typename T0, typename T1, typename T2, typename T3, typename T4>
3611 class indexer_node<T0, T1, T2, T3, T4> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4> > {
3612 private:
3613  static const int N = 5;
3614 public:
3615  typedef tuple<T0, T1, T2, T3, T4> InputTuple;
3619  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3620  this->input_ports(), static_cast< sender< output_type > *>(this) );
3621  }
3622 
3623 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3624  template <typename... Args>
3625  indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3626  make_edges_in_order(nodes, *this);
3627  }
3628 #endif
3629 
3630  // Copy constructor
3632  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3633  this->input_ports(), static_cast< sender< output_type > *>(this) );
3634  }
3635 
3636 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3637  void set_name( const char *name ) __TBB_override {
3639  }
3640 #endif
3641 };
3642 
3643 #if __TBB_VARIADIC_MAX >= 6
3644 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5>
3645 class indexer_node<T0, T1, T2, T3, T4, T5> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5> > {
3646 private:
3647  static const int N = 6;
3648 public:
3649  typedef tuple<T0, T1, T2, T3, T4, T5> InputTuple;
3653  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3654  this->input_ports(), static_cast< sender< output_type > *>(this) );
3655  }
3656 
3657 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3658  template <typename... Args>
3659  indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3660  make_edges_in_order(nodes, *this);
3661  }
3662 #endif
3663 
3664  // Copy constructor
3666  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3667  this->input_ports(), static_cast< sender< output_type > *>(this) );
3668  }
3669 
3670 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3671  void set_name( const char *name ) __TBB_override {
3673  }
3674 #endif
3675 };
3676 #endif //variadic max 6
3677 
3678 #if __TBB_VARIADIC_MAX >= 7
3679 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
3680  typename T6>
3681 class indexer_node<T0, T1, T2, T3, T4, T5, T6> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6> > {
3682 private:
3683  static const int N = 7;
3684 public:
3685  typedef tuple<T0, T1, T2, T3, T4, T5, T6> InputTuple;
3689  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3690  this->input_ports(), static_cast< sender< output_type > *>(this) );
3691  }
3692 
3693 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3694  template <typename... Args>
3695  indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3696  make_edges_in_order(nodes, *this);
3697  }
3698 #endif
3699 
3700  // Copy constructor
3702  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3703  this->input_ports(), static_cast< sender< output_type > *>(this) );
3704  }
3705 
3706 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3707  void set_name( const char *name ) __TBB_override {
3709  }
3710 #endif
3711 };
3712 #endif //variadic max 7
3713 
3714 #if __TBB_VARIADIC_MAX >= 8
3715 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
3716  typename T6, typename T7>
3717 class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7> > {
3718 private:
3719  static const int N = 8;
3720 public:
3721  typedef tuple<T0, T1, T2, T3, T4, T5, T6, T7> InputTuple;
3725  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3726  this->input_ports(), static_cast< sender< output_type > *>(this) );
3727  }
3728 
3729 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3730  template <typename... Args>
3731  indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3732  make_edges_in_order(nodes, *this);
3733  }
3734 #endif
3735 
3736  // Copy constructor
3737  indexer_node( const indexer_node& other ) : unfolded_type(other) {
3738  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3739  this->input_ports(), static_cast< sender< output_type > *>(this) );
3740  }
3741 
3742 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3743  void set_name( const char *name ) __TBB_override {
3745  }
3746 #endif
3747 };
3748 #endif //variadic max 8
3749 
3750 #if __TBB_VARIADIC_MAX >= 9
3751 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
3752  typename T6, typename T7, typename T8>
3753 class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7, T8> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8> > {
3754 private:
3755  static const int N = 9;
3756 public:
3757  typedef tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8> InputTuple;
3761  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3762  this->input_ports(), static_cast< sender< output_type > *>(this) );
3763  }
3764 
3765 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3766  template <typename... Args>
3767  indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3768  make_edges_in_order(nodes, *this);
3769  }
3770 #endif
3771 
3772  // Copy constructor
3774  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3775  this->input_ports(), static_cast< sender< output_type > *>(this) );
3776  }
3777 
3778 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3779  void set_name( const char *name ) __TBB_override {
3781  }
3782 #endif
3783 };
3784 #endif //variadic max 9
3785 
3786 #if __TBB_VARIADIC_MAX >= 10
3787 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
3788  typename T6, typename T7, typename T8, typename T9>
3789 class indexer_node/*default*/ : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> > {
3790 private:
3791  static const int N = 10;
3792 public:
3793  typedef tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> InputTuple;
3797  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3798  this->input_ports(), static_cast< sender< output_type > *>(this) );
3799  }
3800 
3801 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3802  template <typename... Args>
3803  indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3804  make_edges_in_order(nodes, *this);
3805  }
3806 #endif
3807 
3808  // Copy constructor
3810  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3811  this->input_ports(), static_cast< sender< output_type > *>(this) );
3812  }
3813 
3814 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3815  void set_name( const char *name ) __TBB_override {
3817  }
3818 #endif
3819 };
3820 #endif //variadic max 10
3821 
3822 #if __TBB_PREVIEW_ASYNC_MSG
3824 #else
3825 template< typename T >
3826 inline void internal_make_edge( sender<T> &p, receiver<T> &s ) {
3827 #endif
3828 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
3829  s.internal_add_built_predecessor(p);
3830  p.internal_add_built_successor(s);
3831 #endif
3832  p.register_successor( s );
3834 }
3835 
3837 template< typename T >
3838 inline void make_edge( sender<T> &p, receiver<T> &s ) {
3839  internal_make_edge( p, s );
3840 }
3841 
3842 #if __TBB_PREVIEW_ASYNC_MSG
3843 template< typename TS, typename TR,
3846 inline void make_edge( TS &p, TR &s ) {
3847  internal_make_edge( p, s );
3848 }
3849 
3850 template< typename T >
3852  internal_make_edge( p, s );
3853 }
3854 
3855 template< typename T >
3857  internal_make_edge( p, s );
3858 }
3859 
3860 #endif // __TBB_PREVIEW_ASYNC_MSG
3861 
3862 #if __TBB_FLOW_GRAPH_CPP11_FEATURES
3863 //Makes an edge from port 0 of a multi-output predecessor to port 0 of a multi-input successor.
3864 template< typename T, typename V,
3865  typename = typename T::output_ports_type, typename = typename V::input_ports_type >
3866 inline void make_edge( T& output, V& input) {
3867  make_edge(get<0>(output.output_ports()), get<0>(input.input_ports()));
3868 }
3869 
3870 //Makes an edge from port 0 of a multi-output predecessor to a receiver.
3871 template< typename T, typename R,
3872  typename = typename T::output_ports_type >
3873 inline void make_edge( T& output, receiver<R>& input) {
3874  make_edge(get<0>(output.output_ports()), input);
3875 }
3876 
3877 //Makes an edge from a sender to port 0 of a multi-input successor.
3878 template< typename S, typename V,
3879  typename = typename V::input_ports_type >
3880 inline void make_edge( sender<S>& output, V& input) {
3881  make_edge(output, get<0>(input.input_ports()));
3882 }
3883 #endif
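// Editor's note: a user-side sketch of the convenience overloads above (not part of the
// original header). With __TBB_FLOW_GRAPH_CPP11_FEATURES, port 0 of a multi-output node can
// be wired directly to port 0 of a multi-input node; split_node (declared elsewhere in this
// header) is used here only as an illustrative multi-output predecessor.
//
//     tbb::flow::graph g;
//     tbb::flow::split_node< tbb::flow::tuple<int, int> > s(g);
//     tbb::flow::join_node< tbb::flow::tuple<int, int> > j(g);
//     tbb::flow::make_edge(s, j);    // same as make_edge(output_port<0>(s), input_port<0>(j))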
3884 
3885 #if __TBB_PREVIEW_ASYNC_MSG
3887 #else
3888 template< typename T >
3889 inline void internal_remove_edge( sender<T> &p, receiver<T> &s ) {
3890 #endif
3891  p.remove_successor( s );
3892 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
3893  // TODO: should we try to remove p from the predecessor list of s, in case the edge is reversed?
3894  p.internal_delete_built_successor(s);
3895  s.internal_delete_built_predecessor(p);
3896 #endif
3898 }
3899 
3901 template< typename T >
3902 inline void remove_edge( sender<T> &p, receiver<T> &s ) {
3903  internal_remove_edge( p, s );
3904 }
3905 
3906 #if __TBB_PREVIEW_ASYNC_MSG
3907 template< typename TS, typename TR,
3910 inline void remove_edge( TS &p, TR &s ) {
3911  internal_remove_edge( p, s );
3912 }
3913 
3914 template< typename T >
3916  internal_remove_edge( p, s );
3917 }
3918 
3919 template< typename T >
3921  internal_remove_edge( p, s );
3922 }
3923 #endif // __TBB_PREVIEW_ASYNC_MSG
3924 
3925 #if __TBB_FLOW_GRAPH_CPP11_FEATURES
3926 //Removes an edge between port 0 of a multi-output predecessor and port 0 of a multi-input successor.
3927 template< typename T, typename V,
3928  typename = typename T::output_ports_type, typename = typename V::input_ports_type >
3929 inline void remove_edge( T& output, V& input) {
3930  remove_edge(get<0>(output.output_ports()), get<0>(input.input_ports()));
3931 }
3932 
3933 //Removes an edge between port 0 of a multi-output predecessor and a receiver.
3934 template< typename T, typename R,
3935  typename = typename T::output_ports_type >
3936 inline void remove_edge( T& output, receiver<R>& input) {
3937  remove_edge(get<0>(output.output_ports()), input);
3938 }
3939 //Removes an edge between a sender and port 0 of a multi-input successor.
3940 template< typename S, typename V,
3941  typename = typename V::input_ports_type >
3942 inline void remove_edge( sender<S>& output, V& input) {
3943  remove_edge(output, get<0>(input.input_ports()));
3944 }
3945 #endif
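// A brief sketch of remove_edge, assuming the API above (names are illustrative):
// once the edge is removed, later puts are no longer forwarded to the former successor.

#include "tbb/flow_graph.h"

int main() {
    tbb::flow::graph g;
    tbb::flow::broadcast_node<int> src(g);
    tbb::flow::queue_node<int> sink(g);

    tbb::flow::make_edge(src, sink);
    src.try_put(1);
    g.wait_for_all();                    // 1 has been forwarded into sink

    tbb::flow::remove_edge(src, sink);
    src.try_put(2);                      // no longer forwarded
    g.wait_for_all();

    int v = 0;
    bool got_first  = sink.try_get(v);   // true, v == 1
    bool got_second = sink.try_get(v);   // false: 2 never reached sink
    return (got_first && !got_second) ? 0 : 1;
}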
3946 
3947 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
3948 template<typename C >
3949 template< typename S >
3950 void internal::edge_container<C>::sender_extract( S &s ) {
3951  edge_list_type e = built_edges;
3952  for ( typename edge_list_type::iterator i = e.begin(); i != e.end(); ++i ) {
3953  remove_edge(s, **i);
3954  }
3955 }
3956 
3957 template<typename C >
3958 template< typename R >
3959 void internal::edge_container<C>::receiver_extract( R &r ) {
3960  edge_list_type e = built_edges;
3961  for ( typename edge_list_type::iterator i = e.begin(); i != e.end(); ++i ) {
3962  remove_edge(**i, r);
3963  }
3964 }
3965 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
3966 
3968 template< typename Body, typename Node >
3969 Body copy_body( Node &n ) {
3970  return n.template copy_function_object<Body>();
3971 }
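// A small sketch of copy_body, assuming the API above. The Doubler functor and its
// 'factor' member are illustrative; copy_body returns a copy of the body object
// currently stored in a function or continue node, e.g. to inspect accumulated state.

#include "tbb/flow_graph.h"

struct Doubler {
    int factor;
    explicit Doubler(int f) : factor(f) {}
    int operator()(int v) const { return factor * v; }
};

int main() {
    tbb::flow::graph g;
    tbb::flow::function_node<int, int> f(g, tbb::flow::serial, Doubler(2));
    Doubler b = tbb::flow::copy_body<Doubler>(f);   // copy of the stored body
    return (b.factor == 2) ? 0 : 1;
}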
3972 
3973 #if __TBB_FLOW_GRAPH_CPP11_FEATURES
3974 
3975 //composite_node
3976 template< typename InputTuple, typename OutputTuple > class composite_node;
3977 
3978 template< typename... InputTypes, typename... OutputTypes>
3979 class composite_node <tbb::flow::tuple<InputTypes...>, tbb::flow::tuple<OutputTypes...> > : public graph_node{
3980 
3981 public:
3982  typedef tbb::flow::tuple< receiver<InputTypes>&... > input_ports_type;
3983  typedef tbb::flow::tuple< sender<OutputTypes>&... > output_ports_type;
3984 
3985 private:
3986  std::unique_ptr<input_ports_type> my_input_ports;
3987  std::unique_ptr<output_ports_type> my_output_ports;
3988 
3989  static const size_t NUM_INPUTS = sizeof...(InputTypes);
3990  static const size_t NUM_OUTPUTS = sizeof...(OutputTypes);
3991 
3992 protected:
3994 
3995 public:
3996 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3997  composite_node( graph &g, const char *type_name = "composite_node" ) : graph_node(g) {
3998  tbb::internal::fgt_multiinput_multioutput_node( CODEPTR(), tbb::internal::FLOW_COMPOSITE_NODE, this, &this->my_graph );
4000  }
4001 #else
4003  tbb::internal::fgt_multiinput_multioutput_node( CODEPTR(), tbb::internal::FLOW_COMPOSITE_NODE, this, &this->my_graph );
4004  }
4005 #endif
4006 
4007  template<typename T1, typename T2>
4008  void set_external_ports(T1&& input_ports_tuple, T2&& output_ports_tuple) {
4009  __TBB_STATIC_ASSERT(NUM_INPUTS == tbb::flow::tuple_size<input_ports_type>::value, "number of arguments does not match number of input ports");
4010  __TBB_STATIC_ASSERT(NUM_OUTPUTS == tbb::flow::tuple_size<output_ports_type>::value, "number of arguments does not match number of output ports");
4011  my_input_ports = tbb::internal::make_unique<input_ports_type>(std::forward<T1>(input_ports_tuple));
4012  my_output_ports = tbb::internal::make_unique<output_ports_type>(std::forward<T2>(output_ports_tuple));
4013 
4016  }
4017 
4018  template< typename... NodeTypes >
4019  void add_visible_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, true, n...); }
4020 
4021  template< typename... NodeTypes >
4022  void add_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, false, n...); }
4023 
4024 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
4025  void set_name( const char *name ) __TBB_override {
4027  }
4028 #endif
4029 
4031  __TBB_ASSERT(my_input_ports, "input ports not set, call set_external_ports to set input ports");
4032  return *my_input_ports;
4033  }
4034 
4036  __TBB_ASSERT(my_output_ports, "output ports not set, call set_external_ports to set output ports");
4037  return *my_output_ports;
4038  }
4039 
4040 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
4041  void extract() __TBB_override {
4042  __TBB_ASSERT(false, "Current composite_node implementation does not support extract");
4043  }
4044 #endif
4045 }; // class composite_node
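// A usage sketch for composite_node, assuming the API above (the 'adder' class and its
// internals are illustrative, C++11 assumed). A join_node and a function_node are packaged
// behind a single two-input, one-output interface via set_external_ports.

#include "tbb/flow_graph.h"
#include <cstdio>

class adder : public tbb::flow::composite_node< tbb::flow::tuple<int, int>,
                                                tbb::flow::tuple<int> > {
    typedef tbb::flow::composite_node< tbb::flow::tuple<int, int>,
                                       tbb::flow::tuple<int> > base_type;
    tbb::flow::join_node< tbb::flow::tuple<int, int>, tbb::flow::queueing > j;
    tbb::flow::function_node< tbb::flow::tuple<int, int>, int > f;
public:
    adder(tbb::flow::graph &g)
        : base_type(g), j(g),
          f(g, tbb::flow::unlimited, [](const tbb::flow::tuple<int, int> &t) {
                return tbb::flow::get<0>(t) + tbb::flow::get<1>(t);
            }) {
        tbb::flow::make_edge(j, f);
        // Expose the join node's inputs and the function node's output as the
        // composite's external ports.
        base_type::set_external_ports(
            base_type::input_ports_type(tbb::flow::input_port<0>(j),
                                        tbb::flow::input_port<1>(j)),
            base_type::output_ports_type(f));
    }
};

int main() {
    tbb::flow::graph g;
    adder a(g);
    tbb::flow::function_node<int, int> print(g, tbb::flow::serial,
        [](int v) { std::printf("%d\n", v); return v; });
    tbb::flow::make_edge(tbb::flow::output_port<0>(a), print);
    tbb::flow::input_port<0>(a).try_put(20);
    tbb::flow::input_port<1>(a).try_put(22);
    g.wait_for_all();
    return 0;
}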
4046 
4047 //composite_node with only input ports
4048 template< typename... InputTypes>
4049 class composite_node <tbb::flow::tuple<InputTypes...>, tbb::flow::tuple<> > : public graph_node {
4050 public:
4051  typedef tbb::flow::tuple< receiver<InputTypes>&... > input_ports_type;
4052 
4053 private:
4054  std::unique_ptr<input_ports_type> my_input_ports;
4055  static const size_t NUM_INPUTS = sizeof...(InputTypes);
4056 
4057 protected:
4059 
4060 public:
4061 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
4062  composite_node( graph &g, const char *type_name = "composite_node") : graph_node(g) {
4063  tbb::internal::fgt_composite( CODEPTR(), this, &g );
4065  }
4066 #else
4068  tbb::internal::fgt_composite( CODEPTR(), this, &g );
4069  }
4070 #endif
4071 
4072  template<typename T>
4073  void set_external_ports(T&& input_ports_tuple) {
4074  __TBB_STATIC_ASSERT(NUM_INPUTS == tbb::flow::tuple_size<input_ports_type>::value, "number of arguments does not match number of input ports");
4075 
4076  my_input_ports = tbb::internal::make_unique<input_ports_type>(std::forward<T>(input_ports_tuple));
4077 
4078  tbb::internal::fgt_internal_input_alias_helper<T, NUM_INPUTS>::alias_port( this, std::forward<T>(input_ports_tuple));
4079  }
4080 
4081  template< typename... NodeTypes >
4082  void add_visible_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, true, n...); }
4083 
4084  template< typename... NodeTypes >
4085  void add_nodes( const NodeTypes&... n) { internal::add_nodes_impl(this, false, n...); }
4086 
4087 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
4088  void set_name( const char *name ) __TBB_override {
4090  }
4091 #endif
4092 
4094  __TBB_ASSERT(my_input_ports, "input ports not set, call set_external_ports to set input ports");
4095  return *my_input_ports;
4096  }
4097 
4098 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
4099  void extract() __TBB_override {
4100  __TBB_ASSERT(false, "Current composite_node implementation does not support extract");
4101  }
4102 #endif
4103 
4104 }; // class composite_node
4105 
4106 //composite_node with only output ports
4107 template<typename... OutputTypes>
4108 class composite_node <tbb::flow::tuple<>, tbb::flow::tuple<OutputTypes...> > : public graph_node {
4109 public:
4110  typedef tbb::flow::tuple< sender<OutputTypes>&... > output_ports_type;
4111 
4112 private:
4113  std::unique_ptr<output_ports_type> my_output_ports;
4114  static const size_t NUM_OUTPUTS = sizeof...(OutputTypes);
4115 
4116 protected:
4118 
4119 public:
4120 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
4121  __TBB_NOINLINE_SYM composite_node( graph &g, const char *type_name = "composite_node") : graph_node(g) {
4122  tbb::internal::fgt_composite( CODEPTR(), this, &g );
4124  }
4125 #else
4127  tbb::internal::fgt_composite( CODEPTR(), this, &g );
4128  }
4129 #endif
4130 
4131  template<typename T>
4132  void set_external_ports(T&& output_ports_tuple) {
4133  __TBB_STATIC_ASSERT(NUM_OUTPUTS == tbb::flow::tuple_size<output_ports_type>::value, "number of arguments does not match number of output ports");
4134 
4135  my_output_ports = tbb::internal::make_unique<output_ports_type>(std::forward<T>(output_ports_tuple));
4136 
4137  tbb::internal::fgt_internal_output_alias_helper<T, NUM_OUTPUTS>::alias_port( this, std::forward<T>(output_ports_tuple));
4138  }
4139 
4140  template<typename... NodeTypes >
4141  void add_visible_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, true, n...); }
4142 
4143  template<typename... NodeTypes >
4144  void add_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, false, n...); }
4145 
4146 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
4147  void set_name( const char *name ) __TBB_override {
4149  }
4150 #endif
4151 
4153  __TBB_ASSERT(my_output_ports, "output ports not set, call set_external_ports to set output ports");
4154  return *my_output_ports;
4155  }
4156 
4157 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
4158  void extract() __TBB_override {
4159  __TBB_ASSERT(false, "Current composite_node implementation does not support extract");
4160  }
4161 #endif
4162 
4163 }; // class composite_node
4164 
4165 #endif // __TBB_FLOW_GRAPH_CPP11_FEATURES
4166 
4167 namespace internal {
4168 
4169 template<typename Gateway>
4171 public:
4172  typedef Gateway gateway_type;
4173 
4174  async_body_base(gateway_type *gateway): my_gateway(gateway) { }
4175  void set_gateway(gateway_type *gateway) {
4176  my_gateway = gateway;
4177  }
4178 
4179 protected:
4181 };
4182 
4183 template<typename Input, typename Ports, typename Gateway, typename Body>
4184 class async_body: public async_body_base<Gateway> {
4185 public:
4187  typedef Gateway gateway_type;
4188 
4189  async_body(const Body &body, gateway_type *gateway)
4190  : base_type(gateway), my_body(body) { }
4191 
4192  void operator()( const Input &v, Ports & ) {
4193  my_body(v, *this->my_gateway);
4194  }
4195 
4196  Body get_body() { return my_body; }
4197 
4198 private:
4199  Body my_body;
4200 };
4201 
4202 } // namespace internal
4203 
4204 } // namespace interfaceX
4205 namespace interface11 {
4206 
4208 template < typename Input, typename Output,
4209  typename Policy = queueing_lightweight,
4210  typename Allocator=__TBB_DEFAULT_NODE_ALLOCATOR(Input) >
4212  : public multifunction_node< Input, tuple< Output >, Policy, Allocator >, public sender< Output >
4213 {
4214 #if !TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
4217  "Allocator template parameter for flow graph nodes is deprecated and will removed in the future. "
4218  "To temporary enable the deprecated interface specify TBB_ENABLE_DEPRECATED_NODE_ALLOCATOR."
4219  );
4220 #endif
4223 
4224 public:
4225  typedef Input input_type;
4226  typedef Output output_type;
4233 
4234 private:
4238  // TODO: pass the value by copy, since we do not want to block the asynchronous thread.
4239  const Output *value;
4240  bool result;
4241  try_put_functor(output_port_type &p, const Output &v) : port(&p), value(&v), result(false) { }
4242  void operator()() {
4243  result = port->try_put(*value);
4244  }
4245  };
4246 
4247  class receiver_gateway_impl: public receiver_gateway<Output> {
4248  public:
4249  receiver_gateway_impl(async_node* node): my_node(node) {}
4251  tbb::internal::fgt_async_reserve(static_cast<typename async_node::receiver_type *>(my_node), &my_node->my_graph);
4252  my_node->my_graph.reserve_wait();
4253  }
4254 
4256  my_node->my_graph.release_wait();
4257  tbb::internal::fgt_async_commit(static_cast<typename async_node::receiver_type *>(my_node), &my_node->my_graph);
4258  }
4259 
4261  bool try_put(const Output &i) __TBB_override {
4262  return my_node->try_put_impl(i);
4263  }
4264 
4265  private:
4267  } my_gateway;
4268 
4269  //A substitute for 'this' used during member construction, to avoid compiler warnings
4270  async_node* self() { return this; }
4271 
4273  bool try_put_impl(const Output &i) {
4274  internal::multifunction_output<Output> &port_0 = internal::output_port<0>(*this);
4275  internal::broadcast_cache<output_type>& port_successors = port_0.successors();
4277  task_list tasks;
4278  bool is_at_least_one_put_successful = port_successors.gather_successful_try_puts(i, tasks);
4279  __TBB_ASSERT( is_at_least_one_put_successful || tasks.empty(),
4280  "Return status is inconsistent with the method operation." );
4281 
4282  while( !tasks.empty() ) {
4283  internal::enqueue_in_graph_arena(this->my_graph, tasks.pop_front());
4284  }
4285  tbb::internal::fgt_async_try_put_end(this, &port_0);
4286  return is_at_least_one_put_successful;
4287  }
4288 
4289 public:
4290  template<typename Body>
4292  graph &g, size_t concurrency,
4294  Body body, __TBB_FLOW_GRAPH_PRIORITY_ARG1(Policy = Policy(), node_priority_t priority = tbb::flow::internal::no_priority)
4295 #else
4297 #endif
4298  ) : base_type(
4299  g, concurrency,
4300  internal::async_body<Input, typename base_type::output_ports_type, gateway_type, Body>
4301  (body, &my_gateway) __TBB_FLOW_GRAPH_PRIORITY_ARG0(priority) ), my_gateway(self()) {
4302  tbb::internal::fgt_multioutput_node_with_body<1>(
4303  CODEPTR(), tbb::internal::FLOW_ASYNC_NODE,
4304  &this->my_graph, static_cast<receiver<input_type> *>(this),
4305  this->output_ports(), this->my_body
4306  );
4307  }
4308 
4309 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
4310  template <typename Body, typename... Args>
4311  __TBB_NOINLINE_SYM async_node(graph& g, size_t concurrency, Body body, node_priority_t priority)
4312  : async_node(g, concurrency, body, Policy(), priority) {}
4313 #endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
4314 
4315 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
4316  template <typename Body, typename... Args>
4318  const node_set<Args...>& nodes, size_t concurrency, Body body,
4320  ) : async_node(nodes.graph_reference(), concurrency, __TBB_FLOW_GRAPH_PRIORITY_ARG1(body, priority)) {
4321  make_edges_in_order(nodes, *this);
4322  }
4323 
4324 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
4325  template <typename Body, typename... Args>
4326  __TBB_NOINLINE_SYM async_node(const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t priority)
4327  : async_node(nodes, concurrency, body, Policy(), priority) {}
4328 #endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
4329 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
4330 
4331  __TBB_NOINLINE_SYM async_node( const async_node &other ) : base_type(other), sender<Output>(), my_gateway(self()) {
4332  static_cast<async_body_base_type*>(this->my_body->get_body_ptr())->set_gateway(&my_gateway);
4333  static_cast<async_body_base_type*>(this->my_init_body->get_body_ptr())->set_gateway(&my_gateway);
4334 
4335  tbb::internal::fgt_multioutput_node_with_body<1>( CODEPTR(), tbb::internal::FLOW_ASYNC_NODE,
4336  &this->my_graph, static_cast<receiver<input_type> *>(this),
4337  this->output_ports(), this->my_body );
4338  }
4339 
4341  return my_gateway;
4342  }
4343 
4344 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
4345  void set_name( const char *name ) __TBB_override {
4347  }
4348 #endif
4349 
4350  // Define sender< Output >
4351 
4354  return internal::output_port<0>(*this).register_successor(r);
4355  }
4356 
4359  return internal::output_port<0>(*this).remove_successor(r);
4360  }
4361 
4362  template<typename Body>
4366  mfn_body_type &body_ref = *this->my_body;
4367  async_body_type ab = *static_cast<async_body_type*>(dynamic_cast< internal::multifunction_body_leaf<input_type, typename base_type::output_ports_type, async_body_type> & >(body_ref).get_body_ptr());
4368  return ab.get_body();
4369  }
4370 
4371 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
4373  typedef typename internal::edge_container<successor_type> built_successors_type;
4374  typedef typename built_successors_type::edge_list_type successor_list_type;
4375  built_successors_type &built_successors() __TBB_override {
4376  return internal::output_port<0>(*this).built_successors();
4377  }
4378 
4379  void internal_add_built_successor( successor_type &r ) __TBB_override {
4380  internal::output_port<0>(*this).internal_add_built_successor(r);
4381  }
4382 
4383  void internal_delete_built_successor( successor_type &r ) __TBB_override {
4384  internal::output_port<0>(*this).internal_delete_built_successor(r);
4385  }
4386 
4387  void copy_successors( successor_list_type &l ) __TBB_override {
4388  internal::output_port<0>(*this).copy_successors(l);
4389  }
4390 
4391  size_t successor_count() __TBB_override {
4392  return internal::output_port<0>(*this).successor_count();
4393  }
4394 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
4395 
4396 protected:
4397 
4399  base_type::reset_node(f);
4400  }
4401 };
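// A usage sketch for async_node, assuming the API above (the doubling work and the detached
// std::thread stand in for any user-managed asynchronous activity; C++11 assumed).
// reserve_wait()/release_wait() keep graph::wait_for_all() from returning before the external
// result is injected back into the graph via the gateway's try_put().

#include "tbb/flow_graph.h"
#include <thread>
#include <cstdio>

int main() {
    using namespace tbb::flow;
    graph g;
    typedef async_node<int, int> async_t;

    async_t a(g, unlimited, [](const int &input, async_t::gateway_type &gw) {
        gw.reserve_wait();                        // the graph must wait for an external result
        async_t::gateway_type *gateway = &gw;     // the gateway outlives this body invocation
        std::thread([gateway, input]() {
            gateway->try_put(input * 2);          // inject the result back into the graph
            gateway->release_wait();              // matches reserve_wait()
        }).detach();
    });

    function_node<int, int> print(g, serial, [](int v) { std::printf("%d\n", v); return v; });
    make_edge(a, print);

    a.try_put(21);
    g.wait_for_all();
    return 0;
}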
4402 
4403 #if __TBB_PREVIEW_STREAMING_NODE
4405 #endif // __TBB_PREVIEW_STREAMING_NODE
4406 
4408 
4409 template< typename T >
4410 class overwrite_node : public graph_node, public receiver<T>, public sender<T> {
4411 public:
4412  typedef T input_type;
4413  typedef T output_type;
4416 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
4417  typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
4418  typedef typename sender<output_type>::built_successors_type built_successors_type;
4419  typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
4420  typedef typename sender<output_type>::successor_list_type successor_list_type;
4421 #endif
4422 
4423  __TBB_NOINLINE_SYM explicit overwrite_node(graph &g) : graph_node(g), my_buffer_is_valid(false) {
4424  my_successors.set_owner( this );
4425  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_OVERWRITE_NODE, &this->my_graph,
4426  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
4427  }
4428 
4429 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
4430  template <typename... Args>
4431  overwrite_node(const node_set<Args...>& nodes) : overwrite_node(nodes.graph_reference()) {
4432  make_edges_in_order(nodes, *this);
4433  }
4434 #endif
4435 
4438  graph_node(src.my_graph), receiver<T>(), sender<T>(), my_buffer_is_valid(false)
4439  {
4440  my_successors.set_owner( this );
4441  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_OVERWRITE_NODE, &this->my_graph,
4442  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
4443  }
4444 
4446 
4447 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
4448  void set_name( const char *name ) __TBB_override {
4450  }
4451 #endif
4452 
4454  spin_mutex::scoped_lock l( my_mutex );
4455  if (my_buffer_is_valid && internal::is_graph_active( my_graph )) {
4456  // We have a valid value that must be forwarded immediately.
4457  bool ret = s.try_put( my_buffer );
4458  if ( ret ) {
4459  // We add the successor that accepted our put
4460  my_successors.register_successor( s );
4461  } else {
4462  // If the put fails, the successor may be a reserving node caught mid-reservation; that failure
4463  // does not mean it cannot accept a message immediately afterwards. Registering it directly risks
4464  // a livelock: the reserving node keeps switching the edge to pull state while overwrite_node
4465  // switches it back to push, so we break the loop by deferring registration to a spawned task.
4466  task *rtask = new ( task::allocate_additional_child_of( *( my_graph.root_task() ) ) )
4467  register_predecessor_task( *this, s );
4468  internal::spawn_in_graph_arena( my_graph, *rtask );
4469  }
4470  } else {
4471  // No valid value yet, just add as successor
4472  my_successors.register_successor( s );
4473  }
4474  return true;
4475  }
4476 
4478  spin_mutex::scoped_lock l( my_mutex );
4479  my_successors.remove_successor(s);
4480  return true;
4481  }
4482 
4483 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
4484  built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
4485  built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
4486 
4487  void internal_add_built_successor( successor_type &s) __TBB_override {
4488  spin_mutex::scoped_lock l( my_mutex );
4489  my_successors.internal_add_built_successor(s);
4490  }
4491 
4492  void internal_delete_built_successor( successor_type &s) __TBB_override {
4493  spin_mutex::scoped_lock l( my_mutex );
4494  my_successors.internal_delete_built_successor(s);
4495  }
4496 
4497  size_t successor_count() __TBB_override {
4498  spin_mutex::scoped_lock l( my_mutex );
4499  return my_successors.successor_count();
4500  }
4501 
4502  void copy_successors(successor_list_type &v) __TBB_override {
4503  spin_mutex::scoped_lock l( my_mutex );
4504  my_successors.copy_successors(v);
4505  }
4506 
4507  void internal_add_built_predecessor( predecessor_type &p) __TBB_override {
4508  spin_mutex::scoped_lock l( my_mutex );
4509  my_built_predecessors.add_edge(p);
4510  }
4511 
4512  void internal_delete_built_predecessor( predecessor_type &p) __TBB_override {
4513  spin_mutex::scoped_lock l( my_mutex );
4514  my_built_predecessors.delete_edge(p);
4515  }
4516 
4517  size_t predecessor_count() __TBB_override {
4518  spin_mutex::scoped_lock l( my_mutex );
4519  return my_built_predecessors.edge_count();
4520  }
4521 
4522  void copy_predecessors( predecessor_list_type &v ) __TBB_override {
4523  spin_mutex::scoped_lock l( my_mutex );
4524  my_built_predecessors.copy_edges(v);
4525  }
4526 
4527  void extract() __TBB_override {
4528  my_buffer_is_valid = false;
4529  built_successors().sender_extract(*this);
4530  built_predecessors().receiver_extract(*this);
4531  }
4532 
4533 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
4534 
4536  spin_mutex::scoped_lock l( my_mutex );
4537  if ( my_buffer_is_valid ) {
4538  v = my_buffer;
4539  return true;
4540  }
4541  return false;
4542  }
4543 
4546  return try_get(v);
4547  }
4548 
4550  bool try_release() __TBB_override { return true; }
4551 
4553  bool try_consume() __TBB_override { return true; }
4554 
4555  bool is_valid() {
4556  spin_mutex::scoped_lock l( my_mutex );
4557  return my_buffer_is_valid;
4558  }
4559 
4560  void clear() {
4561  spin_mutex::scoped_lock l( my_mutex );
4562  my_buffer_is_valid = false;
4563  }
4564 
4565 protected:
4566 
4567  template< typename R, typename B > friend class run_and_put_task;
4568  template<typename X, typename Y> friend class internal::broadcast_cache;
4569  template<typename X, typename Y> friend class internal::round_robin_cache;
4572  return try_put_task_impl(v);
4573  }
4574 
4576  my_buffer = v;
4577  my_buffer_is_valid = true;
4578  task * rtask = my_successors.try_put_task(v);
4579  if (!rtask) rtask = SUCCESSFULLY_ENQUEUED;
4580  return rtask;
4581  }
4582 
4584  return my_graph;
4585  }
4586 
4589 
4591  o(owner), s(succ) {};
4592 
4594  if (!s.register_predecessor(o)) {
4595  o.register_successor(s);
4596  }
4597  return NULL;
4598  }
4599 
4602  };
4603 
4606 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
4607  internal::edge_container<predecessor_type> my_built_predecessors;
4608 #endif
4612 
4614  my_buffer_is_valid = false;
4615  if (f&rf_clear_edges) {
4616  my_successors.clear();
4617  }
4618  }
4619 }; // overwrite_node
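// A short sketch for overwrite_node, assuming the API above: it buffers the most recent
// value, every put overwrites it, try_get does not consume it, and successors that connect
// while the buffer is valid receive the stored value immediately.

#include "tbb/flow_graph.h"

int main() {
    tbb::flow::graph g;
    tbb::flow::overwrite_node<int> last(g);
    last.try_put(1);
    last.try_put(2);              // overwrites the buffered 1
    int v = 0;
    bool ok = last.try_get(v);    // ok == true, v == 2; the buffer stays valid
    g.wait_for_all();
    return (ok && v == 2) ? 0 : 1;
}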
4620 
4621 template< typename T >
4622 class write_once_node : public overwrite_node<T> {
4623 public:
4624  typedef T input_type;
4625  typedef T output_type;
4629 
4632  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_WRITE_ONCE_NODE, &(this->my_graph),
4633  static_cast<receiver<input_type> *>(this),
4634  static_cast<sender<output_type> *>(this) );
4635  }
4636 
4637 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
4638  template <typename... Args>
4639  write_once_node(const node_set<Args...>& nodes) : write_once_node(nodes.graph_reference()) {
4640  make_edges_in_order(nodes, *this);
4641  }
4642 #endif
4643 
4646  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_WRITE_ONCE_NODE, &(this->my_graph),
4647  static_cast<receiver<input_type> *>(this),
4648  static_cast<sender<output_type> *>(this) );
4649  }
4650 
4651 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
4652  void set_name( const char *name ) __TBB_override {
4654  }
4655 #endif
4656 
4657 protected:
4658  template< typename R, typename B > friend class run_and_put_task;
4659  template<typename X, typename Y> friend class internal::broadcast_cache;
4660  template<typename X, typename Y> friend class internal::round_robin_cache;
4662  spin_mutex::scoped_lock l( this->my_mutex );
4663  return this->my_buffer_is_valid ? NULL : this->try_put_task_impl(v);
4664  }
4665 };
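// A short sketch for write_once_node, assuming the API above: unlike overwrite_node,
// only the first put is accepted; later puts are rejected until clear() is called.

#include "tbb/flow_graph.h"

int main() {
    tbb::flow::graph g;
    tbb::flow::write_once_node<int> first(g);
    first.try_put(1);             // accepted; the buffer becomes valid
    first.try_put(2);             // rejected; the stored value remains 1
    int v = 0;
    first.try_get(v);             // v == 1
    first.clear();                // a new value may now be stored
    g.wait_for_all();
    return (v == 1) ? 0 : 1;
}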
4666 
4667 } // interfaceX
4668 
4673 
4674  using interface11::graph;
4675  using interface11::graph_node;
4676  using interface11::continue_msg;
4677  using interface11::source_node;
4678  using interface11::input_node;
4679  using interface11::function_node;
4680  using interface11::multifunction_node;
4681  using interface11::split_node;
4683  using interface11::indexer_node;
4684  using interface11::internal::tagged_msg;
4687  using interface11::continue_node;
4688  using interface11::overwrite_node;
4689  using interface11::write_once_node;
4690  using interface11::broadcast_node;
4691  using interface11::buffer_node;
4692  using interface11::queue_node;
4693  using interface11::sequencer_node;
4694  using interface11::priority_queue_node;
4695  using interface11::limiter_node;
4696  using namespace interface11::internal::graph_policy_namespace;
4697  using interface11::join_node;
4699  using interface11::copy_body;
4700  using interface11::make_edge;
4703 #if __TBB_FLOW_GRAPH_CPP11_FEATURES
4704  using interface11::composite_node;
4705 #endif
4706  using interface11::async_node;
4707 #if __TBB_PREVIEW_ASYNC_MSG
4708  using interface11::async_msg;
4709 #endif
4710 #if __TBB_PREVIEW_STREAMING_NODE
4711  using interface11::port_ref;
4713 #endif // __TBB_PREVIEW_STREAMING_NODE
4714 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
4716  using internal::no_priority;
4717 #endif
4718 
4719 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
4720  using interface11::internal::follows;
4721  using interface11::internal::precedes;
4722  using interface11::internal::make_node_set;
4723  using interface11::internal::make_edges;
4724 #endif
4725 
4726 } // flow
4727 } // tbb
4728 
4729 // Include deduction guides for node classes
4731 
4732 #undef __TBB_PFG_RESET_ARG
4733 #undef __TBB_COMMA
4734 #undef __TBB_DEFAULT_NODE_ALLOCATOR
4735 
4737 #undef __TBB_flow_graph_H_include_area
4738 
4739 #if TBB_USE_THREADING_TOOLS && TBB_PREVIEW_FLOW_GRAPH_TRACE && ( __linux__ || __APPLE__ )
4740  #undef __TBB_NOINLINE_SYM
4741 #endif
4742 
4743 #endif // __TBB_flow_graph_H
#define __TBB_NOINLINE_SYM
Definition: flow_graph.h:47
#define __TBB_DEFAULT_NODE_ALLOCATOR(T)
Definition: flow_graph.h:71
#define __TBB_FLOW_GRAPH_PRIORITY_ARG1(arg1, priority)
#define __TBB_DEPRECATED_LIMITER_ARG2(arg1, arg2)
#define __TBB_FLOW_GRAPH_PRIORITY_ARG0(priority)
#define __TBB_FLOW_GRAPH_PRIORITY_EXPR(expr)
#define __TBB_DEPRECATED_LIMITER_ARG4(arg1, arg2, arg3, arg4)
#define __TBB_DEPRECATED_LIMITER_EXPR(expr)
__TBB_DEPRECATED internal::port_ref_impl< N1, N2 > port_ref()
class __TBB_DEPRECATED streaming_node
#define CODEPTR()
#define __TBB_CPP11_PRESENT
Definition: tbb_config.h:149
#define __TBB_DEPRECATED
Definition: tbb_config.h:636
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:165
#define __TBB_override
Definition: tbb_stddef.h:240
#define __TBB_STATIC_ASSERT(condition, msg)
Definition: tbb_stddef.h:553
The graph class.
class __TBB_DEPRECATED_MSG("tbb::tbb_hash is deprecated, use std::hash") tbb_hash
static void fgt_async_try_put_end(void *, void *)
static void fgt_async_reserve(void *, void *)
static void fgt_multioutput_node_desc(const NodeType *, const char *)
static void fgt_async_try_put_begin(void *, void *)
static void fgt_begin_body(void *)
void call(F &&f, Pack &&p)
Calls the given function with arguments taken from a stored_pack.
static void fgt_node(void *, string_index, void *, void *)
static void fgt_reserve_wait(void *)
static void fgt_graph(void *)
static void fgt_node_with_body(void *, string_index, void *, void *, void *)
static void fgt_release_wait(void *)
static void fgt_async_commit(void *, void *)
static void fgt_composite(void *, void *, void *)
void fgt_multiinput_multioutput_node_desc(const NodeType *, const char *)
static void fgt_multiinput_multioutput_node(void *, string_index, void *, void *)
static void fgt_remove_edge(void *, void *)
static void fgt_end_body(void *)
void __TBB_store_with_release(volatile T &location, V value)
Definition: tbb_machine.h:713
static void fgt_node_desc(const NodeType *, const char *)
static void fgt_make_edge(void *, void *)
static void fgt_graph_desc(void *, const char *)
K key_from_message(const T &t)
Definition: flow_graph.h:721
concurrency
An enumeration that provides the two most common concurrency levels: unlimited and serial.
Definition: flow_graph.h:106
void internal_remove_edge(internal::untyped_sender &p, internal::untyped_receiver &s)
Definition: flow_graph.h:3886
void internal_make_edge(internal::untyped_sender &p, internal::untyped_receiver &s)
Definition: flow_graph.h:3823
void make_edge(sender< T > &p, receiver< T > &s)
Makes an edge between a single predecessor and a single successor.
Definition: flow_graph.h:3838
class __TBB_DEPRECATED async_msg
Definition: flow_graph.h:216
void remove_edge(sender< T > &p, receiver< T > &s)
Removes an edge between a single predecessor and a single successor.
Definition: flow_graph.h:3902
Body copy_body(Node &n)
Returns a copy of the body from a function or continue node.
Definition: flow_graph.h:3969
static tbb::task * combine_tasks(graph &g, tbb::task *left, tbb::task *right)
Definition: flow_graph.h:199
void spawn_in_graph_arena(tbb::flow::interface10::graph &g, tbb::task &arena_task)
Spawns a task inside graph arena.
void deactivate_graph(tbb::flow::interface10::graph &g)
void enqueue_in_graph_arena(tbb::flow::interface10::graph &g, tbb::task &arena_task)
Enqueues a task inside graph arena.
bool is_graph_active(tbb::flow::interface10::graph &g)
void add_task_to_graph_reset_list(tbb::flow::interface10::graph &g, tbb::task *tp)
void activate_graph(tbb::flow::interface10::graph &g)
tbb::flow::tuple_element< N, typename MOP::output_ports_type >::type & output_port(MOP &op)
const V & cast_to(T const &t)
void add_nodes_impl(CompositeType *, bool)
tbb::internal::uint64_t tag_value
bool is_a(T const &t)
tbb::flow::tuple_element< N, typename JNT::input_ports_type >::type & input_port(JNT &jn)
templated function to refer to input ports of the join node
interface11::internal::Policy< queueing, lightweight > queueing_lightweight
static const node_priority_t no_priority
unsigned int node_priority_t
static tbb::task *const SUCCESSFULLY_ENQUEUED
Meets "allocator" requirements of ISO C++ Standard, Section 20.1.5.
An empty class used for messages that mean "I'm done".
Definition: flow_graph.h:114
Forward declaration section.
Definition: flow_graph.h:425
__TBB_DEPRECATED typedef T output_type
The output type of this sender.
Definition: flow_graph.h:428
virtual bool try_get(T &)
Request an item from the sender.
Definition: flow_graph.h:433
virtual bool try_get_wrapper(void *p, bool is_async) __TBB_override
Definition: flow_graph.h:439
virtual bool try_reserve(T &)
Reserves an item in the sender.
Definition: flow_graph.h:436
__TBB_DEPRECATED typedef internal::async_helpers< T >::filtered_type filtered_type
Definition: flow_graph.h:430
virtual bool try_reserve_wrapper(void *p, bool is_async) __TBB_override
Definition: flow_graph.h:449
Pure virtual template class that defines a receiver of messages of type T.
Definition: flow_graph.h:462
virtual task * try_put_task(const T &t)=0
Put item to successor; return task to run the successor if possible.
virtual task * try_put_task_wrapper(const void *p, bool is_async) __TBB_override
Definition: flow_graph.h:481
__TBB_DEPRECATED typedef T input_type
The input type of this receiver.
Definition: flow_graph.h:467
__TBB_DEPRECATED typedef internal::async_helpers< T >::filtered_type filtered_type
Definition: flow_graph.h:469
bool try_put(const typename internal::async_helpers< T >::async_type &t)
Definition: flow_graph.h:476
bool try_put(const typename internal::async_helpers< T >::filtered_type &t)
Put an item to the receiver.
Definition: flow_graph.h:472
Forwards messages only if the threshold has not been reached.
Definition: flow_graph.h:2985
task * try_put_task(const T &t) __TBB_override
Puts an item to this receiver.
Definition: flow_graph.h:3233
bool remove_successor(successor_type &r) __TBB_override
Removes a successor from this node.
Definition: flow_graph.h:3163
task * decrement_counter(long long delta)
Definition: flow_graph.h:3076
bool register_successor(successor_type &r) __TBB_override
Replace the current successor with this new successor.
Definition: flow_graph.h:3146
graph & graph_reference() const __TBB_override
Definition: flow_graph.h:3260
limiter_node(graph &g, __TBB_DEPRECATED_LIMITER_ARG2(size_t threshold, int num_decrement_predecessors=0))
Constructor.
Definition: flow_graph.h:3110
bool register_predecessor(predecessor_type &src) __TBB_override
Adds src to the list of cached predecessors.
Definition: flow_graph.h:3210
internal::broadcast_cache< T > my_successors
Definition: flow_graph.h:3005
limiter_node(const limiter_node &src)
Copy constructor.
Definition: flow_graph.h:3129
void reset_receiver(reset_flags) __TBB_override
put receiver back in initial state
Definition: flow_graph.h:3262
bool remove_predecessor(predecessor_type &src) __TBB_override
Removes src from the list of cached predecessors.
Definition: flow_graph.h:3222
internal::decrementer< limiter_node< T, DecrementType >, DecrementType > decrement
The internal receiver< DecrementType > that decrements the count.
Definition: flow_graph.h:3101
internal::reservable_predecessor_cache< T, spin_mutex > my_predecessors
Definition: flow_graph.h:3003
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:2989
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:3266
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:2990
static const void * to_void_ptr(const T &t)
Definition: flow_graph.h:229
static const T & from_void_ptr(const void *p)
Definition: flow_graph.h:237
static task * try_put_task_wrapper_impl(receiver< T > *const this_recv, const void *p, bool is_async)
Definition: flow_graph.h:245
virtual bool try_get_wrapper(void *p, bool is_async)=0
virtual bool try_consume()
Consumes the reserved item.
Definition: flow_graph.h:329
bool try_reserve(X &t)
Reserves an item in the sender.
Definition: flow_graph.h:350
untyped_receiver successor_type
The successor type for this node.
Definition: flow_graph.h:311
virtual bool register_successor(successor_type &r)=0
Add a new successor to this node.
virtual bool try_release()
Releases the reserved item.
Definition: flow_graph.h:326
virtual bool remove_successor(successor_type &r)=0
Removes a successor from this node.
bool try_get(X &t)
Request an item from the sender.
Definition: flow_graph.h:344
virtual bool try_reserve_wrapper(void *p, bool is_async)=0
virtual bool register_predecessor(predecessor_type &)
Add a predecessor to the node.
Definition: flow_graph.h:389
virtual task * try_put_task_wrapper(const void *p, bool is_async)=0
bool try_put(const X &t)
Put an item to the receiver.
Definition: flow_graph.h:377
virtual void reset_receiver(reset_flags f=rf_reset_protocol)=0
put receiver back in initial state
virtual bool remove_predecessor(predecessor_type &)
Remove a predecessor from the node.
Definition: flow_graph.h:392
untyped_sender predecessor_type
The predecessor type for this node.
Definition: flow_graph.h:370
Base class for receivers of completion messages.
Definition: flow_graph.h:599
__TBB_DEPRECATED typedef continue_msg input_type
The input type.
Definition: flow_graph.h:603
__TBB_DEPRECATED bool register_predecessor(predecessor_type &) __TBB_override
Increments the trigger threshold.
Definition: flow_graph.h:624
void reset_receiver(reset_flags f) __TBB_override
put receiver back in initial state
Definition: flow_graph.h:698
__TBB_DEPRECATED continue_receiver(__TBB_FLOW_GRAPH_PRIORITY_ARG1(int number_of_predecessors, node_priority_t priority))
Constructor.
Definition: flow_graph.h:609
virtual task * execute()=0
Does whatever should happen when the threshold is reached.
bool is_continue_receiver() __TBB_override
Definition: flow_graph.h:713
task * try_put_task(const input_type &) __TBB_override
Definition: flow_graph.h:672
__TBB_DEPRECATED continue_receiver(const continue_receiver &src)
Copy constructor.
Definition: flow_graph.h:617
__TBB_DEPRECATED typedef receiver< input_type >::predecessor_type predecessor_type
The predecessor type for this node.
Definition: flow_graph.h:606
__TBB_DEPRECATED bool remove_predecessor(predecessor_type &) __TBB_override
Decrements the trigger threshold.
Definition: flow_graph.h:634
An executable node that acts as a source, i.e. it has no predecessors.
Definition: flow_graph.h:904
bool try_release() __TBB_override
Release a reserved item.
Definition: flow_graph.h:1039
bool remove_successor(successor_type &r) __TBB_override
Removes a successor from this node.
Definition: flow_graph.h:972
void spawn_put()
Spawns a task that applies the body.
Definition: flow_graph.h:1145
bool try_reserve_apply_body(output_type &v)
Definition: flow_graph.h:1110
bool register_successor(successor_type &r) __TBB_override
Add a new successor to this node.
Definition: flow_graph.h:963
__TBB_NOINLINE_SYM input_node(const input_node &src)
Copy constructor.
Definition: flow_graph.h:942
internal::input_body< output_type > * my_init_body
Definition: flow_graph.h:1103
bool try_reserve(output_type &v) __TBB_override
Reserves an item.
Definition: flow_graph.h:1022
internal::input_body< output_type > * my_body
Definition: flow_graph.h:1102
void activate()
Activates a node that was created in the inactive state.
Definition: flow_graph.h:1061
internal::broadcast_cache< output_type > my_successors
Definition: flow_graph.h:1104
void reset_node(reset_flags f) __TBB_override
resets the input_node to its initial state
Definition: flow_graph.h:1086
sender< output_type >::successor_type successor_type
The type of successors of this node.
Definition: flow_graph.h:910
task * apply_body_bypass()
Applies the body. Returning SUCCESSFULLY_ENQUEUED okay; forward_task_bypass will handle it.
Definition: flow_graph.h:1153
Output output_type
The type of the output message, which is complete.
Definition: flow_graph.h:907
bool try_consume() __TBB_override
Consumes a reserved item.
Definition: flow_graph.h:1049
bool try_get(output_type &v) __TBB_override
Request an item from the node.
Definition: flow_graph.h:1004
__TBB_NOINLINE_SYM input_node(graph &g, Body body)
Constructor for a node with a successor.
Definition: flow_graph.h:922
An executable node that acts as a source, i.e. it has no predecessors.
Definition: flow_graph.h:1189
bool try_consume() __TBB_override
Consumes a reserved item.
Definition: flow_graph.h:1333
__TBB_NOINLINE_SYM source_node(const source_node &src)
Copy constructor.
Definition: flow_graph.h:1227
bool remove_successor(successor_type &r) __TBB_override
Removes a successor from this node.
Definition: flow_graph.h:1257
bool try_reserve_apply_body(output_type &v)
Definition: flow_graph.h:1398
__TBB_NOINLINE_SYM source_node(graph &g, Body body, bool is_active=true)
Constructor for a node with a successor.
Definition: flow_graph.h:1207
void activate()
Activates a node that was created in the inactive state.
Definition: flow_graph.h:1345
bool register_successor(successor_type &r) __TBB_override
Add a new successor to this node.
Definition: flow_graph.h:1248
Output output_type
The type of the output message, which is complete.
Definition: flow_graph.h:1192
void reset_node(reset_flags f) __TBB_override
resets the source_node to its initial state
Definition: flow_graph.h:1370
task * apply_body_bypass()
Applies the body. Returning SUCCESSFULLY_ENQUEUED okay; forward_task_bypass will handle it.
Definition: flow_graph.h:1438
internal::source_body< output_type > * my_init_body
Definition: flow_graph.h:1391
internal::source_body< output_type > * my_body
Definition: flow_graph.h:1390
sender< output_type >::successor_type successor_type
The type of successors of this node.
Definition: flow_graph.h:1195
internal::broadcast_cache< output_type > my_successors
Definition: flow_graph.h:1392
bool try_release() __TBB_override
Release a reserved item.
Definition: flow_graph.h:1323
bool try_get(output_type &v) __TBB_override
Request an item from the node.
Definition: flow_graph.h:1289
bool try_reserve(output_type &v) __TBB_override
Reserves an item.
Definition: flow_graph.h:1306
Implements a function node that supports Input -> Output.
Definition: flow_graph.h:1463
fOutput_type::successor_type successor_type
Definition: flow_graph.h:1484
internal::function_output< output_type > fOutput_type
Definition: flow_graph.h:1482
internal::function_input_queue< input_type, internals_allocator > input_queue_type
Definition: flow_graph.h:1481
input_impl_type::predecessor_type predecessor_type
Definition: flow_graph.h:1483
cache_aligned_allocator< Input > internals_allocator
Definition: flow_graph.h:1468
__TBB_NOINLINE_SYM function_node(const function_node &src)
Copy constructor.
Definition: flow_graph.h:1530
internal::function_input< input_type, output_type, Policy, internals_allocator > input_impl_type
Definition: flow_graph.h:1480
__TBB_STATIC_ASSERT((tbb::internal::is_same_type< Allocator, null_type >::value), "Allocator template parameter for flow graph nodes is deprecated and will be removed. " "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface.")
__TBB_NOINLINE_SYM function_node(graph &g, size_t concurrency, __TBB_FLOW_GRAPH_PRIORITY_ARG1(Body body, node_priority_t priority=tbb::flow::internal::no_priority))
Constructor.
Definition: flow_graph.h:1496
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:1559
internal::broadcast_cache< output_type > & successors() __TBB_override
Definition: flow_graph.h:1557
implements a function node that supports Input -> (set of outputs)
Definition: flow_graph.h:1592
__TBB_STATIC_ASSERT((tbb::internal::is_same_type< Allocator, null_type >::value), "Allocator template parameter for flow graph nodes is deprecated and will be removed. " "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface.")
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:1675
__TBB_NOINLINE_SYM multifunction_node(const multifunction_node &other)
Definition: flow_graph.h:1654
internal::wrap_tuple_elements< N, internal::multifunction_output, Output >::type output_ports_type
Definition: flow_graph.h:1610
__TBB_NOINLINE_SYM multifunction_node(graph &g, size_t concurrency,)
Definition: flow_graph.h:1618
internal::multifunction_input< input_type, output_ports_type, Policy, internals_allocator > input_impl_type
Definition: flow_graph.h:1612
internal::function_input_queue< input_type, internals_allocator > input_queue_type
Definition: flow_graph.h:1613
cache_aligned_allocator< Input > internals_allocator
Definition: flow_graph.h:1596
split_node: accepts a tuple as input and forwards each element of the tuple to its corresponding output port.
Definition: flow_graph.h:1681
graph & graph_reference() const __TBB_override
Definition: flow_graph.h:1752
__TBB_NOINLINE_SYM split_node(graph &g)
Definition: flow_graph.h:1708
output_ports_type & output_ports()
Definition: flow_graph.h:1737
__TBB_STATIC_ASSERT((tbb::internal::is_same_type< Allocator, null_type >::value), "Allocator template parameter for flow graph nodes is deprecated and will be removed. " "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface.")
task * try_put_task(const TupleType &t) __TBB_override
Put item to successor; return task to run the successor if possible.
Definition: flow_graph.h:1740
receiver< TupleType > base_type
Definition: flow_graph.h:1683
void reset_receiver(reset_flags) __TBB_override
put receiver back in initial state
Definition: flow_graph.h:1751
internal::wrap_tuple_elements< N, internal::multifunction_output, TupleType >::type output_ports_type
Definition: flow_graph.h:1706
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:1745
__TBB_NOINLINE_SYM split_node(const split_node &other)
Definition: flow_graph.h:1723
Implements an executable node that supports continue_msg -> Output.
Definition: flow_graph.h:1782
internal::broadcast_cache< output_type > & successors() __TBB_override
Definition: flow_graph.h:1894
internal::continue_input< Output, Policy > input_impl_type
Definition: flow_graph.h:1786
internal::function_output< output_type > fOutput_type
Definition: flow_graph.h:1787
__TBB_NOINLINE_SYM continue_node(const continue_node &src)
Copy constructor.
Definition: flow_graph.h:1868
__TBB_NOINLINE_SYM continue_node(graph &g,)
Constructor for executable node with continue_msg -> Output.
Definition: flow_graph.h:1793
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:1896
input_impl_type::predecessor_type predecessor_type
Definition: flow_graph.h:1788
fOutput_type::successor_type successor_type
Definition: flow_graph.h:1789
__TBB_NOINLINE_SYM continue_node(graph &g, int number_of_predecessors,)
Constructor for executable node with continue_msg -> Output.
Definition: flow_graph.h:1830
Forwards messages of type T to all successors.
Definition: flow_graph.h:1905
__TBB_NOINLINE_SYM broadcast_node(graph &g)
Definition: flow_graph.h:1923
internal::broadcast_cache< input_type > my_successors
Definition: flow_graph.h:1916
void reset_receiver(reset_flags) __TBB_override
put receiver back in initial state
Definition: flow_graph.h:2029
graph & graph_reference() const __TBB_override
Definition: flow_graph.h:2025
__TBB_NOINLINE_SYM broadcast_node(const broadcast_node &src)
Definition: flow_graph.h:1937
task * try_put_task(const T &t) __TBB_override
build a task to run the successor if possible. Default is old behavior.
Definition: flow_graph.h:2019
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:1910
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:2031
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:1909
bool register_successor(successor_type &r) __TBB_override
Adds a successor.
Definition: flow_graph.h:1952
bool remove_successor(successor_type &r) __TBB_override
Removes s as a successor.
Definition: flow_graph.h:1958
Forwards messages in arbitrary order.
Definition: flow_graph.h:2051
bool try_consume() __TBB_override
Consumes a reserved item.
Definition: flow_graph.h:2506
bool try_reserve(T &v) __TBB_override
Reserves an item.
Definition: flow_graph.h:2487
graph & graph_reference() const __TBB_override
Definition: flow_graph.h:2538
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:2061
virtual bool internal_push(buffer_operation *op)
Definition: flow_graph.h:2323
__TBB_NOINLINE_SYM buffer_node(graph &g)
Constructor.
Definition: flow_graph.h:2359
void try_put_and_add_task(task *&last_task)
Definition: flow_graph.h:2282
task * grab_forwarding_task(buffer_operation &op_data)
Definition: flow_graph.h:2182
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:2060
void handle_operations_impl(buffer_operation *op_list, derived_type *derived)
Definition: flow_graph.h:2134
bool try_release() __TBB_override
Release a reserved item.
Definition: flow_graph.h:2497
virtual void internal_rem_succ(buffer_operation *op)
Remove successor.
Definition: flow_graph.h:2219
internal::round_robin_cache< T, null_rw_mutex > my_successors
Definition: flow_graph.h:2077
__TBB_NOINLINE_SYM buffer_node(const buffer_node &src)
Copy constructor.
Definition: flow_graph.h:2377
bool try_get(T &v) __TBB_override
Request an item from the buffer_node.
Definition: flow_graph.h:2476
virtual void internal_consume(buffer_operation *op)
Definition: flow_graph.h:2347
virtual task * forward_task()
This is executed by an enqueued task, the "forwarder".
Definition: flow_graph.h:2196
virtual void internal_pop(buffer_operation *op)
Definition: flow_graph.h:2329
virtual void internal_reserve(buffer_operation *op)
Definition: flow_graph.h:2338
internal::aggregator< handler_type, buffer_operation > my_aggregator
Definition: flow_graph.h:2127
task * try_put_task(const T &t) __TBB_override
receive an item, return a task* if possible
Definition: flow_graph.h:2519
internal::aggregating_functor< class_type, buffer_operation > handler_type
Definition: flow_graph.h:2125
void reset_receiver(reset_flags) __TBB_override
put receiver back in initial state
Definition: flow_graph.h:2542
__TBB_STATIC_ASSERT((tbb::internal::is_same_type< Allocator, null_type >::value), "Allocator template parameter for flow graph nodes is deprecated and will be removed. " "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface.")
buffer_node< T, Allocator > class_type
Definition: flow_graph.h:2062
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:2553
bool register_successor(successor_type &r) __TBB_override
Adds a new successor.
Definition: flow_graph.h:2399
bool enqueue_forwarding_task(buffer_operation &op_data)
Definition: flow_graph.h:2186
cache_aligned_allocator< T > internals_allocator
Definition: flow_graph.h:2055
void internal_forward_task_impl(buffer_operation *op, derived_type *derived)
Definition: flow_graph.h:2299
virtual void handle_operations(buffer_operation *op_list)
Definition: flow_graph.h:2129
virtual void internal_forward_task(buffer_operation *op)
Tries to forward valid items to successors.
Definition: flow_graph.h:2294
virtual void internal_reg_succ(buffer_operation *op)
Register successor.
Definition: flow_graph.h:2213
bool remove_successor(successor_type &r) __TBB_override
Removes a successor.
Definition: flow_graph.h:2461
virtual void internal_release(buffer_operation *op)
Definition: flow_graph.h:2352
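
A sketch, with illustrative values, of the push/pull interface that the operations above implement; retrieval order from a buffer_node is unspecified:

#include <cstdio>
#include "tbb/flow_graph.h"

int main() {
    using namespace tbb::flow;
    graph g;
    buffer_node<int> buf(g);
    buf.try_put(1);
    buf.try_put(2);
    int v;
    while (buf.try_get(v))        // items come back in arbitrary order
        std::printf("got %d\n", v);
    g.wait_for_all();
    return 0;
}
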
Forwards messages in FIFO order.
Definition: flow_graph.h:2568
__TBB_NOINLINE_SYM queue_node(const queue_node &src)
Copy constructor.
Definition: flow_graph.h:2648
__TBB_NOINLINE_SYM queue_node(graph &g)
Constructor.
Definition: flow_graph.h:2634
void try_put_and_add_task(task *&last_task)
Definition: flow_graph.h:2589
base_type::size_type size_type
Definition: flow_graph.h:2578
void internal_pop(queue_operation *op) __TBB_override
Definition: flow_graph.h:2604
void internal_consume(queue_operation *op) __TBB_override
Definition: flow_graph.h:2622
buffer_node< T, Allocator > base_type
Definition: flow_graph.h:2577
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:2630
void internal_forward_task(queue_operation *op) __TBB_override
Definition: flow_graph.h:2600
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:2661
void internal_reserve(queue_operation *op) __TBB_override
Definition: flow_graph.h:2613
__TBB_STATIC_ASSERT((tbb::internal::is_same_type< Allocator, null_type >::value), "Allocator template parameter for flow graph nodes is deprecated and will be removed. " "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface.")
base_type::buffer_operation queue_operation
Definition: flow_graph.h:2579
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:2631
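
A small illustrative sketch of the FIFO behavior of queue_node:

#include <cstdio>
#include "tbb/flow_graph.h"

int main() {
    using namespace tbb::flow;
    graph g;
    queue_node<int> q(g);
    for (int i = 0; i < 3; ++i) q.try_put(i);
    int v;
    while (q.try_get(v))          // items are retrieved in FIFO order: 0, 1, 2
        std::printf("%d\n", v);
    g.wait_for_all();
    return 0;
}
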
Forwards messages in sequence order.
Definition: flow_graph.h:2668
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:2682
__TBB_STATIC_ASSERT((tbb::internal::is_same_type< Allocator, null_type >::value), "Allocator template parameter for flow graph nodes is deprecated and will be removed. " "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface.")
bool internal_push(sequencer_operation *op) __TBB_override
Definition: flow_graph.h:2724
__TBB_NOINLINE_SYM sequencer_node(graph &g, const Sequencer &s)
Constructor.
Definition: flow_graph.h:2687
buffer_node< T, Allocator >::size_type size_type
Definition: flow_graph.h:2720
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:2683
buffer_node< T, Allocator >::buffer_operation sequencer_operation
Definition: flow_graph.h:2721
internal::function_body< T, size_t > * my_sequencer
Definition: flow_graph.h:2669
__TBB_NOINLINE_SYM sequencer_node(const sequencer_node &src)
Copy constructor.
Definition: flow_graph.h:2703
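
An illustrative sketch; the msg type and its seq field are hypothetical, and the lambda is the user-supplied sequencer that maps each item to its sequence number (starting at 0):

#include <cstdio>
#include "tbb/flow_graph.h"

struct msg { size_t seq; int payload; };   // hypothetical message type

int main() {
    using namespace tbb::flow;
    graph g;
    sequencer_node<msg> s(g, [](const msg &m) -> size_t { return m.seq; });
    queue_node<msg> out(g);
    make_edge(s, out);
    msg m2 = {1, 20}, m1 = {0, 10};
    s.try_put(m2);                // arrives out of order; held until item 0 shows up
    s.try_put(m1);
    g.wait_for_all();
    msg r;
    while (out.try_get(r))        // delivered in sequence order: 10 then 20
        std::printf("%d\n", r.payload);
    return 0;
}
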
Forwards messages in priority order.
Definition: flow_graph.h:2749
bool internal_push(prio_operation *op) __TBB_override
Definition: flow_graph.h:2816
void try_put_and_add_task(task *&last_task)
Definition: flow_graph.h:2873
void internal_forward_task(prio_operation *op) __TBB_override
Tries to forward valid items to successors.
Definition: flow_graph.h:2808
void internal_pop(prio_operation *op) __TBB_override
Definition: flow_graph.h:2822
__TBB_STATIC_ASSERT((tbb::internal::is_same_type< Allocator, null_type >::value), "Allocator template parameter for flow graph nodes is deprecated and will be removed in the future. " "To temporarily enable the deprecated interface specify TBB_ENABLE_DEPRECATED_NODE_ALLOCATOR.")
buffer_node< T, Allocator >::buffer_operation prio_operation
Definition: flow_graph.h:2805
buffer_node< T, Allocator > base_type
Definition: flow_graph.h:2760
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:2762
void handle_operations(prio_operation *op_list) __TBB_override
Definition: flow_graph.h:2812
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:2763
__TBB_NOINLINE_SYM priority_queue_node(const priority_queue_node &src)
Copy constructor.
Definition: flow_graph.h:2782
buffer_node< T, Allocator >::size_type size_type
Definition: flow_graph.h:2803
buffer_node< T, Allocator >::item_type item_type
Definition: flow_graph.h:2804
void internal_reserve(prio_operation *op) __TBB_override
Definition: flow_graph.h:2836
void internal_consume(prio_operation *op) __TBB_override
Definition: flow_graph.h:2848
__TBB_NOINLINE_SYM priority_queue_node(graph &g, const Compare &comp=Compare())
Constructor.
Definition: flow_graph.h:2766
void internal_release(prio_operation *op) __TBB_override
Definition: flow_graph.h:2854
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:2798
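
A sketch with illustrative values; the default Compare is std::less<T>, so the largest item is forwarded first:

#include <cstdio>
#include "tbb/flow_graph.h"

int main() {
    using namespace tbb::flow;
    graph g;
    priority_queue_node<int> pq(g);   // Compare defaults to std::less<int>
    pq.try_put(3);
    pq.try_put(7);
    pq.try_put(5);
    int v;
    while (pq.try_get(v))             // retrieved in priority order: 7, 5, 3
        std::printf("%d\n", v);
    g.wait_for_all();
    return 0;
}
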
__TBB_NOINLINE_SYM join_node(const join_node &other)
Definition: flow_graph.h:3310
internal::unfolded_join_node< N, reserving_port, OutputTuple, reserving > unfolded_type
Definition: flow_graph.h:3294
__TBB_NOINLINE_SYM join_node(const join_node &other)
Definition: flow_graph.h:3343
internal::unfolded_join_node< N, queueing_port, OutputTuple, queueing > unfolded_type
Definition: flow_graph.h:3327
internal::unfolded_join_node< N, key_matching_port, OutputTuple, key_matching< K, KHash > > unfolded_type
Definition: flow_graph.h:3363
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2)
Definition: flow_graph.h:3387
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1)
Definition: flow_graph.h:3382
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6)
Definition: flow_graph.h:3414
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6, __TBB_B7 b7, __TBB_B8 b8)
Definition: flow_graph.h:3432
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6, __TBB_B7 b7, __TBB_B8 b8, __TBB_B9 b9)
Definition: flow_graph.h:3441
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4)
Definition: flow_graph.h:3397
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5)
Definition: flow_graph.h:3405
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3)
Definition: flow_graph.h:3392
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6, __TBB_B7 b7)
Definition: flow_graph.h:3423
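
A sketch of a two-input join_node with the queueing policy; the node types and values are illustrative:

#include <cstdio>
#include "tbb/flow_graph.h"

int main() {
    using namespace tbb::flow;
    graph g;
    queue_node<int> ints(g);
    queue_node<float> floats(g);
    join_node< tuple<int, float>, queueing > j(g);
    function_node< tuple<int, float> > sink(g, serial,
        [](const tuple<int, float> &t) {
            std::printf("%d %f\n", get<0>(t), get<1>(t));
        });
    make_edge(ints, input_port<0>(j));
    make_edge(floats, input_port<1>(j));
    make_edge(j, sink);
    ints.try_put(1);
    floats.try_put(2.5f);    // a tuple is emitted once every input port has a value
    g.wait_for_all();
    return 0;
}
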
__TBB_NOINLINE_SYM indexer_node(graph &g)
Definition: flow_graph.h:3796
tuple< T0, T1, T2, T3, T4, T5, T6, T7, T8, T9 > InputTuple
Definition: flow_graph.h:3793
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3795
internal::tagged_msg< size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8, T9 > output_type
Definition: flow_graph.h:3794
__TBB_NOINLINE_SYM indexer_node(const indexer_node &other)
Definition: flow_graph.h:3809
__TBB_NOINLINE_SYM indexer_node(const indexer_node &other)
Definition: flow_graph.h:3499
internal::tagged_msg< size_t, T0 > output_type
Definition: flow_graph.h:3484
__TBB_NOINLINE_SYM indexer_node(graph &g)
Definition: flow_graph.h:3486
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3485
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3518
internal::tagged_msg< size_t, T0, T1 > output_type
Definition: flow_graph.h:3517
__TBB_NOINLINE_SYM indexer_node(graph &g)
Definition: flow_graph.h:3519
__TBB_NOINLINE_SYM indexer_node(const indexer_node &other)
Definition: flow_graph.h:3532
internal::tagged_msg< size_t, T0, T1, T2 > output_type
Definition: flow_graph.h:3550
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3551
__TBB_NOINLINE_SYM indexer_node(const indexer_node &other)
Definition: flow_graph.h:3565
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3584
__TBB_NOINLINE_SYM indexer_node(const indexer_node &other)
Definition: flow_graph.h:3598
internal::tagged_msg< size_t, T0, T1, T2, T3 > output_type
Definition: flow_graph.h:3583
__TBB_NOINLINE_SYM indexer_node(const indexer_node &other)
Definition: flow_graph.h:3631
internal::tagged_msg< size_t, T0, T1, T2, T3, T4 > output_type
Definition: flow_graph.h:3616
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3617
__TBB_NOINLINE_SYM indexer_node(const indexer_node &other)
Definition: flow_graph.h:3665
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3651
internal::tagged_msg< size_t, T0, T1, T2, T3, T4, T5 > output_type
Definition: flow_graph.h:3650
__TBB_NOINLINE_SYM indexer_node(const indexer_node &other)
Definition: flow_graph.h:3701
internal::tagged_msg< size_t, T0, T1, T2, T3, T4, T5, T6 > output_type
Definition: flow_graph.h:3686
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3687
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3723
internal::tagged_msg< size_t, T0, T1, T2, T3, T4, T5, T6, T7 > output_type
Definition: flow_graph.h:3722
__TBB_NOINLINE_SYM indexer_node(const indexer_node &other)
Definition: flow_graph.h:3773
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3759
internal::tagged_msg< size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8 > output_type
Definition: flow_graph.h:3758
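
A sketch showing how the tagged output of an indexer_node is typically dispatched on tag(); the types and values are illustrative:

#include <cstdio>
#include "tbb/flow_graph.h"

int main() {
    using namespace tbb::flow;
    graph g;
    indexer_node<int, float> idx(g);
    function_node< indexer_node<int, float>::output_type > sink(g, serial,
        [](const indexer_node<int, float>::output_type &m) {
            if (m.tag() == 0)                          // tag is the input port index
                std::printf("int: %d\n", cast_to<int>(m));
            else
                std::printf("float: %f\n", cast_to<float>(m));
        });
    make_edge(idx, sink);
    input_port<0>(idx).try_put(3);
    input_port<1>(idx).try_put(1.5f);
    g.wait_for_all();
    return 0;
}
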
async_body(const Body &body, gateway_type *gateway)
Definition: flow_graph.h:4189
async_body_base< Gateway > base_type
Definition: flow_graph.h:4186
void operator()(const Input &v, Ports &)
Definition: flow_graph.h:4192
Implements async node.
Definition: flow_graph.h:4213
receiver_type::predecessor_type predecessor_type
Definition: flow_graph.h:4228
bool try_put_impl(const Output &i)
Implements gateway_type::try_put for an external activity to submit a message to FG.
Definition: flow_graph.h:4273
multifunction_node< Input, tuple< Output >, Policy, Allocator > base_type
Definition: flow_graph.h:4221
__TBB_STATIC_ASSERT((tbb::internal::is_same_type< Allocator, null_type >::value), "Allocator template parameter for flow graph nodes is deprecated and will be removed in the future. " "To temporarily enable the deprecated interface specify TBB_ENABLE_DEPRECATED_NODE_ALLOCATOR.")
internal::multifunction_input< Input, typename base_type::output_ports_type, Policy, Allocator > mfn_input_type
Definition: flow_graph.h:4222
bool remove_successor(successor_type &r) __TBB_override
Removes a successor from this node.
Definition: flow_graph.h:4358
base_type::output_ports_type output_ports_type
Definition: flow_graph.h:4232
internal::async_body_base< gateway_type > async_body_base_type
Definition: flow_graph.h:4231
__TBB_NOINLINE_SYM async_node(const async_node &other)
Definition: flow_graph.h:4331
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:4229
receiver< input_type > receiver_type
Definition: flow_graph.h:4227
bool register_successor(successor_type &r) __TBB_override
Add a new successor to this node.
Definition: flow_graph.h:4353
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:4398
__TBB_NOINLINE_SYM async_node(graph &g, size_t concurrency, Body body)
Definition: flow_graph.h:4291
receiver_gateway< output_type > gateway_type
Definition: flow_graph.h:4230
internal::multifunction_output< Output > output_port_type
Definition: flow_graph.h:4236
try_put_functor(output_port_type &p, const Output &v)
Definition: flow_graph.h:4241
void reserve_wait() __TBB_override
Inform a graph that messages may come from outside, to prevent premature graph completion.
Definition: flow_graph.h:4250
void release_wait() __TBB_override
Inform a graph that a previous call to reserve_wait is no longer in effect.
Definition: flow_graph.h:4255
bool try_put(const Output &i) __TBB_override
Implements gateway_type::try_put for an external activity to submit a message to FG.
Definition: flow_graph.h:4261
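
A simplified sketch of the gateway protocol described above; for brevity the result is sent back from inside the body instead of from a real external activity:

#include <cstdio>
#include "tbb/flow_graph.h"

int main() {
    using namespace tbb::flow;
    graph g;
    typedef async_node<int, int> async_t;
    async_t a(g, unlimited,
        [](const int &in, async_t::gateway_type &gw) {
            // A real application would hand 'in' to an external thread or device,
            // call gw.reserve_wait(), and have that activity call gw.try_put(result)
            // followed by gw.release_wait() when the result is ready.
            gw.try_put(in * 2);
        });
    function_node<int> sink(g, serial, [](int v) { std::printf("%d\n", v); });
    make_edge(a, sink);
    a.try_put(21);
    g.wait_for_all();
    return 0;
}
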
bool try_get(input_type &v) __TBB_override
Request an item from the sender.
Definition: flow_graph.h:4535
internal::broadcast_cache< input_type, null_rw_mutex > my_successors
Definition: flow_graph.h:4605
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:4414
bool register_successor(successor_type &s) __TBB_override
Add a new successor to this node.
Definition: flow_graph.h:4453
__TBB_NOINLINE_SYM overwrite_node(const overwrite_node &src)
Copy constructor; doesn't take anything from src; default won't work.
Definition: flow_graph.h:4437
bool try_release() __TBB_override
Releases the reserved item.
Definition: flow_graph.h:4550
__TBB_NOINLINE_SYM overwrite_node(graph &g)
Definition: flow_graph.h:4423
void reset_receiver(reset_flags) __TBB_override
put receiver back in initial state
Definition: flow_graph.h:4611
task * try_put_task(const input_type &v) __TBB_override
Put item to successor; return task to run the successor if possible.
Definition: flow_graph.h:4570
task * try_put_task_impl(const input_type &v)
Definition: flow_graph.h:4575
graph & graph_reference() const __TBB_override
Definition: flow_graph.h:4583
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:4415
bool try_reserve(T &v) __TBB_override
Reserves an item.
Definition: flow_graph.h:4545
bool try_consume() __TBB_override
Consumes the reserved item.
Definition: flow_graph.h:4553
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:4613
bool remove_successor(successor_type &s) __TBB_override
Removes a successor from this node.
Definition: flow_graph.h:4477
Breaks an infinite loop between the node reservation and register_successor call.
Definition: flow_graph.h:4588
register_predecessor_task(predecessor_type &owner, successor_type &succ)
Definition: flow_graph.h:4590
tbb::task * execute() __TBB_override
Should be overridden by derived classes.
Definition: flow_graph.h:4593
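
An illustrative sketch of the single-item, overwriting buffer; try_get reads the stored value without consuming it:

#include <cstdio>
#include "tbb/flow_graph.h"

int main() {
    using namespace tbb::flow;
    graph g;
    overwrite_node<int> latest(g);
    latest.try_put(1);
    latest.try_put(2);            // overwrites the stored value
    int v = 0;
    if (latest.try_get(v))        // the buffered value remains valid after try_get
        std::printf("latest = %d\n", v);   // prints 2
    g.wait_for_all();
    return 0;
}
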
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:4628
__TBB_NOINLINE_SYM write_once_node(const write_once_node &src)
Copy constructor: call base class copy constructor.
Definition: flow_graph.h:4645
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:4627
__TBB_NOINLINE_SYM write_once_node(graph &g)
Constructor.
Definition: flow_graph.h:4631
task * try_put_task(const T &v) __TBB_override
Put item to successor; return task to run the successor if possible.
Definition: flow_graph.h:4661
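
An illustrative sketch; unlike overwrite_node, only the first put is accepted until clear() is called:

#include <cstdio>
#include "tbb/flow_graph.h"

int main() {
    using namespace tbb::flow;
    graph g;
    write_once_node<int> once(g);
    once.try_put(1);              // accepted
    once.try_put(2);              // rejected; the stored value stays 1
    int v = 0;
    once.try_get(v);
    std::printf("%d\n", v);       // prints 1
    once.clear();                 // allows a new value to be written
    g.wait_for_all();
    return 0;
}
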
uintptr_t status
A zero value means "wait" status; all other values are user-specified values and are defined into the...
field of type K being used for matching.
virtual input_body * clone()=0
The leaf for input_body.
virtual source_body * clone()=0
The leaf for source_body.
function_body that takes an Input and a set of output ports
The leaf for multifunction. OutputSet can be a std::tuple or a vector.
A task that calls a node's forward_task function.
A task that calls a node's apply_body_bypass function with no input.
An abstract cache of successors.
void register_successor(successor_type &r)
void set_owner(owner_type *owner)
void remove_successor(successor_type &r)
A cache of successors that are broadcast to.
task * try_put_task(const T &t) __TBB_override
bool gather_successful_try_puts(const T &t, task_list &tasks)
A cache of successors that are put in a round-robin fashion.
Base class for tasks generated by graph nodes.
void register_node(tbb::flow::interface11::graph_node *n)
Definition: flow_graph.h:820
tbb::flow::interface11::graph_iterator< const graph, const tbb::flow::interface11::graph_node > const_iterator
const_iterator cbegin() const
start const iterator
Definition: flow_graph.h:874
tbb::task_group_context * my_context
tbb::flow::interface11::graph_node * my_nodes
void reset(tbb::flow::interface11::reset_flags f=tbb::flow::interface11::rf_reset_protocol)
Definition: flow_graph.h:843
iterator end()
end iterator
Definition: flow_graph.h:868
graph()
Constructs a graph with isolated task_group_context.
Definition: flow_graph.h:774
const_iterator cend() const
end const iterator
Definition: flow_graph.h:876
void remove_node(tbb::flow::interface11::graph_node *n)
Definition: flow_graph.h:831
__TBB_DEPRECATED tbb::task * root_task()
Returns the root task of the graph.
void reserve_wait() __TBB_override
Used to register that an external entity may still interact with the graph.
Definition: flow_graph.h:806
tbb::flow::interface11::graph_iterator< graph, tbb::flow::interface11::graph_node > iterator
void wait_for_all()
Wait until the graph is idle and the number of decrement_wait_count calls equals the number of increment_wait_count calls.
void prepare_task_arena(bool reinit=false)
tbb::flow::interface11::graph_node * my_nodes_last
iterator begin()
start iterator
Definition: flow_graph.h:866
void release_wait() __TBB_override
Deregisters an external entity that may have interacted with the graph.
Definition: flow_graph.h:813
~graph()
Destroys the graph.
Definition: flow_graph.h:798
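
A minimal sketch of the graph lifecycle (construction, wait_for_all, reset); the continue_node bodies are illustrative:

#include <cstdio>
#include "tbb/flow_graph.h"

int main() {
    using namespace tbb::flow;
    graph g;                       // owns an isolated task_group_context by default
    continue_node<continue_msg> hello(g,
        [](const continue_msg &) { std::printf("hello\n"); });
    continue_node<continue_msg> world(g,
        [](const continue_msg &) { std::printf("world\n"); });
    make_edge(hello, world);
    hello.try_put(continue_msg());
    g.wait_for_all();              // block until every task spawned by the graph is done
    g.reset();                     // restore nodes to their initial state (rf_reset_protocol)
    return 0;
}
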
The base of all graph nodes.
virtual void reset_node(reset_flags f=rf_reset_protocol)=0
void grow_my_array(size_t minimum_size)
Grows the internal array.
item_buffer with reservable front-end. NOTE: if reserving, do not
The two-phase join port.
unfolded_join_node : passes input_ports_type to join_node_base. We build the input port type
wrap_tuple_elements< N, PT, OutputTuple >::type input_ports_type
graph & graph_reference() const __TBB_override
Implements methods for a function node that takes a type Input as input and sends a type Output to its successors.
void reset_function_input(reset_flags f)
Implements methods for a function node that takes a type Input as input.
static task * emit_this(graph &g, const T &t, P &p)
Implements methods for an executable node that takes continue_msg as input.
void reset_receiver(reset_flags f) __TBB_override
Implements methods for both executable and function nodes that put Output to their successors.
broadcast_cache_type my_successors
broadcast_cache_type & successors()
sender< output_type >::successor_type successor_type
bool try_put(const output_type &i)
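
These internal classes implement the input and output sides of function_node and multifunction_node. A sketch of the public multifunction_node built on top of them; the port layout and lambda are illustrative:

#include <cstdio>
#include "tbb/flow_graph.h"

int main() {
    using namespace tbb::flow;
    graph g;
    typedef multifunction_node< int, tuple<int, int> > mfn_t;
    mfn_t splitter(g, unlimited,
        [](const int &v, mfn_t::output_ports_type &ports) {
            if (v % 2 == 0) get<0>(ports).try_put(v);   // evens to port 0
            else            get<1>(ports).try_put(v);   // odds to port 1
        });
    queue_node<int> evens(g), odds(g);
    make_edge(output_port<0>(splitter), evens);
    make_edge(output_port<1>(splitter), odds);
    for (int i = 0; i < 4; ++i) splitter.try_put(i);
    g.wait_for_all();
    return 0;
}
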
Enables one or the other code branch.
Detects whether two given types are the same.
input_filter control to signal end-of-input for parallel_pipeline
Definition: pipeline.h:316
A lock that occupies a single byte.
Definition: spin_mutex.h:39
friend class scoped_lock
Definition: spin_mutex.h:179
Represents acquisition of a mutex.
Definition: spin_mutex.h:53
Used to form groups of tasks.
Definition: task.h:358
void __TBB_EXPORTED_METHOD reset()
Forcefully reinitializes the context after the task tree it was associated with is completed.
Base class for user-defined tasks.
Definition: task.h:615
int decrement_ref_count()
Atomically decrement reference count and return its new value.
Definition: task.h:788
static internal::allocate_root_proxy allocate_root()
Returns proxy for overloaded new that allocates a root task.
Definition: task.h:663
void set_ref_count(int count)
Set reference count.
Definition: task.h:761
void increment_ref_count()
Atomically increment reference count.
Definition: task.h:771
task that does nothing. Useful for synchronization.
Definition: task.h:1042
A list of children.
Definition: task.h:1074
bool empty() const
True if list is empty; false otherwise.
Definition: task.h:1088
task & pop_front()
Pop the front task from the list.
Definition: task.h:1109
Base class for types that should not be assigned.
Definition: tbb_stddef.h:322
Class for determining type of std::allocator<T>::value_type.
Definition: tbb_stddef.h:471
