module Ntswf::Base

Public Instance Methods

activity_name() click to toggle source
# File lib/ntswf/base.rb, line 71
def activity_name
  "master-activity"
end
activity_task_list(unit: nil) click to toggle source
# File lib/ntswf/base.rb, line 83
def activity_task_list(unit: nil)
  unit ||= default_unit
  activity_task_lists[unit] or raise Errors::InvalidArgument.new(
      "Missing activity task list configuration for unit '#{unit}'")
end
activity_task_lists() click to toggle source
# File lib/ntswf/base.rb, line 79
def activity_task_lists
  autocompleted_activity_task_lists || {}
end
activity_type() click to toggle source
# File lib/ntswf/base.rb, line 133
def activity_type
  @activity_type ||= domain.activity_types[activity_name, workflow_version]
end
configure(config) click to toggle source

@param config [Hash] A configuration with the following keys: @option config [String] :access_key_id

*deprecated:* AWS credential. Deprecated, use :swf instead.

@option config [Hash] :activity_task_lists

The task list names for activities per :unit.

@option config [String] :decision_task_list

*deprecated:* The task list name for decisions.
Deprecated, use :decision_task_lists instead.

@option config [Hash] :decision_task_lists

The task list names for decisions per :unit.

@option config [String] :domain The SWF domain name. @option config [String] :execution_id_prefix

