flow_graph.h

Go to the documentation of this file.
00001 /*
00002     Copyright 2005-2011 Intel Corporation.  All Rights Reserved.
00003 
00004     The source code contained or described herein and all documents related
00005     to the source code ("Material") are owned by Intel Corporation or its
00006     suppliers or licensors.  Title to the Material remains with Intel
00007     Corporation or its suppliers and licensors.  The Material is protected
00008     by worldwide copyright laws and treaty provisions.  No part of the
00009     Material may be used, copied, reproduced, modified, published, uploaded,
00010     posted, transmitted, distributed, or disclosed in any way without
00011     Intel's prior express written permission.
00012 
00013     No license under any patent, copyright, trade secret or other
00014     intellectual property right is granted to or conferred upon you by
00015     disclosure or delivery of the Materials, either expressly, by
00016     implication, inducement, estoppel or otherwise.  Any license under such
00017     intellectual property rights must be express and approved by Intel in
00018     writing.
00019 */
00020 
00021 #ifndef __TBB_graph_H
00022 #define __TBB_graph_H
00023 
00024 #if !TBB_PREVIEW_GRAPH
00025 #error Set TBB_PREVIEW_GRAPH to include graph.h
00026 #endif
00027 
00028 #include "tbb_stddef.h"
00029 #include "atomic.h"
00030 #include "spin_mutex.h"
00031 #include "null_mutex.h"
00032 #include "spin_rw_mutex.h"
00033 #include "null_rw_mutex.h"
00034 #include "task.h"
00035 #include "concurrent_vector.h"
00036 #include "internal/_aggregator_impl.h"
00037 
00038 // use the VC10 or gcc version of tuple if it is available.
00039 #if TBB_IMPLEMENT_CPP0X && (!defined(_MSC_VER) || _MSC_VER < 1600)
00040 #define TBB_PREVIEW_TUPLE 1
00041 #include "compat/tuple"
00042 #else
00043 #include <tuple>
00044 #endif
00045 
00046 #include<list>
00047 #include<queue>
00048 
00059 namespace tbb {
00060 namespace flow {
00061 
00063 enum concurrency { unlimited = 0, serial = 1 }; // symbolic concurrency limits with values 0 and 1
      // NOTE(review): presumably meant for the size_t 'concurrency' argument of
      // function_node's constructor -- confirm against internal::function_input.
00064 
00065 namespace interface6 {
00066 
00068 class graph_node {
      // Common base class of the node types defined in this header (source_node,
      // function_node, continue_node, overwrite_node, broadcast_node, buffer_node).
      // It holds no state; the virtual destructor makes the hierarchy polymorphic.
00069 public:
00070     virtual ~graph_node() {} 
00071 }; 
00072 
00074 class continue_msg {}; // empty message type; it is the input_type of continue_receiver
00075         
00076 template< typename T > class sender;
00077 template< typename T > class receiver;
00078 class continue_receiver;
00079         
00081 template< typename T >
00082 class sender {
      // Abstract interface of a node that emits messages of type T.
      // Successor registration/removal must be implemented by derived classes;
      // the four try_* hooks default to "not supported" by returning false.
00083 public:
          // Type of the messages this sender emits.
00085     typedef T output_type;
00086         
          // Type of receiver that can be attached as a successor.
00088     typedef receiver<T> successor_type;
00089         
00090     virtual ~sender() {}
00091         
          // Attach r as a successor of this sender (pure virtual).
00093     virtual bool register_successor( successor_type &r ) = 0;
00094         
          // Detach r from this sender's successors (pure virtual).
00096     virtual bool remove_successor( successor_type &r ) = 0;
00097         
          // Request an item from the sender; the default provides none and returns false.
00099     virtual bool try_get( T & ) { return false; }
00100         
          // Request a reservation of an item; the default returns false.
00102     virtual bool try_reserve( T & ) { return false; }
00103         
          // Give up a reservation; the default returns false.
00105     virtual bool try_release( ) { return false; }
00106         
          // Consume a reserved item; the default returns false.
00108     virtual bool try_consume( ) { return false; }
00109         
00110 };
00111         
00112         
00114 template< typename T >
00115 class receiver {
      // Abstract interface of a node that accepts messages of type T.
      // try_put must be implemented by derived classes; predecessor
      // registration/removal defaults to returning false.
00116 public:
00117         
          // Type of the messages this receiver accepts.
00119     typedef T input_type;
00120         
          // Type of sender that can be attached as a predecessor.
00122     typedef sender<T> predecessor_type;
00123         
00125     virtual ~receiver() {}
00126         
          // Offer the message t to this receiver (pure virtual).
00128     virtual bool try_put( const T& t ) = 0;
00129         
          // Record a predecessor; the default does nothing and returns false.
00131     virtual bool register_predecessor( predecessor_type & ) { return false; }
00132         
          // Forget a predecessor; the default does nothing and returns false.
00134     virtual bool remove_predecessor( predecessor_type & ) { return false; }
00135         
00136 };
00137         
00139 
00140 class continue_receiver : public receiver< continue_msg > {
      // Receiver of continue_msg that counts incoming puts and calls the pure
      // virtual execute() once the count reaches the number of predecessors.
      // The counters are protected by my_mutex.
00141 public:
00142         
          // The input type is always continue_msg.
00144     typedef continue_msg input_type;
00145         
          // Predecessors are senders of continue_msg.
00147     typedef sender< continue_msg > predecessor_type;
00148         
          // Start with number_of_predecessors as both the current and the initial
          // predecessor count, and with no puts counted yet.
00150     continue_receiver( int number_of_predecessors = 0 ) { 
00151         my_predecessor_count = my_initial_predecessor_count = number_of_predecessors;
00152         my_current_count = 0;
00153     }
00154         
          // Copy constructor: the copy starts from src's *initial* predecessor count
          // (not src's current my_predecessor_count) and with a zero put count.
          // The mutex is default-constructed rather than copied.
00156     continue_receiver( const continue_receiver& src ) : receiver<continue_msg>() { 
00157         my_predecessor_count = my_initial_predecessor_count = src.my_initial_predecessor_count;
00158         my_current_count = 0;
00159     }
00160         
00162     virtual ~continue_receiver() { }
00163         
          // Increment the predecessor count under the lock; always returns true.
00165     /* override */ bool register_predecessor( predecessor_type & ) {
00166         spin_mutex::scoped_lock l(my_mutex);
00167         ++my_predecessor_count;
00168         return true;
00169     }
00170         
00172 
          // Decrement the predecessor count under the lock; always returns true.
          // The count is not checked against zero before decrementing.
00175     /* override */ bool remove_predecessor( predecessor_type & ) {
00176         spin_mutex::scoped_lock l(my_mutex);
00177         --my_predecessor_count;
00178         return true;
00179     }
00180         
00182 
          // Count one incoming message. While the incremented count is still below
          // my_predecessor_count nothing else happens; otherwise the count is reset
          // to 0 and execute() is called. Always returns true.
00184     /* override */ bool try_put( const input_type & ) {
00185         {
                  // Inner scope so the lock is released before execute() runs.
00186             spin_mutex::scoped_lock l(my_mutex);
00187             if ( ++my_current_count < my_predecessor_count ) 
00188                 return true;
00189             else
00190                 my_current_count = 0;
00191         }
00192         execute();
00193         return true;
00194     }
00195         
00196 protected:
00197         
00198     spin_mutex my_mutex;               // guards the three counters below
00199     int my_predecessor_count;          // puts required before execute() is called
00200     int my_current_count;              // puts seen since the last reset
00201     int my_initial_predecessor_count;  // constructor value; used by the copy constructor
00202         
00204 
          // Called by try_put, outside the lock, when enough messages have arrived.
00206     virtual void execute() = 0;
00207         
00208 };
00209 
00210 #include "internal/_flow_graph_impl.h"
00211 using namespace internal::graph_policy_namespace;
00212 
00214 
00215 class graph : tbb::internal::no_copy {
      // Owns a root task that serves as the parent of every task enqueued through
      // run(); wait_for_all() blocks on that root task. Not copyable (no_copy base).
00216         
          // Task that invokes a copy of a body functor and discards any result.
00217     template< typename Body >
00218     class run_task : public task {
00219     public: 
00220         run_task( Body& body ) : my_body(body) {}
00221         task *execute() {
00222             my_body();
00223             return NULL;
00224         }
00225     private:
00226         Body my_body;
00227     };
00228         
          // Task that invokes a copy of a body functor and puts its result to a
          // receiver, which is held by reference.
00229     template< typename Receiver, typename Body >
00230     class run_and_put_task : public task {
00231     public: 
00232         run_and_put_task( Receiver &r, Body& body ) : my_receiver(r), my_body(body) {}
00233         task *execute() {
00234             my_receiver.try_put( my_body() );
00235             return NULL;
00236         }
00237     private:
00238         Receiver &my_receiver;
00239         Body my_body;
00240     };
00241         
00242 public:
00243         
00244         
          // Allocate an empty_task as the root task and give it a reference count of 1.
00246     graph() : my_root_task( new ( task::allocate_root( ) ) empty_task ) {
00247         my_root_task->set_ref_count(1);
00248     }
00249         
00251 
          // Wait for outstanding work, then zero the root's reference count and destroy it.
00253     ~graph() {
00254         wait_for_all();
00255         my_root_task->set_ref_count(0);
00256         task::destroy( *my_root_task );
00257     }
00258         
00259         
00261 
          // Increment the root task's reference count (no-op if there is no root task).
00263     void increment_wait_count() { 
00264         if (my_root_task)
00265             my_root_task->increment_ref_count();
00266     }
00267         
00269 
          // Decrement the root task's reference count (no-op if there is no root task).
00271     void decrement_wait_count() { 
00272         if (my_root_task)
00273             my_root_task->decrement_ref_count(); 
00274     }
00275         
00277 
          // Enqueue a child task of the root that calls body() and puts the result to r.
          // r is captured by reference by the task.
00279     template< typename Receiver, typename Body >
00280         void run( Receiver &r, Body body ) {
00281        task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) ) 
00282            run_and_put_task< Receiver, Body >( r, body ) );
00283     }
00284         
00286 
          // Enqueue a child task of the root that calls body().
00288     template< typename Body >
00289     void run( Body body ) {
00290        task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) ) 
00291            run_task< Body >( body ) );
00292     }
00293         
00295 
          // Block on the root task, then restore its reference count to 1 (the value
          // set in the constructor). Both steps are skipped when there is no root
          // task, matching the guards in increment/decrement_wait_count; previously
          // only the wait was guarded and set_ref_count dereferenced unconditionally.
