class MessageChannel::Druby::Broker
Public Class Methods
new()
click to toggle source
# File lib/message_channel/druby.rb, line 10
# Sets up an empty broker.
#
# @patterns starts as an empty Hash; the other methods of this class store
# [pattern, queue] pairs in it keyed by queue id. @mutex is the lock those
# methods take around their accesses to @patterns.
def initialize
  @patterns = {}
  @mutex = Mutex.new
end
Public Instance Methods
fetch( pattern )
click to toggle source
# File lib/message_channel/druby.rb, line 36
# Registers a temporary queue for +pattern+, blocks until one item is pushed
# onto it, and always removes the registration again before returning.
#
# pattern:: glob stored alongside the queue; #publish matches topics against
#           it with File.fnmatch.
#
# Returns the item popped from the queue (#publish pushes [topic, message]
# pairs), or nil when a StandardError is raised while registering or waiting.
def fetch( pattern )
  queue = Queue.new
  queue_id = queue.object_id
  @mutex.synchronize do
    @patterns[queue_id] = [pattern, queue]
  end
  # Block outside the mutex so publishers can take the lock and push.
  queue.pop
rescue StandardError
  # Best effort: callers receive nil instead of an exception.
  nil
ensure
  # Runs on every exit path so the temporary registration never lingers.
  @mutex.synchronize do
    @patterns.delete( queue_id )
  end
end
publish( topic, message )
click to toggle source
# File lib/message_channel/druby.rb, line 51
# Delivers +message+ to every registered queue whose pattern matches +topic+.
#
# Matching uses File.fnmatch with File::FNM_PATHNAME, so a wildcard does not
# match across a "/" in the topic. Each matching queue receives the pair
# [topic, message]. The whole pass runs while holding @mutex.
#
# Returns the value of the iteration over @patterns (the @patterns hash).
def publish(topic, message)
  @mutex.synchronize do
    @patterns.each_value do |(glob, mailbox)|
      next unless File.fnmatch(glob, topic, File::FNM_PATHNAME)

      mailbox << [topic, message]
    end
  end
end
subscribe( pattern )
click to toggle source
# File lib/message_channel/druby.rb, line 15
# Creates a new queue for +pattern+ and registers it under the queue's
# object_id while holding @mutex.
#
# pattern:: stored next to the queue as the first element of the entry.
#
# Returns the id under which the [pattern, queue] entry was stored.
def subscribe(pattern)
  mailbox = Queue.new
  handle = mailbox.object_id
  @mutex.synchronize { @patterns.store(handle, [pattern, mailbox]) }
  handle
end
unsubscribe( queue_id )
click to toggle source
# File lib/message_channel/druby.rb, line 24
# Removes the entry registered under +queue_id+ while holding @mutex.
#
# queue_id:: key previously used to store an entry in @patterns.
#
# Returns the removed [pattern, queue] entry, or nil when no entry exists
# for +queue_id+ or when a StandardError is raised during removal.
def unsubscribe( queue_id )
  @mutex.synchronize { @patterns.delete( queue_id ) }
rescue StandardError
  # Best effort: a failed removal is reported as nil rather than raised.
  nil
end
wait( queue_id )
click to toggle source
# File lib/message_channel/druby.rb, line 32
# Blocks until an item is available on the queue registered under
# +queue_id+ and returns that item.
#
# queue_id:: key of an entry in @patterns.
#
# Raises NoMethodError when no entry is registered under +queue_id+, because
# the lookup yields nil.
def wait( queue_id )
  # Read @patterns under the mutex, like the other methods that touch it.
  entry = @mutex.synchronize { @patterns[queue_id] }
  # Pop outside the lock: blocking while holding it would stall #publish,
  # which needs the same mutex in order to push.
  entry.last.pop
end