1 #include "TaskExecutorForkZmq.hh"
2 #include <TaskState.hh>
27 if (
pipe() ==
nullptr)
return false;
29 zmsg_t * pOutMsg = zmsg_new();
33 zmsg_addstr(pOutMsg, worker.c_str());
34 zmsg_addstr(pOutMsg, upstream.c_str());
36 for (
int iPos = 0; iPos <
mpTaskState->
task()->logtargets_size(); iPos++) {
37 zmsg_addstrf(pOutMsg,
"%s",
mpTaskState->
task()->logtargets(iPos).c_str());
39 zsock_send(
pipe(),
"m", pOutMsg);
40 zmsg_destroy(&pOutMsg);
58 zmsg_t * pMessage = zmsg_recv(
pipe());
59 if (zframe_streq(zmsg_first(pMessage),
"$PID")) {
60 char * pPidStr = zframe_strdup(zmsg_next(pMessage));
62 uint32_t pid =
static_cast<uint32_t
>(strtoul(pPidStr,
nullptr, 0));
71 else if (zframe_streq(zmsg_first(pMessage),
"$EXIT")) {
72 char * pExitStatusStr = zframe_strdup(zmsg_next(pMessage));
73 uint32_t exitStatus =
static_cast<uint32_t
>(strtoul(pExitStatusStr,
nullptr, 0));
77 SPD_DEBUG(
"JOB [{}:{}] PID [{}] finished with rc [{}] killed [{}]",
mpTaskState->
task()->jobid(),
86 char * pWkUUID = zframe_strdup(zmsg_next(pMessage));
87 extra.push_back(pWkUUID);
88 char * pUpstream = zframe_strdup(zmsg_next(pMessage));
89 extra.push_back(pUpstream);
94 zmsg_destroy(&pMessage);
virtual bool handlePipe(std::vector< std::string > &extra)
Handle pipe.
zactor_t * mpZActor
ZMQ Actor pointer.
virtual ~TaskExecutorForkZmq()
TaskState * mpTaskState
Task state.
TaskExecutorForkZmq(zactor_t *pActor=nullptr)
virtual void * pipe() const
virtual bool run(std::string worker, std::string upstream)
Run task.