class Hivent::Redis::Consumer
Constants
- CONSUMER_TTL
- LUA_CONSUMER
- LUA_HEARTBEAT
- SLEEP_TIME
Sleep interval, in milliseconds, used between heartbeats and after a polling pass that found no items.
Public Class Methods
new(redis, service_name, name, life_cycle_event_handler)
click to toggle source
# File lib/hivent/redis/consumer.rb, line 16
#
# Stores the collaborators this consumer works with and clears the stop flag.
#
# redis::                    client this class calls lindex/lpush/rpop on
# service_name::             forwarded to the Lua scripts (presumably the
#                            consuming service's identifier - confirm)
# name::                     forwarded to the Lua scripts (presumably this
#                            consumer's identifier - confirm)
# life_cycle_event_handler:: receives event_processing_succeeded and
#                            event_processing_failed notifications
def initialize(redis, service_name, name, life_cycle_event_handler)
  @redis, @service_name, @name = redis, service_name, name
  @life_cycle_event_handler = life_cycle_event_handler
  @stop = false # run! keeps consuming until stop! flips this
end
Public Instance Methods
consume()
click to toggle source
# File lib/hivent/redis/consumer.rb, line 38
#
# Performs one polling pass. Every pending [queue, item] pair returned by
# +items+ is parsed as JSON, broadcast through Hivent.emitter and reported to
# the life cycle event handler. When the pass found nothing, the method
# sleeps for SLEEP_TIME milliseconds.
def consume
  pending = items

  pending.each do |queue, raw_item|
    payload = nil # stays nil when the JSON parsing itself fails

    begin
      payload = JSON.parse(raw_item).with_indifferent_access
      Hivent.emitter.broadcast(payload)
      @life_cycle_event_handler.event_processing_succeeded(
        event_name(payload), event_version(payload), payload
      )
    rescue => e
      # Copy the raw item to the dead letter queue first, then report it.
      @redis.lpush(dead_letter_queue_name(queue), raw_item)
      @life_cycle_event_handler.event_processing_failed(
        e, payload, raw_item, dead_letter_queue_name(queue)
      )
    ensure
      # +items+ only reads the tail with lindex, so the entry is removed
      # here, whether processing succeeded or failed.
      @redis.rpop(queue)
    end
  end

  return unless pending.empty?

  Kernel.sleep(SLEEP_TIME.to_f / 1000)
end
queues()
click to toggle source
# File lib/hivent/redis/consumer.rb, line 34
#
# Runs the LUA_CONSUMER script with the service name, the consumer name and
# CONSUMER_TTL. Returns the script's result, or an empty array when the
# result is nil or false.
# NOTE(review): the result is presumably the list of queue names assigned to
# this consumer - confirm against the Lua script.
def queues
  assigned = script(LUA_CONSUMER, @service_name, @name, CONSUMER_TTL)
  assigned || []
end
run!()
click to toggle source
# File lib/hivent/redis/consumer.rb, line 24
#
# Starts the heartbeat thread and keeps calling +consume+ until the stop flag
# is set (see stop!). Returns nil.
#
# Any exception raised by +consume+ or +start_heartbeat!+ propagates to the
# caller. The heartbeat thread is stopped on every exit path, so a crashed
# consumer does not keep sending heartbeats from a leaked background thread.
def run!
  start_heartbeat!
  consume until @stop
ensure
  # Also runs after a regular stop!; stopping again is harmless.
  stop_heartbeat!
end
stop!()
click to toggle source
# File lib/hivent/redis/consumer.rb, line 29
#
# Asks the consumer to stop: sets the flag checked by the run! loop, then
# terminates the heartbeat thread. Returns whatever stop_heartbeat! returns.
def stop!
  @stop = true # the run! loop exits once it sees this

  stop_heartbeat!
end
Private Instance Methods
dead_letter_queue_name(queue)
click to toggle source
# File lib/hivent/redis/consumer.rb, line 97
#
# Returns the name of the dead letter queue for +queue+: the queue name
# followed by ":dead_letter".
def dead_letter_queue_name(queue)
  "#{queue}:dead_letter"
end
event_name(payload)
click to toggle source
# File lib/hivent/redis/consumer.rb, line 89
#
# Reads the "name" entry from the payload's "meta" section.
# Returns nil when the payload has no "meta" entry.
def event_name(payload)
  meta = payload["meta"]
  meta.try(:[], "name")
end
event_version(payload)
click to toggle source
# File lib/hivent/redis/consumer.rb, line 93
#
# Reads the "version" entry from the payload's "meta" section.
# Returns nil when the payload has no "meta" entry.
def event_version(payload)
  meta = payload["meta"]
  meta.try(:[], "version")
end
heartbeat!()
click to toggle source
# File lib/hivent/redis/consumer.rb, line 79
#
# Runs the LUA_HEARTBEAT script with the service name, the consumer name and
# CONSUMER_TTL, and returns the script's result.
# NOTE(review): presumably this refreshes the consumer's liveness record for
# CONSUMER_TTL - confirm against the Lua script.
def heartbeat!
  script_args = [@service_name, @name, CONSUMER_TTL]
  script(LUA_HEARTBEAT, *script_args)
end
items()
click to toggle source
# File lib/hivent/redis/consumer.rb, line 83
#
# Reads the last element (index -1) of every queue returned by +queues+
# without removing it. Returns an array of [queue, item] pairs, leaving out
# queues where the lookup returned nil or false.
def items
  queues.each_with_object([]) do |queue, found|
    tail = @redis.lindex(queue, -1)
    found << [queue, tail] if tail
  end
end
start_heartbeat!()
click to toggle source
# File lib/hivent/redis/consumer.rb, line 63
#
# Starts a background thread that calls heartbeat! in an endless loop,
# sleeping SLEEP_TIME milliseconds after each call. The thread is kept in
# @heartbeat so stop_heartbeat! can terminate it.
def start_heartbeat!
  # Terminate a previously started heartbeat thread before replacing it.
  stop_heartbeat!

  @heartbeat = Thread.new do
    loop do
      heartbeat!
      pause = SLEEP_TIME.to_f / 1000 # milliseconds -> seconds
      Kernel.sleep(pause)
    end
  end
end
stop_heartbeat!()
click to toggle source
# File lib/hivent/redis/consumer.rb, line 75
#
# Terminates the heartbeat thread when one has been started.
# Returns the thread that was told to exit, or nil when there is none.
#
# A plain truthiness check is enough here: @heartbeat is either unset (nil)
# or a Thread, so no ActiveSupport helper is needed for the test.
def stop_heartbeat!
  @heartbeat.exit if @heartbeat
end