10 #include <common/assert.h> 11 #include <condition_variable> 18 #include <type_traits> 44 using Function = std::function<void(T)>;
94 using Function = std::function<std::optional<T>()>;
105 std::optional<T>
get();
107 void run(Function fn);
114 std::atomic_int running_;
116 std::queue<std::future<std::optional<T>>>
queue_;
118 std::condition_variable queueCV_;
123 template <
typename T>
130 throw std::runtime_error(
131 "Cannot construct BufferedConsumer with > 0 threads but zero-sized " 134 for (
auto i = nthreads; i > 0; i--) {
140 template <
typename T>
143 std::lock_guard<std::mutex> lock(
mutex_);
154 template <
typename T>
156 std::unique_lock<std::mutex> lock(
mutex_);
161 template <
typename T>
165 std::unique_lock<std::mutex> lock(
mutex_);
169 throw std::runtime_error(
"BufferedConsumer not active");
171 queue_.push(std::move(arg));
176 std::unique_lock<std::mutex> lock(
mutex_);
178 throw std::runtime_error(
"BufferedConsumer not active");
188 template <
typename T>
192 "Please use BufferedConsumer::enqueue when not using threads");
194 std::unique_lock<std::mutex> lock(
mutex_);
196 throw std::runtime_error(
"BufferedConsumer not active");
201 queue_.push(std::move(arg));
206 template <
typename T>
208 std::unique_lock<std::mutex> lock(
mutex_);
218 T item = std::move(
queue_.front());
223 fn_(std::move(item));
236 template <
typename T>
242 if (nThreads_ == 0) {
243 throw std::runtime_error(
"Cannot use a buffered producer with no threads");
245 if (maxQueueSize == 0) {
246 throw std::runtime_error(
247 "Cannot consturct a BufferedProducer with 0 queue size");
249 for (
auto i = nThreads; i > 0; i--) {
252 running_ = nThreads_;
256 template <
typename T>
259 std::lock_guard<std::mutex> lock(mutex_);
261 queueCV_.notify_all();
263 for (
auto& th : threads_) {
268 template <
typename T>
270 std::unique_lock<std::mutex> lock(mutex_);
272 lock, [&] {
return stop_ || !queue_.empty() || running_ == 0; });
274 throw std::runtime_error(
"BufferedProducer not active");
276 if (running_ == 0 && queue_.empty()) {
277 return std::optional<T>();
279 auto ret = queue_.front().get();
281 queueCV_.notify_all();
285 template <
typename T>
288 std::unique_lock<std::mutex> lock(mutex_);
289 queueCV_.wait(lock, [&] {
290 return stop_ || queue_.size() + working_ < maxQueueSize_;
296 std::promise<std::optional<T>> dataPromise;
301 bool done = (!result.has_value());
307 queueCV_.notify_all();
310 dataPromise.set_value(std::move(result));
311 queue_.push(dataPromise.get_future());
312 queueCV_.notify_all();
void run()
Definition: parallel.h:207
~BufferedConsumer()
Stops the consumers, discarding any items in the queue.
Definition: parallel.h:141
std::optional< T > get()
Definition: parallel.h:269
Function fn_
Definition: parallel.h:73
void wait()
Blocks until the queue is empty or the consumers are stopped.
Definition: parallel.h:155
int64_t consuming_
Definition: parallel.h:72
std::condition_variable itemReady_
Definition: parallel.h:77
std::queue< T > queue_
Definition: parallel.h:75
std::condition_variable itemDone_
Definition: parallel.h:78
void enqueue(T arg)
Adds another item to the work queue, possibly blocking. If the number of threads is zero...
Definition: parallel.h:162
std::mutex mutex_
Definition: parallel.h:76
size_t const maxQueueSize_
Definition: parallel.h:70
void run(Function fn)
Definition: parallel.h:286
General utilities.
Definition: assert.cpp:7
A simple producer class.
Definition: parallel.h:93
T type
Definition: parallel.h:97
BufferedProducer(uint8_t nthreads, size_t maxQueueSize, Function &&fn)
Definition: parallel.h:237
bool stop_
Definition: parallel.h:71
std::vector< std::thread > threads_
Definition: parallel.h:74
A simple producer/consumer class.
Definition: parallel.h:43
BufferedConsumer(uint8_t nthreads, size_t maxQueueSize, Function &&fn)
Definition: parallel.h:124
Request type
Definition: parallel.h:47
~BufferedProducer()
Stops the producers, discarding any items in the queue.
Definition: parallel.h:257
void enqueueOrReplaceOldest(T arg)
Same as 'enqueue', except that if the queue is full, the oldest element will be removed before insert...
Definition: parallel.h:189