class FnordMetric::STOMPAcceptor

Public Class Methods

new(opts) click to toggle source
# File lib/fnordmetric/acceptors/stomp_acceptor.rb, line 14
def initialize(opts)
  @mutex = Mutex.new

  client = Stomp::Client.new(:hosts => [{
    :host => opts[:host],
    :port => opts[:port],
    :passcode => opts[:password],
    :login => opts[:username]}])

  msg_handler = lambda do |topic, msg|
    data = msg.body

    event = begin
      JSON.parse(data)
    rescue
      FnordMetric.log("[STOMP] received invalid JSON: #{data[0..60]}")
    end

    if event
      event["_type"] ||= topic.gsub(/^\/topic\//, '')
      @mutex.synchronize{ events << event }
    end
  end

  opts[:topics].each do |topic|
    client.subscribe(topic){ |data| msg_handler[topic, data] }
  end

  Thread.new do
    client.join
  end

  EM.next_tick(&method(:push_next_event))
end
outbound?() click to toggle source
# File lib/fnordmetric/acceptors/stomp_acceptor.rb, line 67
def self.outbound?
  true
end
start(opts) click to toggle source
# File lib/fnordmetric/acceptors/stomp_acceptor.rb, line 3
def self.start(opts)
  begin
    require "stomp"
  rescue LoadError
    FnordMetric.error("require 'stomp' failed, you need the stomp gem")
    exit 1
  end

  new(opts)
end

Public Instance Methods

api() click to toggle source
# File lib/fnordmetric/acceptors/stomp_acceptor.rb, line 63
def api
  @api ||= FnordMetric::API.new(FnordMetric.options)
end
events() click to toggle source
# File lib/fnordmetric/acceptors/stomp_acceptor.rb, line 59
def events
  @events ||= []
end
push_next_event() click to toggle source
# File lib/fnordmetric/acceptors/stomp_acceptor.rb, line 49
def push_next_event
  nxt = @mutex.synchronize{ events.pop }
  unless nxt
    EM::Timer.new(0.01, &method(:push_next_event))
    return true
  end
  api.event(nxt)
  EM.next_tick(&method(:push_next_event))
end