class FnordMetric::Worker

Public Class Methods

new(opts = {}) click to toggle source
# File lib/fnordmetric/worker.rb, line 3
def initialize(opts = {})
  @namespaces = FnordMetric.namespaces
  @opts = FnordMetric.options(opts)

  FnordMetric.register(self)
end

Public Instance Methods

announce_event(event) click to toggle source
# File lib/fnordmetric/worker.rb, line 57
def announce_event(event)
  namespace(event[:_namespace]).ready!(redis, sync_redis).announce(event)
end
event_key(event_id) click to toggle source
# File lib/fnordmetric/worker.rb, line 49
def event_key(event_id)
  [@opts[:redis_prefix], 'event', event_id].join("-")
end
expire_event(event_id) click to toggle source
# File lib/fnordmetric/worker.rb, line 61
def expire_event(event_id)
  redis.expire(event_key(event_id), @opts[:event_data_ttl])
end
initialized() click to toggle source
# File lib/fnordmetric/worker.rb, line 10
def initialized
  FnordMetric.log("worker started")
  EM.next_tick(&method(:tick))
end
namespace(key) click to toggle source
# File lib/fnordmetric/worker.rb, line 69
def namespace(key)
  (@namespaces[key] || @namespaces.first.last).clone
end
parse_json(data) click to toggle source
# File lib/fnordmetric/worker.rb, line 73
def parse_json(data)
  event = Yajl::Parser.new(:symbolize_keys => true).parse(data)
  event[:_namespace] = event[:_namespace].to_sym if event[:_namespace]
  event
rescue Yajl::ParseError => e
  FnordMetric.error "invalid json: #{e.to_s}"; false
end
process_event(event_id, event_data) click to toggle source
# File lib/fnordmetric/worker.rb, line 28
def process_event(event_id, event_data)
  EM.next_tick do
    event = parse_json(event_data)
    if event
      event[:_time] ||= Time.now.to_i
      event[:_eid] = event_id
      announce_event(event)
      publish_event(event)
      expire_event(event_id)
    end
  end
end
publish_event(event) click to toggle source
# File lib/fnordmetric/worker.rb, line 65
def publish_event(event)    
  redis.publish(pubsub_key, event.to_json)
end
pubsub_key() click to toggle source
# File lib/fnordmetric/worker.rb, line 41
def pubsub_key
  [@opts[:redis_prefix], 'announce'].join("-")
end
queue_key() click to toggle source
# File lib/fnordmetric/worker.rb, line 45
def queue_key
  [@opts[:redis_prefix], 'queue'].join("-")
end
redis() click to toggle source
# File lib/fnordmetric/worker.rb, line 81
def redis
  @redis ||= EM::Hiredis.connect(FnordMetric.options[:redis_url]) # FIXPAUL
end
stats_key() click to toggle source
# File lib/fnordmetric/worker.rb, line 53
def stats_key
  [@opts[:redis_prefix], 'stats'].join("-")
end
sync_redis() click to toggle source
# File lib/fnordmetric/worker.rb, line 85
def sync_redis
  @sync_redis ||= FnordMetric.mk_redis
end
tick() click to toggle source
# File lib/fnordmetric/worker.rb, line 15
def tick
  redis.blpop(queue_key, 1).callback do |list, event_id|
    EM.next_tick(&method(:tick))
    if event_id
      redis.get(event_key(event_id)).callback do |event_data|
        process_event(event_id, event_data) if event_data
        FnordMetric.log("event_lost: event_data not found for event-id '#{event_id}' - maybe expired?") unless event_data
        redis.hincrby(stats_key, :events_processed, 1)
      end
    end
  end
end