00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #ifndef __TBB_graph_H
00022 #define __TBB_graph_H
00023
00024 #if !TBB_PREVIEW_GRAPH
00025 #error Set TBB_PREVIEW_GRAPH to include graph.h
00026 #endif
00027
00028 #include "tbb_stddef.h"
00029 #include "atomic.h"
00030 #include "spin_mutex.h"
00031 #include "null_mutex.h"
00032 #include "spin_rw_mutex.h"
00033 #include "null_rw_mutex.h"
00034 #include "task.h"
00035 #include "concurrent_vector.h"
00036 #include "internal/_aggregator_impl.h"
00037
00038
00039 #if TBB_IMPLEMENT_CPP0X && (!defined(_MSC_VER) || _MSC_VER < 1600)
00040 #define TBB_PREVIEW_TUPLE 1
00041 #include "compat/tuple"
00042 #else
00043 #include <tuple>
00044 #endif
00045
00046 #include<list>
00047 #include<queue>
00048
00059 namespace tbb {
00060 namespace flow {
00061
//! Named concurrency levels: unlimited (0) and serial (1).
enum concurrency {
    unlimited = 0,
    serial = 1
};
00064
00065 namespace interface6 {
00066
//! Common polymorphic base class of the node types declared in this header.
class graph_node {
public:
    //! Virtual, so a node can be destroyed through a graph_node pointer.
    virtual ~graph_node() { }
};
00072
//! Message type that carries no data; it is the input type of continue_receiver.
class continue_msg {
};
00075
00076 template< typename T > class sender;
00077 template< typename T > class receiver;
00078 class continue_receiver;
00079
00081 template< typename T >
00082 class sender {
00083 public:
00085 typedef T output_type;
00086
00088 typedef receiver<T> successor_type;
00089
00090 virtual ~sender() {}
00091
00093 virtual bool register_successor( successor_type &r ) = 0;
00094
00096 virtual bool remove_successor( successor_type &r ) = 0;
00097
00099 virtual bool try_get( T & ) { return false; }
00100
00102 virtual bool try_reserve( T & ) { return false; }
00103
00105 virtual bool try_release( ) { return false; }
00106
00108 virtual bool try_consume( ) { return false; }
00109
00110 };
00111
00112
00114 template< typename T >
00115 class receiver {
00116 public:
00117
00119 typedef T input_type;
00120
00122 typedef sender<T> predecessor_type;
00123
00125 virtual ~receiver() {}
00126
00128 virtual bool try_put( const T& t ) = 0;
00129
00131 virtual bool register_predecessor( predecessor_type & ) { return false; }
00132
00134 virtual bool remove_predecessor( predecessor_type & ) { return false; }
00135
00136 };
00137
00139
00140 class continue_receiver : public receiver< continue_msg > {
00141 public:
00142
00144 typedef continue_msg input_type;
00145
00147 typedef sender< continue_msg > predecessor_type;
00148
00150 continue_receiver( int number_of_predecessors = 0 ) {
00151 my_predecessor_count = my_initial_predecessor_count = number_of_predecessors;
00152 my_current_count = 0;
00153 }
00154
00156 continue_receiver( const continue_receiver& src ) : receiver<continue_msg>() {
00157 my_predecessor_count = my_initial_predecessor_count = src.my_initial_predecessor_count;
00158 my_current_count = 0;
00159 }
00160
00162 virtual ~continue_receiver() { }
00163
00165 bool register_predecessor( predecessor_type & ) {
00166 spin_mutex::scoped_lock l(my_mutex);
00167 ++my_predecessor_count;
00168 return true;
00169 }
00170
00172
00175 bool remove_predecessor( predecessor_type & ) {
00176 spin_mutex::scoped_lock l(my_mutex);
00177 --my_predecessor_count;
00178 return true;
00179 }
00180
00182
00184 bool try_put( const input_type & ) {
00185 {
00186 spin_mutex::scoped_lock l(my_mutex);
00187 if ( ++my_current_count < my_predecessor_count )
00188 return true;
00189 else
00190 my_current_count = 0;
00191 }
00192 execute();
00193 return true;
00194 }
00195
00196 protected:
00197
00198 spin_mutex my_mutex;
00199 int my_predecessor_count;
00200 int my_current_count;
00201 int my_initial_predecessor_count;
00202
00204
00206 virtual void execute() = 0;
00207
00208 };
00209
00210 #include "internal/_flow_graph_impl.h"
00211 using namespace internal::graph_policy_namespace;
00212
00214
00215 class graph : tbb::internal::no_copy {
00216
00217 template< typename Body >
00218 class run_task : public task {
00219 public:
00220 run_task( Body& body ) : my_body(body) {}
00221 task *execute() {
00222 my_body();
00223 return NULL;
00224 }
00225 private:
00226 Body my_body;
00227 };
00228
00229 template< typename Receiver, typename Body >
00230 class run_and_put_task : public task {
00231 public:
00232 run_and_put_task( Receiver &r, Body& body ) : my_receiver(r), my_body(body) {}
00233 task *execute() {
00234 my_receiver.try_put( my_body() );
00235 return NULL;
00236 }
00237 private:
00238 Receiver &my_receiver;
00239 Body my_body;
00240 };
00241
00242 public:
00243
00244
00246 graph() : my_root_task( new ( task::allocate_root( ) ) empty_task ) {
00247 my_root_task->set_ref_count(1);
00248 }
00249
00251
00253 ~graph() {
00254 wait_for_all();
00255 my_root_task->set_ref_count(0);
00256 task::destroy( *my_root_task );
00257 }
00258
00259
00261
00263 void increment_wait_count() {
00264 if (my_root_task)
00265 my_root_task->increment_ref_count();
00266 }
00267
00269
00271 void decrement_wait_count() {
00272 if (my_root_task)
00273 my_root_task->decrement_ref_count();
00274 }
00275
00277
00279 template< typename Receiver, typename Body >
00280 void run( Receiver &r, Body body ) {
00281 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
00282 run_and_put_task< Receiver, Body >( r, body ) );
00283 }
00284
00286
00288 template< typename Body >
00289 void run( Body body ) {
00290 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
00291 run_task< Body >( body ) );
00292 }
00293
00295
00296 void wait_for_all() {
00297 if (my_root_task)
00298 my_root_task->wait_for_all();
00299 my_root_task->set_ref_count(1);
00300 }
00301
00303 task * root_task() {
00304 return my_root_task;
00305 }
00306
00307 private:
00308
00309 task *my_root_task;
00310
00311 };
00312
00313 #include "internal/_flow_graph_node_impl.h"
00314
00316 template < typename Output >
00317 class source_node : public graph_node, public sender< Output > {
00318 public:
00319
00321 typedef Output output_type;
00322
00324 typedef receiver< Output > successor_type;
00325
00327 template< typename Body >
00328 source_node( graph &g, Body body, bool is_active = true )
00329 : my_root_task(g.root_task()), my_active(is_active), init_my_active(is_active),
00330 my_body( new internal::source_body_leaf< output_type, Body>(body) ),
00331 my_reserved(false), my_has_cached_item(false)
00332 {
00333 my_successors.set_owner(this);
00334 }
00335
00337 source_node( const source_node& src ) :
00338 #if ( __TBB_GCC_VERSION < 40202 )
00339 graph_node(), sender<Output>(),
00340 #endif
00341 my_root_task( src.my_root_task), my_active(src.init_my_active),
00342 init_my_active(src.init_my_active), my_body( src.my_body->clone() ),
00343 my_reserved(false), my_has_cached_item(false)
00344 {
00345 my_successors.set_owner(this);
00346 }
00347
00349 ~source_node() { delete my_body; }
00350
00352 bool register_successor( receiver<output_type> &r ) {
00353 spin_mutex::scoped_lock lock(my_mutex);
00354 my_successors.register_successor(r);
00355 if ( my_active )
00356 spawn_put();
00357 return true;
00358 }
00359
00361 bool remove_successor( receiver<output_type> &r ) {
00362 spin_mutex::scoped_lock lock(my_mutex);
00363 my_successors.remove_successor(r);
00364 return true;
00365 }
00366
00368 bool try_get( output_type &v ) {
00369 spin_mutex::scoped_lock lock(my_mutex);
00370 if ( my_reserved )
00371 return false;
00372
00373 if ( my_has_cached_item ) {
00374 v = my_cached_item;
00375 my_has_cached_item = false;
00376 } else if ( (*my_body)(v) == false ) {
00377 return false;
00378 }
00379 return true;
00380 }
00381
00383 bool try_reserve( output_type &v ) {
00384 spin_mutex::scoped_lock lock(my_mutex);
00385 if ( my_reserved ) {
00386 return false;
00387 }
00388
00389 if ( !my_has_cached_item && (*my_body)(my_cached_item) )
00390 my_has_cached_item = true;
00391
00392 if ( my_has_cached_item ) {
00393 v = my_cached_item;
00394 my_reserved = true;
00395 return true;
00396 } else {
00397 return false;
00398 }
00399 }
00400
00402
00403 bool try_release( ) {
00404 spin_mutex::scoped_lock lock(my_mutex);
00405 __TBB_ASSERT( my_reserved && my_has_cached_item, "releasing non-existent reservation" );
00406 my_reserved = false;
00407 spawn_put();
00408 return true;
00409 }
00410
00412 bool try_consume( ) {
00413 spin_mutex::scoped_lock lock(my_mutex);
00414 __TBB_ASSERT( my_reserved && my_has_cached_item, "consuming non-existent reservation" );
00415 my_reserved = false;
00416 my_has_cached_item = false;
00417 if ( !my_successors.empty() ) {
00418 spawn_put();
00419 }
00420 return true;
00421 }
00422
00424 void activate() {
00425 spin_mutex::scoped_lock lock(my_mutex);
00426 my_active = true;
00427 if ( !my_successors.empty() )
00428 spawn_put();
00429 }
00430
00431 private:
00432
00433 task *my_root_task;
00434 spin_mutex my_mutex;
00435 bool my_active;
00436 bool init_my_active;
00437 internal::source_body<output_type> *my_body;
00438 internal::broadcast_cache< output_type > my_successors;
00439 bool my_reserved;
00440 bool my_has_cached_item;
00441 output_type my_cached_item;
00442
00443 friend class internal::source_task< source_node< output_type > >;
00444
00446 void apply_body( ) {
00447 output_type v;
00448 if ( try_reserve(v) == false )
00449 return;
00450
00451 if ( my_successors.try_put( v ) )
00452 try_consume();
00453 else
00454 try_release();
00455 }
00456
00458 void spawn_put( ) {
00459 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
00460 internal::source_task< source_node< output_type > >( *this ) );
00461 }
00462
00463 };
00464
00466 template < typename Input, typename Output = continue_msg, graph_buffer_policy = queueing, typename Allocator=cache_aligned_allocator<Input> >
00467 class function_node : public graph_node, public internal::function_input<Input,Output,Allocator>, public internal::function_output<Output> {
00468 public:
00469
00470 typedef Input input_type;
00471 typedef Output output_type;
00472 typedef sender< input_type > predecessor_type;
00473 typedef receiver< output_type > successor_type;
00474
00476 template< typename Body >
00477 function_node( graph &g, size_t concurrency, Body body )
00478 : internal::function_input<input_type,output_type,Allocator>( g, concurrency, body ) {
00479 my_successors.set_owner(this);
00480 }
00481
00483 function_node( const function_node& src ) :
00484 #if ( __TBB_GCC_VERSION < 40202 )
00485 graph_node(),
00486 #endif
00487 internal::function_input<input_type,output_type,Allocator>( src )
00488 #if ( __TBB_GCC_VERSION < 40202 )
00489 , internal::function_output<Output>()
00490 #endif
00491 {
00492 my_successors.set_owner(this);
00493 }
00494
00495 protected:
00496
00497 internal::broadcast_cache<output_type> my_successors;
00498 internal::broadcast_cache<output_type> &successors () { return my_successors; }
00499
00500 };
00501
00503 template < typename Input, typename Output, typename Allocator >
00504 class function_node<Input,Output,queueing,Allocator> : public graph_node, public internal::function_input<Input,Output,Allocator>, public internal::function_output<Output> {
00505 public:
00506
00507 typedef Input input_type;
00508 typedef Output output_type;
00509 typedef sender< input_type > predecessor_type;
00510 typedef receiver< output_type > successor_type;
00511
00513 template< typename Body >
00514 function_node( graph &g, size_t concurrency, Body body )
00515 : internal::function_input< input_type, output_type, Allocator >( g, concurrency, body, new internal::function_input_queue< input_type, Allocator >() ) {
00516 my_successors.set_owner(this);
00517 }
00518
00520 function_node( const function_node& src ) :
00521 #if ( __TBB_GCC_VERSION < 40202 )
00522 graph_node(),
00523 #endif
00524 internal::function_input<input_type,output_type,Allocator>( src, new internal::function_input_queue< input_type, Allocator >() )
00525 #if ( __TBB_GCC_VERSION < 40202 )
00526 , internal::function_output<Output>()
00527 #endif
00528 {
00529 my_successors.set_owner(this);
00530 }
00531
00532 protected:
00533
00534 internal::broadcast_cache<output_type> my_successors;
00535 internal::broadcast_cache<output_type> &successors () { return my_successors; }
00536
00537 };
00538
00540 template <typename Output>
00541 class continue_node : public graph_node, public internal::continue_input<Output>, public internal::function_output<Output> {
00542 public:
00543
00544 typedef continue_msg input_type;
00545 typedef Output output_type;
00546 typedef sender< input_type > predecessor_type;
00547 typedef receiver< output_type > successor_type;
00548
00550 template <typename Body >
00551 continue_node( graph &g, Body body )
00552 : internal::continue_input<output_type>( g, body ) {
00553 my_successors.set_owner(this);
00554 }
00555
00557 template <typename Body >
00558 continue_node( graph &g, int number_of_predecessors, Body body )
00559 : internal::continue_input<output_type>( g, number_of_predecessors, body )
00560 {
00561 my_successors.set_owner(this);
00562 }
00563
00565 continue_node( const continue_node& src ) :
00566 #if ( __TBB_GCC_VERSION < 40202 )
00567 graph_node(),
00568 #endif
00569 internal::continue_input<output_type>(src)
00570 #if ( __TBB_GCC_VERSION < 40202 )
00571 , internal::function_output<Output>()
00572 #endif
00573 {
00574 my_successors.set_owner(this);
00575 }
00576
00577 protected:
00578
00579 internal::broadcast_cache<output_type> my_successors;
00580 internal::broadcast_cache<output_type> &successors () { return my_successors; }
00581
00582 };
00583
00584 template< typename T >
00585 class overwrite_node : public graph_node, public receiver<T>, public sender<T> {
00586 public:
00587
00588 typedef T input_type;
00589 typedef T output_type;
00590 typedef sender< input_type > predecessor_type;
00591 typedef receiver< output_type > successor_type;
00592
00593 overwrite_node() : my_buffer_is_valid(false) {
00594 my_successors.set_owner( this );
00595 }
00596
00597
00598 overwrite_node( const overwrite_node& ) :
00599 #if ( __TBB_GCC_VERSION < 40202 )
00600 graph_node(), receiver<T>(), sender<T>(),
00601 #endif
00602 my_buffer_is_valid(false)
00603 {
00604 my_successors.set_owner( this );
00605 }
00606
00607 ~overwrite_node() {}
00608
00609 bool register_successor( successor_type &s ) {
00610 spin_mutex::scoped_lock l( my_mutex );
00611 if ( my_buffer_is_valid ) {
00612
00613 if ( s.try_put( my_buffer ) || !s.register_predecessor( *this ) ) {
00614
00615 my_successors.register_successor( s );
00616 return true;
00617 } else {
00618
00619 return false;
00620 }
00621 } else {
00622
00623 my_successors.register_successor( s );
00624 return true;
00625 }
00626 }
00627
00628 bool remove_successor( successor_type &s ) {
00629 spin_mutex::scoped_lock l( my_mutex );
00630 my_successors.remove_successor(s);
00631 return true;
00632 }
00633
00634 bool try_put( const T &v ) {
00635 spin_mutex::scoped_lock l( my_mutex );
00636 my_buffer = v;
00637 my_buffer_is_valid = true;
00638 my_successors.try_put(v);
00639 return true;
00640 }
00641
00642 bool try_get( T &v ) {
00643 spin_mutex::scoped_lock l( my_mutex );
00644 if ( my_buffer_is_valid ) {
00645 v = my_buffer;
00646 return true;
00647 } else {
00648 return false;
00649 }
00650 }
00651
00652 bool is_valid() {
00653 spin_mutex::scoped_lock l( my_mutex );
00654 return my_buffer_is_valid;
00655 }
00656
00657 void clear() {
00658 spin_mutex::scoped_lock l( my_mutex );
00659 my_buffer_is_valid = false;
00660 }
00661
00662 protected:
00663
00664 spin_mutex my_mutex;
00665 internal::broadcast_cache< T, null_rw_mutex > my_successors;
00666 T my_buffer;
00667 bool my_buffer_is_valid;
00668
00669 };
00670
00671 template< typename T >
00672 class write_once_node : public overwrite_node<T> {
00673 public:
00674
00675 typedef T input_type;
00676 typedef T output_type;
00677 typedef sender< input_type > predecessor_type;
00678 typedef receiver< output_type > successor_type;
00679
00681 write_once_node() : overwrite_node<T>() {}
00682
00684 write_once_node( const write_once_node& src ) : overwrite_node<T>(src) {}
00685
00686 bool try_put( const T &v ) {
00687 spin_mutex::scoped_lock l( this->my_mutex );
00688 if ( this->my_buffer_is_valid ) {
00689 return false;
00690 } else {
00691 this->my_buffer = v;
00692 this->my_buffer_is_valid = true;
00693 this->my_successors.try_put(v);
00694 return true;
00695 }
00696 }
00697 };
00698
00700 template <typename T>
00701 class broadcast_node : public graph_node, public receiver<T>, public sender<T> {
00702
00703 internal::broadcast_cache<T> my_successors;
00704
00705 public:
00706
00707 typedef T input_type;
00708 typedef T output_type;
00709 typedef sender< input_type > predecessor_type;
00710 typedef receiver< output_type > successor_type;
00711
00712 broadcast_node( ) {
00713 my_successors.set_owner( this );
00714 }
00715
00716
00717 broadcast_node( const broadcast_node& )
00718 #if ( __TBB_GCC_VERSION < 40202 )
00719 : graph_node(), receiver<T>(), sender<T>()
00720 #endif
00721 {
00722 my_successors.set_owner( this );
00723 }
00724
00726 virtual bool register_successor( receiver<T> &r ) {
00727 my_successors.register_successor( r );
00728 return true;
00729 }
00730
00732 virtual bool remove_successor( receiver<T> &r ) {
00733 my_successors.remove_successor( r );
00734 return true;
00735 }
00736
00737 bool try_put( const T &t ) {
00738 my_successors.try_put(t);
00739 return true;
00740 }
00741
00742 };
00743
00744 #include "internal/_flow_graph_item_buffer_impl.h"
00745
00747 template <typename T, typename A=cache_aligned_allocator<T> >
00748 class buffer_node : public graph_node, public reservable_item_buffer<T, A>, public receiver<T>, public sender<T> {
00749 public:
00750 typedef T input_type;
00751 typedef T output_type;
00752 typedef sender< input_type > predecessor_type;
00753 typedef receiver< output_type > successor_type;
00754 typedef buffer_node<T, A> my_class;
00755 protected:
00756 typedef size_t size_type;
00757 internal::round_robin_cache< T, null_rw_mutex > my_successors;
00758
00759 task *my_parent;
00760
00761 friend class internal::forward_task< buffer_node< T, A > >;
00762
00763 enum op_type {reg_succ, rem_succ, req_item, res_item, rel_res, con_res, put_item, try_fwd};
00764 enum op_stat {WAIT=0, SUCCEEDED, FAILED};
00765
00766
00767 class buffer_operation : public internal::aggregated_operation< buffer_operation > {
00768 public:
00769 char type;
00770 T *elem;
00771 successor_type *r;
00772 buffer_operation(const T& e, op_type t) :
00773 type(char(t)), elem(const_cast<T*>(&e)), r(NULL) {}
00774 buffer_operation(op_type t) : type(char(t)), r(NULL) {}
00775 };
00776
00777 bool forwarder_busy;
00778 typedef internal::aggregating_functor<my_class, buffer_operation> my_handler;
00779 friend class internal::aggregating_functor<my_class, buffer_operation>;
00780 internal::aggregator< my_handler, buffer_operation> my_aggregator;
00781
00782 virtual void handle_operations(buffer_operation *op_list) {
00783 buffer_operation *tmp;
00784 bool try_forwarding=false;
00785 while (op_list) {
00786 tmp = op_list;
00787 op_list = op_list->next;
00788 switch (tmp->type) {
00789 case reg_succ: internal_reg_succ(tmp); try_forwarding = true; break;
00790 case rem_succ: internal_rem_succ(tmp); break;
00791 case req_item: internal_pop(tmp); break;
00792 case res_item: internal_reserve(tmp); break;
00793 case rel_res: internal_release(tmp); try_forwarding = true; break;
00794 case con_res: internal_consume(tmp); try_forwarding = true; break;
00795 case put_item: internal_push(tmp); try_forwarding = true; break;
00796 case try_fwd: internal_forward(tmp); break;
00797 }
00798 }
00799 if (try_forwarding && !forwarder_busy) {
00800 forwarder_busy = true;
00801 task::enqueue(*new(task::allocate_additional_child_of(*my_parent)) internal::forward_task< buffer_node<input_type, A> >(*this));
00802 }
00803 }
00804
00806 virtual void forward() {
00807 buffer_operation op_data(try_fwd);
00808 do {
00809 op_data.status = WAIT;
00810 my_aggregator.execute(&op_data);
00811 } while (op_data.status == SUCCEEDED);
00812 }
00813
00815 virtual void internal_reg_succ(buffer_operation *op) {
00816 my_successors.register_successor(*(op->r));
00817 __TBB_store_with_release(op->status, SUCCEEDED);
00818 }
00819
00821 virtual void internal_rem_succ(buffer_operation *op) {
00822 my_successors.remove_successor(*(op->r));
00823 __TBB_store_with_release(op->status, SUCCEEDED);
00824 }
00825
00827 virtual void internal_forward(buffer_operation *op) {
00828 T i_copy;
00829 bool success = false;
00830 size_type counter = my_successors.size();
00831
00832 while (counter>0 && !this->buffer_empty() && this->item_valid(this->my_tail-1)) {
00833 this->fetch_back(i_copy);
00834 if( my_successors.try_put(i_copy) ) {
00835 this->invalidate_back();
00836 --(this->my_tail);
00837 success = true;
00838 }
00839 --counter;
00840 }
00841 if (success && !counter)
00842 __TBB_store_with_release(op->status, SUCCEEDED);
00843 else {
00844 __TBB_store_with_release(op->status, FAILED);
00845 forwarder_busy = false;
00846 }
00847 }
00848
00849 virtual void internal_push(buffer_operation *op) {
00850 this->push_back(*(op->elem));
00851 __TBB_store_with_release(op->status, SUCCEEDED);
00852 }
00853
00854 virtual void internal_pop(buffer_operation *op) {
00855 if(this->pop_back(*(op->elem))) {
00856 __TBB_store_with_release(op->status, SUCCEEDED);
00857 }
00858 else {
00859 __TBB_store_with_release(op->status, FAILED);
00860 }
00861 }
00862
00863 virtual void internal_reserve(buffer_operation *op) {
00864 if(this->reserve_front(*(op->elem))) {
00865 __TBB_store_with_release(op->status, SUCCEEDED);
00866 }
00867 else {
00868 __TBB_store_with_release(op->status, FAILED);
00869 }
00870 }
00871
00872 virtual void internal_consume(buffer_operation *op) {
00873 this->consume_front();
00874 __TBB_store_with_release(op->status, SUCCEEDED);
00875 }
00876
00877 virtual void internal_release(buffer_operation *op) {
00878 this->release_front();
00879 __TBB_store_with_release(op->status, SUCCEEDED);
00880 }
00881
00882 public:
00884 buffer_node( graph &g ) : reservable_item_buffer<T>(),
00885 my_parent( g.root_task() ), forwarder_busy(false) {
00886 my_successors.set_owner(this);
00887 my_aggregator.initialize_handler(my_handler(this));
00888 }
00889
00891 buffer_node( const buffer_node& src ) :
00892 #if ( __TBB_GCC_VERSION < 40202 )
00893 graph_node(),
00894 #endif
00895 reservable_item_buffer<T>(),
00896 #if ( __TBB_GCC_VERSION < 40202 )
00897 receiver<T>(), sender<T>(),
00898 #endif
00899 my_parent( src.my_parent )
00900 {
00901 forwarder_busy = false;
00902 my_successors.set_owner(this);
00903 my_aggregator.initialize_handler(my_handler(this));
00904 }
00905
00906 virtual ~buffer_node() {}
00907
00908
00909
00910
00911
00913
00914 bool register_successor( receiver<output_type> &r ) {
00915 buffer_operation op_data(reg_succ);
00916 op_data.r = &r;
00917 my_aggregator.execute(&op_data);
00918 return true;
00919 }
00920
00922
00924 bool remove_successor( receiver<output_type> &r ) {
00925 r.remove_predecessor(*this);
00926 buffer_operation op_data(rem_succ);
00927 op_data.r = &r;
00928 my_aggregator.execute(&op_data);
00929 return true;
00930 }
00931
00933
00935 bool try_get( T &v ) {
00936 buffer_operation op_data(req_item);
00937 op_data.elem = &v;
00938 my_aggregator.execute(&op_data);
00939 return (op_data.status==SUCCEEDED);
00940 }
00941
00943
00945 bool try_reserve( T &v ) {
00946 buffer_operation op_data(res_item);
00947 op_data.elem = &v;
00948 my_aggregator.execute(&op_data);
00949 return (op_data.status==SUCCEEDED);
00950 }
00951
00953
00954 bool try_release() {
00955 buffer_operation op_data(rel_res);
00956 my_aggregator.execute(&op_data);
00957 return true;
00958 }
00959
00961
00962 bool try_consume() {
00963 buffer_operation op_data(con_res);
00964 my_aggregator.execute(&op_data);
00965 return true;
00966 }
00967
00969
00970 bool try_put(const T &t) {
00971 buffer_operation op_data(t, put_item);
00972 my_aggregator.execute(&op_data);
00973 return true;
00974 }
00975 };
00976
00977
00979 template <typename T, typename A=cache_aligned_allocator<T> >
00980 class queue_node : public buffer_node<T, A> {
00981 protected:
00982 typedef typename buffer_node<T, A>::size_type size_type;
00983 typedef typename buffer_node<T, A>::buffer_operation queue_operation;
00984
00985 enum op_stat {WAIT=0, SUCCEEDED, FAILED};
00986
00988 void internal_forward(queue_operation *op) {
00989 T i_copy;
00990 bool success = false;
00991 size_type counter = this->my_successors.size();
00992 if (this->my_reserved || !this->item_valid(this->my_head)){
00993 __TBB_store_with_release(op->status, FAILED);
00994 this->forwarder_busy = false;
00995 return;
00996 }
00997
00998 while (counter>0 && this->item_valid(this->my_head)) {
00999 this->fetch_front(i_copy);
01000 if(this->my_successors.try_put(i_copy)) {
01001 this->invalidate_front();
01002 ++(this->my_head);
01003 success = true;
01004 }
01005 --counter;
01006 }
01007 if (success && !counter)
01008 __TBB_store_with_release(op->status, SUCCEEDED);
01009 else {
01010 __TBB_store_with_release(op->status, FAILED);
01011 this->forwarder_busy = false;
01012 }
01013 }
01014
01015 void internal_pop(queue_operation *op) {
01016 if ( this->my_reserved || !this->item_valid(this->my_head)){
01017 __TBB_store_with_release(op->status, FAILED);
01018 }
01019 else {
01020 this->pop_front(*(op->elem));
01021 __TBB_store_with_release(op->status, SUCCEEDED);
01022 }
01023 }
01024 void internal_reserve(queue_operation *op) {
01025 if (this->my_reserved || !this->item_valid(this->my_head)) {
01026 __TBB_store_with_release(op->status, FAILED);
01027 }
01028 else {
01029 this->my_reserved = true;
01030 this->fetch_front(*(op->elem));
01031 this->invalidate_front();
01032 __TBB_store_with_release(op->status, SUCCEEDED);
01033 }
01034 }
01035 void internal_consume(queue_operation *op) {
01036 this->consume_front();
01037 __TBB_store_with_release(op->status, SUCCEEDED);
01038 }
01039
01040 public:
01041
01042 typedef T input_type;
01043 typedef T output_type;
01044 typedef sender< input_type > predecessor_type;
01045 typedef receiver< output_type > successor_type;
01046
01048 queue_node( graph &g ) : buffer_node<T, A>(g) {}
01049
01051 queue_node( const queue_node& src) : buffer_node<T, A>(src) {}
01052 };
01053
01055 template< typename T, typename A=cache_aligned_allocator<T> >
01056 class sequencer_node : public queue_node<T, A> {
01057 internal::function_body< T, size_t > *my_sequencer;
01058 public:
01059
01060 typedef T input_type;
01061 typedef T output_type;
01062 typedef sender< input_type > predecessor_type;
01063 typedef receiver< output_type > successor_type;
01064
01066 template< typename Sequencer >
01067 sequencer_node( graph &g, const Sequencer& s ) : queue_node<T, A>(g),
01068 my_sequencer(new internal::function_body_leaf< T, size_t, Sequencer>(s) ) {}
01069
01071 sequencer_node( const sequencer_node& src ) : queue_node<T, A>(src),
01072 my_sequencer( src.my_sequencer->clone() ) {}
01073
01075 ~sequencer_node() { delete my_sequencer; }
01076 protected:
01077 typedef typename buffer_node<T, A>::size_type size_type;
01078 typedef typename buffer_node<T, A>::buffer_operation sequencer_operation;
01079
01080 enum op_stat {WAIT=0, SUCCEEDED, FAILED};
01081
01082 private:
01083 void internal_push(sequencer_operation *op) {
01084 size_type tag = (*my_sequencer)(*(op->elem));
01085
01086 this->my_tail = (tag+1 > this->my_tail) ? tag+1 : this->my_tail;
01087
01088 if(this->size() > this->capacity())
01089 this->grow_my_array(this->size());
01090 this->item(tag) = std::make_pair( *(op->elem), true );
01091 __TBB_store_with_release(op->status, SUCCEEDED);
01092 }
01093 };
01094
// buffer_node variant that keeps its items in a binary heap ordered by Compare.
// The heap is stored in my_array[0 .. mark); items in my_array[mark .. my_tail) have
// been pushed but not yet sifted into the heap (see heapify()).  heapify() moves a
// parent down whenever compare(parent, item) holds, so my_array[0] is an item that no
// other heaped item compares greater than.
01096 template< typename T, typename Compare = std::less<T>, typename A=cache_aligned_allocator<T> >
01097 class priority_queue_node : public buffer_node<T, A> {
01098 public:
01099 typedef T input_type;
01100 typedef T output_type;
01101 typedef sender< input_type > predecessor_type;
01102 typedef receiver< output_type > successor_type;
01103
// Creates an empty priority queue attached to graph g.
01105 priority_queue_node( graph &g ) : buffer_node<T, A>(g), mark(0) {}
01106
// Copy constructor: forwards to the buffer_node copy constructor and starts with an empty heap.
01108 priority_queue_node( const priority_queue_node &src ) : buffer_node<T, A>(src), mark(0) {}
01109
01110 protected:
01111 typedef typename buffer_node<T, A>::size_type size_type;
01112 typedef typename buffer_node<T, A>::item_type item_type;
01113 typedef typename buffer_node<T, A>::buffer_operation prio_operation;
01114
01115 enum op_stat {WAIT=0, SUCCEEDED, FAILED};
01116
// Processes a list of operations handed over by the aggregator.
// Once the whole list has been handled, any items still outside the heap are sifted
// in, and a forward_task is enqueued if forwarding may have become possible and no
// forwarder is busy.
01117 void handle_operations(prio_operation *op_list) {
01118 prio_operation *tmp ;
01119 bool try_forwarding=false;
01120 while (op_list) {
01121 tmp = op_list;
// Advance before dispatching: the handler publishes the status of tmp.
01122 op_list = op_list->next;
01123 switch (tmp->type) {
01124 case buffer_node<T, A>::reg_succ: this->internal_reg_succ(tmp); try_forwarding = true; break;
01125 case buffer_node<T, A>::rem_succ: this->internal_rem_succ(tmp); break;
01126 case buffer_node<T, A>::put_item: internal_push(tmp); try_forwarding = true; break;
01127 case buffer_node<T, A>::try_fwd: internal_forward(tmp); break;
01128 case buffer_node<T, A>::rel_res: internal_release(tmp); try_forwarding = true; break;
01129 case buffer_node<T, A>::con_res: internal_consume(tmp); try_forwarding = true; break;
01130 case buffer_node<T, A>::req_item: internal_pop(tmp); break;
01131 case buffer_node<T, A>::res_item: internal_reserve(tmp); break;
01132 }
01133 }
01134
// Items pushed during this batch are merged into the heap only here.
01135 if (mark<this->my_tail) heapify();
01136 if (try_forwarding && !this->forwarder_busy) {
01137 this->forwarder_busy = true;
01138 task::enqueue(*new(task::allocate_additional_child_of(*(this->my_parent))) internal::forward_task< buffer_node<input_type, A> >(*this));
01139 }
01140 }
01141
// Offers the item at my_array[0] to the successors, at most once per registered successor.
// Fails immediately while an item is reserved or when the queue is empty.  SUCCEEDED is
// reported only when an item was accepted and every attempt was used; otherwise FAILED
// is reported and forwarder_busy is cleared.
01143 void internal_forward(prio_operation *op) {
01144 T i_copy;
01145 bool success = false;
01146 size_type counter = this->my_successors.size();
01147
01148 if (this->my_reserved || this->my_tail == 0) {
01149 __TBB_store_with_release(op->status, FAILED);
01150 this->forwarder_busy = false;
01151 return;
01152 }
01153
01154 while (counter>0 && this->my_tail > 0) {
01155 i_copy = this->my_array[0].first;
01156 bool msg = this->my_successors.try_put(i_copy);
01157 if ( msg == true ) {
// Accepted: shrink the array by one; if everything was heaped, the heap shrinks too.
01158 if (mark == this->my_tail) --mark;
01159 --(this->my_tail);
// Move the former last item to the root, then let reheap() sift it down.
01160 this->my_array[0].first=this->my_array[this->my_tail].first;
01161 if (this->my_tail > 1)
01162 reheap();
01163 success = true;
01164 }
01165 --counter;
01166 }
01167 if (success && !counter)
01168 __TBB_store_with_release(op->status, SUCCEEDED);
01169 else {
01170 __TBB_store_with_release(op->status, FAILED);
01171 this->forwarder_busy = false;
01172 }
01173 }
01174
// Appends *op->elem after the last item, growing the array first when it is full.
// The new item stays outside the heap until heapify() runs.
01175 void internal_push(prio_operation *op) {
01176 if ( this->my_tail >= this->my_array_size )
01177 this->grow_my_array( this->my_tail + 1 );
01178 this->my_array[this->my_tail] = std::make_pair( *(op->elem), true );
01179 ++(this->my_tail);
01180 __TBB_store_with_release(op->status, SUCCEEDED);
01181 }
// Removes an item into *op->elem; fails while an item is reserved or when the queue is empty.
01182 void internal_pop(prio_operation *op) {
01183 if ( this->my_reserved == true || this->my_tail == 0 ) {
01184 __TBB_store_with_release(op->status, FAILED);
01185 }
01186 else {
// If un-heaped items exist and the last of them compares greater than the heap
// root, hand out that last item directly; the heap itself is left untouched.
01187 if (mark<this->my_tail &&
01188 compare(this->my_array[0].first,
01189 this->my_array[this->my_tail-1].first)) {
01190
01191
01192 *(op->elem) = this->my_array[this->my_tail-1].first;
01193 --(this->my_tail);
01194 __TBB_store_with_release(op->status, SUCCEEDED);
01195 }
01196 else {
// Otherwise hand out the heap root and restore the heap.
01197 *(op->elem) = this->my_array[0].first;
01198 if (mark == this->my_tail) --mark;
01199 --(this->my_tail);
01200 __TBB_store_with_release(op->status, SUCCEEDED);
01201 this->my_array[0].first=this->my_array[this->my_tail].first;
01202 if (this->my_tail > 1)
01203 reheap();
01204 }
01205 }
01206 }
// Reserves the heap root: copies it into *op->elem and into reserved_item, removes it
// from the array and restores the heap.  Fails while an item is already reserved or
// when the queue is empty.
01207 void internal_reserve(prio_operation *op) {
01208 if (this->my_reserved == true || this->my_tail == 0) {
01209 __TBB_store_with_release(op->status, FAILED);
01210 }
01211 else {
01212 this->my_reserved = true;
01213 *(op->elem) = reserved_item = this->my_array[0].first;
01214 if (mark == this->my_tail) --mark;
01215 --(this->my_tail);
01216 __TBB_store_with_release(op->status, SUCCEEDED);
01217 this->my_array[0].first = this->my_array[this->my_tail].first;
01218 if (this->my_tail > 1)
01219 reheap();
01220 }
01221 }
// Consumes the reservation: the item was already removed by internal_reserve(), so
// only the reservation flag is cleared.
01222 void internal_consume(prio_operation *op) {
01223 this->my_reserved = false;
01224 __TBB_store_with_release(op->status, SUCCEEDED);
01225 }
// Releases the reservation: reserved_item is appended to the array again (growing it
// when full), the reservation flag is cleared and the item is sifted into the heap.
01226 void internal_release(prio_operation *op) {
01227 if (this->my_tail >= this->my_array_size)
01228 this->grow_my_array( this->my_tail + 1 );
01229 this->my_array[this->my_tail] = std::make_pair(reserved_item, true);
01230 ++(this->my_tail);
01231 this->my_reserved = false;
01232 __TBB_store_with_release(op->status, SUCCEEDED);
01233 heapify();
01234 }
01235 private:
// Ordering predicate; default-constructed.
01236 Compare compare;
// Items at indices below mark form the heap; items from mark up to my_tail are not yet heaped.
01237 size_type mark;
// Copy of the item handed out by internal_reserve(), kept so internal_release() can put it back.
01238 input_type reserved_item;
01239
// Sifts every item in [mark, my_tail) up into the heap, advancing mark to my_tail.
01240 void heapify() {
// A single item at index 0 is already a heap, so sifting starts at index 1.
01241 if (!mark) mark = 1;
01242 for (; mark<this->my_tail; ++mark) {
01243 size_type cur_pos = mark;
01244 input_type to_place = this->my_array[mark].first;
// Move parents down while they compare less than the item being placed.
01245 do {
01246 size_type parent = (cur_pos-1)>>1;
01247 if (!compare(this->my_array[parent].first, to_place))
01248 break;
01249 this->my_array[cur_pos].first = this->my_array[parent].first;
01250 cur_pos = parent;
01251 } while( cur_pos );
01252 this->my_array[cur_pos].first = to_place;
01253 }
01254 }
01255
// Sifts the item stored at my_array[my_tail] down from the root of the heap.
// The callers decrement my_tail before calling, so my_array[my_tail] is the slot that
// has just been removed from the logical array and still holds the former last item.
// Only indices below mark are treated as part of the heap.
01256 void reheap() {
01257 size_type cur_pos=0, child=1;
01258 while (child < mark) {
01259 size_type target = child;
// Pick the child that compares greater, when a second child exists.
01260 if (child+1<mark &&
01261 compare(this->my_array[child].first,
01262 this->my_array[child+1].first))
01263 ++target;
01264
// Stop once that child compares less than the item being placed.
01265 if (compare(this->my_array[target].first,
01266 this->my_array[this->my_tail].first))
01267 break;
01268 this->my_array[cur_pos].first = this->my_array[target].first;
01269 cur_pos = target;
01270 child = (cur_pos<<1)+1;
01271 }
01272 this->my_array[cur_pos].first = this->my_array[this->my_tail].first;
01273 }
01274 };
01275
01277
01280 template< typename T >
01281 class limiter_node : public graph_node, public receiver< T >, public sender< T > {
01282 public:
01283
01284 typedef T input_type;
01285 typedef T output_type;
01286 typedef sender< input_type > predecessor_type;
01287 typedef receiver< output_type > successor_type;
01288
01289 private:
01290
01291 task *my_root_task;
01292 size_t my_threshold;
01293 size_t my_count;
01294 internal::predecessor_cache< T > my_predecessors;
01295 spin_mutex my_mutex;
01296 internal::broadcast_cache< T > my_successors;
01297 int init_decrement_predecessors;
01298
01299 friend class internal::forward_task< limiter_node<T> >;
01300
01301
01302 friend class internal::decrementer< limiter_node<T> >;
01303
01304 void decrement_counter() {
01305 input_type v;
01306
01307
01308 if ( my_predecessors.get_item( v ) == false
01309 || my_successors.try_put(v) == false ) {
01310 spin_mutex::scoped_lock lock(my_mutex);
01311 --my_count;
01312 if ( !my_predecessors.empty() )
01313 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
01314 internal::forward_task< limiter_node<T> >( *this ) );
01315 }
01316 }
01317
01318 void forward() {
01319 {
01320 spin_mutex::scoped_lock lock(my_mutex);
01321 if ( my_count < my_threshold )
01322 ++my_count;
01323 else
01324 return;
01325 }
01326 decrement_counter();
01327 }
01328
01329 public:
01330
01332 internal::decrementer< limiter_node<T> > decrement;
01333
01335 limiter_node(graph &g, size_t threshold, int num_decrement_predecessors=0) :
01336 my_root_task(g.root_task()), my_threshold(threshold), my_count(0),
01337 init_decrement_predecessors(num_decrement_predecessors),
01338 decrement(num_decrement_predecessors)
01339 {
01340 my_predecessors.set_owner(this);
01341 my_successors.set_owner(this);
01342 decrement.set_owner(this);
01343 }
01344
01346 limiter_node( const limiter_node& src ) :
01347 #if ( __TBB_GCC_VERSION < 40202 )
01348 graph_node(), receiver<T>(), sender<T>(),
01349 #endif
01350 my_root_task(src.my_root_task), my_threshold(src.my_threshold), my_count(0),
01351 init_decrement_predecessors(src.init_decrement_predecessors),
01352 decrement(src.init_decrement_predecessors)
01353 {
01354 my_predecessors.set_owner(this);
01355 my_successors.set_owner(this);
01356 decrement.set_owner(this);
01357 }
01358
01360 bool register_successor( receiver<output_type> &r ) {
01361 my_successors.register_successor(r);
01362 return true;
01363 }
01364
01366
01367 bool remove_successor( receiver<output_type> &r ) {
01368 r.remove_predecessor(*this);
01369 my_successors.remove_successor(r);
01370 return true;
01371 }
01372
01374 bool try_put( const T &t ) {
01375 {
01376 spin_mutex::scoped_lock lock(my_mutex);
01377 if ( my_count >= my_threshold )
01378 return false;
01379 else
01380 ++my_count;
01381 }
01382
01383 bool msg = my_successors.try_put(t);
01384
01385 if ( msg != true ) {
01386 spin_mutex::scoped_lock lock(my_mutex);
01387 --my_count;
01388 if ( !my_predecessors.empty() )
01389 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
01390 internal::forward_task< limiter_node<T> >( *this ) );
01391 }
01392
01393 return msg;
01394 }
01395
01397 bool register_predecessor( predecessor_type &src ) {
01398 spin_mutex::scoped_lock lock(my_mutex);
01399 my_predecessors.add( src );
01400 if ( my_count < my_threshold && !my_successors.empty() )
01401 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
01402 internal::forward_task< limiter_node<T> >( *this ) );
01403 return true;
01404 }
01405
01407 bool remove_predecessor( predecessor_type &src ) {
01408 my_predecessors.remove( src );
01409 return true;
01410 }
01411
01412 };
01413
01414 #include "internal/_flow_graph_join_impl.h"
01415
01416 using internal::reserving_port;
01417 using internal::queueing_port;
01418 using internal::tag_matching_port;
01419 using internal::input_port;
01420 using internal::tag_value;
01421 using internal::NO_TAG;
01422
01423 template<typename OutputTuple, graph_buffer_policy JP=queueing> class join_node;
01424
01425 template<typename OutputTuple>
01426 class join_node<OutputTuple,reserving>: public internal::unfolded_join_node<std::tuple_size<OutputTuple>::value, reserving_port, OutputTuple, reserving> {
01427 private:
01428 static const int N = std::tuple_size<OutputTuple>::value;
01429 typedef typename internal::unfolded_join_node<N, reserving_port, OutputTuple, reserving> unfolded_type;
01430 public:
01431 typedef OutputTuple output_type;
01432 typedef typename unfolded_type::input_ports_tuple_type input_ports_tuple_type;
01433 join_node(graph &g) : unfolded_type(g) { }
01434 join_node(const join_node &other) : unfolded_type(other) {}
01435 };
01436
01437 template<typename OutputTuple>
01438 class join_node<OutputTuple,queueing>: public internal::unfolded_join_node<std::tuple_size<OutputTuple>::value, queueing_port, OutputTuple, queueing> {
01439 private:
01440 static const int N = std::tuple_size<OutputTuple>::value;
01441 typedef typename internal::unfolded_join_node<N, queueing_port, OutputTuple, queueing> unfolded_type;
01442 public:
01443 typedef OutputTuple output_type;
01444 typedef typename unfolded_type::input_ports_tuple_type input_ports_tuple_type;
01445 join_node(graph &g) : unfolded_type(g) { }
01446 join_node(const join_node &other) : unfolded_type(other) {}
01447 };
01448
01449
01450 template<typename OutputTuple>
01451 class join_node<OutputTuple, tag_matching> : public internal::unfolded_join_node<std::tuple_size<OutputTuple>::value,
01452 tag_matching_port, OutputTuple, tag_matching> {
01453 private:
01454 static const int N = std::tuple_size<OutputTuple>::value;
01455 typedef typename internal::unfolded_join_node<N, tag_matching_port, OutputTuple, tag_matching> unfolded_type;
01456 public:
01457 typedef OutputTuple output_type;
01458 typedef typename unfolded_type::input_ports_tuple_type input_ports_tuple_type;
01459 template<typename B0, typename B1>
01460 join_node(graph &g, B0 b0, B1 b1) : unfolded_type(g, b0, b1) { }
01461 template<typename B0, typename B1, typename B2>
01462 join_node(graph &g, B0 b0, B1 b1, B2 b2) : unfolded_type(g, b0, b1, b2) { }
01463 template<typename B0, typename B1, typename B2, typename B3>
01464 join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3) : unfolded_type(g, b0, b1, b2, b3) { }
01465 template<typename B0, typename B1, typename B2, typename B3, typename B4>
01466 join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4) : unfolded_type(g, b0, b1, b2, b3, b4) { }
01467 template<typename B0, typename B1, typename B2, typename B3, typename B4, typename B5>
01468 join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4, B5 b5) : unfolded_type(g, b0, b1, b2, b3, b4, b5) { }
01469 template<typename B0, typename B1, typename B2, typename B3, typename B4, typename B5, typename B6>
01470 join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4, B5 b5, B6 b6) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6) { }
01471 template<typename B0, typename B1, typename B2, typename B3, typename B4, typename B5, typename B6, typename B7>
01472 join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4, B5 b5, B6 b6, B7 b7) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7) { }
01473 template<typename B0, typename B1, typename B2, typename B3, typename B4, typename B5, typename B6, typename B7, typename B8>
01474 join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4, B5 b5, B6 b6, B7 b7, B8 b8) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8) { }
01475 template<typename B0, typename B1, typename B2, typename B3, typename B4, typename B5, typename B6, typename B7, typename B8, typename B9>
01476 join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4, B5 b5, B6 b6, B7 b7, B8 b8, B9 b9) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8, b9) { }
01477 join_node(const join_node &other) : unfolded_type(other) {}
01478 };
01479
01480
01481 #include "internal/_flow_graph_or_impl.h"
01482
01483 template<typename InputTuple>
01484 class or_node : public internal::unfolded_or_node<InputTuple> {
01485 private:
01486 static const int N = std::tuple_size<InputTuple>::value;
01487 public:
01488 typedef typename internal::or_output_type<N,InputTuple>::type output_type;
01489 typedef typename internal::unfolded_or_node<InputTuple> unfolded_type;
01490 or_node() : unfolded_type() { }
01491
01492 or_node( const or_node& ) : unfolded_type() { }
01493 };
01494
01496 template< typename T >
01497 inline void make_edge( sender<T> &p, receiver<T> &s ) {
01498 p.register_successor( s );
01499 }
01500
01502 template< typename T >
01503 inline void remove_edge( sender<T> &p, receiver<T> &s ) {
01504 p.remove_successor( s );
01505 }
01506
//! Returns whatever n.copy_function_object<Body>() yields for node n.
/** Body must be supplied explicitly by the caller; Node is deduced from n. */
template< typename Body, typename Node >
Body copy_body( Node &n )
{
    return n.template copy_function_object<Body>();
}
01512
01513
01514 }
01515
01516 using interface6::graph;
01517 using interface6::graph_node;
01518 using interface6::continue_msg;
01519 using interface6::sender;
01520 using interface6::receiver;
01521 using interface6::continue_receiver;
01522
01523 using interface6::source_node;
01524 using interface6::function_node;
01525 using interface6::continue_node;
01526 using interface6::overwrite_node;
01527 using interface6::write_once_node;
01528 using interface6::broadcast_node;
01529 using interface6::buffer_node;
01530 using interface6::queue_node;
01531 using interface6::sequencer_node;
01532 using interface6::priority_queue_node;
01533 using interface6::limiter_node;
01534 using namespace interface6::internal::graph_policy_namespace;
01535 using interface6::join_node;
01536 using interface6::or_node;
01537 using interface6::input_port;
01538 using interface6::copy_body;
01539 using interface6::make_edge;
01540 using interface6::remove_edge;
01541 using interface6::internal::NO_TAG;
01542 using interface6::internal::tag_value;
01543
01544 }
01545 }
01546
01547 #endif
01548