class Celluloid::EventSource

Constants

CLOSED
CONNECTING
MAX_RECONNECT_TIME
OPEN
VERSION

Attributes

ready_state[R]
url[R]
with_credentials[R]

Public Class Methods

new(uri, options = {}) { |self| ... } click to toggle source

Constructor for an EventSource.

@param uri [String] the event stream URI
@param options [Hash] the configuration options
@option options [Hash] :headers Headers to send with the request
@option options [Float] :read_timeout Timeout (in seconds) after which to restart the connection if the server has sent no data
@option options [Float] :reconnect_delay Initial delay (in seconds) between connection attempts; this will be increased exponentially if there are repeated failures
# File lib/celluloid/eventsource.rb, line 43
# Builds an EventSource for +uri+ and starts listening asynchronously.
#
# The new object is yielded to the optional block before listening starts,
# so handlers can be registered first.
#
# @param uri [String] the event stream URI
# @param options [Hash] the configuration options (the caller's hash is not mutated)
# @option options [Object] :with_credentials (false) stored in @with_credentials
# @option options [Hash] :headers headers merged over the default request headers
# @option options [Float] :read_timeout (0) seconds without data before a read
#   is abandoned; values that are not greater than zero disable the timeout
# @option options [String] :proxy proxy URL, used only when neither the
#   HTTP_PROXY nor the http_proxy environment variable is set
# @option options [Float] :reconnect_delay (1) initial delay in seconds between
#   connection attempts
# @yieldparam source [Celluloid::EventSource] the object being constructed
def initialize(uri, options = {})
  self.url = uri
  options  = options.dup
  @ready_state = CONNECTING
  @with_credentials = options.delete(:with_credentials) { false }
  @headers = default_request_headers.merge(options.fetch(:headers, {}))
  # Keep fractional seconds: truncating with to_i turned a timeout such as
  # 0.5 into 0, which silently disabled the read timeout altogether.
  @read_timeout = options.fetch(:read_timeout, 0).to_f

  # Environment variables take precedence over the :proxy option.
  proxy = ENV['HTTP_PROXY'] || ENV['http_proxy'] || options[:proxy]
  if proxy
    proxy_uri = URI(proxy)
    # Only http and https proxies are honoured; any other scheme is ignored.
    @proxy = proxy_uri if %w[http https].include?(proxy_uri.scheme)
  end

  @reconnect_timeout = options.fetch(:reconnect_delay, 1)
  # No-op handlers so the built-in events can always be dispatched.
  @on = { open: ->{}, message: ->(_) {}, error: ->(_) {} }

  @chunked = false

  yield self if block_given?

  async.listen
end

Public Instance Methods

close() click to toggle source
# File lib/celluloid/eventsource.rb, line 94
# Closes the current socket, if there is one, and marks the source CLOSED.
#
# The state change lives in an +ensure+ so that a socket which raises while
# closing still leaves the source CLOSED; otherwise the listen loop, which
# runs until the source is closed, could never be stopped. Any exception
# from the socket still propagates to the caller.
#
# @return [Object] the CLOSED ready state
def close
  begin
    @socket.close if @socket
  ensure
    @ready_state = CLOSED
  end
  @ready_state
end
closed?() click to toggle source
# File lib/celluloid/eventsource.rb, line 76
# Reports whether the source has been closed.
#
# @return [Boolean] true when the ready state equals CLOSED
def closed?
  current_state = ready_state
  current_state == CLOSED
end
connected?() click to toggle source
# File lib/celluloid/eventsource.rb, line 72
# Reports whether the source currently has an open stream.
#
# @return [Boolean] true when the ready state equals OPEN
def connected?
  current_state = ready_state
  current_state == OPEN
end
listen() click to toggle source
# File lib/celluloid/eventsource.rb, line 80
# Connects and consumes the event stream over and over until the source is
# closed.
#
# UnexpectedContentType is re-raised to the caller; any other StandardError
# is logged and followed by another connection attempt.
def listen
  until closed?
    begin
      establish_connection
      process_stream
    rescue UnexpectedContentType
      # A wrong content type is not recoverable by reconnecting.
      raise
    rescue StandardError => error
      # Everything else is treated as transient: log it and go round again.
      info "Reconnecting after exception: #{error}"
    end
  end
