class Channel

Channel represents a medium through which SignalFlow messages pass. The main method for it is {#each_message}, which is how you get messages from the channel. There can only be one user of a channel and they are NOT thread-safe.

Channels are for one-time use only. Once a channel is detached from (either manually or due to the end of a computation) previous messages will be iterable but nothing new will show up.

Attributes

detached[RW]
name[RW]

Public Class Methods

new(name, detach_cb) click to toggle source
# File lib/signalfx/signalflow/channel.rb, line 20
def initialize(name, detach_cb)
  @lock = Mutex.new
  @detach_lock = Mutex.new
  @detached = false
  @name = name
  @detach_from_transport = detach_cb
  @messages = QueueWithTimeout.new
end

Public Instance Methods

detach(send_detach_to_server=true) click to toggle source
# File lib/signalfx/signalflow/channel.rb, line 62
def detach(send_detach_to_server=true)
  if !@detached
    @detached = true
    @detach_from_transport.call if send_detach_to_server
    @detach_from_transport = nil
  end
end
inject_message(msg) click to toggle source
# File lib/signalfx/signalflow/channel.rb, line 70
def inject_message(msg)
  # Since messages are injected by a separate websocket thread, they could
  # come in after the user has detached manually from the channel.  Just
  # silently ignore them in that case.
  return if @detached
  raise 'Cannot inject nil message' if msg.nil?

  @messages << msg
end
pop(timeout_seconds=nil) click to toggle source

Waits for and returns the next message in the channel.

@param timeout_seconds [Float] Number of seconds to wait for a message.

@return [Hash] The next message received by this channel. A return value of `nil` indicates that the channel has detected it is done and will not be receiving any more useful messages.

@raise [ChannelTimeout] If the timeout is exceeded with no messages

# File lib/signalfx/signalflow/channel.rb, line 38
def pop(timeout_seconds=nil)
  raise "Channel #{@name} is detached" if @detached

  msg = nil
  begin
    msg = @messages.pop_with_timeout(timeout_seconds)
  rescue ThreadError
    raise ChannelTimeout.new(
      "Did not receive a message on channel #{@name} within #{timeout_seconds} seconds")
  end

  if msg[:event] == "END_OF_CHANNEL" || msg[:event] == "CONNECTION_CLOSED" || msg[:event] == "CHANNEL_ABORT"
    # Mark this channel as detached and then return nil as an indicator that
    # this channel is done
    detach(false)

    nil
  else
    msg
  end

end