class Going::Channel

This class represents message channels with a specified capacity. A push may block when the channel is at capacity, and a shift may block when no message is waiting to be received.

Attributes

capacity[R]

Returns the capacity of the channel.

mutex[R]
pushes[R]
shifts[R]

Public Class Methods

new(capacity = 0) { |self| ... } click to toggle source

Creates a fixed-length channel with a capacity of `capacity`.

# File lib/going/channel.rb, line 13
# Sets up an open channel with the given capacity, empty push and shift
# queues and a fresh mutex.
#
# capacity - the channel capacity; must not be below 0 (defaults to 0).
#
# Yields the new channel to the block when one is given.
# Raises ArgumentError when +capacity+ is negative.
def initialize(capacity = 0)
  raise ArgumentError, 'channel capacity must be 0 or greater' if capacity < 0

  @mutex = Mutex.new
  @closed = false
  @shifts = []
  @pushes = []
  @capacity = capacity

  yield(self) if block_given?
end

Public Instance Methods

<<(obj, &on_complete)

Alias of push

Alias for: push
close() click to toggle source

Closes the channel. Any data in the buffer may still be retrieved.

# File lib/going/channel.rb, line 39
# Closes the channel: every queued shift is closed and dropped, the pushes
# beyond capacity are removed and closed, and the channel is flagged closed.
# Pushes within capacity stay queued.
#
# Returns false when the channel was already closed, true otherwise.
def close
  synchronize do
    return false if closed?

    shifts.each(&:close)
    shifts.clear

    overflow = pushes_over_capacity!
    overflow.each(&:close)

    @closed = true
  end
end
each() { |shift| ... } click to toggle source

Calls the given block once for each message until the channel is closed, passing that message as a parameter.

Note that this is a destructive action, since each message is `shift`ed.

# File lib/going/channel.rb, line 133
# Yields messages shifted from the channel, one at a time, until :close is
# thrown (shift throws it once the channel is closed and nothing was received).
#
# Returns an Enumerator when called without a block.
def each
  return enum_for(:each) unless block_given?

  catch(:close) do
    loop { yield shift }
  end
end
empty?() click to toggle source

Returns whether the channel is empty.

# File lib/going/channel.rb, line 123
# Returns true when the channel currently holds no messages (size is zero).
def empty?
  size.zero?
end
inspect() click to toggle source
# File lib/going/channel.rb, line 143
# Returns a short description of the channel made of its class name plus
# the inspected capacity and size.
def inspect
  details = %i[capacity size].map { |name| "#{name}: #{send(name).inspect}" }.join(', ')
  "#<#{self.class} #{details}>"
end
length()

Alias of size

Alias for: size
next(&on_complete)
Alias for: shift
push(obj, &on_complete) click to toggle source

Pushes obj to the channel. If the channel is already full, waits until a thread shifts from it.

# File lib/going/channel.rb, line 53
# Pushes +obj+ onto the channel and waits on the resulting Push operation.
#
# obj         - the message to send.
# on_complete - optional block forwarded to the Push operation.
#
# Returns the channel itself.
# Raises RuntimeError ('cannot push to a closed channel') when, after the
# wait, the channel is closed and no select statement is active.
def push(obj, &on_complete)
  synchronize do
    # Queue the operation first; pair_with_shift pops it again on a match.
    push = Push.new(message: obj, select_statement: select_statement, &on_complete)
    pushes << push

    pair_with_shift push

    # Inside a select statement, hand the statement this push, its queue and
    # remove_operation — presumably so it can dequeue the push once the select resolves.
    select_statement.when_complete(push, pushes, &method(:remove_operation)) if select_statement?

    # The push is already counted in pushes here, so this completes it when
    # it still fits within capacity.
    push.complete if under_capacity?
    push.signal if select_statement?
    push.close if closed?

    # Hands the channel mutex to the operation; presumably blocks until the
    # push is completed or closed.
    push.wait(mutex)

    # NOTE(review): unlike shift, this does not check push.incomplete? before
    # bailing out — confirm that is intended.
    fail 'cannot push to a closed channel' if closed? && !select_statement?
    self
  end
end
Also aliased as: <<, yield
receive(&on_complete)

Alias of shift

Alias for: shift
shift(&on_complete) click to toggle source

Receives data from the channel. If the channel is already empty, waits until a thread pushes to it.

