class Emit::Channel
Public Class Methods
new(name=nil)
click to toggle source
# File lib/emit/channel.rb, line 5
#
# Builds a channel in the :alive state with no registered reader or writer
# ends and empty pending-request queues.
#
# @param name [Object, nil] identifier stored in @name; when nil (or false)
#   a value from SecureRandom.uuid is used instead.
def initialize(name=nil)
  @name = name || SecureRandom.uuid

  # Counters of channel ends handed out by #reader / #writer.
  @readers = @writers = 0

  # Requests that have been posted and are waiting for a counterpart.
  @read_queue = []
  @write_queue = []

  @state = :alive
end
Public Instance Methods
leave_reader()
click to toggle source
# File lib/emit/channel.rb, line 83
#
# Drops one reader end from the channel. When the reader count reaches zero
# the channel state becomes :retired and #retire is called on every request
# in the write queue.
#
# Does nothing if the channel is already retired.
#
# @return [Array, nil] the write queue when this call retired the channel,
#   otherwise nil.
def leave_reader
  return if retired?

  @readers -= 1
  return unless @readers.zero?

  # Last reader is gone: pending writers can never be served.
  @state = :retired
  @write_queue.each { |pending| pending.retire }
end
leave_writer()
click to toggle source
# File lib/emit/channel.rb, line 92
#
# Drops one writer end from the channel. When the writer count reaches zero
# the channel state becomes :retired and #retire is called on every request
# in the read queue.
#
# Does nothing if the channel is already retired.
#
# @return [Array, nil] the read queue when this call retired the channel,
#   otherwise nil.
def leave_writer
  return if retired?

  @writers -= 1
  return unless @writers.zero?

  # Last writer is gone: pending readers can never be served.
  @state = :retired
  @read_queue.each { |pending| pending.retire }
end
poison()
click to toggle source
# File lib/emit/channel.rb, line 56
#
# Marks the channel as :poisoned and calls #poison on every pending read
# request, then on every pending write request.
#
# Calling it on an already poisoned channel is a no-op.
#
# @return [Array, nil] the write queue when the channel was poisoned by this
#   call, otherwise nil.
def poison
  unless poisoned?
    @state = :poisoned
    @read_queue.each { |pending| pending.poison }
    @write_queue.each { |pending| pending.poison }
  end
end
poisoned?()
click to toggle source
# File lib/emit/channel.rb, line 75
#
# @return [Boolean] true when the channel state is :poisoned.
def poisoned?
  :poisoned == @state
end
post_read(request)
click to toggle source
private
# File lib/emit/channel.rb, line 103
#
# Queues a read request and immediately tries to pair it with a writer.
#
# check_termination! runs first, so nothing is queued when it raises.
#
# @param request [Object] the read request to append to the read queue.
# @return whatever #match returns.
def post_read(request)
  check_termination!
  @read_queue.push(request)
  match
end
post_write(request)
click to toggle source
# File lib/emit/channel.rb, line 109
#
# Queues a write request and immediately tries to pair it with a reader.
#
# check_termination! runs first, so nothing is queued when it raises.
#
# @param request [Object] the write request to append to the write queue.
# @return whatever #match returns.
def post_write(request)
  check_termination!
  @write_queue.push(request)
  match
end
read()
click to toggle source
# File lib/emit/channel.rb, line 14
#
# Reads one message from the channel on behalf of the current process
# (Scheduler.current).
#
# Fast path: if fast_read hands back a non-nil message it is returned
# straight away. Otherwise a ChannelRequest is posted to the read queue, the
# process waits, and the request is removed from the queue afterwards.
#
# @return the message from fast_read, or request.message when the posted
#   request reports success.
# @raise whatever check_termination! raises for the current channel state,
#   both on entry and after an unsuccessful wait.
def read
  check_termination!
  process = Scheduler.current

  # A nil result from fast_read sends us down the slow path.
  immediate = fast_read(process)
  return immediate unless immediate.nil?

  process.state = :active
  request = ChannelRequest.new(process)
  post_read(request)
  request.process.wait
  remove_read(request)

  return request.message if request.success?

  # The request did not succeed: surface a termination state if there is one.
  check_termination!
  abort "Should not get here..."
end
reader()
click to toggle source
# File lib/emit/channel.rb, line 63
#
# Registers one more reader on this channel and hands out a read end for it.
# The counter is bumped before the end object is constructed.
#
# @return [ChannelEndRead] a new read end wrapping this channel.
def reader
  @readers += 1
  read_end = ChannelEndRead.new(self)
  read_end
