class Going::Channel
This class represents message channels of specified capacity. The push operation may be blocked if the capacity is full. The shift operation may be blocked if no messages have been sent.
Attributes
Returns the capacity of the channel.
Public Class Methods
Creates a fixed-length channel with a capacity of capacity
.
# File lib/going/channel.rb, line 13 def initialize(capacity = 0) fail ArgumentError, 'channel capacity must be 0 or greater' if capacity < 0 @capacity = capacity @pushes = [] @shifts = [] @closed = false @mutex = Mutex.new yield self if block_given? end
Public Instance Methods
Closes the channel. Any data in the buffer may still be retrieved.
# File lib/going/channel.rb, line 39 def close synchronize do return false if closed? shifts.each(&:close).clear pushes_over_capacity!.each(&:close) @closed = true end end
Calls the given block once for each message until the channel is closed, passing that message as a parameter.
Note that this is a destructive action, since each message is ‘shift`ed.
# File lib/going/channel.rb, line 133 def each return enum_for(:each) unless block_given? catch :close do loop do yield self.shift end end end
Returns whether the channel is empty.
# File lib/going/channel.rb, line 123 def empty? size == 0 end
# File lib/going/channel.rb, line 143 def inspect inspection = [:capacity, :size].map do |attr| "#{attr}: #{send(attr).inspect}" end "#<#{self.class} #{inspection.join(', ')}>" end
Pushes obj
to the channel. If the channel is already full, waits until a thread shifts from it.
# File lib/going/channel.rb, line 53 def push(obj, &on_complete) synchronize do push = Push.new(message: obj, select_statement: select_statement, &on_complete) pushes << push pair_with_shift push select_statement.when_complete(push, pushes, &method(:remove_operation)) if select_statement? push.complete if under_capacity? push.signal if select_statement? push.close if closed? push.wait(mutex) fail 'cannot push to a closed channel' if closed? && !select_statement? self end end
Receives data from the channel. If the channel is already empty, waits until a thread pushes to it.
# File lib/going/channel.rb, line 83 def shift(&on_complete) synchronize do shift = Shift.new(select_statement: select_statement, &on_complete) shifts << shift pair_with_push shift select_statement.when_complete(shift, shifts, &method(:remove_operation)) if select_statement? shift.signal if select_statement? shift.close if closed? shift.wait(mutex) throw :close if closed? && !select_statement? && shift.incomplete? shift.message end end
Returns the number of messages in the channel
# File lib/going/channel.rb, line 111 def size [capacity, pushes.size].min end
Private Instance Methods
# File lib/going/channel.rb, line 186 def complete_next_push_now_that_channel_under_capacity push = pushes[capacity] push.complete if push && push.incomplete? end
# File lib/going/channel.rb, line 158 def pair_with_push(shift) pushes.each_with_index.any? do |push, index| if push.select_statement != select_statement && shift.complete(push) complete_next_push_now_that_channel_under_capacity shifts.pop pushes.delete_at index true end end end
# File lib/going/channel.rb, line 169 def pair_with_shift(push) shifts.each_with_index.any? do |shift, index| if shift.select_statement != select_statement && shift.complete(push) pushes.pop shifts.delete_at index true end end end
# File lib/going/channel.rb, line 191 def pushes_over_capacity! pushes.slice!(capacity, pushes.size) || [] end
# File lib/going/channel.rb, line 179 def remove_operation(operation, queue) synchronize do index = queue.index(operation) queue.delete_at index if index end end
# File lib/going/channel.rb, line 199 def select_statement SelectStatement.instance || NilSelectStatement.instance end
# File lib/going/channel.rb, line 203 def select_statement? SelectStatement.instance? end
# File lib/going/channel.rb, line 154 def synchronize(&blk) mutex.synchronize(&blk) end
# File lib/going/channel.rb, line 195 def under_capacity? pushes.size <= capacity end