class Acquia::Cloud::Logs::Streamer

github.com/acquia/logstream/blob/master/README.md

Constants

RECV_LENGTH

Try to retrieve 16KB blocks from the socket.

Attributes

keepalive_duration[RW]
remote_server[R]

Public Class Methods

new(url, message) click to toggle source
# File lib/acquia/cloud/logs/streamer.rb, line 18
# Prepares a streamer for the given endpoint. Only instance state is set up
# here; the socket is opened later by #connect.
#
# url     - kept in @url; #connect parses it and opens the socket against it.
# message - kept in @message; #connect sends it as the initial
#           authentication message once the handshake succeeds.
def initialize(url, message)
  # Take one timestamp so both "last seen" markers start out identical.
  now = Time.now

  # Connection parameters.
  @url = url
  @message = message

  # Mutable state; every collection gets its own, separate array.
  @logs = []
  @available = []
  @enabled_types = []

  # Keepalive bookkeeping (duration is compared against Time differences,
  # i.e. seconds).
  @keepalive_duration = 30
  @last_data = now
  @last_keepalive = now
end

Public Instance Methods

close() click to toggle source
# File lib/acquia/cloud/logs/streamer.rb, line 50
# Closes the underlying socket.
#
# Does nothing when no socket has been created yet (i.e. #connect has not
# run), instead of failing with NoMethodError on nil.
def close
  @socket.close unless @socket.nil?
end
connect() click to toggle source
# File lib/acquia/cloud/logs/streamer.rb, line 32
# Opens a TCP connection to @url, performs the WebSocket client handshake
# and then sends the initial authentication message (@message).
#
# Raises StreamerConnectionError when a socket already exists, when the
# server closes the connection before the handshake completes, or when the
# handshake response is not valid. On any handshake failure the socket is
# closed and forgotten, so the descriptor is not leaked and #connect can be
# retried on the same instance.
def connect
  raise StreamerConnectionError, 'Already connected.' unless @socket.nil?

  @handshake = ::WebSocket::Handshake::Client.new url: @url
  uri = ::URI.parse @url
  @socket = TCPSocket.new uri.host, uri.port || 80

  failure = "Couldn't get a valid response while connecting to the remote server: #{@url}"
  begin
    @socket.print @handshake.to_s

    until @handshake.finished?
      line = @socket.gets
      # IO#gets returns nil at end of file: the peer hung up mid-handshake.
      raise StreamerConnectionError, failure if line.nil?
      @handshake << line
    end
    raise StreamerConnectionError, failure unless @handshake.valid?
  rescue StandardError
    # Undo the half-open connection before handing the error to the caller.
    @socket.close
    @socket = nil
    raise
  end

  @incoming = WebSocket::Frame::Incoming::Client.new(version: @handshake.version)

  # Initial authentication
  send @message
end
debug() click to toggle source
# File lib/acquia/cloud/logs/streamer.rb, line 28
# Turns on debug output for this streamer.
#
# Sets the @debug flag; #send and #update consult it and, when set, echo
# outgoing and incoming messages to STDERR. Returns true.
def debug
  @debug = true
end
disable(opts = {}) click to toggle source
# File lib/acquia/cloud/logs/streamer.rb, line 79
# Sends a 'disable' command for one log source.
#
# opts - Hash describing the source, either
#        :source          - an object responding to #server and #type, or
#        :server / :type  - the two values given explicitly.
#        When :source is present it takes precedence over :server/:type.
#
# The caller's hash is left untouched.
def disable(opts = {})
  source = opts[:source]
  server = source ? source.server : opts[:server]
  type = source ? source.type : opts[:type]

  send(
    cmd: 'disable',
    server: server,
    type: type,
  )
end
disable_type(type) click to toggle source
# File lib/acquia/cloud/logs/streamer.rb, line 92
# Disables every currently known source of the given log type and stops
# treating that type as enabled.
#
# type - the log type to switch off; compared against each source's #type.
#
# Only sources whose type matches are disabled; sources of other types are
# left alone. Returns the result of removing the type from @enabled_types.
def disable_type(type)
  @available.each do |source|
    disable source: source if source.type == type
  end
  @enabled_types.delete type
end
each_log() { |shift| ... } click to toggle source
# File lib/acquia/cloud/logs/streamer.rb, line 99
# Runs #update once, then hands every queued log entry to the given block,
# oldest first, removing each entry from the queue as it is yielded.
def each_log
  update
  yield @logs.shift until @logs.empty?
