class Opener::Daemons::Worker
Downloads a KAF document, passes it to a component and submits the output to a callback URL or a default queue. Each Worker
instance runs in an isolated thread.
@!attribute [r] config
@return [Opener::Daemons::Configuration]
@!attribute [r] uploader
@return [Opener::Daemons::Uploader]
@!attribute [r] downloader
@return [Opener::Daemons::Downloader]
@!attribute [r] callback_handler
@return [Opener::CallbackHandler]
Constants
- INLINE_IO
Attributes
callback_handler[R]
config[R]
downloader[R]
uploader[R]
Public Class Methods
new(config)
click to toggle source
@param [Opener::Daemons::Configuration] config
# File lib/opener/daemons/worker.rb, line 31
#
# Stores the configuration, creates the collaborators used while processing
# (downloader, uploader, callback handler) and clears the input/output
# buffers.
#
# @param [Opener::Daemons::Configuration] config
def initialize(config)
  # Buffers filled in later by #process_input and #run_component.
  @input  = nil
  @output = nil

  @config = config

  # Collaborators are created in the same order as before.
  @downloader       = Downloader.new
  @uploader         = Uploader.new
  @callback_handler = CallbackHandler.new
end
Public Instance Methods
handle_unsupported_language()
click to toggle source
Sends the unsupported input to the last callback URL.
# File lib/opener/daemons/worker.rb, line 114
#
# Posts the original input parameters (see #input_params), together with the
# identifier and metadata from the configuration, to the last URL in
# `config.callbacks`, then writes an info entry to Core::Syslog.
def handle_unsupported_language
  final_url = config.callbacks.last

  # The untouched input is forwarded, not the component output.
  payload = input_params.merge(
    identifier: config.identifier,
    metadata: config.metadata
  )

  callback_handler.post(final_url, payload)

  Core::Syslog.info(
    "Submitted input with an unsupported language to #{final_url}",
    config.metadata
  )
end
process()
click to toggle source
Processes a document.
@raise [Oni::WrappedError]
# File lib/opener/daemons/worker.rb, line 45
#
# Processes a document: records the transaction attributes, then reads the
# input, runs the component, stores the output and submits the callbacks.
#
# A Core::UnsupportedLanguageError raised by any of the four pipeline steps
# is rescued and handed to #handle_unsupported_language. Errors raised while
# recording the transaction attributes are not rescued here.
def process
  add_transaction_attributes

  begin
    process_input
    run_component
    process_output
    submit_callbacks
  rescue Core::UnsupportedLanguageError
    # Unsupported languages can occur quite often, so they get their own
    # path: the data is still sent to the final callback URL (skipping
    # whatever comes before it) so that it can act upon it.
    handle_unsupported_language
  end
end
process_input()
click to toggle source
# File lib/opener/daemons/worker.rb, line 65
#
# Fills @input. When `config.input` is set it is Base64-decoded, gunzipped
# and tagged as UTF-8; otherwise the document is fetched from
# `config.input_url` through the downloader.
#
# @return the value stored in @input
def process_input
  # No inline payload: fetch the document instead.
  return @input = downloader.download(config.input_url) unless config.input

  compressed = Base64.decode64(config.input)
  @input     = Zlib.gunzip(compressed)

  # Only the encoding tag changes; the bytes are left as they are.
  @input.force_encoding('UTF-8')
end
process_output()
click to toggle source
@param [String] output @return [Aws::S3::Object]
# File lib/opener/daemons/worker.rb, line 85
#
# Stores the component output for the next step. With INLINE_IO enabled the
# output is gzipped and Base64-encoded into @next_input; otherwise it is
# handed to the uploader and the upload result is kept in @object.
#
# @return the value assigned to @next_input or @object
def process_output
  unless INLINE_IO
    return @object = uploader.upload(config.identifier, @output, config.metadata)
  end

  gzipped     = Zlib.gzip(@output)
  @next_input = Base64.encode64(gzipped)
end
run_component()
click to toggle source
@return [String]
# File lib/opener/daemons/worker.rb, line 77
#
# Runs the configured component on @input, passing along the
# 'custom_config' entry of the metadata, and keeps the result in @output.
#
# @return the component's result (also stored in @output)
def run_component
  component     = config.component_instance
  custom_config = config.metadata['custom_config']

  @output = component.run(@input, custom_config)
end
submit_callbacks()
click to toggle source
Sends the object's URL to the next callback URL.
@param [Aws::S3::Object] object
# File lib/opener/daemons/worker.rb, line 98
#
# Posts the parameters for the next step (see #next_input_params) to the
# first URL in `config.callbacks`. The remaining URLs, the identifier and
# the metadata are sent along, and an info entry is written to Core::Syslog.
def submit_callbacks
  # Work on a copy so the configured callback list is not modified.
  remaining = config.callbacks.dup
  target    = remaining.shift

  payload = next_input_params.merge(
    identifier: config.identifier,
    callbacks: remaining,
    metadata: config.metadata
  )

  callback_handler.post(target, payload)

  Core::Syslog.info("Submitted response to #{target}", config.metadata)
end
Private Instance Methods
add_transaction_attributes()
click to toggle source
# File lib/opener/daemons/worker.rb, line 152
#
# Adds the input URL, identifier, callbacks and metadata from the
# configuration as parameters of the current Transaction.
def add_transaction_attributes
  transaction = Transaction.current

  attributes = {
    input_url: config.input_url,
    identifier: config.identifier,
    callbacks: config.callbacks,
    metadata: config.metadata
  }

  # Splatted so the call receives the same arguments as a braceless hash.
  transaction.add_parameters(**attributes)
end
input_params()
click to toggle source
Preserve input for last callback
# File lib/opener/daemons/worker.rb, line 133
#
# Preserves the original input for the last callback: the input URL when
# one is configured, the inline input otherwise.
#
# @return [Hash] either `{input_url: ...}` or `{input: ...}`
def input_params
  return { input_url: config.input_url } if config.input_url

  { input: config.input }
end
next_input_params()
click to toggle source
Use generated output as new input
# File lib/opener/daemons/worker.rb, line 144
#
# Uses the generated output as the new input: the encoded payload kept in
# @next_input when INLINE_IO is enabled, otherwise the public URL of @object
# as a String.
#
# @return [Hash] either `{input: ...}` or `{input_url: ...}`
def next_input_params
  return { input: @next_input } if INLINE_IO

  { input_url: @object.public_url.to_s }
end