class Acquia::Cloud::Logs::Streamer
Constants
- RECV_LENGTH
Try to retrieve 16KB blocks from the socket.
Attributes
keepalive_duration[RW]
remote_server[R]
Public Class Methods
new(url, message)
click to toggle source
# File lib/acquia/cloud/logs/streamer.rb, line 18
#
# Prepares a streamer without opening any connection.
#
# @param url the endpoint that will later be handed to the WebSocket
#   handshake and parsed for host/port
# @param message the first message transmitted once connected
def initialize(url, message)
  started_at = Time.now

  @url = url
  @message = message

  # Buffers: received log lines, known sources, and the types to auto-enable.
  @logs = []
  @available = []
  @enabled_types = []

  # Seconds between keepalives; both timers start from the same instant.
  @keepalive_duration = 30
  @last_data = started_at
  @last_keepalive = started_at
end
Public Instance Methods
close()
click to toggle source
# File lib/acquia/cloud/logs/streamer.rb, line 50
#
# Closes the underlying socket, if there is one, and forgets it.
#
# The reference is cleared because connect refuses to run while @socket is
# set; without the reset a closed streamer could never reconnect. Calling
# close when no socket is held is a no-op.
#
# @return [nil]
def close
  return if @socket.nil?

  @socket.close
  @socket = nil
end
connect()
click to toggle source
# File lib/acquia/cloud/logs/streamer.rb, line 32
#
# Opens a TCP connection to @url, performs the WebSocket client handshake
# and then transmits the initial message given to the constructor.
#
# If the handshake cannot be completed the socket is closed and @socket is
# reset before the error propagates, so the connection is not leaked and a
# later connect attempt is not rejected as 'Already connected.'.
#
# @raise [StreamerConnectionError] if a socket is already held, or if the
#   server's handshake response is incomplete or invalid
def connect
  raise StreamerConnectionError, 'Already connected.' unless @socket.nil?

  @handshake = ::WebSocket::Handshake::Client.new url: @url
  uri = ::URI.parse @url
  # Fall back to port 80 when the parsed URI carries no port.
  @socket = TCPSocket.new uri.host, uri.port || 80

  begin
    @socket.print @handshake.to_s
    until @handshake.finished?
      line = @socket.gets
      # gets returns nil at end of stream: the peer hung up mid-handshake.
      break if line.nil?

      @handshake << line
    end

    unless @handshake.finished? && @handshake.valid?
      raise StreamerConnectionError, "Couldn't get a valid response while connecting to the remote server: #{@url}"
    end
  rescue StandardError
    # Do not keep a half-open connection around after a failed handshake.
    @socket.close
    @socket = nil
    raise
  end

  @incoming = WebSocket::Frame::Incoming::Client.new(version: @handshake.version)

  # Initial authentication
  send @message
end
debug()
click to toggle source
# File lib/acquia/cloud/logs/streamer.rb, line 28
#
# Turns on debug output. Other methods consult the @debug flag before
# writing traffic details to STDERR.
#
# @return [true]
def debug
  # The assignment's value (true) is the method's return value.
  @debug = true
end
disable(opts = {})
click to toggle source
# File lib/acquia/cloud/logs/streamer.rb, line 79
#
# Sends a 'disable' command for one log source.
#
# The source may be given either as an object responding to #server and
# #type under :source, or as explicit :server and :type values. When :source
# is present it takes precedence. The caller's hash is left untouched.
#
# @param opts [Hash] :source, or :server and :type
# @return whatever send returns
def disable(opts = {})
  source = opts[:source]
  server = source ? source.server : opts[:server]
  type = source ? source.type : opts[:type]

  send(cmd: 'disable', server: server, type: type)
end
disable_type(type)
click to toggle source
# File lib/acquia/cloud/logs/streamer.rb, line 92
#
# Disables every currently known source of the given type and stops
# auto-enabling that type.
#
# Only sources whose #type equals +type+ are disabled; sources of other
# types are left alone.
#
# @param type the log type to switch off
# @return the value removed from @enabled_types, or nil if it was not there
def disable_type(type)
  @available.each do |source|
    disable source: source if source.type == type
  end
  @enabled_types.delete type
end
each_log() { |shift| ... }
click to toggle source
# File lib/acquia/cloud/logs/streamer.rb, line 99
#
# Runs update once, then hands each buffered log entry to the block in
# arrival order, removing it from the buffer as it goes.
#
# @yield each entry taken from the front of @logs
def each_log
  update
  yield @logs.shift until @logs.empty?
