class WithEvents::Stream
Attributes
subscribed[RW]
configuration[R]
events[R]
klass[R]
name[R]
subscribe[R]
topic[R]
watchers[R]
Public Class Methods
find(name)
click to toggle source
# File lib/with_events/stream.rb, line 59
# Looks through the registered +streams+ and returns the first one whose
# +name+ equals the given +name+, or nil when none matches.
def find(name)
  streams.find { |stream| stream.name == name }
end
find_or_initialize(name, klass, options = {})
click to toggle source
# File lib/with_events/stream.rb, line 55
# Returns the stream that +find+ reports for +name+; when there is none,
# builds a new one from +name+, +klass+ and +options+ via +new+.
def find_or_initialize(name, klass, options = {})
  existing = find(name)
  return existing if existing

  new(name, klass, options)
end
new(name, klass, options = {})
click to toggle source
# File lib/with_events/stream.rb, line 7
# Sets up a stream and appends it to the class-level +streams+ list.
#
# name    - value stored as the stream's name (compared by +find+).
# klass   - value stored as-is and later handed to each Event.
# options - Hash; only the :topic and :subscribe keys are read here.
def initialize(name, klass, options = {})
  # Identity of the stream.
  @name = name
  @klass = klass

  # Settings taken from the options hash.
  @topic = options[:topic]
  @subscribe = options[:subscribe]

  # Mutable state that starts out empty.
  @configuration = {}
  @events = []
  @watchers = {}

  # Every new instance registers itself with the class.
  self.class.streams << self
end
streams()
click to toggle source
# File lib/with_events/stream.rb, line 51
# Returns the memoized list held in @streams, creating an empty Array the
# first time it is requested.
def streams
  @streams = [] unless @streams
  @streams
end
subscribe()
click to toggle source
# File lib/with_events/stream.rb, line 63
# Starts the topic subscription once.
#
# Does nothing (returns nil) when +subscribed+ is already truthy or when no
# registered stream has both a +topic+ and a truthy +subscribe+ setting.
# Otherwise marks the receiver as subscribed and subscribes through
# Aws::Topic with async: true and timeout: 0.
#
# For every delivered message the block dispatches each matching event and
# evaluates to true when at least one event matched, false otherwise.
def subscribe
  return if subscribed
  return unless streams.any? { |stream| stream.topic && stream.subscribe }

  self.subscribed = true
  Aws::Topic.new.subscribe(async: true, timeout: 0) do |message, topic|
    matching = stream_events(message, topic)
    matching.each { |event| notify_event(event, message) }
    matching.size.positive?
  end
end
Private Class Methods
notify_event(event, message)
click to toggle source
# File lib/with_events/stream.rb, line 85
# Dispatches a single received event.
#
# 1. Invokes the event's +finder+ through Invoker, passing the top-level
#    +self+ object and the message; the result is used as the context.
# 2. Notifies the watchers of the event's stream with that context.
# 3. Invokes the event's +callback+ through Invoker with the context and
#    returns whatever that invocation returns.
def notify_event(event, message)
  finder = Invoker.new(event.finder)
  resource = finder.invoke(TOPLEVEL_BINDING.eval('self'), message)

  event.stream.notify_watchers(event, resource)

  Invoker.new(event.callback).invoke(resource)
end
stream_events(message, topic_name)
click to toggle source
# File lib/with_events/stream.rb, line 75
# Selects the events of the stream addressed by +message+ that should be
# dispatched for a message received on +topic_name+.
#
# message    - object responding to +stream+ (the stream name) and whatever
#              +valid_event?+ reads from it.
# topic_name - name of the topic the message arrived on; compared against
#              the stream's +topic+ converted with +to_s+.
#
# Returns an Array of events. The Array is empty when the message carries
# no stream name, when no stream with that name is registered, when the
# stream is not set to subscribe, or when its topic does not match.
def stream_events(message, topic_name)
  stream_name = message.stream
  # A message without a stream name cannot address any stream; treat it like
  # an unknown stream instead of raising NoMethodError on nil.to_sym.
  return [] if stream_name.nil?

  stream = find(stream_name.to_sym)
  return [] unless stream&.subscribe && stream&.topic&.to_s == topic_name

  stream.events.select { |event| valid_event?(event, message) }
end
valid_event?(event, message)
click to toggle source
# File lib/with_events/stream.rb, line 81
# Tells whether +event+ can handle +message+.
#
# An event qualifies when it has both a +finder+ and a +callback+ and the
# message's event name, converted to a Symbol, equals the event's +name+.
#
# Returns a truthy value when the event qualifies, a falsy one otherwise.
# A message without an event name never qualifies.
def valid_event?(event, message)
  event_name = message.event
  # Without this guard a message lacking an event name would raise
  # NoMethodError on nil.to_sym rather than simply not matching.
  return false if event_name.nil?

  event.finder && event.callback && event_name.to_sym == event.name
end
Public Instance Methods
configure_all(options = {})
click to toggle source
# File lib/with_events/stream.rb, line 29
# Replaces the stream's shared configuration with +options+ (an empty Hash
# by default). The stored configuration is merged into the options of every
# event declared afterwards through +event+.
#
# Returns the options that were stored.
def configure_all(options = {})
  @configuration = options
end
event(name, options = {})
click to toggle source
# File lib/with_events/stream.rb, line 19
# Declares an event on this stream and appends it to +events+.
#
# The Event receives +name+, the stream's +klass+ and a merged options Hash.
# Merge order matters: keys from the shared +configuration+ override the
# per-event +options+, and :stream is always set to this stream.
#
# Returns the +events+ collection.
def event(name, options = {})
  event_options = options.merge(configuration).merge(stream: self)
  events << Event.new(name, klass, event_options)
end
notify(event, resource)
click to toggle source
# File lib/with_events/stream.rb, line 38
# Announces +event+ for +resource+: publishes through +notify_sqs+ first,
# but only when the stream has a +topic+, then always runs the local
# watchers via +notify_watchers+.
def notify(event, resource)
  if topic
    notify_sqs(event, resource)
  end

  notify_watchers(event, resource)
end
notify_watchers(event, resource)
click to toggle source
# File lib/with_events/stream.rb, line 43
# Runs every watcher block registered under the event's name, evaluating
# each one with +resource+ as +self+ (instance_exec).
#
# Returns nil when nothing is registered for the event, otherwise the list
# of watchers that were run.
def notify_watchers(event, resource)
  callbacks = watchers[event.name]
  return if callbacks.nil?

  callbacks.each do |callback|
    resource.instance_exec(&callback)
  end
end
on(name, &block)
click to toggle source
# File lib/with_events/stream.rb, line 33
# Registers +block+ as a watcher under +name+, creating the list for that
# name on first use.
#
# Returns the list of watchers registered under +name+.
def on(name, &block)
  (watchers[name] ||= []) << block
end
reset_configure_all()
click to toggle source
# File lib/with_events/stream.rb, line 24
# Resets the shared configuration to an empty Hash.
#
# Returns the receiver, so further calls can be chained onto it.
def reset_configure_all
  tap { @configuration = {} }
end
Private Instance Methods
notify_sqs(event, resource)
click to toggle source
# File lib/with_events/stream.rb, line 97
# Builds an Aws::Publisher for +event+ and +resource+ and calls +publish+
# on it, returning whatever +publish+ returns.
def notify_sqs(event, resource)
  publisher = Aws::Publisher.new(event, resource)
  publisher.publish
end