class MessageChannel::Druby::Broker

Public Class Methods

new() click to toggle source
# File lib/message_channel/druby.rb, line 10
# Builds an empty broker.
#
# @patterns maps a queue id to a [pattern, queue] pair; @mutex is the lock
# the other methods take before touching that table.
def initialize
  @patterns = {}
  @mutex = Mutex.new
end

Public Instance Methods

fetch( pattern ) click to toggle source
# File lib/message_channel/druby.rb, line 36
# Registers a temporary queue under +pattern+, blocks until something is
# pushed onto it, and returns that item (publish pushes [topic, message]
# pairs).
#
# The temporary registration is always removed again in the ensure clause.
# Any StandardError raised while registering or waiting is swallowed and
# reported to the caller as nil.
def fetch( pattern )
  queue = Queue.new
  queue_id = queue.object_id
  @mutex.synchronize { @patterns[queue_id] = [pattern, queue] }
  # Pop outside the lock so publishers can take @mutex while we block.
  queue.pop
rescue
  nil
ensure
  # queue_id is nil if Queue.new itself failed; deleting a missing key is a no-op.
  @mutex.synchronize { @patterns.delete( queue_id ) }
end
publish( topic, message ) click to toggle source
# File lib/message_channel/druby.rb, line 51
# Delivers [topic, message] to every registered queue whose pattern matches
# +topic+. Matching uses File.fnmatch with File::FNM_PATHNAME, and the whole
# pass runs while holding @mutex.
#
# Returns the subscription table (the result of iterating it).
def publish( topic, message )
  @mutex.synchronize do
    @patterns.each_value do |(glob, mailbox)|
      mailbox.push( [topic, message] ) if File.fnmatch( glob, topic, File::FNM_PATHNAME )
    end
  end
end
subscribe( pattern ) click to toggle source
# File lib/message_channel/druby.rb, line 15
# Creates a fresh queue, registers it under +pattern+ while holding @mutex,
# and returns the queue's object_id, which is the key it is stored under.
def subscribe( pattern )
  mailbox = Queue.new
  mailbox.object_id.tap do |id|
    @mutex.synchronize { @patterns[id] = [pattern, mailbox] }
  end
end
unsubscribe( queue_id ) click to toggle source
# File lib/message_channel/druby.rb, line 24
# Removes the registration stored under +queue_id+ while holding @mutex.
#
# Returns the removed [pattern, queue] pair, or nil when the id is unknown.
# Any StandardError raised along the way is swallowed and also yields nil.
def unsubscribe( queue_id )
  @mutex.synchronize { @patterns.delete( queue_id ) }
rescue StandardError
  nil
end
wait( queue_id ) click to toggle source
# File lib/message_channel/druby.rb, line 32
# Blocks until an item is available on the queue registered under
# +queue_id+ and returns it (publish pushes [topic, message] pairs).
#
# Returns nil when no registration exists for +queue_id+ (never subscribed,
# or already unsubscribed) instead of failing with NoMethodError on nil.
def wait( queue_id )
  # Look the entry up under the lock, like every other access to @patterns.
  entry = @mutex.synchronize { @patterns[queue_id] }
  return nil unless entry

  # Pop outside the lock: blocking here while holding @mutex would stall publish.
  entry.last.pop
end