class FnordMetric::STOMPAcceptor
Public Class Methods
new(opts)
click to toggle source
# File lib/fnordmetric/acceptors/stomp_acceptor.rb, line 14
#
# Connects to a STOMP broker, subscribes to every topic listed in
# opts[:topics], and buffers each received JSON object in #events.
# Finally schedules #push_next_event on the EventMachine reactor.
#
# opts - Hash read with the keys :host, :port, :username, :password and
#        :topics (an enumerable of topic names).
def initialize(opts)
  @mutex = Mutex.new

  client = Stomp::Client.new(:hosts => [{
    :host => opts[:host],
    :port => opts[:port],
    :passcode => opts[:password],
    :login => opts[:username]
  }])

  msg_handler = lambda do |topic, msg|
    data = msg.body

    # Yield nil explicitly on a parse failure so the logger's return value
    # can never be mistaken for a parsed event.
    event = begin
      JSON.parse(data)
    rescue
      nil
    end

    if event.is_a?(Hash)
      # Events that carry no explicit type are named after their topic,
      # with a leading "/topic/" prefix stripped.
      event["_type"] ||= topic.gsub(/^\/topic\//, '')
      @mutex.synchronize{ events << event }
    else
      # Covers unparsable payloads as well as valid JSON that is not an
      # object (arrays, scalars, null), which cannot carry a "_type" key.
      # data.to_s keeps the log call itself from raising on a nil body.
      FnordMetric.log("[STOMP] received invalid JSON: #{data.to_s[0..60]}")
    end
  end

  opts[:topics].each do |topic|
    client.subscribe(topic){ |data| msg_handler[topic, data] }
  end

  # client.join is called on its own thread, presumably because it blocks
  # for the lifetime of the connection -- confirm against the stomp gem.
  Thread.new do
    client.join
  end

  EM.next_tick(&method(:push_next_event))
end
outbound?()
click to toggle source
# File lib/fnordmetric/acceptors/stomp_acceptor.rb, line 67
#
# Always answers true for this acceptor.
#
# Returns true.
def self.outbound?
  true
end
start(opts)
click to toggle source
# File lib/fnordmetric/acceptors/stomp_acceptor.rb, line 3
#
# Loads the stomp library on demand and builds a new acceptor.
#
# If the library cannot be loaded, an error is reported through
# FnordMetric.error and the process exits with status 1.
#
# opts - passed through unchanged to .new.
#
# Returns the newly created acceptor instance.
def self.start(opts)
  begin
    require "stomp"
  rescue LoadError
    FnordMetric.error("require 'stomp' failed, you need the stomp gem")
    exit(1)
  end

  new(opts)
end
Public Instance Methods
api()
click to toggle source
# File lib/fnordmetric/acceptors/stomp_acceptor.rb, line 63
#
# Lazily builds the FnordMetric::API instance from FnordMetric.options
# and reuses that same instance on every later call.
#
# Returns the memoized FnordMetric::API instance.
def api
  return @api if @api

  @api = FnordMetric::API.new(FnordMetric.options)
end
events()
click to toggle source
# File lib/fnordmetric/acceptors/stomp_acceptor.rb, line 59
#
# The in-memory event buffer, created as an empty Array on first access.
#
# Returns the same Array instance on every call.
def events
  @events = [] unless @events
  @events
end
push_next_event()
click to toggle source
# File lib/fnordmetric/acceptors/stomp_acceptor.rb, line 49
#
# Hands the oldest buffered event to the API and reschedules itself.
#
# When the buffer is empty, it retries after a 0.01 second EventMachine
# timer. Otherwise it passes the event to api.event and runs again on the
# next reactor tick.
#
# Returns true when the buffer was empty, otherwise the result of
# EM.next_tick.
def push_next_event
  # Events are appended with <<, so shift removes them in arrival order
  # (FIFO). Popping from the tail would deliver a backlog newest-first.
  nxt = @mutex.synchronize{ events.shift }

  unless nxt
    EM::Timer.new(0.01, &method(:push_next_event))
    return true
  end

  api.event(nxt)
  EM.next_tick(&method(:push_next_event))
end