00296     void wait_for_all() {
00297         if (!my_root_task) return;
00298         my_root_task->wait_for_all();
00299         my_root_task->set_ref_count(1);
00300     }
00301         
          // Expose the root task so nodes can allocate their tasks as its children.
00303     task * root_task() {
00304         return my_root_task;
00305     }
00306         
00307 private:
00308         
00309     task *my_root_task;
00310         
00311 };
00312 
00313 #include "internal/_flow_graph_node_impl.h"
00314 
00316 template < typename Output >
00317 class source_node : public graph_node, public sender< Output > {
      // Sender whose items are produced by a user-supplied body. At most one item
      // is cached at a time, and that item can be reserved, released or consumed.
      // All state below is protected by my_mutex.
00318 public:
00319         
          // Type of the items produced.
00321     typedef Output output_type;           
00322         
          // Type of receiver that can be attached as a successor.
00324     typedef receiver< Output > successor_type;
00325         
          // Build a node whose tasks are children of g's root task. body is wrapped
          // in a heap-allocated internal::source_body_leaf owned by this node.
          // is_active is remembered as both the current and the initial active state.
00327     template< typename Body >
00328     source_node( graph &g, Body body, bool is_active = true )
00329         : my_root_task(g.root_task()), my_active(is_active), init_my_active(is_active),
00330         my_body( new internal::source_body_leaf< output_type, Body>(body) ),
00331         my_reserved(false), my_has_cached_item(false) 
00332     { 
00333         my_successors.set_owner(this);
00334     }
00335         
          // Copy constructor: shares src's root task, clones src's body, and starts
          // from src's *initial* active state with no reservation and no cached item.
          // src's successors are not copied.
00337     source_node( const source_node& src ) :
00338 #if ( __TBB_GCC_VERSION < 40202 )
00339         graph_node(), sender<Output>(),
00340 #endif
00341         my_root_task( src.my_root_task), my_active(src.init_my_active),
00342         init_my_active(src.init_my_active), my_body( src.my_body->clone() ),
00343         my_reserved(false), my_has_cached_item(false)
00344     {
00345         my_successors.set_owner(this);
00346     }
00347 
          // Release the body wrapper allocated by the constructors.
00349     ~source_node() { delete my_body; }
00350         
          // Add r as a successor and, if the node is active, enqueue a source task.
          // Always returns true.
00352     /* override */ bool register_successor( receiver<output_type> &r ) {
00353         spin_mutex::scoped_lock lock(my_mutex);
00354         my_successors.register_successor(r);
00355         if ( my_active )
00356             spawn_put();
00357         return true;
00358     }
00359         
          // Remove r from the successors; always returns true.
00361     /* override */ bool remove_successor( receiver<output_type> &r ) {
00362         spin_mutex::scoped_lock lock(my_mutex);
00363         my_successors.remove_successor(r);
00364         return true;
00365     }
00366         
          // Hand out one item: fails while an item is reserved; otherwise returns the
          // cached item if there is one, else asks the body to produce directly into v.
          // Returns false when the body reports that it produced nothing.
00368     /*override */ bool try_get( output_type &v ) {
00369         spin_mutex::scoped_lock lock(my_mutex);
00370         if ( my_reserved )  
00371             return false;
00372         
00373         if ( my_has_cached_item ) {
00374             v = my_cached_item;
00375             my_has_cached_item = false;
00376         } else if ( (*my_body)(v) == false ) {
00377             return false;
00378         }
00379         return true;
00380     }
00381         
          // Reserve one item: fails if an item is already reserved. Fills the cache
          // from the body if it is empty; on success copies the cached item into v
          // and marks it reserved. The item stays cached until released or consumed.
00383     /* override */ bool try_reserve( output_type &v ) {
00384         spin_mutex::scoped_lock lock(my_mutex);
00385         if ( my_reserved ) {
00386             return false;
00387         }
00388         
00389         if ( !my_has_cached_item && (*my_body)(my_cached_item) )  
00390             my_has_cached_item = true;
00391         
00392         if ( my_has_cached_item ) {
00393             v = my_cached_item;
00394             my_reserved = true;
00395             return true;
00396         } else {
00397             return false;
00398         }
00399     }
00400         
00402 
          // Drop the reservation but keep the cached item, then enqueue a source task.
          // Asserts that a reservation exists. Always returns true.
00403     /* override */ bool try_release( ) {
00404         spin_mutex::scoped_lock lock(my_mutex);
00405         __TBB_ASSERT( my_reserved && my_has_cached_item, "releasing non-existent reservation" );
00406         my_reserved = false;
00407         spawn_put();
00408         return true;
00409     }
00410         
          // Drop both the reservation and the cached item; enqueue a source task only
          // if there are successors. Asserts that a reservation exists. Always returns true.
00412     /* override */ bool try_consume( ) {
00413         spin_mutex::scoped_lock lock(my_mutex);
00414         __TBB_ASSERT( my_reserved && my_has_cached_item, "consuming non-existent reservation" );
00415         my_reserved = false;
00416         my_has_cached_item = false;
00417         if ( !my_successors.empty() ) {
00418             spawn_put();
00419         }
00420         return true;
00421     }
00422         
          // Mark the node active and, if it has successors, enqueue a source task.
00424     void activate() {
00425         spin_mutex::scoped_lock lock(my_mutex);
00426         my_active = true;
00427         if ( !my_successors.empty() )
00428             spawn_put();
00429     }
00430         
00431 private:
00432         
00433     task *my_root_task;          // parent of the tasks enqueued by spawn_put()
00434     spin_mutex my_mutex;         // guards the members below
00435     bool my_active;              // register_successor() spawns only when true
00436     bool init_my_active;         // constructor value; used by the copy constructor
00437     internal::source_body<output_type> *my_body;   // owned; deleted in the destructor
00438     internal::broadcast_cache< output_type > my_successors;
00439     bool my_reserved;            // true while the cached item is reserved
00440     bool my_has_cached_item;     // true while my_cached_item holds a valid item
00441     output_type my_cached_item;
00442         
          // source_task needs access to the private helpers below.
          // NOTE(review): presumably source_task::execute() calls apply_body() -- confirm.
00443     friend class internal::source_task< source_node< output_type > >;
00444         
          // Reserve an item and offer it to the successors; consume it if the put
          // succeeds, otherwise release it. Does nothing if no item can be reserved.
          // my_mutex is not held here; try_reserve/try_consume/try_release lock it themselves.
          // NOTE(review): no visible base declares apply_body/spawn_put, so the
          // "override" markers on these two members look inaccurate -- confirm.
00446     /* override */ void apply_body( ) {
00447         output_type v;
00448         if ( try_reserve(v) == false )
00449             return;
00450         
00451         if ( my_successors.try_put( v ) ) 
00452             try_consume();
00453         else
00454             try_release();
00455     }
00456         
          // Enqueue a source_task for this node as an additional child of the root task.
00458     /* override */ void spawn_put( ) {
00459         task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) ) 
00460            internal::source_task< source_node< output_type > >( *this ) ); 
00461     }
00462         
00463 };
00464         
00466 template < typename Input, typename Output = continue_msg, graph_buffer_policy = queueing, typename Allocator=cache_aligned_allocator<Input> >
00467 class function_node : public graph_node, public internal::function_input<Input,Output,Allocator>, public internal::function_output<Output> {
      // Primary template of function_node. It combines internal::function_input
      // (receiving side) with internal::function_output (sending side).
      // Because a partial specialization for the 'queueing' policy follows, this
      // primary template is the one selected for any other graph_buffer_policy
      // value; unlike that specialization it passes no input queue to function_input.
00468 public:
00469         
00470     typedef Input input_type;
00471     typedef Output output_type;
00472     typedef sender< input_type > predecessor_type;
00473     typedef receiver< output_type > successor_type;
00474         
          // Forward the graph, the concurrency limit and the body to function_input
          // and make this node the owner of its successor cache.
00476     template< typename Body >
00477     function_node( graph &g, size_t concurrency, Body body )
00478     : internal::function_input<input_type,output_type,Allocator>( g, concurrency, body ) {
00479         my_successors.set_owner(this);
00480     }
00481 
          // Copy constructor: copy-constructs the function_input base from src; the
          // successor cache is default-constructed, so src's successors are not copied.
          // The explicit base initializers are only compiled for GCC older than 4.2.2.
00483     function_node( const function_node& src ) : 
00484 #if ( __TBB_GCC_VERSION < 40202 )
00485         graph_node(), 
00486 #endif
00487         internal::function_input<input_type,output_type,Allocator>( src )
00488 #if ( __TBB_GCC_VERSION < 40202 )
00489         , internal::function_output<Output>()
00490 #endif
00491     {
00492         my_successors.set_owner(this);
00493     }
00494         
00495 protected:
00496 
          // Successors of this node; exposed to the base classes through successors().
00497     internal::broadcast_cache<output_type> my_successors; 
00498     /* override */ internal::broadcast_cache<output_type> &successors () { return my_successors; }
00499         
00500 };
00501 
00503 template < typename Input, typename Output, typename Allocator >
00504 class function_node<Input,Output,queueing,Allocator> : public graph_node, public internal::function_input<Input,Output,Allocator>, public internal::function_output<Output> {
      // Partial specialization of function_node for the 'queueing' buffer policy.
      // It differs from the primary template only in handing function_input a newly
      // allocated internal::function_input_queue.
      // NOTE(review): ownership of that queue presumably passes to function_input -- confirm.
00505 public:
00506         
00507     typedef Input input_type;
00508     typedef Output output_type;
00509     typedef sender< input_type > predecessor_type;
00510     typedef receiver< output_type > successor_type;
00511         
          // Forward the graph, the concurrency limit, the body and a fresh input queue
          // to function_input and make this node the owner of its successor cache.
00513     template< typename Body >
00514     function_node( graph &g, size_t concurrency, Body body )
00515     : internal::function_input< input_type, output_type, Allocator >( g, concurrency, body, new internal::function_input_queue< input_type, Allocator >() ) {
00516         my_successors.set_owner(this);
00517     }
00518 
          // Copy constructor: constructs function_input from src together with a new,
          // separate input queue; src's successors are not copied.
00520     function_node( const function_node& src ) : 
00521 #if ( __TBB_GCC_VERSION < 40202 )
00522         graph_node(), 
00523 #endif
00524         internal::function_input<input_type,output_type,Allocator>( src, new internal::function_input_queue< input_type, Allocator >() )
00525 #if ( __TBB_GCC_VERSION < 40202 )
00526         , internal::function_output<Output>()
00527 #endif
00528     {
00529         my_successors.set_owner(this);
00530     }
00531 
00532 protected:
00533 
          // Successors of this node; exposed to the base classes through successors().
00534     internal::broadcast_cache<output_type> my_successors; 
00535     /* override */ internal::broadcast_cache<output_type> &successors () { return my_successors; }
00536         
00537 };
00538         
00540 template <typename Output>
00541 class continue_node : public graph_node, public internal::continue_input<Output>, public internal::function_output<Output> {
      // Node that takes continue_msg as input and emits Output. The receiving side
      // is internal::continue_input, the sending side internal::function_output.
00542 public:
00543         
00544     typedef continue_msg input_type;
00545     typedef Output output_type;
00546     typedef sender< input_type > predecessor_type;
00547     typedef receiver< output_type > successor_type;
00548         
           // Construct from a graph and a body; no predecessor count is passed to continue_input.
00550      template <typename Body >
00551      continue_node( graph &g, Body body )
00552              : internal::continue_input<output_type>( g, body ) {
00553          my_successors.set_owner(this);
00554      }
00555         
          // Construct with an explicit number of predecessors, forwarded to continue_input.
00557     template <typename Body >
00558     continue_node( graph &g, int number_of_predecessors, Body body )
00559         : internal::continue_input<output_type>( g, number_of_predecessors, body )
00560     {
00561         my_successors.set_owner(this);
00562     }
00563  
          // Copy constructor: copy-constructs continue_input from src; the successor
          // cache is default-constructed, so src's successors are not copied.
00565     continue_node( const continue_node& src ) :
00566 #if ( __TBB_GCC_VERSION < 40202 )
00567         graph_node(),
00568 #endif
00569         internal::continue_input<output_type>(src)
00570 #if ( __TBB_GCC_VERSION < 40202 )
00571         , internal::function_output<Output>()
00572 #endif
00573     {
00574         my_successors.set_owner(this);
00575     }
00576 
00577 protected:
00578         
          // Successors of this node; exposed to the base classes through successors().
00579     internal::broadcast_cache<output_type> my_successors; 
00580     /* override */ internal::broadcast_cache<output_type> &successors () { return my_successors; }
00581         
00582 };
00583         
00584 template< typename T >
00585 class overwrite_node : public graph_node, public receiver<T>, public sender<T> {
      // Single-item buffer: every put overwrites the stored value and forwards it
      // to the successors; try_get copies the value out without invalidating it.
      // All members are protected by my_mutex.
00586 public:
00587         
00588     typedef T input_type;
00589     typedef T output_type;
00590     typedef sender< input_type > predecessor_type;
00591     typedef receiver< output_type > successor_type;
00592         
          // Start with no valid value.
00593     overwrite_node() : my_buffer_is_valid(false) {
00594         my_successors.set_owner( this );
00595     }
00596 
00597     // Copy constructor; doesn't take anything from src; default won't work
00598     overwrite_node( const overwrite_node& ) : 
00599 #if ( __TBB_GCC_VERSION < 40202 )
00600         graph_node(), receiver<T>(), sender<T>(),
00601 #endif
00602         my_buffer_is_valid(false) 
00603     {
00604         my_successors.set_owner( this );
00605     }
00606         
00607     ~overwrite_node() {}
00608         
          // Add s as a successor. If a value is already stored it is first offered to
          // s; returns false only when s rejected the put and accepted this node as
          // a predecessor, in which case s is not added to the successor list.
00609     /* override */ bool register_successor( successor_type &s ) {
00610         spin_mutex::scoped_lock l( my_mutex );
00611         if ( my_buffer_is_valid ) {
00612             // We have a valid value that must be forwarded immediately.
00613             if ( s.try_put( my_buffer ) || !s.register_predecessor( *this  ) ) {
00614                 // We add the successor: it accepted our put or it rejected it but won't let us become a predecessor
00615                 my_successors.register_successor( s );
00616                 return true;
00617             } else {
00618                 // We don't add the successor: it rejected our put and we became its predecessor instead
00619                 return false;
00620             }
00621         } else {
00622             // No valid value yet, just add as successor
00623             my_successors.register_successor( s );
00624             return true;
00625         }
00626     }
00627         
          // Remove s from the successors; always returns true.
00628     /* override */ bool remove_successor( successor_type &s ) {
00629         spin_mutex::scoped_lock l( my_mutex );
00630         my_successors.remove_successor(s);
00631         return true;
00632     }
00633         
          // Store v (replacing any previous value), mark the buffer valid and forward
          // v to the successors while holding the lock. The result of the forward is
          // ignored; always returns true.
00634     /* override */ bool try_put( const T &v ) {
00635         spin_mutex::scoped_lock l( my_mutex );
00636         my_buffer = v;
00637         my_buffer_is_valid = true;
00638         my_successors.try_put(v);
00639         return true;
00640     }
00641         
          // Copy the stored value into v if there is one. The buffer remains valid
          // afterwards, so repeated calls return the same value.
00642     /* override */ bool try_get( T &v ) {
00643         spin_mutex::scoped_lock l( my_mutex );
00644         if ( my_buffer_is_valid ) {
00645             v = my_buffer;
00646             return true;
00647         } else {
00648             return false;
00649         }
00650     }
00651         
          // Report whether a value is currently stored.
00652     bool is_valid() {
00653        spin_mutex::scoped_lock l( my_mutex );
00654        return my_buffer_is_valid;
00655     }
00656         
          // Mark the buffer as holding no value; my_buffer itself is left untouched.
00657     void clear() {
00658        spin_mutex::scoped_lock l( my_mutex );
00659        my_buffer_is_valid = false;
00660     }
00661         
00662 protected:
00663         
00664     spin_mutex my_mutex;        // guards every member below
          // The cache is instantiated with null_rw_mutex; every access to it in this
          // class happens while my_mutex is held.
00665     internal::broadcast_cache< T, null_rw_mutex > my_successors;
00666     T my_buffer;                // last value put; meaningful only if my_buffer_is_valid
00667     bool my_buffer_is_valid;
00668         
00669 };
00670         
00671 template< typename T >
00672 class write_once_node : public overwrite_node<T> {
      // Variant of overwrite_node that accepts a value only while its buffer is
      // invalid; later puts are rejected until the inherited clear() is called.
00673 public:
00674         
00675     typedef T input_type;
00676     typedef T output_type;
00677     typedef sender< input_type > predecessor_type;
00678     typedef receiver< output_type > successor_type;
00679         
          // Start with no valid value (delegates to overwrite_node).
00681     write_once_node() : overwrite_node<T>() {}
00682 
          // Copy constructor: delegates to overwrite_node's copy constructor, which
          // takes nothing from src.
00684     write_once_node( const write_once_node& src ) : overwrite_node<T>(src) {}
00685 
          // Store and forward v only if no value is stored yet. Returns false without
          // changing anything when a value is already present, true otherwise.
          // The inherited members are reached through this-> because the base is a
          // dependent type.
00686     /* override */ bool try_put( const T &v ) {
00687         spin_mutex::scoped_lock l( this->my_mutex );
00688         if ( this->my_buffer_is_valid ) {
00689             return false;
00690         } else {
00691             this->my_buffer = v;
00692             this->my_buffer_is_valid = true;
00693             this->my_successors.try_put(v);
00694             return true;
00695         }
00696     }
00697 };
00698         
00700 template <typename T>
00701 class broadcast_node : public graph_node, public receiver<T>, public sender<T> {
      // Stateless pass-through node: each message put to it is handed to its
      // successor cache. The node keeps no buffer and no mutex of its own.
      // NOTE(review): thread-safety therefore depends on internal::broadcast_cache's
      // default mutex argument, which is not visible here -- confirm.
00702         
00703     internal::broadcast_cache<T> my_successors;
00704         
00705 public:
00706         
00707     typedef T input_type;
00708     typedef T output_type;
00709     typedef sender< input_type > predecessor_type;
00710     typedef receiver< output_type > successor_type;
00711         
          // Start with an empty successor cache owned by this node.
00712     broadcast_node( ) {
00713        my_successors.set_owner( this ); 
00714     }
00715         
00716     // Copy constructor
          // Takes nothing from the source node; the copy starts with a
          // default-constructed successor cache.
00717     broadcast_node( const broadcast_node& ) 
00718 #if ( __TBB_GCC_VERSION < 40202 )
00719         : graph_node(), receiver<T>(), sender<T>()
00720 #endif
00721     {
00722        my_successors.set_owner( this ); 
00723     }
00724         
          // Add r to the successors; always returns true.
00726     virtual bool register_successor( receiver<T> &r ) {
00727         my_successors.register_successor( r );
00728         return true;
00729     }
00730         
          // Remove r from the successors; always returns true.
00732     virtual bool remove_successor( receiver<T> &r ) {
00733         my_successors.remove_successor( r );
00734         return true;
00735     }
00736         
          // Pass t to the successor cache. The cache's result is ignored and the put
          // always reports success.
00737     /* override */ bool try_put( const T &t ) {
00738         my_successors.try_put(t);
00739         return true;
00740     }
00741         
00742 };
00743 
00744 #include "internal/_flow_graph_item_buffer_impl.h"
00745 
00747 template <typename T, typename A=cache_aligned_allocator<T> >
00748 class buffer_node : public graph_node, public reservable_item_buffer<T, A>, public receiver<T>, public sender<T> {
      // Buffering node built on reservable_item_buffer<T, A>. Every public
      // operation is packaged as a buffer_operation and executed through
      // my_aggregator, which calls handle_operations() with a list of pending
      // operations. Forwarding to successors is done by an enqueued
      // internal::forward_task.
00749 public:
00750     typedef T input_type;
00751     typedef T output_type;
00752     typedef sender< input_type > predecessor_type;
00753     typedef receiver< output_type > successor_type;
00754     typedef buffer_node<T, A> my_class;
00755 protected:
00756     typedef size_t size_type;
00757     internal::round_robin_cache< T, null_rw_mutex > my_successors;
00758         
          // Parent of the forward tasks enqueued by handle_operations().
00759     task *my_parent;
00760         
00761     friend class internal::forward_task< buffer_node< T, A > >;
00762         
          // Operation kinds dispatched by handle_operations(), and operation outcomes.
00763     enum op_type {reg_succ, rem_succ, req_item, res_item, rel_res, con_res, put_item, try_fwd};
00764     enum op_stat {WAIT=0, SUCCEEDED, FAILED};
00765         
00766     // implements the aggregator_operation concept
00767     class buffer_operation : public internal::aggregated_operation< buffer_operation > {
00768     public:
00769         char type;            // an op_type value stored as char
00770         T *elem;              // item to push, or destination for pop/reserve; may be NULL
00771         successor_type *r;    // successor for reg_succ/rem_succ; otherwise NULL
              // Operation that carries an item. The const is cast away only so the
              // same pointer member can serve both input and output operations.
00772         buffer_operation(const T& e, op_type t) :
00773             type(char(t)), elem(const_cast<T*>(&e)), r(NULL) {}
              // Operation without an item; elem is set to NULL so it is never left
              // uninitialized (callers that need it assign it afterwards).
00774         buffer_operation(op_type t) : type(char(t)), elem(NULL), r(NULL) {}
00775     };
00776         
          // True while a forward task is pending or running; cleared by internal_forward().
00777     bool forwarder_busy;
00778     typedef internal::aggregating_functor<my_class, buffer_operation> my_handler;
00779     friend class internal::aggregating_functor<my_class, buffer_operation>;
00780     internal::aggregator< my_handler, buffer_operation> my_aggregator;
00781         
          // Execute every operation in the linked list op_list, dispatching on its
          // type. If any executed operation may have made forwarding possible and no
          // forwarder is busy, mark the forwarder busy and enqueue a forward_task.
00782     virtual void handle_operations(buffer_operation *op_list) {
00783         buffer_operation *tmp;
00784         bool try_forwarding=false;
00785         while (op_list) {
00786             tmp = op_list;
                  // Advance before executing: the next pointer is read first.
00787             op_list = op_list->next;
00788             switch (tmp->type) {
00789             case reg_succ: internal_reg_succ(tmp);  try_forwarding = true; break;
00790             case rem_succ: internal_rem_succ(tmp); break;
00791             case req_item: internal_pop(tmp); break;
00792             case res_item: internal_reserve(tmp); break;
00793             case rel_res:  internal_release(tmp);  try_forwarding = true; break;
00794             case con_res:  internal_consume(tmp);  try_forwarding = true; break;
00795             case put_item: internal_push(tmp);  try_forwarding = true; break;
00796             case try_fwd:  internal_forward(tmp); break;
00797             }
00798         }
00799         if (try_forwarding && !forwarder_busy) {
00800             forwarder_busy = true;
00801             task::enqueue(*new(task::allocate_additional_child_of(*my_parent)) internal::forward_task< buffer_node<input_type, A> >(*this));
00802         }
00803     }
00804         
          // Submit try_fwd operations repeatedly for as long as each one reports SUCCEEDED.
          // NOTE(review): presumably invoked by internal::forward_task -- confirm.
00806     virtual void forward() {
00807         buffer_operation op_data(try_fwd);
00808         do {
00809             op_data.status = WAIT;
00810             my_aggregator.execute(&op_data);
00811         } while (op_data.status == SUCCEEDED);
00812     }
00813         
          // Register the successor carried by op.
00815     virtual void internal_reg_succ(buffer_operation *op) {
00816         my_successors.register_successor(*(op->r));
00817         __TBB_store_with_release(op->status, SUCCEEDED);
00818     }
00819         
          // Remove the successor carried by op.
00821     virtual void internal_rem_succ(buffer_operation *op) {
00822         my_successors.remove_successor(*(op->r));
00823         __TBB_store_with_release(op->status, SUCCEEDED);
00824     }
00825         
          // Try to send items from the back of the buffer, making at most
          // my_successors.size() attempts. Reports SUCCEEDED only if at least one item
          // was accepted and all attempts were used; otherwise reports FAILED and
          // clears forwarder_busy.
00827     virtual void internal_forward(buffer_operation *op) {
00828         T i_copy;
00829         bool success = false; // flagged when a successor accepts
00830         size_type counter = my_successors.size();
00831         // Try forwarding, giving each successor a chance
00832         while (counter>0 && !this->buffer_empty() && this->item_valid(this->my_tail-1)) {
00833             this->fetch_back(i_copy);
00834             if( my_successors.try_put(i_copy) ) {
00835                 this->invalidate_back();
00836                 --(this->my_tail);
00837                 success = true; // found an accepting successor
00838             }
00839             --counter;
00840         }
00841         if (success && !counter)
00842             __TBB_store_with_release(op->status, SUCCEEDED);
00843         else {
00844             __TBB_store_with_release(op->status, FAILED);
00845             forwarder_busy = false;
00846         }
00847     }
00848         
          // Append the item carried by op to the buffer.
00849     virtual void internal_push(buffer_operation *op) {
00850         this->push_back(*(op->elem));
00851         __TBB_store_with_release(op->status, SUCCEEDED);
00852     }
00853         
          // Pop an item from the back of the buffer into *op->elem; FAILED if pop_back fails.
00854     virtual void internal_pop(buffer_operation *op) {
00855         if(this->pop_back(*(op->elem))) {
00856             __TBB_store_with_release(op->status, SUCCEEDED);
00857         }
00858         else {
00859             __TBB_store_with_release(op->status, FAILED);
00860         }
00861     }
00862         
          // Reserve the front item into *op->elem; FAILED if reserve_front fails.
00863     virtual void internal_reserve(buffer_operation *op) {
00864         if(this->reserve_front(*(op->elem))) {
00865             __TBB_store_with_release(op->status, SUCCEEDED);
00866         }
00867         else {
00868             __TBB_store_with_release(op->status, FAILED);
00869         }
00870     }
00871         
          // Consume the reserved front item.
00872     virtual void internal_consume(buffer_operation *op) {
00873         this->consume_front();
00874         __TBB_store_with_release(op->status, SUCCEEDED);
00875     }
00876         
          // Release the reservation on the front item.
00877     virtual void internal_release(buffer_operation *op) {
00878         this->release_front();
00879         __TBB_store_with_release(op->status, SUCCEEDED);
00880     }
00881         
00882 public:
          // Construct an empty buffer whose forward tasks are children of g's root task.
          // The base is named with both template arguments so the initializer refers
          // to the actual base class even when A is not the default allocator.
00884     buffer_node( graph &g ) : reservable_item_buffer<T, A>(),
00885         my_parent( g.root_task() ), forwarder_busy(false) {
00886         my_successors.set_owner(this);
00887         my_aggregator.initialize_handler(my_handler(this));
00888     }
00889 
          // Copy constructor: shares src's parent task but starts with an empty buffer,
          // no successors and an idle forwarder.
00891     buffer_node( const buffer_node& src ) : 
00892 #if ( __TBB_GCC_VERSION < 40202 )
00893         graph_node(), 
00894 #endif
00895         reservable_item_buffer<T, A>(),
00896 #if ( __TBB_GCC_VERSION < 40202 )
00897         receiver<T>(), sender<T>(),
00898 #endif
00899         my_parent( src.my_parent )  
00900     {
00901         forwarder_busy = false;
00902         my_successors.set_owner(this);
00903         my_aggregator.initialize_handler(my_handler(this));
00904     }
00905 
00906     virtual ~buffer_node() {}
00907         
00908     //
00909     // message sender implementation
00910     //
00911         
00913 
          // Add r as a successor through the aggregator; always returns true.
00914     /* override */ bool register_successor( receiver<output_type> &r ) {
00915         buffer_operation op_data(reg_succ);
00916         op_data.r = &r;
00917         my_aggregator.execute(&op_data);
00918         return true;
00919     }
00920         
00922 
          // Tell r to forget this node as a predecessor, then remove r from the
          // successors through the aggregator; always returns true.
00924     /* override */ bool remove_successor( receiver<output_type> &r ) {
00925         r.remove_predecessor(*this);
00926         buffer_operation op_data(rem_succ);
00927         op_data.r = &r;
00928         my_aggregator.execute(&op_data);
00929         return true;
00930     }
00931         
00933 
          // Request an item into v; returns true if the operation succeeded.
00935     /* override */ bool try_get( T &v ) {
00936         buffer_operation op_data(req_item);
00937         op_data.elem = &v;
00938         my_aggregator.execute(&op_data);
00939         return (op_data.status==SUCCEEDED);
00940     }
00941         
00943 
          // Reserve an item into v; returns true if the operation succeeded.
00945     /* override */ bool try_reserve( T &v ) {
00946         buffer_operation op_data(res_item);
00947         op_data.elem = &v;
00948         my_aggregator.execute(&op_data);
00949         return (op_data.status==SUCCEEDED);
00950     }
00951         
00953 
          // Release the current reservation; always returns true.
00954     /* override */ bool try_release() {
00955         buffer_operation op_data(rel_res);
00956         my_aggregator.execute(&op_data);
00957         return true;
00958     }
00959         
00961 
          // Consume the reserved item; always returns true.
00962     /* override */ bool try_consume() {
00963         buffer_operation op_data(con_res);
00964         my_aggregator.execute(&op_data);
00965         return true;
00966     }
00967         
00969 
          // Add t to the buffer; always returns true.
00970     /* override */ bool try_put(const T &t) {
00971         buffer_operation op_data(t, put_item);
00972         my_aggregator.execute(&op_data);
00973         return true;
00974     }
00975 };
00976         
00977         
//! A buffer_node whose forward/pop/reserve handlers always work on the front item (my_head).
/** Only the handlers below are replaced; everything else (including how items
    are pushed) is inherited from buffer_node<T, A> and is not visible here. */
