class MessageChannel::Observer

Public Class Methods

new( **_options )
# File lib/message_channel/observer.rb, line 18
# Builds an observer with empty registries and subscribes it to the agent.
#
# The agent lives in the class variable @@Agent; it is created on first use
# and reused afterwards. This instance is registered with it so that the
# agent invokes #action on it.
#
# _options:: accepted but not used by this method.
def initialize(**_options)
  @asyncs, @awaits, @queues = {}, {}, {}
  (@@Agent ||= Agent.new).add_observer(self, :action)
end

Public Instance Methods

action( topic, message )
# File lib/message_channel/observer.rb, line 26
# Dispatches one incoming message to every matching listener.
#
# topic::   topic string, matched against the registered glob patterns with
#           File.fnmatch and File::FNM_PATHNAME (so "*" does not cross "/").
# message:: JSON text; parsed with symbolized keys before delivery.
#
# Callbacks stored in @asyncs are called with (topic, items). Each queue
# stored in @awaits receives [topic, items] at most once per message, even
# when several of its patterns match the topic.
#
# Raises whatever JSON.parse raises for malformed input.
def action( topic, message )
  items  =  JSON.parse( message, symbolize_names: true )

  # Iterate over a snapshot of the keys and look each handler up again, so a
  # callback may register or remove patterns while dispatch is in progress.
  @asyncs.keys.each do |pattern|
    next  unless File.fnmatch( pattern, topic, File::FNM_PATHNAME )
    handler  =  @asyncs[pattern]
    handler.call( topic, items )  if handler
  end

  @awaits.keys.each do |queue|
    patterns  =  @awaits[queue]
    # The waiter may have deregistered after the key snapshot was taken.
    next  if patterns.nil?
    matched  =  patterns.any? do |pattern|
      File.fnmatch( pattern, topic, File::FNM_PATHNAME )
    end
    # One push per message: overlapping patterns must not enqueue duplicates.
    queue.push( [topic, items] )  if matched
  end
end
listen( *patterns, timeout: nil, &block )
# File lib/message_channel/observer.rb, line 58
# Listens for messages on the given topic patterns.
#
# With a block: hands the patterns to #listen_each with a wrapper that calls
# the block with (topic, items), then returns nil. The timeout argument is
# not examined on this path.
#
# Without a block: runs #listen_once inside Timeout.timeout( timeout ) and
# returns its result, or nil when Timeout::Error is raised.
#
# timeout:: nil or a non-negative Numeric; anything else raises
#           ArgumentError (only checked when no block is given).
def listen(*patterns, timeout: nil, &block)
  if block_given?
    listen_each(*patterns) { |topic, items| block.call(topic, items) }
    return nil
  end

  acceptable = timeout.nil? || (timeout.is_a?(Numeric) && timeout >= 0)
  raise ArgumentError, "timeout: %s" % timeout unless acceptable

  begin
    Timeout.timeout(timeout) { listen_once(*patterns) }
  rescue Timeout::Error
    nil
  end
end
listen_each( *patterns, &block )
# File lib/message_channel/observer.rb, line 52
# Stores the given block in @asyncs under every pattern, replacing any
# callback previously stored for the same pattern. When no block is given
# the stored value is nil.
#
# Returns the patterns array.
def listen_each(*patterns, &block)
  patterns.each { |glob| @asyncs[glob] = block }
end
listen_once( *patterns )
# File lib/message_channel/observer.rb, line 44
# Blocks until one message matching any of the patterns arrives and returns
# it as [topic, items].
#
# A fresh Queue is registered in @awaits under the given patterns; #action
# pushes matching messages onto it. The registration is removed in an
# ensure clause so that it is also cleaned up when the wait is interrupted
# (for example by Timeout::Error raised from Timeout.timeout in #listen);
# otherwise the abandoned queue would stay in @awaits and keep receiving
# messages.
def listen_once( *patterns )
  queue  =  Queue.new
  @awaits[queue]  =  patterns
  topic, items  =  * queue.pop
  [topic, items]
ensure
  @awaits.delete( queue )
end
notify( topic, **items )
# File lib/message_channel/observer.rb, line 86
# Publishes a message: the keyword arguments are serialized with #to_json
# and passed, together with the topic, to the notify method of the agent
# held in @@Agent.
def notify(topic, **items)
  payload = items.to_json
  @@Agent.notify(topic, payload)
end
unlisten( *patterns )
# File lib/message_channel/observer.rb, line 78
# Removes the callback registrations stored in @asyncs for the given
# patterns. Patterns that are not registered are ignored.
#
# Entries whose stored callback is nil (a pattern registered without a
# block) are removed as well, so they do not linger in the registry.
#
# Returns the patterns array.
def unlisten( *patterns )
  patterns.each do |pattern|
    @asyncs.delete( pattern )
  end
end