module Ntswf::ActivityWorker

Interface for a worker executing tasks

Constants

KNOWN_RETURN_KEYS

Public Instance Methods

on_activity(proc = nil, &block) click to toggle source

Configure a proc or block to be called on receiving an {AWS::SimpleWorkflow::ActivityTask}.

@yieldparam task [Hash]

Description of the task's properties:
:activity_task:: The {AWS::SimpleWorkflow::ActivityTask}
:name:: Kind of task
:params:: Custom parameters given to the execution (parsed back from JSON)
:run_id:: The workflow execution's run ID
:version:: Client version
:workflow_id:: The workflow execution's workflow ID

See {Ntswf::Client#start_execution}'s options for details

@param proc [Proc] The callback

@yieldreturn [Hash]

Processing result. The following keys are interpreted accordingly:
:error:: Fails the task with the given error details.
:outcome:: Completes the task, storing the outcome's value (as JSON).
:seconds_until_restart::
  Starts the task as new, after the given delay.
:seconds_until_retry::
  Re-schedules the task, after the given delay.
  In combination with an *:error*: Marks the task for immediate re-scheduling,
  ignoring the value.
  Please note that the behaviour is undefined if an *:interval* option has been set.
# File lib/ntswf/activity_worker.rb, line 31
# Registers the callback invoked for every received activity task.
#
# @param proc [Proc, nil] the callback; takes precedence over a given block
# @yieldparam task [Hash] description of the task
# @return [Proc, nil] the callback that has been stored
def on_activity(proc = nil, &block)
  handler = proc ? proc : block
  @task_callback = handler
end
process_activities() click to toggle source

Start the worker loop for activity tasks.

# File lib/ntswf/activity_worker.rb, line 36
# Runs the worker loop: repeatedly hands :process_activity_task to
# in_subprocess. Does not return unless the loop is broken out of
# (e.g. by a StopIteration raised from within).
def process_activities
  loop do
    in_subprocess(:process_activity_task)
  end
end
process_activity_task() click to toggle source
# File lib/ntswf/activity_worker.rb, line 40
# Polls the activity task list once and, if a task is yielded, announces it
# and passes it on to process_single_task.
#
# @return whatever poll_for_single_task returns
def process_activity_task
  announce("polling for activity task #{activity_task_list}")
  task_source = domain.activity_tasks
  task_source.poll_for_single_task(activity_task_list, poll_options) do |activity_task|
    type_description = activity_task.activity_type.inspect
    announce("got activity task #{type_description} #{activity_task.input}")
    process_single_task(activity_task)
  end
end

Protected Instance Methods

describe(activity_task) click to toggle source
# File lib/ntswf/activity_worker.rb, line 67
# Builds the task description handed to the task callback.
#
# The parsed task input is extended with the task itself and the run/workflow
# IDs of its workflow execution; afterwards all keys are converted to symbols.
# When two keys collapse to the same symbol, the later value wins.
#
# @param activity_task the received activity task
# @return [Hash] description with symbol keys
def describe(activity_task)
  description = parse_input(activity_task.input)
  description.update(
    activity_task: activity_task,
    run_id: activity_task.workflow_execution.run_id,
    workflow_id: activity_task.workflow_execution.workflow_id
  )
  description.each_with_object({}) do |(key, value), symbolized|
    symbolized[key.to_sym] = value
  end
end
fail_with_exception(activity_task, exception) click to toggle source
# File lib/ntswf/activity_worker.rb, line 57
# Reports an exception via notify and fails the activity task with it.
#
# The failure details are a JSON object holding the exception message and the
# exception class name, each cut off after 1000 characters.
#
# @param activity_task the task to fail
# @param exception [Exception] the error raised while processing the task
def fail_with_exception(activity_task, exception)
  notify(
    exception,
    activity_type: activity_task.activity_type.inspect,
    input: activity_task.input
  )
  message = exception.message[0, 1000]
  class_name = exception.class.to_s[0, 1000]
  payload = {error: message, exception: class_name}.to_json
  activity_task.fail!(details: payload, reason: 'Exception')
end
process_returned_hash(activity_task, returned_hash) click to toggle source
# File lib/ntswf/activity_worker.rb, line 79
# Translates the value returned by the task callback into a response for the
# activity task.
#
# Anything that is not a Hash, and any Hash without a key listed in
# KNOWN_RETURN_KEYS, is ignored: no response is sent and nil is returned.
#
# An :error key takes precedence over every other key, independent of the
# order in which the keys were inserted into the hash. The task is failed with
# reason "Retry" if :seconds_until_retry is set as well, "Error" otherwise.
# The error value is stringified and cut off after 1000 characters.
#
# Otherwise, :outcome, :seconds_until_restart or :seconds_until_retry complete
# the task with the whole returned hash serialized as JSON.
#
# @param activity_task the task to respond to
# @param returned_hash [Hash, Object] the callback's return value
# @return the result of fail!/complete!, or nil if nothing was sent
def process_returned_hash(activity_task, returned_hash)
  return unless returned_hash.kind_of? Hash

  known_keys = returned_hash.keys.select { |key| KNOWN_RETURN_KEYS.include?(key) }

  if known_keys.include?(:error)
    # Checked first so that e.g. {seconds_until_retry: 5, error: "x"} fails
    # the task instead of completing it just because of key order.
    reason = returned_hash[:seconds_until_retry] ? "Retry" : "Error"
    details = {error: returned_hash[:error].to_s[0, 1000]}.to_json
    activity_task.fail!(details: details, reason: reason)
  elsif known_keys.any? { |key| [:outcome, :seconds_until_restart, :seconds_until_retry].include?(key) }
    # NOTE(review): :seconds_until_restart is documented on on_activity as an
    # interpreted key but was never answered here; presumably the decider
    # reads it from the result like :seconds_until_retry -- confirm.
    activity_task.complete!(result: returned_hash.to_json)
  end
end
process_single_task(activity_task) click to toggle source
# File lib/ntswf/activity_worker.rb, line 50
# Runs the configured task callback (if any) with the task's description and
# processes what it returned. Any StandardError raised along the way makes
# the task fail via fail_with_exception.
#
# @param activity_task the task to process
def process_single_task(activity_task)
  begin
    callback_result = nil
    if @task_callback
      callback_result = @task_callback.call(describe(activity_task))
    end
    process_returned_hash(activity_task, callback_result)
  rescue StandardError => error
    fail_with_exception(activity_task, error)
  end
end