00979 template <typename T, typename A=cache_aligned_allocator<T> >
00980 class queue_node : public buffer_node<T, A> {
00981 protected:
00982 typedef typename buffer_node<T, A>::size_type size_type;
00983 typedef typename buffer_node<T, A>::buffer_operation queue_operation;
00984         
          // Values stored into op->status by the handlers below.
00985     enum op_stat {WAIT=0, SUCCEEDED, FAILED};
00986         
          //! Offers front items to the successors.
          /** Fails immediately (and clears forwarder_busy) if an item is reserved
              or the front slot holds no valid item. Otherwise makes at most
              my_successors.size() attempts; each accepted item is removed from
              the front. Reports SUCCEEDED only if at least one item was accepted
              and every attempt was used; otherwise reports FAILED and clears
              forwarder_busy. */
00988     /* override */ void internal_forward(queue_operation *op) {
00989         T i_copy;
00990         bool success = false; // flagged when a successor accepts
00991         size_type counter = this->my_successors.size();
00992         if (this->my_reserved || !this->item_valid(this->my_head)){
00993             __TBB_store_with_release(op->status, FAILED);
00994             this->forwarder_busy = false;
00995             return;
00996         }
00997         // Keep trying to send items while there is at least one accepting successor
00998         while (counter>0 && this->item_valid(this->my_head)) {
00999             this->fetch_front(i_copy);
01000             if(this->my_successors.try_put(i_copy)) {
01001                  this->invalidate_front();
01002                  ++(this->my_head);
01003                 success = true; // found an accepting successor
01004             }
                  // The attempt budget shrinks whether or not the put was accepted.
01005             --counter;
01006         }
              // forwarder_busy is left as-is on SUCCEEDED; who reacts to that
              // status is not visible in this class.
01007         if (success && !counter)
01008             __TBB_store_with_release(op->status, SUCCEEDED);
01009         else {
01010             __TBB_store_with_release(op->status, FAILED);
01011             this->forwarder_busy = false;
01012         }
01013     }
01014         
          //! Removes the front item into *(op->elem); FAILED if reserved or no valid front item.
01015     /* override */ void internal_pop(queue_operation *op) {
01016         if ( this->my_reserved || !this->item_valid(this->my_head)){
01017             __TBB_store_with_release(op->status, FAILED);
01018         }
01019         else {
01020             this->pop_front(*(op->elem));
01021             __TBB_store_with_release(op->status, SUCCEEDED);
01022         }
01023     }
          //! Reserves the front item: copies it into *(op->elem) and invalidates the front slot.
          /** FAILED if a reservation is already outstanding or there is no valid
              front item. my_head is not advanced here. */
01024     /* override */ void internal_reserve(queue_operation *op) {
01025         if (this->my_reserved || !this->item_valid(this->my_head)) {
01026             __TBB_store_with_release(op->status, FAILED);
01027         }
01028         else {
01029             this->my_reserved = true;
01030             this->fetch_front(*(op->elem));
01031             this->invalidate_front();
01032             __TBB_store_with_release(op->status, SUCCEEDED);
01033         }
01034     }
          //! Completes a reservation via the inherited consume_front(); always reports SUCCEEDED.
01035     /* override */ void internal_consume(queue_operation *op) {
01036         this->consume_front();
01037         __TBB_store_with_release(op->status, SUCCEEDED);
01038     }
01039         
01040 public:
01041         
01042     typedef T input_type;
01043     typedef T output_type;
01044     typedef sender< input_type > predecessor_type;
01045     typedef receiver< output_type > successor_type;
01046         
          //! Constructor: forwards the owning graph g to buffer_node.
01048     queue_node( graph &g ) : buffer_node<T, A>(g) {}
01049 
          //! Copy constructor: delegates entirely to buffer_node's copy constructor.
01051     queue_node( const queue_node& src) : buffer_node<T, A>(src) {}
01052 };
01053         
//! A queue_node that stores each pushed item at the index computed by a user-supplied sequencer.
/** Only internal_push is replaced; pop/reserve/forward come from queue_node
    and therefore act on the front slot (my_head). */
