salsa  0.3.0
 All Classes Functions Variables Enumerations Pages
TaskExecutorForkZmq.cc
1 #include "TaskExecutorForkZmq.hh"
2 #include <TaskState.hh>
3 namespace Salsa {
4 TaskExecutorForkZmq::TaskExecutorForkZmq(zactor_t * actor) : TaskExecutor(), mpZActor(actor)
5 {
9 }
11 {
15  if (mpZActor) {
16  zactor_destroy(&mpZActor);
17  }
18 }
19 bool TaskExecutorForkZmq::run(std::string worker, std::string upstream)
20 {
24 
25  if (mpTaskState == nullptr) return false;
26 
27  if (pipe() == nullptr) return false;
28 
29  zmsg_t * pOutMsg = zmsg_new();
30  zmsg_addstrf(pOutMsg, "%s", mpTaskState->task()->data().c_str());
31  zmsg_addstrf(pOutMsg, "%d", mpTaskState->task()->clientid());
32  zmsg_addstrf(pOutMsg, "%d", mpTaskState->task()->groupid());
33  zmsg_addstr(pOutMsg, worker.c_str());
34  zmsg_addstr(pOutMsg, upstream.c_str());
35  zmsg_addstr(pOutMsg, mpTaskState->task()->jobid().c_str());
36  for (int iPos = 0; iPos < mpTaskState->task()->logtargets_size(); iPos++) {
37  zmsg_addstrf(pOutMsg, "%s", mpTaskState->task()->logtargets(iPos).c_str());
38  }
39  zsock_send(pipe(), "m", pOutMsg);
40  zmsg_destroy(&pOutMsg);
41 
42  return true;
43 }
45 {
49  return mpZActor;
50 }
51 
52 bool TaskExecutorForkZmq::handlePipe(std::vector<std::string> & extra)
53 {
57 
58  zmsg_t * pMessage = zmsg_recv(pipe());
59  if (zframe_streq(zmsg_first(pMessage), "$PID")) {
60  char * pPidStr = zframe_strdup(zmsg_next(pMessage));
61 
62  uint32_t pid = static_cast<uint32_t>(strtoul(pPidStr, nullptr, 0));
63  mpTaskState->pid(pid);
64  // mpTaskState->state(TaskState::State::running);
65  std::string payload;
66  mpTaskState->task()->SerializeToString(&payload);
67  SPD_DEBUG("JOB [{}:{}] PID [{}] started", mpTaskState->task()->jobid(), mpTaskState->task()->taskid(), pPidStr);
68 
69  free(pPidStr);
70  }
71  else if (zframe_streq(zmsg_first(pMessage), "$EXIT")) {
72  char * pExitStatusStr = zframe_strdup(zmsg_next(pMessage));
73  uint32_t exitStatus = static_cast<uint32_t>(strtoul(pExitStatusStr, nullptr, 0));
74  free(pExitStatusStr);
75  mpTaskState->task()->set_returncode(exitStatus);
76 
77  SPD_DEBUG("JOB [{}:{}] PID [{}] finished with rc [{}] killed [{}]", mpTaskState->task()->jobid(),
78  mpTaskState->task()->taskid(), mpTaskState->pid(), mpTaskState->task()->returncode(),
79  mpTaskState->state() == TaskState::killed);
80 
81  // mpTaskState->state(TaskState::State::idle);
82  // mpTaskState->pid(0);
83 
84  // std::string payload;
85  // mpTaskState->task()->SerializeToString(&payload);
86  char * pWkUUID = zframe_strdup(zmsg_next(pMessage));
87  extra.push_back(pWkUUID);
88  char * pUpstream = zframe_strdup(zmsg_next(pMessage));
89  extra.push_back(pUpstream);
90 
91  free(pWkUUID);
92  free(pUpstream);
93  }
94  zmsg_destroy(&pMessage);
95 
96  return true;
97 }
98 } // namespace Salsa
virtual bool handlePipe(std::vector< std::string > &extra)
Handle pipe.
zactor_t * mpZActor
ZMQ Actor pointer.
void pid(uint32_t pid)
Definition: TaskState.cc:50
void state(State s)
Definition: TaskState.cc:35
TaskState * mpTaskState
Task state.
Definition: TaskExecutor.hh:31
TaskExecutorForkZmq(zactor_t *pActor=nullptr)
TaskInfo * task() const
Definition: TaskState.cc:65
virtual void * pipe() const
virtual bool run(std::string worker, std::string upstream)
Run task.
Base TaskExecutor class.
Definition: TaskExecutor.hh:14