10 #include "distributed.h" 11 #include <nlohmann/json.hpp> 24 bool roleIs(std::string_view role);
26 template <
typename Archive>
54 std::string_view
host,
56 int64_t intevalMs = 10 * 1000);
61 bool consideredDead()
const;
68 std::string bootKey()
const;
69 std::string deadKey()
const;
70 std::string heartBeatKey()
const;
71 std::string heartBeatData()
const;
78 std::unique_ptr<RedisClient> redis_;
80 std::atomic<bool> stop_{
false};
81 std::atomic<bool> consideredDead_{
false};
89 using Clock = std::chrono::steady_clock;
99 std::vector<Cpid2kWorkerInfo> peers(std::string_view role);
100 std::vector<std::string> serviceEndpoints(std::string
const& serviceName);
108 Clock::time_point lastPeersCheck_;
109 std::chrono::milliseconds pcInterval_;
110 std::vector<Cpid2kWorkerInfo> peers_;
111 bool isDone_ =
false;
134 using Clock = std::chrono::steady_clock;
144 int64_t hbIntervalMs = 10 * 1000);
148 static std::unique_ptr<Cpid2kWorker> fromEnvVars();
151 std::string_view prefix()
const;
152 bool consideredDead()
const;
154 std::string redisKey(std::string_view key)
const;
155 std::shared_ptr<RedisClient> threadLocalClient();
161 std::string
const& role = kAnyRole,
162 std::chrono::milliseconds timeout = kDefaultTimeout);
163 void discardDContext(std::string
const& role = kAnyRole);
165 std::vector<Cpid2kWorkerInfo> peers(std::string_view role = kAnyRole);
166 std::vector<std::string> serviceEndpoints(std::string
const& serviceName);
168 std::string_view role,
169 std::chrono::milliseconds timeout = kNoTimeout);
171 std::string_view role,
172 std::chrono::milliseconds timeout = kNoTimeout);
173 void appendMetrics(std::string_view metricsName, nlohmann::json
const& json);
176 std::shared_ptr<RedisClient> redisClient(std::thread::id
id);
177 int numWorkersWithRoleInSpec(std::string_view role);
186 std::chrono::milliseconds pcInterval_;
187 std::unordered_map<std::string, std::unique_ptr<distributed::Context>>
189 std::unordered_map<std::string, std::vector<std::string>> dcontextIds_;
190 std::unordered_map<std::thread::id, std::shared_ptr<RedisClient>>
208 template <
typename T>
213 typename std::enable_if_t<std::is_arithmetic<T>::value>* = 0)
214 : name(
std::move(n)), value(float(v)), aggregation(a) {}
222 virtual void add(
float value) = 0;
223 virtual nlohmann::json value()
const = 0;
228 std::shared_ptr<Cpid2kWorker> worker,
229 std::chrono::milliseconds sendInterval = std::chrono::seconds(30));
231 void push(std::vector<EventMetric>
const& metrics);
235 using Clock = std::chrono::steady_clock;
244 std::unordered_map<std::string, std::unique_ptr<Aggregator>>
aggregators_;
std::chrono::steady_clock Clock
Definition: cpid2kworker.h:89
Definition: cpid2kworker.h:203
std::chrono::steady_clock Clock
Definition: cpid2kworker.h:134
std::thread thr_
Definition: cpid2kworker.h:240
Cpid2kHeartBeater & heartBeater()
Definition: cpid2kworker.h:156
bool roleIs(std::string_view role)
Definition: cpid2kworker.cpp:62
std::shared_ptr< Cpid2kWorker > worker_
Definition: cpid2kworker.h:237
static std::string const kAnyRole
Definition: cpid2kworker.h:135
std::chrono::milliseconds sendInterval_
Definition: cpid2kworker.h:238
static std::chrono::milliseconds const kDefaultTimeout
Definition: cpid2kworker.h:137
Definition: distributed.h:108
Definition: cpid2kworker.h:205
std::string host
IP address of the machine this process is running on.
Definition: cpid2kworker.h:36
static std::chrono::milliseconds const kNoTimeout
Definition: cpid2kworker.h:136
AggregationType
Definition: cpid2kworker.h:200
std::string_view prefix() const
Definition: cpid2kworker.h:95
Definition: cpid2kworker.h:204
static Cpid2kWorkerInfo withLocalIp()
Definition: cpid2kworker.cpp:50
std::string name
Definition: cpid2kworker.h:216
Helper class to aggregate metrics locally, and send them regularly as events in the redis database as ...
Definition: cpid2kworker.h:198
std::atomic< bool > stop_
Definition: cpid2kworker.h:241
Definition: cpid2kworker.h:20
Periodically sends out heartbeats to a Redis instance.
Definition: cpid2kworker.h:49
std::chrono::steady_clock Clock
Definition: cpid2kworker.h:235
Definition: cpid2kworker.h:207
std::string id
Worker ID.
Definition: cpid2kworker.h:34
Simple, synchronous C++ wrapper for the Hiredis Redis client.
Definition: redisclient.h:30
The TorchCraftAI training library.
Definition: batcher.cpp:15
Definition: cpid2kworker.h:220
Definition: cpid2kworker.h:201
void serialize(Archive &ar)
Definition: cpid2kworker.h:27
std::map< std::string, int > services
Services offered by this worker (name to port number)
Definition: cpid2kworker.h:38
std::mutex aggregatorsMutex_
Definition: cpid2kworker.h:243
Encapsulates information about the participating peers in a cpid2k job.
Definition: cpid2kworker.h:87
static Cpid2kWorkerInfo withLocalIpFromEnvVars()
Definition: cpid2kworker.cpp:56
EventMetric(std::string n, T v, AggregationType a=AggregateMean, typename std::enable_if_t< std::is_arithmetic< T >::value > *=0)
Definition: cpid2kworker.h:209
AggregationType aggregation
Definition: cpid2kworker.h:218
float value
Definition: cpid2kworker.h:217
int64_t intervalMs() const
Definition: cpid2kworker.h:63
Helper class for job coordination via a central Redis instance.
Definition: cpid2kworker.h:132
std::string_view type
Definition: cpid2kworker.h:224
Definition: cpid2kworker.h:202
std::unordered_map< std::string, std::unique_ptr< Aggregator > > aggregators_
Definition: cpid2kworker.h:244