SatDump 2.0.0-alpha-76a915210
Loading...
Searching...
No Matches
readerwritercircularbuffer.h
1// ©2020 Cameron Desrochers.
2// Distributed under the simplified BSD license (see the license file that
3// should have come with this header).
4
5// Provides a C++11 implementation of a single-producer, single-consumer wait-free concurrent
6// circular buffer (fixed-size queue).
7
8#pragma once
9
10#include <utility>
11#include <chrono>
12#include <memory>
13#include <cstdlib>
14#include <cstdint>
15#include <cassert>
16
17// Note that this implementation is fully modern C++11 (not compatible with old MSVC versions)
18// but we still include atomicops.h for its LightweightSemaphore implementation.
19#include "atomicops.h"
20
21#ifndef MOODYCAMEL_CACHE_LINE_SIZE
22#define MOODYCAMEL_CACHE_LINE_SIZE 64
23#endif
24
25namespace moodycamel {
26
27template<typename T>
28class BlockingReaderWriterCircularBuffer
29{
30public:
31 typedef T value_type;
32
33public:
34 explicit BlockingReaderWriterCircularBuffer(std::size_t capacity)
35 : maxcap(capacity), mask(), rawData(), data(),
36 slots_(new spsc_sema::LightweightSemaphore(static_cast<spsc_sema::LightweightSemaphore::ssize_t>(capacity))),
38 nextSlot(0), nextItem(0)
39 {
40 // Round capacity up to power of two to compute modulo mask.
41 // Adapted from http://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2
42 --capacity;
43 capacity |= capacity >> 1;
44 capacity |= capacity >> 2;
45 capacity |= capacity >> 4;
46 for (std::size_t i = 1; i < sizeof(std::size_t); i <<= 1)
47 capacity |= capacity >> (i << 3);
48 mask = capacity++;
49 rawData = static_cast<char*>(std::malloc(capacity * sizeof(T) + std::alignment_of<T>::value - 1));
50 data = align_for<T>(rawData);
51 }
52
53 BlockingReaderWriterCircularBuffer(BlockingReaderWriterCircularBuffer&& other)
54 : maxcap(0), mask(0), rawData(nullptr), data(nullptr),
57 nextSlot(), nextItem()
58 {
59 swap(other);
60 }
61
62 BlockingReaderWriterCircularBuffer(BlockingReaderWriterCircularBuffer const&) = delete;
63
64 // Note: The queue should not be accessed concurrently while it's
65 // being deleted. It's up to the user to synchronize this.
66 ~BlockingReaderWriterCircularBuffer()
67 {
68 for (std::size_t i = 0, n = items->availableApprox(); i != n; ++i)
69 reinterpret_cast<T*>(data)[(nextItem + i) & mask].~T();
70 std::free(rawData);
71 }
72
73 BlockingReaderWriterCircularBuffer& operator=(BlockingReaderWriterCircularBuffer&& other) noexcept
74 {
75 swap(other);
76 return *this;
77 }
78
79 BlockingReaderWriterCircularBuffer& operator=(BlockingReaderWriterCircularBuffer const&) = delete;
80
81 // Swaps the contents of this buffer with the contents of another.
82 // Not thread-safe.
83 void swap(BlockingReaderWriterCircularBuffer& other) noexcept
84 {
85 std::swap(maxcap, other.maxcap);
86 std::swap(mask, other.mask);
87 std::swap(rawData, other.rawData);
88 std::swap(data, other.data);
89 std::swap(slots_, other.slots_);
90 std::swap(items, other.items);
91 std::swap(nextSlot, other.nextSlot);
92 std::swap(nextItem, other.nextItem);
93 }
94
95 // Enqueues a single item (by copying it).
96 // Fails if not enough room to enqueue.
97 // Thread-safe when called by producer thread.
98 // No exception guarantee (state will be corrupted) if constructor of T throws.
99 bool try_enqueue(T const& item)
100 {
101 if (!slots_->tryWait())
102 return false;
103 inner_enqueue(item);
104 return true;
105 }
106
107 // Enqueues a single item (by moving it, if possible).
108 // Fails if not enough room to enqueue.
109 // Thread-safe when called by producer thread.
110 // No exception guarantee (state will be corrupted) if constructor of T throws.
111 bool try_enqueue(T&& item)
112 {
113 if (!slots_->tryWait())
114 return false;
115 inner_enqueue(std::move(item));
116 return true;
117 }
118
119 // Blocks the current thread until there's enough space to enqueue the given item,
120 // then enqueues it (via copy).
121 // Thread-safe when called by producer thread.
122 // No exception guarantee (state will be corrupted) if constructor of T throws.
123 void wait_enqueue(T const& item)
124 {
125 while (!slots_->wait());
126 inner_enqueue(item);
127 }
128
129 // Blocks the current thread until there's enough space to enqueue the given item,
130 // then enqueues it (via move, if possible).
131 // Thread-safe when called by producer thread.
132 // No exception guarantee (state will be corrupted) if constructor of T throws.
133 void wait_enqueue(T&& item)
134 {
135 while (!slots_->wait());
136 inner_enqueue(std::move(item));
137 }
138
139 // Blocks the current thread until there's enough space to enqueue the given item,
140 // or the timeout expires. Returns false without enqueueing the item if the timeout
141 // expires, otherwise enqueues the item (via copy) and returns true.
142 // Thread-safe when called by producer thread.
143 // No exception guarantee (state will be corrupted) if constructor of T throws.
144 bool wait_enqueue_timed(T const& item, std::int64_t timeout_usecs)
145 {
146 if (!slots_->wait(timeout_usecs))
147 return false;
148 inner_enqueue(item);
149 return true;
150 }
151
152 // Blocks the current thread until there's enough space to enqueue the given item,
153 // or the timeout expires. Returns false without enqueueing the item if the timeout
154 // expires, otherwise enqueues the item (via move, if possible) and returns true.
155 // Thread-safe when called by producer thread.
156 // No exception guarantee (state will be corrupted) if constructor of T throws.
157 bool wait_enqueue_timed(T&& item, std::int64_t timeout_usecs)
158 {
159 if (!slots_->wait(timeout_usecs))
160 return false;
161 inner_enqueue(std::move(item));
162 return true;
163 }
164
165 // Blocks the current thread until there's enough space to enqueue the given item,
166 // or the timeout expires. Returns false without enqueueing the item if the timeout
167 // expires, otherwise enqueues the item (via copy) and returns true.
168 // Thread-safe when called by producer thread.
169 // No exception guarantee (state will be corrupted) if constructor of T throws.
170 template<typename Rep, typename Period>
171 inline bool wait_enqueue_timed(T const& item, std::chrono::duration<Rep, Period> const& timeout)
172 {
173 return wait_enqueue_timed(item, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
174 }
175
176 // Blocks the current thread until there's enough space to enqueue the given item,
177 // or the timeout expires. Returns false without enqueueing the item if the timeout
178 // expires, otherwise enqueues the item (via move, if possible) and returns true.
179 // Thread-safe when called by producer thread.
180 // No exception guarantee (state will be corrupted) if constructor of T throws.
181 template<typename Rep, typename Period>
182 inline bool wait_enqueue_timed(T&& item, std::chrono::duration<Rep, Period> const& timeout)
183 {
184 return wait_enqueue_timed(std::move(item), std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
185 }
186
187 // Attempts to dequeue a single item.
188 // Returns false if the buffer is empty.
189 // Thread-safe when called by consumer thread.
190 // No exception guarantee (state will be corrupted) if assignment operator of U throws.
191 template<typename U>
192 bool try_dequeue(U& item)
193 {
194 if (!items->tryWait())
195 return false;
196 inner_dequeue(item);
197 return true;
198 }
199
200 // Blocks the current thread until there's something to dequeue, then dequeues it.
201 // Thread-safe when called by consumer thread.
202 // No exception guarantee (state will be corrupted) if assignment operator of U throws.
203 template<typename U>
204 void wait_dequeue(U& item)
205 {
206 while (!items->wait());
207 inner_dequeue(item);
208 }
209
210 // Blocks the current thread until either there's something to dequeue
211 // or the timeout expires. Returns false without setting `item` if the
212 // timeout expires, otherwise assigns to `item` and returns true.
213 // Thread-safe when called by consumer thread.
214 // No exception guarantee (state will be corrupted) if assignment operator of U throws.
215 template<typename U>
216 bool wait_dequeue_timed(U& item, std::int64_t timeout_usecs)
217 {
218 if (!items->wait(timeout_usecs))
219 return false;
220 inner_dequeue(item);
221 return true;
222 }
223
224 // Blocks the current thread until either there's something to dequeue
225 // or the timeout expires. Returns false without setting `item` if the
226 // timeout expires, otherwise assigns to `item` and returns true.
227 // Thread-safe when called by consumer thread.
228 // No exception guarantee (state will be corrupted) if assignment operator of U throws.
229 template<typename U, typename Rep, typename Period>
230 inline bool wait_dequeue_timed(U& item, std::chrono::duration<Rep, Period> const& timeout)
231 {
232 return wait_dequeue_timed(item, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
233 }
234
235 // Returns a pointer to the next element in the queue (the one that would
236 // be removed next by a call to `try_dequeue` or `try_pop`). If the queue
237 // appears empty at the time the method is called, returns nullptr instead.
238 // Thread-safe when called by consumer thread.
239 inline T* peek()
240 {
241 if (!items->availableApprox())
242 return nullptr;
243 return inner_peek();
244 }
245
246 // Pops the next element from the queue, if there is one.
247 // Thread-safe when called by consumer thread.
248 inline bool try_pop()
249 {
250 if (!items->tryWait())
251 return false;
252 inner_pop();
253 return true;
254 }
255
256 // Returns a (possibly outdated) snapshot of the total number of elements currently in the buffer.
257 // Thread-safe.
258 inline std::size_t size_approx() const
259 {
260 return items->availableApprox();
261 }
262
263 // Returns the maximum number of elements that this circular buffer can hold at once.
264 // Thread-safe.
265 inline std::size_t max_capacity() const
266 {
267 return maxcap;
268 }
269
270private:
271 template<typename U>
272 void inner_enqueue(U&& item)
273 {
274 std::size_t i = nextSlot++;
275 new (reinterpret_cast<T*>(data) + (i & mask)) T(std::forward<U>(item));
276 items->signal();
277 }
278
279 template<typename U>
280 void inner_dequeue(U& item)
281 {
282 std::size_t i = nextItem++;
283 T& element = reinterpret_cast<T*>(data)[i & mask];
284 item = std::move(element);
285 element.~T();
286 slots_->signal();
287 }
288
289 T* inner_peek()
290 {
291 return reinterpret_cast<T*>(data) + (nextItem & mask);
292 }
293
294 void inner_pop()
295 {
296 std::size_t i = nextItem++;
297 reinterpret_cast<T*>(data)[i & mask].~T();
298 slots_->signal();
299 }
300
301 template<typename U>
302 static inline char* align_for(char* ptr)
303 {
304 const std::size_t alignment = std::alignment_of<U>::value;
305 return ptr + (alignment - (reinterpret_cast<std::uintptr_t>(ptr) % alignment)) % alignment;
306 }
307
308private:
309 std::size_t maxcap; // actual (non-power-of-two) capacity
310 std::size_t mask; // circular buffer capacity mask (for cheap modulo)
311 char* rawData; // raw circular buffer memory
312 char* data; // circular buffer memory aligned to element alignment
313 std::unique_ptr<spsc_sema::LightweightSemaphore> slots_; // number of slots currently free (named with underscore to accommodate Qt's 'slots' macro)
314 std::unique_ptr<spsc_sema::LightweightSemaphore> items; // number of elements currently enqueued
315 char cachelineFiller0[MOODYCAMEL_CACHE_LINE_SIZE - sizeof(char*) * 2 - sizeof(std::size_t) * 2 - sizeof(std::unique_ptr<spsc_sema::LightweightSemaphore>) * 2];
316 std::size_t nextSlot; // index of next free slot to enqueue into
317 char cachelineFiller1[MOODYCAMEL_CACHE_LINE_SIZE - sizeof(std::size_t)];
318 std::size_t nextItem; // index of next element to dequeue from
319};
320
321}