class Firehose::Client::Producer::Http

Publish messages to Firehose via an HTTP interface.

Constants

DEFAULT_ERROR_HANDLER
DEFAULT_TIMEOUT
PublishError

Exception gets raised when a 202 is not received from the server after a message is published.

TimeoutError

Attributes

timeout[R]

URI for the Firehose server. This URI does not include the path of the channel.

uri[R]

URI for the Firehose server. This URI does not include the path of the channel.

Public Class Methods

adapter() click to toggle source

Use :net_http for the default Faraday adapter.

# File lib/firehose/client/producer.rb, line 100
def self.adapter
  @adapter ||= Faraday.default_adapter
end
adapter=(adapter) click to toggle source

What adapter should Firehose use to PUT the message? List of adapters is available at github.com/technoweenie/faraday.

# File lib/firehose/client/producer.rb, line 95
def self.adapter=(adapter)
  @adapter = adapter
end
new(uri = Firehose::URI, timeout=DEFAULT_TIMEOUT) click to toggle source
# File lib/firehose/client/producer.rb, line 32
def initialize(uri = Firehose::URI, timeout=DEFAULT_TIMEOUT)
  @uri = ::URI.parse(uri.to_s)
  @uri.scheme ||= 'http'
  @timeout = timeout
end

Public Instance Methods

error_handler() click to toggle source

Raise an exception if an error occurs when connecting to the Firehose.

# File lib/firehose/client/producer.rb, line 89
def error_handler
  @error_handler || DEFAULT_ERROR_HANDLER
end
on_error(&block) click to toggle source

Handle errors that could happen while publishing a message.

# File lib/firehose/client/producer.rb, line 84
def on_error(&block)
  @error_handler = block
end
publish(message) click to toggle source

A DSL for publishing messages.

# File lib/firehose/client/producer.rb, line 39
def publish(message)
  Builder.new(self, message)
end
put(message, channel, opts, &block) click to toggle source

Publish the message via HTTP.

# File lib/firehose/client/producer.rb, line 44
def put(message, channel, opts, &block)
  ttl         = opts[:ttl]
  timeout     = opts[:timeout] || @timeout || DEFAULT_TIMEOUT
  buffer_size = opts[:buffer_size]

  response = conn.put do |req|
    req.options[:timeout] = timeout
    if conn.path_prefix.nil? || conn.path_prefix == '/'
      # This avoids a double / if the channel starts with a / too (which is expected).
      req.path = channel
    else
      if conn.path_prefix =~ /\/\Z/ || channel =~ /\A\//
        req.path = [conn.path_prefix, channel].compact.join
      else
        # Add a / so the prefix and channel aren't just rammed together.
        req.path = [conn.path_prefix, channel].compact.join('/')
      end
    end
    req.body = message
    req.headers['Cache-Control'] = "max-age=#{ttl.to_i}" if ttl
    req.headers["X-Firehose-Buffer-Size"] = buffer_size.to_s if buffer_size
  end
  response.on_complete do
    case response.status
    when 202 # Fire off the callback if everything worked out OK.
      block.call(response) if block
    else
      # don't pass along basic auth header, if present
      response_data = response.inspect.gsub(/"Authorization"=>"Basic \S+"/, '"Authorization" => "Basic [HIDDEN]"')
      endpoint = "#{uri}/#{channel}".gsub(/:\/\/\S+@/, "://")
      error_handler.call PublishError.new("Could not publish #{message.inspect} to '#{endpoint}': #{response_data}")
    end
  end

  # Hide Faraday with this Timeout exception, and through the error handler.
  rescue Faraday::Error::TimeoutError => e
    error_handler.call TimeoutError.new(e)
end

Private Instance Methods

conn() click to toggle source

Build out a Faraday connection

# File lib/firehose/client/producer.rb, line 106
def conn
  @conn ||= Faraday.new(:url => uri.to_s) do |builder|
    builder.adapter self.class.adapter
  end
end