class Thread::Channel

A channel lets threads send and receive messages in a thread-safe way.

It also supports guards on both sending and receiving, so that only messages that are safe to consume are passed along.

Public Class Methods

new(messages = [], &block)

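Create a channel with optional initial messages and an optional channel guard. The initial messages are added through send, so each of them is checked against the guard as well.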

# File lib/thread/channel.rb, line 19
def initialize(messages = [], &block)
        @messages = []
        @mutex    = Mutex.new
        @check    = block

        messages.each {|o|
                send o
        }
end
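
As a rough illustration of how the constructor treats initial messages, the sketch below uses MiniChannel, a hypothetical stand-in that copies only the initialize and send logic shown on this page so that it runs without the gem installed. The size helper is an assumption added for the example and is not part of the documented API.

```ruby
# Hypothetical stand-in mirroring the documented constructor and send.
class MiniChannel
  def initialize(messages = [], &block)
    @messages = []
    @mutex    = Mutex.new
    @check    = block

    # Initial messages go through send, so the guard sees each of them.
    messages.each { |o| send(o) }
  end

  def send(what)
    raise ArgumentError, 'guard mismatch' if @check && !@check.call(what)

    @mutex.synchronize { @messages << what }
    self
  end

  # Helper for this example only (assumption, not in the original class).
  def size
    @mutex.synchronize { @messages.size }
  end
end

# All initial messages satisfy the guard, so all of them are queued.
ints = MiniChannel.new([1, 2, 3]) { |o| o.is_a?(Integer) }
puts ints.size

# One initial message fails the guard, so construction raises.
begin
  MiniChannel.new([1, 'two']) { |o| o.is_a?(Integer) }
rescue ArgumentError => e
  puts "rejected: #{e.message}"
end
```

With the gem installed, the same calls should apply to Thread::Channel itself (presumably after require 'thread/channel', going by the file path shown above).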

Public Instance Methods

receive(&block)

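Receive a message. If there are none, the call blocks until one arrives.

If a block is passed, it is used as a guard: the first message for which the block returns a truthy value is removed and returned, and the call blocks until such a message is available.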

# File lib/thread/channel.rb, line 50
def receive(&block)
        message = nil
        found   = false

        if block
                until found
                        @mutex.synchronize {
                                if index = @messages.find_index(&block)
                                        message = @messages.delete_at(index)
                                        found   = true
                                else
                                        cond.wait @mutex
                                end
                        }
                end
        else
                until found
                        @mutex.synchronize {
                                if @messages.empty?
                                        cond.wait @mutex
                                end

                                unless @messages.empty?
                                        message = @messages.shift
                                        found   = true
                                end
                        }
                end
        end

        message
end
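
The blocking and guarded behaviour of receive can be sketched as follows. MiniChannel here is a hypothetical, condensed stand-in (a single wait loop instead of the two branches in the source) so that the example runs with the standard library alone; it is not the gem's implementation.

```ruby
# Hypothetical stand-in: a condensed version of send and receive.
class MiniChannel
  def initialize
    @messages = []
    @mutex    = Mutex.new
    @cond     = ConditionVariable.new
  end

  def send(what)
    @mutex.synchronize do
      @messages << what
      @cond.broadcast # wake every waiting receiver so each can re-check
    end
    self
  end

  def receive(&block)
    @mutex.synchronize do
      loop do
        # With a guard, look for the first match; otherwise take the head.
        index = if block
                  @messages.find_index(&block)
                elsif !@messages.empty?
                  0
                end
        return @messages.delete_at(index) if index

        # Nothing suitable yet: release the mutex and sleep until a send.
        @cond.wait(@mutex)
      end
    end
  end
end

channel = MiniChannel.new

# The consumer only accepts strings, so it skips over 1 and :two.
consumer = Thread.new { channel.receive { |m| m.is_a?(String) } }

channel.send(1).send(:two).send('three')

first_string = consumer.value   # joins the thread and returns its result
next_plain   = channel.receive  # unguarded: takes the oldest remaining message

puts first_string
puts next_plain
```

The guarded receiver removes its match from wherever it sits in the queue, which leaves the unmatched messages in their original order for later receivers.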

receive!(&block)

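Receive a message without blocking. If there are none, the call returns nil.

If a block is passed, it is used as a guard to select the first matching message. As written, the guarded form raises a TypeError instead of returning nil when no message matches, because find_index returns nil and delete_at does not accept nil. Unlike receive, this method does not take the channel's mutex.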

# File lib/thread/channel.rb, line 86
def receive!(&block)
        if block
                @messages.delete_at(@messages.find_index(&block))
        else
                @messages.shift
        end
end
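
In the source above, the guarded branch passes the result of find_index straight to delete_at, which raises a TypeError when nothing matches. A possible nil-safe variant is sketched below as try_receive, a hypothetical free-standing helper that takes the message array and mutex as arguments. Holding the mutex is an added design choice for the sketch, not something the original method does.

```ruby
# Hypothetical helper: a non-blocking receive that returns nil on no match.
def try_receive(messages, mutex, &block)
  mutex.synchronize do
    if block
      index = messages.find_index(&block)
      # Only delete when a match was found; otherwise fall through to nil.
      index && messages.delete_at(index)
    else
      messages.shift
    end
  end
end

mutex    = Mutex.new
messages = [1, 2, 3]

even  = try_receive(messages, mutex) { |m| m.even? } # matches 2
none  = try_receive(messages, mutex) { |m| m > 10 }  # no match
first = try_receive(messages, mutex)                 # head of the queue

p even
p none
p first
p messages

# For comparison, the unguarded pattern from the source fails on no match.
begin
  odds = [1, 3]
  odds.delete_at(odds.find_index(&:even?))
rescue TypeError => e
  puts "TypeError: #{e.message}"
end
```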

send(what)

Send a message to the channel.

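If there is a guard, the value is passed to it. If the guard returns a falsy value, an ArgumentError is raised and the message is not sent. Otherwise the message is appended to the queue and any waiting receivers are woken. The method returns the channel itself, so calls can be chained.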

# File lib/thread/channel.rb, line 33
def send(what)
        if @check && !@check.call(what)
                raise ArgumentError, 'guard mismatch'
        end

        @mutex.synchronize {
                @messages << what

                cond.broadcast if cond?
        }

        self
end
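
A minimal sketch of the guard check and the chaining that send allows, again using a hypothetical MiniChannel stand-in that copies only the send logic. The to_a helper is an assumption added so the example can inspect the queue.

```ruby
# Hypothetical stand-in mirroring only the documented send behaviour.
class MiniChannel
  def initialize(&block)
    @messages = []
    @mutex    = Mutex.new
    @check    = block
  end

  def send(what)
    # The guard runs before the lock is taken; a rejected message
    # never reaches the queue.
    raise ArgumentError, 'guard mismatch' if @check && !@check.call(what)

    @mutex.synchronize { @messages << what }
    self # returning self is what makes chaining possible
  end

  # Helper for this example only (assumption, not in the original class).
  def to_a
    @mutex.synchronize { @messages.dup }
  end
end

channel = MiniChannel.new { |m| m.is_a?(Symbol) }
channel.send(:ping).send(:pong)

error = nil
begin
  channel.send('not a symbol')
rescue ArgumentError => e
  error = e.message
end

puts error
p channel.to_a
```

Because the class defines its own send, Ruby's generic Object#send dispatch is shadowed on channel instances; __send__ remains available if dynamic dispatch is needed.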

Private Instance Methods

cond()

Return the channel's condition variable, creating it on first use.

# File lib/thread/channel.rb, line 99
def cond
        @cond ||= ConditionVariable.new
end
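
cond?()

Return whether the condition variable has been created yet, that is, whether any receiver has ever had to wait on the channel.
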
# File lib/thread/channel.rb, line 95
def cond?
        instance_variable_defined? :@cond
end