
Anthony Williams-4 wrote:
Tim Blechmann <tim@klingt.org> writes:
I just saw that you checked in a lock-free FIFO implementation to the Boost sandbox. It looks like an implementation of the Michael/Scott queue, doesn't it? If so, I suppose you are missing a thread-safe memory reclamation mechanism. Some time ago I wrote a Boost-style implementation of this data structure; I'm not sure if you came across it [1]. Maybe it would make sense to join our efforts in implementing a lock-free queue and get it into Boost?
[1] http://tim.klingt.org/git?p=boost_lockfree.git;a=summary
Firstly, I'd like to help with this. I have a couple of lock-free queue implementations lying around from my book. I've attached a sample implementation that uses atomic reference-counting --- it would need to be "boostified", as it's written for C++0x, but that should be relatively straightforward. I don't make any claims for performance, but I believe it to be "correct", with no data races or undefined behaviour --- I *really* want to know if that's not the case, so I can correct it before the book goes to press.
Secondly, both these queues (Tim's at the posted link, and Oliver's at https://svn.boost.org/trac/boost/browser/sandbox/threadpool/boost/tp/lockfre...) have race conditions in dequeue/take.
First, let's look at Tim's implementation at http://tim.klingt.org/git?p=boost_lockfree.git;a=blob;f=boost/lockfree/fifo....
Suppose there are two threads calling dequeue() concurrently. If there is an item in the queue, both will proceed identically down to line 137, as neither thread modifies the queue data before then. Now suppose one thread (A) gets preempted, but the other (B) proceeds. Thread B will read next->data and assign it to ret. It will then update head_ with CAS (which will succeed, as no other thread is modifying or has modified the data structure), and *deallocate the node* at line 141. This will destroy next->data. Now, when thread A wakes up it will try to read next->data => reading a destroyed object, undefined behaviour.
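The window being described can be made concrete with a stripped-down sketch of the pattern. This is *not* Tim's actual code -- the names are hypothetical and enqueue is simplified to a single-producer shortcut -- it just shows the shape of a Michael/Scott-style dequeue with naive `delete`-based reclamation, and where the use-after-free window sits:

```cpp
#include <atomic>
#include <cassert>

// Illustrative sketch only: MS-queue-shaped FIFO with naive reclamation.
template<typename T>
struct naive_fifo
{
    struct node
    {
        T data;
        std::atomic<node*> next;
        explicit node(T const& d):data(d),next(nullptr) {}
    };

    std::atomic<node*> head_;
    std::atomic<node*> tail_;

    naive_fifo()
    {
        node* dummy=new node(T());       // dummy node, as in the MS queue
        head_.store(dummy);
        tail_.store(dummy);
    }

    ~naive_fifo()
    {
        node* n=head_.load();
        while(n) { node* next=n->next.load(); delete n; n=next; }
    }

    bool dequeue(T& ret)
    {
        for(;;)
        {
            node* head=head_.load();
            node* next=head->next.load();
            if(!next)
                return false;            // queue empty
            // >>> RACE WINDOW: if this thread stalls here, other threads
            // can complete further dequeues; after enough of them, the
            // node `next` points to has been deleted, and the read of
            // next->data below touches freed memory.
            ret=next->data;
            if(head_.compare_exchange_weak(head,next))
            {
                delete head;             // naive reclamation: frees a node
                return true;             // stalled threads may still hold
            }
        }
    }

    void enqueue(T const& v)             // single-producer shortcut for
    {                                    // the sketch; MS uses CAS here
        node* n=new node(v);
        node* tail=tail_.load();
        tail->next.store(n);
        tail_.store(n);
    }
};
```

Single-threaded it behaves as a FIFO; the problem only appears when two consumers interleave as described above, which is exactly why some reclamation scheme (reference counts, hazard pointers, etc.) is needed before the `delete`.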
Oliver's take() suffers a similar problem. Suppose there are two threads calling take(). Both proceed as far as line 168, and then thread A gets pre-empted whilst thread B proceeds. There is an item on the queue, so val is non-NULL, and the queue hasn't been modified, so head_==head. No other thread is modifying the queue, so we end up at line 197. We then update head_ in line 198 and *delete head.ptr* at line 202 and set it to NULL at line 203. Now, when thread A wakes up the first thing it does is read head.ptr->prev, which is a dereference of a NULL ptr => undefined behaviour.
Lock-free code is hard. Memory reclamation makes it doubly hard.
Anthony
--
Author of C++ Concurrency in Action | http://www.manning.com/williams
just::thread C++0x thread library | http://www.stdthread.co.uk
Just Software Solutions Ltd | http://www.justsoftwaresolutions.co.uk
15 Carrallack Mews, St Just, Cornwall, TR19 7UL, UK. Company No. 5478976
template<typename T>
class queue
{
private:
    struct node;

    struct counted_node_ptr
    {
        int external_count;
        node* ptr;
    };

    struct node_counter
    {
        int internal_count:30;
        unsigned external_counters:2;
    };

    struct node
    {
        std::atomic<T*> data;
        std::atomic<node_counter> count;
        std::atomic<counted_node_ptr> next;

        node()
        {
            node_counter new_count;
            new_count.internal_count=0;
            new_count.external_counters=2;
            count.store(new_count);

            counted_node_ptr next_node={0};
            next.store(next_node);
        }

        void release_ref()
        {
            node_counter old_counter=
                count.load(std::memory_order_relaxed);
            node_counter new_counter;
            do
            {
                new_counter=old_counter;
                --new_counter.internal_count;
            }
            while(!count.compare_exchange_strong(
                      old_counter,new_counter,
                      std::memory_order_acquire,std::memory_order_relaxed));
            if(!new_counter.internal_count &&
               !new_counter.external_counters)
            {
                delete this;
            }
        }
    };

    std::atomic<counted_node_ptr> head;
    std::atomic<counted_node_ptr> tail;

    static void increase_external_count(
        std::atomic<counted_node_ptr>& counter,
        counted_node_ptr& old_counter)
    {
        counted_node_ptr new_counter;
        do
        {
            new_counter=old_counter;
            ++new_counter.external_count;
        }
        while(!counter.compare_exchange_strong(
                  old_counter,new_counter,
                  std::memory_order_acquire,
                  std::memory_order_relaxed));
        old_counter.external_count=new_counter.external_count;
    }

    void set_new_tail(counted_node_ptr &old_tail,
                      counted_node_ptr const &new_tail)
    {
        node* const current_tail_ptr=old_tail.ptr;
        while(!tail.compare_exchange_weak(old_tail,new_tail) &&
              old_tail.ptr==current_tail_ptr);
        if(old_tail.ptr==current_tail_ptr)
        {
            free_external_counter(old_tail);
        }
        else
        {
            current_tail_ptr->release_ref();
        }
    }

    static void free_external_counter(counted_node_ptr &old_node_ptr)
    {
        node* const ptr=old_node_ptr.ptr;
        int const count_increase=old_node_ptr.external_count-2;

        node_counter old_counter=
            ptr->count.load(std::memory_order_relaxed);
        node_counter new_counter;
        do
        {
            new_counter=old_counter;
            --new_counter.external_counters;
            new_counter.internal_count+=count_increase;
        }
        while(!ptr->count.compare_exchange_strong(
                  old_counter,new_counter,
                  std::memory_order_acquire,std::memory_order_relaxed));
        if(!new_counter.internal_count &&
           !new_counter.external_counters)
        {
            delete ptr;
        }
    }

public:
    queue()
    {
        counted_node_ptr new_node;
        new_node.external_count=1;
        new_node.ptr=new node;

        head.store(new_node);
        tail.store(new_node);
    }

    queue(const queue& other)=delete;
    queue& operator=(const queue& other)=delete;

    ~queue()
    {
        while(pop());
        delete head.load().ptr;
    }

    std::unique_ptr<T> pop()
    {
        counted_node_ptr old_head=head.load(std::memory_order_relaxed);
        for(;;)
        {
            increase_external_count(head,old_head);
            node* const ptr=old_head.ptr;
            if(ptr==tail.load().ptr)
            {
                return std::unique_ptr<T>();
            }
            counted_node_ptr next=ptr->next.load();
            if(head.compare_exchange_strong(old_head,next))
            {
                T* const res=ptr->data.exchange(NULL);
                free_external_counter(old_head);
                return std::unique_ptr<T>(res);
            }
            ptr->release_ref();
        }
    }

    void push(T new_value)
    {
        std::unique_ptr<T> new_data(new T(new_value));
        counted_node_ptr new_next;
        new_next.ptr=new node;
        new_next.external_count=1;
        counted_node_ptr old_tail=tail.load();

        for(;;)
        {
            increase_external_count(tail,old_tail);

            T* old_data=NULL;
            if(old_tail.ptr->data.compare_exchange_strong(
                   old_data,new_data.get()))
            {
                counted_node_ptr old_next={0};
                if(!old_tail.ptr->next.compare_exchange_strong(
                       old_next,new_next))
                {
                    delete new_next.ptr;
                    new_next=old_next;
                }
                set_new_tail(old_tail, new_next);
                new_data.release();
                break;
            }
            else
            {
                counted_node_ptr old_next={0};
                if(old_tail.ptr->next.compare_exchange_strong(
                       old_next,new_next))
                {
                    old_next=new_next;
                    new_next.ptr=new node;
                }
                set_new_tail(old_tail, old_next);
            }
        }
    }
};
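The reclamation trick in the listing above is the split reference count: each node keeps an internal count plus a tiny external_counters field, while every counted pointer held in a shared atomic variable carries its own external count; free_external_counter folds the accumulated external count into the node once the pointer has left the shared structure, and the node is deleted only when everything reaches zero. Below is a stripped-down, single-threaded illustration of just that fold. The names are hypothetical (acquire stands in for increase_external_count, fold_external for free_external_counter), the shared counted pointer is a plain struct rather than an atomic updated by CAS (so the loops guarding the real increments are elided), and external_counters starts at 1 rather than the queue's 2, since here only one shared pointer refers to the node:

```cpp
#include <atomic>
#include <cassert>

static int destroyed=0;              // counts node destructions for the demo

struct node_counter
{
    int internal_count:30;
    unsigned external_counters:2;
};

struct node
{
    std::atomic<node_counter> count;

    node()
    {
        node_counter c;
        c.internal_count=0;
        c.external_counters=1;       // one shared counted_ptr refers to us
        count.store(c);
    }
    ~node() { ++destroyed; }

    void release_ref()               // drop one thread-local reference
    {
        node_counter old_c=count.load();
        node_counter new_c;
        do
        {
            new_c=old_c;
            --new_c.internal_count;
        } while(!count.compare_exchange_strong(old_c,new_c));
        if(!new_c.internal_count && !new_c.external_counters)
            delete this;
    }
};

struct counted_ptr
{
    int external_count;
    node* ptr;
};

counted_ptr acquire(counted_ptr& shared)   // cf. increase_external_count;
{                                          // a CAS loop in the real code
    ++shared.external_count;
    return shared;
}

void fold_external(counted_ptr& removed)   // cf. free_external_counter
{
    node* const p=removed.ptr;
    // -2: one for the shared structure's own reference, one for the
    // remover's reference (which it gives up here rather than via
    // release_ref).
    int const increase=removed.external_count-2;
    node_counter old_c=p->count.load();
    node_counter new_c;
    do
    {
        new_c=old_c;
        --new_c.external_counters;
        new_c.internal_count+=increase;
    } while(!p->count.compare_exchange_strong(old_c,new_c));
    if(!new_c.internal_count && !new_c.external_counters)
        delete p;
}
```

In the real queue the remover can only call free_external_counter with the counted pointer it successfully CASed out of head or tail, which is what guarantees the external count it folds in is the final one; the sketch relies on single-threaded ordering instead.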
Hi,

I'm also interested in lock-free queues. In particular I need one that is single-writer single-reader, so there are no push/push or front/front race conditions. I think we can extend this and provide implementations that are lock-free for:

* single-writer single-reader
* multiple-writer single-reader
* single-writer multiple-reader
* multiple-writer multiple-reader

so we can have the best implementation for each case. I'm also interested in a queue with multiple-writer (push), multiple-reader (pop), single-reader (front). I think the thread pool has one like that for its internal queues, but I need to verify.

Best,
Vicente
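For the single-writer/single-reader case, no CAS is needed at all: the classic approach is a bounded ring buffer in which only the producer writes the write index and only the consumer writes the read index. A minimal sketch of that idea (names and the fixed capacity are illustrative, not taken from any of the implementations discussed above):

```cpp
#include <atomic>
#include <cstddef>

// Single-producer/single-consumer bounded ring buffer: lock-free because
// each index has exactly one writer, so plain atomic loads/stores suffice.
template<typename T, std::size_t Size>
class spsc_ringbuffer
{
    T buffer_[Size];
    std::atomic<std::size_t> write_;   // next slot the producer fills
    std::atomic<std::size_t> read_;    // next slot the consumer empties

    static std::size_t next(std::size_t i) { return (i+1)%Size; }

public:
    spsc_ringbuffer():write_(0),read_(0) {}

    bool push(T const& value)          // producer thread only
    {
        std::size_t const w=write_.load(std::memory_order_relaxed);
        std::size_t const n=next(w);
        if(n==read_.load(std::memory_order_acquire))
            return false;              // full (one slot kept empty)
        buffer_[w]=value;
        write_.store(n,std::memory_order_release);
        return true;
    }

    bool pop(T& value)                 // consumer thread only
    {
        std::size_t const r=read_.load(std::memory_order_relaxed);
        if(r==write_.load(std::memory_order_acquire))
            return false;              // empty
        value=buffer_[r];
        read_.store(next(r),std::memory_order_release);
        return true;
    }
};
```

The release store on one index paired with the acquire load on the other is what publishes the slot's contents between the two threads; one slot is deliberately left empty so that full and empty states are distinguishable without an extra counter.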