class EventMachine::Channel

Provides a simple, thread-safe way to transfer data between (typically) long-running tasks in {EventMachine.defer} and the event loop thread.

@example

channel = EventMachine::Channel.new
sid     = channel.subscribe { |msg| p [:got, msg] }

channel.push('hello world')
channel.unsubscribe(sid)

Public Class Methods

new()
# File lib/em/channel.rb, line 15
def initialize
  @subs = {}
  @uid  = 0
end

Public Instance Methods

<<(*items)
Alias for: push
num_subscribers()

Returns the number of current subscribers.

# File lib/em/channel.rb, line 21
def num_subscribers
  return @subs.size
end
pop(*a, &b)

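Fetches one message from the channel. The callback is subscribed for a single delivery: it receives the next item pushed to the channel and is then unsubscribed.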

# File lib/em/channel.rb, line 53
def pop(*a, &b)
  EM.schedule {
    name = subscribe do |*args|
      unsubscribe(name)
      EM::Callback(*a, &b).call(*args)
    end
  }
end
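
The one-shot behaviour of pop can be illustrated without a reactor. The sketch below uses a hypothetical, synchronous stand-in (`SketchChannel` is not part of EventMachine): it runs callbacks immediately instead of going through `EM.schedule`, and it accepts blocks only. Under those assumptions, it shows that the pop callback sees only the next pushed item, and that items pushed before pop was called are not retained.

```ruby
# Hypothetical synchronous model of the channel; NOT the EventMachine class.
# Callbacks run immediately rather than being scheduled on the event loop.
class SketchChannel
  def initialize
    @subs = {}
    @uid  = 0
  end

  def subscribe(&blk)
    name = (@uid += 1)
    @subs[name] = blk
    name
  end

  def unsubscribe(name)
    @subs.delete(name)
  end

  def push(*items)
    # Take a fresh snapshot of the subscribers for each item.
    items.each { |i| @subs.values.each { |s| s.call(i) } }
  end

  # One-shot subscriber: removes itself before handling its first message.
  def pop(&blk)
    name = subscribe do |*args|
      unsubscribe(name)
      blk.call(*args)
    end
  end
end

channel  = SketchChannel.new
received = []

channel.push(:early)                   # no subscribers yet, so nothing receives it
channel.pop { |msg| received << msg }
channel.push(:first, :second)          # only :first reaches the pop callback

p received
```

In the real class every step is deferred through `EM.schedule`, so the timing differs, but the subscribe-then-unsubscribe pattern is the same as in the source above.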
push(*items)

Adds items to the channel. Each item is delivered, in order, to every current subscriber.

# File lib/em/channel.rb, line 46
def push(*items)
  items = items.dup
  EM.schedule { items.each { |i| @subs.values.each { |s| s.call i } } }
end
Also aliased as: <<
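
The delivery order follows from the nested loops in push: each item goes to every subscriber before the next item is processed. The plain-Ruby sketch below reproduces only that loop, with a hypothetical hash of lambdas standing in for `@subs`. It leaves out `EM.schedule`, so everything runs immediately.

```ruby
log = []

# Hypothetical stand-in for @subs: subscriber id => callable.
subs = {
  1 => ->(item) { log << [:a, item] },
  2 => ->(item) { log << [:b, item] }
}

items = ['x', 'y']

# Same nesting as in push: outer loop over items, inner loop over subscribers.
items.each { |i| subs.values.each { |s| s.call(i) } }

p log
```

Subscriber `:a` and subscriber `:b` both see `'x'` before either sees `'y'`.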
subscribe(*a, &b)

Takes any arguments suitable for EM::Callback() and returns a subscriber id for use when unsubscribing.

@return [Integer] Subscriber identifier
@see unsubscribe

# File lib/em/channel.rb, line 30
def subscribe(*a, &b)
  name = gen_id
  EM.schedule { @subs[name] = EM::Callback(*a, &b) }

  name
end
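
`EM::Callback()` accepts a block, a callable object such as a proc, or an object followed by a method name. The helper below, `to_callable`, is a hypothetical stand-in written for illustration. It is not EventMachine's implementation and only mimics those three forms. The `Receiver` class is likewise an assumption made for the example.

```ruby
# Hypothetical stand-in for EM::Callback(); illustrative only.
def to_callable(object = nil, method_name = nil, &blk)
  if object && method_name
    # Object plus method name: forward the arguments to that method.
    ->(*args) { object.public_send(method_name, *args) }
  elsif object.respond_to?(:call)
    object
  elsif blk
    blk
  else
    raise ArgumentError, 'expected a block, a callable, or object + method name'
  end
end

# Hypothetical receiver used for the object + method form.
class Receiver
  attr_reader :seen

  def initialize
    @seen = []
  end

  def receive(msg)
    @seen << msg
  end
end

receiver = Receiver.new
results  = []

callbacks = [
  to_callable { |msg| results << [:block, msg] },        # block form
  to_callable(proc { |msg| results << [:proc, msg] }),   # callable form
  to_callable(receiver, :receive)                        # object + method form
]

callbacks.each { |cb| cb.call('hello') }
```

With the real channel, the same three forms would be passed straight to subscribe, for example `channel.subscribe(receiver, :receive)`.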
unsubscribe(name)

Removes the subscriber with the given identifier from the list.

@param [Integer] name Subscriber identifier
@see subscribe

# File lib/em/channel.rb, line 41
def unsubscribe(name)
  EM.schedule { @subs.delete name }
end

Private Instance Methods

gen_id()

@private

# File lib/em/channel.rb, line 65
def gen_id
  @uid += 1
end