class LaunchDarkly::StreamProcessor

Private: this class is an internal implementation detail and is not part of the supported public API.

Public Class Methods

new(sdk_key, config, diagnostic_accumulator = nil) click to toggle source
# File lib/ldclient-rb/stream.rb, line 25
# Builds a stream processor. No connection is opened here; that happens in #start.
#
# @param sdk_key the SDK key, later handed to the default HTTP header builder
# @param config the SDK configuration; its feature_store is captured here, and its
#   logger, stream_uri and socket_factory are read when the stream is started
# @param diagnostic_accumulator optional object that receives record_stream_init
#   calls for each connection attempt; nil (the default) disables that reporting
def initialize(sdk_key, config, diagnostic_accumulator = nil)
  @sdk_key = sdk_key
  @config = config
  @feature_store = config.feature_store
  # Must be stored: log_connection_result reads @diagnostic_accumulator and does
  # nothing while it is nil.
  @diagnostic_accumulator = diagnostic_accumulator
  @initialized = Concurrent::AtomicBoolean.new(false)
  @started = Concurrent::AtomicBoolean.new(false)
  @stopped = Concurrent::AtomicBoolean.new(false)
  @ready = Concurrent::Event.new
  # 0 means "no connection attempt is currently being timed".
  @connection_attempt_start_time = 0
end

Public Instance Methods

initialized?() click to toggle source
# File lib/ldclient-rb/stream.rb, line 36
# Reports the current state of the @initialized flag, which process_message
# sets once a PUT message has been applied to the feature store.
#
# @return [Boolean]
def initialized?
  @initialized.true?
end
start() click to toggle source
# File lib/ldclient-rb/stream.rb, line 40
# Opens the streaming connection on the first call; later calls only return
# the ready event again.
#
# Incoming events are forwarded to process_message. On an HTTP status error the
# failure is logged, and if the status is not recoverable the ready event is
# set and the processor stops itself.
#
# @return the @ready event, which is set once the stream has initialized or
#   has failed permanently
def start
  # Only the call that flips @started goes on to create a connection.
  return @ready unless @started.make_true

  @config.logger.info { "[LDClient] Initializing stream connection" }

  client_options = {
    headers: Impl::Util.default_http_headers(@sdk_key, @config),
    read_timeout: READ_TIMEOUT_SECONDS,
    logger: @config.logger,
    socket_factory: @config.socket_factory
  }
  log_connection_started
  @es = SSE::Client.new(@config.stream_uri + "/all", **client_options) do |client|
    client.on_event { |event| process_message(event) }
    client.on_error do |error|
      log_connection_result(false)
      if error.is_a?(SSE::Errors::HTTPStatusError)
        http_status = error.status
        description = Util.http_error_message(http_status, "streaming connection", "will retry")
        @config.logger.error { "[LDClient] #{description}" }
        unless Util.http_error_recoverable?(http_status)
          # Release anyone waiting on the ready event; harmless if already set.
          @ready.set
          stop
        end
      end
    end
  end

  @ready
end
stop() click to toggle source
# File lib/ldclient-rb/stream.rb, line 73
# Shuts the stream down. Only the first call has any effect; later calls are
# ignored because @stopped has already been flipped.
#
# Safe to call even if #start was never invoked: @es is only assigned in
# #start, so the close is skipped while it is still nil.
def stop
  if @stopped.make_true
    @es.close unless @es.nil?
    @config.logger.info { "[LDClient] Stream connection stopped" }
  end
end

Private Instance Methods

key_for_path(kind, path) click to toggle source
# File lib/ldclient-rb/stream.rb, line 118
# Extracts an item key from a stream message path.
#
# @param kind the data kind whose path prefix (looked up in KEY_PATHS) is expected
# @param path [String] the path taken from the message
# @return [String, nil] the part of path after the prefix, or nil when path
#   does not begin with that kind's prefix
def key_for_path(kind, path)
  prefix = KEY_PATHS[kind]
  return nil unless path.start_with?(prefix)

  path[prefix.length..-1]
end
log_connection_result(is_success) click to toggle source
# File lib/ldclient-rb/stream.rb, line 126
# Reports the outcome of the connection attempt currently being timed, then
# clears the timer so the same attempt is not reported twice.
#
# Does nothing when there is no diagnostic accumulator or when no attempt is
# being timed (start time of 0).
#
# @param is_success [Boolean] whether the attempt succeeded; the accumulator
#   is given the negated value as its "failed" argument
def log_connection_result(is_success)
  return if @diagnostic_accumulator.nil?
  return unless @connection_attempt_start_time > 0

  started_at = @connection_attempt_start_time
  elapsed_millis = Impl::Util.current_time_millis - started_at
  @diagnostic_accumulator.record_stream_init(started_at, !is_success, elapsed_millis)
  @connection_attempt_start_time = 0
end
log_connection_started() click to toggle source
# File lib/ldclient-rb/stream.rb, line 122
# Marks the beginning of a connection attempt by recording the current time
# in milliseconds; log_connection_result later uses it to compute the duration.
#
# @return the recorded timestamp
def log_connection_started
  now = Impl::Util.current_time_millis
  @connection_attempt_start_time = now
end
process_message(message) click to toggle source
# File lib/ldclient-rb/stream.rb, line 82
# Applies one stream event to the feature store.
#
# Any received event counts as a successful connection, so the pending
# connection attempt is reported as a success first. Then, by event type:
# * PUT    - replaces the whole store, marks the processor initialized and
#            sets the ready event
# * PATCH  - upserts the single item addressed by the message path
# * DELETE - deletes the single item addressed by the message path
# * other  - logged as a warning and otherwise ignored
#
# @param message the received event; must respond to #type and #data, where
#   data is a JSON document for the three handled types
def process_message(message)
  log_connection_result(true)
  event_type = message.type
  @config.logger.debug { "[LDClient] Stream received #{event_type} message: #{message.data}" }

  if event_type == PUT
    payload = JSON.parse(message.data, symbolize_names: true)
    @feature_store.init(Impl::Model.make_all_store_data(payload[:data]))
    @initialized.make_true
    @config.logger.info { "[LDClient] Stream initialized" }
    @ready.set
  elsif event_type == PATCH
    payload = JSON.parse(message.data, symbolize_names: true)
    # The first kind whose prefix matches the path wins; paths matching no
    # kind are silently ignored.
    [FEATURES, SEGMENTS].each do |kind|
      next unless key_for_path(kind, payload[:path])

      item = payload[:data]
      Impl::Model.postprocess_item_after_deserializing!(kind, item)
      @feature_store.upsert(kind, item)
      break
    end
  elsif event_type == DELETE
    payload = JSON.parse(message.data, symbolize_names: true)
    [FEATURES, SEGMENTS].each do |kind|
      item_key = key_for_path(kind, payload[:path])
      next unless item_key

      @feature_store.delete(kind, item_key, payload[:version])
      break
    end
  else
    @config.logger.warn { "[LDClient] Unknown message received: #{event_type}" }
  end
end