class LogStash::Outputs::WebSocket::Pubsub
Attributes
logger[RW]
Public Class Methods
new()
click to toggle source
# File lib/logstash/outputs/websocket/pubsub.rb, line 8
# Sets up an empty subscriber list and the mutex that guards it.
def initialize
  # Callables invoked by #publish; only touched while holding @subscribers_lock.
  @subscribers = []
  @subscribers_lock = Mutex.new
  # No logger until one is assigned through the `logger` accessor; set
  # explicitly so the unset state is visible rather than an undefined ivar.
  @logger = nil
end
Public Instance Methods
publish(object)
click to toggle source
# File lib/logstash/outputs/websocket/pubsub.rb, line 13
# Delivers +object+ to every registered subscriber while holding the
# subscribers lock. A subscriber whose call raises a StandardError is
# logged (when a logger is set) and removed from the subscriber list.
#
# @param object the value passed to each subscriber's #call
def publish(object)
  @subscribers_lock.synchronize do
    break if @subscribers.empty?

    failed = []
    @subscribers.each do |subscriber|
      begin
        subscriber.call(object)
      rescue => e
        # The logger is optional: without this guard a nil @logger raises
        # NoMethodError from inside the rescue, which aborts delivery to the
        # remaining subscribers and leaves the failed one registered.
        if @logger
          @logger.error("Failed to publish to subscriber",
                        :subscriber => subscriber, :exception => e)
        end
        failed << subscriber
      end
    end

    # Prune after iterating so the list is never mutated mid-traversal.
    failed.each do |subscriber|
      @subscribers.delete(subscriber)
    end
  end # @subscribers_lock.synchronize
end
subscribe(&block)
click to toggle source
# File lib/logstash/outputs/websocket/pubsub.rb, line 33
# Registers a subscriber and feeds every published object to +block+,
# one at a time, in the calling thread. Objects are buffered in a private
# queue, so this method blocks on the queue and never returns normally;
# it only exits when +block+ (or the queue pop) raises.
#
# On exit the subscriber is unregistered. Without that, the queue-pushing
# proc would stay in @subscribers forever (it never fails in #publish) and
# its queue would grow without bound after the consumer is gone.
#
# @yieldparam event each object handed to #publish after registration
def subscribe(&block)
  queue = Queue.new
  handler = proc { |event| queue << event }
  @subscribers_lock.synchronize { @subscribers << handler }

  begin
    # `while true` rather than `loop`: Kernel#loop would swallow a
    # StopIteration raised by the block instead of propagating it.
    while true
      block.call(queue.pop)
    end
  ensure
    @subscribers_lock.synchronize do
      # Match by identity so only this call's handler is removed.
      @subscribers.delete_if { |subscriber| subscriber.equal?(handler) }
    end
  end
end