class SignalFlowWebsocketTransport

A WebSocket transport for SignalFlow. This should not be used directly by end-users.

Constants

COMPUTATION_START_TIMEOUT_SECONDS

A lower bound on the amount of time to wait for a computation to start

DETACHED

Public Class Methods

new(api_token, stream_endpoint, proxy_url: nil, logger: Logger.new(STDOUT, progname: "signalfx"), debug: false) click to toggle source
# File lib/signalfx/signalflow/websocket.rb, line 26
# Stores the connection settings and resets all per-connection state.
# No connection is opened here; this only records configuration and calls
# reinit.
#
# api_token       - token sent in the "authenticate" message
# stream_endpoint - base URL that the connect path is appended to
# proxy_url       - optional proxy origin handed to the WebSocket client
# logger          - object used for info/error logging
# debug           - when true, WebSocket events are logged at info level
def initialize(api_token, stream_endpoint, proxy_url: nil, logger: Logger.new(STDOUT, progname: "signalfx"), debug: false)
  # Connection settings
  @api_token, @stream_endpoint, @proxy_url = api_token, stream_endpoint, proxy_url
  @logger, @debug = logger, debug
  @compress = true

  # State shared between callers of send_msg and the WebSocket callbacks
  @lock = Mutex.new
  @last_error = nil
  @close_reason = nil

  reinit
end

Public Instance Methods

attach(handle, filters: nil, resolution: nil) click to toggle source

This doesn't actually work on the backend yet

# File lib/signalfx/signalflow/websocket.rb, line 150
# Sends an "attach" request for the given handle on a newly created channel
# and returns that channel. (Per the class documentation, attach is not
# functional on the backend yet.)
#
# handle     - handle of the computation to attach to
# filters    - optional; left out of the request when nil
# resolution - optional; left out of the request when nil
#
# Returns the channel created by make_new_channel.
def attach(handle, filters: nil, resolution: nil)
  channel = make_new_channel

  request = {
    :type => "attach",
    :channel => channel.name,
    :handle => handle,
    :filters => filters,
    :resolution => resolution,
    :compress => @compress,
  }

  # Use the non-destructive reject: Hash#reject! returns nil when no entry is
  # removed, which would serialize a fully specified request as "null".
  send_msg(request.reject { |_key, value| value.nil? }.to_json)

  channel
end
close() click to toggle source
# File lib/signalfx/signalflow/websocket.rb, line 180
# Closes the current WebSocket, if there is one. Does nothing when no
# socket has been created (or it has already been reset to nil).
def close
  @ws.close if @ws
end
detach(channel, reason=nil) click to toggle source
# File lib/signalfx/signalflow/websocket.rb, line 165
# Sends a "detach" request for the named channel and marks the channel as
# detached locally.
#
# channel - name of the channel to detach from
# reason  - optional reason; sent as-is (null when not given)
def detach(channel, reason=nil)
  request = {
    :type => "detach",
    :channel => channel,
    :reason => reason,
  }
  send_msg(request.to_json)

  # The server never confirms a detach, and messages for the channel may
  # still arrive after the request has been sent. Storing a sentinel value
  # in place of the callback lets the message receiver tell this expected
  # case apart from an anomalous one (say, a logic error elsewhere).
  @chan_callbacks[channel] = DETACHED
end
execute(program, start: nil, stop: nil, resolution: nil, max_delay: nil, persistent: nil, immediate: false) click to toggle source
# File lib/signalfx/signalflow/websocket.rb, line 95
# Sends an "execute" request through start_job and returns whatever
# start_job returns.
#
# program - program text to execute
# start, stop, resolution, max_delay, persistent - optional; each one is
#   left out of the request when nil
# immediate - sent as given (defaults to false)
def execute(program, start: nil, stop: nil, resolution: nil, max_delay: nil, persistent: nil, immediate: false)
  start_job do |channel_name|
    request = {
      :type => "execute",
      :channel => channel_name,
      :program => program,
      :start => start,
      :stop => stop,
      :resolution => resolution,
      :max_delay => max_delay,
      :persistent => persistent,
      :immediate => immediate,
      :compress => @compress,
    }

    # Use the non-destructive reject: Hash#reject! returns nil when no entry
    # is removed, which would serialize a fully specified request as "null".
    send_msg(request.reject { |_key, value| value.nil? }.to_json)
  end
