module Ntswf::DecisionWorker
Interface for a worker arbitrating tasks, optionally for multiple apps
Public Instance Methods
process_decision_task()
click to toggle source
Process a decision task. The following task values are interpreted:
- input
-
see {Ntswf::Client#start_execution}
- reason
-
reschedule if {RETRY}
- result
-
Interpreted as {Hash}, see below for keys
Result keys
- :seconds_until_retry
-
See {ActivityWorker#on_activity}
# File lib/ntswf/decision_worker.rb, line 20 def process_decision_task announce("polling for decision task #{decision_task_list}") domain.decision_tasks.poll_for_single_task(decision_task_list, poll_options) do |task| announce("got decision task #{task.workflow_execution.inspect}") begin task.new_events.each { |event| process_decision_event(task, event) } rescue => e notify(e, workflow_execution: task.workflow_execution.inspect) raise e end end end
process_decisions()
click to toggle source
Start the worker loop for decision tasks.
# File lib/ntswf/decision_worker.rb, line 9 def process_decisions loop { in_subprocess :process_decision_task } end
Protected Instance Methods
activity_task_completed(task, event)
click to toggle source
# File lib/ntswf/decision_worker.rb, line 46 def activity_task_completed(task, event) result = parse_result(event.attributes.result) start_timer(task, result) or task.complete_workflow_execution(result: event.attributes.result) end
activity_task_failed(task, event)
click to toggle source
# File lib/ntswf/decision_worker.rb, line 51 def activity_task_failed(task, event) if (event.attributes.reason == RETRY) schedule(task, task.events.first) else start_timer(task) or task.fail_workflow_execution( event.attributes.to_h.keep_if {|k| [:details, :reason].include? k}) end end
activity_task_timed_out(task, event)
click to toggle source
# File lib/ntswf/decision_worker.rb, line 60 def activity_task_timed_out(task, event) notify("Timeout in Simple Workflow. Possible cause: all workers busy", workflow_execution: task.workflow_execution.inspect) start_timer(task) or task.cancel_workflow_execution(details: 'activity task timeout') end
parse_result(result)
click to toggle source
# File lib/ntswf/decision_worker.rb, line 110 def parse_result(result) if result value = JSON.parse(result) rescue nil # expecting JSON::ParserError end value = {} unless value.kind_of? Hash value end
process_decision_event(task, event)
click to toggle source
# File lib/ntswf/decision_worker.rb, line 35 def process_decision_event(task, event) log("processing event #{event.inspect}") case event.event_type when 'WorkflowExecutionStarted' then schedule(task, event) when 'TimerFired' then retry_or_continue_as_new(task, event) when 'ActivityTaskCompleted' then activity_task_completed(task, event) when 'ActivityTaskFailed' then activity_task_failed(task, event) when 'ActivityTaskTimedOut' then activity_task_timed_out(task, event) end end
retry_or_continue_as_new(task, event)
click to toggle source
# File lib/ntswf/decision_worker.rb, line 76 def retry_or_continue_as_new(task, event) original_event = task.events.first if to_be_continued?(task, event) keys = [ :child_policy, :execution_start_to_close_timeout, :input, :tag_list, :task_list, :task_start_to_close_timeout, ] attributes = original_event.attributes.to_h.keep_if {|k| keys.include? k} task.continue_as_new_workflow_execution(attributes) else schedule(task, original_event) end end
schedule(task, data_providing_event)
click to toggle source
# File lib/ntswf/decision_worker.rb, line 94 def schedule(task, data_providing_event) input = data_providing_event.attributes.input options = parse_input(input) task_list = options['activity_task_list'] || activity_task_list(unit: options['unit'] || guess_app_from(data_providing_event)) task.schedule_activity_task(activity_type, { heartbeat_timeout: :none, input: input, task_list: task_list, schedule_to_close_timeout: 12 * 3600, schedule_to_start_timeout: 10 * 60, start_to_close_timeout: 12 * 3600, }) end
start_timer(task, result = {})
click to toggle source
# File lib/ntswf/decision_worker.rb, line 66 def start_timer(task, result = {}) interval = result["seconds_until_retry"] || result["seconds_until_restart"] unless interval options = parse_input(task.events.first.attributes.input) interval = options['interval'] end task.start_timer(interval.to_i, control: result.to_json) if interval interval end
Private Instance Methods
guess_app_from(data_providing_event)
click to toggle source
transitional, until all apps speak the input options protocol
# File lib/ntswf/decision_worker.rb, line 129 def guess_app_from(data_providing_event) data_providing_event.workflow_execution.workflow_type.name[/\w+/] end
to_be_continued?(task, event)
click to toggle source
# File lib/ntswf/decision_worker.rb, line 120 def to_be_continued?(task, event) options = parse_input(task.events.first.attributes.input) return true if options["interval"] started_event = task.events.find { |e| e.event_id == event.attributes.started_event_id } result = parse_result(started_event.attributes.to_h[:control]) result["seconds_until_restart"] || result["perform_again"] end