class Spacebunny::Device::Amqp

Constants

ACK_TYPES
DEFAULT_CHANNEL_OPTIONS

Attributes

built_channels[R]
built_exchanges[R]
client[R]

Public Class Methods

new(*args) click to toggle source
Calls superclass method
# File lib/spacebunny/device/amqp.rb, line 11
# Builds the device by forwarding +:amqp+ followed by the caller's arguments
# to the superclass constructor, then starts with empty registries for the
# channels and exchanges built later on.
def initialize(*args)
  super(:amqp, *args)
  # Both registries start empty; create_channel fills @built_channels and
  # publish fills @built_exchanges.
  @built_exchanges = {}
  @built_channels = {}
end

Public Instance Methods

connect() click to toggle source
# File lib/spacebunny/device/amqp.rb, line 17
# Opens the AMQP connection.
#
# Works on a copy of +connection_configs+ and renames its entries to the
# argument names Bunny expects: +:device_id+ becomes +:user+ and +:secret+
# becomes +:password+. Unless +:tls+ is explicitly +false+, +:port+ is
# replaced by the value stored under +:tls_port+. +:log_level+ falls back to
# +Logger::ERROR+ when not configured.
#
# A new client is built and started on every call.
def connect
  bunny_args = connection_configs.dup
  bunny_args[:user] = bunny_args.delete(:device_id)
  bunny_args[:password] = bunny_args.delete(:secret)
  # TLS is the default: only an explicit `tls: false` keeps the plain port
  bunny_args[:port] = bunny_args.delete(:tls_port) unless bunny_args[:tls] == false
  bunny_args[:log_level] = bunny_args.delete(:log_level) || ::Logger::ERROR

  @client = Bunny.new(bunny_args)
  @client.start
  logger.info 'Connected to SpaceBunny'
end
disconnect() click to toggle source
Calls superclass method
# File lib/spacebunny/device/amqp.rb, line 35
# Closes the AMQP connection and forgets everything that was built on it.
#
# After calling the superclass implementation, the registries of built
# exchanges and channels are emptied and the memoized input channel and
# input queue are dropped. Those two are created from the current client
# (see input_channel / input_queue); since connect builds a new client on
# every call, keeping them would make a later on_receive reuse objects
# tied to the stopped client.
def disconnect
  super
  @built_exchanges = {}
  @built_channels = {}
  # Force input_channel / input_queue to be rebuilt on the next connection
  @input_channel = nil
  @input_queue = nil
  client.stop if client
  logger.info 'Disconnected from SpaceBunny'
end
inbox(options = {})
Alias for: on_receive
input_channel() click to toggle source
# File lib/spacebunny/device/amqp.rb, line 43
# Returns the channel used for incoming messages, asking the client to
# create it on first use and reusing the same one afterwards.
def input_channel
  @input_channel ||= client.create_channel
end
on_receive(options = {}) { |message| ... } click to toggle source
# File lib/spacebunny/device/amqp.rb, line 48
# Subscribes to the input queue and yields every received message, wrapped
# in a Device::Message, to the given block.
#
# Options read here:
#   :wait - passed to the queue subscription as +block:+ (default false)
#   :ack  - acknowledgement mode handed to parse_ack (default :manual)
# The whole options hash is also passed on to Device::Message.
#
# Blacklisted messages are nacked and never reach the block. When parse_ack
# reports automatic acknowledgement, each message is acked after the block
# returns.
#
# Raises BlockRequired when called without a block.
def on_receive(options = {})
  raise BlockRequired unless block_given?

  wait = options.fetch(:wait, false)
  needs_ack, ack_after_yield = parse_ack(options.fetch(:ack, :manual))

  input_queue.subscribe(block: wait, manual_ack: needs_ack) do |delivery_info, metadata, payload|
    incoming = Device::Message.new(self, options, delivery_info, metadata, payload)

    # Blacklisted messages are rejected without being handed to the caller
    if incoming.blacklisted?
      incoming.nack
      next
    end

    yield incoming

    # :auto mode - acknowledge on behalf of the caller once the block is done
    incoming.ack if needs_ack && ack_after_yield
  end
