10 #include "distributed.h" 11 #include "reqrepserver.h" 13 #include <common/parallel.h> 14 #include <common/serialization.h> 15 #include <common/zstdstream.h> 20 extern std::string
kDeny;
56 std::string endpoint = std::string());
59 std::optional<T>
get();
61 return rrs_->endpoint();
69 std::optional<T> produce();
72 std::condition_variable cv_;
73 std::queue<std::vector<char>> queue_;
74 size_t const maxInQueue_;
75 std::atomic<bool> stop_{
false};
76 std::unique_ptr<common::BufferedProducer<T>> bprod_;
77 std::unique_ptr<ReqRepServer> rrs_;
85 : maxInQueue_(maxQueueSize) {
86 bprod_ = std::make_unique<common::BufferedProducer<T>>(
87 nthreads, maxQueueSize, [
this] {
return produce(); });
88 rrs_ = std::make_unique<ReqRepServer>(
101 template <
typename T>
103 return bprod_->get();
106 template <
typename T>
112 template <
typename T>
117 VLOG(2) <<
"ZeroMQBufferedProducer: received " << len <<
" bytes";
119 std::lock_guard<std::mutex> lock(mutex_);
120 if (queue_.size() >= maxInQueue_) {
121 VLOG(0) <<
"ZeroMQBufferedProducer: queue is full, cannot accept message";
124 }
else if (queue_.size() > 0) {
125 VLOG(1) <<
"ZeroMQBufferedProducer: queue size " << queue_.size();
129 static_cast<char const*>(buf), static_cast<char const*>(buf) + len);
137 template <
typename T>
139 std::unique_lock<std::mutex> lock(mutex_);
140 cv_.wait(lock, [&] {
return stop_ || !queue_.empty(); });
145 auto data = std::move(queue_.front());
151 cereal::BinaryInputArchive ar(is);
std::string kDeny
Definition: zmqbufferedproducer.cpp:13
std::function< void(void const *buf, size_t len)> ReplyFn
Definition: reqrepserver.h:40
std::optional< T > get()
Definition: zmqbufferedproducer.h:102
~ZeroMQBufferedProducer()
Definition: zmqbufferedproducer.h:97
std::string kConfirm
Definition: zmqbufferedproducer.cpp:12
A buffered producer that obtains data via ZeroMQ.
Definition: zmqbufferedproducer.h:51
The TorchCraftAI training library.
Definition: batcher.cpp:15
std::string endpoint() const
Definition: zmqbufferedproducer.h:60
void stop()
Definition: zmqbufferedproducer.h:107
A stream buffer for reading from a vector of bytes.
Definition: serialization.h:33
ZeroMQBufferedProducer(uint8_t nthreads, size_t maxQueueSize, std::string endpoint=std::string())
Definition: zmqbufferedproducer.h:81
Definition: zstdstream.h:130
void handleRequest(void const *buf, size_t len, ReqRepServer::ReplyFn reply)
Definition: zmqbufferedproducer.h:113