end
on_close(msg) click to toggle source
# File lib/signalfx/signalflow/websocket.rb, line 214
# WebSocket close callback. Records why the socket closed, injects a
# CONNECTION_CLOSED event into every registered channel, and then resets the
# per-connection state via reinit.
#
# msg - close event; must respond to code and reason
def on_close(msg)
  @logger.info("Websocket on_close: #{msg}") if @debug

  @close_reason = "(#{msg.code}, #{msg.reason})"

  # Iterate over a snapshot of the channel names; each channel gets its own
  # event hash.
  registered_names = @chan_callbacks.keys
  registered_names.each do |registered_name|
    invoke_callback_for_channel({ :event => "CONNECTION_CLOSED" }, registered_name)
  end

  reinit
end
on_error(e) click to toggle source
# File lib/signalfx/signalflow/websocket.rb, line 227
# WebSocket error callback. Logs the error and remembers it in @last_error,
# which send_msg reports if authentication subsequently fails.
def on_error(e)
  description = e.inspect
  @logger.error("ERROR #{description}")
  @last_error = e
end
on_message(m) click to toggle source
# File lib/signalfx/signalflow/websocket.rb, line 233
# WebSocket message callback. Hands the payload to message_received and logs
# (rather than propagates) ordinary processing errors, so that one bad
# message does not stop later messages from being handled.
#
# m - message event; must respond to data. A String payload is treated as
#     text, anything else as binary.
def on_message(m)
  @logger.info("Websocket on_message: #{m}") if @debug

  payload = m.data
  is_text = payload.kind_of?(String)

  begin
    message_received(payload, is_text)
  rescue StandardError => e
    # Only StandardError is swallowed: rescuing Exception would also hide
    # SystemExit, Interrupt, NoMemoryError and similar, which must propagate.
    origin = Array(e.backtrace).first
    @logger.error("Error processing SignalFlow message: #{origin}: #{e.message} (#{e.class})")
  end
end
on_open() click to toggle source
# File lib/signalfx/signalflow/websocket.rb, line 247
# WebSocket open callback. Sends the "authenticate" message carrying the API
# token as soon as the socket is open.
def on_open
  @logger.info("Websocket on_open") if @debug

  auth_request = {
    :type => "authenticate",
    :token => @api_token,
  }
  @ws.send(auth_request.to_json)
end
preflight(program, start, stop, resolution: nil, max_delay: nil) click to toggle source
# File lib/signalfx/signalflow/websocket.rb, line 112
# Sends a "preflight" request through start_job and returns whatever
# start_job returns.
#
# program     - program text to preflight
# start, stop - sent as given; left out of the request when nil
# resolution, max_delay - optional; left out of the request when nil
def preflight(program, start, stop, resolution: nil, max_delay: nil)
  start_job do |channel_name|
    request = {
      :type => "preflight",
      :channel => channel_name,
      :program => program,
      :start => start,
      :stop => stop,
      :resolution => resolution,
      :max_delay => max_delay,
      :compress => @compress,
    }

    # Use the non-destructive reject: Hash#reject! returns nil when no entry
    # is removed, which would serialize a fully specified request as "null".
    send_msg(request.reject { |_key, value| value.nil? }.to_json)
  end
end
start(program, start: nil, stop: nil, resolution: nil, max_delay: nil) click to toggle source
# File lib/signalfx/signalflow/websocket.rb, line 127
# Sends a "start" request through start_job and returns whatever start_job
# returns.
#
# program - program text to start
# start, stop, resolution, max_delay - optional; each one is left out of the
#   request when nil
def start(program, start: nil, stop: nil, resolution: nil, max_delay: nil)
  start_job do |channel_name|
    request = {
      :type => "start",
      :channel => channel_name,
      :program => program,
      :start => start,
      :stop => stop,
      :resolution => resolution,
      :max_delay => max_delay,
    }

    # Use the non-destructive reject: Hash#reject! returns nil when no entry
    # is removed, which would serialize a fully specified request as "null".
    send_msg(request.reject { |_key, value| value.nil? }.to_json)
  end
