salsa  0.3.0
 All Classes Functions Variables Enumerations Pages
NodeManagerZyre.cc
1 #include "NodeManagerZyre.hh"
2 #include <MessageZyre.hh>
3 #include <PublisherZmq.hh>
4 #include <SocketZyre.hh>
5 #include <TaskExecutorFake.hh>
6 #include <TaskExecutorForkZmq.hh>
7 namespace Salsa {
9 {
13 
14  char * pPubUrl = getenv("SALSA_PUB_URL");
15  if (pPubUrl) {
16  SPD_INFO("Publisher url [{}]", pPubUrl);
17  mpPublisher = new PublisherZmq(pPubUrl);
18  }
19 }
21 {
25 }
26 
27 Socket * NodeManagerZyre::onEnter(std::string self, std::string fromType, Message * pMsg,
28  std::vector<std::string> & values)
29 {
33 
34  Socket * pSocket = NodeManager::onEnter(self, fromType, pMsg, values);
35 
36  if (pSocket) {
37  sendWhisper(pSocket, pMsg->uuid(), values);
38  }
39 
40  return pSocket;
41 }
42 
43 Socket * NodeManagerZyre::onExit(std::string self, Message * pMsg, std::vector<std::string> & values)
44 {
48 
49  Socket * pSocket = NodeManager::onExit(self, pMsg, values);
50 
51  if (pSocket) {
52  sendWhisper(pSocket, pMsg->uuid(), values);
53  }
54  return pSocket;
55 }
56 
57 Socket * NodeManagerZyre::onWhisper(std::string self, Message * pMsg, std::vector<std::string> & values)
58 {
62 
63  Socket * pSocket = NodeManager::onWhisper(self, pMsg, values);
64 
65  if (pSocket) {
66  sendWhisper(pSocket, pMsg->uuid(), values);
67  }
68  return pSocket;
69 }
70 
72 {
76 
77  if (mpTaskPool == nullptr) mpTaskPool = new TaskPool(this);
78 
79  if (getenv("SALSA_FAKE")) {
80  SPD_DEBUG("Fake jobs");
82  TaskState * pState = new TaskState(pExec);
83  pExec->taskState(pState);
84  mpTaskPool->add(pState->executor()->pipe(), pState);
85  }
86  else {
87  zactor_t * pActor = zactor_new(Salsa::ActorZmq::SalsaActorForkFn, nullptr);
88  TaskExecutor * pExec = new TaskExecutorForkZmq(pActor);
89  TaskState * pState = new TaskState(pExec);
90  pExec->taskState(pState);
91  mpNodeZyre->pollerZmq()->add(static_cast<zactor_t *>(pState->executor()->pipe()));
92  mpTaskPool->add(pState->executor()->pipe(), pState);
93  }
94 }
95 
96 void NodeManagerZyre::runTask(TaskState * pTaskState, std::string wk, std::string upstream)
97 {
101 
102  SPD_TRACE("Task [{}:{}] wk [{}] upstream [{}]", pTaskState->task()->jobid(), pTaskState->task()->taskid(), wk,
103  upstream);
104  pTaskState->executor()->run(wk, upstream);
105 }
106 
108 {
112 
113  if (mpTaskPool == nullptr) return false;
114 
115  return mpTaskPool->handlePipe(pPipe);
116 }
117 
118 bool NodeManagerZyre::sendWhisper(Socket * pSocket, std::string to, std::vector<std::string> & vec)
119 {
123 
124  if (vec.size()) {
125  SocketZyre * pSocketZyre = static_cast<SocketZyre *>(pSocket);
126  zmsg_t * pMsg = nullptr;
127 
128  if (vec[0] == "&") vec.erase(vec.begin());
129  for (auto str : vec) {
130  if (pMsg == nullptr) {
131  pMsg = zmsg_new();
132  }
133  if (str == "&") {
134  zyre_whisper(pSocketZyre->zyre(), to.c_str(), &pMsg);
135  zmsg_destroy(&pMsg);
136  pMsg = nullptr;
137  }
138  else {
139  zmsg_addstr(pMsg, str.c_str());
140  }
141  }
142  zyre_whisper(pSocketZyre->zyre(), to.c_str(), &pMsg);
143  zmsg_destroy(&pMsg);
144  return true;
145  }
146  return false;
147 }
148 
149 } // namespace Salsa
Salsa zyre socket class.
Definition: SocketZyre.hh:18
virtual Socket * onEnter(std::string self, std::string fromType, Message *pMsg, std::vector< std::string > &out)
TaskExecutorFake class.
Base Message class.
Definition: Message.hh:15
Base PublisherZmq class.
Definition: PublisherZmq.hh:17
virtual bool run(std::string, std::string)=0
Run task.
bool handlePipe(void *pPipe)
Definition: TaskPool.cc:142
NodeManagerZyre(NodeZyre *pNodeZyre)
Base salsa TaskState class.
Definition: TaskState.hh:16
void taskState(TaskState *pTS)
Definition: TaskExecutor.cc:22
virtual Socket * onExit(std::string self, Message *msg, std::vector< std::string > &out)
Definition: NodeManager.cc:141
NodeManager class.
Definition: NodeManager.hh:20
virtual Socket * onExit(std::string self, Message *pMsg, std::vector< std::string > &out)
void add(void *p, TaskState *t)
Definition: TaskPool.cc:24
NodeZyre * mpNodeZyre
Current zyre node.
virtual std::string uuid() const =0
Returns node uuid.
virtual zyre_t * zyre() const
Returns zyre pointer.
Definition: SocketZyre.hh:42
virtual bool sendWhisper(Socket *pSocket, std::string to, std::vector< std::string > &vect)
PollerZmq * pollerZmq() const
Definition: ActorZmq.cc:432
virtual Socket * onWhisper(std::string self, Message *pMsg, std::vector< std::string > &out)
TaskExecutorForkZmq class.
TaskExecutor * executor()
Definition: TaskState.cc:82
virtual void runTask(TaskState *pTaskState, std::string wk, std::string upstream)
Run task interface.
virtual void add(SocketZyre *pSocket)
Definition: PollerZmq.cc:45
TaskInfo * task() const
Definition: TaskState.cc:65
virtual Socket * onWhisper(std::string self, Message *msg, std::vector< std::string > &out)
Definition: NodeManager.cc:179
virtual void * pipe() const
Definition: TaskExecutor.cc:15
Base Socket class.
Definition: Socket.hh:15
Salsa node class.
Definition: NodeZyre.hh:20
virtual bool handleTaskPool(void *pPool)
TaskPool * mpTaskPool
Task pool.
Definition: NodeManager.hh:83
Base TaskExecutor class.
Definition: TaskExecutor.hh:14
Publisher * mpPublisher
Publisher.
Definition: NodeManager.hh:84
virtual Socket * onEnter(std::string self, std::string fromType, Message *msg, std::vector< std::string > &out)
Definition: NodeManager.cc:76
Base salsa TaskPool class.
Definition: TaskPool.hh:18
static void SalsaActorForkFn(zsock_t *pPipe, void *pArgv)
Actor function with fork capability.
Definition: ActorZmq.cc:54
virtual void addTaskSlot()