salsa  0.3.0
 All Classes Functions Variables Enumerations Pages
NodeZyre.cc
#include "NodeZyre.hh"
#include <NodeManagerZyre.hh>
#include <algorithm>
#include <cstdlib>
#include <memory>
#include <string>
#include <vector>
6 using namespace fmt::literals;
7 
8 namespace Salsa {
9 NodeZyre::NodeZyre(std::string name) : Node(name), ActorZmq()
10 {
14 }
15 
17 {
21 
22  SPD_TRACE("### Destroy NodeZyre [{}] ###", mpNodeInfo->name());
23 
24  // clearing socket
25  mSockets.clear();
26 
27  for (auto pSocket : mZmqSockets) {
28  zsock_destroy(&pSocket);
29  }
30 
31  if (mpNodeManager) {
32  delete mpNodeManager;
33  mpNodeManager = nullptr;
34  }
35 }
36 
38 {
42 
43  SPD_TRACE("Salsa::NodeZyre::init()<-");
44 
45  if (!mpPoller) {
46  return 1;
47  }
48 
49  if (mpNodeManager == nullptr) {
50  mpNodeManager = new NodeManagerZyre(this);
51  }
52 
53  for (auto socket : mSockets) {
54  mpPoller->add(socket.get());
55 
56  if (socket->header("X-SALSA-NODE-TYPE") == "CONSUMER")
57  mpNodeManager->addConsumer(zyre_uuid(socket->zyre()), socket);
58 
59  if (socket->header("X-SALSA-NODE-TYPE") == "FEEDER")
60  mpNodeManager->addFeeder(zyre_uuid(socket->zyre()), socket);
61 
62  if (socket->header("X-SALSA-NODE-TYPE") == "WORKER")
63  mpNodeManager->addWorker(zyre_uuid(socket->zyre()), socket);
64  }
65 
66  for (auto socket : mZmqSockets) {
67  mpPoller->add(socket);
68  }
69 
70  // mpNodeManager->print();
71  SPD_TRACE("Salsa::NodeZyre::init()->");
72  return 0;
73 }
74 
76 {
80 
81  SPD_TRACE("Salsa::NodeZyre::exec()<-");
82 
83  void * pPointer = nullptr;
84  std::shared_ptr<SocketZyre> pSocket = nullptr;
85  zsock_t * pZmqSock = nullptr;
86 
87  while (!mTerminated && !Salsa::Actor::interrupted()) {
88  SPD_TRACE("Actor::wait()");
89  pPointer = ActorZmq::wait();
90  if (!pPointer) break;
91 
92  if (pPointer == mpPipe) {
93  SPD_TRACE("Signal from pipe={}", static_cast<void *>(mpPipe));
94  // We are not reacting to pipe events for now
95  continue;
96  }
97  else {
98  for (auto pZmqSocket : mZmqSockets) {
99  SPD_TRACE("Searching ZMQ inSocket=[{}] zmqSocket[{}]", static_cast<void *>(pPointer),
100  static_cast<void *>(pZmqSocket));
101  if (pZmqSocket == pPointer) {
102  pZmqSock = pZmqSocket;
103  break;
104  }
105  }
106 
107  if (pZmqSock) {
108  SPD_TRACE("HANDLING zmq socket [{}]", static_cast<void *>(pZmqSock));
109  zmsg_t * pMsg = zmsg_recv(pZmqSock);
110  handleZmq(pMsg);
111  zmsg_destroy(&pMsg);
112  pZmqSock = nullptr;
113  continue;
114  }
115 
116  for (auto pNet : mSockets) {
117  SPD_TRACE("Searching ZYRE socket={} in net={} socket={}", static_cast<void *>(pPointer),
118  static_cast<void *>(pNet.get()), static_cast<void *>(pNet->socket()));
119  if (pNet && pNet->socket() && pPointer == pNet->socket()) {
120  pSocket = pNet;
121  break;
122  }
123  }
124 
125  if (!pSocket) {
126  if (mpNodeManager->handleTaskPool(pPointer)) continue;
127  }
128 
129  if (!pSocket) {
130  SPD_ERROR("Socket comming from unknown network : socket={}", pPointer);
131  continue;
132  }
133 
134  // ==================================================
135 
136  Message * pMsg = pSocket->pull();
137  if (!pMsg) {
138  SPD_ERROR("Message from socket={} is null", pPointer);
139  continue;
140  }
141 
142  Message::EventType type = pMsg->event();
143  SPD_TRACE("Salsa::NodeZyre::exec() : Event from net [{}] pMsg [{}] type [{}]",
144  static_cast<void *>(pSocket.get()), static_cast<void *>(pMsg), type);
145 
146  bool doPublish = true;
147  std::vector<std::string> values;
148 
149  if (type == Message::ENTER) {
150  const char * pHeader =
151  zyre_event_header(static_cast<MessageZyre *>(pMsg)->zyreEvent(), "X-SALSA-NODE-TYPE");
152  std::string snt;
153  if (pHeader) snt = pHeader;
154  SPD_TRACE("[{}] ENTER uuid=[{}] node_type=[{}]", zyre_name(pSocket->zyre()), pMsg->uuid(), snt);
155  mpNodeManager->onEnter(zyre_uuid(pSocket->zyre()), snt, pMsg, values);
156  }
157  else if (type == Message::EXIT) {
158  SPD_TRACE("[{}] EXIT uuid=[{}]", zyre_name(pSocket->zyre()), pMsg->uuid());
159  mpNodeManager->onExit(zyre_uuid(pSocket->zyre()), pMsg, values);
160  }
161  else if (type == Message::EVASIVE) {
162  SPD_TRACE("[{}] EVASIVE uuid=[{}]", zyre_name(pSocket->zyre()), pMsg->uuid());
163  doPublish = false;
164  }
165  else if (type == Message::WHISPER) {
166  SPD_TRACE("[{}] WHISPER uuid=[{}]", zyre_name(pSocket->zyre()), pMsg->uuid());
167  mpNodeManager->onWhisper(zyre_uuid(pSocket->zyre()), pMsg, values);
168  }
169 
170  if (doPublish) {
171  SPD_TRACE("Publishing to [{}] ...", zyre_uuid(sockets()[0]->zyre()));
172  mpNodeManager->publish(zyre_uuid(sockets()[0]->zyre()));
173  }
174  delete pMsg;
175  pSocket = nullptr;
176  pZmqSock = nullptr;
177  }
178 
179  } // END WHILE not terminated
180 
181  SPD_TRACE("Salsa::NodeZyre::exec()->");
182  return 0;
183 }
184 
186 {
190 
191  SPD_TRACE("Salsa::NodeZyre::finish()<-");
192 
193  SPD_TRACE("Salsa::NodeZyre::finish()->");
194 
195  return 0;
196 }
197 
198 void NodeZyre::addSocket(std::shared_ptr<SocketZyre> socket)
199 {
203 
204  if (socket) {
205  auto pNode = std::make_shared<Node>(zyre_name(socket->zyre()), zyre_uuid(socket->zyre()));
206 
207  pNode->parent(shared_from_this());
208  Node::add(pNode);
209  mSockets.push_back(socket);
210  }
211 }
212 
213 std::vector<std::shared_ptr<SocketZyre>> NodeZyre::sockets() const
214 {
218  return mSockets;
219 }
220 
221 void NodeZyre::addSocket(zsock_t * socket)
222 {
226  if (socket) {
227  mZmqSockets.push_back(socket);
228  }
229 }
230 
231 void NodeZyre::handleZmq(zmsg_t * pMsg)
232 {
236 
237  free(zmsg_popstr(pMsg));
238  char * pCmd = zmsg_popstr(pMsg);
239  if (!strcmp(pCmd, "TASK")) {
240 
241  char * pPayload_str = zmsg_popstr(pMsg);
242  std::string payload = pPayload_str;
243  free(pPayload_str);
244 
245  TaskInfo * pTaskInfo = new TaskInfo();
246  {
247 
248  if (!pTaskInfo->ParseFromString(payload)) {
249  SPD_ERROR("Message does not contain ProtoBuf message!");
250  return;
251  }
252  }
253  SPD_DEBUG("TASK [{}:{}] ", pTaskInfo->jobid(), pTaskInfo->taskid());
254 
255  mpNodeManager->addTask(pTaskInfo, "", "", Salsa::Job::pending);
256 
257  // delete pTaskInfo;
258  }
259  else if (!strcmp(pCmd, "JOB_DEL")) {
260  char * pJobUUID_str = zmsg_popstr(pMsg);
261  std::string jobUUID = pJobUUID_str;
262  free(pJobUUID_str);
263 
264  mpNodeManager->terminateJob(jobUUID);
265  }
266  else if (!strcmp(pCmd, "JOB_DEL_ALL")) {
268  }
269  free(pCmd);
270 }
271 
272 } // namespace Salsa
PollerZmq * mpPoller
Internal poller.
Definition: ActorZmq.hh:41
virtual ~NodeZyre()
Destruct Zyre node.
Definition: NodeZyre.cc:16
virtual int init()
First function.
Definition: NodeZyre.cc:37
virtual EventType event() const =0
Returns node event type.
virtual Socket * onEnter(std::string self, std::string fromType, Message *pMsg, std::vector< std::string > &out)
Base Message class.
Definition: Message.hh:15
void handleZmq(zmsg_t *pMsg)
Definition: NodeZyre.cc:231
std::vector< std::shared_ptr< SocketZyre > > sockets() const
Definition: NodeZyre.cc:213
virtual int exec()
Main function.
Definition: NodeZyre.cc:75
virtual int finish()
Last function.
Definition: NodeZyre.cc:185
bool mTerminated
Flag if actor should be terminated.
Definition: ActorZmq.hh:42
Base Node class.
Definition: Node.hh:22
virtual void terminateJob(std::string uuid)
Definition: NodeManager.cc:337
std::vector< zsock_t * > mZmqSockets
List of zmq sockets.
Definition: NodeZyre.hh:39
virtual void * wait()
Definition: ActorZmq.cc:378
virtual Socket * onExit(std::string self, Message *pMsg, std::vector< std::string > &out)
ZeroMQ implementation of salsa actor class.
Definition: ActorZmq.hh:19
void add(std::shared_ptr< Node > node)
Adds node to the list of nodes.
Definition: Node.hh:50
void addSocket(std::shared_ptr< SocketZyre > socket)
Definition: NodeZyre.cc:198
void addFeeder(std::string uuid, std::shared_ptr< Socket > s)
Definition: NodeManager.cc:60
virtual std::string uuid() const =0
Returns node uuid.
zsock_t * mpPipe
Zmq pipe socket.
Definition: ActorZmq.hh:40
virtual Socket * onWhisper(std::string self, Message *pMsg, std::vector< std::string > &out)
EventType
Node event type.
Definition: Message.hh:18
NodeInfo * mpNodeInfo
Node Info.
Definition: Node.hh:69
virtual void add(SocketZyre *pSocket)
Definition: PollerZmq.cc:45
virtual void publish(std::string id) const
Definition: NodeManager.cc:557
std::vector< std::shared_ptr< SocketZyre > > mSockets
List of zyre sockets.
Definition: NodeZyre.hh:38
virtual bool handleTaskPool(void *pPool)
void addTask(TaskInfo *taskInfo, std::string cuuid, std::string fuuid, Salsa::Job::QueueType t=Salsa::Job::pending)
Definition: NodeManager.cc:220
void addConsumer(std::string uuid, std::shared_ptr< Socket > s)
Definition: NodeManager.cc:53
void addWorker(std::string uuid, std::shared_ptr< Socket > s)
Definition: NodeManager.cc:67
virtual void terminateJobAll()
Definition: NodeManager.cc:364
static std::sig_atomic_t interrupted()
Returns if salsa is interrupted.
Definition: Actor.hh:35
NodeManagerZyre class.
NodeManagerZyre * mpNodeManager
Job manager.
Definition: NodeZyre.hh:40