salsa  0.4.0
ActorZmq.cc
1 #include "ActorZmq.hh"
2 
3 namespace Salsa {
5 {
9 
10  mpPoller = new PollerZmq();
11 }
13 {
17 
18  // Why no smart pointer? Because this one is fully managed within current class.
19  delete mpPoller;
20 }
21 
22 void ActorZmq::SalsaActorFn(zsock_t * pPipe, void * pArg)
23 {
27 
28  zsock_signal(pPipe, 0);
29  ActorZmq * pActor = static_cast<Salsa::ActorZmq *>(pArg);
30  pActor->pipe(pPipe);
31 
32  SPD_TRACE("SalsaActorFn::init() <-");
33  int ret = 0;
34  if ((ret = pActor->init())) {
35  SPD_ERROR("init() failed! [{}]", ret);
36  return;
37  }
38  SPD_TRACE("SalsaActorFn::init()->");
39 
40  if (!Salsa::Actor::interrupted() && !pActor->terminated()) {
41  SPD_TRACE("SalsaActorFn::exec() <-");
42  if ((ret = pActor->exec())) {
43  SPD_ERROR("exec() failed! [{}]", ret);
44  return;
45  }
46  SPD_TRACE("SalsaActorFn::exec() ->");
47  }
48 
49  SPD_TRACE("SalsaActorFn::finish() <-");
50  if ((ret = pActor->finish())) {
51  SPD_ERROR("finish() failed! [{}]", ret);
52  return;
53  }
54  SPD_TRACE("SalsaActorFn::finish() ->");
55 }
56 
57 void ActorZmq::SalsaActorForkFn(zsock_t * pPipe, void *)
58 {
59  // _/(;_;)\_
60  // AT+OK
61  zsock_signal(pPipe, 0);
62 
63  pid_t pid = 0;
64 
65  // PA will be running indefinitely, until interrupted
66  while (true) {
67  zmsg_t * pReceived = zmsg_recv(pPipe);
68  if (!pReceived) {
69  SPD_WARN("PA: pReceived == <nullptr> (Exec interrupted)");
70  break;
71  }
72 
73  // read first frame
74  zframe_t * pFrame = zmsg_first(pReceived);
75 
76  // Terminate on signal
77  if (zframe_streq(pFrame, "$TERM")) {
78  SPD_TRACE("PA: Terminate received");
79  zmsg_destroy(&pReceived);
80  break; // TERMINATE persistent actor
81  }
82  else {
83  // GET command from message
84  char * pCommand = zframe_strdup(pFrame);
85  SPD_TRACE("PA: got Command [{}]", pCommand);
86  pFrame = zmsg_next(pReceived);
87 
88  // GET UID from message
89  char * pUID = zframe_strdup(pFrame);
90  SPD_TRACE("PA: got UID [{}]", pUID);
91  pFrame = zmsg_next(pReceived);
92 
93  // GET GID from message
94  char * pGID = zframe_strdup(pFrame);
95  SPD_TRACE("PA: got GID [{}]", pGID);
96  pFrame = zmsg_next(pReceived);
97 
98  // GET Upsream uuid from message
99  char * pWorker = zframe_strdup(pFrame);
100  SPD_TRACE("PA: got Woker [{}]", pWorker);
101  pFrame = zmsg_next(pReceived);
102 
103  // GET Upsream uuid from message
104  char * pUpsream = zframe_strdup(pFrame);
105  SPD_TRACE("PA: got Upstream [{}]", pUpsream);
106  pFrame = zmsg_next(pReceived);
107 
108  // GET Client uuid from message
109  char * pClient = zframe_strdup(pFrame);
110  SPD_TRACE("PA: got Client [{}]", pClient);
111  pFrame = zmsg_next(pReceived);
112 
113  std::string pMessage_str;
114  std::string pLoop_str;
115  // GET targets from message
116  Salsa::Log log;
117  if (pFrame) {
118 
119  char * pLoop = zframe_strdup(pFrame);
120  pLoop_str = pLoop;
121  zstr_free(&pLoop);
122  SPD_TRACE("PA: got str logs [{}]", pLoop);
123 
124  std::string pMessage_str;
125  if (pLoop_str == "logs") {
126  while ((pFrame = zmsg_next(pReceived)) != nullptr) {
127  char * pMessage = zframe_strdup(pFrame);
128  pMessage_str = pMessage;
129  if (pMessage_str == "envs") {
130  pLoop_str = "envs";
131  break;
132  }
133  SPD_TRACE("PA: Adding log target [{}]", pMessage);
134  log.add(pMessage);
135  zstr_free(&pMessage);
136  }
138  // if (log.empty())
139  // log.add("");
140  }
141  }
142 
143  std::vector<std::string> envs;
144  if (pLoop_str == "envs") {
145  while ((pFrame = zmsg_next(pReceived)) != nullptr) {
146  char * pMessage = zframe_strdup(pFrame);
147  SPD_TRACE("PA: Adding env [{}]", pMessage);
148  envs.push_back(pMessage);
149  zstr_free(&pMessage);
150  }
151  }
152  char * envp[envs.size()];
153  int i = 0;
154  for (auto s : envs) {
155  char * cstr = new char[s.length() + 1];
156  strcpy(cstr, s.c_str());
157  // do stuff
158  envp[i] = cstr;
159  // delete[] cstr;
160  i++;
161  }
162  envp[i] = NULL;
163 
164  // Destroy message and go on with your life
165  SPD_TRACE("PA: Destroying message [{}]", static_cast<void *>(pReceived));
166  zmsg_destroy(&pReceived);
167 
168  // Initialization DONE ---------------------------------------------
169 
170  SPD_TRACE("PA: Creating logger");
171  log.create();
172  log.spd()->info("Running [{}]", pCommand);
173 
174  SPD_TRACE("PA: Waiting for pipes...");
175  int pipefd[2];
176  if (pipe2(pipefd, O_NONBLOCK)) {
177  SPD_ERROR("FAILED to receive pipes!"); // TODO Inform manager about pipe failure ?
178  }
179  SPD_TRACE("PA: Got pipes [{}, {}]", pipefd[0], pipefd[1]);
180 
181  // = = = = = = = = = = FORK = = = = = = = = = =
182  pid = fork();
183  if (pid == 0) {
184 
185  // TODO this needs to be improved if we cannot set UID
186  // TODO Check if uid is equal to process uid (in non-root case)
187  if (getuid() == 0) {
188  SPD_TRACE("PA Child: uid [{}]->[{}] guid [{}]->[{}]", getuid(), pUID, getgid(), pGID);
189  if (setgid(atoi(pGID)) == -1) {
190  SPD_ERROR("Problem setting GUI to process !!! ");
191  return;
192  }
193  if (setuid(atoi(pUID)) == -1) {
194  SPD_ERROR("Problem setting UID to process !!! ");
195  return;
196  }
197 
198  SPD_TRACE("PA Child: uid [{}] guid [{}]", getuid(), getgid());
199  }
200 
201  SPD_TRACE("PA Child: Running command [{}]", pCommand);
202  // FORK Child handler
203  unsigned int iCount = 0;
204  char ** ppCommand = nullptr;
205  char * tmp = std::strtok(pCommand, " ");
206 
207  SPD_TRACE("PA Child: Tokenizing");
208  do {
209  // iCount + 2 because 1) iCounter an iterator and 2) we need nullptr at the end
210  ppCommand = static_cast<char **>(realloc(ppCommand, (iCount + 2) * sizeof(char **)));
211  ppCommand[iCount++] = strdup(tmp);
212  tmp = std::strtok(nullptr, " ");
213  } while (tmp);
214  ppCommand[iCount] = nullptr;
215 
216  // SPD_TRACE("PA Child: Sleeping for 100ms");
217  // std::this_thread::sleep_for(std::chrono::milliseconds(100));
218 
219  SPD_TRACE("PA Child: Configuring pipes");
220 
221  // After these lines you'll be unable to log anything to console, so don't even
222  // try. It's literally a waste of time.
223  close(pipefd[0]);
224  dup2(pipefd[1], STDOUT_FILENO);
225  dup2(pipefd[1], STDERR_FILENO);
226  close(pipefd[1]);
227 
228  if (execvpe(ppCommand[0], ppCommand, envp) == -1) {
229  // int const err = errno;
230  // SPD_ERROR("PA failed to execute command! Error: [{}]", strerror(err));
231  // *facepalm*
232  exit(127);
233  }
234  }
235  else if (pid > 0) {
236  // FORK Parent handler
237  // Send PID to parent
238  SPD_TRACE("PA Parent: Sending PID [{}] to parent", pid);
239  {
240  zmsg_t * pTx = zmsg_new();
241  zmsg_addstr(pTx, "$PID");
242  zmsg_addstrf(pTx, "%d", pid);
243  zmsg_addstr(pTx, pUpsream);
244  zmsg_addstr(pTx, pClient);
245  zsock_send(pPipe, "m", pTx);
246  zmsg_destroy(&pTx);
247  }
248 
249  int stat = -1;
250  close(pipefd[1]);
251 
252  log.fd(pipefd[0]);
253  zactor_t * pWatcherActor = zactor_new(actorProcwaitSupport_, &log);
254 
255  SPD_TRACE("PA Parent: Running command...");
256  // Read from pipe until child dies
257  while (true) {
258  waitpid(pid, &stat, WUNTRACED);
259  if (WIFEXITED(stat) || WIFSIGNALED(stat)) {
260  zstr_sendf(pWatcherActor, "$EXIT");
261  break;
262  }
263  }
264 
265  zactor_destroy(&pWatcherActor);
266 
267  close(pipefd[0]);
268  int rc = WEXITSTATUS(stat);
269  // In case of kill -9 : returning 137
270  if (stat == 9) rc = 137;
271 
272  SPD_TRACE("PA Parent: Exit [{}] rc [{}]", stat, rc);
273  {
274 
275  zmsg_t * pTx = zmsg_new();
276  zmsg_addstr(pTx, "$EXIT");
277  zmsg_addstrf(pTx, "%d", rc);
278  zmsg_addstr(pTx, pWorker);
279  zmsg_addstr(pTx, pUpsream);
280  zmsg_addstr(pTx, pClient);
281  zsock_send(pPipe, "m", pTx);
282  zmsg_destroy(&pTx);
283  }
284  log.spd()->info("Process exited with status [{}]", stat);
285  }
286  else {
287  // FORK Failed handler
288  SPD_ERROR("PA Parent: fork() failure!");
289  {
290  zmsg_t * pTx = zmsg_new();
291  zmsg_addstr(pTx, "$FORKFAIL");
292  zsock_send(pPipe, "m", pTx);
293  zmsg_destroy(&pTx);
294  }
295  } // END FORK handling
296 
297  free(pCommand);
298  free(pUID);
299  free(pGID);
300  free(pWorker);
301  free(pUpsream);
302  free(pClient);
303 
304  } // END execute command
305  } // END while (true)
306 
307  SPD_TRACE("PA: Terminating persistent actor");
308  return;
309 }
310 
311 void ActorZmq::actorProcwaitSupport_(zsock_t * pPipe, void * pLogger)
312 {
313  zsock_signal(pPipe, 0);
314 
315  Log & commandLogger = *(static_cast<Log *>(pLogger));
316  // 3) 2)1)
317  // Since this is kind of hard to read:
318  // 1) Cast pLogger to Log *
319  // 2) Get its value
320  // 3) Set reference to it
321 
322  int fd = commandLogger.fd();
323  const int LIMIT = PIPE_BUF;
324  char buffer[LIMIT + 1];
325  std::memset(buffer, 0, LIMIT + 1);
326 
327  zpoller_t * pPoller = zpoller_new(nullptr);
328  zpoller_add(pPoller, pPipe);
329  zpoller_add(pPoller, &fd);
330 
331  while (true) {
332  // Possible death race condition... I'm looking at you Valgrind
333  void * pRecvSock = zpoller_wait(pPoller, -1);
334  if (pRecvSock == pPipe) {
335  char * pMsg = zstr_recv(pPipe);
336  std::string recvMsg = pMsg;
337  free(pMsg);
338  if (recvMsg == "$EXIT") {
339  break;
340  }
341  }
342  else if (pRecvSock == &fd) {
343  ssize_t readRet = read(fd, buffer, LIMIT);
344  if (readRet > 0) {
345  if (buffer[0] != '\0') {
346  commandLogger.write(buffer);
347  memset(buffer, 0, sizeof(buffer));
348  }
349  }
350  }
351  }
352 
353  zpoller_remove(pPoller, pPipe);
354  zpoller_remove(pPoller, &fd);
355  zpoller_destroy(&pPoller);
356  return;
357 }
358 
359 void ActorZmq::pipe(void * pPipe)
360 {
364 
365  SPD_TRACE("ActorZmq::pipe()<-");
366  mpPipe = static_cast<zsock_t *>(pPipe);
367 
368  if (!mpPoller) {
369  mpPoller = new PollerZmq();
370  }
371 
372  if (mpPipe) {
373  mpPoller->add(mpPipe);
374  }
375  SPD_TRACE("ActorZmq::pipe()->");
376 }
377 
379 {
383 
384  SPD_TRACE("ActorZmq::init()<-");
385  // Setting up signal handler
386  std::signal(SIGINT, Salsa::Actor::signalHandler);
387  std::signal(SIGTERM, Salsa::Actor::signalHandler);
388 
389  SPD_TRACE("ActorZmq::init()->");
390  return 0;
391 }
392 
394 {
398 
399  SPD_TRACE("ActorZmq::exec()<-");
400 
401  void * pEvent;
402  while (!mTerminated && !Salsa::Actor::interrupted()) {
403  pEvent = wait();
404  if (pEvent) {
405  // handle other socket
406  SPD_WARN("ActorZmq::exec() : Other socket from ActorZmq class");
407  }
408  }
409 
410  SPD_TRACE("ActorZmq::exec() : Salsa::interrupted() [{}]", Salsa::Actor::interrupted());
411  SPD_TRACE("ActorZmq::exec()->");
412  return 0;
413 }
414 
416 {
420 
421  SPD_TRACE("ActorZmq::finish()<-");
422  SPD_TRACE("ActorZmq::finish()->");
423  return 0;
424 }
425 
427 {
431  if (!mpPoller) {
432  SPD_ERROR("Poller is nullptr!");
433  return nullptr;
434  }
435 
436  void * pEvent = mpPoller->wait(mTimeout);
437  SPD_TRACE("ActorZmq::exec(): pEvent [{}] mpPipe [{}]", static_cast<void *>(pEvent), static_cast<void *>(mpPipe));
438 
439  if (mpPipe && pEvent == mpPipe) {
440  zmsg_t * pMsg = zmsg_recv(mpPipe);
441  if (!pMsg) {
442  return nullptr;
443  }
444 
445  char * pCommand = zmsg_popstr(pMsg);
446  zmsg_destroy(&pMsg);
447  if (streq(pCommand, "$TERM")) {
448  SPD_TRACE("ActorZmq::exec(): received $TERM");
449  mTerminated = true;
450  }
451  else {
452  SPD_ERROR("ActorZmq::exec(): invalid message to actor msg: [{}]", pCommand);
453  assert(false); // We should __not__ use assert here, because it's only used to debug...
454  }
455  zstr_free(&pCommand);
456  }
457  else {
458  if (zpoller_expired(mpPoller->poller())) {
459  SPD_TRACE("ActorZmq::exec(): Poller expired timeout [{}]...", mTimeout);
460  }
461  else if (zpoller_terminated(mpPoller->poller())) {
462  SPD_TRACE("ActorZmq::exec(): Poller terminated ...");
463  mTerminated = true;
464  }
465  else {
466  return pEvent;
467  }
468  }
469 
470  return pEvent;
471 }
472 
473 zpoller_t * ActorZmq::poller() const
474 {
478  return mpPoller->poller();
479 }
481 {
485  return mpPoller;
486 }
487 } // namespace Salsa
PollerZmq * mpPoller
Internal poller.
Definition: ActorZmq.hh:49
std::shared_ptr< spdlog::logger > spd()
Get SPDLOG logger handle.
Definition: Log.hh:37
virtual int init()
First function.
Definition: ActorZmq.cc:378
virtual void pipe(void *pipe)
Setter for pipe.
Definition: ActorZmq.cc:359
static void signalHandler(int signalNumber)
Setter salsa interruption.
Definition: Actor.cc:19
virtual ~ActorZmq()
Definition: ActorZmq.cc:12
zpoller_t * poller() const
Definition: ActorZmq.cc:473
virtual int finish()
Last function.
Definition: ActorZmq.cc:415
void fd(int newFD)
Set FD of pipe to watch.
Definition: Log.hh:42
bool mTerminated
Flag if actor should be terminated.
Definition: ActorZmq.hh:50
zpoller_t * poller() const
Returns Poller.
Definition: PollerZmq.hh:27
static void actorProcwaitSupport_(zsock_t *pipe, void *argv)
Support actor method (used for PID waiting)
Definition: ActorZmq.cc:311
bool terminated() const
Flag if actor should be terminated.
Definition: ActorZmq.hh:40
int write(char const *)
Write to logger.
Definition: Log.cc:50
virtual void * wait()
Definition: ActorZmq.cc:426
salsa node class
Definition: PollerZmq.hh:16
ZeroMQ implementation of salsa actor class.
Definition: ActorZmq.hh:19
zsock_t * mpPipe
Zmq pipe socket.
Definition: ActorZmq.hh:48
virtual void add(SocketZyre *pSocket)
Definition: PollerZmq.cc:45
Base salsa actor class.
Definition: Actor.hh:17
static void SalsaActorFn(zsock_t *pPipe, void *pArgv)
Definition: ActorZmq.cc:22
virtual int exec()
Main function.
Definition: ActorZmq.cc:393
Definition: Log.hh:17
static std::sig_atomic_t interrupted()
Returns if salsa is interrupted.
Definition: Actor.hh:35
int create()
Create SPDLOG loger.
Definition: Log.cc:31
static void SalsaActorForkFn(zsock_t *pPipe, void *pArgv)
Actor function with fork capability.
Definition: ActorZmq.cc:57
int mTimeout
Poller timeout.
Definition: ActorZmq.hh:51
PollerZmq * pollerZmq() const
Definition: ActorZmq.cc:480
virtual void * wait(int timeout=-1)
Waiting for socket.
Definition: PollerZmq.cc:56
int add(std::string)
Add output sink (file, console, zmq) for SPDLOG.
Definition: Log.cc:10