end
Also aliased as: +@
remove_read(request)
click to toggle source
# File lib/emit/channel.rb, line 115
#
# Takes a request out of the read queue.
#
# @param request [Object] the request to remove (matched with ==, as
#   Array#delete does).
# @return the removed request, or nil when it was not queued.
def remove_read(request)
  @read_queue.delete(request)
end
remove_write(request)
click to toggle source
# File lib/emit/channel.rb, line 119
#
# Takes a request out of the write queue.
#
# @param request [Object] the request to remove (matched with ==, as
#   Array#delete does).
# @return the removed request, or nil when it was not queued.
def remove_write(request)
  @write_queue.delete(request)
end
retired?()
click to toggle source
# File lib/emit/channel.rb, line 79
#
# @return [Boolean] true when the channel state is :retired.
def retired?
  :retired == @state
end
write(message)
click to toggle source
# File lib/emit/channel.rb, line 35
#
# Writes one message to the channel on behalf of the current process
# (Scheduler.current).
#
# Fast path: if fast_write reports a delivery (non-nil result) the call
# returns true straight away. Otherwise a ChannelRequest carrying the message
# is posted to the write queue, the process waits, and the request is removed
# from the queue afterwards.
#
# @param message [Object] the value to send.
# @return [true] when the message was handed to a reader.
# @raise whatever check_termination! raises for the current channel state,
#   both on entry and after an unsuccessful wait.
def write(message)
  check_termination!
  process = Scheduler.current

  # A nil result from fast_write sends us down the slow path.
  return true unless fast_write(process, message).nil?

  process.state = :active
  request = ChannelRequest.new(process, message)
  post_write(request)
  request.process.wait
  remove_write(request)

  return true if request.success?

  # The request did not succeed: surface a termination state if there is one.
  check_termination!
  abort "Should not get here..."
end
writer()
click to toggle source
# File lib/emit/channel.rb, line 69
#
# Registers one more writer on this channel and hands out a write end for it.
# The counter is bumped before the end object is constructed.
#
# @return [ChannelEndWrite] a new write end wrapping this channel.
def writer
  @writers += 1
  write_end = ChannelEndWrite.new(self)
  write_end
end
Also aliased as: -@
Private Instance Methods
check_termination!()
click to toggle source
# File lib/emit/channel.rb, line 148
#
# Raises if the channel can no longer be used.
#
# @raise [ChannelPoisonedException] when the state is :poisoned.
# @raise [ChannelRetiredException] when the state is :retired.
# @return [nil] for any other state.
def check_termination!
  raise ChannelPoisonedException if @state == :poisoned
  raise ChannelRetiredException if @state == :retired
end
fast_read(process)
click to toggle source
# File lib/emit/channel.rb, line 125
#
# Tries to complete a read immediately by taking the message of a randomly
# chosen active request in the write queue.
#
# The chosen request is marked :success, its process is set to :done and,
# unless it is the calling process itself, handed to Scheduler.activate.
# The request is not removed from the write queue here.
#
# @param process [Object] the process performing the read.
# @return the chosen request's message, or nil when no queued write request
#   is active. NOTE: a request whose message is itself nil yields the same
#   return value as "nothing found".
def fast_read(process)
  pending = @write_queue.shuffle.find { |candidate| candidate.active? }
  return nil if pending.nil?

  pending.result = :success
  pending.process.state = :done
  unless process == pending.process
    Scheduler.activate(pending.process)
  end

  pending.message
end
fast_write(process, message)
click to toggle source
# File lib/emit/channel.rb, line 136
#
# Tries to complete a write immediately by handing the message to a randomly
# chosen active request in the read queue.
#
# The chosen request receives the message and is marked :success; its process
# is set to :done and, unless it is the calling process itself, handed to
# Scheduler.activate. The request is not removed from the read queue here.
#
# @param process [Object] the process performing the write.
# @param message [Object] the value to deliver.
# @return [true, nil] true when a reader took the message, nil when no queued
#   read request is active.
def fast_write(process, message)
  pending = @read_queue.shuffle.find { |candidate| candidate.active? }
  return nil if pending.nil?

  pending.message = message
  pending.result = :success
  pending.process.state = :done
  unless process == pending.process
    Scheduler.activate(pending.process)
  end

  true
end
match()
click to toggle source
# File lib/emit/channel.rb, line 157
#
# Tries to pair a pending write request with a pending read request.
#
# Write requests are visited in random order; for each one the read requests
# are visited in a freshly shuffled order and offered via #offer. The search
# stops at the first offer that returns a truthy value.
#
# @return [Boolean] true when some offer was accepted, false otherwise.
def match
  @write_queue.shuffle.any? do |pending_write|
    @read_queue.shuffle.any? { |pending_read| pending_write.offer(pending_read) }
  end
end