Bounded MPMC queue

According to the classification, it's an MPMC, array-based queue that fails on overflow, does not require GC, has no priorities, provides causal FIFO ordering, and has blocking producers and consumers. The algorithm is pretty simple and fast. It's not lockfree in the official meaning (a stalled thread can block other threads), it's just implemented by means of atomic RMW operations without mutexes.

The cost of enqueue/dequeue is 1 CAS per operation. No amortization, just 1 CAS. There is no dynamic memory allocation/management during operation. Producers and consumers are separated from each other (as in the two-lock queue), i.e. they do not touch the same data while the queue is not empty.

On my dual-core laptop, an enqueue/dequeue takes 75 cycles on average in a synthetic multi-threaded benchmark.
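For a rough idea of how such a number can be obtained, here is a minimal sketch of a throughput harness for the mpmc_bounded_queue class shown below. This is not the original test suite; the thread count, iteration count, and std::chrono timing (rather than raw cycle counters) are my assumptions:

    // Minimal benchmark sketch: N/2 producers and N/2 consumers
    // hammer the queue, spinning when it is full or empty.
    #include <chrono>
    #include <cstdio>
    #include <thread>
    #include <vector>

    int main()
    {
        size_t const thread_count = 4;  // half producers, half consumers
        size_t const batch = 1000000;   // operations per producer
        mpmc_bounded_queue<int> queue(1024);

        auto start = std::chrono::steady_clock::now();
        std::vector<std::thread> threads;
        for (size_t t = 0; t != thread_count / 2; t += 1)
        {
            threads.emplace_back([&] {                  // producer
                for (size_t i = 0; i != batch; i += 1)
                    while (!queue.enqueue((int)i)) {}   // spin while full
            });
            threads.emplace_back([&] {                  // consumer
                int value;
                for (size_t i = 0; i != batch; i += 1)
                    while (!queue.dequeue(value)) {}    // spin while empty
            });
        }
        for (auto& t : threads)
            t.join();
        auto elapsed = std::chrono::steady_clock::now() - start;
        double ns = std::chrono::duration<double, std::nano>(elapsed).count();
        std::printf("%.1f ns per enqueue/dequeue pair\n",
            ns / (batch * thread_count / 2));
    }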

The source code and test suite are attached below (the file contains a limited implementation of std::atomic and is ready to run on Windows/MSVC/x86-32).

    #include <atomic>
    #include <cassert>
    #include <cstddef>
    #include <cstdint>

    template<typename T>
    class mpmc_bounded_queue
    {
    public:
        mpmc_bounded_queue(size_t buffer_size)
            : buffer_(new cell_t [buffer_size])
            , buffer_mask_(buffer_size - 1)
        {
            // Capacity must be a power of two so that
            // "pos & buffer_mask_" can replace "pos % buffer_size".
            assert((buffer_size >= 2) &&
                ((buffer_size & (buffer_size - 1)) == 0));
            // Initially, cell i is ready for the enqueue at position i.
            for (size_t i = 0; i != buffer_size; i += 1)
                buffer_[i].sequence_.store(i, std::memory_order_relaxed);
            enqueue_pos_.store(0, std::memory_order_relaxed);
            dequeue_pos_.store(0, std::memory_order_relaxed);
        }

        ~mpmc_bounded_queue()
        {
            delete [] buffer_;
        }

        bool enqueue(T const& data)
        {
            cell_t* cell;
            size_t pos = enqueue_pos_.load(std::memory_order_relaxed);
            for (;;)
            {
                cell = &buffer_[pos & buffer_mask_];
                size_t seq =
                    cell->sequence_.load(std::memory_order_acquire);
                intptr_t dif = (intptr_t)seq - (intptr_t)pos;
                if (dif == 0)
                {
                    // The cell is free; try to claim it. On failure,
                    // compare_exchange_weak reloads pos, so just retry.
                    if (enqueue_pos_.compare_exchange_weak
                        (pos, pos + 1, std::memory_order_relaxed))
                        break;
                }
                else if (dif < 0)
                    // The cell still holds an element from the
                    // previous lap, i.e. the queue is full.
                    return false;
                else
                    // Another producer claimed the cell; refresh pos.
                    pos = enqueue_pos_.load(std::memory_order_relaxed);
            }
            cell->data_ = data;
            // Publish the element: sequence == pos + 1 tells the
            // dequeue at this position that the data is ready.
            cell->sequence_.store(pos + 1, std::memory_order_release);
            return true;
        }

        bool dequeue(T& data)
        {
            cell_t* cell;
            size_t pos = dequeue_pos_.load(std::memory_order_relaxed);
            for (;;)
            {
                cell = &buffer_[pos & buffer_mask_];
                size_t seq =
                    cell->sequence_.load(std::memory_order_acquire);
                intptr_t dif = (intptr_t)seq - (intptr_t)(pos + 1);
                if (dif == 0)
                {
                    if (dequeue_pos_.compare_exchange_weak
                        (pos, pos + 1, std::memory_order_relaxed))
                        break;
                }
                else if (dif < 0)
                    // The element has not been published yet,
                    // i.e. the queue is empty.
                    return false;
                else
                    // Another consumer claimed the cell; refresh pos.
                    pos = dequeue_pos_.load(std::memory_order_relaxed);
            }
            data = cell->data_;
            // Mark the cell as free for the enqueue one lap ahead.
            cell->sequence_.store
                (pos + buffer_mask_ + 1, std::memory_order_release);
            return true;
        }

    private:
        struct cell_t
        {
            std::atomic<size_t> sequence_;
            T data_;
        };

        static size_t const cacheline_size = 64;
        typedef char cacheline_pad_t [cacheline_size];

        // Padding keeps the producer and consumer cursors on
        // separate cache lines to avoid false sharing.
        cacheline_pad_t pad0_;
        cell_t* const buffer_;
        size_t const buffer_mask_;
        cacheline_pad_t pad1_;
        std::atomic<size_t> enqueue_pos_;
        cacheline_pad_t pad2_;
        std::atomic<size_t> dequeue_pos_;
        cacheline_pad_t pad3_;

        mpmc_bounded_queue(mpmc_bounded_queue const&);
        void operator = (mpmc_bounded_queue const&);
    };
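As a quick single-threaded sanity check of the interface (the element type and capacity here are arbitrary): enqueue() returns false on a full queue, dequeue() returns false on an empty one, and elements come out in FIFO order:

    #include <cstdio>

    int main()
    {
        mpmc_bounded_queue<int> q(8);   // capacity must be a power of two
        for (int i = 0; i != 10; i += 1)
            if (!q.enqueue(i))
                std::printf("full at %d\n", i);  // fires at i == 8 and 9
        int v;
        while (q.dequeue(v))
            std::printf("%d ", v);               // prints 0 1 2 ... 7
        std::printf("\n");
    }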