01055 template< typename T, typename A=cache_aligned_allocator<T> >
01056 class sequencer_node : public queue_node<T, A> {
          // Owned wrapper around the sequencer functor: allocated with new in the
          // constructor, cloned in the copy constructor, deleted in the destructor.
          // NOTE(review): no copy-assignment operator is declared in this class;
          // confirm assignment is disabled by a base class, otherwise assigning
          // would leave two nodes deleting the same pointer.
01057     internal::function_body< T, size_t > *my_sequencer;
01058 public:
01059         
01060     typedef T input_type;
01061     typedef T output_type;
01062     typedef sender< input_type > predecessor_type;
01063     typedef receiver< output_type > successor_type;
01064         
          //! Constructor: s is copied into a heap-allocated function_body_leaf mapping T to size_t.
01066     template< typename Sequencer >
01067     sequencer_node( graph &g, const Sequencer& s ) : queue_node<T, A>(g),
01068         my_sequencer(new internal::function_body_leaf< T, size_t, Sequencer>(s) ) {}
01069 
          //! Copy constructor: copies the base and clones src's sequencer body.
01071     sequencer_node( const sequencer_node& src ) : queue_node<T, A>(src),
01072         my_sequencer( src.my_sequencer->clone() ) {}
01073         
          //! Destructor: releases the owned sequencer body.
01075     ~sequencer_node() { delete my_sequencer; }
01076 protected:
01077     typedef typename buffer_node<T, A>::size_type size_type;
01078     typedef typename buffer_node<T, A>::buffer_operation sequencer_operation;
01079         
          // Values stored into op->status.
01080     enum op_stat {WAIT=0, SUCCEEDED, FAILED};
01081         
01082 private:
          //! Stores *(op->elem) at the slot chosen by the sequencer; always reports SUCCEEDED.
01083     /* override */ void internal_push(sequencer_operation *op) {
              // The sequencer maps the item to its slot index.
01084         size_type tag = (*my_sequencer)(*(op->elem));
01085         
              // Raise my_tail so that it is at least one past the slot being written.
01086         this->my_tail = (tag+1 > this->my_tail) ? tag+1 : this->my_tail;
01087         
01088         if(this->size() > this->capacity())
01089             this->grow_my_array(this->size());  // tail already has 1 added to it
              // The pair's second member is set to true; presumably this is the
              // flag item_valid() tests -- confirm in buffer_node.
01090         this->item(tag) = std::make_pair( *(op->elem), true );
01091         __TBB_store_with_release(op->status, SUCCEEDED);
01092     }
01093 };
01094         
//! A buffer_node that hands out items in the order defined by Compare, using a binary heap in my_array.
/** Layout of my_array as maintained by the code below:
      [0, mark)        elements already arranged as a heap; index 0 is the root,
      [mark, my_tail)  elements appended since the last heapify(), not yet heaped.
    An element moves toward the root while compare(parent, element) holds, so
    with the default Compare (std::less<T>) the root is the largest heaped item. */
