salsa  0.4.0
ConfigZyre.cc
1 #include "ConfigZyre.hh"
2 #include <NodeZyre.hh>
3 namespace Salsa {
5 {
9 }
11 {
15 }
16 
17 std::shared_ptr<Salsa::Node> ConfigZyre::apply(std::vector<std::shared_ptr<Salsa::ActorZmq>> * targetActors)
18 {
22 
23  if (!targetActors) return nullptr;
24 
25  std::shared_ptr<Salsa::Node> node = nullptr;
26 
27  if (!mConfig["salsa"]["type"] || mConfig["salsa"]["type"].as<std::string>() != "zyre") {
28  SPD_ERROR("Salsa type is not [zyre] !!! ");
29  return nullptr;
30  }
31 
32  if (!mConfig["salsa"]["spec"]) {
33  SPD_ERROR("Salsa spec was not found !!!");
34  return nullptr;
35  }
36 
37  SPD_TRACE("Caching hostname via zsys_hostname()");
38  char * pHostnameCStr_ = zsys_hostname();
39  std::string hostname = "nohostname";
40  if (pHostnameCStr_) {
41  hostname = pHostnameCStr_;
42  free(pHostnameCStr_);
43  }
44  SPD_TRACE("Cached hostname [{}]", hostname);
45 
46  node = std::make_shared<Salsa::Node>("SALSA");
47  int nodeId = 0;
48 
49  for (auto spec : mConfig["salsa"]["spec"]) {
50 
51  if (!spec["nodes"]) {
52  SPD_ERROR("Nodes array is missing for [{}] !!!", spec["name"].as<std::string>());
53  return nullptr;
54  }
55  auto name = spec["name"].as<std::string>();
56  bool found = false;
57  YAML::Node opt;
58 
59  if (mFilter.size() == 0) {
60  found = true;
61  }
62  else {
63  for (auto filter : mFilter) {
64  if (name == filter.first) {
65  opt = filter.second;
66  if (opt["replicas"]) {
67  spec["replicas"] = opt["replicas"].as<int>();
68  }
69  found = true;
70  }
71  }
72  }
73 
74  if (!found) continue;
75  SPD_TRACE("name [{}]", name);
76  int count = 1;
77 
78  if (spec["replicas"]) {
79  count = spec["replicas"].as<int>();
80  }
81  for (int iCount = 0; iCount < count; iCount++) {
82  // Create zyre node
83  std::string zyreName = fmt::format("{}:{}:{}:{}", name, hostname, getpid(), nodeId);
84  std::shared_ptr<Salsa::NodeZyre> pNodeZyre = std::make_shared<Salsa::NodeZyre>(zyreName);
85 
86  if (spec["jobinfo"]["broker"]["protocol"] && spec["jobinfo"]["broker"]["ip"] &&
87  spec["jobinfo"]["broker"]["port"]) {
88  std::string url = spec["jobinfo"]["broker"]["ip"].as<std::string>();
89  if (url.empty()) url = hostname;
90  url = fmt::format(">{}{}:{}", spec["jobinfo"]["broker"]["protocol"].as<std::string>(), url,
91  spec["jobinfo"]["broker"]["port"].as<std::string>());
92  SPD_TRACE("Setting jobInfoBrokerUrl [{}] from config", url);
93  pNodeZyre->jobInfoBrokerUrl(url);
94  }
95  if (spec["jobinfo"]["client"]["protocol"] && spec["jobinfo"]["client"]["ip"] &&
96  spec["jobinfo"]["client"]["port"]) {
97  std::string url = spec["jobinfo"]["client"]["ip"].as<std::string>();
98  if (url.empty()) url = hostname;
99  url = fmt::format(">{}{}:{}", spec["jobinfo"]["client"]["protocol"].as<std::string>(), url,
100  spec["jobinfo"]["client"]["port"].as<std::string>());
101  SPD_INFO("Broker url for client : {}", url);
102  pNodeZyre->jobInfoClientUrl(url);
103  }
104  if (spec["timeout"]["poller"]) {
105  pNodeZyre->timeout(spec["timeout"]["poller"].as<int>());
106  SPD_INFO("Setting poller timeout [{}]", pNodeZyre->timeout());
107  }
108 
109  if (spec["timeout"]["jobfinished"]) {
110  SPD_INFO("Setting jobfinished timeout [{}]", spec["timeout"]["jobfinished"].as<std::string>());
111  setenv("SALSA_FINISHED_JOB_TIMEOUT", spec["timeout"]["jobfinished"].as<std::string>().data(), true);
112  }
113  if (spec["timeout"]["jobcheck"]) {
114  SPD_INFO("Setting jobcheck timeout [{}]", spec["timeout"]["jobcheck"].as<std::string>());
115  setenv("SALSA_FINISHED_JOB_CHECK_TIMEOUT", spec["timeout"]["jobcheck"].as<std::string>().data(), true);
116  }
117  for (auto nodes : spec["nodes"]) {
118  SPD_TRACE(" name [{}]", nodes["name"].as<std::string>());
119  SPD_TRACE(" zyreName [{}]", zyreName);
120  SPD_TRACE(" type [{}]", nodes["type"].as<std::string>());
121 
122  applyOptions(nodes, opt);
123 
124  if (nodes["submit"]["protocol"] && nodes["submit"]["ip"] && nodes["submit"]["port"]) {
125  if (nodes["submit"]["ip"].as<std::string>() == "$all") nodes["submit"]["ip"] = "*";
126  std::string url =
127  fmt::format("{}://{}:{}", nodes["submit"]["protocol"].as<std::string>(),
128  nodes["submit"]["ip"].as<std::string>(), nodes["submit"]["port"].as<int>());
129 
130  SPD_INFO("Submit : url [{}]", url);
131  zsock_t * s = zsock_new_router(url.c_str());
132  if (s == nullptr) {
133  SPD_CRIT("Failed to bind submiter on '{}' !!!", url);
134  return nullptr;
135  }
136  pNodeZyre->addSocket(static_cast<zsock_t *>(s));
137  }
138  else {
139  Salsa::NodeInfo * pNodeInfo = pNodeZyre->nodeInfo();
140  pNodeInfo->set_name(zyreName);
141 
142  std::map<std::string, std::string> headers;
143  headers.insert(
144  std::pair<std::string, std::string>("X-SALSA-NODE-TYPE", nodes["type"].as<std::string>()));
145 
146  // Create zyre socket for node
147  std::shared_ptr<Salsa::SocketZyre> pSocketZyre =
148  std::make_shared<Salsa::SocketZyre>(zyreName, headers);
149 
150  applyOptions(nodes, opt);
151 
152  if (nodes["discovery"]["type"]) {
153  int port;
154  std::string url, endpoint;
155  if (getenv("SALSA_ENDPOINT")) endpoint = getenv("SALSA_ENDPOINT");
156 
157  std::string discoveryType = nodes["discovery"]["type"].as<std::string>();
158 
159  if (discoveryType == "udp") {
160  port = 10000;
161  if (nodes["discovery"]["port"]) port = nodes["discovery"]["port"].as<int>();
162  SPD_TRACE("Using discovery [{}] via port [{}]...", discoveryType, port);
163  pSocketZyre->port(port); // Set socket's port
164  }
165  else if (discoveryType == "gossip") {
166  std::string p, i;
167  port = 20000;
168  if (nodes["discovery"]["protocol"]) p = nodes["discovery"]["protocol"].as<std::string>();
169  if (nodes["discovery"]["ip"]) i = nodes["discovery"]["ip"].as<std::string>();
170  if (nodes["discovery"]["port"]) port = nodes["discovery"]["port"].as<int>();
171 
172  url = fmt::format("{}://{}:{}", p, i, port);
173 
174  SPD_INFO("Using discovery : [{}] port [{}] endpoint [{}]...", discoveryType, url, endpoint);
175  if (!endpoint.empty()) zyre_set_endpoint(pSocketZyre->zyre(), "%s", endpoint.c_str());
176 
177  if (nodes["discovery"]["bind"] && nodes["discovery"]["bind"].as<bool>() == true) {
178  if (i == "$all") i = "*";
179  url = fmt::format("{}://{}:{}", p, i, port);
180  zyre_gossip_bind(pSocketZyre->zyre(), "%s", url.c_str());
181  }
182  else {
183  zyre_gossip_connect(pSocketZyre->zyre(), "%s", url.c_str());
184  }
185 
186  if (mConfig["salsa"]["options"]["evasive"]) {
187  SPD_INFO("Setting 'evasive' timeout to [{}] msec ...",
188  mConfig["salsa"]["options"]["evasive"].as<int>());
189  zyre_set_evasive_timeout(pSocketZyre->zyre(),
190  mConfig["salsa"]["options"]["evasive"].as<int>());
191  }
192  if (mConfig["salsa"]["options"]["expired"]) {
193  SPD_INFO("Setting 'expired' timeout to [{}] msec ...",
194  mConfig["salsa"]["options"]["expired"].as<int>());
195  zyre_set_expired_timeout(pSocketZyre->zyre(),
196  mConfig["salsa"]["options"]["expired"].as<int>());
197  }
198  }
199  else {
200  SPD_WARN("No discovery type specified !!!");
201  }
202 
203  const char * zyreInterface = getenv("SALSA_INTERFACE");
204  if (zyreInterface && strcmp(zyreInterface, "")) {
205  SPD_INFO("Using SALSA_INTERFACE [{}]", zyreInterface);
206  zyre_set_interface(pSocketZyre->zyre(), zyreInterface);
207  }
208 
209  pSocketZyre->connect(); // Connect to socket
210  pNodeZyre->addSocket(pSocketZyre); // Add socket to zyre node
211 
212  SPD_INFO("Node : type [{}] name [{}] discovery type [{}] url [{}] port [{}] "
213  "endpoint [{}]",
214  name, zyreName, discoveryType, url, port, endpoint);
215 
216  if (Salsa::Object::getConsoleOutput()->level() < static_cast<int>(spdlog::level::warn)) {
217  zyre_print(pSocketZyre->zyre());
218  }
219  }
220  node->add(pNodeZyre); // Add zyre node to main node
221  targetActors->push_back(pNodeZyre); // Add zyre node to actor index
222  nodeId++;
223  }
224  }
225  }
226  SPD_TRACE("---");
227  nodeId++;
228  }
229 
230  return node;
231 }
232 
233 void ConfigZyre::applyOptions(YAML::detail::iterator_value & src, YAML::Node & opt)
234 {
238  if (opt["type"] && src["discovery"]["type"]) {
239  src["discovery"]["type"] = opt["type"].as<std::string>();
240  }
241  if (opt["protocol"] && src["discovery"]["protocol"]) {
242  src["discovery"]["protocol"] = opt["protocol"].as<std::string>();
243  }
244  if (opt["ip"] && src["discovery"]["ip"]) {
245  src["discovery"]["ip"] = opt["ip"].as<std::string>();
246  }
247  if (opt["port"] && src["discovery"]["port"]) {
248  src["discovery"]["port"] = opt["port"].as<int>();
249  }
250  if (opt["submitport"] && src["submit"]["port"]) {
251  src["submit"]["port"] = opt["submitport"].as<int>();
252  }
253 }
254 
255 } // namespace Salsa
std::map< std::string, YAML::Node > mFilter
Filter list.
Definition: Config.hh:31
void applyOptions(YAML::detail::iterator_value &src, YAML::Node &opt)
Definition: ConfigZyre.cc:233
virtual ~ConfigZyre()
Definition: ConfigZyre.cc:10
static std::shared_ptr< spdlog::logger > getConsoleOutput()
Get console output.
Definition: Object.hh:21
void filter(std::string const &f)
Definition: Config.cc:27
YAML::Node mConfig
YAML Configuration.
Definition: Config.hh:30
Base Config class.
Definition: Config.hh:17
std::shared_ptr< Salsa::Node > apply(std::vector< std::shared_ptr< Salsa::ActorZmq >> *targetActors)
Definition: ConfigZyre.cc:17