6 #include "NodeManagerZyre.hh" 7 #include "PublisherZmq.hh" 9 using namespace fmt::literals;
25 SPD_TRACE(
"### Destroy NodeZyre [{}] ###",
mpNodeInfo->name());
31 zsock_destroy(&pSocket);
46 SPD_TRACE(
"Salsa::NodeZyre::init()<-");
62 if (socket->header(
"X-SALSA-NODE-TYPE") ==
"CONSUMER")
65 if (socket->header(
"X-SALSA-NODE-TYPE") ==
"FEEDER") {
68 char * pPubUrl = getenv(
"SALSA_PUB_URL");
75 char * pTimeout = getenv(
"SALSA_FINISHED_JOB_TIMEOUT");
79 char * pCheckTimeout = getenv(
"SALSA_FINISHED_JOB_CHECK_TIMEOUT");
85 if (socket->header(
"X-SALSA-NODE-TYPE") ==
"WORKER")
94 SPD_TRACE(
"Salsa::NodeZyre::init()->");
104 SPD_TRACE(
"Salsa::NodeZyre::exec()<-");
106 void * pPointer =
nullptr;
107 std::shared_ptr<SocketZyre> pSocket =
nullptr;
108 zsock_t * pZmqSock =
nullptr;
110 int64_t last_time = zclock_time();
113 SPD_TRACE(
"Actor::wait()");
120 SPD_TRACE(
"Signal from pipe={}", static_cast<void *>(
mpPipe));
126 SPD_TRACE(
"Searching ZMQ inSocket=[{}] zmqSocket[{}]", static_cast<void *>(pPointer),
127 static_cast<void *>(pZmqSocket));
128 if (pZmqSocket == pPointer) {
129 pZmqSock = pZmqSocket;
135 SPD_TRACE(
"HANDLING zmq socket [{}]", static_cast<void *>(pZmqSock));
136 zmsg_t * pMsg = zmsg_recv(pZmqSock);
144 SPD_TRACE(
"Searching ZYRE socket={} in net={} socket={}", static_cast<void *>(pPointer),
145 static_cast<void *>(pNet.get()), static_cast<void *>(pNet->socket()));
146 if (pNet && pNet->socket() && pPointer == pNet->socket()) {
157 SPD_ERROR(
"Socket comming from unknown network : socket={}", pPointer);
163 Message * pMsg = pSocket->pull();
165 SPD_ERROR(
"Message from socket={} is null", pPointer);
170 SPD_TRACE(
"Salsa::NodeZyre::exec() : Event from net [{}] pMsg [{}] type [{}]",
171 static_cast<void *>(pSocket.get()), static_cast<void *>(pMsg), type);
173 bool doPublish =
true;
174 std::vector<std::string> values;
176 if (type == Message::ENTER) {
177 const char * pHeader =
178 zyre_event_header(static_cast<MessageZyre *>(pMsg)->zyreEvent(),
"X-SALSA-NODE-TYPE");
180 if (pHeader) snt = pHeader;
181 SPD_TRACE(
"[{}] ENTER uuid=[{}] node_type=[{}]", zyre_name(pSocket->zyre()), pMsg->
uuid(), snt);
185 else if (type == Message::EXIT) {
186 SPD_TRACE(
"[{}] EXIT uuid=[{}]", zyre_name(pSocket->zyre()), pMsg->
uuid());
190 else if (type == Message::EVASIVE) {
191 SPD_TRACE(
"[{}] EVASIVE uuid=[{}]", zyre_name(pSocket->zyre()), pMsg->
uuid());
194 else if (type == Message::WHISPER) {
195 SPD_TRACE(
"[{}] WHISPER uuid=[{}]", zyre_name(pSocket->zyre()), pMsg->
uuid());
200 SPD_TRACE(
"Publishing to [{}] ...", zyre_uuid(
sockets()[0]->zyre()));
204 cur_time = zclock_time();
206 SPD_TRACE(
"Poller expired. Doing finished job cleaning ...");
208 last_time = zclock_time();
218 SPD_TRACE(
"Salsa::NodeZyre::exec()->");
228 SPD_TRACE(
"Salsa::NodeZyre::finish()<-");
230 SPD_TRACE(
"Salsa::NodeZyre::finish()->");
242 auto pNode = std::make_shared<Node>(zyre_name(socket->zyre()), zyre_uuid(socket->zyre()));
244 pNode->parent(shared_from_this());
275 zframe_t * pID = zmsg_pop(pMsg);
276 char * pCmd = zmsg_popstr(pMsg);
277 if (!strcmp(pCmd,
"TASK")) {
280 char * pPayload_str = zmsg_popstr(pMsg);
281 TaskInfo * pTaskInfo =
nullptr;
282 while (pPayload_str) {
283 std::string payload = pPayload_str;
286 pTaskInfo =
new TaskInfo();
289 if (!pTaskInfo->ParseFromString(payload)) {
290 SPD_ERROR(
"Message does not contain ProtoBuf message!");
294 SPD_DEBUG(
"TASK [{}:{}] ", pTaskInfo->jobid(), pTaskInfo->taskid());
298 pPayload_str = zmsg_popstr(pMsg);
322 zmsg_t * pMsgOut = zmsg_new();
323 zmsg_add(pMsgOut, pID);
324 zmsg_addstr(pMsgOut,
"");
325 zmsg_addstr(pMsgOut,
"TASK_ADDED");
326 zmsg_addstr(pMsgOut, fmt::format(
"{}", task_count).data());
327 zmsg_send(&pMsgOut, pSocket);
328 zmsg_destroy(&pMsgOut);
347 else if (!strcmp(pCmd,
"AUTH")) {
349 SPD_DEBUG(
"Checking AUTH ...");
350 zmsg_t * pMsgOut = zmsg_new();
351 zmsg_add(pMsgOut, pID);
352 zmsg_addstr(pMsgOut,
"");
353 zmsg_addstr(pMsgOut,
"AUTH");
354 zmsg_addstr(pMsgOut,
"OK");
357 rdr = zyre_uuid(
sockets()[0]->zyre());
358 zmsg_addstr(pMsgOut, rdr.data());
360 fmt::format(
"v{}.{}.{}-{}", SALSA_VERSION_MAJOR(SALSA_VERSION), SALSA_VERSION_MINOR(SALSA_VERSION),
361 SALSA_VERSION_PATCH(SALSA_VERSION), SALSA_VERSION_RELEASE)
364 zmsg_send(&pMsgOut, pSocket);
365 SPD_DEBUG(
"Sent AUTH OK {} ...", static_cast<void *>(pSocket));
369 else if (!strcmp(pCmd,
"JOB_DEL_ID")) {
370 char * pJobUUID_str = zmsg_popstr(pMsg);
371 std::string jobUUID = pJobUUID_str;
375 SPD_TRACE(
"Publishing to [{}] ...", zyre_uuid(
sockets()[0]->zyre()));
378 zmsg_t * pMsgOut = zmsg_new();
379 zmsg_add(pMsgOut, pID);
380 zmsg_addstr(pMsgOut,
"");
381 zmsg_addstr(pMsgOut, pCmd);
382 zmsg_addstr(pMsgOut,
"OK");
383 zmsg_send(&pMsgOut, pSocket);
384 SPD_DEBUG(
"Sent [{}] OK {} ...", pCmd, static_cast<void *>(pSocket));
386 else if (!strcmp(pCmd,
"JOB_DEL_FINISHED")) {
388 SPD_TRACE(
"Publishing to [{}] ...", zyre_uuid(
sockets()[0]->zyre()));
391 zmsg_t * pMsgOut = zmsg_new();
392 zmsg_add(pMsgOut, pID);
393 zmsg_addstr(pMsgOut,
"");
394 zmsg_addstr(pMsgOut, pCmd);
395 zmsg_addstr(pMsgOut,
"OK");
396 zmsg_send(&pMsgOut, pSocket);
397 SPD_DEBUG(
"Sent [{}] OK {} ...", pCmd, static_cast<void *>(pSocket));
399 else if (!strcmp(pCmd,
"JOB_DEL_ALL")) {
401 SPD_TRACE(
"Publishing to [{}] ...", zyre_uuid(
sockets()[0]->zyre()));
404 zmsg_t * pMsgOut = zmsg_new();
405 zmsg_add(pMsgOut, pID);
406 zmsg_addstr(pMsgOut,
"");
407 zmsg_addstr(pMsgOut, pCmd);
408 zmsg_addstr(pMsgOut,
"OK");
409 zmsg_send(&pMsgOut, pSocket);
410 SPD_DEBUG(
"Sent [{}] OK {} ...", pCmd, static_cast<void *>(pSocket));
PollerZmq * mpPoller
Internal poller.
virtual int init()
First function.
virtual ~NodeZyre()
Destruct Zyre node.
virtual int init()
First function.
virtual Socket * onEnter(std::string self, std::string fromType, Message *pMsg, std::vector< std::string > &out)
int submiterSocketIndex() const
Returns submiter socket index.
virtual int exec()
Main function.
std::string mJobInfoClientUrl
JobInfo url for client (salsa-broker –out ...)
virtual int finish()
Last function.
bool mTerminated
Flag if actor should be terminated.
virtual void terminateJob(std::string uuid)
int mJobCheckTimeout
Job check timeout.
std::vector< zsock_t * > mZmqSockets
List of zmq sockets.
virtual Socket * onExit(std::string self, Message *pMsg, std::vector< std::string > &out)
ZeroMQ implementation of salsa actor class.
void add(std::shared_ptr< Node > node)
Adds node to the list of nodes.
uint64_t finishedJobTimeout() const
Returns finished job timeout.
void addSocket(std::shared_ptr< SocketZyre > socket)
void addFeeder(std::string uuid, std::shared_ptr< Socket > s)
virtual std::string uuid() const =0
Returns node uuid.
Job * job(std::string uuid)
void handleExternalZmq(zmsg_t *pMsg, zsock_t *pSocket)
zsock_t * mpPipe
Zmq pipe socket.
void print(std::string opt="") const
std::string mJobInfoBrokerUrl
JobInfo broker url (salsa-broker –in ...)
virtual Socket * onWhisper(std::string self, Message *pMsg, std::vector< std::string > &out)
NodeInfo * mpNodeInfo
Node Info.
virtual void add(SocketZyre *pSocket)
std::vector< std::shared_ptr< SocketZyre > > mSockets
List of zyre sockets.
virtual void publish(std::string id) const
virtual bool handleTaskPool(void *pPool)
virtual EEventType event() const =0
Returns node event type.
void addConsumer(std::string uuid, std::shared_ptr< Socket > s)
std::vector< std::shared_ptr< SocketZyre > > sockets() const
virtual void publisher(Publisher *p)
void addWorker(std::string uuid, std::shared_ptr< Socket > s)
virtual bool terminateFinishedJobs()
static std::sig_atomic_t interrupted()
Returns if salsa is interrupted.
void addTask(TaskInfo *taskInfo, std::string cuuid, std::string fuuid, Salsa::Job::EQueueType t=Salsa::Job::pending)
void * submiterSocketID() const
Returns submiter socket identity.
EEventType
Node event type.
virtual void terminateAllJobs(bool finishedonly=false)
NodeManagerZyre * mpNodeManager
Job manager.