01096 template< typename T, typename Compare = std::less<T>, typename A=cache_aligned_allocator<T> >
01097 class priority_queue_node : public buffer_node<T, A> {
01098 public:
01099     typedef T input_type;
01100     typedef T output_type;
01101     typedef sender< input_type > predecessor_type;
01102     typedef receiver< output_type > successor_type;
01103         
          //! Constructor: starts with an empty heap (mark == 0).
01105     priority_queue_node( graph &g ) : buffer_node<T, A>(g), mark(0) {}
01106 
          //! Copy constructor: copies the base and resets mark to 0; compare and reserved_item are default-initialized.
01108     priority_queue_node( const priority_queue_node &src ) : buffer_node<T, A>(src), mark(0) {}
01109         
01110 protected:
01111     typedef typename buffer_node<T, A>::size_type size_type;
01112     typedef typename buffer_node<T, A>::item_type item_type;
01113     typedef typename buffer_node<T, A>::buffer_operation prio_operation;
01114         
          // Values stored into op->status by the handlers below.
01115     enum op_stat {WAIT=0, SUCCEEDED, FAILED};
01116         
          //! Processes a linked list of pending operations, then heapifies and schedules forwarding if needed.
          /** Each operation is dispatched on its type. Registering a successor,
              pushing, releasing and consuming all request a forwarding attempt. */
01117     /* override */ void handle_operations(prio_operation *op_list) {
01118         prio_operation *tmp /*, *pop_list*/ ;
01119         bool try_forwarding=false;
01120         while (op_list) {
                  // Read the next pointer before running the handler for tmp.
01121             tmp = op_list;
01122             op_list = op_list->next;
01123             switch (tmp->type) {
01124             case buffer_node<T, A>::reg_succ: this->internal_reg_succ(tmp); try_forwarding = true; break;
01125             case buffer_node<T, A>::rem_succ: this->internal_rem_succ(tmp); break;
01126             case buffer_node<T, A>::put_item: internal_push(tmp); try_forwarding = true; break;
01127             case buffer_node<T, A>::try_fwd: internal_forward(tmp); break;
01128             case buffer_node<T, A>::rel_res: internal_release(tmp); try_forwarding = true; break;
01129             case buffer_node<T, A>::con_res: internal_consume(tmp); try_forwarding = true; break;
01130             case buffer_node<T, A>::req_item: internal_pop(tmp); break;
01131             case buffer_node<T, A>::res_item: internal_reserve(tmp); break;
01132             }
01133         }
01134         // process pops!  for now, no special pop processing
              // Fold any elements appended during this batch into the heap.
01135         if (mark<this->my_tail) heapify();
              // Enqueue a single forward_task unless one is already marked busy.
01136         if (try_forwarding && !this->forwarder_busy) {
01137             this->forwarder_busy = true;
01138             task::enqueue(*new(task::allocate_additional_child_of(*(this->my_parent))) internal::forward_task< buffer_node<input_type, A> >(*this));
01139         }
01140     }
01141         
          //! Offers the heap root to the successors.
          /** Fails immediately (and clears forwarder_busy) if an item is reserved
              or the node is empty. Otherwise makes at most my_successors.size()
              attempts; each accepted root is removed from the heap. Reports
              SUCCEEDED only if at least one item was accepted and every attempt
              was used; otherwise reports FAILED and clears forwarder_busy. */
01143     /* override */ void internal_forward(prio_operation *op) {
01144         T i_copy;
01145         bool success = false; // flagged when a successor accepts
01146         size_type counter = this->my_successors.size();
01147         
01148         if (this->my_reserved || this->my_tail == 0) {
01149             __TBB_store_with_release(op->status, FAILED);
01150             this->forwarder_busy = false;
01151             return;
01152         }
01153         // Keep trying to send while there exists an accepting successor
01154         while (counter>0 && this->my_tail > 0) {
01155             i_copy = this->my_array[0].first;
01156             bool msg = this->my_successors.try_put(i_copy);
01157             if ( msg == true ) {
                      // When nothing is un-heaped (mark == my_tail) the heap itself
                      // shrinks, so mark follows my_tail down.
01158                  if (mark == this->my_tail) --mark;
01159                 --(this->my_tail);
                      // Move the former last element to the root; reheap() sifts it down.
01160                 this->my_array[0].first=this->my_array[this->my_tail].first;
01161                 if (this->my_tail > 1) // don't reheap for heap of size 1
01162                     reheap();
01163                 success = true; // found an accepting successor
01164             }
                  // The attempt budget shrinks whether or not the put was accepted.
01165             --counter;
01166         }
01167         if (success && !counter)
01168             __TBB_store_with_release(op->status, SUCCEEDED);
01169         else {
01170             __TBB_store_with_release(op->status, FAILED);
01171             this->forwarder_busy = false;
01172         }
01173     }
01174         
          //! Appends *(op->elem) at my_tail, growing the array if needed; the element stays un-heaped until heapify().
01175     /* override */ void internal_push(prio_operation *op) {
01176         if ( this->my_tail >= this->my_array_size )
01177             this->grow_my_array( this->my_tail + 1 );
01178         this->my_array[this->my_tail] = std::make_pair( *(op->elem), true );
01179         ++(this->my_tail);
01180         __TBB_store_with_release(op->status, SUCCEEDED);
01181     }
          //! Removes one item into *(op->elem); FAILED if an item is reserved or the node is empty.
          /** If un-heaped elements exist and the most recently appended one
              outranks the root under compare, that element is returned directly.
              Otherwise the root is returned and the heap is repaired.
              NOTE(review): only the last appended element is compared with the
              root; other un-heaped elements are not examined here -- confirm
              this is intended. */
01182     /* override */ void internal_pop(prio_operation *op) {
01183         if ( this->my_reserved == true || this->my_tail == 0 ) {
01184             __TBB_store_with_release(op->status, FAILED);
01185         }
01186         else {
01187             if (mark<this->my_tail &&
01188                 compare(this->my_array[0].first,
01189                         this->my_array[this->my_tail-1].first)) {
01190                 // there are newly pushed elems; last one higher than top
01191                 // copy the data
01192                 *(op->elem) = this->my_array[this->my_tail-1].first;
01193                 --(this->my_tail);
01194                 __TBB_store_with_release(op->status, SUCCEEDED);
01195             }
01196             else { // extract and push the last element down heap
01197                 *(op->elem) = this->my_array[0].first; // copy the data
01198                 if (mark == this->my_tail) --mark;
01199                 --(this->my_tail);
                      // The result is already in *(op->elem), so the status is
                      // stored before the heap is repaired.
01200                 __TBB_store_with_release(op->status, SUCCEEDED);
01201                 this->my_array[0].first=this->my_array[this->my_tail].first;
01202                 if (this->my_tail > 1) // don't reheap for heap of size 1
01203                     reheap();
01204             }
01205         }
01206     }
          //! Reserves the heap root: removes it from the heap and keeps a copy in reserved_item.
          /** FAILED if a reservation is already outstanding or the node is empty.
              The saved copy is what internal_release() puts back. */
01207     /* override */ void internal_reserve(prio_operation *op) {
01208         if (this->my_reserved == true || this->my_tail == 0) {
01209             __TBB_store_with_release(op->status, FAILED);
01210         }
01211         else {
01212             this->my_reserved = true;
01213             *(op->elem) = reserved_item = this->my_array[0].first;
01214             if (mark == this->my_tail) --mark;
01215             --(this->my_tail);
01216             __TBB_store_with_release(op->status, SUCCEEDED);
01217             this->my_array[0].first = this->my_array[this->my_tail].first;
01218             if (this->my_tail > 1) // don't reheap for heap of size 1
01219                 reheap();
01220         }
01221     }
          //! Completes a reservation: the item already left the heap in internal_reserve(), so only the flag is cleared.
01222     /* override */ void internal_consume(prio_operation *op) {
01223         this->my_reserved = false;
01224         __TBB_store_with_release(op->status, SUCCEEDED);
01225     }
          //! Cancels a reservation: re-appends reserved_item, clears the flag and re-heapifies.
01226     /* override */ void internal_release(prio_operation *op) {
01227         if (this->my_tail >= this->my_array_size)
01228             this->grow_my_array( this->my_tail + 1 );
01229         this->my_array[this->my_tail] = std::make_pair(reserved_item, true);
01230         ++(this->my_tail);
01231         this->my_reserved = false;
01232         __TBB_store_with_release(op->status, SUCCEEDED);
01233         heapify();
01234     }
01235 private:
          // Ordering predicate; default-constructed.
01236     Compare compare;
          // One past the last heaped element: [0, mark) is the heap.
01237     size_type mark;
          // Copy of the item handed out by internal_reserve().
01238     input_type reserved_item;
01239         
          //! Sifts every un-heaped element in [mark, my_tail) up into the heap, leaving mark == my_tail when any were pending.
01240     void heapify() {
              // A lone element at index 0 needs no sifting, so start from index 1.
01241         if (!mark) mark = 1;
01242         for (; mark<this->my_tail; ++mark) { // for each unheaped element
01243             size_type cur_pos = mark;
01244             input_type to_place = this->my_array[mark].first;
01245             do { // push to_place up the heap
01246                 size_type parent = (cur_pos-1)>>1;
                      // Stop once the parent no longer compares below to_place.
01247                 if (!compare(this->my_array[parent].first, to_place))
01248                     break;
01249                 this->my_array[cur_pos].first = this->my_array[parent].first;
01250                 cur_pos = parent;
01251             } while( cur_pos );
01252             this->my_array[cur_pos].first = to_place;
01253         }
01254     }
01255         
          //! Sifts the value held in my_array[my_tail] down from the root within the heaped range [0, mark).
          /** Callers decrement my_tail before calling, so my_array[my_tail] is the
              element that was last in the array. */
01256     void reheap() {
01257         size_type cur_pos=0, child=1;
01258         while (child < mark) {
01259             size_type target = child;
01260             if (child+1<mark &&
01261                 compare(this->my_array[child].first,
01262                         this->my_array[child+1].first))
01263                 ++target;
01264             // target now has the higher priority child
                  // Stop when that child compares below the value being placed.
01265             if (compare(this->my_array[target].first,
01266                         this->my_array[this->my_tail].first))
01267                 break;
01268             this->my_array[cur_pos].first = this->my_array[target].first;
01269             cur_pos = target;
01270             child = (cur_pos<<1)+1;
01271         }
01272         this->my_array[cur_pos].first = this->my_array[this->my_tail].first;
01273     }
01274 };
01275         
01277 
//! Passes messages of type T to its successors only while my_count is below my_threshold.
/** my_count is incremented for each message let through and decremented again
    when a message could not be delivered or when decrement_counter() finds
    nothing to pull and forward. All reads and writes of my_count in this class
    happen while holding my_mutex. */
