concurrent_priority_queue.h

00001 /*
00002     Copyright 2005-2011 Intel Corporation.  All Rights Reserved.
00003 
00004     The source code contained or described herein and all documents related
00005     to the source code ("Material") are owned by Intel Corporation or its
00006     suppliers or licensors.  Title to the Material remains with Intel
00007     Corporation or its suppliers and licensors.  The Material is protected
00008     by worldwide copyright laws and treaty provisions.  No part of the
00009     Material may be used, copied, reproduced, modified, published, uploaded,
00010     posted, transmitted, distributed, or disclosed in any way without
00011     Intel's prior express written permission.
00012 
00013     No license under any patent, copyright, trade secret or other
00014     intellectual property right is granted to or conferred upon you by
00015     disclosure or delivery of the Materials, either expressly, by
00016     implication, inducement, estoppel or otherwise.  Any license under such
00017     intellectual property rights must be express and approved by Intel in
00018     writing.
00019 */
00020 
00021 #ifndef __TBB_concurrent_priority_queue_H
00022 #define __TBB_concurrent_priority_queue_H
00023 
00024 #if !TBB_PREVIEW_CONCURRENT_PRIORITY_QUEUE
00025 #error Set TBB_PREVIEW_CONCURRENT_PRIORITY_QUEUE to include concurrent_priority_queue.h
00026 #endif
00027 
00028 #include "atomic.h"
00029 #include "cache_aligned_allocator.h"
00030 #include "tbb_exception.h"
00031 #include "tbb_stddef.h"
00032 #include "tbb_profiling.h"
00033 #include "internal/_aggregator_impl.h"
00034 #include <vector>
00035 #include <iterator>
00036 #include <functional>
00037 
00038 namespace tbb {
00039 namespace interface5 {
00040 
00041 using namespace tbb::internal;
00042 
00044 template <typename T, typename Compare=std::less<T>, typename A=cache_aligned_allocator<T> >
00045 class concurrent_priority_queue {
      // Priority queue of T kept in a std::vector<T, A> arranged as a binary heap
      // (parent of index i is (i-1)>>1). compare(a, b) == true means b has higher
      // priority than a, so data[0] is the highest-priority heapified element.
      //
      // push() and try_pop() do not touch the vector themselves: each wraps its
      // request in a cpq_operation and hands it to my_aggregator, whose handler
      // (my_functor_t) calls handle_operations() on a linked list of pending
      // operations. The aggregator itself lives in internal/_aggregator_impl.h and
      // is not shown here.
      //
      // The copy constructors, operator=, clear() and swap() read and write the
      // members directly instead of going through the aggregator.
      // NOTE(review): presumably those must not run concurrently with push/try_pop
      // on the queues involved -- confirm against the TBB reference.
00046  public:
          // Type of the stored elements.
00048     typedef T value_type;
00049 
          // Reference to an element.
00051     typedef T& reference;
00052 
          // Const reference to an element.
00054     typedef const T& const_reference;
00055 
          // Unsigned type used for element counts and heap indices.
00057     typedef size_t size_type;
00058 
          // Signed difference type.
00060     typedef ptrdiff_t difference_type;
00061 
          // Allocator type handed to the underlying std::vector.
00063     typedef A allocator_type;
00064 
          // Constructs an empty queue whose storage uses allocator `a`, and registers
          // this object as the target of the aggregator's handler.
00066     explicit concurrent_priority_queue(const allocator_type& a = allocator_type()) : mark(0), my_size(0), data(a)
00067     {
00068         my_aggregator.initialize_handler(my_functor_t(this));
00069     }
00070 
          // Constructs an empty queue and reserves storage for `init_capacity`
          // elements up front.
00072     explicit concurrent_priority_queue(size_type init_capacity, const allocator_type& a = allocator_type()) :
00073         mark(0), my_size(0), data(a)
00074     {
00075         data.reserve(init_capacity);
00076         my_aggregator.initialize_handler(my_functor_t(this));
00077     }
00078 
          // Constructs a queue holding a copy of the range [begin, end).
          // The elements are copied into `data` unordered; heapify() then builds the
          // heap starting from mark == 0, and my_size is set from the final vector size.
00080     template<typename InputIterator>
00081     concurrent_priority_queue(InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) :
00082         data(begin, end, a)
00083     {
00084         mark = 0;
00085         my_aggregator.initialize_handler(my_functor_t(this));
00086         heapify();
00087         my_size = data.size();
00088     }
00089 
00091 
          // Copy constructor: copies src's elements (using src's allocator), mark and
          // my_size. `compare` is not in the initializer list, so it is
          // default-constructed rather than copied from src.
          // heapify() only has work to do if src.mark < src.data.size().
00092     explicit concurrent_priority_queue(const concurrent_priority_queue& src) : mark(src.mark),
00093         my_size(src.my_size), data(src.data.begin(), src.data.end(), src.data.get_allocator())
00094     {
00095         my_aggregator.initialize_handler(my_functor_t(this));
00096         heapify();
00097     }
00098 
00100 
          // Same as the copy constructor, except that the copied elements are stored
          // using allocator `a` instead of src's allocator.
00101     concurrent_priority_queue(const concurrent_priority_queue& src, const allocator_type& a) : mark(src.mark),
00102         my_size(src.my_size), data(src.data.begin(), src.data.end(), a)
00103     {
00104         my_aggregator.initialize_handler(my_functor_t(this));
00105         heapify();
00106     }
00107 
00109 
          // Copy assignment. Self-assignment is a no-op. Otherwise a temporary vector
          // holding a copy of src's elements (with src's allocator) is built first and
          // then swapped into `data`, after which mark and my_size are copied.
          // `compare` and `my_aggregator` are left untouched.
00110     concurrent_priority_queue& operator=(const concurrent_priority_queue& src) {
00111         if (this != &src) {
00112             std::vector<value_type, allocator_type>(src.data.begin(), src.data.end(), src.data.get_allocator()).swap(data);
00113             mark = src.mark;
00114             my_size = src.my_size;
00115         }
00116         return *this;
00117     }
00118 
00120 
          // Returns true when size() reports zero.
00122     bool empty() const { return size()==0; }
00123 
00125 
          // Returns the element count published in my_size, read with acquire
          // semantics. handle_operations() updates my_size with a release store after
          // every successful push or pop.
00127     size_type size() const { return __TBB_load_with_acquire(my_size); }
00128 
00130 
          // Inserts a copy of `elem`.
          // The operation carries only the address of `elem`; the copy is made by
          // data.push_back() inside handle_operations(). If that push_back throws
          // anything, the handler marks the operation FAILED and this function reports
          // it as eid_bad_alloc, whatever the original exception was.
00131     void push(const_reference elem) {
00132         cpq_operation op_data(elem, PUSH_OP);
00133         my_aggregator.execute(&op_data);
00134         if (op_data.status == FAILED) // exception thrown
00135             throw_exception(eid_bad_alloc);
00136     }
00137 
00139 
          // Tries to remove a high-priority element and assign it to `elem`.
          // Returns true if an element was assigned; returns false (status FAILED)
          // when the handler found the queue empty, in which case `elem` is not written.
00142     bool try_pop(reference elem) {
00143         cpq_operation op_data(POP_OP);
              // the single-argument constructor leaves the union unset; point it at the
              // caller's output slot before the operation is submitted
00144         op_data.elem = &elem;
00145         my_aggregator.execute(&op_data);
00146         return op_data.status==SUCCEEDED;
00147     }
00148 
00150 
          // Removes all elements and resets mark and my_size to zero, writing the
          // members directly (not through the aggregator).
00153     void clear() {
00154         data.clear();
00155         mark = 0;
00156         my_size = 0;
00157     }
00158 
00160 
          // Exchanges the element storage, mark and my_size with `q`, writing the
          // members directly. `compare` and `my_aggregator` are not exchanged.
00161     void swap(concurrent_priority_queue& q) {
00162         data.swap(q.data);
00163         std::swap(mark, q.mark);
00164         std::swap(my_size, q.my_size);
00165     }
00166 
          // Returns a copy of the allocator used by the underlying vector.
00168     allocator_type get_allocator() const { return data.get_allocator(); }
00169 
00170  private:
          // Kind of request carried by a cpq_operation. INVALID_OP is only ever
          // checked for in an assertion in handle_operations().
00171     enum operation_type {INVALID_OP, PUSH_OP, POP_OP};
          // Outcome written to an operation's status field. handle_operations() only
          // stores SUCCEEDED or FAILED.
          // NOTE(review): WAIT is presumably the initial status set by
          // aggregated_operation (not shown here) -- confirm.
00172     enum operation_status { WAIT=0, SUCCEEDED, FAILED };
00173 
          // One pending push or pop. `status` and `next` are used by this class but
          // declared elsewhere, presumably in aggregated_operation.
00174     class cpq_operation : public aggregated_operation<cpq_operation> {
00175      public:
00176         operation_type type;
00177         union {
                  // PUSH_OP: address of the caller's element to copy from.
                  // POP_OP:  address of the caller's variable to assign to.
00178             value_type *elem;
                  // not referenced anywhere in this class
00179             size_type sz;
00180         };
              // Used by push(): stores the address of `e`. The const is cast away only
              // so that pushes and pops can share one pointer member; the push path
              // only reads through it.
00181         cpq_operation(const_reference e, operation_type t) :
00182             type(t), elem(const_cast<value_type*>(&e)) {}
              // Used by try_pop(): leaves the union uninitialized; the caller sets
              // `elem` afterwards.
00183         cpq_operation(operation_type t) : type(t) {}
00184     };
00185 
          // Handler object given to the aggregator: forwards a list of pending
          // operations to handle_operations() of the owning queue.
00186     class my_functor_t {
00187         concurrent_priority_queue<T, Compare, A> *cpq;
00188      public:
              // Leaves `cpq` uninitialized; every constructor of the queue installs a
              // properly bound functor through initialize_handler().
00189         my_functor_t() {}
00190         my_functor_t(concurrent_priority_queue<T, Compare, A> *cpq_) : cpq(cpq_) {}
00191         void operator()(cpq_operation* op_list) {
00192             cpq->handle_operations(op_list);
00193         }
00194     };
00195 
          // Collects operations submitted by push()/try_pop() and runs them through
          // my_functor_t.
00196     aggregator< my_functor_t, cpq_operation> my_aggregator;
          // Pads the aggregator out to NFS_MaxLineSize bytes.
          // NOTE(review): presumably so the members below start on a different cache
          // line than the aggregator -- confirm.
00198     char padding1[NFS_MaxLineSize - sizeof(aggregator< my_functor_t, cpq_operation >)];
          // Number of leading elements of `data` that currently form a valid heap;
          // data[mark .. data.size()) are pushed but not yet heapified. Asserted equal
          // to data.size() on entry to and exit from handle_operations().
00200     size_type mark;
          // Element count published to size(); updated with release stores in
          // handle_operations().
00201     __TBB_atomic size_type my_size;
          // Ordering predicate; always default-constructed, since no constructor takes
          // a comparator.
00202     Compare compare;
          // Pads mark, my_size and compare out to NFS_MaxLineSize bytes.
00204     char padding2[NFS_MaxLineSize - (2*sizeof(size_type)) - sizeof(Compare)];
00206 
          // Element storage: heap in [0, mark), unordered recent pushes in
          // [mark, data.size()).
00223     std::vector<value_type, allocator_type> data;
00224 
          // Processes one batch of pending operations, linked through `next`.
          //   Pass 1: perform every push (append to `data`) and every pop that can be
          //           served by the most recently appended element; defer other pops.
          //   Pass 2: serve the deferred pops, from the tail or from the heap root.
          //   Finally: heapify whatever pushed elements remain un-heapified.
          // An operation is completed by a release store to its status field. In the
          // pop paths that store comes before the container is modified, and `tmp` is
          // not accessed again afterwards.
00225     void handle_operations(cpq_operation *op_list) {
00226         cpq_operation *tmp, *pop_list=NULL;
00227 
00228         __TBB_ASSERT(mark == data.size(), NULL);
00229 
00230         // first pass processes all constant time operations: pushes and
00231         // those pops that can simply take the most recently pushed element.
00232         while (op_list) {
00233             // ITT note: &(op_list->status) tag is used to cover accesses to op_list
00234             // node. This thread is going to handle the operation, and so will acquire it
00235             // and perform the associated operation w/o triggering a race condition; the
00236             // thread that created the operation is waiting on the status field, so when
00237             // this thread is done with the operation, it will perform a
00238             // store_with_release to give control back to the waiting thread in
00239             // aggregator::insert_operation.
00240             call_itt_notify(acquired, &(op_list->status));
00241             __TBB_ASSERT(op_list->type != INVALID_OP, NULL);
                  // read the successor before the node can be handed back to its owner
00242             tmp = op_list;
00243             op_list = itt_hide_load_word(op_list->next);
00244             if (tmp->type == PUSH_OP) {
00245                 __TBB_TRY {
                          // append only; the element is sifted into the heap later by
                          // heapify(), unless a pop in this batch takes it first
00246                     data.push_back(*(tmp->elem));
00247                     __TBB_store_with_release(my_size, my_size+1);
00248                     itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
00249                 } __TBB_CATCH(...) {
                          // any exception from push_back is reduced to FAILED; push()
                          // turns that into eid_bad_alloc
00250                     itt_store_word_with_release(tmp->status, uintptr_t(FAILED));
00251                 }
00252             }
00253             else { // tmp->type == POP_OP
00254                 __TBB_ASSERT(tmp->type == POP_OP, NULL);
00255                 if (mark < data.size() &&
00256                     compare(data[0], data[data.size()-1])) {
00257                     // there are newly pushed elems and the last one
00258                     // is higher than top
00259                     *(tmp->elem) = data[data.size()-1]; // copy the data
00260                     __TBB_store_with_release(my_size, my_size-1);
00261                     itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
00262                     data.pop_back();
00263                     __TBB_ASSERT(mark<=data.size(), NULL);
00264                 }
00265                 else { // no convenient item to pop; postpone
                          // reuse the node's own next pointer to chain it onto pop_list
00266                     itt_hide_store_word(tmp->next, pop_list);
00267                     pop_list = tmp;
00268                 }
00269             }
00270         }
00271 
00272         // second pass processes pop operations
00273         while (pop_list) {
00274             tmp = pop_list;
00275             pop_list = itt_hide_load_word(pop_list->next);
00276             __TBB_ASSERT(tmp->type == POP_OP, NULL);
00277             if (data.empty()) {
                      // nothing to pop: try_pop() will return false
00278                 itt_store_word_with_release(tmp->status, uintptr_t(FAILED));
00279             }
00280             else {
00281                 __TBB_ASSERT(mark<=data.size(), NULL);
00282                 if (mark < data.size() &&
00283                     compare(data[0], data[data.size()-1])) {
00284                     // there are newly pushed elems and the last one is
00285                     // higher than top
00286                     *(tmp->elem) = data[data.size()-1]; // copy the data
00287                     __TBB_store_with_release(my_size, my_size-1);
00288                     itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
00289                     data.pop_back();
00290                 }
00291                 else { // extract top and push last element down heap
00292                     *(tmp->elem) = data[0]; // copy the data
00293                     __TBB_store_with_release(my_size, my_size-1);
00294                     itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
                          // data[0] has been copied out; reheap() overwrites it and
                          // shrinks the vector by one
00295                     reheap();
00296                 }
00297             }
00298         }
00299 
00300         // heapify any leftover pushed elements before doing the next
00301         // batch of operations
00302         if (mark<data.size()) heapify();
00303         __TBB_ASSERT(mark == data.size(), NULL);
00304     }
00305 
          // Extends the heap region [0, mark) to cover all of `data` by sifting each
          // element at index mark, mark+1, ... up toward the root.
          // On return mark == data.size().
00307     void heapify() {
              // a single element is already a heap; starting at 1 also keeps the
              // do-while below from computing (cur_pos-1) with cur_pos == 0
00308         if (!mark && data.size()>0) mark = 1;
00309         for (; mark<data.size(); ++mark) {
00310             // for each unheapified element under size
00311             size_type cur_pos = mark;
00312             value_type to_place = data[mark];
00313             do { // push to_place up the heap
00314                 size_type parent = (cur_pos-1)>>1;
                      // stop once the parent does not have lower priority than to_place
00315                 if (!compare(data[parent], to_place)) break;
00316                 data[cur_pos] = data[parent];
00317                 cur_pos = parent;
00318             } while( cur_pos );
00319             data[cur_pos] = to_place;
00320         }
00321     }
00322 
00324 
          // Removes the heap root. The caller must already have copied data[0] out.
          // The last element of `data` is sifted down from the root through the heap
          // region [0, mark), then the vector's last slot is dropped. The last element
          // may be a heapified one or a recently pushed, un-heapified one.
00325     void reheap() {
00326         size_type cur_pos=0, child=1;
00327 
00328         while (child < mark) {
00329             size_type target = child;
00330             if (child+1 < mark && compare(data[child], data[child+1]))
00331                 ++target;
00332             // target now has the higher priority child
                  // stop when even that child has lower priority than the element
                  // being placed
00333             if (compare(data[target], data[data.size()-1])) break;
00334             data[cur_pos] = data[target];
00335             cur_pos = target;
00336             child = (cur_pos<<1)+1;
00337         }
00338         data[cur_pos] = data[data.size()-1];
00339         data.pop_back();
              // if the removed slot was part of the heap region, shrink the region too
00340         if (mark > data.size()) mark = data.size();
00341     }
00342 };
00343 
00344 } // namespace interface5
00345 
00346 using interface5::concurrent_priority_queue;
00347 
00348 } // namespace tbb
00349 
00350 #endif /* __TBB_concurrent_priority_queue_H */

Copyright © 2005-2011 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.