class MessageChannel::Observer
Public Class Methods
new( **_options )
click to toggle source
# File lib/message_channel/observer.rb, line 18
# Creates empty bookkeeping tables and registers this instance with the
# agent held in the @@Agent class variable, naming :action as the callback.
#
# @param _options [Hash] keyword options; accepted but not read here
def initialize( **_options )
  # The tables are created before registration so they already exist by the
  # time the agent could invoke #action on this instance.
  @asyncs, @awaits, @queues = {}, {}, {}

  # The agent is created on first use only; later instances reuse it.
  @@Agent ||= Agent.new
  @@Agent.add_observer( self, :action )
end
Public Instance Methods
action( topic, message )
click to toggle source
# File lib/message_channel/observer.rb, line 26
# Dispatches one incoming message to every registered handler and waiter
# whose glob pattern matches the topic.
#
# Patterns are compared with File.fnmatch using File::FNM_PATHNAME, so a
# wildcard does not match across "/" separators.
#
# @param topic [String] topic the message arrived on
# @param message [String] JSON text; parsed with symbolized keys
# @raise [JSON::ParserError] if +message+ is not valid JSON
def action( topic, message )
  items = JSON.parse( message, symbolize_names: true )
  matches = ->( pattern ) { File.fnmatch( pattern, topic, File::FNM_PATHNAME ) }

  # Walk a snapshot of the keys and look each handler up again, so entries
  # removed while dispatching (or stored without a block) are skipped.
  @asyncs.keys.each do |pattern|
    next  unless matches.call( pattern )

    handler = @asyncs[pattern]
    handler.call( topic, items )  if handler
  end

  @awaits.keys.each do |queue|
    patterns = @awaits[queue]
    # The waiter may have been removed after the key snapshot was taken.
    next  if patterns.nil?

    # Deliver at most once per waiter, even when several of its patterns
    # match the same topic.
    queue.push( [topic, items] )  if patterns.any? { |pattern| matches.call( pattern ) }
  end
end
listen( *patterns, timeout: nil, &block )
click to toggle source
# File lib/message_channel/observer.rb, line 58
# Listens for messages on the given topic patterns.
#
# With a block, the block is registered through #listen_each and the method
# returns nil at once; +timeout+ is not looked at in that case. Without a
# block, the call goes through #listen_once under Timeout.timeout and returns
# its result, or nil if Timeout::Error is raised.
#
# @param patterns [Array<String>] glob patterns to listen for
# @param timeout [Numeric, nil] limit passed to Timeout.timeout; nil or a
#   non-negative number is accepted
# @return [Array, nil] the value of #listen_once, or nil (block form / timeout)
# @raise [ArgumentError] if +timeout+ is neither nil nor a non-negative Numeric
def listen( *patterns, timeout: nil, &block )
  if block
    listen_each( *patterns ) { |topic, items| block.call( topic, items ) }
    return nil
  end

  acceptable = timeout.nil? || ( timeout.is_a?( Numeric ) && timeout >= 0 )
  raise ArgumentError, "timeout: %s" % timeout  unless acceptable

  begin
    Timeout.timeout( timeout ) { listen_once( *patterns ) }
  rescue Timeout::Error
    nil
  end
end
listen_each( *patterns, &block )
click to toggle source
# File lib/message_channel/observer.rb, line 52
# Stores the given block in @asyncs under each pattern, replacing any entry
# already stored for that pattern.
#
# @param patterns [Array<String>] glob patterns to register
# @return [Array<String>] the patterns that were passed in
def listen_each( *patterns, &block )
  patterns.each { |glob| @asyncs.store( glob, block ) }
end
listen_once( *patterns )
click to toggle source
# File lib/message_channel/observer.rb, line 44
# Blocks until one message matching any of the patterns has been queued for
# this call, then returns it.
#
# A private queue is registered in @awaits for the duration of the call;
# #action pushes matching [topic, items] pairs onto it.
#
# @param patterns [Array<String>] glob patterns to wait for
# @return [Array] a two-element array [topic, items]
def listen_once( *patterns )
  queue = Queue.new
  @awaits[queue] = patterns
  topic, items = queue.pop
  [topic, items]
ensure
  # Always unregister, including when the wait is interrupted by an exception
  # such as Timeout::Error; otherwise the queue would stay in @awaits and
  # keep collecting messages nobody reads.
  @awaits.delete( queue )
end
notify( topic, **items )
click to toggle source
# File lib/message_channel/observer.rb, line 86
# Serializes the keyword arguments to JSON and hands them, together with the
# topic, to the agent held in @@Agent.
#
# @param topic [String] topic to publish on
# @param items [Hash] keyword arguments forming the message payload
def notify( topic, **items )
  payload = items.to_json
  @@Agent.notify( topic, payload )
end
unlisten( *patterns )
click to toggle source
# File lib/message_channel/observer.rb, line 78
# Removes the handlers stored in @asyncs for the given patterns.
#
# Patterns that are not registered are ignored. An entry is removed even if
# its stored handler is nil (as happens when a pattern was registered without
# a block), so no dead keys are left behind.
#
# @param patterns [Array<String>] glob patterns to unregister
# @return [Array<String>] the patterns that were passed in
def unlisten( *patterns )
  patterns.each { |pattern| @asyncs.delete( pattern ) }
end