Intel(R) Threading Building Blocks Doxygen Documentation version 4.2.3
Loading...
Searching...
No Matches
_flow_graph_join_impl.h
Go to the documentation of this file.
1/*
2 Copyright (c) 2005-2020 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17#ifndef __TBB__flow_graph_join_impl_H
18#define __TBB__flow_graph_join_impl_H
19
20#ifndef __TBB_flow_graph_H
21#error Do not #include this internal file directly; use public TBB headers instead.
22#endif
23
24namespace internal {
25
// Members of forwarding_base. NOTE(review): the struct's opening line is absent from this
// listing (the embedded numbering jumps from 25 to 27), so its base classes are not visible here.
// Constructor: binds graph_ref to the graph passed in.
27 forwarding_base(graph &g) : graph_ref(g) {}
// Virtual destructor with an empty body.
28 virtual ~forwarding_base() {}
29 // decrement_port_count may create a forwarding task. If we cannot handle the task
30 // ourselves, ask decrement_port_count to deal with it.
31 virtual task * decrement_port_count(bool handle_task) = 0;
// Pure virtual; takes no arguments and returns nothing.
32 virtual void increment_port_count() = 0;
33 // moved here so input ports can queue tasks
34 graph& graph_ref;
35 };
36
37 // specialization that lets us keep a copy of the current_key for building results.
38 // KeyType can be a reference type.
// Class template parameterized on KeyType. NOTE(review): listing lines 40-42 (the class header
// and whatever declares current_key_type) are absent from this extract; confirm against the
// original header before relying on this fragment.
39 template<typename KeyType>
// Pure virtual: takes a key by const reference and a bool (both parameter names are commented
// out) and returns a task pointer.
43 virtual task * increment_key_count(current_key_type const & /*t*/, bool /*handle_task*/) = 0; // {return NULL;}
44 current_key_type current_key; // so ports can refer to FE's desired items
45 };
46
// Static helper templated on N: every member acts on element N-1 of a tuple through
// tbb::flow::get<N-1>, and the members whose bodies are fully visible (reserve, get_my_item,
// set_key_functors, copy_key_functors) also delegate to join_helper<N-1>.
// NOTE(review): listing lines 53, 58, 68, 95, 121 and 128 are absent from this extract, so the
// members that span those numbers may each be missing one statement; confirm against the
// original header.
47 template< int N >
48 struct join_helper {
49
// Passes `port` to set_join_node_pointer() on element N-1 of my_input.
// NOTE(review): listing line 53 is not shown.
50 template< typename TupleType, typename PortType >
51 static inline void set_join_node_pointer(TupleType &my_input, PortType *port) {
52 tbb::flow::get<N-1>( my_input ).set_join_node_pointer(port);
54 }
// Calls consume() on element N-1 of my_input. NOTE(review): listing line 58 is not shown.
55 template< typename TupleType >
56 static inline void consume_reservations( TupleType &my_input ) {
57 tbb::flow::get<N-1>( my_input ).consume();
59 }
60
// Calls release() on element N-1 of my_input only.
61 template< typename TupleType >
62 static inline void release_my_reservation( TupleType &my_input ) {
63 tbb::flow::get<N-1>( my_input ).release();
64 }
65
// Releases this level's reservation via release_my_reservation().
// NOTE(review): listing line 68 is not shown.
66 template <typename TupleType>
67 static inline void release_reservations( TupleType &my_input) {
69 release_my_reservation(my_input);
70 }
71
// Reserves element N-1 into the matching slot of `out`, then recurses into join_helper<N-1>.
// Returns false as soon as a reserve fails; if the recursive call fails, this level's own
// reservation is released before returning false.
72 template< typename InputTuple, typename OutputTuple >
73 static inline bool reserve( InputTuple &my_input, OutputTuple &out) {
74 if ( !tbb::flow::get<N-1>( my_input ).reserve( tbb::flow::get<N-1>( out ) ) ) return false;
75 if ( !join_helper<N-1>::reserve( my_input, out ) ) {
76 release_my_reservation( my_input );
77 return false;
78 }
79 return true;
80 }
81
// Calls get_item() on element N-1, then always recurses: the recursive call is the left operand
// of &&, so it runs even when this level's get_item() returned false.
82 template<typename InputTuple, typename OutputTuple>
83 static inline bool get_my_item( InputTuple &my_input, OutputTuple &out) {
84 bool res = tbb::flow::get<N-1>(my_input).get_item(tbb::flow::get<N-1>(out) ); // may fail
85 return join_helper<N-1>::get_my_item(my_input, out) && res; // do get on other inputs before returning
86 }
87
// Entry point that simply forwards to get_my_item().
88 template<typename InputTuple, typename OutputTuple>
89 static inline bool get_items(InputTuple &my_input, OutputTuple &out) {
90 return get_my_item(my_input, out);
91 }
92
// Calls reset_port() on element N-1 of my_input. NOTE(review): listing line 95 is not shown.
93 template<typename InputTuple>
94 static inline void reset_my_port(InputTuple &my_input) {
96 tbb::flow::get<N-1>(my_input).reset_port();
97 }
98
// Entry point that simply forwards to reset_my_port().
99 template<typename InputTuple>
100 static inline void reset_ports(InputTuple& my_input) {
101 reset_my_port(my_input);
102 }
103
// Hands element N-1 of my_key_funcs to the matching input via set_my_key_func(), then sets that
// tuple slot to NULL and recurses into join_helper<N-1>.
104 template<typename InputTuple, typename KeyFuncTuple>
105 static inline void set_key_functors(InputTuple &my_input, KeyFuncTuple &my_key_funcs) {
106 tbb::flow::get<N-1>(my_input).set_my_key_func(tbb::flow::get<N-1>(my_key_funcs));
107 tbb::flow::get<N-1>(my_key_funcs) = NULL;
108 join_helper<N-1>::set_key_functors(my_input, my_key_funcs);
109 }
110
// If element N-1 of other_inputs has a non-null key functor, installs a clone() of it on the
// matching element of my_inputs; then recurses into join_helper<N-1>.
111 template< typename KeyFuncTuple>
112 static inline void copy_key_functors(KeyFuncTuple &my_inputs, KeyFuncTuple &other_inputs) {
113 if(tbb::flow::get<N-1>(other_inputs).get_my_key_func()) {
114 tbb::flow::get<N-1>(my_inputs).set_my_key_func(tbb::flow::get<N-1>(other_inputs).get_my_key_func()->clone());
115 }
116 join_helper<N-1>::copy_key_functors(my_inputs, other_inputs);
117 }
118
// Calls reset_receiver(f) on element N-1 of my_input. NOTE(review): listing line 121 is not shown.
119 template<typename InputTuple>
120 static inline void reset_inputs(InputTuple &my_input, reset_flags f) {
122 tbb::flow::get<N-1>(my_input).reset_receiver(f);
123 }
124
125#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
// Calls extract_receiver() on element N-1 of my_input. NOTE(review): listing line 128 is not shown.
126 template<typename InputTuple>
127 static inline void extract_inputs(InputTuple &my_input) {
129 tbb::flow::get<N-1>(my_input).extract_receiver();
130 }
131#endif
132 }; // join_helper<N>
133
// Explicit specialization of join_helper for N == 1. Every member acts only on element 0 of the
// tuple (tbb::flow::get<0>) and none of them refers to another join_helper instantiation.
134 template< >
135 struct join_helper<1> {
136
// Passes `port` to set_join_node_pointer() on element 0 of my_input.
137 template< typename TupleType, typename PortType >
138 static inline void set_join_node_pointer(TupleType &my_input, PortType *port) {
139 tbb::flow::get<0>( my_input ).set_join_node_pointer(port);
140 }
141
// Calls consume() on element 0 of my_input.
142 template< typename TupleType >
143 static inline void consume_reservations( TupleType &my_input ) {
144 tbb::flow::get<0>( my_input ).consume();
145 }
146
// Calls release() on element 0 of my_input.
147 template< typename TupleType >
148 static inline void release_my_reservation( TupleType &my_input ) {
149 tbb::flow::get<0>( my_input ).release();
150 }
151
// Forwards to release_my_reservation().
152 template<typename TupleType>
153 static inline void release_reservations( TupleType &my_input) {
154 release_my_reservation(my_input);
155 }
156
// Returns the result of reserve() on element 0 of my_input, targeting element 0 of `out`.
157 template< typename InputTuple, typename OutputTuple >
158 static inline bool reserve( InputTuple &my_input, OutputTuple &out) {
159 return tbb::flow::get<0>( my_input ).reserve( tbb::flow::get<0>( out ) );
160 }
161
// Returns the result of get_item() on element 0 of my_input, targeting element 0 of `out`.
162 template<typename InputTuple, typename OutputTuple>
163 static inline bool get_my_item( InputTuple &my_input, OutputTuple &out) {
164 return tbb::flow::get<0>(my_input).get_item(tbb::flow::get<0>(out));
165 }
166
// Forwards to get_my_item().
167 template<typename InputTuple, typename OutputTuple>
168 static inline bool get_items(InputTuple &my_input, OutputTuple &out) {
169 return get_my_item(my_input, out);
170 }
171
// Calls reset_port() on element 0 of my_input.
172 template<typename InputTuple>
173 static inline void reset_my_port(InputTuple &my_input) {
174 tbb::flow::get<0>(my_input).reset_port();
175 }
176
// Forwards to reset_my_port().
177 template<typename InputTuple>
178 static inline void reset_ports(InputTuple& my_input) {
179 reset_my_port(my_input);
180 }
181
// Hands element 0 of my_key_funcs to element 0 of my_input via set_my_key_func(), then sets the
// tuple slot to NULL.
182 template<typename InputTuple, typename KeyFuncTuple>
183 static inline void set_key_functors(InputTuple &my_input, KeyFuncTuple &my_key_funcs) {
184 tbb::flow::get<0>(my_input).set_my_key_func(tbb::flow::get<0>(my_key_funcs));
185 tbb::flow::get<0>(my_key_funcs) = NULL;
186 }
187
// If element 0 of other_inputs has a non-null key functor, installs a clone() of it on element 0
// of my_inputs.
188 template< typename KeyFuncTuple>
189 static inline void copy_key_functors(KeyFuncTuple &my_inputs, KeyFuncTuple &other_inputs) {
190 if(tbb::flow::get<0>(other_inputs).get_my_key_func()) {
191 tbb::flow::get<0>(my_inputs).set_my_key_func(tbb::flow::get<0>(other_inputs).get_my_key_func()->clone());
192 }
193 }
// Calls reset_receiver(f) on element 0 of my_input.
194 template<typename InputTuple>
195 static inline void reset_inputs(InputTuple &my_input, reset_flags f) {
196 tbb::flow::get<0>(my_input).reset_receiver(f);
197 }
198
199#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
// Calls extract_receiver() on element 0 of my_input.
200 template<typename InputTuple>
201 static inline void extract_inputs(InputTuple &my_input) {
202 tbb::flow::get<0>(my_input).extract_receiver();
203 }
204#endif
205 }; // join_helper<1>
206
// Input port class derived from receiver<T>. Its public operations build a
// reserving_port_operation and submit it through my_aggregator.execute(); the handler loop below
// dispatches on the operation's type code.
// NOTE(review): many listing lines of this class are absent from this extract (the embedded
// numbering has gaps, e.g. 219, 230-231, 237-241, 244-249, 423-425), including the op-type enum
// header, several method signatures and the data-member declarations. Confirm against the
// original header before editing.
208 template< typename T >
209 class reserving_port : public receiver<T> {
210 public:
211 typedef T input_type;
212 typedef typename receiver<input_type>::predecessor_type predecessor_type;
213#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
214 typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
215 typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
216#endif
217 private:
218 // ----------- Aggregator ------------
// Tail of the operation-type enumeration; its first line (listing line 219) is not shown.
220#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
221 , add_blt_pred, del_blt_pred, blt_pred_cnt, blt_pred_cpy
222#endif
223 };
225
// Operation record passed to the aggregator: a char type code plus a union of arguments.
// NOTE(review): the first union members and the constructor signatures are not shown; only the
// initializer lists that set `type`, `my_arg` and `my_pred` are visible.
226 class reserving_port_operation : public aggregated_operation<reserving_port_operation> {
227 public:
228 char type;
229 union {
232#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
233 size_t cnt_val;
234 predecessor_list_type *plist;
235#endif
236 };
238 type(char(t)), my_arg(const_cast<T*>(&e)) {}
240 my_pred(const_cast<predecessor_type *>(&s)) {}
242 };
243
247
// Body of the aggregator handler. NOTE(review): its signature (listing lines 248-249) is not
// shown. The loop walks the op_list chain through ->next and publishes each operation's outcome
// with __TBB_store_with_release(current->status, ...).
250 bool no_predecessors;
251 while(op_list) {
252 current = op_list;
253 op_list = op_list->next;
254 switch(current->type) {
// reg_pred: add the predecessor; if the set was empty beforehand, call
// my_join->decrement_port_count(true) and discard its result.
255 case reg_pred:
256 no_predecessors = my_predecessors.empty();
257 my_predecessors.add(*(current->my_pred));
258 if ( no_predecessors ) {
259 (void) my_join->decrement_port_count(true); // may try to forward
260 }
261 __TBB_store_with_release(current->status, SUCCEEDED);
262 break;
// rem_pred: remove the predecessor. NOTE(review): listing line 265 is not shown.
263 case rem_pred:
264 my_predecessors.remove(*(current->my_pred));
266 __TBB_store_with_release(current->status, SUCCEEDED);
267 break;
// res_item: fails if a reservation is already held; otherwise tries to reserve an item into
// *my_arg and records success in `reserved`.
268 case res_item:
269 if ( reserved ) {
270 __TBB_store_with_release(current->status, FAILED);
271 }
272 else if ( my_predecessors.try_reserve( *(current->my_arg) ) ) {
273 reserved = true;
274 __TBB_store_with_release(current->status, SUCCEEDED);
275 } else {
// NOTE(review): the body of this `if` (listing line 277) is not shown.
276 if ( my_predecessors.empty() ) {
278 }
279 __TBB_store_with_release(current->status, FAILED);
280 }
281 break;
// rel_res / con_res: both clear `reserved`. NOTE(review): listing lines 284 and 289, which
// presumably distinguish release from consume, are not shown.
282 case rel_res:
283 reserved = false;
285 __TBB_store_with_release(current->status, SUCCEEDED);
286 break;
287 case con_res:
288 reserved = false;
290 __TBB_store_with_release(current->status, SUCCEEDED);
291 break;
292#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
// The four cases below forward to the correspondingly named my_predecessors members.
293 case add_blt_pred:
294 my_predecessors.internal_add_built_predecessor(*(current->my_pred));
295 __TBB_store_with_release(current->status, SUCCEEDED);
296 break;
297 case del_blt_pred:
298 my_predecessors.internal_delete_built_predecessor(*(current->my_pred));
299 __TBB_store_with_release(current->status, SUCCEEDED);
300 break;
301 case blt_pred_cnt:
302 current->cnt_val = my_predecessors.predecessor_count();
303 __TBB_store_with_release(current->status, SUCCEEDED);
304 break;
305 case blt_pred_cpy:
306 my_predecessors.copy_predecessors(*(current->plist));
307 __TBB_store_with_release(current->status, SUCCEEDED);
308 break;
309#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
310 }
311 }
312 }
313
314 protected:
315 template< typename R, typename B > friend class run_and_put_task;
316 template<typename X, typename Y> friend class internal::broadcast_cache;
317 template<typename X, typename Y> friend class internal::round_robin_cache;
// NOTE(review): the signatures of the next two members (listing lines 318 and 322) are not
// shown; the first returns NULL, the second returns my_join->graph_ref.
319 return NULL;
320 }
321
323 return my_join->graph_ref;
324 }
325
326 public:
327
// NOTE(review): constructor signature and listing line 331 are not shown. The visible body
// clears my_join and installs this object as the aggregator's handler.
330 my_join = NULL;
332 my_aggregator.initialize_handler(handler_type(this));
333 }
334
// The copy constructor ignores `other`: it starts unreserved with no join pointer and installs
// its own aggregator handler. NOTE(review): listing line 339 is not shown.
335 // copy constructor
336 reserving_port(const reserving_port& /* other */) : receiver<T>() {
337 reserved = false;
338 my_join = NULL;
340 my_aggregator.initialize_handler(handler_type(this));
341 }
342
// NOTE(review): signature not shown; stores `join` in my_join.
344 my_join = join;
345 }
346
// NOTE(review): the signatures and op_data declarations of the next two members are not shown.
// Each executes an operation through the aggregator and reports whether it SUCCEEDED.
350 my_aggregator.execute(&op_data);
351 return op_data.status == SUCCEEDED;
352 }
353
357 my_aggregator.execute(&op_data);
358 return op_data.status == SUCCEEDED;
359 }
360
// Returns true when the submitted operation's status is SUCCEEDED.
// NOTE(review): the op_data declaration (listing line 363) is not shown.
362 bool reserve( T &v ) {
364 my_aggregator.execute(&op_data);
365 return op_data.status == SUCCEEDED;
366 }
367
// Submits an operation through the aggregator; no result is returned.
// NOTE(review): the op_data declaration (listing line 370) is not shown.
369 void release( ) {
371 my_aggregator.execute(&op_data);
372 }
373
// Submits an operation through the aggregator; no result is returned.
// NOTE(review): the op_data declaration (listing line 376) is not shown.
375 void consume( ) {
377 my_aggregator.execute(&op_data);
378 }
379
380#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
// Returns my_predecessors.built_predecessors() directly, without going through the aggregator.
381 built_predecessors_type &built_predecessors() __TBB_override { return my_predecessors.built_predecessors(); }
// Submits an add_blt_pred operation for `src`.
382 void internal_add_built_predecessor(predecessor_type &src) __TBB_override {
383 reserving_port_operation op_data(src, add_blt_pred);
384 my_aggregator.execute(&op_data);
385 }
386
// Submits a del_blt_pred operation for `src`.
387 void internal_delete_built_predecessor(predecessor_type &src) __TBB_override {
388 reserving_port_operation op_data(src, del_blt_pred);
389 my_aggregator.execute(&op_data);
390 }
391
// Submits a blt_pred_cnt operation and returns the count the handler stored in cnt_val.
392 size_t predecessor_count() __TBB_override {
393 reserving_port_operation op_data(blt_pred_cnt);
394 my_aggregator.execute(&op_data);
395 return op_data.cnt_val;
396 }
397
// Submits a blt_pred_cpy operation with `l` as the destination list.
398 void copy_predecessors(predecessor_list_type &l) __TBB_override {
399 reserving_port_operation op_data(blt_pred_cpy);
400 op_data.plist = &l;
401 my_aggregator.execute(&op_data);
402 }
403
// Calls receiver_extract(*this) on the built-predecessors container.
404 void extract_receiver() {
405 my_predecessors.built_predecessors().receiver_extract(*this);
406 }
407
408#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
409
// Clears my_predecessors when rf_clear_edges is set in f, and asserts that the predecessor set is
// empty in that case.
// NOTE(review): listing line 413 (the statement belonging to `else`) is not shown, so in this
// extract `reserved = false;` merely appears to be the else-branch; confirm against the original.
410 void reset_receiver( reset_flags f) __TBB_override {
411 if(f & rf_clear_edges) my_predecessors.clear();
412 else
414 reserved = false;
415 __TBB_ASSERT(!(f&rf_clear_edges) || my_predecessors.empty(), "port edges not removed");
416 }
417
418 private:
419#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
420 friend class get_graph_helper;
421#endif
422
// NOTE(review): the data-member declarations (listing lines 423-425) are not shown.
426 }; // reserving_port
427
// Input port class derived from receiver<T> and item_buffer<T>. Its public operations build a
// queueing_port_operation and submit it through my_aggregator.execute(); the handler loop below
// dispatches on the operation's type code and uses the item_buffer members (buffer_empty,
// push_back, front, destroy_front).
// NOTE(review): many listing lines of this class are absent from this extract (e.g. 442,
// 451-452, 458, 460, 465, 470, 475-480, 541-542, 633), including the op-type enum header, several
// signatures and some data members. Confirm against the original header before editing.
429 template<typename T>
430 class queueing_port : public receiver<T>, public item_buffer<T> {
431 public:
432 typedef T input_type;
433 typedef typename receiver<input_type>::predecessor_type predecessor_type;
435#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
436 typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
437 typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
438#endif
439
440 // ----------- Aggregator ------------
441 private:
// Tail of the operation-type enumeration; its first line (listing line 442) is not shown.
443#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
444 , add_blt_pred, del_blt_pred, blt_pred_cnt, blt_pred_cpy
445#endif
446 };
447
// Operation record passed to the aggregator. NOTE(review): the declarations of my_val, my_arg
// and bypass_t, and the three constructor signatures, are not shown; only their initializer
// lists are visible. Every visible constructor initializes bypass_t to NULL.
448 class queueing_port_operation : public aggregated_operation<queueing_port_operation> {
449 public:
450 char type;
453#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
454 predecessor_type *pred;
455 size_t cnt_val;
456 predecessor_list_type *plist;
457#endif
459 // constructor for value parameter
461 type(char(t)), my_val(e)
462 , bypass_t(NULL)
463 {}
464 // constructor for pointer parameter
466 type(char(t)), my_arg(const_cast<T*>(p))
467 , bypass_t(NULL)
468 {}
469 // constructor with no parameter
471 , bypass_t(NULL)
472 {}
473 };
474
478
// Body of the aggregator handler. NOTE(review): its signature (listing lines 479-480) is not
// shown. The loop walks the op_list chain through ->next.
481 bool was_empty;
482 while(op_list) {
483 current = op_list;
484 op_list = op_list->next;
485 switch(current->type) {
// try__put_task: append the value to the buffer. If the buffer was empty beforehand, the task
// returned by my_join->decrement_port_count(false) is handed back through bypass_t; otherwise
// bypass_t is SUCCESSFULLY_ENQUEUED.
486 case try__put_task: {
487 task *rtask = NULL;
488 was_empty = this->buffer_empty();
489 this->push_back(current->my_val);
490 if (was_empty) rtask = my_join->decrement_port_count(false);
491 else
492 rtask = SUCCESSFULLY_ENQUEUED;
493 current->bypass_t = rtask;
494 __TBB_store_with_release(current->status, SUCCEEDED);
495 }
496 break;
// get__item: copy the front item into *my_arg without removing it; FAILED if the buffer is empty.
497 case get__item:
498 if(!this->buffer_empty()) {
499 *(current->my_arg) = this->front();
500 __TBB_store_with_release(current->status, SUCCEEDED);
501 }
502 else {
503 __TBB_store_with_release(current->status, FAILED);
504 }
505 break;
// res_port: destroy the front item (asserting that one exists).
// NOTE(review): the body of the following `if` (listing line 510) is not shown.
506 case res_port:
507 __TBB_ASSERT(this->my_item_valid(this->my_head), "No item to reset");
508 this->destroy_front();
509 if(this->my_item_valid(this->my_head)) {
511 }
512 __TBB_store_with_release(current->status, SUCCEEDED);
513 break;
514#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
// The four cases below forward to the edge-container members of my_built_predecessors.
515 case add_blt_pred:
516 my_built_predecessors.add_edge(*(current->pred));
517 __TBB_store_with_release(current->status, SUCCEEDED);
518 break;
519 case del_blt_pred:
520 my_built_predecessors.delete_edge(*(current->pred));
521 __TBB_store_with_release(current->status, SUCCEEDED);
522 break;
523 case blt_pred_cnt:
524 current->cnt_val = my_built_predecessors.edge_count();
525 __TBB_store_with_release(current->status, SUCCEEDED);
526 break;
527 case blt_pred_cpy:
528 my_built_predecessors.copy_edges(*(current->plist));
529 __TBB_store_with_release(current->status, SUCCEEDED);
530 break;
531#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
532 }
533 }
534 }
535 // ------------ End Aggregator ---------------
536
537 protected:
538 template< typename R, typename B > friend class run_and_put_task;
539 template<typename X, typename Y> friend class internal::broadcast_cache;
540 template<typename X, typename Y> friend class internal::round_robin_cache;
// NOTE(review): the signature and op_data declaration (listing lines 541-542) are not shown.
// The visible body executes the operation and returns bypass_t, substituting
// SUCCESSFULLY_ENQUEUED when bypass_t is null.
543 my_aggregator.execute(&op_data);
544 __TBB_ASSERT(op_data.status == SUCCEEDED || !op_data.bypass_t, "inconsistent return from aggregator");
545 if(!op_data.bypass_t) return SUCCESSFULLY_ENQUEUED;
546 return op_data.bypass_t;
547 }
548
// NOTE(review): signature (listing line 549) not shown; returns my_join->graph_ref.
550 return my_join->graph_ref;
551 }
552
553 public:
554
// NOTE(review): constructor signature not shown. The visible body clears my_join and installs
// this object as the aggregator's handler.
557 my_join = NULL;
558 my_aggregator.initialize_handler(handler_type(this));
559 }
560
// The copy constructor ignores `other`: both bases are default-constructed, my_join is cleared
// and a fresh aggregator handler is installed.
562 queueing_port(const queueing_port& /* other */) : receiver<T>(), item_buffer<T>() {
563 my_join = NULL;
564 my_aggregator.initialize_handler(handler_type(this));
565 }
566
// NOTE(review): signature not shown; stores `join` in my_join.
569 my_join = join;
570 }
571
// Returns true when the submitted operation's status is SUCCEEDED.
// NOTE(review): the op_data declaration (listing line 573) is not shown.
572 bool get_item( T &v ) {
574 my_aggregator.execute(&op_data);
575 return op_data.status == SUCCEEDED;
576 }
577
578 // reset_port is called when item is accepted by successor, but
579 // is initiated by join_node.
// NOTE(review): the op_data declaration (listing line 581) is not shown.
580 void reset_port() {
582 my_aggregator.execute(&op_data);
583 return;
584 }
585
586#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
// Returns the my_built_predecessors member directly, without going through the aggregator.
587 built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
588
// Submits an add_blt_pred operation for `p`.
589 void internal_add_built_predecessor(predecessor_type &p) __TBB_override {
590 queueing_port_operation op_data(add_blt_pred);
591 op_data.pred = &p;
592 my_aggregator.execute(&op_data);
593 }
594
// Submits a del_blt_pred operation for `p`.
595 void internal_delete_built_predecessor(predecessor_type &p) __TBB_override {
596 queueing_port_operation op_data(del_blt_pred);
597 op_data.pred = &p;
598 my_aggregator.execute(&op_data);
599 }
600
// Submits a blt_pred_cnt operation and returns the count the handler stored in cnt_val.
601 size_t predecessor_count() __TBB_override {
602 queueing_port_operation op_data(blt_pred_cnt);
603 my_aggregator.execute(&op_data);
604 return op_data.cnt_val;
605 }
606
// Submits a blt_pred_cpy operation with `l` as the destination list.
607 void copy_predecessors(predecessor_list_type &l) __TBB_override {
608 queueing_port_operation op_data(blt_pred_cpy);
609 op_data.plist = &l;
610 my_aggregator.execute(&op_data);
611 }
612
// NOTE(review): listing line 614 is not shown; the visible statement calls
// receiver_extract(*this) on my_built_predecessors.
613 void extract_receiver() {
615 my_built_predecessors.receiver_extract(*this);
616 }
617#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
618
// NOTE(review): listing lines 620-621 are not shown; the visible part clears
// my_built_predecessors when rf_clear_edges is set (deprecated-extraction builds only).
619 void reset_receiver(reset_flags f) __TBB_override {
622#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
623 if (f & rf_clear_edges)
624 my_built_predecessors.clear();
625#endif
626 }
627
628 private:
629#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
630 friend class get_graph_helper;
631#endif
632
// NOTE(review): the my_join declaration (listing line 633) is not shown.
634#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
635 edge_container<predecessor_type> my_built_predecessors;
636#endif
637 }; // queueing_port
638
640
641 template<typename K>
644 size_t my_value;
645 };
646
647 // method to access the key in the counting table
648 // the ref has already been removed from K
649 template< typename K >
652 const K& operator()(const table_item_type& v) { return v.my_key; }
653 };
654
655 // the ports can have only one template parameter. We wrap the types needed in
656 // a traits type
// Input port class (named key_matching_port per its copy constructor and closing comment)
// derived from receiver<TraitsType::T> and a hash_buffer keyed by TraitsType::K. Operations are
// submitted as key_matching_port_operation records through my_aggregator.execute().
// NOTE(review): the line carrying the class name (listing line 658) and many others (e.g. 664,
// 667, 671, 679, 688-689, 696, 699, 702, 705-710, 730, 759-760, 859) are absent from this
// extract. Confirm against the original header before editing.
657 template< class TraitsType >
659 public receiver<typename TraitsType::T>,
660 public hash_buffer< typename TraitsType::K, typename TraitsType::T, typename TraitsType::TtoK,
661 typename TraitsType::KHash > {
662 public:
663 typedef TraitsType traits;
665 typedef typename TraitsType::T input_type;
666 typedef typename TraitsType::K key_type;
668 typedef typename receiver<input_type>::predecessor_type predecessor_type;
669 typedef typename TraitsType::TtoK type_to_key_func_type;
670 typedef typename TraitsType::KHash hash_compare_type;
672#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
673 typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
674 typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
675#endif
676 private:
677// ----------- Aggregator ------------
678 private:
// Tail of the operation-type enumeration; its first line (listing line 679) is not shown.
680#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
681 , add_blt_pred, del_blt_pred, blt_pred_cnt, blt_pred_cpy
682#endif
683 };
684
// Operation record passed to the aggregator. NOTE(review): the my_val / my_arg declarations and
// the constructor signatures are not shown; only two initializer lists are visible.
685 class key_matching_port_operation : public aggregated_operation<key_matching_port_operation> {
686 public:
687 char type;
690#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
691 predecessor_type *pred;
692 size_t cnt_val;
693 predecessor_list_type *plist;
694#endif
695 // constructor for value parameter
697 type(char(t)), my_val(e) {}
698 // constructor for pointer parameter
700 type(char(t)), my_arg(const_cast<input_type*>(p)) {}
701 // constructor with no parameter
703 };
704
708
// Body of the aggregator handler. NOTE(review): its signature and the declaration of `current`
// (listing lines 709-710) are not shown. The loop walks the op_list chain through ->next.
711 while(op_list) {
712 current = op_list;
713 op_list = op_list->next;
714 switch(current->type) {
// try__put: insert the value by key; the status mirrors whether the insertion happened.
715 case try__put: {
716 bool was_inserted = this->insert_with_key(current->my_val);
717 // return failure if a duplicate insertion occurs
718 __TBB_store_with_release(current->status, was_inserted ? SUCCEEDED : FAILED);
719 }
720 break;
// get__item: look up my_join->current_key and copy the item into *my_arg. A failed lookup only
// trips the assertion; the status is set to SUCCEEDED either way.
721 case get__item:
722 // use current_key from FE for item
723 if(!this->find_with_key(my_join->current_key, *(current->my_arg))) {
724 __TBB_ASSERT(false, "Failed to find item corresponding to current_key.");
725 }
726 __TBB_store_with_release(current->status, SUCCEEDED);
727 break;
// res_port: NOTE(review): the statement acting on the key (listing line 730) is not shown.
728 case res_port:
729 // use current_key from FE for item
731 __TBB_store_with_release(current->status, SUCCEEDED);
732 break;
733#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
// The four cases below forward to the edge-container members of my_built_predecessors.
734 case add_blt_pred:
735 my_built_predecessors.add_edge(*(current->pred));
736 __TBB_store_with_release(current->status, SUCCEEDED);
737 break;
738 case del_blt_pred:
739 my_built_predecessors.delete_edge(*(current->pred));
740 __TBB_store_with_release(current->status, SUCCEEDED);
741 break;
742 case blt_pred_cnt:
743 current->cnt_val = my_built_predecessors.edge_count();
744 __TBB_store_with_release(current->status, SUCCEEDED);
745 break;
746 case blt_pred_cpy:
747 my_built_predecessors.copy_edges(*(current->plist));
748 __TBB_store_with_release(current->status, SUCCEEDED);
749 break;
750#endif
751 }
752 }
753 }
754// ------------ End Aggregator ---------------
755 protected:
756 template< typename R, typename B > friend class run_and_put_task;
757 template<typename X, typename Y> friend class internal::broadcast_cache;
758 template<typename X, typename Y> friend class internal::round_robin_cache;
// NOTE(review): the signature and op_data declaration (listing lines 759-760) are not shown.
// The visible body executes the operation; only when it SUCCEEDED does it call
// my_join->increment_key_count() with the key computed from `v`, and a null result from that
// call is replaced by SUCCESSFULLY_ENQUEUED. If the operation did not succeed, NULL is returned.
761 task *rtask = NULL;
762 my_aggregator.execute(&op_data);
763 if(op_data.status == SUCCEEDED) {
764 rtask = my_join->increment_key_count((*(this->get_key_func()))(v), false); // may spawn
765 // rtask has to reflect the return status of the try_put
766 if(!rtask) rtask = SUCCESSFULLY_ENQUEUED;
767 }
768 return rtask;
769 }
770
// NOTE(review): signature (listing line 771) not shown; returns my_join->graph_ref.
772 return my_join->graph_ref;
773 }
774
775 public:
776
// NOTE(review): constructor signature (listing line 777) not shown. The visible body clears
// my_join and installs this object as the aggregator's handler.
778 my_join = NULL;
779 my_aggregator.initialize_handler(handler_type(this));
780 }
781
// The copy constructor ignores `other`: both bases are default-constructed, my_join is cleared
// and a fresh aggregator handler is installed.
782 // copy constructor
783 key_matching_port(const key_matching_port& /*other*/) : receiver<input_type>(), buffer_type() {
784 my_join = NULL;
785 my_aggregator.initialize_handler(handler_type(this));
786 }
787
789
// NOTE(review): signature (listing line 790) not shown. The result of the dynamic_cast is stored
// without a null check in the visible code.
791 my_join = dynamic_cast<matching_forwarding_base<key_type>*>(join);
792 }
793
795
797
// Returns true when the submitted operation's status is SUCCEEDED.
// NOTE(review): the op_data declaration (listing line 800) is not shown.
798 bool get_item( input_type &v ) {
799 // aggregator uses current_key from FE for Key
801 my_aggregator.execute(&op_data);
802 return op_data.status == SUCCEEDED;
803 }
804
805#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
// Returns the my_built_predecessors member directly, without going through the aggregator.
806 built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
807
// Submits an add_blt_pred operation for `p`.
808 void internal_add_built_predecessor(predecessor_type &p) __TBB_override {
809 key_matching_port_operation op_data(add_blt_pred);
810 op_data.pred = &p;
811 my_aggregator.execute(&op_data);
812 }
813
// Submits a del_blt_pred operation for `p`.
814 void internal_delete_built_predecessor(predecessor_type &p) __TBB_override {
815 key_matching_port_operation op_data(del_blt_pred);
816 op_data.pred = &p;
817 my_aggregator.execute(&op_data);
818 }
819
// Submits a blt_pred_cnt operation and returns the count the handler stored in cnt_val.
820 size_t predecessor_count() __TBB_override {
821 key_matching_port_operation op_data(blt_pred_cnt);
822 my_aggregator.execute(&op_data);
823 return op_data.cnt_val;
824 }
825
// Submits a blt_pred_cpy operation with `l` as the destination list.
826 void copy_predecessors(predecessor_list_type &l) __TBB_override {
827 key_matching_port_operation op_data(blt_pred_cpy);
828 op_data.plist = &l;
829 my_aggregator.execute(&op_data);
830 }
831#endif
832
833 // reset_port is called when item is accepted by successor, but
834 // is initiated by join_node.
// NOTE(review): the op_data declaration (listing line 836) is not shown.
835 void reset_port() {
837 my_aggregator.execute(&op_data);
838 return;
839 }
840
841#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
// NOTE(review): listing line 843 is not shown; the visible statement calls
// receiver_extract(*this) on my_built_predecessors.
842 void extract_receiver() {
844 my_built_predecessors.receiver_extract(*this);
845 }
846#endif
// NOTE(review): listing lines 848-849 are not shown; the visible part clears
// my_built_predecessors when rf_clear_edges is set (deprecated-extraction builds only).
847 void reset_receiver(reset_flags f ) __TBB_override {
850#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
851 if (f & rf_clear_edges)
852 my_built_predecessors.clear();
853#endif
854 }
855
856 private:
857 // my_join forwarding base used to count number of inputs that
858 // received key.
// NOTE(review): the my_join declaration (listing line 859) is not shown.
860#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
861 edge_container<predecessor_type> my_built_predecessors;
862#endif
863 }; // key_matching_port
864
865 using namespace graph_policy_namespace;
866
867 template<typename JP, typename InputTuple, typename OutputTuple>
868 class join_node_base;
869
871 template<typename JP, typename InputTuple, typename OutputTuple>
873
// Partial specialization of join_node_FE for the `reserving` policy, derived from
// forwarding_base. It tracks, in the atomic counter ports_with_no_inputs, how many of the N
// ports are still counted as lacking input; the counter starts at N.
// NOTE(review): many listing lines of this class are absent from this extract (e.g. 880, 884,
// 889, 894, 899, 903, 925, 931, 935, 940-941, 943-944, 947-948), including several method
// signatures and the my_inputs / my_node declarations. Confirm against the original header.
874 template<typename InputTuple, typename OutputTuple>
875 class join_node_FE<reserving, InputTuple, OutputTuple> : public forwarding_base {
876 public:
877 static const int N = tbb::flow::tuple_size<OutputTuple>::value;
878 typedef OutputTuple output_type;
879 typedef InputTuple input_type;
881
// Constructs against graph g with no owning node yet. NOTE(review): listing line 884 is not shown.
882 join_node_FE(graph &g) : forwarding_base(g), my_node(NULL) {
883 ports_with_no_inputs = N;
885 }
886
// Copy constructor: shares other's graph reference but starts with no owning node and a fresh
// counter. NOTE(review): listing line 889 is not shown.
887 join_node_FE(const join_node_FE& other) : forwarding_base((other.forwarding_base::graph_ref)), my_node(NULL) {
888 ports_with_no_inputs = N;
890 }
891
// Stores the pointer to the node that owns this front-end.
892 void set_my_node(base_node_type *new_my_node) { my_node = new_my_node; }
893
// NOTE(review): signature (listing line 894) not shown; the body increments the counter.
895 ++ports_with_no_inputs;
896 }
897
898 // if all input_ports have predecessors, spawn forward to try and consume tuples
// NOTE(review): the signature (listing line 899) and the task type constructed on listing
// line 903 are not shown. Visible logic: when the decrement takes the counter from 1 to 0 and
// the graph is active, a task is allocated as an additional child of the graph's root task; it
// is returned to the caller if handle_task is false, otherwise spawned in the graph arena.
// In all other cases NULL is returned.
900 if(ports_with_no_inputs.fetch_and_decrement() == 1) {
901 if(internal::is_graph_active(this->graph_ref)) {
902 task *rtask = new ( task::allocate_additional_child_of( *(this->graph_ref.root_task()) ) )
904 if(!handle_task) return rtask;
905 internal::spawn_in_graph_arena(this->graph_ref, *rtask);
906 }
907 }
908 return NULL;
909 }
910
// Gives access to the tuple of input ports.
911 input_type &input_ports() { return my_inputs; }
912
913 protected:
914
// Restores the counter to N and forwards the reset flags to every input via join_helper<N>.
915 void reset( reset_flags f) {
916 // called outside of parallel contexts
917 ports_with_no_inputs = N;
918 join_helper<N>::reset_inputs(my_inputs, f);
919 }
920
921#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
// NOTE(review): listing line 925 is not shown; the visible statement restores the counter to N.
922 void extract( ) {
923 // called outside of parallel contexts
924 ports_with_no_inputs = N;
926 }
927#endif
928
929 // all methods on input ports should be called under mutual exclusion from join_node_base.
930
// NOTE(review): signature (listing line 931) not shown; returns true when the counter is zero.
932 return !ports_with_no_inputs;
933 }
934
// NOTE(review): signature (listing line 935) not shown; fails immediately while the counter is
// non-zero, otherwise attempts to reserve from every input via join_helper<N>::reserve.
936 if(ports_with_no_inputs) return false;
937 return join_helper<N>::reserve(my_inputs, out);
938 }
939
// NOTE(review): the two members that close on the next two lines are not shown apart from their
// closing braces.
942 }
945 }
946
949 atomic<size_t> ports_with_no_inputs;
950 }; // join_node_FE<reserving, ... >
951
// Partial specialization of join_node_FE for the `queueing` policy, derived from
// forwarding_base. It tracks, in the atomic counter ports_with_no_items, how many of the N ports
// are still counted as lacking an item; the counter starts at N.
// NOTE(review): many listing lines of this class are absent from this extract (e.g. 958, 962,
// 967, 973, 978, 1005, 1010, 1014, 1019, 1023, 1027-1028), including several method signatures
// and the my_inputs / my_node declarations. Confirm against the original header.
952 template<typename InputTuple, typename OutputTuple>
953 class join_node_FE<queueing, InputTuple, OutputTuple> : public forwarding_base {
954 public:
955 static const int N = tbb::flow::tuple_size<OutputTuple>::value;
956 typedef OutputTuple output_type;
957 typedef InputTuple input_type;
959
// Constructs against graph g with no owning node yet. NOTE(review): listing line 962 is not shown.
960 join_node_FE(graph &g) : forwarding_base(g), my_node(NULL) {
961 ports_with_no_items = N;
963 }
964
// Copy constructor: shares other's graph reference but starts with no owning node and a fresh
// counter. NOTE(review): listing line 967 is not shown.
965 join_node_FE(const join_node_FE& other) : forwarding_base((other.forwarding_base::graph_ref)), my_node(NULL) {
966 ports_with_no_items = N;
968 }
969
970 // needed for forwarding
971 void set_my_node(base_node_type *new_my_node) { my_node = new_my_node; }
972
// NOTE(review): signature (listing line 973) not shown; the body restores the counter to N.
// The calls to reset_port_count() further down presumably refer to this member.
974 ports_with_no_items = N;
975 }
976
977 // if all input_ports have items, spawn forward to try and consume tuples
// NOTE(review): the signature (listing line 978) is not shown. Visible logic: when the decrement
// takes the counter from 1 to 0 and the graph is active, a forward_task_bypass for *my_node is
// allocated as an additional child of the graph's root task; it is returned to the caller if
// handle_task is false, otherwise spawned in the graph arena. In all other cases NULL is returned.
979 {
980 if(ports_with_no_items.fetch_and_decrement() == 1) {
981 if(internal::is_graph_active(this->graph_ref)) {
982 task *rtask = new ( task::allocate_additional_child_of( *(this->graph_ref.root_task()) ) )
983 forward_task_bypass <base_node_type>(*my_node);
984 if(!handle_task) return rtask;
985 internal::spawn_in_graph_arena(this->graph_ref, *rtask);
986 }
987 }
988 return NULL;
989 }
990
// Not expected to be used with this policy: the body is an unconditional assertion failure.
991 void increment_port_count() __TBB_override { __TBB_ASSERT(false, NULL); } // should never be called
992
// Gives access to the tuple of input ports.
993 input_type &input_ports() { return my_inputs; }
994
995 protected:
996
// Restores the counter and forwards the reset flags to every input via join_helper<N>.
997 void reset( reset_flags f) {
998 reset_port_count();
999 join_helper<N>::reset_inputs(my_inputs, f );
1000 }
1001
1002#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
// NOTE(review): listing line 1005 is not shown; the visible statement restores the counter.
1003 void extract() {
1004 reset_port_count();
1006 }
1007#endif
1008 // all methods on input ports should be called under mutual exclusion from join_node_base.
1009
// NOTE(review): signature (listing line 1010) not shown; returns true when the counter is zero.
1011 return !ports_with_no_items;
1012 }
1013
// NOTE(review): signature (listing line 1014) not shown; fails immediately while the counter is
// non-zero, otherwise fetches one item from every input via join_helper<N>::get_items.
1015 if(ports_with_no_items) return false;
1016 return join_helper<N>::get_items(my_inputs, out);
1017 }
1018
// NOTE(review): signature (listing line 1019) not shown; restores the counter and then calls
// reset_port() on every input via join_helper<N>::reset_ports.
1020 reset_port_count();
1021 join_helper<N>::reset_ports(my_inputs);
1022 }
// NOTE(review): signature (listing line 1023) not shown; the body is intentionally empty.
1024 // nothing to do.
1025 }
1026
1029 atomic<size_t> ports_with_no_items;
1030 }; // join_node_FE<queueing, ...>
1031
1032 // key_matching join front-end.
1033 template<typename InputTuple, typename OutputTuple, typename K, typename KHash>
1034 class join_node_FE<key_matching<K,KHash>, InputTuple, OutputTuple> : public matching_forwarding_base<K>,
1035 // buffer of key value counts
1036 public hash_buffer< // typedefed below to key_to_count_buffer_type
1037 typename tbb::internal::strip<K>::type&, // force ref type on K
1038 count_element<typename tbb::internal::strip<K>::type>,
1039 internal::type_to_key_function_body<
1040 count_element<typename tbb::internal::strip<K>::type>,
1041 typename tbb::internal::strip<K>::type& >,
1042 KHash >,
1043 // buffer of output items
1044 public item_buffer<OutputTuple> {
1045 public:
1046 static const int N = tbb::flow::tuple_size<OutputTuple>::value;
1047 typedef OutputTuple output_type;
1048 typedef InputTuple input_type;
1049 typedef K key_type;
1051 typedef KHash key_hash_compare;
1052 // must use K without ref.
1054 // method that lets us refer to the key of this type.
1058 // this is the type of the special table that keeps track of the number of discrete
1059 // elements corresponding to each key that we've seen.
1063 typedef join_node_base<key_matching<key_type,key_hash_compare>, InputTuple, OutputTuple> base_node_type; // for forwarding
1065
1066// ----------- Aggregator ------------
1067 // the aggregator is only needed to serialize access to the hash table
1068 // and to the output_buffer_type base class.
1069 private:
1070 enum op_type { res_count, inc_count, may_succeed, try_make };
1072
// Operation record passed to the front-end's aggregator: a char type code plus the arguments
// initialized by the constructors below.
// NOTE(review): the declarations of my_val, my_output, bypass_t and enqueue_task (listing lines
// 1076-1079) are not shown in this extract.
1073 class key_matching_FE_operation : public aggregated_operation<key_matching_FE_operation> {
1074 public:
1075 char type;
1080 // constructor for value parameter
// Carries a key `e` and the caller's q_task flag (stored in enqueue_task); no output pointer.
1081 key_matching_FE_operation(const unref_key_type& e , bool q_task , op_type t) : type(char(t)), my_val(e),
1082 my_output(NULL), bypass_t(NULL), enqueue_task(q_task) {}
// Carries an output pointer `p`; enqueue_task defaults to true.
1083 key_matching_FE_operation(output_type *p, op_type t) : type(char(t)), my_output(p), bypass_t(NULL),
1084 enqueue_task(true) {}
1085 // constructor with no parameter
1086 key_matching_FE_operation(op_type t) : type(char(t)), my_output(NULL), bypass_t(NULL), enqueue_task(true) {}
1087 };
1088
1090 friend class internal::aggregating_functor<class_type, key_matching_FE_operation>;
1092
1093 // called from aggregator, so serialized
1094 // returns a task pointer if a task would have been enqueued but we asked that
1095 // it be returned. Otherwise returns NULL.
// Builds one output tuple for key `t` and appends it to the output buffer.
// Steps visible here: record `t` as current_key, delete that key from the count table, fetch one
// item from every input via join_helper<N>::get_items, push the tuple, optionally create a
// forwarding task, then call reset_port() on every input via join_helper<N>::reset_ports.
// A task is only created when should_enqueue is true, the output buffer was empty on entry and
// the graph is active. If handle_task is true the task is spawned here and NULL is returned;
// otherwise the task pointer is returned to the caller.
// NOTE(review): the task type constructed on listing line 1106 is not shown in this extract.
1096 task * fill_output_buffer(unref_key_type &t, bool should_enqueue, bool handle_task) {
1097 output_type l_out;
1098 task *rtask = NULL;
// do_fwd is computed before the push below, so buffer_empty() reflects the state on entry.
1099 bool do_fwd = should_enqueue && this->buffer_empty() && internal::is_graph_active(this->graph_ref);
1100 this->current_key = t;
1101 this->delete_with_key(this->current_key); // remove the key
1102 if(join_helper<N>::get_items(my_inputs, l_out)) { // <== call back
1103 this->push_back(l_out);
1104 if(do_fwd) { // we enqueue if receiving an item from predecessor, not if successor asks for item
1105 rtask = new ( task::allocate_additional_child_of( *(this->graph_ref.root_task()) ) )
1107 if(handle_task) {
1108 internal::spawn_in_graph_arena(this->graph_ref, *rtask);
1109 rtask = NULL;
1110 }
1111 do_fwd = false;
1112 }
1113 // retire the input values
1114 join_helper<N>::reset_ports(my_inputs); // <== call back
1115 }
1116 else {
1117 __TBB_ASSERT(false, "should have had something to push");
1118 }
1119 return rtask;
1120 }
1121
// Aggregator handler: walks the linked list of pending requests and services each one
// according to its `type`, publishing the outcome through current->status.
1122 void handle_operations(key_matching_FE_operation* op_list) {
1123 key_matching_FE_operation *current;
1124 while(op_list) {
1125 current = op_list;
// advance before servicing the request
1126 op_list = op_list->next;
1127 switch(current->type) {
1128 case res_count: // called from BE
1129 {
1130 this->destroy_front();
1131 __TBB_store_with_release(current->status, SUCCEEDED);
1132 }
1133 break;
1134 case inc_count: { // called from input ports
1135 count_element_type *p = 0;
1136 unref_key_type &t = current->my_val;
1137 bool do_enqueue = current->enqueue_task;
// First time this key is seen: insert a zero count for it, then look it up again.
1138 if(!(this->find_ref_with_key(t,p))) {
// NOTE(review): listing line 1139 is missing; `ev` is presumably declared there.
1140 ev.my_key = t;
1141 ev.my_value = 0;
1142 this->insert_with_key(ev);
1143 if(!(this->find_ref_with_key(t,p))) {
1144 __TBB_ASSERT(false, "should find key after inserting it");
1145 }
1146 }
// Once the count for this key reaches N, build the output tuple.
1147 if(++(p->my_value) == size_t(N)) {
1148 task *rtask = fill_output_buffer(t, true, do_enqueue);
1149 __TBB_ASSERT(!rtask || !do_enqueue, "task should not be returned");
1150 current->bypass_t = rtask;
1151 }
1152 }
1153 __TBB_store_with_release(current->status, SUCCEEDED);
1154 break;
1155 case may_succeed: // called from BE
1156 __TBB_store_with_release(current->status, this->buffer_empty() ? FAILED : SUCCEEDED);
1157 break;
1158 case try_make: // called from BE
1159 if(this->buffer_empty()) {
1160 __TBB_store_with_release(current->status, FAILED);
1161 }
1162 else {
// copy (not remove) the front tuple into the caller's output
1163 *(current->my_output) = this->front();
1164 __TBB_store_with_release(current->status, SUCCEEDED);
1165 }
1166 break;
1167 }
1168 }
1169 }
1170// ------------ End Aggregator ---------------
1171
1172 public:
// Constructs the front end for graph g, installing the per-port key functors from
// TtoK_funcs and registering this object as the aggregator's handler.
// NOTE(review): listing lines 1175 and 1178 are missing; `cfb` used below is presumably
// declared on line 1178 -- confirm against the real header.
1173 template<typename FunctionTuple>
1174 join_node_FE(graph &g, FunctionTuple &TtoK_funcs) : forwarding_base_type(g), my_node(NULL) {
1176 join_helper<N>::set_key_functors(my_inputs, TtoK_funcs);
1177 my_aggregator.initialize_handler(handler_type(this));
1179 this->set_key_func(cfb);
1180 }
1181
// NOTE(review): tail of what appears to be the copy constructor (it reads other.my_inputs);
// its signature (listing lines 1182-1183) and lines 1185 and 1188 are missing from this
// listing, including the declaration of `cfb`.
// The visible body clears my_node, copies the key functors from `other`, and registers
// this object as the aggregator's handler.
1184 my_node = NULL;
1186 join_helper<N>::copy_key_functors(my_inputs, const_cast<input_type &>(other.my_inputs));
1187 my_aggregator.initialize_handler(handler_type(this));
1189 this->set_key_func(cfb);
1190 }
1191
1192 // needed for forwarding
1193 void set_my_node(base_node_type *new_my_node) { my_node = new_my_node; }
1194
1195 void reset_port_count() { // called from BE
1196 key_matching_FE_operation op_data(res_count);
1197 my_aggregator.execute(&op_data);
1198 return;
1199 }
1200
1201 // if all input_ports have items, spawn forward to try and consume tuples
1202 // return a task if we are asked and did create one.
1203 task *increment_key_count(unref_key_type const & t, bool handle_task) __TBB_override { // called from input_ports
1204 key_matching_FE_operation op_data(t, handle_task, inc_count);
1205 my_aggregator.execute(&op_data);
1206 return op_data.bypass_t;
1207 }
1208
1209 task *decrement_port_count(bool /*handle_task*/) __TBB_override { __TBB_ASSERT(false, NULL); return NULL; }
1210
1211 void increment_port_count() __TBB_override { __TBB_ASSERT(false, NULL); } // should never be called
1212
1213 input_type &input_ports() { return my_inputs; }
1214
1215 protected:
1216
1217 void reset( reset_flags f ) {
1218 // called outside of parallel contexts
1219 join_helper<N>::reset_inputs(my_inputs, f);
1220
1221 key_to_count_buffer_type::reset();
1222 output_buffer_type::reset();
1223 }
1224
1225#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
// Extraction support (only built when TBB_DEPRECATED_FLOW_NODE_EXTRACTION is set):
// clears the key-count table and the output buffer.
// NOTE(review): listing line 1228 is missing, so one statement of this body is not
// visible here -- confirm against the real header.
1226 void extract() {
1227 // called outside of parallel contexts
1229 key_to_count_buffer_type::reset(); // have to reset the tag counts
1230 output_buffer_type::reset(); // also the queue of outputs
1231 // my_node->current_tag = NO_TAG;
1232 }
1233#endif
1234 // all methods on input ports should be called under mutual exclusion from join_node_base.
1235
1236 bool tuple_build_may_succeed() { // called from back-end
1237 key_matching_FE_operation op_data(may_succeed);
1238 my_aggregator.execute(&op_data);
1239 return op_data.status == SUCCEEDED;
1240 }
1241
1242 // cannot lock while calling back to input_ports. current_key will only be set
1243 // and reset under the aggregator, so it will remain consistent.
// NOTE(review): the signature (listing line 1244) is missing from this listing; the body
// uses `out`, so this is presumably try_to_make_tuple(output_type &out) -- confirm.
// Submits a try_make request through the aggregator and reports whether it succeeded.
1245 key_matching_FE_operation op_data(&out,try_make);
1246 my_aggregator.execute(&op_data);
1247 return op_data.status == SUCCEEDED;
1248 }
1249
// NOTE(review): the signature (listing line 1250) is missing from this listing;
// presumably this is the body of tuple_accepted() -- confirm.
1251 reset_port_count(); // reset current_key after ports reset.
1252 }
1253
// NOTE(review): the signature (listing line 1254) is missing from this listing;
// presumably this is the body of tuple_rejected() -- confirm.
1255 // nothing to do.
1256 }
1257
1258 input_type my_inputs; // input ports
1260 }; // join_node_FE<key_matching<K,KHash>, InputTuple, OutputTuple>
1261
// Back end of a join node: a graph_node and sender<OutputTuple> layered on top of the
// join_node_FE front end. Successor registration/removal, try_get and forwarding requests
// are packaged as join_node_base_operation records, passed to my_aggregator, and serviced
// by the handler loop below.
// NOTE(review): this Doxygen listing drops several lines inside the class (embedded line
// numbers jump, e.g. 1271->1273, 1283->1285, 1301->1304, 1309->1314->1316). Declarations
// such as input_ports_type, the op_type enum head, the union members, handler_type,
// forwarder_busy, my_aggregator, my_successors and several function signatures are not
// visible here -- confirm against the real header.
1263 template<typename JP, typename InputTuple, typename OutputTuple>
1264 class join_node_base : public graph_node, public join_node_FE<JP, InputTuple, OutputTuple>,
1265 public sender<OutputTuple> {
1266 protected:
1267 using graph_node::my_graph;
1268 public:
1269 typedef OutputTuple output_type;
1270
1271 typedef typename sender<output_type>::successor_type successor_type;
// front-end operations used by the handler below
1273 using input_ports_type::tuple_build_may_succeed;
1274 using input_ports_type::try_to_make_tuple;
1275 using input_ports_type::tuple_accepted;
1276 using input_ports_type::tuple_rejected;
1277#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1278 typedef typename sender<output_type>::built_successors_type built_successors_type;
1279 typedef typename sender<output_type>::successor_list_type successor_list_type;
1280#endif
1281
1282 private:
1283 // ----------- Aggregator ------------
// NOTE(review): the opening of the request-kind enum (listing line 1284) and its closing
// lines are missing; only the extraction-only enumerators are visible.
1285#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1286 , add_blt_succ, del_blt_succ, blt_succ_cnt, blt_succ_cpy
1287#endif
1290
// Request record for the aggregator; `type` holds the request kind as a char.
1291 class join_node_base_operation : public aggregated_operation<join_node_base_operation> {
1292 public:
1293 char type;
// NOTE(review): union members on listing lines 1295-1296 are missing (my_arg and my_succ
// are initialized by the constructors below).
1294 union {
1297#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1298 size_t cnt_val;
1299 successor_list_type *slist;
1300#endif
1301 };
// NOTE(review): constructor signatures (listing lines 1302-1303, 1305, 1307) are missing;
// only the tails of two member-initializer lists are visible.
1304 my_arg(const_cast<output_type*>(&e)), bypass_t(NULL) {}
1306 my_succ(const_cast<successor_type *>(&s)), bypass_t(NULL) {}
1308 };
1309
1314
// NOTE(review): the handler's signature (listing line 1315) is missing; the body iterates
// over `op_list`, servicing one request per iteration.
1316 join_node_base_operation *current;
1317 while(op_list) {
1318 current = op_list;
1319 op_list = op_list->next;
1320 switch(current->type) {
1321 case reg_succ: {
// NOTE(review): listing line 1322 is missing here.
// If a tuple may be available, no forwarder is pending and the graph is active,
// create and spawn a task and mark the forwarder busy.
1323 if(tuple_build_may_succeed() && !forwarder_busy && internal::is_graph_active(my_graph)) {
// NOTE(review): listing lines 1325-1326 (the task type being constructed) are missing.
1324 task *rtask = new ( task::allocate_additional_child_of(*(my_graph.root_task())) )
1327 internal::spawn_in_graph_arena(my_graph, *rtask);
1328 forwarder_busy = true;
1329 }
1330 __TBB_store_with_release(current->status, SUCCEEDED);
1331 }
1332 break;
1333 case rem_succ:
// NOTE(review): listing line 1334 is missing here.
1335 __TBB_store_with_release(current->status, SUCCEEDED);
1336 break;
1337 case try__get:
// Succeeds only if a tuple could be built into *my_arg; the tuple is then accepted.
1338 if(tuple_build_may_succeed()) {
1339 if(try_to_make_tuple(*(current->my_arg))) {
1340 tuple_accepted();
1341 __TBB_store_with_release(current->status, SUCCEEDED);
1342 }
1343 else __TBB_store_with_release(current->status, FAILED);
1344 }
1345 else __TBB_store_with_release(current->status, FAILED);
1346 break;
1347 case do_fwrd_bypass: {
// Keep building tuples and offering them to the successors until either no tuple can be
// built or try_put_task returns no task; the combined task is handed back in bypass_t.
1348 bool build_succeeded;
1349 task *last_task = NULL;
1350 output_type out;
1351 if(tuple_build_may_succeed()) { // checks output queue of FE
1352 do {
1353 build_succeeded = try_to_make_tuple(out); // fetch front_end of queue
1354 if(build_succeeded) {
1355 task *new_task = my_successors.try_put_task(out);
1356 last_task = combine_tasks(my_graph, last_task, new_task);
1357 if(new_task) {
1358 tuple_accepted();
1359 }
1360 else {
1361 tuple_rejected();
1362 build_succeeded = false;
1363 }
1364 }
1365 } while(build_succeeded);
1366 }
1367 current->bypass_t = last_task;
1368 __TBB_store_with_release(current->status, SUCCEEDED);
1369 forwarder_busy = false;
1370 }
1371 break;
1372#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1373 case add_blt_succ:
1374 my_successors.internal_add_built_successor(*(current->my_succ));
1375 __TBB_store_with_release(current->status, SUCCEEDED);
1376 break;
1377 case del_blt_succ:
1378 my_successors.internal_delete_built_successor(*(current->my_succ));
1379 __TBB_store_with_release(current->status, SUCCEEDED);
1380 break;
1381 case blt_succ_cnt:
1382 current->cnt_val = my_successors.successor_count();
1383 __TBB_store_with_release(current->status, SUCCEEDED);
1384 break;
1385 case blt_succ_cpy:
1386 my_successors.copy_successors(*(current->slist));
1387 __TBB_store_with_release(current->status, SUCCEEDED);
1388 break;
1389#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
1390 }
1391 }
1392 }
1393 // ---------- end aggregator -----------
1394 public:
// Each constructor points the front end back at this node and registers this object as
// the aggregator's handler.
// NOTE(review): one line is missing from each constructor body (listing lines 1396, 1404,
// 1411), as is the copy constructor's signature line 1401.
1395 join_node_base(graph &g) : graph_node(g), input_ports_type(g), forwarder_busy(false) {
1397 input_ports_type::set_my_node(this);
1398 my_aggregator.initialize_handler(handler_type(this));
1399 }
1400
1402 graph_node(other.graph_node::my_graph), input_ports_type(other),
1403 sender<OutputTuple>(), forwarder_busy(false), my_successors() {
1405 input_ports_type::set_my_node(this);
1406 my_aggregator.initialize_handler(handler_type(this));
1407 }
1408
1409 template<typename FunctionTuple>
1410 join_node_base(graph &g, FunctionTuple f) : graph_node(g), input_ports_type(g, f), forwarder_busy(false) {
1412 input_ports_type::set_my_node(this);
1413 my_aggregator.initialize_handler(handler_type(this));
1414 }
1415
// NOTE(review): the next three bodies have lost their signatures and the construction of
// op_data (listing lines 1416-1417, 1422-1423, 1428-1429). Each executes a request through
// the aggregator and returns whether it succeeded.
1418 my_aggregator.execute(&op_data);
1419 return op_data.status == SUCCEEDED;
1420 }
1421
1424 my_aggregator.execute(&op_data);
1425 return op_data.status == SUCCEEDED;
1426 }
1427
1430 my_aggregator.execute(&op_data);
1431 return op_data.status == SUCCEEDED;
1432 }
1433
1434#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1435 built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
1436
// The four operations below are routed through the aggregator rather than touching
// my_successors directly.
1437 void internal_add_built_successor( successor_type &r) __TBB_override {
1438 join_node_base_operation op_data(r, add_blt_succ);
1439 my_aggregator.execute(&op_data);
1440 }
1441
1442 void internal_delete_built_successor( successor_type &r) __TBB_override {
1443 join_node_base_operation op_data(r, del_blt_succ);
1444 my_aggregator.execute(&op_data);
1445 }
1446
1447 size_t successor_count() __TBB_override {
1448 join_node_base_operation op_data(blt_succ_cnt);
1449 my_aggregator.execute(&op_data);
1450 return op_data.cnt_val;
1451 }
1452
1453 void copy_successors(successor_list_type &l) __TBB_override {
1454 join_node_base_operation op_data(blt_succ_cpy);
1455 op_data.slist = &l;
1456 my_aggregator.execute(&op_data);
1457 }
1458#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
1459
1460#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1461 void extract() __TBB_override {
1462 input_ports_type::extract();
1463 my_successors.built_successors().sender_extract(*this);
1464 }
1465#endif
1466
1467 protected:
1468
// Resets the front end; successor edges are cleared only when rf_clear_edges is set.
1469 void reset_node(reset_flags f) __TBB_override {
1470 input_ports_type::reset(f);
1471 if(f & rf_clear_edges) my_successors.clear();
1472 }
1473
1474 private:
1476
1477 friend class forward_task_bypass< join_node_base<JP, InputTuple, OutputTuple> >;
// NOTE(review): the signature and op_data construction for this body (listing lines
// 1478-1479) are missing. It executes a request and returns the task stored in bypass_t.
1480 my_aggregator.execute(&op_data);
1481 return op_data.bypass_t;
1482 }
1483
1484 }; // join_node_base
1485
1486 // join base class type generator
// NOTE(review): the body (listing line 1489) is missing from this listing; users below
// refer to join_base<...>::type, so a nested typedef named `type` is presumably
// declared on that line.
1487 template<int N, template<class> class PT, typename OutputTuple, typename JP>
1488 struct join_base {
1490 };
1491
// Specialization of the base-class generator for key_matching joins: exposes the key type
// and hash-compare type and names the join_node_base instantiation as `type`.
// NOTE(review): listing lines 1494 and 1499 are missing, so the definition of
// key_traits_type and the ports-type template argument are not visible here.
1492 template<int N, typename OutputTuple, typename K, typename KHash>
1493 struct join_base<N, key_matching_port, OutputTuple, key_matching<K,KHash> > {
1495 typedef K key_type;
1496 typedef KHash key_hash_compare;
1497 typedef typename internal::join_node_base< key_traits_type,
1498 // ports type
1500 OutputTuple > type;
1501 };
1502
1504 // using tuple_element. The class PT is the port type (reserving_port, queueing_port, key_matching_port)
1505 // and should match the typename.
1506
// Primary template: derives from the base class selected by join_base for the given port
// count N, port type PT, output tuple and join policy JP.
// NOTE(review): listing lines 1510, 1513 and 1515-1516 are missing, so the remaining
// typedefs and the constructors of this class are not visible here.
1507 template<int N, template<class> class PT, typename OutputTuple, typename JP>
1508 class unfolded_join_node : public join_base<N,PT,OutputTuple,JP>::type {
1509 public:
1511 typedef OutputTuple output_type;
1512 private:
1514 public:
1517 };
1518
1519#if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
// Function object that extracts a key of type K from a message of type T by calling
// key_from_message<K>(t).
// NOTE(review): listing line 1523 is missing from the body of operator().
1520 template <typename K, typename T>
1521 struct key_from_message_body {
1522 K operator()(const T& t) const {
1524 return key_from_message<K>(t);
1525 }
1526 };
1527 // Adds const to reference type
// Specialization for reference key types: returns the key as const K& via
// key_from_message<const K&>(t).
// NOTE(review): listing line 1531 is missing from the body of operator().
1528 template <typename K, typename T>
1529 struct key_from_message_body<K&,T> {
1530 const K& operator()(const T& t) const {
1532 return key_from_message<const K&>(t);
1533 }
1534 };
1535#endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1536 // key_matching unfolded_join_node. This must be a separate specialization because the constructors
1537 // differ.
1538
// key_matching specialization for two input ports. Each constructor passes the base class
// one newly allocated key-extraction body per port.
// NOTE(review): this listing drops lines 1545, 1548-1550, 1555, 1563 and 1569. By analogy
// with the intact six-port specialization in this file they presumably hold the
// input_ports_type/base_type/f0_p/f1_p typedefs, the `func_initializer_type(` openers and
// the copy constructor -- confirm against the real header.
1539 template<typename OutputTuple, typename K, typename KHash>
1540 class unfolded_join_node<2,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1541 join_base<2,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
// element types of the output tuple, one per port
1542 typedef typename tbb::flow::tuple_element<0, OutputTuple>::type T0;
1543 typedef typename tbb::flow::tuple_element<1, OutputTuple>::type T1;
1544 public:
1546 typedef OutputTuple output_type;
1547 private:
1551 typedef typename tbb::flow::tuple< f0_p, f1_p > func_initializer_type;
1552 public:
1553#if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
// Uses key_from_message_body as the key extractor on every port.
1554 unfolded_join_node(graph &g) : base_type(g,
1556 new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1557 new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>())
1558 ) ) {
1559 }
1560#endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
// Uses the caller-supplied bodies as the key extractors, one per port.
1561 template<typename Body0, typename Body1>
1562 unfolded_join_node(graph &g, Body0 body0, Body1 body1) : base_type(g,
1564 new internal::type_to_key_function_body_leaf<T0, K, Body0>(body0),
1565 new internal::type_to_key_function_body_leaf<T1, K, Body1>(body1)
1566 ) ) {
1567 __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 2, "wrong number of body initializers");
1568 }
1570 };
1571
// key_matching specialization for three input ports. Each constructor passes the base
// class one newly allocated key-extraction body per port.
// NOTE(review): this listing drops lines 1579, 1582-1585, 1590, 1599 and 1606. By analogy
// with the intact six-port specialization in this file they presumably hold the
// input_ports_type/base_type/fN_p typedefs, the `func_initializer_type(` openers and the
// copy constructor -- confirm against the real header.
1572 template<typename OutputTuple, typename K, typename KHash>
1573 class unfolded_join_node<3,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1574 join_base<3,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
// element types of the output tuple, one per port
1575 typedef typename tbb::flow::tuple_element<0, OutputTuple>::type T0;
1576 typedef typename tbb::flow::tuple_element<1, OutputTuple>::type T1;
1577 typedef typename tbb::flow::tuple_element<2, OutputTuple>::type T2;
1578 public:
1580 typedef OutputTuple output_type;
1581 private:
1586 typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p > func_initializer_type;
1587 public:
1588#if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
// Uses key_from_message_body as the key extractor on every port.
1589 unfolded_join_node(graph &g) : base_type(g,
1591 new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1592 new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1593 new internal::type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>())
1594 ) ) {
1595 }
1596#endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
// Uses the caller-supplied bodies as the key extractors, one per port.
1597 template<typename Body0, typename Body1, typename Body2>
1598 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2) : base_type(g,
1600 new internal::type_to_key_function_body_leaf<T0, K, Body0>(body0),
1601 new internal::type_to_key_function_body_leaf<T1, K, Body1>(body1),
1602 new internal::type_to_key_function_body_leaf<T2, K, Body2>(body2)
1603 ) ) {
1604 __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 3, "wrong number of body initializers");
1605 }
1607 };
1608
// key_matching specialization for four input ports. Each constructor passes the base
// class one newly allocated key-extraction body per port.
// NOTE(review): this listing drops lines 1617, 1620-1624, 1629, 1639 and 1647. By analogy
// with the intact six-port specialization in this file they presumably hold the
// input_ports_type/base_type/fN_p typedefs, the `func_initializer_type(` openers and the
// copy constructor -- confirm against the real header.
1609 template<typename OutputTuple, typename K, typename KHash>
1610 class unfolded_join_node<4,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1611 join_base<4,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
// element types of the output tuple, one per port
1612 typedef typename tbb::flow::tuple_element<0, OutputTuple>::type T0;
1613 typedef typename tbb::flow::tuple_element<1, OutputTuple>::type T1;
1614 typedef typename tbb::flow::tuple_element<2, OutputTuple>::type T2;
1615 typedef typename tbb::flow::tuple_element<3, OutputTuple>::type T3;
1616 public:
1618 typedef OutputTuple output_type;
1619 private:
1625 typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p > func_initializer_type;
1626 public:
1627#if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
// Uses key_from_message_body as the key extractor on every port.
1628 unfolded_join_node(graph &g) : base_type(g,
1630 new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1631 new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1632 new internal::type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1633 new internal::type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>())
1634 ) ) {
1635 }
1636#endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
// Uses the caller-supplied bodies as the key extractors, one per port.
1637 template<typename Body0, typename Body1, typename Body2, typename Body3>
1638 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3) : base_type(g,
1640 new internal::type_to_key_function_body_leaf<T0, K, Body0>(body0),
1641 new internal::type_to_key_function_body_leaf<T1, K, Body1>(body1),
1642 new internal::type_to_key_function_body_leaf<T2, K, Body2>(body2),
1643 new internal::type_to_key_function_body_leaf<T3, K, Body3>(body3)
1644 ) ) {
1645 __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 4, "wrong number of body initializers");
1646 }
1648 };
1649
// key_matching specialization for five input ports. Each constructor passes the base
// class one newly allocated key-extraction body per port.
// NOTE(review): this listing drops lines 1659, 1662-1667, 1672, 1683 and 1692. By analogy
// with the intact six-port specialization in this file they presumably hold the
// input_ports_type/base_type/fN_p typedefs, the `func_initializer_type(` openers and the
// copy constructor -- confirm against the real header.
1650 template<typename OutputTuple, typename K, typename KHash>
1651 class unfolded_join_node<5,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1652 join_base<5,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
// element types of the output tuple, one per port
1653 typedef typename tbb::flow::tuple_element<0, OutputTuple>::type T0;
1654 typedef typename tbb::flow::tuple_element<1, OutputTuple>::type T1;
1655 typedef typename tbb::flow::tuple_element<2, OutputTuple>::type T2;
1656 typedef typename tbb::flow::tuple_element<3, OutputTuple>::type T3;
1657 typedef typename tbb::flow::tuple_element<4, OutputTuple>::type T4;
1658 public:
1660 typedef OutputTuple output_type;
1661 private:
1668 typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p > func_initializer_type;
1669 public:
1670#if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
// Uses key_from_message_body as the key extractor on every port.
1671 unfolded_join_node(graph &g) : base_type(g,
1673 new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1674 new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1675 new internal::type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1676 new internal::type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1677 new internal::type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>())
1678 ) ) {
1679 }
1680#endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
// Uses the caller-supplied bodies as the key extractors, one per port.
1681 template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4>
1682 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4) : base_type(g,
1684 new internal::type_to_key_function_body_leaf<T0, K, Body0>(body0),
1685 new internal::type_to_key_function_body_leaf<T1, K, Body1>(body1),
1686 new internal::type_to_key_function_body_leaf<T2, K, Body2>(body2),
1687 new internal::type_to_key_function_body_leaf<T3, K, Body3>(body3),
1688 new internal::type_to_key_function_body_leaf<T4, K, Body4>(body4)
1689 ) ) {
1690 __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 5, "wrong number of body initializers");
1691 }
1693 };
1694
1695#if __TBB_VARIADIC_MAX >= 6
1696 template<typename OutputTuple, typename K, typename KHash>
1697 class unfolded_join_node<6,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1698 join_base<6,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1699 typedef typename tbb::flow::tuple_element<0, OutputTuple>::type T0;
1700 typedef typename tbb::flow::tuple_element<1, OutputTuple>::type T1;
1701 typedef typename tbb::flow::tuple_element<2, OutputTuple>::type T2;
1702 typedef typename tbb::flow::tuple_element<3, OutputTuple>::type T3;
1703 typedef typename tbb::flow::tuple_element<4, OutputTuple>::type T4;
1704 typedef typename tbb::flow::tuple_element<5, OutputTuple>::type T5;
1705 public:
1706 typedef typename wrap_key_tuple_elements<6,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1707 typedef OutputTuple output_type;
1708 private:
1709 typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
1710 typedef typename internal::type_to_key_function_body<T0, K> *f0_p;
1711 typedef typename internal::type_to_key_function_body<T1, K> *f1_p;
1712 typedef typename internal::type_to_key_function_body<T2, K> *f2_p;
1713 typedef typename internal::type_to_key_function_body<T3, K> *f3_p;
1714 typedef typename internal::type_to_key_function_body<T4, K> *f4_p;
1715 typedef typename internal::type_to_key_function_body<T5, K> *f5_p;
1716 typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p > func_initializer_type;
1717 public:
1718#if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1719 unfolded_join_node(graph &g) : base_type(g,
1720 func_initializer_type(
1721 new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1722 new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1723 new internal::type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1724 new internal::type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1725 new internal::type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1726 new internal::type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>())
1727 ) ) {
1728 }
1729#endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1730 template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4, typename Body5>
1731 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4, Body5 body5)
1732 : base_type(g, func_initializer_type(
1733 new internal::type_to_key_function_body_leaf<T0, K, Body0>(body0),
1734 new internal::type_to_key_function_body_leaf<T1, K, Body1>(body1),
1735 new internal::type_to_key_function_body_leaf<T2, K, Body2>(body2),
1736 new internal::type_to_key_function_body_leaf<T3, K, Body3>(body3),
1737 new internal::type_to_key_function_body_leaf<T4, K, Body4>(body4),
1738 new internal::type_to_key_function_body_leaf<T5, K, Body5>(body5)
1739 ) ) {
1740 __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 6, "wrong number of body initializers");
1741 }
1742 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1743 };
1744#endif
1745
1746#if __TBB_VARIADIC_MAX >= 7
1747 template<typename OutputTuple, typename K, typename KHash>
1748 class unfolded_join_node<7,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1749 join_base<7,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1750 typedef typename tbb::flow::tuple_element<0, OutputTuple>::type T0;
1751 typedef typename tbb::flow::tuple_element<1, OutputTuple>::type T1;
1752 typedef typename tbb::flow::tuple_element<2, OutputTuple>::type T2;
1753 typedef typename tbb::flow::tuple_element<3, OutputTuple>::type T3;
1754 typedef typename tbb::flow::tuple_element<4, OutputTuple>::type T4;
1755 typedef typename tbb::flow::tuple_element<5, OutputTuple>::type T5;
1756 typedef typename tbb::flow::tuple_element<6, OutputTuple>::type T6;
1757 public:
1758 typedef typename wrap_key_tuple_elements<7,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1759 typedef OutputTuple output_type;
1760 private:
1761 typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
1762 typedef typename internal::type_to_key_function_body<T0, K> *f0_p;
1763 typedef typename internal::type_to_key_function_body<T1, K> *f1_p;
1764 typedef typename internal::type_to_key_function_body<T2, K> *f2_p;
1765 typedef typename internal::type_to_key_function_body<T3, K> *f3_p;
1766 typedef typename internal::type_to_key_function_body<T4, K> *f4_p;
1767 typedef typename internal::type_to_key_function_body<T5, K> *f5_p;
1768 typedef typename internal::type_to_key_function_body<T6, K> *f6_p;
1769 typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p > func_initializer_type;
1770 public:
1771#if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1772 unfolded_join_node(graph &g) : base_type(g,
1773 func_initializer_type(
1774 new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1775 new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1776 new internal::type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1777 new internal::type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1778 new internal::type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1779 new internal::type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>()),
1780 new internal::type_to_key_function_body_leaf<T6, K, key_from_message_body<K,T6> >(key_from_message_body<K,T6>())
1781 ) ) {
1782 }
1783#endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1784 template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4,
1785 typename Body5, typename Body6>
1786 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1787 Body5 body5, Body6 body6) : base_type(g, func_initializer_type(
1788 new internal::type_to_key_function_body_leaf<T0, K, Body0>(body0),
1789 new internal::type_to_key_function_body_leaf<T1, K, Body1>(body1),
1790 new internal::type_to_key_function_body_leaf<T2, K, Body2>(body2),
1791 new internal::type_to_key_function_body_leaf<T3, K, Body3>(body3),
1792 new internal::type_to_key_function_body_leaf<T4, K, Body4>(body4),
1793 new internal::type_to_key_function_body_leaf<T5, K, Body5>(body5),
1794 new internal::type_to_key_function_body_leaf<T6, K, Body6>(body6)
1795 ) ) {
1796 __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 7, "wrong number of body initializers");
1797 }
1798 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1799 };
1800#endif
1801
1802#if __TBB_VARIADIC_MAX >= 8
1803 template<typename OutputTuple, typename K, typename KHash>
1804 class unfolded_join_node<8,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1805 join_base<8,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1806 typedef typename tbb::flow::tuple_element<0, OutputTuple>::type T0;
1807 typedef typename tbb::flow::tuple_element<1, OutputTuple>::type T1;
1808 typedef typename tbb::flow::tuple_element<2, OutputTuple>::type T2;
1809 typedef typename tbb::flow::tuple_element<3, OutputTuple>::type T3;
1810 typedef typename tbb::flow::tuple_element<4, OutputTuple>::type T4;
1811 typedef typename tbb::flow::tuple_element<5, OutputTuple>::type T5;
1812 typedef typename tbb::flow::tuple_element<6, OutputTuple>::type T6;
1813 typedef typename tbb::flow::tuple_element<7, OutputTuple>::type T7;
1814 public:
1815 typedef typename wrap_key_tuple_elements<8,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1816 typedef OutputTuple output_type;
1817 private:
1818 typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
1819 typedef typename internal::type_to_key_function_body<T0, K> *f0_p;
1820 typedef typename internal::type_to_key_function_body<T1, K> *f1_p;
1821 typedef typename internal::type_to_key_function_body<T2, K> *f2_p;
1822 typedef typename internal::type_to_key_function_body<T3, K> *f3_p;
1823 typedef typename internal::type_to_key_function_body<T4, K> *f4_p;
1824 typedef typename internal::type_to_key_function_body<T5, K> *f5_p;
1825 typedef typename internal::type_to_key_function_body<T6, K> *f6_p;
1826 typedef typename internal::type_to_key_function_body<T7, K> *f7_p;
1827 typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p, f7_p > func_initializer_type;
1828 public:
1829#if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1830 unfolded_join_node(graph &g) : base_type(g,
1831 func_initializer_type(
1832 new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1833 new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1834 new internal::type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1835 new internal::type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1836 new internal::type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1837 new internal::type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>()),
1838 new internal::type_to_key_function_body_leaf<T6, K, key_from_message_body<K,T6> >(key_from_message_body<K,T6>()),
1839 new internal::type_to_key_function_body_leaf<T7, K, key_from_message_body<K,T7> >(key_from_message_body<K,T7>())
1840 ) ) {
1841 }
1842#endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1843 template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4,
1844 typename Body5, typename Body6, typename Body7>
1845 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1846 Body5 body5, Body6 body6, Body7 body7) : base_type(g, func_initializer_type(
1847 new internal::type_to_key_function_body_leaf<T0, K, Body0>(body0),
1848 new internal::type_to_key_function_body_leaf<T1, K, Body1>(body1),
1849 new internal::type_to_key_function_body_leaf<T2, K, Body2>(body2),
1850 new internal::type_to_key_function_body_leaf<T3, K, Body3>(body3),
1851 new internal::type_to_key_function_body_leaf<T4, K, Body4>(body4),
1852 new internal::type_to_key_function_body_leaf<T5, K, Body5>(body5),
1853 new internal::type_to_key_function_body_leaf<T6, K, Body6>(body6),
1854 new internal::type_to_key_function_body_leaf<T7, K, Body7>(body7)
1855 ) ) {
1856 __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 8, "wrong number of body initializers");
1857 }
1858 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1859 };
1860#endif
1861
#if __TBB_VARIADIC_MAX >= 9
// Partial specialization of unfolded_join_node for a key_matching join with nine input ports.
// Each constructor that takes a graph hands base_type a tuple containing one
// type_to_key_function_body<Ti, K> pointer per element Ti of OutputTuple.
template<typename OutputTuple, typename K, typename KHash>
class unfolded_join_node<9, key_matching_port, OutputTuple, key_matching<K, KHash> >
    : public join_base<9, key_matching_port, OutputTuple, key_matching<K, KHash> >::type {
    // Element types of OutputTuple, one per input port.
    typedef typename tbb::flow::tuple_element<0, OutputTuple>::type T0;
    typedef typename tbb::flow::tuple_element<1, OutputTuple>::type T1;
    typedef typename tbb::flow::tuple_element<2, OutputTuple>::type T2;
    typedef typename tbb::flow::tuple_element<3, OutputTuple>::type T3;
    typedef typename tbb::flow::tuple_element<4, OutputTuple>::type T4;
    typedef typename tbb::flow::tuple_element<5, OutputTuple>::type T5;
    typedef typename tbb::flow::tuple_element<6, OutputTuple>::type T6;
    typedef typename tbb::flow::tuple_element<7, OutputTuple>::type T7;
    typedef typename tbb::flow::tuple_element<8, OutputTuple>::type T8;

public:
    typedef OutputTuple output_type;
    typedef typename wrap_key_tuple_elements<
        9, key_matching_port, key_matching<K, KHash>, OutputTuple>::type input_ports_type;

private:
    typedef join_node_base<key_matching<K, KHash>, input_ports_type, output_type> base_type;

    // Pointer-to-key-functor types, one per port, and the tuple that bundles them for base_type.
    typedef internal::type_to_key_function_body<T0, K> *key_fn0_ptr;
    typedef internal::type_to_key_function_body<T1, K> *key_fn1_ptr;
    typedef internal::type_to_key_function_body<T2, K> *key_fn2_ptr;
    typedef internal::type_to_key_function_body<T3, K> *key_fn3_ptr;
    typedef internal::type_to_key_function_body<T4, K> *key_fn4_ptr;
    typedef internal::type_to_key_function_body<T5, K> *key_fn5_ptr;
    typedef internal::type_to_key_function_body<T6, K> *key_fn6_ptr;
    typedef internal::type_to_key_function_body<T7, K> *key_fn7_ptr;
    typedef internal::type_to_key_function_body<T8, K> *key_fn8_ptr;
    typedef tbb::flow::tuple<
        key_fn0_ptr, key_fn1_ptr, key_fn2_ptr, key_fn3_ptr, key_fn4_ptr,
        key_fn5_ptr, key_fn6_ptr, key_fn7_ptr, key_fn8_ptr
    > key_fn_tuple_type;

public:
#if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
    // Builds the node with a default-constructed key_from_message_body<K, Ti> as the
    // key functor of every port.
    unfolded_join_node(graph &g)
        : base_type(
              g,
              key_fn_tuple_type(
                  new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K, T0> >(key_from_message_body<K, T0>()),
                  new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K, T1> >(key_from_message_body<K, T1>()),
                  new internal::type_to_key_function_body_leaf<T2, K, key_from_message_body<K, T2> >(key_from_message_body<K, T2>()),
                  new internal::type_to_key_function_body_leaf<T3, K, key_from_message_body<K, T3> >(key_from_message_body<K, T3>()),
                  new internal::type_to_key_function_body_leaf<T4, K, key_from_message_body<K, T4> >(key_from_message_body<K, T4>()),
                  new internal::type_to_key_function_body_leaf<T5, K, key_from_message_body<K, T5> >(key_from_message_body<K, T5>()),
                  new internal::type_to_key_function_body_leaf<T6, K, key_from_message_body<K, T6> >(key_from_message_body<K, T6>()),
                  new internal::type_to_key_function_body_leaf<T7, K, key_from_message_body<K, T7> >(key_from_message_body<K, T7>()),
                  new internal::type_to_key_function_body_leaf<T8, K, key_from_message_body<K, T8> >(key_from_message_body<K, T8>())))
    {}
#endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */

    // Builds the node from nine user-supplied bodies; bodyI is wrapped in a
    // type_to_key_function_body_leaf<TI, K, BodyI> and becomes the key functor of port I.
    template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4,
             typename Body5, typename Body6, typename Body7, typename Body8>
    unfolded_join_node(graph &g,
                       Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
                       Body5 body5, Body6 body6, Body7 body7, Body8 body8)
        : base_type(
              g,
              key_fn_tuple_type(
                  new internal::type_to_key_function_body_leaf<T0, K, Body0>(body0),
                  new internal::type_to_key_function_body_leaf<T1, K, Body1>(body1),
                  new internal::type_to_key_function_body_leaf<T2, K, Body2>(body2),
                  new internal::type_to_key_function_body_leaf<T3, K, Body3>(body3),
                  new internal::type_to_key_function_body_leaf<T4, K, Body4>(body4),
                  new internal::type_to_key_function_body_leaf<T5, K, Body5>(body5),
                  new internal::type_to_key_function_body_leaf<T6, K, Body6>(body6),
                  new internal::type_to_key_function_body_leaf<T7, K, Body7>(body7),
                  new internal::type_to_key_function_body_leaf<T8, K, Body8>(body8)))
    {
        // Compile-time check that OutputTuple has exactly as many elements as bodies were given.
        __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 9, "wrong number of body initializers");
    }

    // Copy construction is delegated entirely to base_type.
    unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
};
#endif
1925
#if __TBB_VARIADIC_MAX >= 10
// Partial specialization of unfolded_join_node for a key_matching join with ten input ports.
// Each constructor that takes a graph hands base_type a tuple containing one
// type_to_key_function_body<Ti, K> pointer per element Ti of OutputTuple.
template<typename OutputTuple, typename K, typename KHash>
class unfolded_join_node<10, key_matching_port, OutputTuple, key_matching<K, KHash> >
    : public join_base<10, key_matching_port, OutputTuple, key_matching<K, KHash> >::type {
    // Element types of OutputTuple, one per input port.
    typedef typename tbb::flow::tuple_element<0, OutputTuple>::type T0;
    typedef typename tbb::flow::tuple_element<1, OutputTuple>::type T1;
    typedef typename tbb::flow::tuple_element<2, OutputTuple>::type T2;
    typedef typename tbb::flow::tuple_element<3, OutputTuple>::type T3;
    typedef typename tbb::flow::tuple_element<4, OutputTuple>::type T4;
    typedef typename tbb::flow::tuple_element<5, OutputTuple>::type T5;
    typedef typename tbb::flow::tuple_element<6, OutputTuple>::type T6;
    typedef typename tbb::flow::tuple_element<7, OutputTuple>::type T7;
    typedef typename tbb::flow::tuple_element<8, OutputTuple>::type T8;
    typedef typename tbb::flow::tuple_element<9, OutputTuple>::type T9;

public:
    typedef OutputTuple output_type;
    typedef typename wrap_key_tuple_elements<
        10, key_matching_port, key_matching<K, KHash>, OutputTuple>::type input_ports_type;

private:
    typedef join_node_base<key_matching<K, KHash>, input_ports_type, output_type> base_type;

    // Pointer-to-key-functor types, one per port, and the tuple that bundles them for base_type.
    typedef internal::type_to_key_function_body<T0, K> *key_fn0_ptr;
    typedef internal::type_to_key_function_body<T1, K> *key_fn1_ptr;
    typedef internal::type_to_key_function_body<T2, K> *key_fn2_ptr;
    typedef internal::type_to_key_function_body<T3, K> *key_fn3_ptr;
    typedef internal::type_to_key_function_body<T4, K> *key_fn4_ptr;
    typedef internal::type_to_key_function_body<T5, K> *key_fn5_ptr;
    typedef internal::type_to_key_function_body<T6, K> *key_fn6_ptr;
    typedef internal::type_to_key_function_body<T7, K> *key_fn7_ptr;
    typedef internal::type_to_key_function_body<T8, K> *key_fn8_ptr;
    typedef internal::type_to_key_function_body<T9, K> *key_fn9_ptr;
    typedef tbb::flow::tuple<
        key_fn0_ptr, key_fn1_ptr, key_fn2_ptr, key_fn3_ptr, key_fn4_ptr,
        key_fn5_ptr, key_fn6_ptr, key_fn7_ptr, key_fn8_ptr, key_fn9_ptr
    > key_fn_tuple_type;

public:
#if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
    // Builds the node with a default-constructed key_from_message_body<K, Ti> as the
    // key functor of every port.
    unfolded_join_node(graph &g)
        : base_type(
              g,
              key_fn_tuple_type(
                  new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K, T0> >(key_from_message_body<K, T0>()),
                  new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K, T1> >(key_from_message_body<K, T1>()),
                  new internal::type_to_key_function_body_leaf<T2, K, key_from_message_body<K, T2> >(key_from_message_body<K, T2>()),
                  new internal::type_to_key_function_body_leaf<T3, K, key_from_message_body<K, T3> >(key_from_message_body<K, T3>()),
                  new internal::type_to_key_function_body_leaf<T4, K, key_from_message_body<K, T4> >(key_from_message_body<K, T4>()),
                  new internal::type_to_key_function_body_leaf<T5, K, key_from_message_body<K, T5> >(key_from_message_body<K, T5>()),
                  new internal::type_to_key_function_body_leaf<T6, K, key_from_message_body<K, T6> >(key_from_message_body<K, T6>()),
                  new internal::type_to_key_function_body_leaf<T7, K, key_from_message_body<K, T7> >(key_from_message_body<K, T7>()),
                  new internal::type_to_key_function_body_leaf<T8, K, key_from_message_body<K, T8> >(key_from_message_body<K, T8>()),
                  new internal::type_to_key_function_body_leaf<T9, K, key_from_message_body<K, T9> >(key_from_message_body<K, T9>())))
    {}
#endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */

    // Builds the node from ten user-supplied bodies; bodyI is wrapped in a
    // type_to_key_function_body_leaf<TI, K, BodyI> and becomes the key functor of port I.
    template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4,
             typename Body5, typename Body6, typename Body7, typename Body8, typename Body9>
    unfolded_join_node(graph &g,
                       Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
                       Body5 body5, Body6 body6, Body7 body7, Body8 body8, Body9 body9)
        : base_type(
              g,
              key_fn_tuple_type(
                  new internal::type_to_key_function_body_leaf<T0, K, Body0>(body0),
                  new internal::type_to_key_function_body_leaf<T1, K, Body1>(body1),
                  new internal::type_to_key_function_body_leaf<T2, K, Body2>(body2),
                  new internal::type_to_key_function_body_leaf<T3, K, Body3>(body3),
                  new internal::type_to_key_function_body_leaf<T4, K, Body4>(body4),
                  new internal::type_to_key_function_body_leaf<T5, K, Body5>(body5),
                  new internal::type_to_key_function_body_leaf<T6, K, Body6>(body6),
                  new internal::type_to_key_function_body_leaf<T7, K, Body7>(body7),
                  new internal::type_to_key_function_body_leaf<T8, K, Body8>(body8),
                  new internal::type_to_key_function_body_leaf<T9, K, Body9>(body9)))
    {
        // Compile-time check that OutputTuple has exactly as many elements as bodies were given.
        __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 10, "wrong number of body initializers");
    }

    // Copy construction is delegated entirely to base_type.
    unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
};
#endif
1993
1995 template<size_t N, typename JNT>
1996 typename tbb::flow::tuple_element<N, typename JNT::input_ports_type>::type &input_port(JNT &jn) {
1997 return tbb::flow::get<N>(jn.input_ports());
1998 }
1999
2000}
2001#endif // __TBB__flow_graph_join_impl_H
2002
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition tbb_stddef.h:165
#define __TBB_override
Definition tbb_stddef.h:240
#define __TBB_STATIC_ASSERT(condition, msg)
Definition tbb_stddef.h:553
void const char const char int ITT_FORMAT __itt_group_sync s
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task * task
void const char const char int ITT_FORMAT __itt_group_sync p
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type type
void suppress_unused_warning(const T1 &)
Utility template function to prevent "unused" warnings by various compilers.
Definition tbb_stddef.h:398
K key_from_message(const T &t)
Definition flow_graph.h:721
tbb::flow::tuple_element< N, typename JNT::input_ports_type >::type & input_port(JNT &jn)
templated function to refer to input ports of the join node
uintptr_t status
Zero value means "wait" status, all other values are "user" specified values and are defined into the...
field of type K being used for matching.
A task that calls a node's forward_task function.
void set_owner(successor_type *owner)
void register_successor(successor_type &r)
void set_owner(owner_type *owner)
void remove_successor(successor_type &r)
A cache of successors that are broadcast to.
task * try_put_task(const T &t) __TBB_override
A cache of successors that are put in a round-robin fashion.
const item_type & front() const
bool my_item_valid(size_type i) const
virtual void increment_port_count()=0
virtual task * decrement_port_count(bool handle_task)=0
tbb::internal::strip< KeyType >::type current_key_type
virtual task * increment_key_count(current_key_type const &, bool)=0
static void reset_my_port(InputTuple &my_input)
static bool reserve(InputTuple &my_input, OutputTuple &out)
static void copy_key_functors(KeyFuncTuple &my_inputs, KeyFuncTuple &other_inputs)
static bool get_my_item(InputTuple &my_input, OutputTuple &out)
static bool get_items(InputTuple &my_input, OutputTuple &out)
static void release_reservations(TupleType &my_input)
static void set_join_node_pointer(TupleType &my_input, PortType *port)
static void reset_inputs(InputTuple &my_input, reset_flags f)
static void reset_ports(InputTuple &my_input)
static void set_key_functors(InputTuple &my_input, KeyFuncTuple &my_key_funcs)
static void consume_reservations(TupleType &my_input)
static void release_my_reservation(TupleType &my_input)
static void reset_my_port(InputTuple &my_input)
static bool get_my_item(InputTuple &my_input, OutputTuple &out)
static void reset_ports(InputTuple &my_input)
static void consume_reservations(TupleType &my_input)
static void reset_inputs(InputTuple &my_input, reset_flags f)
static bool get_items(InputTuple &my_input, OutputTuple &out)
static void copy_key_functors(KeyFuncTuple &my_inputs, KeyFuncTuple &other_inputs)
static bool reserve(InputTuple &my_input, OutputTuple &out)
static void set_key_functors(InputTuple &my_input, KeyFuncTuple &my_key_funcs)
static void set_join_node_pointer(TupleType &my_input, PortType *port)
static void release_my_reservation(TupleType &my_input)
static void release_reservations(TupleType &my_input)
The two-phase join port.
void release()
Release the port.
bool remove_predecessor(predecessor_type &src) __TBB_override
Remove a predecessor.
internal::aggregating_functor< class_type, reserving_port_operation > handler_type
reserving_port(const reserving_port &)
void handle_operations(reserving_port_operation *op_list)
void set_join_node_pointer(forwarding_base *join)
task * try_put_task(const T &) __TBB_override
reservable_predecessor_cache< T, null_mutex > my_predecessors
aggregator< handler_type, reserving_port_operation > my_aggregator
graph & graph_reference() const __TBB_override
bool reserve(T &v)
Reserve an item from the port.
receiver< input_type >::predecessor_type predecessor_type
void consume()
Complete use of the port.
bool register_predecessor(predecessor_type &src) __TBB_override
Add a predecessor.
void reset_receiver(reset_flags f) __TBB_override
reserving_port_operation(const predecessor_type &s, op_type t)
aggregator< handler_type, queueing_port_operation > my_aggregator
internal::aggregating_functor< class_type, queueing_port_operation > handler_type
void reset_receiver(reset_flags f) __TBB_override
void handle_operations(queueing_port_operation *op_list)
void set_join_node_pointer(forwarding_base *join)
record parent for tallying available items
queueing_port(const queueing_port &)
copy constructor
receiver< input_type >::predecessor_type predecessor_type
graph & graph_reference() const __TBB_override
task * try_put_task(const T &v) __TBB_override
const K & operator()(const table_item_type &v)
type_to_key_func_type * get_my_key_func()
void handle_operations(key_matching_port_operation *op_list)
tbb::internal::strip< key_type >::type noref_key_type
matching_forwarding_base< key_type > * my_join
key_matching_port(const key_matching_port &)
receiver< input_type >::predecessor_type predecessor_type
void set_join_node_pointer(forwarding_base *join)
task * try_put_task(const input_type &v) __TBB_override
aggregator< handler_type, key_matching_port_operation > my_aggregator
graph & graph_reference() const __TBB_override
internal::aggregating_functor< class_type, key_matching_port_operation > handler_type
void set_my_key_func(type_to_key_func_type *f)
void reset_receiver(reset_flags f) __TBB_override
key_matching_port< traits > class_type
hash_buffer< key_type, input_type, type_to_key_func_type, hash_compare_type > buffer_type
bool try_get(output_type &v) __TBB_override
sender< output_type >::successor_type successor_type
join_node_base(const join_node_base &other)
bool remove_successor(successor_type &r) __TBB_override
join_node_base(graph &g, FunctionTuple f)
join_node_base< JP, InputTuple, OutputTuple > class_type
void handle_operations(join_node_base_operation *op_list)
void reset_node(reset_flags f) __TBB_override
bool register_successor(successor_type &r) __TBB_override
aggregator< handler_type, join_node_base_operation > my_aggregator
broadcast_cache< output_type, null_rw_mutex > my_successors
internal::aggregating_functor< class_type, join_node_base_operation > handler_type
join_node_FE< JP, InputTuple, OutputTuple > input_ports_type
join_node_FE : implements input port policy
join_node_base< reserving, InputTuple, OutputTuple > base_node_type
join_node_base< queueing, InputTuple, OutputTuple > base_node_type
join_node_FE< key_matching< key_type, key_hash_compare >, InputTuple, OutputTuple > class_type
hash_buffer< unref_key_type &, count_element_type, TtoK_function_body_type, key_hash_compare > key_to_count_buffer_type
task * fill_output_buffer(unref_key_type &t, bool should_enqueue, bool handle_task)
join_node_base< key_matching< key_type, key_hash_compare >, InputTuple, OutputTuple > base_node_type
task * increment_key_count(unref_key_type const &t, bool handle_task) __TBB_override
internal::type_to_key_function_body< count_element_type, unref_key_type & > TtoK_function_body_type
internal::aggregating_functor< class_type, key_matching_FE_operation > handler_type
internal::type_to_key_function_body_leaf< count_element_type, unref_key_type &, key_to_count_func > TtoK_function_body_leaf_type
join_node_base_operation(const successor_type &s, op_type t)
internal::join_node_base< JP, typename wrap_tuple_elements< N, PT, OutputTuple >::type, OutputTuple > type
internal::join_node_base< key_traits_type, typename wrap_key_tuple_elements< N, key_matching_port, key_traits_type, OutputTuple >::type, OutputTuple > type
unfolded_join_node : passes input_ports_type to join_node_base. We build the input port type
join_node_base< JP, input_ports_type, output_type > base_type
unfolded_join_node(const unfolded_join_node &other)
wrap_tuple_elements< N, PT, OutputTuple >::type input_ports_type
join_node_base< key_matching< K, KHash >, input_ports_type, output_type > base_type
wrap_key_tuple_elements< 2, key_matching_port, key_matching< K, KHash >, OutputTuple >::type input_ports_type
join_node_base< key_matching< K, KHash >, input_ports_type, output_type > base_type
wrap_key_tuple_elements< 3, key_matching_port, key_matching< K, KHash >, OutputTuple >::type input_ports_type
wrap_key_tuple_elements< 4, key_matching_port, key_matching< K, KHash >, OutputTuple >::type input_ports_type
join_node_base< key_matching< K, KHash >, input_ports_type, output_type > base_type
wrap_key_tuple_elements< 5, key_matching_port, key_matching< K, KHash >, OutputTuple >::type input_ports_type
unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4)
join_node_base< key_matching< K, KHash >, input_ports_type, output_type > base_type
Base class for types that should not be assigned.
Definition tbb_stddef.h:322

Copyright © 2005-2020 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.