salsa  0.3.0
 All Classes Functions Variables Enumerations Pages
Job.cc
1 #include "Job.hh"
2 namespace Salsa {
3 Job::Job(std::string uuid, std::string type) : Object(), mUUID(uuid), mType(type)
4 {
8 }
10 {
14  TaskInfo * pTaskInfo;
15  for (int iType = QueueType::pending; iType < QueueType::all; iType++) {
16  for (auto job : mTasks[iType]) {
17  pTaskInfo = job.second;
18  delete pTaskInfo;
19  job.second = nullptr;
20  }
21  mTasks[iType].clear();
22  }
23 }
24 
25 bool Job::addTask(uint32_t id, TaskInfo * pJob, QueueType type)
26 {
30  if (!pJob) {
31  return false;
32  }
33 
34  if (type >= all) {
35  SPD_WARN("QueueType is our of range [{}]", type);
36  return false;
37  }
38 
39  // id = size();
40 
41  mTasks[type].insert(std::make_pair(id, pJob));
42  return true;
43 }
44 
45 bool Job::moveTask(uint32_t id, QueueType from, QueueType to)
46 {
50 
51  return moveTask(id, nullptr, from, to);
52 }
53 
54 bool Job::moveTask(uint32_t id, TaskInfo * pJobInfo, QueueType from, QueueType to)
55 {
59  auto iFound = mTasks[from].find(id);
60  if (iFound != mTasks[from].end()) {
61  if (pJobInfo == nullptr) {
62  pJobInfo = iFound->second;
63  }
64  else {
65  delete iFound->second;
66  }
67 
68  mTasks[from].erase(iFound);
69  addTask(id, pJobInfo, to);
70  return true;
71  }
72  SPD_WARN("Job with id [{}] was not found in queue [{}] !!!", id, from);
73  return false;
74 }
75 
76 bool Job::removeTask(uint32_t id, QueueType from)
77 {
81  auto found = mTasks[from].find(id);
82  if (found != mTasks[from].end()) {
83  mTasks[from].erase(found);
84  return true;
85  }
86 
87  return false;
88 }
89 
90 TaskInfo * Job::nextJob()
91 {
95  auto pJob = mTasks[QueueType::pending].begin();
96  if (pJob == mTasks[QueueType::pending].end()) {
97  return nullptr;
98  }
99 
100  TaskInfo * pJI = pJob->second;
101  moveTask(pJob->first, QueueType::pending, QueueType::assigned);
102  return pJI;
103 }
104 
105 void Job::tasks(std::vector<TaskInfo *> & v, QueueType type, bool shouldClear)
106 {
110 
111  for (auto task : mTasks[type]) {
112  v.push_back(task.second);
113  }
114  if (shouldClear) {
115  mTasks[type].clear();
116  }
117 }
118 
119 bool Job::isTaskInQueue(uint32_t id, QueueType type) const
120 {
124 
125  auto found = mTasks[type].find(id);
126  if (found != mTasks[type].end()) {
127  return true;
128  }
129 
130  return false;
131 }
132 
133 void Job::print() const
134 {
138  SPD_INFO("{} P[{}] A[{}] R[{}] D[{}] F[{}]\r", mUUID, mTasks[QueueType::pending].size(),
139  mTasks[QueueType::assigned].size(), mTasks[QueueType::running].size(), mTasks[QueueType::done].size(),
140  mTasks[QueueType::failed].size());
141  SPD_TRACE("Feeder [{}] Consumer [{}]", mFeederUUID, mConsumerUUID);
142 }
143 
144 void Job::json(Json::Value & json)
145 {
149 
150  Json::Value d;
151  d["name"] = mUUID;
152  d["P"] = static_cast<Json::Value::UInt64>(mTasks[QueueType::pending].size());
153  d["A"] = static_cast<Json::Value::UInt64>(mTasks[QueueType::assigned].size());
154  d["R"] = static_cast<Json::Value::UInt64>(mTasks[QueueType::running].size());
155  d["D"] = static_cast<Json::Value::UInt64>(mTasks[QueueType::done].size());
156  d["F"] = static_cast<Json::Value::UInt64>(mTasks[QueueType::failed].size());
157  json.append(d);
158 }
159 
160 size_t Job::size(QueueType type) const
161 {
166  if (type >= QueueType::all) {
167  size_t sum = mTasks[QueueType::pending].size();
168  sum += mTasks[QueueType::assigned].size();
169  sum += mTasks[QueueType::running].size();
170  sum += mTasks[QueueType::done].size();
171  sum += mTasks[QueueType::failed].size();
172  return sum;
173  }
174 
175  return mTasks[type].size();
176 }
177 
178 size_t Job::sizeNotFinished() const
179 {
183  size_t sum = mTasks[QueueType::pending].size();
184  sum += mTasks[QueueType::assigned].size();
185  sum += mTasks[QueueType::running].size();
186  return sum;
187 }
188 
189 void Job::consumer(std::string uuid)
190 {
194  mConsumerUUID = uuid;
195 }
196 
197 std::string Job::consumer() const
198 {
202  return std::move(mConsumerUUID);
203 }
204 
205 void Job::feeder(std::string uuid)
206 {
210  mFeederUUID = uuid;
211 }
212 
213 std::string Job::feeder() const
214 {
218  return std::move(mFeederUUID);
219 }
220 
221 bool Job::haveMoreTasks() const
222 {
226  return mHaveMoreTasks;
227 }
228 
229 void Job::haveMoreTasks(bool haveMoreTasks)
230 {
235 }
236 
237 } // namespace Salsa
Job(std::string uuid="", std::string type="NONE")
Definition: Job.cc:3
std::string mFeederUUID
Feeder UUID.
Definition: Job.hh:59
size_t size(QueueType t=all) const
Definition: Job.cc:160
void tasks(std::vector< TaskInfo * > &v, QueueType type, bool clear=true)
Definition: Job.cc:105
std::string mConsumerUUID
Source (consumer) UUID.
Definition: Job.hh:58
std::string mUUID
Job UUID.
Definition: Job.hh:57
void print() const
Definition: Job.cc:133
std::map< uint32_t, TaskInfo * > mTasks[all]
Lists of tasks (one map per queue type).
Definition: Job.hh:56
virtual ~Job()
Definition: Job.cc:9
bool haveMoreTasks() const
Definition: Job.cc:221
std::string consumer() const
Definition: Job.cc:197
bool isTaskInQueue(uint32_t id, QueueType type) const
Check task presence in certain queue.
Definition: Job.cc:119
QueueType
Queue types.
Definition: Job.hh:19
bool mHaveMoreTasks
Flag if we have more tasks.
Definition: Job.hh:61
void json(Json::Value &json)
Definition: Job.cc:144
size_t sizeNotFinished() const
Definition: Job.cc:178
std::string feeder() const
Definition: Job.cc:213
Base Salsa Object class.
Definition: Object.hh:15
bool removeTask(uint32_t id, QueueType from)
Definition: Job.cc:76
bool moveTask(uint32_t id, QueueType from, QueueType to)
Definition: Job.cc:45
bool addTask(uint32_t id, TaskInfo *pJob, QueueType type)
Definition: Job.cc:25
TaskInfo * nextJob()
TODO: Get the next available task (naming to be settled: task vs. job).
Definition: Job.cc:90