# File lib/going/channel.rb, line 83
# Receives the next message from the channel by queueing a Shift operation
# and waiting on it.
#
# on_complete - optional block forwarded to the Shift operation.
#
# Returns the message carried by the shift operation.
# Throws :close when the channel is closed, no select statement is active
# and the shift is still incomplete (each catches this to end iteration).
def shift(&on_complete)
  synchronize do
    # Queue the operation first; pair_with_push pops it again on a match.
    shift = Shift.new(select_statement: select_statement, &on_complete)
    shifts << shift

    pair_with_push shift

    # Inside a select statement, hand the statement this shift, its queue and
    # remove_operation — presumably so it can dequeue the shift once the select resolves.
    select_statement.when_complete(shift, shifts, &method(:remove_operation)) if select_statement?

    shift.signal if select_statement?
    shift.close if closed?

    # Hands the channel mutex to the operation; presumably blocks until the
    # shift is completed or closed.
    shift.wait(mutex)

    throw :close if closed? && !select_statement? && shift.incomplete?
    shift.message
  end
end
Also aliased as: receive, next
size() click to toggle source

Returns the number of messages in the channel.

# File lib/going/channel.rb, line 111
# Returns the number of messages held by the channel: the smaller of the
# capacity and the number of queued pushes.
def size
  limit = capacity
  queued = pushes.size
  [limit, queued].min
end
Also aliased as: length
yield(obj, &on_complete)
Alias for: push

Private Instance Methods

complete_next_push_now_that_channel_under_capacity() click to toggle source
# File lib/going/channel.rb, line 186
# Completes the push sitting at index +capacity+ of the push queue, if there
# is one and it is still incomplete. Called by pair_with_push just before a
# paired push is removed from the queue.
def complete_next_push_now_that_channel_under_capacity
  candidate = pushes[capacity]
  return unless candidate

  candidate.complete if candidate.incomplete?
end
pair_with_push(shift) click to toggle source
# File lib/going/channel.rb, line 158
# Tries to satisfy +shift+ with the first queued push that belongs to a
# different select statement and that the shift can complete against.
#
# On a match, completes the push at index +capacity+ (if any), pops the last
# entry of the shift queue and removes the matched push from the push queue.
#
# Returns true when a push was paired, false otherwise.
def pair_with_push(shift)
  matched_at = pushes.index do |candidate|
    candidate.select_statement != select_statement && shift.complete(candidate)
  end
  return false unless matched_at

  complete_next_push_now_that_channel_under_capacity
  shifts.pop
  pushes.delete_at(matched_at)
  true
end
pair_with_shift(push) click to toggle source
# File lib/going/channel.rb, line 169
# Tries to hand +push+ to the first queued shift that belongs to a different
# select statement and that can complete against the push.
#
# On a match, pops the last entry of the push queue and removes the matched
# shift from the shift queue.
#
# Returns true when a shift was paired, false otherwise.
def pair_with_shift(push)
  matched_at = shifts.index do |candidate|
    candidate.select_statement != select_statement && candidate.complete(push)
  end
  return false unless matched_at

  pushes.pop
  shifts.delete_at(matched_at)
  true
end
pushes_over_capacity!() click to toggle source
# File lib/going/channel.rb, line 191
# Removes every push queued beyond index +capacity+ from the push queue and
# returns them; returns an empty array when there are none.
def pushes_over_capacity!
  # slice! yields nil when capacity lies past the end of the queue.
  Array(pushes.slice!(capacity, pushes.size))
end
remove_operation(operation, queue) click to toggle source
# File lib/going/channel.rb, line 179
# Removes the first occurrence of +operation+ from +queue+ while holding the
# channel mutex.
#
# Returns the removed operation, or nil when it was not in the queue.
def remove_operation(operation, queue)
  synchronize do
    position = queue.index(operation)
    position && queue.delete_at(position)
  end
end
select_statement() click to toggle source
# File lib/going/channel.rb, line 199
# Returns SelectStatement.instance when it is set, otherwise falls back to
# NilSelectStatement.instance.
def select_statement
  current = SelectStatement.instance
  return current if current

  NilSelectStatement.instance
end
select_statement?() click to toggle source
# File lib/going/channel.rb, line 203
# Delegates to SelectStatement.instance?; push and shift use the result to
# decide whether to register with, and signal, the select statement.
def select_statement?
  SelectStatement.instance?
end
synchronize(&blk) click to toggle source
# File lib/going/channel.rb, line 154
# Runs the given block while holding the channel mutex and returns the
# block's result.
def synchronize(&blk)
  mutex.synchronize(&blk)
end
under_capacity?() click to toggle source
# File lib/going/channel.rb, line 195
# Returns true when the number of queued pushes does not exceed the capacity.
# push calls this after appending its own operation, hence the inclusive
# comparison.
def under_capacity?
  queued = pushes.size
  queued <= capacity
end