end
enable(opts = {})
click to toggle source
# File lib/acquia/cloud/logs/streamer.rb, line 59
#
# Sends an 'enable' command for one log source.
#
# The source may be given either as an object responding to #server and
# #type under :source, or as explicit :server and :type values. When :source
# is present it takes precedence. The caller's hash is left untouched.
#
# @param opts [Hash] :source, or :server and :type
# @return whatever send returns
def enable(opts = {})
  source = opts[:source]
  server = source ? source.server : opts[:server]
  type = source ? source.type : opts[:type]

  send(cmd: 'enable', server: server, type: type)
end
enable_type(type)
click to toggle source
# File lib/acquia/cloud/logs/streamer.rb, line 72
#
# Enables every currently known source of the given type and remembers the
# type in @enabled_types.
#
# Only sources whose #type equals +type+ are enabled; sources of other types
# are left alone.
#
# @param type the log type to switch on
# @return [Array] the updated @enabled_types
def enable_type(type)
  @available.each do |source|
    enable source: source if source.type == type
  end
  @enabled_types << type
end
send(message)
click to toggle source
protected
# File lib/acquia/cloud/logs/streamer.rb, line 108
#
# Serialises +message+ to JSON, wraps it in a WebSocket text frame built for
# the negotiated handshake version, and writes the frame to the socket.
# When debugging is on, the JSON is echoed to STDERR first.
#
# @param message any object responding to #to_json
def send(message)
  payload = message.to_json
  STDERR.puts "-> #{payload}" if @debug

  outgoing = ::WebSocket::Frame::Outgoing::Client.new(
    version: @handshake.version,
    type: 'text',
    data: payload
  )
  @socket.send outgoing.to_s, 0
end
sources()
click to toggle source
# File lib/acquia/cloud/logs/streamer.rb, line 54
#
# Runs update, then returns a copy of the list of available sources so the
# caller cannot alter the streamer's own array.
#
# @return [Array] a clone of @available
def sources
  update
  snapshot = @available.clone
  snapshot
end
update()
click to toggle source
# File lib/acquia/cloud/logs/streamer.rb, line 115
#
# Sends a keepalive when one is due, reads everything currently available
# from the socket, and then dispatches each complete incoming message.
#
# Message handling by 'cmd':
# * 'connected'   - records the server name when it starts with 'logstream-api'
# * 'success'     - ignored
# * 'error'       - raises StreamerRemoteError
# * 'available'   - records the source and enables it if its type is in
#                   @enabled_types
# * 'line'        - appends the message to @logs
# * anything else - reported on STDERR in debug mode, otherwise raises
#
# Message processing runs from the rescue clause: the read loop can only end
# by raising, and IO::WaitReadable is the signal that the socket has been
# drained for now. Any other exception from the read propagates to the caller.
#
# @raise [Errno::EPIPE] if no data has arrived for more than 1.5 times the
#   keepalive duration (the socket is closed first)
# @raise [StreamerRemoteError] when the server reports an error
# @raise [StreamerUnrecognisedCommandError] on an unknown command outside
#   debug mode
def update
  # A keepalive is due once at least @keepalive_duration seconds have passed
  # since the previous one. Record the send time so the interval restarts;
  # otherwise keepalives would either flood or stop entirely.
  if Time.now - @last_keepalive >= @keepalive_duration
    send(cmd: 'keepalive')
    @last_keepalive = Time.now
  end

  # Read as much as possible
  loop do
    read = @socket.read_nonblock(RECV_LENGTH)
    @last_data = Time.now
    @incoming << read
  end
rescue ::IO::WaitReadable
  if Time.now - @last_data > @keepalive_duration * 1.5
    # There hasn't been a socket level error, but we're worried at this
    # point. Trigger a proper socket close 'just in case' then flag a
    # broken pipe to downstream code.
    close
    raise Errno::EPIPE
  end

  loop do
    msg = @incoming.next
    break unless msg

    STDERR.puts "<- #{msg}" if @debug
    msg = JSON.parse(msg.data)

    case msg['cmd']
    when 'connected'
      @remote_server = msg['server'] if msg['server'].start_with? 'logstream-api'
    when 'success'
      # Congratulations!
    when 'error'
      raise StreamerRemoteError, "#{msg['code']}: #{msg['description']} during #{msg['during'].inspect}"
    when 'available'
      source = Source.new(msg['server'], msg['type'], msg['display_type'])
      @available << source
      enable source: source if @enabled_types.include? source.type
    when 'line'
      @logs << msg
    else
      if @debug
        STDERR.puts "Received unknown command: #{msg['cmd']}"
        STDERR.puts msg.inspect
      else
        raise StreamerUnrecognisedCommandError, "Unrecognised command: #{msg['cmd']}\n#{msg.inspect}"
      end
    end
  end
end