class Deepstream::EventHandler
Public Class Methods
new(client)
click to toggle source
# File lib/deepstream/event_handler.rb, line 8
# Builds an event handler bound to +client+.
#
# The handler starts with no event callbacks and no listen-pattern
# callbacks, and creates an AckTimeoutRegistry for the same client.
#
# @param client the client object this handler sends messages through
def initialize(client)
  @client = client
  @callbacks, @listeners = {}, {}
  @ack_timeout_registry = AckTimeoutRegistry.new(client)
end
Public Instance Methods
emit(event, *args, timeout: @client.options[:emit_timeout], **kwargs)
click to toggle source
# File lib/deepstream/event_handler.rb, line 53
# Sends an EVENT message on the EVENT topic for +event+.
#
# The positional and keyword arguments are combined by
# Helpers.message_data and converted with Helpers.to_deepstream_type
# before being handed to the client.
#
# @param event the event name to emit
# @param args positional payload values
# @param timeout forwarded to the client's send_message; defaults to the
#   client's :emit_timeout option
# @param kwargs keyword payload values
# @return the result of send_message, or of the client's on_exception when
#   a StandardError was raised while building or sending the message
def emit(event, *args, timeout: @client.options[:emit_timeout], **kwargs)
  payload = Helpers.to_deepstream_type(Helpers.message_data(*args, **kwargs))
  @client.send_message(TOPIC::EVENT, ACTION::EVENT, event, payload, timeout: timeout)
rescue => error
  @client.on_exception(error)
end
listen(pattern, &block)
click to toggle source
# File lib/deepstream/event_handler.rb, line 26
# Registers +block+ as the listener for +pattern+ and sends a LISTEN
# message for it.
#
# A Regexp pattern is reduced to its source string first; any other value
# is used as given. After sending, an ACK timeout entry is added for the
# pattern.
#
# @param pattern [Regexp, Object] the pattern to listen for
# @param block stored as the listener; invoked by fire_listen_callback
# @return the result of the registry's add, or of the client's
#   on_exception when a StandardError was raised
def listen(pattern, &block)
  pattern = pattern.source if pattern.is_a?(Regexp)
  @listeners[pattern] = block
  @client.send_message(TOPIC::EVENT, ACTION::LISTEN, pattern)
  timeout_notice = "No ACK message received in time for #{pattern}"
  @ack_timeout_registry.add(pattern, timeout_notice)
rescue => error
  @client.on_exception(error)
end
on(event, &block)
click to toggle source
# File lib/deepstream/event_handler.rb, line 15
# Stores +block+ as the callback for +event+.
#
# When no callback is stored for the event yet, a SUBSCRIBE message is
# sent (only while the client's state is OPEN) and an ACK timeout entry is
# added for the event. An existing callback is simply replaced.
#
# @param event the event name to subscribe to
# @param block the callback stored for the event
# @return the stored block, or the result of the client's on_exception
#   when a StandardError was raised
def on(event, &block)
  first_registration = !@callbacks[event]
  if first_registration
    connection_open = @client.state == CONNECTION_STATE::OPEN
    @client.send_message(TOPIC::EVENT, ACTION::SUBSCRIBE, event) if connection_open
    @ack_timeout_registry.add(event, "No ACK message received in time for #{event}")
  end
  @callbacks[event] = block
rescue => error
  @client.on_exception(error)
end
Also aliased as: subscribe
on_message(message)
click to toggle source
# File lib/deepstream/event_handler.rb, line 43
# Dispatches an incoming +message+ according to its action.
#
# * ACK cancels the ACK timeout keyed by the last element of the
#   message data.
# * EVENT is passed to fire_event_callback.
# * SUBSCRIPTION_FOR_PATTERN_FOUND and SUBSCRIPTION_FOR_PATTERN_REMOVED
#   are both passed to fire_listen_callback.
#
# @param message an object responding to +action+ and +data+
# @raise [UnknownAction] for any other action
def on_message(message)
  case message.action
  when ACTION::ACK
    @ack_timeout_registry.cancel(message.data.last)
  when ACTION::EVENT
    fire_event_callback(message)
  when ACTION::SUBSCRIPTION_FOR_PATTERN_FOUND, ACTION::SUBSCRIPTION_FOR_PATTERN_REMOVED
    fire_listen_callback(message)
  else
    raise UnknownAction, message
  end
