00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #ifndef __TBB_concurrent_priority_queue_H
00022 #define __TBB_concurrent_priority_queue_H
00023
00024 #if !TBB_PREVIEW_CONCURRENT_PRIORITY_QUEUE
00025 #error Set TBB_PREVIEW_CONCURRENT_PRIORITY_QUEUE to include concurrent_priority_queue.h
00026 #endif
00027
00028 #include "atomic.h"
00029 #include "cache_aligned_allocator.h"
00030 #include "tbb_exception.h"
00031 #include "tbb_stddef.h"
00032 #include "tbb_profiling.h"
00033 #include "internal/_aggregator_impl.h"
00034 #include <vector>
00035 #include <iterator>
00036 #include <functional>
00037
00038 namespace tbb {
00039 namespace interface5 {
00040
00041 using namespace tbb::internal;
00042
00044 template <typename T, typename Compare=std::less<T>, typename A=cache_aligned_allocator<T> >
00045 class concurrent_priority_queue {
00046 public:
00048 typedef T value_type;
00049
00051 typedef T& reference;
00052
00054 typedef const T& const_reference;
00055
00057 typedef size_t size_type;
00058
00060 typedef ptrdiff_t difference_type;
00061
00063 typedef A allocator_type;
00064
00066 explicit concurrent_priority_queue(const allocator_type& a = allocator_type()) : mark(0), my_size(0), data(a)
00067 {
00068 my_aggregator.initialize_handler(my_functor_t(this));
00069 }
00070
00072 explicit concurrent_priority_queue(size_type init_capacity, const allocator_type& a = allocator_type()) :
00073 mark(0), my_size(0), data(a)
00074 {
00075 data.reserve(init_capacity);
00076 my_aggregator.initialize_handler(my_functor_t(this));
00077 }
00078
00080 template<typename InputIterator>
00081 concurrent_priority_queue(InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) :
00082 data(begin, end, a)
00083 {
00084 mark = 0;
00085 my_aggregator.initialize_handler(my_functor_t(this));
00086 heapify();
00087 my_size = data.size();
00088 }
00089
00091
00092 explicit concurrent_priority_queue(const concurrent_priority_queue& src) : mark(src.mark),
00093 my_size(src.my_size), data(src.data.begin(), src.data.end(), src.data.get_allocator())
00094 {
00095 my_aggregator.initialize_handler(my_functor_t(this));
00096 heapify();
00097 }
00098
00100
00101 concurrent_priority_queue(const concurrent_priority_queue& src, const allocator_type& a) : mark(src.mark),
00102 my_size(src.my_size), data(src.data.begin(), src.data.end(), a)
00103 {
00104 my_aggregator.initialize_handler(my_functor_t(this));
00105 heapify();
00106 }
00107
00109
00110 concurrent_priority_queue& operator=(const concurrent_priority_queue& src) {
00111 if (this != &src) {
00112 std::vector<value_type, allocator_type>(src.data.begin(), src.data.end(), src.data.get_allocator()).swap(data);
00113 mark = src.mark;
00114 my_size = src.my_size;
00115 }
00116 return *this;
00117 }
00118
00120
00122 bool empty() const { return size()==0; }
00123
00125
00127 size_type size() const { return __TBB_load_with_acquire(my_size); }
00128
00130
00131 void push(const_reference elem) {
00132 cpq_operation op_data(elem, PUSH_OP);
00133 my_aggregator.execute(&op_data);
00134 if (op_data.status == FAILED)
00135 throw_exception(eid_bad_alloc);
00136 }
00137
00139
00142 bool try_pop(reference elem) {
00143 cpq_operation op_data(POP_OP);
00144 op_data.elem = &elem;
00145 my_aggregator.execute(&op_data);
00146 return op_data.status==SUCCEEDED;
00147 }
00148
00150
00153 void clear() {
00154 data.clear();
00155 mark = 0;
00156 my_size = 0;
00157 }
00158
00160
00161 void swap(concurrent_priority_queue& q) {
00162 data.swap(q.data);
00163 std::swap(mark, q.mark);
00164 std::swap(my_size, q.my_size);
00165 }
00166
00168 allocator_type get_allocator() const { return data.get_allocator(); }
00169
00170 private:
00171 enum operation_type {INVALID_OP, PUSH_OP, POP_OP};
00172 enum operation_status { WAIT=0, SUCCEEDED, FAILED };
00173
00174 class cpq_operation : public aggregated_operation<cpq_operation> {
00175 public:
00176 operation_type type;
00177 union {
00178 value_type *elem;
00179 size_type sz;
00180 };
00181 cpq_operation(const_reference e, operation_type t) :
00182 type(t), elem(const_cast<value_type*>(&e)) {}
00183 cpq_operation(operation_type t) : type(t) {}
00184 };
00185
00186 class my_functor_t {
00187 concurrent_priority_queue<T, Compare, A> *cpq;
00188 public:
00189 my_functor_t() {}
00190 my_functor_t(concurrent_priority_queue<T, Compare, A> *cpq_) : cpq(cpq_) {}
00191 void operator()(cpq_operation* op_list) {
00192 cpq->handle_operations(op_list);
00193 }
00194 };
00195
00196 aggregator< my_functor_t, cpq_operation> my_aggregator;
00198 char padding1[NFS_MaxLineSize - sizeof(aggregator< my_functor_t, cpq_operation >)];
00200 size_type mark;
00201 __TBB_atomic size_type my_size;
00202 Compare compare;
00204 char padding2[NFS_MaxLineSize - (2*sizeof(size_type)) - sizeof(Compare)];
00206
00223 std::vector<value_type, allocator_type> data;
00224
00225 void handle_operations(cpq_operation *op_list) {
00226 cpq_operation *tmp, *pop_list=NULL;
00227
00228 __TBB_ASSERT(mark == data.size(), NULL);
00229
00230
00231
00232 while (op_list) {
00233
00234
00235
00236
00237
00238
00239
00240 call_itt_notify(acquired, &(op_list->status));
00241 __TBB_ASSERT(op_list->type != INVALID_OP, NULL);
00242 tmp = op_list;
00243 op_list = itt_hide_load_word(op_list->next);
00244 if (tmp->type == PUSH_OP) {
00245 __TBB_TRY {
00246 data.push_back(*(tmp->elem));
00247 __TBB_store_with_release(my_size, my_size+1);
00248 itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
00249 } __TBB_CATCH(...) {
00250 itt_store_word_with_release(tmp->status, uintptr_t(FAILED));
00251 }
00252 }
00253 else {
00254 __TBB_ASSERT(tmp->type == POP_OP, NULL);
00255 if (mark < data.size() &&
00256 compare(data[0], data[data.size()-1])) {
00257
00258
00259 *(tmp->elem) = data[data.size()-1];
00260 __TBB_store_with_release(my_size, my_size-1);
00261 itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
00262 data.pop_back();
00263 __TBB_ASSERT(mark<=data.size(), NULL);
00264 }
00265 else {
00266 itt_hide_store_word(tmp->next, pop_list);
00267 pop_list = tmp;
00268 }
00269 }
00270 }
00271
00272
00273 while (pop_list) {
00274 tmp = pop_list;
00275 pop_list = itt_hide_load_word(pop_list->next);
00276 __TBB_ASSERT(tmp->type == POP_OP, NULL);
00277 if (data.empty()) {
00278 itt_store_word_with_release(tmp->status, uintptr_t(FAILED));
00279 }
00280 else {
00281 __TBB_ASSERT(mark<=data.size(), NULL);
00282 if (mark < data.size() &&
00283 compare(data[0], data[data.size()-1])) {
00284
00285
00286 *(tmp->elem) = data[data.size()-1];
00287 __TBB_store_with_release(my_size, my_size-1);
00288 itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
00289 data.pop_back();
00290 }
00291 else {
00292 *(tmp->elem) = data[0];
00293 __TBB_store_with_release(my_size, my_size-1);
00294 itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
00295 reheap();
00296 }
00297 }
00298 }
00299
00300
00301
00302 if (mark<data.size()) heapify();
00303 __TBB_ASSERT(mark == data.size(), NULL);
00304 }
00305
00307 void heapify() {
00308 if (!mark && data.size()>0) mark = 1;
00309 for (; mark<data.size(); ++mark) {
00310
00311 size_type cur_pos = mark;
00312 value_type to_place = data[mark];
00313 do {
00314 size_type parent = (cur_pos-1)>>1;
00315 if (!compare(data[parent], to_place)) break;
00316 data[cur_pos] = data[parent];
00317 cur_pos = parent;
00318 } while( cur_pos );
00319 data[cur_pos] = to_place;
00320 }
00321 }
00322
00324
00325 void reheap() {
00326 size_type cur_pos=0, child=1;
00327
00328 while (child < mark) {
00329 size_type target = child;
00330 if (child+1 < mark && compare(data[child], data[child+1]))
00331 ++target;
00332
00333 if (compare(data[target], data[data.size()-1])) break;
00334 data[cur_pos] = data[target];
00335 cur_pos = target;
00336 child = (cur_pos<<1)+1;
00337 }
00338 data[cur_pos] = data[data.size()-1];
00339 data.pop_back();
00340 if (mark > data.size()) mark = data.size();
00341 }
00342 };
00343
00344 }
00345
00346 using interface5::concurrent_priority_queue;
00347
00348 }
00349
00350 #endif