2 #include <NodeManagerZyre.hh>
6 using namespace fmt::literals;
22 SPD_TRACE(
"### Destroy NodeZyre [{}] ###",
mpNodeInfo->name());
28 zsock_destroy(&pSocket);
43 SPD_TRACE(
"Salsa::NodeZyre::init()<-");
56 if (socket->header(
"X-SALSA-NODE-TYPE") ==
"CONSUMER")
59 if (socket->header(
"X-SALSA-NODE-TYPE") ==
"FEEDER")
62 if (socket->header(
"X-SALSA-NODE-TYPE") ==
"WORKER")
71 SPD_TRACE(
"Salsa::NodeZyre::init()->");
81 SPD_TRACE(
"Salsa::NodeZyre::exec()<-");
83 void * pPointer =
nullptr;
84 std::shared_ptr<SocketZyre> pSocket =
nullptr;
85 zsock_t * pZmqSock =
nullptr;
88 SPD_TRACE(
"Actor::wait()");
93 SPD_TRACE(
"Signal from pipe={}", static_cast<void *>(
mpPipe));
99 SPD_TRACE(
"Searching ZMQ inSocket=[{}] zmqSocket[{}]", static_cast<void *>(pPointer),
100 static_cast<void *>(pZmqSocket));
101 if (pZmqSocket == pPointer) {
102 pZmqSock = pZmqSocket;
108 SPD_TRACE(
"HANDLING zmq socket [{}]", static_cast<void *>(pZmqSock));
109 zmsg_t * pMsg = zmsg_recv(pZmqSock);
117 SPD_TRACE(
"Searching ZYRE socket={} in net={} socket={}", static_cast<void *>(pPointer),
118 static_cast<void *>(pNet.get()), static_cast<void *>(pNet->socket()));
119 if (pNet && pNet->socket() && pPointer == pNet->socket()) {
130 SPD_ERROR(
"Socket comming from unknown network : socket={}", pPointer);
136 Message * pMsg = pSocket->pull();
138 SPD_ERROR(
"Message from socket={} is null", pPointer);
143 SPD_TRACE(
"Salsa::NodeZyre::exec() : Event from net [{}] pMsg [{}] type [{}]",
144 static_cast<void *>(pSocket.get()), static_cast<void *>(pMsg), type);
146 bool doPublish =
true;
147 std::vector<std::string> values;
149 if (type == Message::ENTER) {
150 const char * pHeader =
151 zyre_event_header(static_cast<MessageZyre *>(pMsg)->zyreEvent(),
"X-SALSA-NODE-TYPE");
153 if (pHeader) snt = pHeader;
154 SPD_TRACE(
"[{}] ENTER uuid=[{}] node_type=[{}]", zyre_name(pSocket->zyre()), pMsg->
uuid(), snt);
157 else if (type == Message::EXIT) {
158 SPD_TRACE(
"[{}] EXIT uuid=[{}]", zyre_name(pSocket->zyre()), pMsg->
uuid());
161 else if (type == Message::EVASIVE) {
162 SPD_TRACE(
"[{}] EVASIVE uuid=[{}]", zyre_name(pSocket->zyre()), pMsg->
uuid());
165 else if (type == Message::WHISPER) {
166 SPD_TRACE(
"[{}] WHISPER uuid=[{}]", zyre_name(pSocket->zyre()), pMsg->
uuid());
171 SPD_TRACE(
"Publishing to [{}] ...", zyre_uuid(
sockets()[0]->zyre()));
181 SPD_TRACE(
"Salsa::NodeZyre::exec()->");
191 SPD_TRACE(
"Salsa::NodeZyre::finish()<-");
193 SPD_TRACE(
"Salsa::NodeZyre::finish()->");
205 auto pNode = std::make_shared<Node>(zyre_name(socket->zyre()), zyre_uuid(socket->zyre()));
207 pNode->parent(shared_from_this());
237 free(zmsg_popstr(pMsg));
238 char * pCmd = zmsg_popstr(pMsg);
239 if (!strcmp(pCmd,
"TASK")) {
241 char * pPayload_str = zmsg_popstr(pMsg);
242 std::string payload = pPayload_str;
245 TaskInfo * pTaskInfo =
new TaskInfo();
248 if (!pTaskInfo->ParseFromString(payload)) {
249 SPD_ERROR(
"Message does not contain ProtoBuf message!");
253 SPD_DEBUG(
"TASK [{}:{}] ", pTaskInfo->jobid(), pTaskInfo->taskid());
259 else if (!strcmp(pCmd,
"JOB_DEL")) {
260 char * pJobUUID_str = zmsg_popstr(pMsg);
261 std::string jobUUID = pJobUUID_str;
266 else if (!strcmp(pCmd,
"JOB_DEL_ALL")) {
PollerZmq * mpPoller
Internal poller.
virtual ~NodeZyre()
Destroys the Zyre node.
virtual int init()
First function.
virtual EventType event() const =0
Returns node event type.
virtual Socket * onEnter(std::string self, std::string fromType, Message *pMsg, std::vector< std::string > &out)
void handleZmq(zmsg_t *pMsg)
std::vector< std::shared_ptr< SocketZyre > > sockets() const
virtual int exec()
Main function.
virtual int finish()
Last function.
bool mTerminated
Flag indicating whether the actor should be terminated.
virtual void terminateJob(std::string uuid)
std::vector< zsock_t * > mZmqSockets
List of zmq sockets.
virtual Socket * onExit(std::string self, Message *pMsg, std::vector< std::string > &out)
ZeroMQ implementation of salsa actor class.
void add(std::shared_ptr< Node > node)
Adds node to the list of nodes.
void addSocket(std::shared_ptr< SocketZyre > socket)
void addFeeder(std::string uuid, std::shared_ptr< Socket > s)
virtual std::string uuid() const =0
Returns node uuid.
zsock_t * mpPipe
Zmq pipe socket.
virtual Socket * onWhisper(std::string self, Message *pMsg, std::vector< std::string > &out)
EventType
Node event type.
NodeInfo * mpNodeInfo
Node Info.
virtual void add(SocketZyre *pSocket)
virtual void publish(std::string id) const
std::vector< std::shared_ptr< SocketZyre > > mSockets
List of zyre sockets.
virtual bool handleTaskPool(void *pPool)
void addTask(TaskInfo *taskInfo, std::string cuuid, std::string fuuid, Salsa::Job::QueueType t=Salsa::Job::pending)
void addConsumer(std::string uuid, std::shared_ptr< Socket > s)
void addWorker(std::string uuid, std::shared_ptr< Socket > s)
virtual void terminateJobAll()
static std::sig_atomic_t interrupted()
Returns whether salsa has been interrupted.
NodeManagerZyre * mpNodeManager
Job manager.