10 #include "zmqbufferedproducer.h" 43 using Request = std::vector<char>;
44 using Reply = std::vector<char>;
50 std::vector<std::string> endpoints,
51 std::shared_ptr<zmq::context_t> context =
nullptr);
58 size_t const maxConcurrentRequests_;
59 std::list<std::pair<Request, std::future<Reply>>> pending_;
60 std::atomic<bool> stop_{
false};
62 std::unique_ptr<common::BufferedConsumer<Request>> bcsend_;
63 std::unique_ptr<common::BufferedConsumer<T>> bcser_;
70 std::vector<std::string> endpoints,
71 std::shared_ptr<zmq::context_t> context)
72 : maxConcurrentRequests_(
std::min(maxQueueSize, size_t(64))),
73 client_(maxConcurrentRequests_, endpoints, context) {
76 bcsend_ = std::make_unique<common::BufferedConsumer<Request>>(
77 0, 1, [
this](Request ca) {
86 while (pending_.size() >= maxConcurrentRequests_ && !stop_.load()) {
88 std::this_thread::sleep_for(std::chrono::milliseconds(
89 int(10 * std::pow(2, std::min(ntry, 5)))));
92 for (
auto it = pending_.begin(); it != pending_.end();) {
93 auto& [req, fut] = *it;
94 if (fut.wait_for(std::chrono::seconds(0)) !=
95 std::future_status::ready) {
102 }
catch (std::exception
const& ex) {
105 <<
"ZeroMQBufferedConsumer: got exception instead of reply: " 108 *it = std::make_pair(
109 std::move(copy), client_.
request(std::move(req)));
118 it = pending_.erase(it);
120 VLOG(0) <<
"ZeroMQBufferedConsumer: got non-affirmative " 122 << reply.size() <<
", retrying";
124 *it = std::make_pair(
125 std::move(copy), client_.
request(std::move(req)));
132 pending_.emplace_back(std::move(copy), client_.
request(std::move(ca)));
136 bcser_ = std::make_unique<common::BufferedConsumer<T>>(
137 nthreads, maxQueueSize, [
this](T data) {
141 cereal::BinaryOutputArchive ar(os);
148 template <
typename T>
155 template <
typename T>
157 bcser_->enqueue(std::move(arg));
160 template <
typename T>
162 std::vector<std::string> endpoints) {
A request-reply client backed by ZeroMQ.
Definition: reqrepserver.h:88
bool updateEndpoints(std::vector< std::string > endpoints)
Returns true if the endpoints changed.
Definition: reqrepserver.cpp:268
A stream buffer for writing to an accessible vector of bytes.
Definition: serialization.h:51
void enqueue(T arg)
Definition: zmqbufferedconsumer.h:156
std::string kConfirm
Definition: zmqbufferedproducer.cpp:12
std::future< std::vector< char > > request(std::vector< char > msg)
Definition: reqrepserver.cpp:261
Output stream for Zstd-compressed data.
Definition: zstdstream.h:140
A buffered consumer that sends data via ZeroMQ.
Definition: zmqbufferedconsumer.h:42
~ZeroMQBufferedConsumer()
Definition: zmqbufferedconsumer.h:149
void updateEndpoints(std::vector< std::string > endpoints)
Definition: zmqbufferedconsumer.h:161
The TorchCraftAI training library.
Definition: batcher.cpp:15
ZeroMQBufferedConsumer(uint8_t nthreads, size_t maxQueueSize, std::vector< std::string > endpoints, std::shared_ptr< zmq::context_t > context=nullptr)
Definition: zmqbufferedconsumer.h:67
std::vector< char > takeData()
Definition: serialization.cpp:26