end
Also aliased as: inbox
publish(channel_name, message, options = {}) click to toggle source
# File lib/spacebunny/device/amqp.rb, line 74
# Publishes +message+ on the device channel +channel_name+.
#
# Options:
#   :with_confirm - when truthy, a separate exchange/channel pair (cached
#                   under "<channel_name>_confirm") is created with
#                   +with_confirm+ set and used for this publish.
# Every other option is merged with the channel routing key (see
# channel_options) and handed to the exchange's +publish+.
#
# The caller's +options+ hash is never modified: create_channel receives a
# copy (it removes +:with_confirm+ from the hash it is given), and
# +:with_confirm+ is consistently left out of the options forwarded to the
# exchange. Previously the caller's hash lost +:with_confirm+ on the first
# publish, so reusing the same hash silently switched to the non-confirm
# channel.
#
# Returns the result of the exchange's +publish+, or +false+ when
# check_client reports that the client is not connected.
def publish(channel_name, message, options = {})
  unless check_client
    @logger.debug 'Message NOT published due to client not connected'
    return false
  end

  with_confirm = options[:with_confirm]
  channel_key = (with_confirm ? "#{channel_name}_confirm" : channel_name).to_sym
  # :with_confirm only selects the channel; it is not a publish option
  publish_options = options.reject { |key, _value| key == :with_confirm }

  # Pass a copy so create_channel's bookkeeping cannot leak into the caller's hash
  @built_exchanges[channel_key] ||= create_channel(channel_name, options.dup)

  # Call Bunny "publish"
  res = @built_exchanges[channel_key].publish message, channel_options(channel_name, publish_options)
  @logger.debug 'Message published'
  res
end
wait_for_publish_confirms() click to toggle source
# File lib/spacebunny/device/amqp.rb, line 95
# Waits for publisher confirms on every built channel that uses publisher
# confirmations, one thread per channel so the waits run concurrently.
#
# Returns a Hash keyed by the names under which the channels are registered
# in @built_channels (same order), each value being
#   { all_confirmed: <result of wait_for_confirms>, nacked_set: <nacked_set> }
# Channels without publisher confirmations are left out; the result is an
# empty Hash when there are none.
#
# Each thread hands its result back through Thread#value instead of writing
# into a shared Hash, so there are no unsynchronized concurrent writes and
# the key order does not depend on which channel finishes first. An
# exception raised inside a thread is re-raised here.
def wait_for_publish_confirms
  confirming = @built_channels.select { |_name, channel| channel.using_publisher_confirmations? }

  workers = confirming.map do |name, channel|
    Thread.new do
      [name, { all_confirmed: channel.wait_for_confirms, nacked_set: channel.nacked_set }]
    end
  end

  # Thread#value joins the thread and returns the block's result
  Hash[workers.map(&:value)]
end

Private Instance Methods

channel_options(channel, options) click to toggle source

Merge the provided options with the routing key ("<device id>.<channel>") used to publish on the given channel

# File lib/spacebunny/device/amqp.rb, line 112
# Returns a copy of +options+ with +:routing_key+ set to
# "<device id>.<channel>". A +:routing_key+ already present in +options+ is
# overridden; +options+ itself is left untouched.
def channel_options(channel, options)
  routing_key = "#{id}.#{channel}"
  options.merge(routing_key: routing_key)
end
check_client() click to toggle source

Check that the client has been set up and is currently connected.

# File lib/spacebunny/device/amqp.rb, line 117
# Tells whether the client can be used.
#
# Returns +true+ when the client exists and reports being connected.
# Raises ClientNotSetup when there is no client at all. When the client is
# not connected, raises ClientNotConnected if +raise_on_error+ is set;
# otherwise logs an error and returns +false+.
def check_client
  raise ClientNotSetup unless client
  return true if client.connected?
  raise ClientNotConnected if raise_on_error

  @logger.error 'Client not connected! Check internet connection'
  false
end
create_channel(name, options = {}) click to toggle source
# File lib/spacebunny/device/amqp.rb, line 132
# Creates a new channel on the client, registers it in @built_channels
# under +name+ and returns the direct exchange declared on it (named after
# the device id, with DEFAULT_CHANNEL_OPTIONS). Everything runs inside
# with_channel_check for +name+.
#
# NOTE: +:with_confirm+ is removed from the +options+ hash that is passed
# in; when it was truthy, +confirm_select+ is called on the new channel.
def create_channel(name, options = {})
  with_channel_check(name) do
    amqp_channel = client.create_channel.tap do |fresh|
      fresh.confirm_select if options.delete(:with_confirm)
    end
    @built_channels[name] = amqp_channel
    amqp_channel.direct(id, DEFAULT_CHANNEL_OPTIONS)
  end
end
input_queue() click to toggle source
# File lib/spacebunny/device/amqp.rb, line 143
# Returns the device inbox queue ("<device id>.inbox"), looked up on the
# input channel with +passive: true+ on first use and reused afterwards.
def input_queue
  @input_queue ||= input_channel.queue("#{id}.inbox", passive: true)
end
parse_ack(ack) click to toggle source
# File lib/spacebunny/device/amqp.rb, line 148
# Translates an +:ack+ option into two flags, returned as
# [to_ack, auto_ack]:
#   to_ack   - true whenever an ack mode was given
#   auto_ack - true only for the :auto mode
# A nil/false +ack+ yields [false, false]. Any other value that is not
# listed in ACK_TYPES raises AckTypeError.
def parse_ack(ack)
  return false, false unless ack
  raise AckTypeError unless ACK_TYPES.include?(ack)

  [true, ack == :auto]
end