dmlite 0.6
TaskExec.h
Go to the documentation of this file.
1/*
2 * Copyright 2015 CERN
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 */
17#ifndef DMLITE_TASKEXEC_H
18#define DMLITE_TASKEXEC_H
19
20/** @file TaskExec.h
21 * @brief A class that spawns commands that perform actions
22 * @author Fabrizio Furano
23 * @date Dec 2015
24 */
25
26
27#include <boost/thread.hpp>
28#include <signal.h>
29#include <vector>
30#include <string>
31#include <algorithm>
32#include <sstream>
33#include <iterator>
34#include <iostream>
36
37namespace dmlite {
38 class dmTaskExec;
39
40 class dmTask: public boost::mutex {
41 // Describes one spawned command: its key, its command line, its process handles and its collected stdout.
42 protected:
43 /// Threads waiting for the result of this task wait and synchronize here,
44 /// using something like
45 /// boost::lock_guard< boost::mutex > l(workmutex);
46 /// NOTE(review): no `workmutex` is declared in this class; the task itself derives from boost::mutex - confirm which mutex is meant
47 boost::condition_variable condvar;
48 public:
49 dmTask(dmTaskExec *wheretolog); ///< NOTE(review): wheretolog is presumably stored in loggerinst - confirm in the implementation
50 dmTask(const dmTask &o) { // Copy constructor: copies the descriptive fields, not the process handles
51 key = o.key;
52 cmd = o.cmd;
53 for(unsigned int i = 0; i < 64; i++) parms[i] = NULL; // the argument pointers are not copied; every slot is cleared
56 endtime = o.endtime;
57 running = o.running;
59 fd[0] = 0; fd[1] = 0; fd[2] = 0; // the descriptors are not copied either; all three are set to 0
60 this->stdout = o.stdout;
61 this->loggerinst = o.loggerinst;
62 } // NOTE(review): pid is not assigned in the lines visible here - confirm whether that is intended
63
65 int key; ///< Identifier of this task; presumably the key returned by dmTaskExec::submitCmd - TODO confirm
66
67 std::string cmd; ///< The command string (see splitCmd())
68 const char *parms[64]; ///< Argument pointers, at most 64 slots
69
71
73 bool running;
75
76 int fd[3]; ///< File descriptors of the spawned process; presumably filled in by dmTaskExec::popen3 - TODO confirm
77 pid_t pid;
78 std::string stdout; ///< Standard output collected from the command (see dmTaskExec::getTaskStdout)
79
80 /// Split the command string into the single parms
81 void splitCmd();
82
83 /// Wait until the task has finished or the timeout has expired (tmout defaults to 5, presumably seconds - TODO confirm)
84 int waitFinished(int tmout=5);
85 /// Wake up all the threads that are currently waiting on condvar
86 void notifyAll() {
87 condvar.notify_all();
88 }
89
91 };
92
93
94 /// Spawns external commands, useful for checksum calculations or file pulling.
95 /// The spawned commands are pollable, i.e. at any given moment it's possible to
96 /// know the list of commands that are still running.
97 /// Objects belonging to this class are in general created in the disk nodes,
98 /// e.g. for running checksums or file copies and pulls
99 class dmTaskExec: public boost::recursive_mutex {
100
101 public:
104 std::string instance;
105 /// Executes a command. Returns a positive integer as a key to reference
106 /// the execution status and the result
107 /// The mechanics is that a detached thread is started. This guy invokes popen3
108 /// and blocks waiting for the process to end. Upon end it updates the corresponding
109 /// instance of dmTask with the result and the stdout
110 int submitCmd(std::string cmd);
111
112
113 /// Executes a command. Returns a positive integer as a key to reference
114 /// the execution status and the result
115 /// The mechanics is that a detached thread is started. This guy invokes popen3
116 /// and blocks waiting for the process to end. Upon end it updates the corresponding
117 /// instance of dmTask with the result and the stdout
118 /// -1 is returned in case of error in the submission
119 int submitCmd(std::vector<std::string> &args);
120
121 /// Actually starts the thread corresponding to a command that was just submitted
122 /// Avoids race conditions
123 void goCmd(int id);
124
125 /// Split the command string into the single parms. NOTE(review): this overload receives an already-split argument vector; presumably it stores args into the task - confirm
126 void assignCmd(dmTask *task, std::vector<std::string> &args);
127
128 /// Get the results of a task.
129 /// Wait at most tmout seconds until the task finishes
130 /// Return 0 if the task has finished and there is a result
131 /// Return nonzero if the task is still running
132 int waitResult(int taskID, int tmout=5);
133
134 /// Kill a specific task given the id
135 int killTask(int taskID);
136
137 /// Get a dmTask given the id (mainly for testing)
138 dmTask* getTask(int taskID);
139
140 /// Get the current stdout of a task which may be running
141 int getTaskStdout(int taskID, std::string &stdout);
142
143 /// Loops over all the tasks and:
144 /// - sends a notification to the head node about all the processes that are running or that have finished
145 /// - garbage collects the task list:
146 /// - Tasks that finished long ago (e.g. 1 hour)
147 /// - Tasks that are stuck (e.g. 1 day)
148 void tick();
149 /// Presumably fills tot, running and finished with the corresponding task counts; the meaning of the return value is not visible here - TODO confirm
150 int getTaskCounters(int &tot, int &running, int &finished);
151
152
153 /// Event invoked internally to log stuff
154 virtual void onLoggingRequest(Logger::Level lvl, std::string const & msg) = 0;
155 /// Event invoked internally to log stuff
156 virtual void onErrLoggingRequest(std::string const & msg) = 0;
157
158 protected:
159
160 /// Event for immediate notifications when a task finishes
161 /// Subclasses can specialize this and apply app-dependent behavior to
162 /// perform actions when something has finished running
163 /// NOTE the signature. This passes copies of Task objects, not the originals
164 virtual void onTaskCompleted(dmTask &task);
165
166 /// Event that notifies that a task is running
167 /// This event can be invoked multiple times during the life of a task
168 /// NOTE the signature. This passes copies of Task objects, not the originals
169 virtual void onTaskRunning(dmTask &task);
170
171
172 private:
173 /// Helper used by run() to spawn the command; presumably fills fd[] with the child's descriptors and *pid with its process id - TODO confirm
174 int popen3(int fd[3], pid_t *pid, const char ** argv );
175
176 /// Used to create keys to be inserted into the map. This has to be treated modulo MAXINT or similar big number
178 /// This map works like a sparse array :-)
179 std::map<int, dmTask*> tasks;
180
181
182 /// Here we invoke popen3
183 /// and block waiting for the process to end. Upon end it updates the corresponding
184 /// instance of dmTask with the result and the stdout
185 virtual void run(int key);
186
187 /// Kill a specific task
188 int killTask(dmTask *task);
189 };
190
191
192
193}
194
195
196
197
198
199
200
201
202
203
204
205
206
207#endif
208
Level
Definition logger.h:89
Definition TaskExec.h:99
virtual void onLoggingRequest(Logger::Level lvl, std::string const &msg)=0
Event invoked internally to log stuff.
int getTaskCounters(int &tot, int &running, int &finished)
int submitCmd(std::vector< std::string > &args)
Executes a command. Returns a positive integer as a key to reference.
int killTask(dmTask *task)
int taskcnt
Used to create keys to be inserted into the map. This has to be treated modulo MAXINT or similar big ...
Definition TaskExec.h:177
virtual void run(int key)
int killTask(int taskID)
void goCmd(int id)
dmTask * getTask(int taskID)
int waitResult(int taskID, int tmout=5)
std::string instance
Definition TaskExec.h:104
virtual void onTaskRunning(dmTask &task)
NOTE the signature. This passes copies of Task objects, not the originals.
virtual void onTaskCompleted(dmTask &task)
std::map< int, dmTask * > tasks
This map works like a sparse array :-)
Definition TaskExec.h:179
virtual void onErrLoggingRequest(std::string const &msg)=0
Event invoked internally to log stuff.
int getTaskStdout(int taskID, std::string &stdout)
int submitCmd(std::string cmd)
void assignCmd(dmTask *task, std::vector< std::string > &args)
Split the command string into the single parms.
int popen3(int fd[3], pid_t *pid, const char **argv)
Definition TaskExec.h:40
boost::condition_variable condvar
Definition TaskExec.h:47
dmTask(dmTaskExec *wheretolog)
const char * parms[64]
Definition TaskExec.h:68
time_t endtime
Definition TaskExec.h:72
time_t starttime
Definition TaskExec.h:72
int resultcode
Definition TaskExec.h:70
bool finished
Definition TaskExec.h:74
std::string stdout
Definition TaskExec.h:78
void splitCmd()
Split the command string into the single parms.
int key
Definition TaskExec.h:65
dmTask(const dmTask &o)
Definition TaskExec.h:50
bool running
Definition TaskExec.h:73
int fd[3]
Definition TaskExec.h:76
void notifyAll()
Definition TaskExec.h:86
dmTaskExec * loggerinst
Definition TaskExec.h:90
int waitFinished(int tmout=5)
Wait until the task has finished or the timeout is expired.
pid_t pid
Definition TaskExec.h:77
std::string cmd
Definition TaskExec.h:67
Namespace for the dmlite C++ API.
Definition authn.h:16