end
start_job() { |name| ... } click to toggle source

Starts a job (either execute or preflight) and waits until the JOB_START message carrying the computation handle arrives, so that we can create a properly initialized computation object. Yields to the given block, which should send the WS message to start the job.

# File lib/signalfx/signalflow/websocket.rb, line 61
# Creates a new channel, yields its name so the caller can send the request
# that starts the job, and then blocks until the job reports that it has
# started.
#
# Returns a Computation built from the handle in the JOB_START message (with
# the channel attached to it), or from the computationId in a
# "computation-started" message (in which case the channel is detached).
#
# Raises ComputationFailure when an "error" message arrives first, and a
# RuntimeError when popping from the channel times out.
def start_job
  channel = make_new_channel

  yield channel.name

  computation = nil
  until computation
    msg = begin
      channel.pop(COMPUTATION_START_TIMEOUT_SECONDS)
    rescue ChannelTimeout
      raise "Computation did not start after at least #{COMPUTATION_START_TIMEOUT_SECONDS} seconds"
    end

    raise ComputationFailure.new(msg[:message]) if msg[:type] == "error"

    # STREAM_START comes before JOB_START but contains no useful information;
    # it, like any other unrecognized message, is skipped.
    if msg[:event] == "JOB_START"
      computation = Computation.new(msg[:handle], method(:attach), method(:stop))
      computation.channel = channel
    elsif msg[:type] == "computation-started"
      computation = Computation.new(msg[:computationId], method(:attach), method(:stop))
      # Start jobs only use the channel to get error messages and can
      # detach from the channel once the job has started.
      channel.detach
    end
  end

  computation
end
stop(handle, reason) click to toggle source
# File lib/signalfx/signalflow/websocket.rb, line 141
# Sends a "stop" request for the given computation handle.
#
# handle - handle of the computation to stop
# reason - reason for stopping; left out of the request when nil
def stop(handle, reason)
  request = {
    :type => "stop",
    :handle => handle,
    :reason => reason,
  }

  # Use the non-destructive reject: Hash#reject! returns nil when no entry is
  # removed, so every stop request that carried a reason was sent as "null".
  send_msg(request.reject { |_key, value| value.nil? }.to_json)
end

Private Instance Methods

add_parsed_timestamp!(msg) click to toggle source
# File lib/signalfx/signalflow/websocket.rb, line 341
# Adds a :timestamp entry (a Time) derived from :timestampMs when the
# message has that key. Mutates and returns the same hash.
def add_parsed_timestamp!(msg)
  msg[:timestamp] = Time.at(msg[:timestampMs] / 1000.0) if msg.key?(:timestampMs)
  msg
end
invoke_callback_for_channel(msg, channel_name) click to toggle source
# File lib/signalfx/signalflow/websocket.rb, line 297
# Delivers a message to the channel registered under channel_name.
# Messages for channels marked DETACHED are silently dropped.
#
# Raises a RuntimeError when nothing is registered under that name.
def invoke_callback_for_channel(msg, channel_name)
  target = @chan_callbacks[channel_name]
  raise "Callback for channel #{channel_name} is missing!" unless target

  target.inject_message(msg) unless target == DETACHED
end
make_new_channel() click to toggle source
# File lib/signalfx/signalflow/websocket.rb, line 349
# Creates a Channel with a fresh name from @channel_namer, registers it in
# @chan_callbacks under that name, and returns it. The channel is given a
# lambda that detaches it by name.
def make_new_channel
  channel_name = @channel_namer.call
  on_detach = lambda { detach(channel_name) }
  @chan_callbacks[channel_name] = Channel.new(channel_name, on_detach)
