class PushyDaemon::Consumer

Public Class Methods

new(channel, rule_name, rule) click to toggle source
Calls superclass method
# File lib/pushyd/consumer.rb, line 14
def initialize(channel, rule_name, rule)
  # Init MqConsumer
  log_pipe :consumer
  super

  # Init
  @queue = nil
  @rule = rule
  @rule_name = rule_name
end

Protected Instance Methods

handle_message(context, metadata, delivery_info, message = {}) click to toggle source

Handle the reception of a message on a queue

# File lib/pushyd/consumer.rb, line 34
def handle_message context, metadata, delivery_info, message = {}
  # Prepare data
  headers = metadata.headers || {}

  # Relay data if needed
  handle_relay context, message, headers

  # Handle errors and acknowledgments
  # log_debug "handle_message : channel[#{@channel.inspect}]"
  rescue Exception => e
    log_error "handle_message: EXCEPTION: #{e.message}, #{e.inspect}", e.backtrace
    channel_ackit(message[:tag], false)
  rescue StandardError => e
    log_error "handle_message: unknown: #{e.message}, #{e.inspect}", e.backtrace
    channel_ackit(message[:tag], false)
  else
    channel_ackit(message[:tag], true)
end
log_context() click to toggle source
# File lib/pushyd/consumer.rb, line 27
def log_context
  {
    rule: @rule_name
  }
end

Private Instance Methods

channel_ackit(tag, success=true) click to toggle source
# File lib/pushyd/consumer.rb, line 122
def channel_ackit tag, success=true
  # log_debug "channel_ackit[#{channel}.#{tag}] #{@channel.inspect}"
#   if success
#     log_debug "channel_ackit[#{@channel.id}.#{tag}]: ACK"
#     @channel.ack(tag)
#   else
#     log_debug "channel_ackit[#{@channel.id}.#{tag}]: NACK"
#     @channel.nack(tag)
#   end

# rescue Bunny::ChannelAlreadyClosed => ex
#   error "channel_ackit[#{@channel.id}.#{tag}]: exception: ChannelAlreadyClosed"

# rescue StandardError => ex
#   log_debug "channel_ackit[#{@channel.id}.#{tag}]: exception: #{ex.inspect}"
#   # fail PushyDaemon::ConsumerSubscribeError, "unhandled (#{e.inspect})"

# else
#   log_debug "channel_ackit[#{@channel.id}.#{tag}]: done"
end
handle_relay(context, message, headers) click to toggle source
# File lib/pushyd/consumer.rb, line 55
def handle_relay context, message, headers
  # Check we have a valid @rule
  raise ConsumerRuleMissing unless @rule.is_a? Hash

  # Check if we need to relay anything
  unless @rule[:relay]
    # log_debug "handle_relay: no [relay] URL"
    return
  end

  # Prepare stuff
  relay_auth  = @rule[:auth].to_s
  relay_url   = URI(@rule[:relay]).to_s
  request_id  = identifier(6)
  request_prefix = "handle_relay [#{request_id}] "

  # Build payload
  request_infos = {
    topic:    message[:topic],
    route:    message[:rkey],
    sent_at:  headers['sent_at'],
    sent_by:  headers['sent_by'],
    context:  context,
    data:     message[:data],
    }
  request_body = JSON.pretty_generate(request_infos)

  # Build request headers
  headers = {
    content_type: :json,
    accept: :json,
    user_agent: BmcDaemonLib::Conf.generate_user_agent,
    }

  # Set custom headers if provided
  @rule[:headers].each do |header_name, header_value|
    log_debug "set header [#{header_name}] to [#{header_value}]"
    headers[header_name] = header_value.to_s
  end if @rule[:headers].is_a? Enumerable

  # Compute: payload MD5, HMAC signature
  headers_md5 headers, request_body
  headers_sign headers, @rule[:sign]

  # Build final request
  request = RestClient::Request.new url: relay_url,
    method: :post,
    payload: request_body,
    headers: headers

  # Execute request
  log_message MSG_RLAY, request_id, relay_url, request_infos, request.processed_headers
  response = request.execute

  # Handle exceptions
  rescue RestClient::ExceptionWithResponse, URI::InvalidURIError, RestClient::InternalServerError => e
    log_error "#{request_prefix} rest-client: #{e.message}"
  rescue ApiAuth::ApiAuthError, ApiAuth::UnknownHTTPRequest => e
    log_error "#{request_prefix} api-auth: #{e.message}"
  rescue Errno::ECONNREFUSED => e
    log_error "#{request_prefix} connection refused: #{e.message}"
  rescue StandardError => e
    log_error "#{request_prefix} unknown: #{e.message}, #{e.inspect}", e.backtrace
  else
    log_info "#{request_prefix} received [#{response.body}]"
end