class WithEvents::Stream

Attributes

subscribed[RW]
configuration[R]
events[R]
klass[R]
name[R]
subscribe[R]
topic[R]
watchers[R]

Public Class Methods

find(name) click to toggle source
# File lib/with_events/stream.rb, line 59
# Looks up a registered stream by name.
#
# @param name [Object] value compared (with ==) against each stream's name
# @return the first entry of +streams+ whose name equals +name+, or nil
#   when no entry matches
def find(name)
  streams.find do |candidate|
    candidate.name == name
  end
end
find_or_initialize(name, klass, options = {}) click to toggle source
# File lib/with_events/stream.rb, line 55
# Returns the stream already registered under +name+, building a new one
# only when the lookup comes back empty.
#
# @param name [Object] stream name handed to +find+
# @param klass [Object] forwarded to +new+ when no stream exists yet
# @param options [Hash] forwarded to +new+ when no stream exists yet
# @return the existing stream, or the result of +new+
def find_or_initialize(name, klass, options = {})
  existing = find(name)
  return existing if existing

  new(name, klass, options)
end
new(name, klass, options = {}) click to toggle source
# File lib/with_events/stream.rb, line 7
# Builds a stream and adds it to the class-level +streams+ list.
#
# @param name [Object] stored as the stream's name
# @param klass [Object] stored as the stream's klass
# @param options [Hash] only +:topic+ and +:subscribe+ are read here
def initialize(name, klass, options = {})
  # Identity of the stream.
  @name = name
  @klass = klass

  # Settings taken from the options hash; absent keys leave these nil.
  @topic = options[:topic]
  @subscribe = options[:subscribe]

  # Fresh, empty per-stream state.
  @events = []
  @watchers = {}
  @configuration = {}

  # Make the new stream visible to class-level lookups.
  registry = self.class.streams
  registry << self
end
streams() click to toggle source
# File lib/with_events/stream.rb, line 51
# Lazily created list of streams; every call returns the same Array once
# it has been created.
#
# @return [Array] the memoized list held in @streams
def streams
  return @streams if @streams

  @streams = []
end
subscribe() click to toggle source
# File lib/with_events/stream.rb, line 63
# Starts the topic subscription at most once.
#
# Does nothing (returns nil) when +subscribed+ is already truthy, or when
# no entry in +streams+ has both a topic and a truthy +subscribe+ setting.
# Otherwise it marks the subscription as started and registers a handler
# with Aws::Topic.
#
# The handler dispatches every matching event to +notify_event+ and
# evaluates to true when at least one event matched, false otherwise.
# NOTE(review): presumably Aws::Topic uses that boolean to decide whether
# the message was handled - confirm against Aws::Topic.
#
# @return whatever Aws::Topic#subscribe returns, or nil when skipped
def subscribe
  return if subscribed
  return unless streams.any? { |stream| stream.topic && stream.subscribe }

  self.subscribed = true
  Aws::Topic.new.subscribe(async: true, timeout: 0) do |message, topic|
    matching = stream_events(message, topic)
    matching.each { |event| notify_event(event, message) }
    matching.size.positive?
  end
end

Private Class Methods

notify_event(event, message) click to toggle source
# File lib/with_events/stream.rb, line 85
# Handles one incoming message for one matching event.
#
# Steps, in order:
# 1. the event's finder is invoked (through Invoker) with the top-level
#    object and the message; its result becomes the context,
# 2. the event's stream notifies its watchers with that context,
# 3. the event's callback is invoked (through Invoker) with the context.
#
# @param event [#finder, #callback, #stream] event being dispatched
# @param message [Object] incoming message handed to the finder
# @return whatever invoking the callback returns
def notify_event(event, message)
  # Binding#receiver returns the same object as evaluating 'self' in the
  # binding, without going through string eval.
  top_level_self = TOPLEVEL_BINDING.receiver

  context = Invoker.new(event.finder).invoke(top_level_self, message)
  event.stream.notify_watchers(event, context)
  Invoker.new(event.callback).invoke(context)
