class Lightstreamer::StreamConnection

Internal class used by {Session} that manages a long-running Lightstreamer connection. Incoming streaming data is handled on a separate thread and made available for consumption through {#read_line}.

@private

Attributes

control_address[R]

The control address to use for this stream connection.

@return [String, nil]

error[R]

If an error on the stream thread causes the stream to disconnect, that error is stored in this attribute.

@return [LightstreamerError, nil]

session_id[R]

The session ID returned from the server when this stream connection was initiated.

@return [String, nil]

Public Class Methods

new(session) click to toggle source

Establishes a new stream connection using the authentication details from the passed session.

@param [Session] session The session to create a stream connection for.

# File lib/lightstreamer/stream_connection.rb, line 26
# Prepares a stream connection for the passed session. No stream thread is started here; that happens in
# {#connect}.
#
# @param [Session] session The session to create a stream connection for.
def initialize(session)
  # Primitives that {#connect} waits on until the stream thread signals that a connect result is ready
  @connect_result_condition_variable = ConditionVariable.new
  @connect_result_mutex = Mutex.new

  # Lines of stream data waiting to be consumed through {#read_line}
  @queue = Queue.new

  @session = session
end

Public Instance Methods

connect() click to toggle source

Establishes a new stream connection using the authentication details from the session that was passed to {#initialize}. Raises a {LightstreamerError} subclass on failure.

# File lib/lightstreamer/stream_connection.rb, line 36
# Starts the stream thread and blocks the caller until the thread signals that a connect result is ready.
# Returns immediately when a stream thread already exists.
#
# @raise [LightstreamerError] The error stored in `@error`, if one was recorded while connecting.
def connect
  return if @thread

  # Discard any lines left over from an earlier connection
  @queue.clear

  # The thread is created while the mutex is held, so it cannot signal before this thread is waiting
  @connect_result_mutex.synchronize do
    create_stream_thread
    @connect_result_condition_variable.wait @connect_result_mutex
  end

  if @error
    @thread = nil
    raise @error
  end
end
connected?() click to toggle source

Returns whether or not this stream connection is connected.

@return [Boolean]

# File lib/lightstreamer/stream_connection.rb, line 55
# Reports whether a stream thread is currently recorded for this connection.
#
# @return [Boolean]
def connected?
  return false if @thread.nil?

  true
end
disconnect() click to toggle source

Disconnects this stream connection by shutting down the streaming thread.

# File lib/lightstreamer/stream_connection.rb, line 60
# Disconnects this stream connection by shutting down the streaming thread. Does nothing if there is no
# streaming thread.
def disconnect
  # Work on a snapshot: the stream thread sets `@thread` to nil itself when it finishes, so re-reading the
  # instance variable between the steps below could hit nil and raise a NoMethodError
  stream_thread = @thread
  return unless stream_thread

  stream_thread.exit
  stream_thread.join

  @thread = nil
end
read_line() click to toggle source

Reads the next line of streaming data. If the streaming thread is alive then this method blocks the calling thread until a line of data is available or the streaming thread terminates for any reason. If the streaming thread is not active then any unconsumed lines will be returned and after that the return value will be `nil`.

@return [String, nil]

# File lib/lightstreamer/stream_connection.rb, line 74
# Returns the next queued line of stream data. When the queue is empty and no stream thread is recorded the
# result is `nil`; otherwise the call pops from the queue, which blocks until an entry is available. The
# stream thread queues a `nil` when it finishes, so that is also a possible return value.
#
# @return [String, nil]
def read_line
  if @queue.empty? && @thread.nil?
    nil
  else
    @queue.pop
  end
end

Private Instance Methods

bind_to_existing_stream() click to toggle source
# File lib/lightstreamer/stream_connection.rb, line 110
# Issues a bind request for the current session ID against the control address, streaming the response
# through {#execute_stream_post_request}.
def bind_to_existing_stream
  bind_params = build_params LS_session: @session_id

  bind_url = URI.join(control_address, '/lightstreamer/bind_session.txt').to_s
  execute_stream_post_request(bind_url, connect_timeout: 15, query: bind_params)
end
build_params(params) click to toggle source
# File lib/lightstreamer/stream_connection.rb, line 117
# Adds the request parameters common to stream requests to the passed hash: the session's requested maximum
# bandwidth and, when polling is enabled on the session, the polling options. The passed hash is modified
# in place and returned.
#
# @param [Hash] params The request-specific parameters.
#
# @return [Hash] The same hash with the common parameters added.
def build_params(params)
  params[:LS_requested_max_bandwidth] = @session.requested_maximum_bandwidth
  params.merge!(LS_polling: true, LS_polling_millis: 15_000) if @session.polling_enabled
  params
end
create_new_stream() click to toggle source
# File lib/lightstreamer/stream_connection.rb, line 98
# Issues a create-session request against the session's server URL using the session's username, password
# and (when set) adapter set, streaming the response through {#execute_stream_post_request}.
def create_new_stream
  create_params = build_params(LS_op2: 'create', LS_cid: 'mgQkwtwdysogQz2BJ4Ji kOj2Bg',
                               LS_user: @session.username, LS_password: @session.password)

  create_params[:LS_adapter_set] = @session.adapter_set if @session.adapter_set

  create_url = URI.join(@session.server_url, '/lightstreamer/create_session.txt').to_s
  execute_stream_post_request(create_url, connect_timeout: 15, query: create_params)

  # Signal once the request has returned. The header handler also signals when the header completes, so
  # this presumably exists to release {#connect} when the request ends without a complete header.
  signal_connect_result_ready
end
create_stream_thread() click to toggle source
# File lib/lightstreamer/stream_connection.rb, line 82
# Starts the background thread that runs the stream connection and stores it in `@thread`.
#
# The thread first calls {#create_new_stream} and then, for as long as `@loop` has been set (which
# {#process_body_line} does when it sees a `LOOP` line), rebinds through {#bind_to_existing_stream}. When no
# further rebind is requested the thread clears `@thread` and pushes `nil` onto the line queue.
def create_stream_thread
  @thread = Thread.new do
    Thread.current.abort_on_exception = true

    create_new_stream

    # The flag is cleared before each bind request, so another pass only happens if it is set again while
    # that request is being processed
    while @loop
      @loop = false
      bind_to_existing_stream
    end

    # Record that the stream has finished and queue a `nil` so that a pending `@queue.pop` returns
    @thread = nil
    @queue.push nil
  end
end
execute_stream_post_request(url, options) click to toggle source
# File lib/lightstreamer/stream_connection.rb, line 128
# Performs a POST request to the given URL with Excon, feeding each chunk of the response through a
# {StreamBuffer} that hands complete lines to {#process_stream_line}. A fresh {StreamConnectionHeader} is
# installed in `@header` before the request starts. Any `Excon::Error` is captured into `@error` as an
# {Errors::ConnectionError} rather than being raised.
#
# @param [String] url The URL to post to.
# @param [Hash] options The Excon request options. `:response_block` and `:expects` are set on this hash.
def execute_stream_post_request(url, options)
  @header = StreamConnectionHeader.new

  line_handler = method(:process_stream_line)
  stream_buffer = StreamBuffer.new

  options[:expects] = 200
  options[:response_block] = lambda do |chunk, _remaining, _total|
    stream_buffer.process chunk, &line_handler
  end

  Excon.post url, options
rescue Excon::Error => e
  @error = Errors::ConnectionError.new e.message
end
process_body_line(line) click to toggle source
# File lib/lightstreamer/stream_connection.rb, line 174
# Handles one line from the body of the stream. A `LOOP` line sets the `@loop` flag, an `END` line stores an
# {Errors::SessionEndError} built from the text after `END ` in `@error`, and any other non-empty line is
# pushed onto the queue of lines. Empty lines are ignored.
#
# @param [String] line The line of body data to process.
def process_body_line(line)
  case line
  when /^LOOP( \d+|)$/
    @loop = true
  when /^END( \d+|)$/
    @error = Errors::SessionEndError.new line[4..-1]
  else
    @queue.push line unless line.empty?
  end
end
process_header_line(line) click to toggle source
# File lib/lightstreamer/stream_connection.rb, line 154
# Feeds one line into the stream header that is currently being read and refreshes the session ID, control
# address and error from the header's current contents. When the header reports that it is complete,
# `@header` is cleared so that subsequent lines are treated as body lines, and the thread waiting on the
# connect result is signalled.
#
# @param [String] line The line of header data to process.
def process_header_line(line)
  header_incomplete = @header.process_line line

  @session_id = @header['SessionId']

  # Set the control address and ensure it has a scheme. An explicit `scheme://` prefix is checked for rather
  # than a leading "http", so that a bare host such as `httpfeed.example.com` still gets the scheme of the
  # session's server URL prepended.
  @control_address = @header['ControlAddress'] || @session.server_url
  unless %r{\A[a-z][a-z0-9+.-]*://}i.match?(@control_address)
    @control_address = "#{URI(@session.server_url).scheme}://#{@control_address}"
  end

  @error = @header.error

  return if header_incomplete

  @header = nil

  signal_connect_result_ready
end
process_stream_line(line) click to toggle source
# File lib/lightstreamer/stream_connection.rb, line 144
# Routes one line of stream data. `PROBE` and `Preamble:` lines are dropped; every other line goes to the
# header handler while a header is still being read (`@header` is set) and to the body handler otherwise.
#
# @param [String] line The line of stream data to process.
def process_stream_line(line)
  return if /^(PROBE|Preamble:.*)$/.match?(line)

  @header ? process_header_line(line) : process_body_line(line)
end
signal_connect_result_ready() click to toggle source
# File lib/lightstreamer/stream_connection.rb, line 140
# Signals the connect-result condition variable while holding its mutex, waking a thread that is waiting on
# it in {#connect}.
def signal_connect_result_ready
  @connect_result_mutex.synchronize do
    @connect_result_condition_variable.signal
  end
end