// Class header for BlockingReaderWriterCircularBuffer. The members visible
// further down are: a malloc-backed element buffer (rawData/data), two
// LightweightSemaphore pointers (slots_, items) and two running indices
// (nextSlot, nextItem) that are masked with mask to address the buffer.
// NOTE(review): the element type T used throughout is presumably a template
// parameter declared on a line that is not visible here -- confirm.
28class BlockingReaderWriterCircularBuffer
// Constructs a buffer for the requested number of elements.
// The visible initializers store the requested capacity in maxcap and start
// both indices (nextSlot, nextItem) at zero.
34 explicit BlockingReaderWriterCircularBuffer(std::size_t capacity)
35 : maxcap(capacity), mask(), rawData(), data(),
38 nextSlot(0), nextItem(0)
// The shift-or sequence below copies the highest set bit of capacity into
// every lower bit position: fixed shifts of 1, 2 and 4, then a loop that
// shifts by 8, 16, ... bits (i << 3) while i is below sizeof(std::size_t).
// NOTE(review): the embedded original line numbers jump from 38 to 43 and
// from 47 to 49, so statements that adjust capacity or assign mask are
// presumably on lines not shown here -- confirm against the full file.
43 capacity |= capacity >> 1;
44 capacity |= capacity >> 2;
45 capacity |= capacity >> 4;
46 for (std::size_t i = 1; i <
sizeof(std::size_t); i <<= 1)
47 capacity |= capacity >> (i << 3);
// Allocate raw storage with alignment_of<T> - 1 bytes of slack so that
// align_for<T> can move the start forward to a T-aligned address.
// NOTE(review): the malloc result is not checked in the visible lines.
49 rawData =
static_cast<char*
>(std::malloc(capacity *
sizeof(T) + std::alignment_of<T>::value - 1));
50 data = align_for<T>(rawData);
// Move constructor. The visible initializer list zeroes maxcap and mask,
// nulls rawData and data, and value-initializes nextSlot and nextItem.
// NOTE(review): the body is not visible here; presumably it then takes over
// the state of other (for example via swap) -- confirm.
53 BlockingReaderWriterCircularBuffer(BlockingReaderWriterCircularBuffer&& other)
54 : maxcap(0), mask(0), rawData(
nullptr), data(
nullptr),
57 nextSlot(), nextItem()
// Copy construction is disabled: the copy constructor is deleted.
62 BlockingReaderWriterCircularBuffer(BlockingReaderWriterCircularBuffer
const&) =
delete;
// Destructor. Runs ~T() on each element still counted by
// items->availableApprox(), walking forward from nextItem and wrapping the
// index with mask.
// NOTE(review): releasing rawData is not visible in this excerpt -- confirm
// it happens on a line that is not shown.
66 ~BlockingReaderWriterCircularBuffer()
68 for (std::size_t i = 0, n = items->availableApprox(); i != n; ++i)
69 reinterpret_cast<T*
>(data)[(nextItem + i) & mask].~T();
// Move assignment, declared noexcept.
// NOTE(review): the body is not visible here; presumably it exchanges state
// with other and returns *this -- confirm.
73 BlockingReaderWriterCircularBuffer& operator=(BlockingReaderWriterCircularBuffer&& other)
noexcept
// Copy assignment is disabled: the copy assignment operator is deleted.
79 BlockingReaderWriterCircularBuffer& operator=(BlockingReaderWriterCircularBuffer
const&) =
delete;
// Exchanges the state of this buffer with other, member by member, using
// std::swap. Declared noexcept.
83 void swap(BlockingReaderWriterCircularBuffer& other)
noexcept
// Requested capacity and index mask.
85 std::swap(maxcap, other.maxcap);
86 std::swap(mask, other.mask);
// Unaligned allocation and the aligned pointer derived from it.
87 std::swap(rawData, other.rawData);
88 std::swap(data, other.data);
// Semaphore pointers for free slots and for queued items.
89 std::swap(slots_, other.slots_);
90 std::swap(items, other.items);
// Write and read indices.
91 std::swap(nextSlot, other.nextSlot);
92 std::swap(nextItem, other.nextItem);
// Non-blocking enqueue taking item by const reference. It starts by trying to
// take one unit from the slots_ semaphore without waiting.
// NOTE(review): the statements that handle a failed tryWait() and that store
// the item are not visible here (original lines 102-104 are missing);
// presumably it returns false when no slot is free -- confirm.
99 bool try_enqueue(T
const& item)
101 if (!slots_->tryWait())
// Non-blocking enqueue taking item by rvalue reference. It tries to take one
// unit from the slots_ semaphore without waiting, and hands the item to
// inner_enqueue via std::move.
// NOTE(review): original lines 114 and 116 are missing from this excerpt, so
// the statement guarded by the if and the return values are not visible;
// presumably false is returned when no slot is free -- confirm.
111 bool try_enqueue(T&& item)
113 if (!slots_->tryWait())
115 inner_enqueue(std::move(item));
// Blocking enqueue taking item by const reference. The empty-bodied while
// loop re-issues slots_->wait() until it reports success.
// NOTE(review): the statement that stores the item is not visible in this
// excerpt -- confirm against the full file.
123 void wait_enqueue(T
const& item)
125 while (!slots_->wait());
// Blocking enqueue taking item by rvalue reference. The empty-bodied while
// loop re-issues slots_->wait() until it reports success; the item is then
// handed to inner_enqueue via std::move.
133 void wait_enqueue(T&& item)
135 while (!slots_->wait());
136 inner_enqueue(std::move(item));
// Enqueue with a timeout, taking item by const reference. timeout_usecs is
// passed straight to slots_->wait(); the chrono overloads of this function
// convert their duration to microseconds before calling here.
// NOTE(review): the statements after the if are not visible; presumably
// false is returned when the wait does not succeed -- confirm.
144 bool wait_enqueue_timed(T
const& item, std::int64_t timeout_usecs)
146 if (!slots_->wait(timeout_usecs))
// Enqueue with a timeout, taking item by rvalue reference. timeout_usecs is
// passed straight to slots_->wait(); the item is handed to inner_enqueue via
// std::move.
// NOTE(review): original lines 160 and 162 are missing from this excerpt, so
// the statement guarded by the if and the return values are not visible;
// presumably false is returned when the wait does not succeed -- confirm.
157 bool wait_enqueue_timed(T&& item, std::int64_t timeout_usecs)
159 if (!slots_->wait(timeout_usecs))
161 inner_enqueue(std::move(item));
// Chrono convenience overload (const reference item). Converts timeout to a
// whole number of microseconds with duration_cast and returns the result of
// the integer-timeout overload.
170 template<
typename Rep,
typename Period>
171 inline bool wait_enqueue_timed(T
const& item, std::chrono::duration<Rep, Period>
const& timeout)
173 return wait_enqueue_timed(item, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
// Chrono convenience overload (rvalue item). Converts timeout to a whole
// number of microseconds with duration_cast, passes item on with std::move
// and returns the result of the integer-timeout overload.
181 template<
typename Rep,
typename Period>
182 inline bool wait_enqueue_timed(T&& item, std::chrono::duration<Rep, Period>
const& timeout)
184 return wait_enqueue_timed(std::move(item), std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
// Non-blocking dequeue into item. It starts by trying to take one unit from
// the items semaphore without waiting.
// NOTE(review): U is presumably a template parameter declared on a line that
// is not visible, and the statements after the if are missing from this
// excerpt; presumably false is returned when nothing is queued -- confirm.
192 bool try_dequeue(U& item)
194 if (!items->tryWait())
// Blocking dequeue into item. The empty-bodied while loop re-issues
// items->wait() until it reports success.
// NOTE(review): the statement that transfers the element into item is not
// visible in this excerpt -- confirm against the full file.
204 void wait_dequeue(U& item)
206 while (!items->wait());
// Dequeue with a timeout. timeout_usecs is passed straight to items->wait();
// the chrono overload of this function converts its duration to microseconds
// before calling here.
// NOTE(review): the statements after the if are not visible; presumably
// false is returned when the wait does not succeed -- confirm.
216 bool wait_dequeue_timed(U& item, std::int64_t timeout_usecs)
218 if (!items->wait(timeout_usecs))
// Chrono convenience overload. Converts timeout to a whole number of
// microseconds with duration_cast and returns the result of the
// integer-timeout overload.
229 template<
typename U,
typename Rep,
typename Period>
230 inline bool wait_dequeue_timed(U& item, std::chrono::duration<Rep, Period>
const& timeout)
232 return wait_dequeue_timed(item, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
// NOTE(review): orphaned fragment -- the header of the enclosing function
// (presumably peek) and the rest of its body are not visible here. The
// visible line tests whether the items semaphore reports zero available.
241 if (!items->availableApprox())
// Non-blocking pop. It starts by trying to take one unit from the items
// semaphore without waiting.
// NOTE(review): the remainder of the body is not visible; presumably it
// returns false when nothing is queued and otherwise discards the front
// element -- confirm.
248 inline bool try_pop()
250 if (!items->tryWait())
// Returns the approximate available count reported by the items semaphore.
// The destructor uses the same value as the number of elements that are
// still constructed in the buffer.
258 inline std::size_t size_approx()
const
260 return items->availableApprox();
// Const accessor returning a std::size_t.
// NOTE(review): the body is not visible in this excerpt; presumably it
// returns maxcap, the capacity value stored by the constructor -- confirm.
265 inline std::size_t max_capacity()
const
// Stores one element without touching the semaphores in the visible lines.
// Takes the current write index (post-incrementing nextSlot) and
// placement-constructs a T at slot (i & mask) from std::forward<U>(item).
// NOTE(review): U is presumably a template parameter declared on a line that
// is not visible, and any signalling of the items semaphore is not shown in
// this excerpt -- confirm.
272 void inner_enqueue(U&& item)
274 std::size_t i = nextSlot++;
275 new (
reinterpret_cast<T*
>(data) + (i & mask)) T(std::forward<U>(item));
// Reads one element out of the buffer. Takes the current read index
// (post-incrementing nextItem) and move-assigns the element stored at slot
// (i & mask) into item.
// NOTE(review): destroying the moved-from element and signalling the slots_
// semaphore are not visible in this excerpt -- confirm they happen on lines
// that are not shown.
280 void inner_dequeue(U& item)
282 std::size_t i = nextItem++;
283 T& element =
reinterpret_cast<T*
>(data)[i & mask];
284 item = std::move(element);
// NOTE(review): orphaned fragment -- the header of the enclosing function
// (presumably inner_peek) is not visible here. The visible statement returns
// a pointer to the slot addressed by (nextItem & mask) and leaves nextItem
// unchanged.
291 return reinterpret_cast<T*
>(data) + (nextItem & mask);
// NOTE(review): orphaned fragment -- the header of the enclosing function
// (presumably inner_pop) is not visible here. The visible statements
// post-increment nextItem and run ~T() on the element at the old index
// wrapped with mask.
296 std::size_t i = nextItem++;
297 reinterpret_cast<T*
>(data)[i & mask].~T();
// Returns ptr advanced by the smallest non-negative number of bytes that
// makes its address a multiple of std::alignment_of<U>::value.
// NOTE(review): U is presumably a template parameter declared on a line that
// is not visible here -- confirm.
302 static inline char* align_for(
char* ptr)
304 const std::size_t alignment = std::alignment_of<U>::value;
// The outer % alignment turns a full-alignment step into zero, so a pointer
// that is already aligned is returned unchanged.
305 return ptr + (alignment - (
reinterpret_cast<std::uintptr_t
>(ptr) % alignment)) % alignment;
// Semaphores: the enqueue functions wait on slots_, the dequeue and pop
// functions wait on items.
313 std::unique_ptr<spsc_sema::LightweightSemaphore> slots_;
314 std::unique_ptr<spsc_sema::LightweightSemaphore> items;
// Filler sized as one cache line minus two char*, two std::size_t and two
// unique_ptr. Presumably these terms account for the members that precede it
// (rawData/data, maxcap/mask, slots_/items) so that nextSlot starts on the
// next cache-line boundary -- TODO confirm against the full member list.
315 char cachelineFiller0[MOODYCAMEL_CACHE_LINE_SIZE -
sizeof(
char*) * 2 -
sizeof(std::size_t) * 2 -
sizeof(std::unique_ptr<spsc_sema::LightweightSemaphore>) * 2];
// Running write index; post-incremented in inner_enqueue.
316 std::size_t nextSlot;
// Filler of one cache line minus one std::size_t, so nextSlot plus this
// filler together occupy MOODYCAMEL_CACHE_LINE_SIZE bytes before nextItem.
317 char cachelineFiller1[MOODYCAMEL_CACHE_LINE_SIZE -
sizeof(std::size_t)];
// Running read index; post-incremented on the dequeue and pop paths.
318 std::size_t nextItem;