01280 template< typename T >
01281 class limiter_node : public graph_node, public receiver< T >, public sender< T > {
01282 public:
01283         
01284     typedef T input_type;
01285     typedef T output_type;
01286     typedef sender< input_type > predecessor_type;
01287     typedef receiver< output_type > successor_type;
01288         
01289 private:
01290         
          // Task used as the parent when enqueueing forward_task instances.
01291     task *my_root_task;
          // Upper bound for my_count.
01292     size_t my_threshold;
          // Number of messages currently counted against the threshold.
01293     size_t my_count;
01294     internal::predecessor_cache< T > my_predecessors;
01295     spin_mutex my_mutex;
01296     internal::broadcast_cache< T > my_successors;
          // Constructor argument kept so that the copy constructor can build an
          // equivalent decrementer.
01297     int init_decrement_predecessors;
01298 
01299     friend class internal::forward_task< limiter_node<T> >;
01300         
01301     // Let decrementer call decrement_counter()
01302     friend class internal::decrementer< limiter_node<T> >;
01303         
          //! Tries to pull one item from a predecessor and push it to the successors.
          /** If both steps succeed my_count is left unchanged. If either fails,
              my_count is decremented under the lock and, when predecessors are
              registered, a forward_task is enqueued. */
01304     void decrement_counter() {
01305         input_type v;
01306         
01307         // If we can't get / put an item immediately then drop the count
01308         if ( my_predecessors.get_item( v ) == false 
01309              || my_successors.try_put(v) == false ) {
01310             spin_mutex::scoped_lock lock(my_mutex);
01311             --my_count;
01312             if ( !my_predecessors.empty() ) 
01313                 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) ) 
01314                             internal::forward_task< limiter_node<T> >( *this ) );
01315         }
01316     }
01317         
          //! Claims one unit of the threshold, then tries to pull and forward an item.
          /** Returns without doing anything when my_count has already reached
              my_threshold. The lock is released before decrement_counter() runs. */
