class JanusGateway::Transport::Http
Attributes
transaction_queue[R]
Public Class Methods
new(url)
click to toggle source
@param [String] url
# File lib/janus_gateway/transport/http.rb, line 9
# Remembers the endpoint URL and starts with an empty table of pending
# transactions (transaction id => promise, as filled by #send_transaction).
#
# @param [String] url
def initialize(url)
  @url = url
  @transaction_queue = Hash.new
end
Public Instance Methods
run()
click to toggle source
# File lib/janus_gateway/transport/http.rb, line 14
# Starts the EventMachine reactor and installs an error handler that
# re-raises whatever exception it is handed.
def run
  EventMachine.run do
    # Reserved for long-polling; for now the reactor block only wires up
    # the error handler below.
    EM.error_handler do |exception|
      raise(exception)
    end
  end
end
send(data)
click to toggle source
@param [Hash] data
# File lib/janus_gateway/transport/http.rb, line 22
# Posts +data+ through #_send and routes the outcome to the promise that is
# pending in @transaction_queue for the matching transaction id.
#
# * reply with 'janus' == 'success'      -> pending promise is fulfilled with the reply
# * reply with 'janus' == 'ack'/'event'  -> ignored for now
# * any other reply                      -> pending promise is failed with a JanusGateway::Error
# * failure of the request itself        -> pending promise for data[:transaction] is failed
#
# Replies or failures whose transaction id has no pending promise are ignored.
#
# @param [Hash] data request payload; data[:transaction] identifies the
#   pending promise when the request itself fails
# @return whatever registering the failure handler on the #_send result returns
def send(data)
  sender = _send(data)

  sender.then do |response|
    transaction_id = response['transaction']
    next if transaction_id.nil?

    promise = @transaction_queue[transaction_id]
    next if promise.nil?

    case response['janus']
    when 'success'
      promise.set(response).execute
    when 'ack', 'event'
      # do nothing for now
    else
      error_data = response['error']
      # A reply without an 'error' payload used to raise NoMethodError on nil
      # inside this callback, leaving the pending promise unresolved until the
      # timeout fired. Fail it right away with a descriptive error instead.
      error =
        if error_data.nil?
          JanusGateway::Error.new(0, "Unexpected response type `#{response['janus']}` without error details")
        else
          JanusGateway::Error.new(error_data['code'], error_data['reason'])
        end
      promise.fail(error).execute
    end
  end

  sender.rescue do |error|
    transaction_id = data[:transaction]
    next if transaction_id.nil?

    promise = @transaction_queue[transaction_id]
    next if promise.nil?

    promise.fail(JanusGateway::Error.new(0, "HTTP/Transport response: `#{error}`")).execute
  end
end
send_transaction(data)
click to toggle source
@param [Hash] data @return [Concurrent::Promise]
# File lib/janus_gateway/transport/http.rb, line 61
# Sends +data+ as a new transaction and returns a promise for its outcome.
#
# A fresh transaction id is written into data[:transaction] (mutating the
# caller's hash) and the promise is registered in @transaction_queue under
# that id so #send can resolve it. A watchdog thread fails the promise after
# #_transaction_timeout seconds. Once the promise settles either way, the
# queue entry is removed and the watchdog is stopped.
#
# @param [Hash] data
# @return [Concurrent::Promise]
def send_transaction(data)
  promise = Concurrent::Promise.new
  transaction = transaction_id_new

  data[:transaction] = transaction
  @transaction_queue[transaction] = promise

  send(data)

  # Watchdog: fail the transaction if nothing settles it in time.
  thread = Thread.new do
    sleep(_transaction_timeout)
    error = JanusGateway::Error.new(0, "Transaction id `#{transaction}` has failed due to timeout!")
    promise.fail(error).execute
  end

  # The queue is a plain Hash, which has no #remove; the previous call raised
  # NoMethodError inside these callbacks, so entries were never dropped and
  # the watchdog thread was never stopped. Hash#delete is the correct call.
  promise.then do
    @transaction_queue.delete(transaction)
    thread.exit
  end

  promise.rescue do
    @transaction_queue.delete(transaction)
    thread.exit
  end

  promise
end
Private Instance Methods
_send(data)
click to toggle source
@param [Hash] data @return [Concurrent::Promise]
# File lib/janus_gateway/transport/http.rb, line 92
# POSTs +data+ as JSON to @url and returns a promise for the parsed reply.
#
# * HTTP 200 with a parseable body -> promise is fulfilled with the parsed JSON
# * HTTP 200 with a body that fails to parse (or any StandardError while
#   fulfilling) -> promise is failed with that exception
# * any other HTTP status -> promise is failed with a JanusGateway::Error
#   carrying the status and the raw body
# * request errback -> promise is failed with the request's error value
#
# @param [Hash] data
# @return [Concurrent::Promise] the promise created here, not the HTTP request
def _send(data)
  promise = Concurrent::Promise.new

  http = EventMachine::HttpRequest.new(@url)
  request = http.post(body: JSON.generate(data), head: { 'Content-Type' => 'application/json' })

  request.callback do
    status = request.response_header.status
    if status == 200
      begin
        promise.set(JSON.parse(request.response)).execute
      rescue StandardError => e
        promise.fail(e).execute
      end
    else
      # Fully qualified like every other error raised in this class, so the
      # constant does not depend on how the class body happens to be nested.
      message = "Invalid response. Status: `#{status}`. Body: `#{request.response}`"
      promise.fail(JanusGateway::Error.new(status, message)).execute
    end
  end

  request.errback do
    promise.fail(request.error).execute
  end

  promise
end
_transaction_timeout()
click to toggle source
@return [Float, Integer]
# File lib/janus_gateway/transport/http.rb, line 119
# Value passed to +sleep+ by the timeout watchdog in #send_transaction.
#
# @return [Float, Integer]
def _transaction_timeout; 30; end