/// ConfigZyre::apply : walks the YAML configuration held in mConfig and, for every
/// entry under salsa.spec, creates Salsa::NodeZyre actors (one per replica), wires
/// their sockets (optional submit router socket plus a zyre discovery socket),
/// adds them to a root Salsa::Node named SALSA and appends them to targetActors.
///
/// \param targetActors output vector receiving every created NodeZyre; when it is
///        null the function returns nullptr immediately.
/// \return the root node pointer (declared below as node).
///
/// NOTE(review): this listing is an extract with gaps. The embedded original line
/// numbers skip, so closing braces, early returns and some declarations
/// (filter, opt, count, nodeId, port, p, i) are not visible here. Comments below
/// describe only what the visible lines show.
1 #include "ConfigZyre.hh" 17 std::shared_ptr<Salsa::Node>
ConfigZyre::apply(std::vector<std::shared_ptr<Salsa::ActorZmq>> * targetActors)
// Guard: nothing can be registered without an output vector.
23 if (!targetActors)
return nullptr;
25 std::shared_ptr<Salsa::Node> node =
nullptr;
// Validate that salsa.type exists and equals zyre; otherwise an error is logged.
// NOTE(review): what follows the log call (presumably an early return) is not visible.
27 if (!
mConfig[
"salsa"][
"type"] ||
mConfig[
"salsa"][
"type"].as<std::string>() !=
"zyre") {
28 SPD_ERROR(
"Salsa type is not [zyre] !!! ");
// Validate that salsa.spec exists; otherwise an error is logged.
32 if (!
mConfig[
"salsa"][
"spec"]) {
33 SPD_ERROR(
"Salsa spec was not found !!!");
// Resolve the host name once; it is reused for every zyre name and as the
// fallback address for empty jobinfo ip entries.
37 SPD_TRACE(
"Caching hostname via zsys_hostname()");
38 char * pHostnameCStr_ = zsys_hostname();
// Default used until the value from zsys_hostname() is assigned.
39 std::string hostname =
"nohostname";
// NOTE(review): the guard around this assignment and any release of the C string
// are on lines missing from this extract - confirm a null check and free exist.
41 hostname = pHostnameCStr_;
44 SPD_TRACE(
"Cached hostname [{}]", hostname);
// Root node that collects every NodeZyre created below.
46 node = std::make_shared<Salsa::Node>(
"SALSA");
// One iteration per entry of salsa.spec.
49 for (
auto spec :
mConfig[
"salsa"][
"spec"]) {
// NOTE(review): the condition guarding this error is not visible; judging by the
// message it presumably tests for a missing nodes array.
52 SPD_ERROR(
"Nodes array is missing for [{}] !!!", spec[
"name"].as<std::string>());
55 auto name = spec[
"name"].as<std::string>();
// When a filter entry matches this spec name, its replicas option overrides
// the replicas value of the spec.
// NOTE(review): the loop producing filter and opt is not visible here.
64 if (name ==
filter.first) {
66 if (opt[
"replicas"]) {
67 spec[
"replicas"] = opt[
"replicas"].as<
int>();
75 SPD_TRACE(
"name [{}]", name);
// Number of NodeZyre instances to create for this spec.
// NOTE(review): the declaration and default of count are not visible.
78 if (spec[
"replicas"]) {
79 count = spec[
"replicas"].as<
int>();
81 for (
int iCount = 0; iCount < count; iCount++) {
// Zyre name is built as name:hostname:pid:nodeId.
// NOTE(review): nodeId is declared and updated on lines not visible here.
83 std::string zyreName = fmt::format(
"{}:{}:{}:{}", name, hostname, getpid(), nodeId);
84 std::shared_ptr<Salsa::NodeZyre> pNodeZyre = std::make_shared<Salsa::NodeZyre>(zyreName);
// Job info broker url: only set when protocol, ip and port are all present.
// An empty ip falls back to the cached hostname.
86 if (spec[
"jobinfo"][
"broker"][
"protocol"] && spec[
"jobinfo"][
"broker"][
"ip"] &&
87 spec[
"jobinfo"][
"broker"][
"port"]) {
88 std::string url = spec[
"jobinfo"][
"broker"][
"ip"].as<std::string>();
89 if (url.empty()) url = hostname;
90 url = fmt::format(
">{}{}:{}", spec[
"jobinfo"][
"broker"][
"protocol"].as<std::string>(), url,
91 spec[
"jobinfo"][
"broker"][
"port"].as<std::string>());
92 SPD_TRACE(
"Setting jobInfoBrokerUrl [{}] from config", url);
93 pNodeZyre->jobInfoBrokerUrl(url);
// Job info client url: same rules as the broker url above.
95 if (spec[
"jobinfo"][
"client"][
"protocol"] && spec[
"jobinfo"][
"client"][
"ip"] &&
96 spec[
"jobinfo"][
"client"][
"port"]) {
97 std::string url = spec[
"jobinfo"][
"client"][
"ip"].as<std::string>();
98 if (url.empty()) url = hostname;
99 url = fmt::format(
">{}{}:{}", spec[
"jobinfo"][
"client"][
"protocol"].as<std::string>(), url,
100 spec[
"jobinfo"][
"client"][
"port"].as<std::string>());
101 SPD_INFO(
"Broker url for client : {}", url);
102 pNodeZyre->jobInfoClientUrl(url);
// Optional poller timeout, passed to the node as an int.
104 if (spec[
"timeout"][
"poller"]) {
105 pNodeZyre->timeout(spec[
"timeout"][
"poller"].as<int>());
106 SPD_INFO(
"Setting poller timeout [{}]", pNodeZyre->timeout());
// Optional jobfinished timeout is exported through the environment variable
// SALSA_FINISHED_JOB_TIMEOUT, overwriting any existing value.
109 if (spec[
"timeout"][
"jobfinished"]) {
110 SPD_INFO(
"Setting jobfinished timeout [{}]", spec[
"timeout"][
"jobfinished"].as<std::string>());
111 setenv(
"SALSA_FINISHED_JOB_TIMEOUT", spec[
"timeout"][
"jobfinished"].as<std::string>().data(),
true);
// Optional jobcheck timeout is exported through the environment variable
// SALSA_FINISHED_JOB_CHECK_TIMEOUT, overwriting any existing value.
113 if (spec[
"timeout"][
"jobcheck"]) {
114 SPD_INFO(
"Setting jobcheck timeout [{}]", spec[
"timeout"][
"jobcheck"].as<std::string>());
115 setenv(
"SALSA_FINISHED_JOB_CHECK_TIMEOUT", spec[
"timeout"][
"jobcheck"].as<std::string>().data(),
true);
// One iteration per entry of the nodes array of this spec.
117 for (
auto nodes : spec[
"nodes"]) {
118 SPD_TRACE(
" name [{}]", nodes[
"name"].as<std::string>());
119 SPD_TRACE(
" zyreName [{}]", zyreName);
120 SPD_TRACE(
" type [{}]", nodes[
"type"].as<std::string>());
// Optional submit socket: requires protocol, ip and port. The ip value $all
// is rewritten to the wildcard * before the url is formatted.
124 if (nodes[
"submit"][
"protocol"] && nodes[
"submit"][
"ip"] && nodes[
"submit"][
"port"]) {
125 if (nodes[
"submit"][
"ip"].as<std::string>() ==
"$all") nodes[
"submit"][
"ip"] =
"*";
127 fmt::format(
"{}://{}:{}", nodes[
"submit"][
"protocol"].as<std::string>(),
128 nodes[
"submit"][
"ip"].as<std::string>(), nodes[
"submit"][
"port"].as<int>());
130 SPD_INFO(
"Submit : url [{}]", url);
// Router socket created on the submit url and handed to the node.
131 zsock_t * s = zsock_new_router(url.c_str());
// NOTE(review): the failure check guarding this log call is not visible.
133 SPD_CRIT(
"Failed to bind submiter on '{}' !!!", url);
136 pNodeZyre->addSocket(static_cast<zsock_t *>(s));
// Publish the zyre name in the node info record.
139 Salsa::NodeInfo * pNodeInfo = pNodeZyre->nodeInfo();
140 pNodeInfo->set_name(zyreName);
// Headers passed to the zyre socket; the visible entry carries the node type
// under the key X-SALSA-NODE-TYPE.
142 std::map<std::string, std::string> headers;
144 std::pair<std::string, std::string>(
"X-SALSA-NODE-TYPE", nodes[
"type"].as<std::string>()));
147 std::shared_ptr<Salsa::SocketZyre> pSocketZyre =
148 std::make_shared<Salsa::SocketZyre>(zyreName, headers);
// Discovery setup, only when discovery.type is present.
152 if (nodes[
"discovery"][
"type"]) {
154 std::string url, endpoint;
// Endpoint is taken from the environment variable SALSA_ENDPOINT when set.
155 if (getenv(
"SALSA_ENDPOINT")) endpoint = getenv(
"SALSA_ENDPOINT");
157 std::string discoveryType = nodes[
"discovery"][
"type"].as<std::string>();
// udp discovery: only the port is configured on the socket.
// NOTE(review): the declaration and default of port are not visible.
159 if (discoveryType ==
"udp") {
161 if (nodes[
"discovery"][
"port"]) port = nodes[
"discovery"][
"port"].as<
int>();
162 SPD_TRACE(
"Using discovery [{}] via port [{}]...", discoveryType, port);
163 pSocketZyre->port(port);
// gossip discovery: url is built from protocol, ip and port.
// NOTE(review): the declarations and defaults of p and i are not visible.
165 else if (discoveryType ==
"gossip") {
168 if (nodes[
"discovery"][
"protocol"]) p = nodes[
"discovery"][
"protocol"].as<std::string>();
169 if (nodes[
"discovery"][
"ip"]) i = nodes[
"discovery"][
"ip"].as<std::string>();
170 if (nodes[
"discovery"][
"port"]) port = nodes[
"discovery"][
"port"].as<
int>();
172 url = fmt::format(
"{}://{}:{}", p, i, port);
174 SPD_INFO(
"Using discovery : [{}] port [{}] endpoint [{}]...", discoveryType, url, endpoint);
// A non-empty endpoint is applied to the zyre handle.
175 if (!endpoint.empty()) zyre_set_endpoint(pSocketZyre->zyre(),
"%s", endpoint.c_str());
// With discovery.bind set to true the node binds the gossip url (ip value $all
// becomes the wildcard *); the connect call below is presumably the else branch.
177 if (nodes[
"discovery"][
"bind"] && nodes[
"discovery"][
"bind"].as<bool>() ==
true) {
178 if (i ==
"$all") i =
"*";
179 url = fmt::format(
"{}://{}:{}", p, i, port);
180 zyre_gossip_bind(pSocketZyre->zyre(),
"%s", url.c_str());
183 zyre_gossip_connect(pSocketZyre->zyre(),
"%s", url.c_str());
// Optional evasive timeout from salsa.options.evasive, logged as msec.
186 if (
mConfig[
"salsa"][
"options"][
"evasive"]) {
187 SPD_INFO(
"Setting 'evasive' timeout to [{}] msec ...",
188 mConfig[
"salsa"][
"options"][
"evasive"].as<int>());
189 zyre_set_evasive_timeout(pSocketZyre->zyre(),
190 mConfig[
"salsa"][
"options"][
"evasive"].as<
int>());
// Optional expired timeout from salsa.options.expired, logged as msec.
192 if (
mConfig[
"salsa"][
"options"][
"expired"]) {
193 SPD_INFO(
"Setting 'expired' timeout to [{}] msec ...",
194 mConfig[
"salsa"][
"options"][
"expired"].as<int>());
195 zyre_set_expired_timeout(pSocketZyre->zyre(),
196 mConfig[
"salsa"][
"options"][
"expired"].as<
int>());
// NOTE(review): presumably the else branch of the discovery.type check above.
200 SPD_WARN(
"No discovery type specified !!!");
// A non-empty SALSA_INTERFACE environment variable selects the zyre interface.
203 const char * zyreInterface = getenv(
"SALSA_INTERFACE");
204 if (zyreInterface && strcmp(zyreInterface,
"")) {
205 SPD_INFO(
"Using SALSA_INTERFACE [{}]", zyreInterface);
206 zyre_set_interface(pSocketZyre->zyre(), zyreInterface);
// Connect the zyre socket and attach it to the node.
209 pSocketZyre->connect();
210 pNodeZyre->addSocket(pSocketZyre);
// NOTE(review): part of the format string is on a line missing from this extract;
// confirm it has one placeholder for each of the six arguments.
212 SPD_INFO(
"Node : type [{}] name [{}] discovery type [{}] url [{}] port [{}] " 214 name, zyreName, discoveryType, url, port, endpoint);
217 zyre_print(pSocketZyre->zyre());
// Register the finished node with the root node and with the caller vector.
220 node->add(pNodeZyre);
221 targetActors->push_back(pNodeZyre);
// NOTE(review): presumably the body of applyOptions(src, opt); the signature line
// is not part of this extract - confirm against the full file.
// Each block below copies one option from opt into src, but only when the
// option is present in opt AND the target key already exists in src.
// discovery.type override (copied as a string).
238 if (opt[
"type"] && src[
"discovery"][
"type"]) {
239 src[
"discovery"][
"type"] = opt[
"type"].as<std::string>();
// discovery.protocol override (copied as a string).
241 if (opt[
"protocol"] && src[
"discovery"][
"protocol"]) {
242 src[
"discovery"][
"protocol"] = opt[
"protocol"].as<std::string>();
// discovery.ip override (copied as a string).
244 if (opt[
"ip"] && src[
"discovery"][
"ip"]) {
245 src[
"discovery"][
"ip"] = opt[
"ip"].as<std::string>();
// discovery.port override (copied as an int).
247 if (opt[
"port"] && src[
"discovery"][
"port"]) {
248 src[
"discovery"][
"port"] = opt[
"port"].as<
int>();
// submit.port override, taken from the submitport option (copied as an int).
250 if (opt[
"submitport"] && src[
"submit"][
"port"]) {
251 src[
"submit"][
"port"] = opt[
"submitport"].as<
int>();
std::map< std::string, YAML::Node > mFilter
Filter list.
void applyOptions(YAML::detail::iterator_value &src, YAML::Node &opt)
static std::shared_ptr< spdlog::logger > getConsoleOutput()
Get console output.
void filter(std::string const &f)
YAML::Node mConfig
YAML Configuration.
std::shared_ptr< Salsa::Node > apply(std::vector< std::shared_ptr< Salsa::ActorZmq >> *targetActors)