01318     void forward() {
01319         {
01320             spin_mutex::scoped_lock lock(my_mutex);
01321             if ( my_count < my_threshold ) 
01322                 ++my_count;
01323             else
01324                 return;
01325         }
01326         decrement_counter();
01327     }
01328         
01329 public:
01330         
          //! Member object owned by this node; as a friend it may call decrement_counter().
01332     internal::decrementer< limiter_node<T> > decrement;
01333         
          //! Constructor.
          /** @param g supplies the root task used for enqueued forward tasks
              @param threshold upper bound for the internal count
              @param num_decrement_predecessors passed to the decrementer's constructor */
01335     limiter_node(graph &g, size_t threshold, int num_decrement_predecessors=0) : 
01336         my_root_task(g.root_task()), my_threshold(threshold), my_count(0), 
01337         init_decrement_predecessors(num_decrement_predecessors), 
01338         decrement(num_decrement_predecessors) 
01339     {
01340         my_predecessors.set_owner(this);
01341         my_successors.set_owner(this);
01342         decrement.set_owner(this);
01343     }
01344         
          //! Copy constructor.
          /** Copies the root task, threshold and initial decrementer argument from
              src. The count restarts at 0 and the predecessor/successor caches are
              not copied from src. */
01346     limiter_node( const limiter_node& src ) : 
          // Explicit base initialization for GCC older than 4.2.2; presumably a
          // compiler workaround -- the reason is not stated here.
01347 #if ( __TBB_GCC_VERSION < 40202 )
01348         graph_node(), receiver<T>(), sender<T>(),
01349 #endif
01350         my_root_task(src.my_root_task), my_threshold(src.my_threshold), my_count(0), 
01351         init_decrement_predecessors(src.init_decrement_predecessors), 
01352         decrement(src.init_decrement_predecessors) 
01353     {
01354         my_predecessors.set_owner(this);
01355         my_successors.set_owner(this);
01356         decrement.set_owner(this);
01357     }
01358 
          //! Adds r to the successor cache; always returns true.
01360     /* override */ bool register_successor( receiver<output_type> &r ) {
01361         my_successors.register_successor(r);
01362         return true;
01363     }
01364         
01366 
          //! Removes r as a successor; always returns true.
          /** r is first told to drop this node as a predecessor, then it is
              removed from the successor cache. */
01367     /* override */ bool remove_successor( receiver<output_type> &r ) {
01368         r.remove_predecessor(*this);
01369         my_successors.remove_successor(r);
01370         return true;
01371     }
01372         
          //! Forwards t to the successors if the count is below the threshold.
          /** Returns false without forwarding when my_count has reached
              my_threshold. Otherwise the count is incremented under the lock and
              t is offered to the successors with the lock released. If they
              reject it, the count is decremented again and, when predecessors are
              registered, a forward_task is enqueued. Returns the successors'
              answer. */
01374     /* override */ bool try_put( const T &t ) {
01375         {
01376             spin_mutex::scoped_lock lock(my_mutex);
01377             if ( my_count >= my_threshold ) 
01378                 return false;
01379             else
01380                 ++my_count; 
01381         }
01382         
01383         bool msg = my_successors.try_put(t);
01384         
01385         if ( msg != true ) {
01386             spin_mutex::scoped_lock lock(my_mutex);
01387             --my_count;
01388             if ( !my_predecessors.empty() ) 
01389                 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) ) 
01390                             internal::forward_task< limiter_node<T> >( *this ) );
01391         }
01392         
01393         return msg;
01394     }
01395         
          //! Adds src to the predecessor cache; always returns true.
          /** While holding the lock, also enqueues a forward_task when the count is
              below the threshold and at least one successor is registered. */
01397     /* override */ bool register_predecessor( predecessor_type &src ) {
01398         spin_mutex::scoped_lock lock(my_mutex);
01399         my_predecessors.add( src );
01400         if ( my_count < my_threshold && !my_successors.empty() ) 
01401             task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) ) 
01402                            internal::forward_task< limiter_node<T> >( *this ) );
01403         return true;
01404     }
01405         
          //! Removes src from the predecessor cache; always returns true.
          /** Unlike register_predecessor(), my_mutex is not taken here. */
