class KRPC::Streaming::StreamsManager
Attributes
client[R]
Public Class Methods
new(client)
click to toggle source
# File lib/krpc/streaming.rb, line 10
# Builds a streams manager bound to +client+.
#
# The stream registry starts empty and is guarded by its own mutex. The
# streaming thread starts out as a thread with an empty body, so
# stop_streaming_thread always has a Thread object to terminate.
def initialize(client)
  @streams_mutex = Mutex.new
  @streams = {}
  @streaming_thread = Thread.new { }
  @client = client
end
Public Instance Methods
create_stream(call, return_type, method, *args, **kwargs)
click to toggle source
Sends the streaming request, creates the related Stream
object and returns it. If an identical Stream
already exists, no new Stream is created
and the existing one is returned.
# File lib/krpc/streaming.rb, line 19
# Sends the streaming request for +call+ and returns the Stream registered
# under the id the server answers with.
#
# If a Stream with that id is already registered it is returned as-is.
# Otherwise +method+ is invoked once with +args+/+kwargs+ to obtain the
# initial value, and a new Stream is created and registered.
#
# Raises RuntimeError when +method+ is a setter (its name ends with "=").
def create_stream(call, return_type, method, *args, **kwargs)
  # `raise RuntimeError("...")` would call a non-existent method named
  # RuntimeError and surface as NoMethodError; pass class and message instead.
  raise RuntimeError, "Cannot stream a property setter" if method.name.to_s.end_with?('=')

  stream_id = client.core.add_stream(call).id
  @streams_mutex.synchronize do
    if @streams.key?(stream_id)
      @streams[stream_id]
    else
      initial_value = method.call(*args, **kwargs)
      @streams[stream_id] = Stream.new(self, stream_id, return_type, initial_value, method, *args, **kwargs)
    end
  end
end
remove_all_streams()
click to toggle source
Remove all streams created by this streams manager.
# File lib/krpc/streaming.rb, line 48
# Removes every stream currently registered with this streams manager.
#
# The registered streams are copied out while holding the streams mutex and
# the copy is iterated, because remove_stream deletes entries from @streams
# and takes the same mutex itself. Iterating the live hash would be an
# unsynchronized read, and a key added in the meantime would abort the
# iteration with a RuntimeError.
#
# Returns the internal streams hash, as the previous implementation did.
def remove_all_streams
  snapshot = @streams_mutex.synchronize { @streams.values }
  snapshot.each { |stream| remove_stream(stream) }
  @streams
end
remove_stream(stream)
click to toggle source
Removes the streaming request and deactivates the Stream
object. Returns true
if the streaming request has been removed, or false
if the passed Stream
object is already inactive or is not registered with this manager.
# File lib/krpc/streaming.rb, line 35
# Cancels the streaming request behind +stream+ and deactivates the object.
#
# Returns false without touching anything when +stream+ is already inactive
# or its id is not registered here. Otherwise the request is removed through
# the client, the registry entry is deleted, the stream's value is replaced
# by a RuntimeError, the stream is marked inactive, and true is returned.
def remove_stream(stream)
  return false unless stream.active?

  # The registry check and the removal happen under one lock acquisition.
  was_registered = @streams_mutex.synchronize do
    if @streams.include?(stream.id)
      client.core.remove_stream(stream.id)
      @streams.delete(stream.id)
      true
    else
      false
    end
  end
  return false unless was_registered

  stream.value = RuntimeError.new("Stream has been removed")
  stream.mark_as_inactive
  true
end
start_streaming_thread()
click to toggle source
Starts the streaming thread. It receives stream data and updates each Stream
object's value
attribute.
# File lib/krpc/streaming.rb, line 53
# (Re)starts the background thread that feeds stream updates to Stream objects.
#
# Any previously started streaming thread is stopped first. The new thread
# loops forever: it reads a varint length and then that many bytes from the
# client's stream connection, decodes them as a PB::StreamUpdate, and, while
# holding the streams mutex, assigns each result to the registered Stream
# with the matching id. Results for ids that are not registered are skipped.
# A result without an error is decoded with the stream's return type; a
# result carrying an error becomes an exception built by the client.
def start_streaming_thread
  stop_streaming_thread
  @streaming_thread = Thread.new do
    connection = client.stream_connection
    loop do
      length = connection.recv_varint
      payload = connection.recv(length)
      update = PB::StreamUpdate.decode(payload)
      @streams_mutex.synchronize do
        update.results.each do |entry|
          next unless @streams.include?(entry.id)

          target = @streams[entry.id]
          outcome = entry.result
          target.value =
            if outcome.field_empty?(:error)
              Decoder.decode(outcome.value, target.return_type, client)
            else
              client.build_exception(outcome.error)
            end
        end
      end
    end
  end
end
stop_streaming_thread()
click to toggle source
Stop streaming thread.
# File lib/krpc/streaming.rb, line 77
# Stops the streaming thread by killing whatever thread is currently stored
# in @streaming_thread. Returns that Thread object.
def stop_streaming_thread
  @streaming_thread.kill
end