end
on(event_name, &action) click to toggle source
# File lib/celluloid/eventsource.rb, line 99
# Registers the handler for a named event, replacing any previous handler.
#
# @param event_name [#to_sym] the event type; stored under its symbol form
# @param action [Proc] the block to run for that event
# @return [Proc, nil] the handler that was stored
def on(event_name, &action)
  handler_key = event_name.to_sym
  @on[handler_key] = action
end
on_error(&action) click to toggle source
# File lib/celluloid/eventsource.rb, line 111
# Registers the handler for the :error event.
#
# @param action [Proc] the block to run when an error is reported
# @return [Proc, nil] the handler that was stored
def on_error(&action)
  on(:error, &action)
end
on_message(&action) click to toggle source
# File lib/celluloid/eventsource.rb, line 107
# Registers the handler for the :message event.
#
# @param action [Proc] the block to run for each message event
# @return [Proc, nil] the handler that was stored
def on_message(&action)
  on(:message, &action)
end
on_open(&action) click to toggle source
# File lib/celluloid/eventsource.rb, line 103
# Registers the handler for the :open event.
#
# @param action [Proc] the block to run once the stream is open
# @return [Proc, nil] the handler that was stored
def on_open(&action)
  on(:open, &action)
end
url=(uri) click to toggle source
# File lib/celluloid/eventsource.rb, line 68
# Sets the stream location, converting the argument with Kernel#URI.
#
# @param uri [String, URI] the event stream address
def url=(uri)
  parsed = URI(uri)
  @url = parsed
end

Private Instance Methods

chunked?() click to toggle source
# File lib/celluloid/eventsource.rb, line 200
# Reports whether the current response is treated as chunked.
#
# @return [Boolean] the value of @chunked, which starts out false and is
#   recomputed by establish_connection from the Transfer-Encoding header
def chunked?
  @chunked
end
connect_string() click to toggle source
# File lib/celluloid/eventsource.rb, line 262
# Builds the HTTP CONNECT request used to open a tunnel through @proxy to
# the target host and port.
#
# A Basic Proxy-Authorization header is included when the proxy URI carries
# a user or a password; a missing half is sent as an empty string.
#
# @return [String] the request text, terminated by a blank line
def connect_string
  target = "#{url.host}:#{url.port}"
  request_lines = ["CONNECT #{target} HTTP/1.1", "Host: #{target}"]

  if @proxy.user || @proxy.password
    # nil interpolates as an empty string, so "user:" and ":password" both work.
    credentials = "#{@proxy.user}:#{@proxy.password}"
    request_lines << "Proxy-Authorization: Basic #{Base64.strict_encode64(credentials)}"
  end

  request_lines.join("\r\n") << "\r\n\r\n"
end
default_request_headers() click to toggle source
# File lib/celluloid/eventsource.rb, line 192
# Builds the headers sent with every stream request unless the caller
# overrides them.
#
# The Host value carries the port whenever it differs from the scheme's
# default, as HTTP/1.1 requires; sending the bare host name for a URL such
# as http://example.com:8080/ would name the wrong origin.
#
# @return [Hash{String => String}] the Accept, Cache-Control and Host headers
def default_request_headers
  host = url.host
  port = url.port
  host = "#{host}:#{port}" if port && port != url.default_port

  {
    'Accept'        => 'text/event-stream',
    'Cache-Control' => 'no-cache',
    'Host'          => host
  }