end
enable(opts = {}) click to toggle source
# File lib/acquia/cloud/logs/streamer.rb, line 59
# Sends an 'enable' command for one log source.
#
# opts - Hash describing the source, either
#        :source          - an object responding to #server and #type, or
#        :server / :type  - the two values given explicitly.
#        When :source is present it takes precedence over :server/:type.
#
# The caller's hash is left untouched.
def enable(opts = {})
  source = opts[:source]
  server = source ? source.server : opts[:server]
  type = source ? source.type : opts[:type]

  send(
    cmd: 'enable',
    server: server,
    type: type,
  )
end
enable_type(type) click to toggle source
# File lib/acquia/cloud/logs/streamer.rb, line 72
# Enables every currently known source of the given log type and remembers
# the type in @enabled_types.
#
# type - the log type to switch on; compared against each source's #type.
#
# Only sources whose type matches are enabled; sources of other types are
# left alone. Returns @enabled_types.
def enable_type(type)
  @available.each do |source|
    enable source: source if source.type == type
  end
  @enabled_types << type
end
send(message) click to toggle source

protected

# File lib/acquia/cloud/logs/streamer.rb, line 108
# Serialises +message+ to JSON and writes it to the socket as a single
# WebSocket text frame.
#
# message - any object responding to #to_json.
#
# When the debug flag is set, the JSON text is echoed to STDERR first.
def send(message)
  payload = message.to_json
  STDERR.puts "-> #{payload}" if @debug

  frame = ::WebSocket::Frame::Outgoing::Client.new(
    version: @handshake.version,
    type: 'text',
    data: payload,
  )
  @socket.send frame.to_s, 0
end
sources() click to toggle source
# File lib/acquia/cloud/logs/streamer.rb, line 54
# Runs #update, then returns a copy of the list of available sources, so
# callers can modify the returned array without touching @available.
def sources
  update
  snapshot = @available.clone
  snapshot
end
update() click to toggle source
# File lib/acquia/cloud/logs/streamer.rb, line 115
# Services the connection once: sends a keepalive if one is due, reads all
# data currently available on the socket without blocking, and dispatches
# every complete message that has been received.
#
# Raises Errno::EPIPE (after closing the socket) when no data has arrived
# for more than 1.5 keepalive intervals, StreamerRemoteError when the server
# reports an error, and StreamerUnrecognisedCommandError for an unknown
# command unless the debug flag is set.
def update
  # Send a keepalive once the interval since the previous one has elapsed,
  # and restart the timer so it is sent once per interval rather than on
  # every call.
  if Time.now - @last_keepalive >= @keepalive_duration
    send(cmd: 'keepalive')
    @last_keepalive = Time.now
  end

  # Read as much as possible. This loop only ever ends by raising: the
  # normal exit is IO::WaitReadable once the socket has nothing more to
  # offer, which is why message processing lives in the rescue clause.
  loop do
    chunk = @socket.read_nonblock(RECV_LENGTH)
    @last_data = Time.now
    @incoming << chunk
  end
rescue ::IO::WaitReadable
  if Time.now - @last_data > @keepalive_duration * 1.5
    # There hasn't been a socket level error, but we're worried at this
    # point. Trigger a proper socket close 'just in case' then flag a
    # broken pipe to downstream code.
    close
    raise Errno::EPIPE
  end

  # Drain every complete frame the parser has buffered.
  loop do
    msg = @incoming.next
    break unless msg
    STDERR.puts "<- #{msg}" if @debug

    msg = JSON.parse(msg.data)
    case msg['cmd']
      when 'connected'
        @remote_server = msg['server'] if msg['server'].start_with? 'logstream-api'
      when 'success'
        # Congratulations!
      when 'error'
        raise StreamerRemoteError, "#{msg['code']}: #{msg['description']} during #{msg['during'].inspect}"
      when 'available'
        source = Source.new(msg['server'], msg['type'], msg['display_type'])
        @available << source
        # Types switched on earlier also apply to sources announced later.
        enable source: source if @enabled_types.include? source.type
      when 'line'
        @logs << msg
      else
        if @debug
          STDERR.puts "Received unknown command: #{msg['cmd']}"
          STDERR.puts msg.inspect
        else
          raise StreamerUnrecognisedCommandError, "Unrecognised command: #{msg['cmd']}\n#{msg.inspect}"
        end
    end
  end
end