28 zsock_signal(pPipe, 0);
32 SPD_TRACE(
"SalsaActorFn::init()");
34 if ((ret = pActor->
init())) {
35 SPD_ERROR(
"init() failed! [{}]", ret);
40 SPD_TRACE(
"SalsaActorFn::exec()");
41 if ((ret = pActor->
exec())) {
42 SPD_ERROR(
"exec() failed! [{}]", ret);
47 SPD_TRACE(
"SalsaActorFn::finish()");
48 if ((ret = pActor->
finish())) {
49 SPD_ERROR(
"finish() failed! [{}]", ret);
58 zsock_signal(pPipe, 0);
64 zmsg_t * pReceived = zmsg_recv(pPipe);
66 SPD_WARN(
"PA: pReceived == <nullptr> (Exec interrupted)");
71 zframe_t * pFrame = zmsg_first(pReceived);
74 if (zframe_streq(pFrame,
"$TERM")) {
75 SPD_TRACE(
"PA: Terminate received");
76 zmsg_destroy(&pReceived);
81 char * pCommand = zframe_strdup(pFrame);
82 SPD_TRACE(
"PA: got Command [{}]", pCommand);
83 pFrame = zmsg_next(pReceived);
86 char * pUid = zframe_strdup(pFrame);
87 SPD_TRACE(
"PA: got UID [{}]", pUid);
88 pFrame = zmsg_next(pReceived);
91 char * pGid = zframe_strdup(pFrame);
92 SPD_TRACE(
"PA: got GID [{}]", pGid);
93 pFrame = zmsg_next(pReceived);
96 char * pWorker = zframe_strdup(pFrame);
97 SPD_TRACE(
"PA: got Woker [{}]", pWorker);
98 pFrame = zmsg_next(pReceived);
101 char * pUpsream = zframe_strdup(pFrame);
102 SPD_TRACE(
"PA: got Upstream [{}]", pUpsream);
103 pFrame = zmsg_next(pReceived);
106 char * pClient = zframe_strdup(pFrame);
107 SPD_TRACE(
"PA: got Client [{}]", pClient);
111 while ((pFrame = zmsg_next(pReceived)) !=
nullptr) {
112 char * pMessage = zframe_strdup(pFrame);
113 SPD_TRACE(
"PA: Adding log target [{}]", pMessage);
115 zstr_free(&pMessage);
122 SPD_TRACE(
"PA: Destroying message [{}]", static_cast<void *>(pReceived));
123 zmsg_destroy(&pReceived);
127 SPD_TRACE(
"PA: Creating logger");
129 log.
spd()->info(
"Running [{}]", pCommand);
131 SPD_TRACE(
"PA: Waiting for pipes...");
133 if (pipe2(pipefd, O_NONBLOCK)) {
134 SPD_ERROR(
"FAILED to receive pipes!");
136 SPD_TRACE(
"PA: Got pipes [{}, {}]", pipefd[0], pipefd[1]);
145 SPD_TRACE(
"PA Child: uid [{}]->[{}] guid [{}]->[{}]", getuid(), pUid, getgid(), pGid);
146 if (setgid(atoi(pGid)) == -1) {
147 SPD_ERROR(
"Problem setting GUI to process !!! ");
150 if (setuid(atoi(pUid)) == -1) {
151 SPD_ERROR(
"Problem setting UID to process !!! ");
155 SPD_TRACE(
"PA Child: uid [{}] guid [{}]", getuid(), getgid());
158 SPD_TRACE(
"PA Child: Running command [{}]", pCommand);
160 unsigned int iCount = 0;
161 char ** ppCommand =
nullptr;
162 char * tmp = std::strtok(pCommand,
" ");
164 SPD_TRACE(
"PA Child: Tokenizing");
167 ppCommand =
static_cast<char **
>(realloc(ppCommand, (iCount + 2) *
sizeof(
char **)));
168 ppCommand[iCount++] = strdup(tmp);
169 tmp = std::strtok(
nullptr,
" ");
171 ppCommand[iCount] =
nullptr;
176 SPD_TRACE(
"PA Child: Configuring pipes");
181 dup2(pipefd[1], STDOUT_FILENO);
182 dup2(pipefd[1], STDERR_FILENO);
185 if (execvp(ppCommand[0], ppCommand) == -1) {
186 int const err = errno;
187 SPD_ERROR(
"PA failed to execute command! Error: [{}]", strerror(err));
194 SPD_TRACE(
"PA Parent: Sending PID [{}] to parent", pid);
196 zmsg_t * pTx = zmsg_new();
197 zmsg_addstr(pTx,
"$PID");
198 zmsg_addstrf(pTx,
"%d", pid);
199 zmsg_addstr(pTx, pUpsream);
200 zmsg_addstr(pTx, pClient);
201 zsock_send(pPipe,
"m", pTx);
211 SPD_TRACE(
"PA Parent: Running command...");
214 waitpid(pid, &stat, WUNTRACED);
215 if (WIFEXITED(stat) || WIFSIGNALED(stat)) {
216 zstr_sendf(pWatcherActor,
"$EXIT");
221 zactor_destroy(&pWatcherActor);
224 int rc = WEXITSTATUS(stat);
226 if (stat == 9) rc = 137;
228 SPD_TRACE(
"PA Parent: Exit [{}] rc [{}]", stat, rc);
231 zmsg_t * pTx = zmsg_new();
232 zmsg_addstr(pTx,
"$EXIT");
233 zmsg_addstrf(pTx,
"%d", rc);
234 zmsg_addstr(pTx, pWorker);
235 zmsg_addstr(pTx, pUpsream);
236 zmsg_addstr(pTx, pClient);
237 zsock_send(pPipe,
"m", pTx);
240 log.
spd()->info(
"Process exited with status [{}]", stat);
244 SPD_ERROR(
"PA Parent: fork() failure!");
246 zmsg_t * pTx = zmsg_new();
247 zmsg_addstr(pTx,
"$FORKFAIL");
248 zsock_send(pPipe,
"m", pTx);
263 SPD_TRACE(
"PA: Terminating persistent actor");
269 zsock_signal(pPipe, 0);
271 Log & commandLogger = *(
static_cast<Log *
>(pLogger));
278 int fd = commandLogger.
fd();
279 const int LIMIT = PIPE_BUF;
280 char buffer[LIMIT + 1];
281 std::memset(buffer, 0, LIMIT + 1);
283 zpoller_t * pPoller = zpoller_new(
nullptr);
284 zpoller_add(pPoller, pPipe);
285 zpoller_add(pPoller, &fd);
289 void * pRecvSock = zpoller_wait(pPoller, -1);
290 if (pRecvSock == pPipe) {
291 char * pMsg = zstr_recv(pPipe);
292 std::string recvMsg = pMsg;
294 if (recvMsg ==
"$EXIT") {
298 else if (pRecvSock == &fd) {
299 ssize_t readRet = read(fd, buffer, LIMIT);
301 if (buffer[0] !=
'\0') {
302 commandLogger.
write(buffer);
303 memset(buffer, 0,
sizeof(buffer));
309 zpoller_remove(pPoller, pPipe);
310 zpoller_remove(pPoller, &fd);
311 zpoller_destroy(&pPoller);
321 SPD_TRACE(
"ActorZmq::pipe()<-");
322 mpPipe =
static_cast<zsock_t *
>(pPipe);
331 SPD_TRACE(
"ActorZmq::pipe()->");
340 SPD_TRACE(
"ActorZmq::init()<-");
341 SPD_TRACE(
"ActorZmq::init()->");
351 SPD_TRACE(
"ActorZmq::exec()<-");
358 SPD_WARN(
"ActorZmq::exec() : Other socket from ActorZmq class");
363 SPD_TRACE(
"ActorZmq::exec()->");
373 SPD_TRACE(
"ActorZmq::finish()<-");
374 SPD_TRACE(
"ActorZmq::finish()->");
384 SPD_ERROR(
"Poller is nullptr!");
389 SPD_TRACE(
"ActorZmq::exec(): pEvent [{}] mpPipe [{}]", static_cast<void *>(pEvent), static_cast<void *>(
mpPipe));
392 zmsg_t * pMsg = zmsg_recv(
mpPipe);
397 char * pCommand = zmsg_popstr(pMsg);
399 if (streq(pCommand,
"$TERM")) {
400 SPD_TRACE(
"ActorZmq::exec(): received $TERM");
404 SPD_ERROR(
"ActorZmq::exec(): invalid message to actor msg: [{}]", pCommand);
407 zstr_free(&pCommand);
411 SPD_TRACE(
"ActorZmq::exec(): Poller expired timeout [{}]...",
mTimeout);
414 SPD_TRACE(
"ActorZmq::exec(): Poller terminated ...");
PollerZmq * mpPoller
Internal poller.
std::shared_ptr< spdlog::logger > spd()
Get SPDLOG logger handle.
virtual int init()
First function.
virtual void pipe(void *pipe)
Setter for pipe.
virtual int finish()
Last function.
void fd(int newFD)
Set FD of pipe to watch.
bool mTerminated
Flag if actor should be terminated.
static void actorProcwaitSupport_(zsock_t *pipe, void *argv)
Support actor method (used for PID waiting)
int write(char const *)
Write to logger.
ZeroMQ implementation of salsa actor class.
zsock_t * mpPipe
Zmq pipe socket.
PollerZmq * pollerZmq() const
zpoller_t * poller() const
virtual void add(SocketZyre *pSocket)
static void SalsaActorFn(zsock_t *pPipe, void *pArgv)
zpoller_t * poller() const
Returns Poller.
virtual int exec()
Main function.
static std::sig_atomic_t interrupted()
Returns whether salsa is interrupted.
int create()
Create SPDLOG logger.
static void SalsaActorForkFn(zsock_t *pPipe, void *pArgv)
Actor function with fork capability.
int mTimeout
Poller timeout.
virtual void * wait(int timeout=-1)
Waiting for socket.
int add(std::string)
Add output sink (file, console, zmq) for SPDLOG.