salsa  0.4.0
NodeZyre.cc
1 #include <algorithm>
2 #include <cstdlib>
3 #include <vector>
4 
5 #include "Job.hh"
6 #include "NodeManagerZyre.hh"
7 #include "PublisherZmq.hh"
8 #include "NodeZyre.hh"
9 using namespace fmt::literals;
10 
11 namespace Salsa {
// NodeZyre constructor: forwards `name` to the Node base and default-constructs
// the ActorZmq base. Source lines 14-16 (the body) are not shown in this listing.
12 NodeZyre::NodeZyre(std::string name) : Node(name), ActorZmq()
13 {
17 }
18 
// ~NodeZyre(): destructor. Its signature line (source line 19, `virtual ~NodeZyre()`
// per the definition index) and lines 21-23 are not shown in this listing.
// Teardown order: drop this node's shared_ptr handles to the Zyre sockets,
// zsock_destroy() every raw ZMQ socket stored by addSocket(zsock_t *), then
// delete the node manager.
20 {
24 
25  SPD_TRACE("### Destroy NodeZyre [{}] ###", mpNodeInfo->name());
26 
27  // clearing socket
28  mSockets.clear();
29 
// NOTE(review): zsock_destroy() nulls only the loop-local copy `pSocket`, so
// mZmqSockets keeps the now-dangling pointers. This is harmless here only
// because nothing reads the vector after destruction.
30  for (auto pSocket : mZmqSockets) {
31  zsock_destroy(&pSocket);
32  }
33 
34  if (mpNodeManager) {
35  delete mpNodeManager;
36  mpNodeManager = nullptr;
37  }
38 }
39 
// init(): the "first" lifecycle function (per the definition index). Its signature
// line (source line 40, `virtual int init()`) and lines 42-44, 48-49 and 73 are
// not shown in this listing.
// Returns 1 when mpPoller is null. Otherwise it registers every Zyre socket with
// the poller and the node manager, adds every raw ZMQ socket to the poller, and
// returns 0.
41 {
45 
46  SPD_TRACE("Salsa::NodeZyre::init()<-");
47 
50 
// NOTE(review): mpPoller is presumably set up by ActorZmq::init() on the hidden
// lines 48-49 — confirm against the full source.
51  if (!mpPoller) {
52  return 1;
53  }
54 
55  if (mpNodeManager == nullptr) {
56  mpNodeManager = new NodeManagerZyre(this);
57  }
58 
// Each Zyre socket is polled and registered with the node manager under the
// socket's own Zyre UUID, according to its "X-SALSA-NODE-TYPE" header.
59  for (auto socket : mSockets) {
60  mpPoller->add(socket.get());
61 
62  if (socket->header("X-SALSA-NODE-TYPE") == "CONSUMER")
63  mpNodeManager->addConsumer(zyre_uuid(socket->zyre()), socket);
64 
65  if (socket->header("X-SALSA-NODE-TYPE") == "FEEDER") {
66  mpNodeManager->addFeeder(zyre_uuid(socket->zyre()), socket);
// While the node manager has no publisher, SALSA_PUB_URL (if set) overrides
// mJobInfoBrokerUrl. NOTE(review): hidden line 73 presumably installs the
// publisher using mJobInfoBrokerUrl — confirm.
67  if (!mpNodeManager->publisher()) {
68  char * pPubUrl = getenv("SALSA_PUB_URL");
69  if (pPubUrl) {
70  mJobInfoBrokerUrl = pPubUrl;
71  }
72  SPD_INFO("JobInfo broker url [{}]", mJobInfoBrokerUrl);
74  }
// Optional timeouts from the environment, re-read for every FEEDER socket.
// NOTE(review): atol()/atoi() return 0 for non-numeric text, which silently
// sets a zero timeout; strtol() with error checking would be safer.
75  char * pTimeout = getenv("SALSA_FINISHED_JOB_TIMEOUT");
76  if (pTimeout) {
77  mpNodeManager->finishedJobTimeout(atol(pTimeout));
78  }
79  char * pCheckTimeout = getenv("SALSA_FINISHED_JOB_CHECK_TIMEOUT");
80  if (pCheckTimeout) {
81  mJobCheckTimeout = atoi(pCheckTimeout);
82  }
83  }
84 
85  if (socket->header("X-SALSA-NODE-TYPE") == "WORKER")
86  mpNodeManager->addWorker(zyre_uuid(socket->zyre()), socket);
87  }
88 
// Raw ZMQ sockets are only added to the poller here; their messages are handled
// by handleExternalZmq() from exec().
89  for (auto socket : mZmqSockets) {
90  mpPoller->add(socket);
91  }
92 
93  // mpNodeManager->print();
94  SPD_TRACE("Salsa::NodeZyre::init()->");
95  return 0;
96 }
97 
// exec(): main event loop. Its signature line (source line 98, `virtual int exec()`
// per the definition index) and lines 100-102 are not shown in this listing.
// The loop waits on the poller until mTerminated is set, Salsa is interrupted, or
// wait() returns null. Each ready socket is dispatched as follows:
//   - the actor pipe: ignored;
//   - a raw ZMQ socket: one zmsg is received and passed to handleExternalZmq();
//   - a Zyre socket: one Message is pulled and routed to the node manager
//     (ENTER/EXIT/WHISPER), followed by an optional publish and a periodic
//     finished-job cleanup;
//   - anything else: offered to mpNodeManager->handleTaskPool(), else logged.
// Always returns 0.
99 {
103 
104  SPD_TRACE("Salsa::NodeZyre::exec()<-");
105 
106  void * pPointer = nullptr;
107  std::shared_ptr<SocketZyre> pSocket = nullptr;
108  zsock_t * pZmqSock = nullptr;
109  int64_t cur_time;
110  int64_t last_time = zclock_time();
111 
112  while (!mTerminated && !Salsa::Actor::interrupted()) {
113  SPD_TRACE("Actor::wait()");
114  pPointer = ActorZmq::wait();
115  if (!pPointer) {
116  break;
117  }
118 
119  if (pPointer == mpPipe) {
120  SPD_TRACE("Signal from pipe={}", static_cast<void *>(mpPipe));
121  // We are not reacting to pipe events for now
122  continue;
123  }
124  else {
125  for (auto pZmqSocket : mZmqSockets) {
126  SPD_TRACE("Searching ZMQ inSocket=[{}] zmqSocket[{}]", static_cast<void *>(pPointer),
127  static_cast<void *>(pZmqSocket));
128  if (pZmqSocket == pPointer) {
129  pZmqSock = pZmqSocket;
130  break;
131  }
132  }
133 
// NOTE(review): zmsg_recv() returns NULL when interrupted. pMsg is passed to
// handleExternalZmq() unchecked, and that function calls zmsg_pop() on it.
134  if (pZmqSock) {
135  SPD_TRACE("HANDLING zmq socket [{}]", static_cast<void *>(pZmqSock));
136  zmsg_t * pMsg = zmsg_recv(pZmqSock);
137  handleExternalZmq(pMsg, pZmqSock);
138  zmsg_destroy(&pMsg);
139  pZmqSock = nullptr;
140  continue;
141  }
142 
143  for (auto pNet : mSockets) {
144  SPD_TRACE("Searching ZYRE socket={} in net={} socket={}", static_cast<void *>(pPointer),
145  static_cast<void *>(pNet.get()), static_cast<void *>(pNet->socket()));
146  if (pNet && pNet->socket() && pPointer == pNet->socket()) {
147  pSocket = pNet;
148  break;
149  }
150  }
151 
// Not a Zyre socket: let the node manager claim it (task pool) before
// treating it as unknown.
152  if (!pSocket) {
153  if (mpNodeManager->handleTaskPool(pPointer)) continue;
154  }
155 
156  if (!pSocket) {
157  SPD_ERROR("Socket comming from unknown network : socket={}", pPointer);
158  continue;
159  }
160 
161  // ==================================================
162 
163  Message * pMsg = pSocket->pull();
164  if (!pMsg) {
165  SPD_ERROR("Message from socket={} is null", pPointer);
166  continue;
167  }
168 
169  Message::EEventType type = pMsg->event();
170  SPD_TRACE("Salsa::NodeZyre::exec() : Event from net [{}] pMsg [{}] type [{}]",
171  static_cast<void *>(pSocket.get()), static_cast<void *>(pMsg), type);
172 
// ENTER, EXIT and EVASIVE suppress the publish below. WHISPER and any other
// event type leave doPublish == true.
173  bool doPublish = true;
174  std::vector<std::string> values;
175 
176  if (type == Message::ENTER) {
177  const char * pHeader =
178  zyre_event_header(static_cast<MessageZyre *>(pMsg)->zyreEvent(), "X-SALSA-NODE-TYPE");
179  std::string snt;
180  if (pHeader) snt = pHeader;
181  SPD_TRACE("[{}] ENTER uuid=[{}] node_type=[{}]", zyre_name(pSocket->zyre()), pMsg->uuid(), snt);
182  mpNodeManager->onEnter(zyre_uuid(pSocket->zyre()), snt, pMsg, values);
183  doPublish = false;
184  }
185  else if (type == Message::EXIT) {
186  SPD_TRACE("[{}] EXIT uuid=[{}]", zyre_name(pSocket->zyre()), pMsg->uuid());
187  mpNodeManager->onExit(zyre_uuid(pSocket->zyre()), pMsg, values);
188  doPublish = false;
189  }
190  else if (type == Message::EVASIVE) {
191  SPD_TRACE("[{}] EVASIVE uuid=[{}]", zyre_name(pSocket->zyre()), pMsg->uuid());
192  doPublish = false;
193  }
194  else if (type == Message::WHISPER) {
195  SPD_TRACE("[{}] WHISPER uuid=[{}]", zyre_name(pSocket->zyre()), pMsg->uuid());
196  mpNodeManager->onWhisper(zyre_uuid(pSocket->zyre()), pMsg, values);
197  }
198 
// NOTE(review): sockets() returns mSockets by value, so each sockets()[0]
// copies the vector. It also assumes at least one Zyre socket is registered.
199  if (doPublish) {
200  SPD_TRACE("Publishing to [{}] ...", zyre_uuid(sockets()[0]->zyre()));
201  mpNodeManager->publish(zyre_uuid(sockets()[0]->zyre()));
202  }
203 
// Periodic finished-job cleanup, timed with zclock_time() (milliseconds, per CZMQ)
// against mJobCheckTimeout. NOTE(review): this point is reached only after a
// Zyre message. Pipe, raw-ZMQ and task-pool events `continue` earlier, so on a
// quiet Zyre network finished jobs are never cleaned up.
204  cur_time = zclock_time();
205  if ((cur_time - last_time) > mJobCheckTimeout) {
206  SPD_TRACE("Poller expired. Doing finished job cleaning ...");
207  if (mpNodeManager->terminateFinishedJobs()) mpNodeManager->publish(zyre_uuid(sockets()[0]->zyre()));
208  last_time = zclock_time();
209  }
210 
211  delete pMsg;
212  pSocket = nullptr;
213  pZmqSock = nullptr;
214  }
215 
216  } // END WHILE not terminated
217 
218  SPD_TRACE("Salsa::NodeZyre::exec()->");
219  return 0;
220 }
221 
// finish(): the "last" lifecycle function (per the definition index). Its signature
// line (source line 222, `virtual int finish()`) and lines 224-226 are not shown
// in this listing. It only emits entry/exit trace messages and returns 0.
223 {
227 
228  SPD_TRACE("Salsa::NodeZyre::finish()<-");
229 
230  SPD_TRACE("Salsa::NodeZyre::finish()->");
231 
232  return 0;
233 }
234 
// addSocket(std::shared_ptr<SocketZyre>): registers a Zyre socket with this node.
// For a non-null socket it:
//   - creates a child Node named after the socket's Zyre name and UUID;
//   - parents that child to this node and adds it via Node::add();
//   - stores the socket in mSockets, where init() later polls it and types it
//     by its "X-SALSA-NODE-TYPE" header.
// A null socket is silently ignored.
// Source lines 237-239 are not shown in this listing.
// NOTE(review): shared_from_this() requires this NodeZyre to already be owned by
// a std::shared_ptr; otherwise it throws std::bad_weak_ptr (C++17). Confirm callers.
235 void NodeZyre::addSocket(std::shared_ptr<SocketZyre> socket)
236 {
240 
241  if (socket) {
242  auto pNode = std::make_shared<Node>(zyre_name(socket->zyre()), zyre_uuid(socket->zyre()));
243 
244  pNode->parent(shared_from_this());
245  Node::add(pNode);
246  mSockets.push_back(socket);
247  }
248 }
249 
// sockets(): returns a copy of the registered Zyre sockets (mSockets).
// NOTE(review): returning by value copies the vector and bumps every shared_ptr
// refcount on each call; callers in this file only read element [0].
// Source lines 252-254 are not shown in this listing.
250 std::vector<std::shared_ptr<SocketZyre>> NodeZyre::sockets() const
251 {
255  return mSockets;
256 }
257 
// addSocket(zsock_t *): stores a raw ZMQ socket in mZmqSockets; a null socket is
// ignored. Stored sockets are:
//   - added to the poller in init();
//   - serviced by handleExternalZmq() from exec();
//   - zsock_destroy()ed in the destructor, so ownership passes to this node.
// NOTE(review): there is no duplicate check. Adding the same socket twice would
// make the destructor destroy it twice.
// Source lines 260-262 are not shown in this listing.
258 void NodeZyre::addSocket(zsock_t * socket)
259 {
263  if (socket) {
264  mZmqSockets.push_back(socket);
265  }
266 }
267 
// handleExternalZmq(): handles one request received on a raw ZMQ socket.
// Frames read here: [identity frame][command string][command arguments...].
// Each recognised command replies on pSocket with [identity]["" ][...]:
//   TASK        args: one serialized TaskInfo protobuf per frame. Each task is
//               parsed and queued via addTask(..., Salsa::Job::pending). The job
//               of the last parsed task records the submitter identity (if not
//               already set) and the index of pSocket in mZmqSockets.
//               Reply: "TASK_ADDED", <task count>.
//   AUTH        Reply: "AUTH", "OK", the first Zyre socket's UUID, the version
//               "v<major>.<minor>.<patch>-<release>", mJobInfoClientUrl.
//               Hidden line 355 is not accounted for.
//   JOB_DEL_ID  args: job UUID. Calls terminateJob(), publishes, then replies
//               <cmd>, "OK".
//   JOB_DEL_FINISHED / JOB_DEL_ALL
//               Publish, then reply <cmd>, "OK".
// Unknown commands get no reply. pMsg stays owned by the caller (exec() destroys it).
// Source lines 270-273, 304, 355, 387 and 400 are not shown in this listing.
268 void NodeZyre::handleExternalZmq(zmsg_t * pMsg, zsock_t * pSocket)
269 {
274 
275  zframe_t * pID = zmsg_pop(pMsg);
276  char * pCmd = zmsg_popstr(pMsg);
// NOTE(review): pCmd is NULL when the message has no command frame;
// strcmp(NULL, ...) is undefined behavior.
277  if (!strcmp(pCmd, "TASK")) {
278 
279  int task_count = 0;
280  char * pPayload_str = zmsg_popstr(pMsg);
281  TaskInfo * pTaskInfo = nullptr;
282  while (pPayload_str) {
283  std::string payload = pPayload_str;
284  free(pPayload_str);
285 
286  pTaskInfo = new TaskInfo();
287  {
288 
// NOTE(review): this early return leaks pTaskInfo, pCmd and pID and sends no
// reply. Tasks queued from earlier frames of the same message stay queued.
289  if (!pTaskInfo->ParseFromString(payload)) {
290  SPD_ERROR("Message does not contain ProtoBuf message!");
291  return;
292  }
293  }
294  SPD_DEBUG("TASK [{}:{}] ", pTaskInfo->jobid(), pTaskInfo->taskid());
295 
// Ownership of pTaskInfo presumably passes to addTask() (see the
// commented-out delete below) — confirm.
296  mpNodeManager->addTask(pTaskInfo, "", "", Salsa::Job::pending);
297  task_count++;
298  pPayload_str = zmsg_popstr(pMsg);
299  }
300 
// NOTE(review): pTaskInfo is still nullptr here when the TASK message had no
// payload frames. Only the last parsed task's job is updated.
301  Job * job = mpNodeManager->job(pTaskInfo->jobid());
302  if (job) {
303  if (job->submiterSocketID() == nullptr) job->submiterSocketID(zframe_dup(pID));
305  if (mZmqSockets.size() == 1) {
306  job->submiterSocketIndex(0);
307  }
308  else {
309  int i = 0;
310  for (auto s : mZmqSockets) {
311  if (s == pSocket) {
312  job->submiterSocketIndex(i);
313  break;
314  }
315  i++;
316  }
317  }
318 
319  // SPD_INFO("{} {}", job->submiterSocketIndex(), job->submiterSocketID());
320  }
321 
// Reply on the receiving socket, addressed with the requester's identity
// frame. zmsg_add() takes ownership of pID.
322  zmsg_t * pMsgOut = zmsg_new();
323  zmsg_add(pMsgOut, pID);
324  zmsg_addstr(pMsgOut, "");
325  zmsg_addstr(pMsgOut, "TASK_ADDED");
326  zmsg_addstr(pMsgOut, fmt::format("{}", task_count).data());
327  zmsg_send(&pMsgOut, pSocket);
328  zmsg_destroy(&pMsgOut);
329 
330  // zmsg_t * pMsgOut = zmsg_new();
331  // zframe_destroy(&pID);
332  // pID = static_cast<zframe_t *>(job->submiterSocketID());
333  // zmsg_add(pMsgOut, pID);
334  // zmsg_addstr(pMsgOut, "");
335  // zmsg_addstr(pMsgOut, "TASK_ADDED");
336  // zmsg_addstr(pMsgOut, fmt::format("{}", task_count).data());
337  // // zmsg_send(&pMsgOut, pSocket);
338  // zmsg_send(&pMsgOut, mZmqSockets[job->submiterSocketIndex()]);
339  // zmsg_destroy(&pMsgOut);
340 
341  pID = nullptr;
342 
343  mpNodeManager->print();
344 
345  // delete pTaskInfo;
346  }
347  else if (!strcmp(pCmd, "AUTH")) {
348 
349  SPD_DEBUG("Checking AUTH ...");
350  zmsg_t * pMsgOut = zmsg_new();
351  zmsg_add(pMsgOut, pID);
352  zmsg_addstr(pMsgOut, "");
353  zmsg_addstr(pMsgOut, "AUTH");
354  zmsg_addstr(pMsgOut, "OK");
356  std::string rdr;
357  rdr = zyre_uuid(sockets()[0]->zyre());
358  zmsg_addstr(pMsgOut, rdr.data());
359  zmsg_addstr(pMsgOut,
360  fmt::format("v{}.{}.{}-{}", SALSA_VERSION_MAJOR(SALSA_VERSION), SALSA_VERSION_MINOR(SALSA_VERSION),
361  SALSA_VERSION_PATCH(SALSA_VERSION), SALSA_VERSION_RELEASE)
362  .data());
363  zmsg_addstr(pMsgOut, mJobInfoClientUrl.data());
364  zmsg_send(&pMsgOut, pSocket);
365  SPD_DEBUG("Sent AUTH OK {} ...", static_cast<void *>(pSocket));
366  pID = nullptr;
367  }
368 
369  else if (!strcmp(pCmd, "JOB_DEL_ID")) {
// NOTE(review): zmsg_popstr() returns NULL when the job UUID frame is missing;
// constructing std::string from NULL is undefined behavior.
370  char * pJobUUID_str = zmsg_popstr(pMsg);
371  std::string jobUUID = pJobUUID_str;
372  free(pJobUUID_str);
373 
374  mpNodeManager->terminateJob(jobUUID);
375  SPD_TRACE("Publishing to [{}] ...", zyre_uuid(sockets()[0]->zyre()));
376  mpNodeManager->publish(zyre_uuid(sockets()[0]->zyre()));
377 
378  zmsg_t * pMsgOut = zmsg_new();
379  zmsg_add(pMsgOut, pID);
380  zmsg_addstr(pMsgOut, "");
381  zmsg_addstr(pMsgOut, pCmd);
382  zmsg_addstr(pMsgOut, "OK");
383  zmsg_send(&pMsgOut, pSocket);
384  SPD_DEBUG("Sent [{}] OK {} ...", pCmd, static_cast<void *>(pSocket));
385  }
// Hidden line 387 presumably performs the deletion (e.g. terminateAllJobs(true)) — confirm.
386  else if (!strcmp(pCmd, "JOB_DEL_FINISHED")) {
388  SPD_TRACE("Publishing to [{}] ...", zyre_uuid(sockets()[0]->zyre()));
389  mpNodeManager->publish(zyre_uuid(sockets()[0]->zyre()));
390 
391  zmsg_t * pMsgOut = zmsg_new();
392  zmsg_add(pMsgOut, pID);
393  zmsg_addstr(pMsgOut, "");
394  zmsg_addstr(pMsgOut, pCmd);
395  zmsg_addstr(pMsgOut, "OK");
396  zmsg_send(&pMsgOut, pSocket);
397  SPD_DEBUG("Sent [{}] OK {} ...", pCmd, static_cast<void *>(pSocket));
398  }
// Hidden line 400 presumably performs the deletion (e.g. terminateAllJobs()) — confirm.
399  else if (!strcmp(pCmd, "JOB_DEL_ALL")) {
401  SPD_TRACE("Publishing to [{}] ...", zyre_uuid(sockets()[0]->zyre()));
402  mpNodeManager->publish(zyre_uuid(sockets()[0]->zyre()));
403 
404  zmsg_t * pMsgOut = zmsg_new();
405  zmsg_add(pMsgOut, pID);
406  zmsg_addstr(pMsgOut, "");
407  zmsg_addstr(pMsgOut, pCmd);
408  zmsg_addstr(pMsgOut, "OK");
409  zmsg_send(&pMsgOut, pSocket);
410  SPD_DEBUG("Sent [{}] OK {} ...", pCmd, static_cast<void *>(pSocket));
411  }
// NOTE(review): for unrecognised commands pID was never handed to a reply
// message. With the destroy below commented out, that identity frame leaks.
412  free(pCmd);
413  // if (pID) zframe_destroy(&pID);
414 }
415 
416 } // namespace Salsa
PollerZmq * mpPoller
Internal poller.
Definition: ActorZmq.hh:49
virtual int init()
First function.
Definition: ActorZmq.cc:378
virtual ~NodeZyre()
Destroys the Zyre node.
Definition: NodeZyre.cc:19
virtual int init()
First function.
Definition: NodeZyre.cc:40
virtual Socket * onEnter(std::string self, std::string fromType, Message *pMsg, std::vector< std::string > &out)
int submiterSocketIndex() const
Returns submiter socket index.
Definition: Job.hh:74
Base Message class.
Definition: Message.hh:15
Base PublisherZmq class.
Definition: PublisherZmq.hh:17
virtual int exec()
Main function.
Definition: NodeZyre.cc:98
std::string mJobInfoClientUrl
JobInfo URL for clients (salsa-broker --out ...)
Definition: NodeZyre.hh:54
virtual int finish()
Last function.
Definition: NodeZyre.cc:222
bool mTerminated
Flag indicating whether the actor should be terminated.
Definition: ActorZmq.hh:50
Base Node class.
Definition: Node.hh:22
virtual void terminateJob(std::string uuid)
Definition: NodeManager.cc:361
int mJobCheckTimeout
Job check timeout.
Definition: NodeZyre.hh:55
Job class.
Definition: Job.hh:17
std::vector< zsock_t * > mZmqSockets
List of zmq sockets.
Definition: NodeZyre.hh:51
virtual void * wait()
Definition: ActorZmq.cc:426
virtual Socket * onExit(std::string self, Message *pMsg, std::vector< std::string > &out)
ZeroMQ implementation of salsa actor class.
Definition: ActorZmq.hh:19
void add(std::shared_ptr< Node > node)
Adds node to the list of nodes.
Definition: Node.hh:49
uint64_t finishedJobTimeout() const
Returns finished job timeout.
Definition: NodeManager.hh:55
void addSocket(std::shared_ptr< SocketZyre > socket)
Definition: NodeZyre.cc:235
void addFeeder(std::string uuid, std::shared_ptr< Socket > s)
Definition: NodeManager.cc:65
virtual std::string uuid() const =0
Returns node uuid.
Job * job(std::string uuid)
Definition: NodeManager.cc:481
void handleExternalZmq(zmsg_t *pMsg, zsock_t *pSocket)
Definition: NodeZyre.cc:268
zsock_t * mpPipe
Zmq pipe socket.
Definition: ActorZmq.hh:48
void print(std::string opt="") const
Definition: NodeManager.cc:31
std::string mJobInfoBrokerUrl
JobInfo broker URL (salsa-broker --in ...)
Definition: NodeZyre.hh:53
virtual Socket * onWhisper(std::string self, Message *pMsg, std::vector< std::string > &out)
NodeInfo * mpNodeInfo
Node Info.
Definition: Node.hh:68
virtual void add(SocketZyre *pSocket)
Definition: PollerZmq.cc:45
std::vector< std::shared_ptr< SocketZyre > > mSockets
List of zyre sockets.
Definition: NodeZyre.hh:50
virtual void publish(std::string id) const
Definition: NodeManager.cc:624
virtual bool handleTaskPool(void *pPool)
virtual EEventType event() const =0
Returns node event type.
void addConsumer(std::string uuid, std::shared_ptr< Socket > s)
Definition: NodeManager.cc:56
std::vector< std::shared_ptr< SocketZyre > > sockets() const
Definition: NodeZyre.cc:250
virtual void publisher(Publisher *p)
Definition: NodeManager.cc:608
void addWorker(std::string uuid, std::shared_ptr< Socket > s)
Definition: NodeManager.cc:74
virtual bool terminateFinishedJobs()
Definition: NodeManager.cc:390
static std::sig_atomic_t interrupted()
Returns whether Salsa has been interrupted.
Definition: Actor.hh:35
void addTask(TaskInfo *taskInfo, std::string cuuid, std::string fuuid, Salsa::Job::EQueueType t=Salsa::Job::pending)
Definition: NodeManager.cc:227
void * submiterSocketID() const
Returns submiter socket identity.
Definition: Job.hh:79
NodeManagerZyre class.
EEventType
Node event type.
Definition: Message.hh:18
virtual void terminateAllJobs(bool finishedonly=false)
Definition: NodeManager.cc:421
NodeManagerZyre * mpNodeManager
Job manager.
Definition: NodeZyre.hh:52