4 Job::Job(std::string uuid, std::string type) :
Object(), mUUID(uuid), mType(type)
12 std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()).count();
20 for (
int iType = EQueueType::pending; iType < EQueueType::all; iType++) {
21 for (
auto iTask :
mTasks[iType]) {
23 iTask.second =
nullptr;
39 SPD_CRIT(
"EQueueType is out of range [{}]", type);
43 mTasks[type].insert(std::make_pair(
id, pTaskInfo));
53 return moveTask(
id,
nullptr, from, to);
61 auto iFound =
mTasks[from].find(
id);
62 if (iFound !=
mTasks[from].end()) {
63 if (pTaskInfo ==
nullptr) {
64 pTaskInfo = iFound->second;
67 delete iFound->second;
73 mTasks[from].erase(iFound);
79 SPD_WARN(
"Job with id [{}] was not found in queue [{}] !!!",
id, from);
91 auto found =
mTasks[from].find(
id);
92 if (found !=
mTasks[from].end()) {
106 auto iAvailTask =
mTasks[EQueueType::pending].begin();
107 if (iAvailTask ==
mTasks[EQueueType::pending].end()) {
111 TaskInfo * pNewTask = iAvailTask->second;
112 moveTask(iAvailTask->first, EQueueType::pending, EQueueType::assigned);
122 for (
auto task :
mTasks[type]) {
123 targetVec.push_back(task.second);
153 SPD_DEBUG(
"{} P[{}] A[{}] R[{}] D[{}] F[{}] started[{}] finished[{}]",
mUUID,
mTasks[EQueueType::pending].
size(),
167 Json::UInt64 ts = static_cast<Json::UInt64>(
mTimeStarted);
169 d[
"time"][
"started"] = ts;
170 if (tf) d[
"time"][
"finished"] = tf;
171 d[
"P"] = static_cast<Json::Value::UInt64>(
mTasks[EQueueType::pending].
size());
172 d[
"A"] = static_cast<Json::Value::UInt64>(
mTasks[EQueueType::assigned].
size());
173 d[
"R"] = static_cast<Json::Value::UInt64>(
mTasks[EQueueType::running].
size());
174 d[
"D"] = static_cast<Json::Value::UInt64>(
mTasks[EQueueType::done].
size());
175 d[
"F"] = static_cast<Json::Value::UInt64>(
mTasks[EQueueType::failed].
size());
185 if (type >= EQueueType::all) {
186 size_t sum =
mTasks[EQueueType::pending].size();
187 sum +=
mTasks[EQueueType::assigned].size();
188 sum +=
mTasks[EQueueType::running].size();
189 sum +=
mTasks[EQueueType::done].size();
190 sum +=
mTasks[EQueueType::failed].size();
195 return mTasks[type].size();
204 size_t sum =
mTasks[EQueueType::pending].size();
205 sum +=
mTasks[EQueueType::assigned].size();
206 sum +=
mTasks[EQueueType::running].size();
249 return !
mTasks[Job::pending].empty();
260 std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()).count();
bool removeTask(uint32_t id, EQueueType from)
uint32_t JobID_t
Job ID type alias.
bool moveTask(uint32_t id, EQueueType from, EQueueType to)
size_t sizeNotFinished() const
Job(std::string uuid="", std::string type="NONE")
std::string mFeederUUID
Feeder UUID.
bool isFinished()
Returns if jobs is finished.
std::string uuid() const
returns UUID
std::string mConsumerUUID
Source (consumer) UUID.
std::string mUUID
Job UUID.
std::map< uint32_t, TaskInfo * > mTasks[all]
Lists of jobs.
bool mChanged
Flag if job was changed.
std::string feeder() const
bool haveMoreTasks() const
Task statuses.
bool addTask(uint32_t id, TaskInfo *pJob, EQueueType type)
void json(Json::Value &json)
std::string consumer() const
uint64_t mTimeFinished
Time finished.
uint64_t mTimeStarted
Time started.
void tasks(std::vector< TaskInfo * > &v, EQueueType type, bool clear=true)
size_t size(EQueueType t=all) const
bool isTaskInQueue(uint32_t id, EQueueType type) const
Check task presence in certain queue.