salsa  0.4.0
Job.cc
1 #include "Job.hh"
2 
3 namespace Salsa {
4 Job::Job(std::string uuid, std::string type) : Object(), mUUID(uuid), mType(type)
5 {
9 
10  // mTimeStarted = std::chrono::system_clock::now();
11  mTimeStarted =
12  std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()).count();
13 }
14 
16 {
20  for (int iType = EQueueType::pending; iType < EQueueType::all; iType++) {
21  for (auto iTask : mTasks[iType]) {
22  delete iTask.second;
23  iTask.second = nullptr;
24  }
25  mTasks[iType].clear();
26  }
27 }
28 
29 bool Job::addTask(uint32_t id, TaskInfo * pTaskInfo, EQueueType type)
30 {
34  if (!pTaskInfo) {
35  return false;
36  }
37 
38  if (type >= all) {
39  SPD_CRIT("EQueueType is out of range [{}]", type);
40  return false;
41  }
42 
43  mTasks[type].insert(std::make_pair(id, pTaskInfo));
44  return true;
45 }
46 
47 bool Job::moveTask(uint32_t id, EQueueType from, EQueueType to)
48 {
52 
53  return moveTask(id, nullptr, from, to);
54 }
55 
56 bool Job::moveTask(JobID_t id, TaskInfo * pTaskInfo, EQueueType from, EQueueType to)
57 {
61  auto iFound = mTasks[from].find(id);
62  if (iFound != mTasks[from].end()) {
63  if (pTaskInfo == nullptr) {
64  pTaskInfo = iFound->second;
65  }
66  else {
67  delete iFound->second;
68  }
69  // TODO This is asking for trouble...
70  // Possible fix would be... std::shared_ptr;
71  // Also, why do we even need to supply pTaskInfo anyways?
72 
73  mTasks[from].erase(iFound);
74  addTask(id, pTaskInfo, to);
75  mChanged = true;
76  return true;
77  }
78  else {
79  SPD_WARN("Job with id [{}] was not found in queue [{}] !!!", id, from);
80  return false;
81  }
82 }
83 
84 bool Job::removeTask(uint32_t id, EQueueType from)
85 {
89 
90  // TODO This could cause problems at some point...
91  auto found = mTasks[from].find(id);
92  if (found != mTasks[from].end()) {
93  mTasks[from].erase(found);
94  return true;
95  }
96 
97  return false;
98 }
99 
100 TaskInfo * Job::nextTask()
101 {
105 
106  auto iAvailTask = mTasks[EQueueType::pending].begin();
107  if (iAvailTask == mTasks[EQueueType::pending].end()) {
108  return nullptr;
109  }
110 
111  TaskInfo * pNewTask = iAvailTask->second;
112  moveTask(iAvailTask->first, EQueueType::pending, EQueueType::assigned);
113  return pNewTask;
114 }
115 
116 void Job::tasks(std::vector<TaskInfo *> & targetVec, EQueueType type, bool shouldClear)
117 {
121 
122  for (auto task : mTasks[type]) {
123  targetVec.push_back(task.second);
124  }
125 
126  if (shouldClear) {
127  mTasks[type].clear();
128  }
129 }
130 
132 {
136 
137  return (mTasks[type].find(id) != mTasks[type].end());
138 
139  // auto found = mTasks[type].find(id);
140  // if (found != mTasks[type].end()) {
141  // return true;
142  //}
143  // else {
144  // return false;
145  //}
146 }
147 
148 void Job::print() const
149 {
153  SPD_DEBUG("{} P[{}] A[{}] R[{}] D[{}] F[{}] started[{}] finished[{}]", mUUID, mTasks[EQueueType::pending].size(),
154  mTasks[EQueueType::assigned].size(), mTasks[EQueueType::running].size(), mTasks[EQueueType::done].size(),
155  mTasks[EQueueType::failed].size(), mTimeStarted, mTimeFinished);
156  SPD_TRACE("Feeder [{}] Consumer [{}]", mFeederUUID, mConsumerUUID);
157 }
158 
159 void Job::json(Json::Value & json)
160 {
164 
165  Json::Value d;
166  d["name"] = mUUID;
167  Json::UInt64 ts = static_cast<Json::UInt64>(mTimeStarted);
168  Json::UInt64 tf = static_cast<Json::UInt64>(mTimeFinished);
169  d["time"]["started"] = ts;
170  if (tf) d["time"]["finished"] = tf;
171  d["P"] = static_cast<Json::Value::UInt64>(mTasks[EQueueType::pending].size());
172  d["A"] = static_cast<Json::Value::UInt64>(mTasks[EQueueType::assigned].size());
173  d["R"] = static_cast<Json::Value::UInt64>(mTasks[EQueueType::running].size());
174  d["D"] = static_cast<Json::Value::UInt64>(mTasks[EQueueType::done].size());
175  d["F"] = static_cast<Json::Value::UInt64>(mTasks[EQueueType::failed].size());
176  json.append(d);
177 }
178 
179 size_t Job::size(EQueueType type) const
180 {
185  if (type >= EQueueType::all) {
186  size_t sum = mTasks[EQueueType::pending].size();
187  sum += mTasks[EQueueType::assigned].size();
188  sum += mTasks[EQueueType::running].size();
189  sum += mTasks[EQueueType::done].size();
190  sum += mTasks[EQueueType::failed].size();
191  return sum;
192  }
193  else {
194 
195  return mTasks[type].size();
196  }
197 }
198 
199 size_t Job::sizeNotFinished() const
200 {
204  size_t sum = mTasks[EQueueType::pending].size();
205  sum += mTasks[EQueueType::assigned].size();
206  sum += mTasks[EQueueType::running].size();
207  return sum;
208 }
209 
210 void Job::consumer(std::string uuid)
211 {
216 }
217 
218 std::string Job::consumer() const
219 {
223  // TODO Also potentialy DANGEROUS
224  return std::move(mConsumerUUID);
225 }
226 
227 void Job::feeder(std::string uuid)
228 {
232  mFeederUUID = uuid;
233 }
234 
235 std::string Job::feeder() const
236 {
240  // TODO Potentialy DANGEROUS!
241  return std::move(mFeederUUID);
242 }
243 
244 bool Job::haveMoreTasks() const
245 {
249  return !mTasks[Job::pending].empty();
250 }
251 
253 {
257  if (sizeNotFinished() > 0) return false;
258 
259  mTimeFinished =
260  std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()).count();
261 
262  return true;
263 }
264 
265 } // namespace Salsa
bool removeTask(uint32_t id, EQueueType from)
Definition: Job.cc:84
uint32_t JobID_t
Job ID type alias.
Definition: Job.hh:66
bool moveTask(uint32_t id, EQueueType from, EQueueType to)
Definition: Job.cc:47
size_t sizeNotFinished() const
Definition: Job.cc:199
Job(std::string uuid="", std::string type="NONE")
Definition: Job.cc:4
std::string mFeederUUID
Feeder UUID.
Definition: Job.hh:87
bool isFinished()
Returns if jobs is finished.
Definition: Job.cc:252
EQueueType
Queue types.
Definition: Job.hh:20
std::string uuid() const
returns UUID
Definition: Job.hh:29
std::string mConsumerUUID
Source (consumer) UUID.
Definition: Job.hh:86
std::string mUUID
Job UUID.
Definition: Job.hh:85
std::map< uint32_t, TaskInfo * > mTasks[all]
Lists of jobs.
Definition: Job.hh:84
virtual ~Job()
Definition: Job.cc:15
bool mChanged
Flag if job was changed.
Definition: Job.hh:95
std::string feeder() const
Definition: Job.cc:235
bool haveMoreTasks() const
Task statuses.
Definition: Job.cc:244
bool addTask(uint32_t id, TaskInfo *pJob, EQueueType type)
Definition: Job.cc:29
void json(Json::Value &json)
Definition: Job.cc:159
Base Salsa Object class.
Definition: Object.hh:15
std::string consumer() const
Definition: Job.cc:218
void print() const
Definition: Job.cc:148
uint64_t mTimeFinished
Time finished.
Definition: Job.hh:90
uint64_t mTimeStarted
Time started.
Definition: Job.hh:89
void tasks(std::vector< TaskInfo * > &v, EQueueType type, bool clear=true)
Definition: Job.cc:116
TaskInfo * nextTask()
Definition: Job.cc:100
size_t size(EQueueType t=all) const
Definition: Job.cc:179
bool isTaskInQueue(uint32_t id, EQueueType type) const
Check task presence in certain queue.
Definition: Job.cc:131