end
message_received(raw_msg, is_text) click to toggle source
# File lib/signalfx/signalflow/websocket.rb, line 310
# Parses an incoming message and routes it:
#   * "authenticated" messages flip @authenticated to true;
#   * messages carrying a :channel go to that channel;
#   * KEEP_ALIVE events are ignored;
#   * anything else raises a RuntimeError.
def message_received(raw_msg, is_text)
  parsed = add_parsed_timestamp!(parse_message(raw_msg, is_text))

  if parsed[:type] == "authenticated"
    @authenticated = true
    return
  end

  return invoke_callback_for_channel(parsed, parsed[:channel]) if parsed[:channel]

  # Ignore keep-alives
  return if parsed[:event] == "KEEP_ALIVE"

  raise "Unknown SignalFlow message: #{parsed}"
end
parse_message(raw_msg, is_text) click to toggle source
# File lib/signalfx/signalflow/websocket.rb, line 331
# Decodes a raw WebSocket payload. Text payloads are parsed as JSON with
# symbolized keys; anything else is treated as an array of bytes, packed
# into a string, and handed to BinaryMessageParser.
def parse_message(raw_msg, is_text)
  return JSON.parse(raw_msg, {:symbolize_names => true}) if is_text

  # Convert the byte array to a string
  packed_bytes = raw_msg.pack("c*")
  BinaryMessageParser.parse(packed_bytes)
end
reinit() click to toggle source
# File lib/signalfx/signalflow/websocket.rb, line 40
# Resets per-connection state: no socket, not authenticated, no registered
# channels, and a channel namer that starts counting from 1 again.
def reinit
  @ws = nil
  @authenticated = false
  @chan_callbacks = {}

  counter = 0
  counter_guard = Mutex.new
  # Returns a unique channel name each time it is called
  @channel_namer = lambda do
    counter_guard.synchronize { "channel-#{counter += 1}" }
  end
end
send_msg(msg) click to toggle source
# File lib/signalfx/signalflow/websocket.rb, line 186
# Sends a message over the WebSocket, first starting the client and waiting
# for authentication when no socket exists yet. Calls are serialized by
# @lock.
#
# Raises WebsocketError (wrapping the last socket error) or a RuntimeError
# when the socket closes, or roughly 5 seconds pass, before authentication
# succeeds.
def send_msg(msg)
  @lock.synchronize do
    if @ws.nil?
      # Forget the outcome of any previous connection. on_close records
      # @close_reason and on_error records @last_error, and neither is
      # cleared elsewhere, so without this reset the first poll below would
      # treat a stale close reason as a failure of the new connection.
      @close_reason = nil
      @last_error = nil

      startup_client

      # Polling is the simplest and most robust way to handle blocking until
      # authenticated. Using ConditionVariable requires more complex logic
      # that gains very little in terms of efficiency given how quick auth
      # should be.
      start_time = Time.now
      until @authenticated
        # The socket will be closed by the server if auth isn't successful
        # within 5 seconds so no point in waiting longer
        if Time.now - start_time > 5 || @close_reason
          raise WebsocketError.new(@last_error) if @last_error
          raise "Could not authenticate to SignalFlow WebSocket: #{@close_reason}"
        end
        sleep 0.1
      end
    end

    @ws.send(msg)
  end
end
startup_client() click to toggle source

Start up a new WS client in its own thread that runs an EventMachine reactor.

# File lib/signalfx/signalflow/websocket.rb, line 260
# Starts a new thread running an EventMachine reactor, creates the WebSocket
# client inside it (stored in @ws), and wires the socket's error, close,
# message and open events to this object's on_* methods. The reactor is
# stopped once the socket closes. Returns the new Thread.
def startup_client
  transport = self

  client_options = { :tls => { :verify_peer => true } }
  client_options[:proxy] = { :origin => @proxy_url } if @proxy_url

  Thread.new do
    EM.run do
      @ws = Faye::WebSocket::Client.new("#{@stream_endpoint}/v2/signalflow/connect", [], client_options)

      @ws.on(:error) { |e| transport.on_error(e) }

      @ws.on(:close) do |e|
        transport.on_close(e)
        EM.stop_event_loop
      end

      @ws.on(:message) { |m| transport.on_message(m) }

      @ws.on(:open) { transport.on_open }
    end
  end
end