class FnordMetric::AMQPAcceptor

Public Class Methods

new(opts) click to toggle source
# File lib/fnordmetric/acceptors/amqp_acceptor.rb, line 14
# Connects to the AMQP broker and subscribes to every configured channel.
#
# Each incoming message is parsed as JSON. A message that fails to parse,
# or that parses to something other than a JSON object, is logged and
# dropped. A valid event is tagged with the channel name under "_type"
# (unless it already carries one), appended to the event queue and handed
# to push_next_event.
#
# opts - Hash of acceptor options:
#        :channels - queue names to subscribe to (required).
#        :host     - broker host name (optional, defaults to 'firehose').
def initialize(opts)
  amqp = AMQP.connect(:host => opts[:host] || 'firehose')
  amqp_channel = AMQP::Channel.new(amqp)

  msg_handler = lambda do |channel, data|
    event = begin
      JSON.parse(data)
    rescue
      FnordMetric.log("[AMQP] received invalid JSON: #{data.to_s[0..60]}")
      nil # never let the logger's return value be mistaken for an event
    end

    # Arrays and scalars are valid JSON but cannot carry a "_type" key;
    # indexing them with a String would raise inside the subscribe callback.
    if event && !event.is_a?(Hash)
      FnordMetric.log("[AMQP] ignoring non-object JSON: #{data.to_s[0..60]}")
      event = nil
    end

    if event
      event["_type"] ||= channel
      events << event
      push_next_event
    end
  end

  opts[:channels].each do |channel|
    queue = amqp_channel.queue(channel, :auto_delete => true)
    queue.subscribe { |data| msg_handler[channel, data] }
  end
end
outbound?() click to toggle source
# File lib/fnordmetric/acceptors/amqp_acceptor.rb, line 52
# Always answers true for this acceptor.
#
# NOTE(review): presumably this tells the caller that the acceptor dials
# out to a broker instead of listening on a local socket -- confirm
# against the code that queries it.
#
# Returns true.
def self.outbound?()
  true
end
start(opts) click to toggle source
# File lib/fnordmetric/acceptors/amqp_acceptor.rb, line 3
# Loads the amqp gem on demand and builds a new acceptor.
#
# When the gem cannot be required, an error is reported through
# FnordMetric.error and the process exits with status 1.
#
# opts - options passed unchanged to new.
#
# Returns the object created by new(opts).
def self.start(opts)
  gem_available = begin
    require "amqp"
    true
  rescue LoadError
    false
  end

  unless gem_available
    FnordMetric.error("require 'amqp' failed, you need the amqp gem")
    exit(1)
  end

  new(opts)
end

Public Instance Methods

api() click to toggle source
# File lib/fnordmetric/acceptors/amqp_acceptor.rb, line 48
# Lazily builds the FnordMetric::API instance from FnordMetric.options and
# caches it in @api, so later calls hand back the same object.
def api
  return @api if @api

  @api = FnordMetric::API.new(FnordMetric.options)
end
events() click to toggle source
# File lib/fnordmetric/acceptors/amqp_acceptor.rb, line 44
# The array of events waiting to be pushed; created empty on first access
# and reused on every later call.
def events
  @events = [] unless @events
  @events
end
push_next_event() click to toggle source
# File lib/fnordmetric/acceptors/amqp_acceptor.rb, line 38
# Hands the oldest queued event to the API, then schedules itself on the
# next EventMachine tick so a backlog is drained one event per tick.
#
# Events are appended to the queue with <<, so they are removed with shift
# to keep arrival order (pop would deliver the newest event first).
#
# Returns true when the queue is empty, otherwise the result of
# EM.next_tick.
def push_next_event
  return true if events.empty?

  api.event(events.shift)
  EM.next_tick(&method(:push_next_event))
end