end
resubscribe()
click to toggle source
# File lib/deepstream/event_handler.rb, line 67
# Re-sends a SUBSCRIBE message for every event that has a stored callback
# entry, then a LISTEN message for every stored listen pattern.
#
# A StandardError raised while sending is passed to the client's
# on_exception, which ends the remaining sends.
def resubscribe
  @callbacks.keys.each do |event|
    @client.send_message(TOPIC::EVENT, ACTION::SUBSCRIBE, event)
  end
  @listeners.keys.each do |pattern|
    @client.send_message(TOPIC::EVENT, ACTION::LISTEN, pattern)
  end
rescue => error
  @client.on_exception(error)
end
unlisten(pattern)
click to toggle source
# File lib/deepstream/event_handler.rb, line 35
# Removes the listener stored for +pattern+ and sends an UNLISTEN message.
#
# A Regexp pattern is reduced to its source string first, matching the key
# used by listen.
#
# @param pattern [Regexp, Object] the pattern to stop listening for
# @return the result of send_message, or of the client's on_exception when
#   a StandardError was raised
def unlisten(pattern)
  pattern = pattern.source if pattern.is_a?(Regexp)
  @listeners.delete(pattern)
  @client.send_message(TOPIC::EVENT, ACTION::UNLISTEN, pattern)
rescue => error
  @client.on_exception(error)
end
unsubscribe(event)
click to toggle source
# File lib/deepstream/event_handler.rb, line 60
# Drops the callback stored for +event+ and sends an UNSUBSCRIBE message.
#
# The callback is removed before the message is sent, so it stays removed
# even if sending raises.
#
# @param event the event name to unsubscribe from
# @return the result of send_message, or of the client's on_exception when
#   a StandardError was raised
def unsubscribe(event)
  @callbacks.delete(event)
  @client.send_message(TOPIC::EVENT, ACTION::UNSUBSCRIBE, event)
rescue => error
  @client.on_exception(error)
end
Private Instance Methods
fire_event_callback(message)
click to toggle source
# File lib/deepstream/event_handler.rb, line 76
# Invokes the callback stored for the event named in +message+.
#
# The message data is destructured into the event name and its payload;
# the payload is converted with Helpers.to_type before the callback is
# called.
#
# An EVENT message can arrive for an event that has no stored callback
# (for example after unsubscribe removed it, or when on was called without
# a block). Calling nil would raise NoMethodError, so that case is reported
# through the client's on_error instead, the same way fire_listen_callback
# treats an unknown pattern.
#
# @param message an object whose +data+ destructures to [event, payload]
# @return the callback's result, or the result of the client's on_error
#   when no callback is stored for the event
def fire_event_callback(message)
  event, data = message.data
  callback = @callbacks[event]
  return @client.on_error(event) unless callback

  callback.call(Helpers.to_type(data))
end
fire_listen_callback(message)
click to toggle source
# File lib/deepstream/event_handler.rb, line 81
# Invokes the listener stored for the pattern named in +message+.
#
# The listener receives a boolean that is true when the message action is
# SUBSCRIPTION_FOR_PATTERN_FOUND (false otherwise), followed by the event
# taken from the message data.
#
# @param message an object responding to +action+ whose +data+
#   destructures to [pattern, event]
# @return the listener's result, or the result of the client's on_error
#   when no listener is stored for the pattern
def fire_listen_callback(message)
  subscription_found = message.action == ACTION::SUBSCRIPTION_FOR_PATTERN_FOUND
  pattern, event = message.data
  listener = @listeners[pattern]
  return @client.on_error(pattern) unless listener

  listener.call(subscription_found, event)
end