class KRPC::Streaming::StreamsManager

Attributes

client[R]

Public Class Methods

new(client) click to toggle source
# File lib/krpc/streaming.rb, line 10
# Sets up a manager bound to +client+.
#
# Starts with an empty registry of streams (keyed by stream id), a mutex
# used to guard that registry, and a placeholder thread with an empty body
# stored in @streaming_thread.
def initialize(client)
  @streams_mutex = Mutex.new
  @streams = {}
  @client = client
  @streaming_thread = Thread.new {}
end

Public Instance Methods

create_stream(call, return_type, method, *args, **kwargs) click to toggle source

Sends the streaming request, creates the related Stream object and returns it. If an identical Stream already exists, no new Stream is created and the existing one is returned.

# File lib/krpc/streaming.rb, line 19
# Sends the streaming request for +call+ and returns the Stream tracking it.
#
# If a Stream with the id returned by the server is already registered, that
# existing Stream is returned and no new one is created. Otherwise +method+
# is invoked once with +args+/+kwargs+ to obtain the initial value, and a new
# Stream is registered under that id.
#
# call        - request object handed to client.core.add_stream.
# return_type - passed through to Stream.new.
# method      - object responding to #name and #call.
# args/kwargs - forwarded to method.call and to Stream.new.
#
# Raises RuntimeError if the method name ends with '=' (a property setter).
def create_stream(call, return_type, method, *args, **kwargs)
  # `RuntimeError("...")` would be a call to an undefined method named
  # RuntimeError; raise the exception class with a message instead.
  raise RuntimeError, "Cannot stream a property setter" if method.name.to_s.end_with?('=')

  id = client.core.add_stream(call).id
  @streams_mutex.synchronize do
    # The fetch block only runs when no stream is registered under this id.
    @streams.fetch(id) do
      value = method.call(*args, **kwargs)
      @streams[id] = Stream.new(self, id, return_type, value, method, *args, **kwargs)
    end
  end
end
remove_all_streams() click to toggle source

Remove all streams created by this streams manager.

# File lib/krpc/streaming.rb, line 48
# Removes every stream currently registered with this manager.
#
# The registered streams are copied while holding the mutex, and the copy is
# iterated afterwards: remove_stream deletes entries from @streams and takes
# the same mutex itself, so the hash must not be iterated (nor the lock held)
# while it runs.
#
# Returns the @streams hash (kept for backward compatibility).
def remove_all_streams
  snapshot = @streams_mutex.synchronize { @streams.values }
  snapshot.each { |stream| remove_stream(stream) }
  @streams
end
remove_stream(stream) click to toggle source

Removes the streaming request and deactivates the Stream object. Returns true if the streaming request has been removed, or false if the passed Stream object is already inactive.

# File lib/krpc/streaming.rb, line 35
# Cancels the streaming request behind +stream+ and deactivates the object.
#
# Returns false without doing anything when the stream is not active or its
# id is not registered with this manager. Otherwise the request is removed
# through client.core, the stream is dropped from the registry, its value is
# replaced by a RuntimeError, it is marked inactive, and true is returned.
def remove_stream(stream)
  return false unless stream.active?

  # The block yields true only if the stream was registered and got removed.
  removed = @streams_mutex.synchronize do
    next false unless @streams.key?(stream.id)

    client.core.remove_stream(stream.id)
    @streams.delete(stream.id)
    true
  end
  return false unless removed

  stream.value = RuntimeError.new("Stream has been removed")
  stream.mark_as_inactive
  true
end
start_streaming_thread() click to toggle source

Starts the streaming thread. It receives stream data and updates the value attribute of the corresponding Stream objects.

# File lib/krpc/streaming.rb, line 53
# Restarts the background thread that applies streamed updates.
#
# The previously stored thread is stopped first. The new thread loops
# forever: it reads a varint length followed by that many bytes from the
# client's stream connection and decodes them as a PB::StreamUpdate. Then,
# while holding the streams mutex, it assigns each registered stream either
# the decoded value or the exception built from the reported error. Results
# whose id is not registered are skipped.
def start_streaming_thread
  stop_streaming_thread
  @streaming_thread = Thread.new do
    conn = client.stream_connection
    loop do
      length = conn.recv_varint
      update = PB::StreamUpdate.decode(conn.recv(length))
      @streams_mutex.synchronize do
        update.results.each do |entry|
          next unless @streams.key?(entry.id)

          target = @streams[entry.id]
          outcome = entry.result
          target.value =
            if outcome.field_empty?(:error)
              Decoder.decode(outcome.value, target.return_type, client)
            else
              client.build_exception(outcome.error)
            end
        end
      end
    end
  end
end
stop_streaming_thread() click to toggle source

Stop streaming thread.

# File lib/krpc/streaming.rb, line 77
# Kills the thread held in @streaming_thread and returns that thread.
def stop_streaming_thread
  @streaming_thread.kill
end