01407     /* override */ bool remove_predecessor( predecessor_type &src ) {
01408         my_predecessors.remove( src );
01409         return true;
01410     }
01411         
01412 };
01413 
01414 #include "internal/_flow_graph_join_impl.h"
01415 
// Make the port types and tag helpers from the nested internal namespace
// visible in the enclosing namespace, so the join_node code below can name
// them without qualification.
01416 using internal::reserving_port;
01417 using internal::queueing_port;
01418 using internal::tag_matching_port;
01419 using internal::input_port;
01420 using internal::tag_value;
01421 using internal::NO_TAG;
01422 
// Primary join_node template: declared only. The buffering policy JP defaults
// to queueing; the definitions that follow are specializations on JP.
01423 template<typename OutputTuple, graph_buffer_policy JP=queueing> class join_node;
01424 
//! join_node specialization for the reserving policy.
/** All behavior comes from internal::unfolded_join_node instantiated with
    reserving_port and one input per element of OutputTuple. */
01425 template<typename OutputTuple>
01426 class join_node<OutputTuple,reserving>: public internal::unfolded_join_node<std::tuple_size<OutputTuple>::value, reserving_port, OutputTuple, reserving> {
01427 private:
          // Number of elements in OutputTuple.
01428     static const int N = std::tuple_size<OutputTuple>::value;
01429     typedef typename internal::unfolded_join_node<N, reserving_port, OutputTuple, reserving> unfolded_type;
01430 public:
01431     typedef OutputTuple output_type;
01432     typedef typename unfolded_type::input_ports_tuple_type input_ports_tuple_type;
          //! Constructor: forwards the graph to the base class.
01433     join_node(graph &g) : unfolded_type(g) { }
          //! Copy constructor: delegates to the base class copy constructor.
01434     join_node(const join_node &other) : unfolded_type(other) {}
01435 };
01436 
//! join_node specialization for the queueing policy (the default policy of the primary template).
/** All behavior comes from internal::unfolded_join_node instantiated with
    queueing_port and one input per element of OutputTuple. */
01437 template<typename OutputTuple>
01438 class join_node<OutputTuple,queueing>: public internal::unfolded_join_node<std::tuple_size<OutputTuple>::value, queueing_port, OutputTuple, queueing> {
01439 private:
          // Number of elements in OutputTuple.
01440     static const int N = std::tuple_size<OutputTuple>::value;
01441     typedef typename internal::unfolded_join_node<N, queueing_port, OutputTuple, queueing> unfolded_type;
01442 public:
01443     typedef OutputTuple output_type;
01444     typedef typename unfolded_type::input_ports_tuple_type input_ports_tuple_type;
          //! Constructor: forwards the graph to the base class.
01445     join_node(graph &g) : unfolded_type(g) { }
          //! Copy constructor: delegates to the base class copy constructor.
01446     join_node(const join_node &other) : unfolded_type(other) {}
01447 };
01448 
01449 // template for tag_matching join_node
// All behavior comes from internal::unfolded_join_node instantiated with
// tag_matching_port. Unlike the other two specializations there is no
// graph-only constructor: each constructor also takes between two and ten
// extra arguments (B0..B9), passed by value straight to the base class.
// Presumably these are one tag-computing body per input port -- confirm in
// _flow_graph_join_impl.h.
01450 template<typename OutputTuple>
01451 class join_node<OutputTuple, tag_matching> : public internal::unfolded_join_node<std::tuple_size<OutputTuple>::value,
01452       tag_matching_port, OutputTuple, tag_matching> {
01453 private:
          // Number of elements in OutputTuple.
01454     static const int N = std::tuple_size<OutputTuple>::value;
01455     typedef typename internal::unfolded_join_node<N, tag_matching_port, OutputTuple, tag_matching> unfolded_type;
01456 public:
01457     typedef OutputTuple output_type;
01458     typedef typename unfolded_type::input_ports_tuple_type input_ports_tuple_type;
          // Forwarding constructors, one per supported argument count (2..10).
01459     template<typename B0, typename B1>
01460     join_node(graph &g, B0 b0, B1 b1) : unfolded_type(g, b0, b1) { }
01461     template<typename B0, typename B1, typename B2>
01462     join_node(graph &g, B0 b0, B1 b1, B2 b2) : unfolded_type(g, b0, b1, b2) { }
01463     template<typename B0, typename B1, typename B2, typename B3>
01464     join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3) : unfolded_type(g, b0, b1, b2, b3) { }
01465     template<typename B0, typename B1, typename B2, typename B3, typename B4>
01466     join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4) : unfolded_type(g, b0, b1, b2, b3, b4) { }
01467     template<typename B0, typename B1, typename B2, typename B3, typename B4, typename B5>
01468     join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4, B5 b5) : unfolded_type(g, b0, b1, b2, b3, b4, b5) { }
01469     template<typename B0, typename B1, typename B2, typename B3, typename B4, typename B5, typename B6>
01470     join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4, B5 b5, B6 b6) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6) { }
01471     template<typename B0, typename B1, typename B2, typename B3, typename B4, typename B5, typename B6, typename B7>
01472     join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4, B5 b5, B6 b6, B7 b7) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7) { }
01473     template<typename B0, typename B1, typename B2, typename B3, typename B4, typename B5, typename B6, typename B7, typename B8>
01474     join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4, B5 b5, B6 b6, B7 b7, B8 b8) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8) { }
01475     template<typename B0, typename B1, typename B2, typename B3, typename B4, typename B5, typename B6, typename B7, typename B8, typename B9>
01476     join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4, B5 b5, B6 b6, B7 b7, B8 b8, B9 b9) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8, b9) { }
          // Copy constructor: delegates to the base class copy constructor.
01477     join_node(const join_node &other) : unfolded_type(other) {}
01478 };
01479 
01480 // or node
01481 #include "internal/_flow_graph_or_impl.h"
01482 
//! Public wrapper over internal::unfolded_or_node for the input types listed in InputTuple.
/** The output type is whatever internal::or_output_type yields for the tuple
    size N and InputTuple. Construction takes no graph argument. */
01483 template<typename InputTuple>
01484 class or_node : public internal::unfolded_or_node<InputTuple> {
01485 private:
          // Number of elements in InputTuple.
01486     static const int N = std::tuple_size<InputTuple>::value;
01487 public:
01488     typedef typename internal::or_output_type<N,InputTuple>::type output_type;
01489     typedef typename internal::unfolded_or_node<InputTuple> unfolded_type;
          // Default constructor: default-constructs the base class.
01490     or_node() : unfolded_type() { }
01491     // Copy constructor
          // The source node is ignored: the base class is default-constructed,
          // so nothing is copied from the other node.
01492     or_node( const or_node& /*other*/ ) : unfolded_type() { }
01493 };
01494 
//! Connects sender p to receiver s by registering s as a successor of p.
/** The bool returned by register_successor() is discarded. */
01496 template< typename T >
01497 inline void make_edge( sender<T> &p, receiver<T> &s ) {
01498     p.register_successor( s );
01499 }
01500         
//! Disconnects receiver s from sender p by removing s from p's successors.
/** The bool returned by remove_successor() is discarded. */
01502 template< typename T >
01503 inline void remove_edge( sender<T> &p, receiver<T> &s ) {
01504     p.remove_successor( s );
01505 }
01506 
//! Returns, by value, the Body obtained from n.copy_function_object<Body>().
/** Body must be named explicitly by the caller; Node is deduced from n and
    must provide a copy_function_object member template. */
01508 template< typename Body, typename Node >
01509 Body copy_body( Node &n ) {
01510     return n.template copy_function_object<Body>();
01511 }
01512         
01513         
01514 } // interface6
01515 
// Re-export the interface6 names into the enclosing namespace so that user
// code does not have to spell the versioned namespace.
01516     using interface6::graph;
01517     using interface6::graph_node;
01518     using interface6::continue_msg;
01519     using interface6::sender;
01520     using interface6::receiver;
01521     using interface6::continue_receiver;
01522 
          // Node types.
01523     using interface6::source_node;
01524     using interface6::function_node;
01525     using interface6::continue_node;
01526     using interface6::overwrite_node;
01527     using interface6::write_once_node;
01528     using interface6::broadcast_node;
01529     using interface6::buffer_node;
01530     using interface6::queue_node;
01531     using interface6::sequencer_node;
01532     using interface6::priority_queue_node;
01533     using interface6::limiter_node;
          // Using-directive rather than individual declarations; presumably this
          // namespace holds the buffer-policy names used by join_node -- confirm.
01534     using namespace interface6::internal::graph_policy_namespace;
01535     using interface6::join_node;
01536     using interface6::or_node;
01537     using interface6::input_port;
          // Free functions.
01538     using interface6::copy_body; 
01539     using interface6::make_edge; 
01540     using interface6::remove_edge; 
          // Tag helpers that live in the nested internal namespace.
01541     using interface6::internal::NO_TAG;
01542     using interface6::internal::tag_value;
01543 
01544 } // flow
01545 } // tbb
01546 
01547 #endif
01548 

Copyright © 2005-2011 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.