(value of :unit) Workflow ID prefix
(see {Client#start_execution}'s :execution_id for allowed values).

@option config [Numeric] :execution_version

Value allowing clients to reject future execution versions.

@option config [String] :identity_suffix

When polling for a task, the suffix will be appended to the (default) identity
(<hostname>:<pid>), delimited by a ":".
Allows to distinguish worker activity on different hosts with identical hostnames.

@option config [String] :isolation_file

Development/test option.
A random ID is stored at the given path, and prepended to task list names and execution IDs.

@option config [String] :pidfile

A path receiving the current PID for looping methods. Causes exit, if
overwritten by another process. See {Worker#in_subprocess}.

@option config [String] :secret_access_key

*deprecated:* AWS credential. Deprecated, use :swf instead.

@option config [Numeric] :subprocess_retries (0) See {Worker#in_subprocess}. @option config [AWS::SimpleWorkflow] :swf

AWS simple workflow object (created e.g. with AWS::SimpleWorkflow.new).

@option config [String] :unit This worker/client's activity task list key. @raise [Errors::InvalidArgument] If a task list name is invalid.

# File lib/ntswf/base.rb, line 40
def configure(config)
  @config = OpenStruct.new(config)
  raise_if_invalid_task_list
end
decision_task_list(unit: nil) click to toggle source
# File lib/ntswf/base.rb, line 93
def decision_task_list(unit: nil)
  unit ||= default_unit
  decision_task_lists[unit] || decision_task_lists[default_unit] or
      raise Errors::InvalidArgument.new(
      "Missing decision task list configuration for unit '#{unit}'")
end
decision_task_lists() click to toggle source
# File lib/ntswf/base.rb, line 89
def decision_task_lists
  autocompleted_decision_task_lists || fallback_decision_task_lists
end
default_unit() click to toggle source
# File lib/ntswf/base.rb, line 100
def default_unit
  @default_unit ||= @config.unit.to_s
end
domain() click to toggle source
# File lib/ntswf/base.rb, line 75
def domain
  @domain ||= swf.domains[@config.domain]
end
execution_id_prefix() click to toggle source
# File lib/ntswf/base.rb, line 104
def execution_id_prefix
  "#{isolation_id}#{@config.execution_id_prefix || default_unit}"
end
execution_version() click to toggle source
# File lib/ntswf/base.rb, line 108
def execution_version
  @config.execution_version
end
notify(message, params) click to toggle source
# File lib/ntswf/base.rb, line 123
def notify(message, params)
  log("#{message.message}\n  #{message.backtrace.join("\n  ")}") if message.kind_of? Exception
  @notify_callback.call(message: message, params: params) if @notify_callback
end
on_notify(proc = nil, &block) click to toggle source

Configure a proc or block to be called on handled errors @yieldparam error [Hash]

Description of the error:
:message:: The error message or the exception
:params:: Error details

@param proc [Proc] The callback

# File lib/ntswf/base.rb, line 51
def on_notify(proc = nil, &block)
  @notify_callback = proc || block
end
parse_input(input) click to toggle source

Parse the options stored in a task's input value @param input [String] A task's input @return [Hash] Input, converted back from JSON @see Ntswf::Client#start_execution Hash keys to be expected

# File lib/ntswf/base.rb, line 116
def parse_input(input)
  options, legacy_params = JSON.parse(input)
  options = {"name" => options} unless options.kind_of? Hash
  options.merge!("params" => legacy_params) if legacy_params
  options
end
separator() click to toggle source

@return [String] separator for composite workflow_id

# File lib/ntswf/base.rb, line 129
def separator
  ";"
end
swf() click to toggle source

@return [AWS::SimpleWorkflow]

# File lib/ntswf/base.rb, line 56
def swf
  @swf ||= (@config.swf || AWS::SimpleWorkflow.new({
    access_key_id: @config.access_key_id,
    secret_access_key: @config.secret_access_key,
  }))
end
workflow_name() click to toggle source
# File lib/ntswf/base.rb, line 63
def workflow_name
  "master-workflow"
end
workflow_version() click to toggle source
# File lib/ntswf/base.rb, line 67
def workflow_version
  "v1"
end

Protected Instance Methods

all_task_list_names() click to toggle source
# File lib/ntswf/base.rb, line 161
def all_task_list_names
  [*activity_task_lists.values, *decision_task_lists.values, *@config.decision_task_list]
end
announce(s) click to toggle source
# File lib/ntswf/base.rb, line 139
def announce(s)
  $0 = s
  log(s)
end
autocomplete(value, fallback) click to toggle source
# File lib/ntswf/base.rb, line 183
def autocomplete(value, fallback)
  value = fallback unless value.kind_of? String
  "#{isolation_id}#{value}"
end
autocompleted_activity_task_lists() click to toggle source
# File lib/ntswf/base.rb, line 165
def autocompleted_activity_task_lists
  autocompleted_task_lists(@config.activity_task_lists, :atl)
end
autocompleted_decision_task_lists() click to toggle source
# File lib/ntswf/base.rb, line 169
def autocompleted_decision_task_lists
  autocompleted_task_lists(@config.decision_task_lists, :dtl)
end
autocompleted_task_lists(raw_task_lists, suffix) click to toggle source
# File lib/ntswf/base.rb, line 173
def autocompleted_task_lists(raw_task_lists, suffix)
  Hash(raw_task_lists).map do |unit, name|
    {unit => autocomplete(name, "#{unit}-#{suffix}")}
  end.reduce(:merge)
end
fallback_decision_task_lists() click to toggle source
# File lib/ntswf/base.rb, line 179
def fallback_decision_task_lists
  {default_unit => autocomplete(@config.decision_task_list, "master-dtl")}
end
isolation_id() click to toggle source
# File lib/ntswf/base.rb, line 188
def isolation_id
  file = @config.isolation_file || @config.task_list_suffix_file
  return "" unless file
  File.write(file, SecureRandom.hex(9)) unless File.exist?(file)
  @isolation_id ||= File.read(file)
end
log(s) click to toggle source
# File lib/ntswf/base.rb, line 144
def log(s)
  $stdout.puts("#{Process.pid} #{s}")
end
raise_if_invalid_task_list() click to toggle source
# File lib/ntswf/base.rb, line 148
def raise_if_invalid_task_list
  all_task_list_names.each do |task_list|
    if task_list.include?(separator)
      raise Errors::InvalidArgument.new(
          "Invalid config '#{task_list}': Separator '#{separator}' is reserved for internal use.")
    end
    if task_list.count(". ") > 0
      raise Errors::InvalidArgument.new(
          "Invalid config '#{task_list}': Dots and spaces not allowed.")
    end
  end
end