15 #include <shared_mutex> 40 using ReplyFn = std::function<void(void const* buf, size_t len)>;
42 std::function<void(void const* buf, size_t len, ReplyFn reply)>;
51 size_t numThreads = 1,
52 std::string endpoint = std::string());
55 std::string endpoint()
const;
58 void listen(std::string endpoint, std::promise<std::string>&& endpointP);
59 void runWorker(std::string
const& endpoint);
63 std::shared_ptr<zmq::context_t> context_;
65 mutable std::string endpoint_;
66 mutable std::future<std::string> endpointF_;
67 mutable std::mutex endpointM_;
90 using Clock = std::chrono::steady_clock;
92 using Blob = std::vector<char>;
95 size_t maxConcurrentRequests,
96 std::vector<std::string> endpoints,
97 std::shared_ptr<zmq::context_t> context =
nullptr);
100 std::future<std::vector<char>> request(std::vector<char> msg);
102 bool updateEndpoints(std::vector<std::string> endpoints);
105 setReplyTimeoutMs(timeout.count());
107 void setReplyTimeoutMs(
size_t timeoutMs);
108 void setMaxRetries(
size_t count);
115 std::promise<Blob> promise;
117 QueueItem(
Blob msg) : msg(std::move(msg)) {}
118 QueueItem() =
default;
119 QueueItem(QueueItem&&) =
default;
120 QueueItem& operator=(QueueItem&&) =
default;
121 QueueItem(QueueItem
const&) =
delete;
122 QueueItem& operator=(QueueItem
const&) =
delete;
125 std::shared_ptr<zmq::context_t> context_;
127 std::shared_mutex epM_;
128 std::vector<std::string> endpoints_;
129 bool endpointsChanged_ =
false;
132 std::queue<QueueItem> queue_;
133 size_t const maxConcurrentRequests_;
134 std::atomic<size_t> replyTimeoutMs_{10 * 1000};
135 std::atomic<size_t> maxRetries_{std::numeric_limits<size_t>::max()};
137 std::atomic<bool> stop_{
false};
138 std::string signalEndpoint_;
139 std::unique_ptr<zmq::socket_t> signalSocket_;
A request-reply client backed by ZeroMQ.
Definition: reqrepserver.h:88
std::function< void(void const *buf, size_t len)> ReplyFn
Definition: reqrepserver.h:40
A request-reply server backed by ZeroMQ.
Definition: reqrepserver.h:38
std::function< void(void const *buf, size_t len, ReplyFn reply)> CallbackFn
Definition: reqrepserver.h:42
std::chrono::time_point< Clock > TimePoint
Definition: reqrepserver.h:91
The TorchCraftAI training library.
Definition: batcher.cpp:15
Definition: episodeserver.h:16
std::chrono::steady_clock Clock
Definition: reqrepserver.h:90
void setReplyTimeout(std::chrono::milliseconds timeout)
Definition: reqrepserver.h:104
std::vector< char > Blob
Definition: reqrepserver.h:92