1 #include "ConfigZyre.hh"
17 std::shared_ptr<Salsa::Node>
ConfigZyre::apply(std::vector<std::shared_ptr<Salsa::ActorZmq>> & targetActors)
23 std::shared_ptr<Salsa::Node> node =
nullptr;
25 if (!
mConfig[
"salsa"][
"type"] ||
mConfig[
"salsa"][
"type"].as<std::string>() !=
"zyre") {
26 SPD_ERROR(
"Salsa type is not [zyre] !!! ");
30 if (!
mConfig[
"salsa"][
"spec"]) {
31 SPD_ERROR(
"Salsa spec was not found !!!");
35 SPD_TRACE(
"Caching hostname via zsys_hostname()");
36 char * pHostnameCStr_ = zsys_hostname();
37 std::string hostname =
"nohostname";
39 hostname = pHostnameCStr_;
42 SPD_TRACE(
"Cached hostname [{}]", hostname);
44 node = std::make_shared<Salsa::Node>(
"SALSA");
47 for (
auto spec :
mConfig[
"salsa"][
"spec"]) {
50 SPD_ERROR(
"Nodes array is missing for [{}] !!!", spec[
"name"].as<std::string>());
53 auto name = spec[
"name"].as<std::string>();
62 if (name ==
filter.first) {
64 if (opt[
"replicas"]) {
65 spec[
"replicas"] = opt[
"replicas"].as<
int>();
73 SPD_TRACE(
"name [{}]", name);
76 if (spec[
"replicas"]) {
77 count = spec[
"replicas"].as<
int>();
79 for (
int iCount = 0; iCount < count; iCount++) {
81 std::string zyreName = fmt::format(
"{}:{}:{}:{}", name, hostname, getpid(), nodeId);
82 std::shared_ptr<Salsa::NodeZyre> pNodeZyre = std::make_shared<Salsa::NodeZyre>(zyreName);
84 for (
auto nodes : spec[
"nodes"]) {
85 SPD_TRACE(
" name [{}]", nodes[
"name"].as<std::string>());
86 SPD_TRACE(
" zyreName [{}]", zyreName);
87 SPD_TRACE(
" type [{}]", nodes[
"type"].as<std::string>());
89 if (nodes[
"submit"][
"protocol"] && nodes[
"submit"][
"ip"] && nodes[
"submit"][
"port"]) {
90 if (nodes[
"submit"][
"ip"].as<std::string>() ==
"$all") nodes[
"submit"][
"ip"] =
"*";
92 fmt::format(
"{}://{}:{}", nodes[
"submit"][
"protocol"].as<std::string>(),
93 nodes[
"submit"][
"ip"].as<std::string>(), nodes[
"submit"][
"port"].as<int>());
95 SPD_INFO(
"Submit : url [{}]", url);
96 pNodeZyre->addSocket(static_cast<zsock_t *>(zsock_new_router(url.c_str())));
99 Salsa::NodeInfo * pNodeInfo = pNodeZyre->nodeInfo();
100 pNodeInfo->set_name(zyreName);
102 std::map<std::string, std::string> headers;
104 std::pair<std::string, std::string>(
"X-SALSA-NODE-TYPE", nodes[
"type"].as<std::string>()));
107 std::shared_ptr<Salsa::SocketZyre> pSocketZyre =
108 std::make_shared<Salsa::SocketZyre>(zyreName, headers);
112 if (nodes[
"discovery"][
"type"]) {
114 std::string url, endpoint;
115 if (getenv(
"SALSA_ENDPOINT")) endpoint = getenv(
"SALSA_ENDPOINT");
117 std::string discoveryType = nodes[
"discovery"][
"type"].as<std::string>();
119 if (discoveryType ==
"udp") {
121 if (nodes[
"discovery"][
"port"]) port = nodes[
"discovery"][
"port"].as<
int>();
122 SPD_TRACE(
"Using discovery [{}] via port [{}]...", discoveryType, port);
123 pSocketZyre->port(port);
125 else if (discoveryType ==
"gossip") {
128 if (nodes[
"discovery"][
"protocol"]) p = nodes[
"discovery"][
"protocol"].as<std::string>();
129 if (nodes[
"discovery"][
"ip"]) i = nodes[
"discovery"][
"ip"].as<std::string>();
130 if (nodes[
"discovery"][
"port"]) port = nodes[
"discovery"][
"port"].as<
int>();
132 url = fmt::format(
"{}://{}:{}", p, i, port);
134 SPD_INFO(
"Using dicovery : [{}] port [{}] endpoint [{}]...", discoveryType, url, endpoint);
135 if (!endpoint.empty()) zyre_set_endpoint(pSocketZyre->zyre(),
"%s", endpoint.c_str());
137 if (nodes[
"discovery"][
"bind"] && nodes[
"discovery"][
"bind"].as<bool>() ==
true) {
138 if (i ==
"$all") i =
"*";
139 url = fmt::format(
"{}://{}:{}", p, i, port);
140 zyre_gossip_bind(pSocketZyre->zyre(),
"%s", url.c_str());
143 zyre_gossip_connect(pSocketZyre->zyre(),
"%s", url.c_str());
146 if (mConfig[
"salsa"][
"options"][
"evasive"]) {
147 SPD_INFO(
"Setting 'evasive' timeout to [{}] msec ...",
148 mConfig[
"salsa"][
"options"][
"evasive"].as<int>());
149 zyre_set_evasive_timeout(pSocketZyre->zyre(),
150 mConfig[
"salsa"][
"options"][
"evasive"].as<
int>());
152 if (mConfig[
"salsa"][
"options"][
"expired"]) {
153 SPD_INFO(
"Setting 'expired' timeout to [{}] msec ...",
154 mConfig[
"salsa"][
"options"][
"expired"].as<int>());
155 zyre_set_expired_timeout(pSocketZyre->zyre(),
156 mConfig[
"salsa"][
"options"][
"expired"].as<
int>());
160 SPD_WARN(
"No discovery type specified !!!");
163 const char * zyreInterface = getenv(
"SALSA_INTERFACE");
164 if (zyreInterface && strcmp(zyreInterface,
"")) {
165 SPD_INFO(
"Using SALSA_INTERFACE [{}]", zyreInterface);
166 zyre_set_interface(pSocketZyre->zyre(), zyreInterface);
169 pSocketZyre->connect();
170 pNodeZyre->addSocket(pSocketZyre);
172 SPD_INFO(
"Node : type [{}] name [{}] discovery type [{}] url [{}] port [{}] "
174 name, zyreName, discoveryType, url, port, endpoint);
177 zyre_print(pSocketZyre->zyre());
180 node->add(pNodeZyre);
181 targetActors.push_back(pNodeZyre);
198 if (opt[
"type"] && src[
"discovery"][
"type"]) {
199 src[
"discovery"][
"type"] = opt[
"type"].as<std::string>();
201 if (opt[
"protocol"] && src[
"discovery"][
"protocol"]) {
202 src[
"discovery"][
"protocol"] = opt[
"protocol"].as<std::string>();
204 if (opt[
"ip"] && src[
"discovery"][
"ip"]) {
205 src[
"discovery"][
"ip"] = opt[
"ip"].as<std::string>();
207 if (opt[
"port"] && src[
"discovery"][
"port"]) {
208 src[
"discovery"][
"port"] = opt[
"port"].as<
int>();
std::map< std::string, YAML::Node > mFilter
Filter list.
void applyOptions(YAML::detail::iterator_value &src, YAML::Node &opt)
static std::shared_ptr< spdlog::logger > getConsoleOutput()
Get console output.
std::shared_ptr< Salsa::Node > apply(std::vector< std::shared_ptr< Salsa::ActorZmq >> &targetActors)
void filter(std::string const &f)
YAML::Node mConfig
YAML Configuration.