end
establish_connection() click to toggle source
# File lib/celluloid/eventsource.rb, line 121
# Opens the socket (through @proxy when one is configured), sends the GET
# request and reads the response headers, retrying after a growing, jittered
# delay until one of the terminating outcomes below is reached.
#
# Outcomes visible in this method:
# * status 200 with a Content-Type containing "text/event-stream": @chunked
#   is derived from the Transfer-Encoding header, @ready_state becomes OPEN,
#   the :open handler is called and the method returns.
# * non-200 answer to the proxy CONNECT: the :error handler is called and
#   the method returns without setting OPEN.
# * non-200 answer from the server: the rest of the response is read, the
#   :error handler is called, then the method sleeps and retries.
# * status 200 with any other Content-Type: the source is closed, the :error
#   handler is called and UnexpectedContentType is raised.
# * any other StandardError: logged with warn, then the method sleeps and
#   retries.
#
# NOTE(review): the parser is created once, outside the retry loop, and is
# also fed the proxy's CONNECT response -- confirm ResponseParser copes with
# more than one response.
# NOTE(review): a socket left over from an earlier attempt is replaced here
# without being closed -- confirm whether that leaks a descriptor.
#
# @raise [UnexpectedContentType] when the server answers 200 with a
#   Content-Type other than text/event-stream
def establish_connection
  parser = ResponseParser.new
  reconnect_attempts = 0
  reconnect_jitter_rand = Random.new

  loop do
    begin
      if @proxy
        # Connect to the proxy itself and ask it to open a tunnel.
        sock = ::TCPSocket.new(@proxy.host, @proxy.port)
        @socket = Celluloid::IO::TCPSocket.new(sock)

        @socket.write(connect_string)
        @socket.flush
        # Feed the proxy's answer to the parser up to the first blank line.
        while (line = readline_with_timeout(@socket).chomp) != '' do parser << line end

        unless parser.status_code == 200
          @on[:error].call({status_code: parser.status_code, body: parser.chunk})
          # NOTE(review): this returns with the state unchanged, so listen goes
          # on to process_stream on the proxy socket -- confirm this is intended.
          return
        end
      else
        # No proxy: connect straight to the target host.
        sock = ::TCPSocket.new(@url.host, @url.port)
        @socket = Celluloid::IO::TCPSocket.new(sock)
      end

      if ssl?
        # Wrap the connected socket (direct or tunnelled) in an SSL socket.
        @socket = Celluloid::IO::SSLSocket.new(@socket)
        @socket.connect
      end

      @socket.write(request_string)
      @socket.flush()

      # Read line by line until the parser reports the headers are complete.
      until parser.headers?
        parser << readline_with_timeout(@socket)
      end

      if parser.status_code != 200
        # Drain the remainder of the response so its body can be reported.
        until @socket.eof?
          parser << readline_with_timeout(@socket)
        end
        # If the server returns a non-200, we don't want to close-- we just want to
        # report an error
        # close
        @on[:error].call({status_code: parser.status_code, body: parser.chunk})
      elsif parser.headers['Content-Type'] && parser.headers['Content-Type'].include?("text/event-stream")
        # Remember whether the body uses chunked transfer encoding; read_lines
        # consults this through chunked?.
        @chunked = !parser.headers["Transfer-Encoding"].nil? && parser.headers["Transfer-Encoding"].include?("chunked")
        @ready_state = OPEN
        @on[:open].call
        return  # Success, don't retry
      else
        close
        info "Invalid Content-Type #{parser.headers['Content-Type']}"
        @on[:error].call({status_code: parser.status_code, body: "Invalid Content-Type #{parser.headers['Content-Type']}. Expected text/event-stream"})
        raise UnexpectedContentType
      end

    rescue UnexpectedContentType
      raise  # Let these flow to the top

    rescue StandardError => e
      warn "Waiting to try again after exception while connecting: #{e}"
      # Just try again after a delay for any other exceptions
    end

    # Exponential backoff: the base delay doubles per attempt and is capped at
    # MAX_RECONNECT_TIME. The actual sleep is half the base plus a random
    # amount below the other half.
    # NOTE(review): with a reconnect delay of 0 the rand argument is 0.0;
    # Random#rand appears to reject a zero maximum -- confirm.
    base_sleep_time = ([@reconnect_timeout * (2 ** reconnect_attempts), MAX_RECONNECT_TIME].min).to_f
    sleep_time = (base_sleep_time / 2) + reconnect_jitter_rand.rand(base_sleep_time / 2)
    sleep sleep_time
    reconnect_attempts += 1
  end
