class Firehose::Rack::Consumer::HttpLongPoll::Handler
Public Class Methods
new(timeout=TIMEOUT) { |self| ... }
click to toggle source
# File lib/firehose/rack/consumer/http_long_poll.rb, line 29 def initialize(timeout=TIMEOUT) @timeout = timeout yield self if block_given? end
Public Instance Methods
call(env)
click to toggle source
# File lib/firehose/rack/consumer/http_long_poll.rb, line 34 def call(env) request = request(env) method = request.request_method case method # GET is how clients subscribe to the queue. When a messages comes in, we flush out a response, # close down the requeust, and the client then reconnects. when "GET" handle_request(request, env) return ASYNC_RESPONSE # we use post messages for http long poll multiplexing when "POST" if Consumer.multiplexing_request?(env) handle_request(request, env) return ASYNC_RESPONSE end end Firehose.logger.debug "HTTP #{method} not supported" response(405, "#{method} not supported.", "Allow" => "GET") end
Private Instance Methods
async_callback(env, code, message = "", headers = nil)
click to toggle source
# File lib/firehose/rack/consumer/http_long_poll.rb, line 76 def async_callback(env, code, message = "", headers = nil) resp_headers = response_headers(env) if headers resp_headers.merge!(headers) end if cb = env["async.callback"] cb.call response(code, message, resp_headers) else Firehose.logger.error "async.callback not set for response: #{message.inspect}" end end
cors_headers(env)
click to toggle source
# File lib/firehose/rack/consumer/http_long_poll.rb, line 67 def cors_headers(env) # TODO seperate out CORS logic as an async middleware with a Goliath web server. {'Access-Control-Allow-Origin' => cors_origin(env)} end
cors_origin(env)
click to toggle source
# File lib/firehose/rack/consumer/http_long_poll.rb, line 63 def cors_origin(env) env['HTTP_ORIGIN'] end
request(env)
click to toggle source
# File lib/firehose/rack/consumer/http_long_poll.rb, line 72 def request(env) env['parsed_request'] ||= ::Rack::Request.new(env) end
respond_async(channel, last_sequence, env)
click to toggle source
# File lib/firehose/rack/consumer/http_long_poll.rb, line 90 def respond_async(channel, last_sequence, env) EM.next_tick do if last_sequence < 0 async_callback env, 400, "The last_message_sequence parameter may not be less than zero" else Server::Channel.new(channel).next_messages(last_sequence, :timeout => @timeout).callback do |messages| # TODO: Can we send all of these messages down in one request? Sending one message per # request is slow and inefficient. If we change the protocol (3.0?) we could batch the # messages and send them all down the pipe, then close the conneciton. message = messages.first async_callback env, 200, wrap_frame(channel, message) end.errback do |e| if e == :timeout async_callback env, 204 else Firehose.logger.error "Unexpected error when trying to GET last_sequence #{last_sequence} for path #{channel}: #{e.inspect}" async_callback env, 500, "Unexpected error" end end end end end
response_headers(env)
click to toggle source
If the request is a CORS request, return those headers, otherwise don’t worry ‘bout it
# File lib/firehose/rack/consumer/http_long_poll.rb, line 59 def response_headers(env) cors_origin(env) ? cors_headers(env) : {} end