class Emit::Channel

Public Class Methods

new(name=nil) click to toggle source
# File lib/emit/channel.rb, line 5
def initialize(name=nil)
  @name        = name || SecureRandom.uuid
  @read_queue  = []
  @readers     = 0
  @write_queue = []
  @writers     = 0
  @state       = :alive
end

Public Instance Methods

+@()
Alias for: reader
-@()
Alias for: writer
leave_reader() click to toggle source
# File lib/emit/channel.rb, line 83
def leave_reader
  return if retired?
  @readers -= 1
  if @readers.zero?
    @state = :retired
    @write_queue.each(&:retire)
  end
end
leave_writer() click to toggle source
# File lib/emit/channel.rb, line 92
def leave_writer
  return if retired?
  @writers -= 1
  if @writers.zero?
    @state = :retired
    @read_queue.each(&:retire)
  end
end
poison() click to toggle source
# File lib/emit/channel.rb, line 56
def poison
  return if poisoned?
  @state = :poisoned
  @read_queue.each(&:poison)
  @write_queue.each(&:poison)
end
poisoned?() click to toggle source
# File lib/emit/channel.rb, line 75
def poisoned?
  @state == :poisoned
end
post_read(request) click to toggle source

private

# File lib/emit/channel.rb, line 103
def post_read(request)
  check_termination!
  @read_queue << request
  match
end
post_write(request) click to toggle source
# File lib/emit/channel.rb, line 109
def post_write(request)
  check_termination!
  @write_queue << request
  match
end
read() click to toggle source
# File lib/emit/channel.rb, line 14
def read
  check_termination!

  process = Scheduler.current

  fast_read(process).tap do |msg|
    return msg unless msg.nil?
  end

  process.state = :active
  request = ChannelRequest.new(process)
  post_read(request)
  request.process.wait
  remove_read(request)

  return request.message if request.success?

  check_termination!
  abort "Should not get here..."
end
reader() click to toggle source
# File lib/emit/channel.rb, line 63
def reader
  @readers += 1
  ChannelEndRead.new(self)
end
Also aliased as: +@
remove_read(request) click to toggle source
# File lib/emit/channel.rb, line 115
def remove_read(request)
  @read_queue.delete(request)
end
remove_write(request) click to toggle source
# File lib/emit/channel.rb, line 119
def remove_write(request)
  @write_queue.delete(request)
end
retired?() click to toggle source
# File lib/emit/channel.rb, line 79
def retired?
  @state == :retired
end
write(message) click to toggle source
# File lib/emit/channel.rb, line 35
def write(message)
  check_termination!

  process = Scheduler.current

  fast_write(process, message).tap do |written|
    return true unless written.nil?
  end

  process.state = :active
  request = ChannelRequest.new(process, message)
  post_write(request)
  request.process.wait
  remove_write(request)

  return true if request.success?

  check_termination!
  abort "Should not get here..."
end
writer() click to toggle source
# File lib/emit/channel.rb, line 69
def writer
  @writers += 1
  ChannelEndWrite.new(self)
end
Also aliased as: -@

Private Instance Methods

check_termination!() click to toggle source
# File lib/emit/channel.rb, line 148
def check_termination!
  case @state
  when :poisoned
    raise ChannelPoisonedException
  when :retired
    raise ChannelRetiredException
  end
end
fast_read(process) click to toggle source
# File lib/emit/channel.rb, line 125
def fast_read(process)
  writer = @write_queue.shuffle.find(&:active?)
  return nil if writer.nil?

  writer.result = :success
  writer.process.state = :done
  Scheduler.activate(writer.process) unless process == writer.process

  writer.message
end
fast_write(process, message) click to toggle source
# File lib/emit/channel.rb, line 136
def fast_write(process, message)
  reader = @read_queue.shuffle.find(&:active?)
  return nil if reader.nil?

  reader.message = message
  reader.result = :success
  reader.process.state = :done
  Scheduler.activate(reader.process) unless process == reader.process

  true
end
match() click to toggle source
# File lib/emit/channel.rb, line 157
def match
  @write_queue.shuffle.each do |writer|
    @read_queue.shuffle.each do |reader|
      return true if writer.offer(reader)
    end
  end

  false
end