end
process_stream() click to toggle source
# File lib/celluloid/eventsource.rb, line 233
# Parses the incoming lines into events and dispatches each one to the
# handler registered for its type, if any.
#
# The id of every event, handled or not, is remembered in @last_event_id.
# The parser is also given a callback through which it can replace
# @read_timeout.
def process_stream
  update_read_timeout = ->(timeout) { @read_timeout = timeout }
  event_parser = EventParser.new(read_lines, @chunked, update_read_timeout)

  event_parser.each do |event|
    handler = @on[event.type]
    handler.call(event) if handler
    @last_event_id = event.id
  end
end
read_chunked_lines(socket) click to toggle source
# File lib/celluloid/eventsource.rb, line 204
# Enumerates the lines of a single chunk of a chunked response.
#
# The first line read is the chunk header, whose leading hexadecimal number
# is the chunk length. Lines are then read and yielded until at least that
# many bytes have been consumed.
#
# Every read goes through the +socket+ argument, and progress is counted
# with String#bytesize because the chunk length is expressed in bytes, not
# characters; counting characters over-reads a chunk that contains
# multi-byte text.
#
# @param socket [#readline] the socket to read the chunk from
# @return [Enumerator<String>] the lines of one chunk
def read_chunked_lines(socket)
  Enumerator.new do |lines|
    chunk_header = readline_with_timeout(socket)
    bytes_to_read = chunk_header.to_i(16)
    bytes_read = 0
    while bytes_read < bytes_to_read
      line = readline_with_timeout(socket)
      bytes_read += line.bytesize
      lines << line
    end
  end
end
read_lines() click to toggle source
# File lib/celluloid/eventsource.rb, line 217
# Enumerates the lines arriving on @socket until the source is closed.
#
# For a chunked response the lines are taken chunk by chunk from
# read_chunked_lines; otherwise each line is read straight from the socket.
# The closed state is checked before every read and before every line of a
# chunk is passed on.
#
# @return [Enumerator<String>] the stream's lines
def read_lines
  Enumerator.new do |yielder|
    loop do
      break if closed?

      unless chunked?
        yielder << readline_with_timeout(@socket)
        next
      end

      read_chunked_lines(@socket).each do |chunk_line|
        # Stop in the middle of a chunk once the source has been closed.
        break if closed?
        yielder << chunk_line
      end
    end
  end
end
readline_with_timeout(socket) click to toggle source
# File lib/celluloid/eventsource.rb, line 241
# Reads one line from +socket+, limiting the wait to @read_timeout seconds
# when that value is greater than zero.
#
# @param socket [#readline] the socket to read from
# @return [String] the line that was read
# @raise [ReadTimeout] when the timeout expires; the :error handler is
#   called first
def readline_with_timeout(socket)
  # No positive timeout configured: read without a time limit.
  return socket.readline unless @read_timeout > 0

  begin
    timeout(@read_timeout) { socket.readline }
  rescue Celluloid::TaskTimeout
    @on[:error].call({body: "Read timeout, will attempt reconnection"})
    raise ReadTimeout
  end
end
request_string() click to toggle source
# File lib/celluloid/eventsource.rb, line 256
# Builds the HTTP GET request for the stream: the request line, one line per
# entry of @headers, and the terminating blank line.
#
# @return [String] the complete request text
def request_string
  request_line = "GET #{url.request_uri} HTTP/1.1"
  header_lines = @headers.map { |name, value| "#{name}: #{value}" }

  [request_line, *header_lines].join("\r\n") + "\r\n\r\n"
end
ssl?() click to toggle source
# File lib/celluloid/eventsource.rb, line 117
# Reports whether the stream URL uses the https scheme.
#
# @return [Boolean] true when the URL's scheme is "https"
def ssl?
  scheme = url.scheme
  scheme == 'https'
end