class JanusGateway::Transport::Http

Attributes

transaction_queue[R]

Public Class Methods

new(url) click to toggle source

@param [String] url

# File lib/janus_gateway/transport/http.rb, line 9
def initialize(url)
  @url = url
  @transaction_queue = {}
end

Public Instance Methods

run() click to toggle source
# File lib/janus_gateway/transport/http.rb, line 14
def run
  EventMachine.run do
    EM.error_handler { |e| raise(e) }
    # will be used for long-pooling. currently does nothing
  end
end
send(data) click to toggle source

@param [Hash] data

# File lib/janus_gateway/transport/http.rb, line 22
def send(data)
  sender = _send(data)

  sender.then do |response|
    response_transaction_id = response['transaction']

    transaction_list = @transaction_queue.clone
    unless response_transaction_id.nil?
      promise = transaction_list[response_transaction_id]
      unless promise.nil?
        if %w(success).include?(response['janus'])
          promise.set(response).execute
        elsif %w(ack event).include?(response['janus'])
          # do nothing for now
        else
          error_data = response['error']
          error = JanusGateway::Error.new(error_data['code'], error_data['reason'])
          promise.fail(error).execute
        end
      end
    end
  end

  sender.rescue do |error|
    request_transaction_id = data[:transaction]

    transaction_list = @transaction_queue.clone
    unless request_transaction_id.nil?
      promise = transaction_list[request_transaction_id]
      unless promise.nil?
        error = JanusGateway::Error.new(0, "HTTP/Transport response: `#{error}`")
        promise.fail(error).execute
      end
    end
  end
end
send_transaction(data) click to toggle source

@param [Hash] data @return [Concurrent::Promise]

# File lib/janus_gateway/transport/http.rb, line 61
def send_transaction(data)
  promise = Concurrent::Promise.new
  transaction = transaction_id_new

  data[:transaction] = transaction

  @transaction_queue[transaction] = promise
  send(data)

  thread = Thread.new do
    sleep(_transaction_timeout)
    error = JanusGateway::Error.new(0, "Transaction id `#{transaction}` has failed due to timeout!")
    promise.fail(error).execute
  end

  promise.then do
    @transaction_queue.remove(transaction)
    thread.exit
  end
  promise.rescue do
    @transaction_queue.remove(transaction)
    thread.exit
  end

  promise
end

Private Instance Methods

_send(data) click to toggle source

@param [Hash] data @return [EventMachine::HttpRequest]

# File lib/janus_gateway/transport/http.rb, line 92
def _send(data)
  promise = Concurrent::Promise.new

  http = EventMachine::HttpRequest.new(@url)
  request = http.post(body: JSON.generate(data), head: { 'Content-Type' => 'application/json' })

  request.callback do
    status = request.response_header.status
    if status == 200
      begin
        promise.set(JSON.parse(request.response)).execute
      rescue StandardError => e
        promise.fail(e).execute
      end
    else
      promise.fail(Error.new(status, "Invalid response. Status: `#{status}`. Body: `#{request.response}`")).execute
    end
  end

  request.errback do
    promise.fail(request.error).execute
  end

  promise
end
_transaction_timeout() click to toggle source

@return [Float, Integer]

# File lib/janus_gateway/transport/http.rb, line 119
def _transaction_timeout
  30
end