class Firehose::Rack::Consumer::HttpLongPoll::Handler

Public Class Methods

new(timeout=TIMEOUT) { |self| ... } click to toggle source
# File lib/firehose/rack/consumer/http_long_poll.rb, line 29
# Sets up a handler and optionally hands it to a configuration block.
#
# @param timeout [Object] value stored in @timeout; defaults to TIMEOUT
# @yield [self] the freshly built handler, only when a block is supplied
def initialize(timeout=TIMEOUT)
  @timeout = timeout
  return unless block_given?

  yield(self)
end

Public Instance Methods

call(env) click to toggle source
# File lib/firehose/rack/consumer/http_long_poll.rb, line 34
# Rack entry point for the long-poll consumer.
#
# GET is how clients subscribe to the queue: when a message comes in, a
# response is flushed, the request is closed, and the client reconnects.
# POST is accepted only for HTTP long-poll multiplexing requests.
#
# @param env [Hash] the Rack environment
# @return ASYNC_RESPONSE when the request was handed to handle_request,
#   otherwise the result of response(405, ...)
def call(env)
  req  = request(env)
  verb = req.request_method

  # Decide up front whether this verb is something we can serve.
  servable =
    case verb
    when "GET"  then true
    when "POST" then Consumer.multiplexing_request?(env)
    else false
    end

  if servable
    handle_request(req, env)
    return ASYNC_RESPONSE
  end

  Firehose.logger.debug "HTTP #{verb} not supported"
  response(405, "#{verb} not supported.", "Allow" => "GET")
end

Private Instance Methods

async_callback(env, code, message = "", headers = nil) click to toggle source
# File lib/firehose/rack/consumer/http_long_poll.rb, line 76
# Sends a response through the callback stored under env["async.callback"].
#
# The headers from response_headers(env) are used as the base, and any
# caller-supplied headers are merged on top of them. When no callback is
# present in env, an error is logged and nothing is sent.
#
# @param env [Hash] the Rack environment
# @param code [Integer] status code passed to response
# @param message [String] body passed to response
# @param headers [Hash, nil] extra headers merged over the base headers
def async_callback(env, code, message = "", headers = nil)
  outgoing = response_headers(env)
  outgoing.merge!(headers) if headers

  deliver = env["async.callback"]
  return Firehose.logger.error("async.callback not set for response: #{message.inspect}") unless deliver

  deliver.call(response(code, message, outgoing))
end
cors_headers(env) click to toggle source
# File lib/firehose/rack/consumer/http_long_poll.rb, line 67
# Builds the CORS response header hash for this request, using whatever
# cors_origin(env) returns as the allowed origin.
#
# @param env [Hash] the Rack environment
# @return [Hash] a single-entry hash keyed by 'Access-Control-Allow-Origin'
def cors_headers(env)
  # TODO: separate out CORS logic as an async middleware with a Goliath web server.
  allowed_origin = cors_origin(env)
  { 'Access-Control-Allow-Origin' => allowed_origin }
end
cors_origin(env) click to toggle source
# File lib/firehose/rack/consumer/http_long_poll.rb, line 63
# Looks up the 'HTTP_ORIGIN' entry of the environment.
#
# @param env [Hash] the Rack environment
# @return the value stored under 'HTTP_ORIGIN', or whatever env yields
#   for a missing key
def cors_origin(env)
  origin_key = 'HTTP_ORIGIN'
  env[origin_key]
end
request(env) click to toggle source
# File lib/firehose/rack/consumer/http_long_poll.rb, line 72
# Returns the request object for this environment, memoized in
# env['parsed_request'] so repeated calls reuse the same instance.
#
# @param env [Hash] the Rack environment
# @return the cached value, or a new ::Rack::Request built from env
def request(env)
  cached = env['parsed_request']
  return cached if cached

  env['parsed_request'] = ::Rack::Request.new(env)
end
respond_async(channel, last_sequence, env) click to toggle source
# File lib/firehose/rack/consumer/http_long_poll.rb, line 90
# Schedules the asynchronous response for one long-poll request.
#
# On the next EM tick this either rejects a negative sequence with a 400,
# or asks Server::Channel for the messages following last_sequence (passing
# @timeout as the :timeout option) and answers via async_callback:
# 200 with the first message wrapped by wrap_frame, 204 when the errback
# receives :timeout, and 500 (after logging) for any other error.
#
# @param channel the channel path handed to Server::Channel.new
# @param last_sequence the last sequence the client has seen; compared with 0
# @param env [Hash] the Rack environment forwarded to async_callback
def respond_async(channel, last_sequence, env)
  EM.next_tick do
    next async_callback(env, 400, "The last_message_sequence parameter may not be less than zero") if last_sequence < 0

    pending = Server::Channel.new(channel).next_messages(last_sequence, :timeout => @timeout)

    pending.callback do |batch|
      # TODO: Can we send all of these messages down in one request? Sending one message per
      # request is slow and inefficient. If we change the protocol (3.0?) we could batch the
      # messages and send them all down the pipe, then close the connection.
      async_callback env, 200, wrap_frame(channel, batch.first)
    end.errback do |reason|
      next async_callback(env, 204) if reason == :timeout

      Firehose.logger.error "Unexpected error when trying to GET last_sequence #{last_sequence} for path #{channel}: #{reason.inspect}"
      async_callback env, 500, "Unexpected error"
    end
  end
end
response_headers(env) click to toggle source

If the request is a CORS request, return the CORS headers; otherwise return an empty hash.

# File lib/firehose/rack/consumer/http_long_poll.rb, line 59
# Chooses the base response headers for a request: the CORS headers when
# cors_origin(env) is truthy, otherwise an empty hash.
#
# @param env [Hash] the Rack environment
# @return [Hash] the result of cors_headers(env), or a new empty hash
def response_headers(env)
  return {} unless cors_origin(env)

  cors_headers(env)
end