class MagicPipe::Senders::Async::Worker

Public Instance Methods

perform(decomposed_object, topic, time, client_name)
# File lib/magic_pipe/senders/async.rb, line 12
# Rebuilds an object from its decomposed form, wraps it in an Envelope,
# encodes it with the client's codec and submits it through the client's
# transport, recording a success or failure metric for the topic.
#
# decomposed_object - value handed to the client's loader (`loader.load`).
# topic             - topic stored in the metadata and passed to the
#                     success/failure trackers.
# time              - anything responding to `to_i`; the integer value is
#                     stored under the `:time` metadata key.
# client_name       - name given to `MagicPipe.lookup_client`.
#
# Any StandardError raised along the way is re-raised to the caller after
# the failure has been tracked.
def perform(decomposed_object, topic, time, client_name)
  client = MagicPipe.lookup_client(client_name)
  object = client.loader.load(decomposed_object)
  codec = client.codec

  # The same metadata is embedded in the envelope and handed to the transport.
  metadata = {
    topic: topic,
    producer: client.config.producer_name,
    time: time.to_i,
    mime: codec::TYPE
  }

  envelope = Envelope.new(
    body: object,
    **metadata
  )

  payload = codec.new(envelope).encode
  client.transport.submit!(payload, metadata)

  track_success(client.metrics, topic)
rescue
  # `client` is still nil when the lookup itself raised; calling
  # `client.metrics` then would raise NoMethodError and hide the real error.
  track_failure(client.metrics, topic) if client
  raise
end