end
stream_events(message, topic_name) click to toggle source
# File lib/with_events/stream.rb, line 75
# Selects the events of the stream named in +message+ that should react
# to it.
#
# An empty Array is returned when the stream is unknown, when its
# +subscribe+ setting is falsy, or when its topic (as a String) differs
# from +topic_name+.
#
# @param message [#stream, #event] incoming message; +message.stream+ is
#   converted with to_sym before the lookup
# @param topic_name [String, nil] topic the message arrived on
# @return [Array] events for which +valid_event?+ is truthy
def stream_events(message, topic_name)
  stream = find(message.stream.to_sym)
  return [] if stream.nil?
  return [] unless stream.subscribe
  # Safe navigation keeps a nil topic as nil rather than "".
  return [] unless stream.topic&.to_s == topic_name

  stream.events.select { |candidate| valid_event?(candidate, message) }
end
valid_event?(event, message) click to toggle source
# File lib/with_events/stream.rb, line 81
# Tells whether +event+ can handle +message+: the event needs both a
# finder and a callback, and the message's event name (converted with
# to_sym) must equal the event's name.
#
# @param event [#finder, #callback, #name]
# @param message [#event]
# @return true/false from the name comparison, or the falsy finder or
#   callback value when one of them is missing
def valid_event?(event, message)
  handlers_present = event.finder && event.callback
  handlers_present && message.event.to_sym == event.name
end

Public Instance Methods

configure_all(options = {}) click to toggle source
# File lib/with_events/stream.rb, line 29
# Replaces the stream-wide configuration. The stored hash is merged into
# the options of every event defined afterwards through +event+.
#
# @param options [Hash] new configuration; stored as-is, not copied
# @return [Hash] the same +options+ object
def configure_all(options = {})
  @configuration = options
  options
end
event(name, options = {}) click to toggle source
# File lib/with_events/stream.rb, line 19
# Defines an event on this stream and appends it to +events+.
#
# The event's options are built from +options+, then overlaid with the
# stream-wide +configuration+ (which wins on key clashes), and finally
# given a +:stream+ entry pointing back at this stream.
#
# @param name [Object] event name passed to Event.new
# @param options [Hash] per-event options; not mutated
# @return [Array] the +events+ list after the append
def event(name, options = {})
  settings = options.merge(configuration).merge(stream: self)
  events << Event.new(name, klass, settings)
end
notify(event, resource) click to toggle source
# File lib/with_events/stream.rb, line 38
# Announces +event+ for +resource+: first through +notify_sqs+ when the
# stream has a topic, then to the watchers registered on this stream.
#
# @param event [Object] event being announced
# @param resource [Object] object the event happened to
# @return whatever +notify_watchers+ returns
def notify(event, resource)
  if topic
    # Only streams with a topic publish through notify_sqs.
    notify_sqs(event, resource)
  end

  notify_watchers(event, resource)
end
notify_watchers(event, resource) click to toggle source
# File lib/with_events/stream.rb, line 43
# Runs every watcher block registered for +event.name+, evaluating each
# one with +resource+ as self (via instance_exec).
#
# @param event [#name] event whose watchers should run
# @param resource [Object] object the blocks are executed against
# @return [Array, nil] the list of watchers that ran, or nil when none are
#   registered under that name
def notify_watchers(event, resource)
  watchers[event.name]&.each do |watcher|
    resource.instance_exec(&watcher)
  end
end
on(name, &block) click to toggle source
# File lib/with_events/stream.rb, line 33
# Registers +block+ as a watcher under +name+. The list for that name is
# created on first use.
#
# @param name [Object] key in +watchers+, looked up later by event name
# @param block [Proc] watcher to append
# @return [Array] all watchers now registered under +name+
def on(name, &block)
  (watchers[name] ||= []) << block
end
reset_configure_all() click to toggle source
# File lib/with_events/stream.rb, line 24
# Clears the stream-wide configuration set by +configure_all+.
#
# @return [self] the receiver
def reset_configure_all
  tap { @configuration = {} }
end

Private Instance Methods

notify_sqs(event, resource) click to toggle source
# File lib/with_events/stream.rb, line 97
# Builds an Aws::Publisher for +event+ and +resource+ and publishes it.
#
# @param event [Object] passed to Aws::Publisher.new
# @param resource [Object] passed to Aws::Publisher.new
# @return whatever Aws::Publisher#publish returns
def notify_sqs(event, resource)
  publisher = Aws::Publisher.new(event, resource)
  publisher.publish
end