class Firehose::Client::Producer::Http
Publish messages to Firehose via an HTTP interface.
Constants
- DEFAULT_ERROR_HANDLER
- DEFAULT_TIMEOUT
- PublishError
Raised when the server does not respond with a 202 after a message is published.
- TimeoutError
Attributes
timeout[R]
Request timeout used by put when no :timeout option is given.
uri[R]
URI for the Firehose server. This URI does not include the path of the channel.
Public Class Methods
adapter()
Use :net_http for the default Faraday adapter.
# File lib/firehose/client/producer.rb, line 100
def self.adapter
  @adapter ||= Faraday.default_adapter
end
adapter=(adapter)
Which adapter should Firehose use to PUT the message? A list of adapters is available at github.com/technoweenie/faraday.
# File lib/firehose/client/producer.rb, line 95
def self.adapter=(adapter)
  @adapter = adapter
end
new(uri = Firehose::URI, timeout=DEFAULT_TIMEOUT)
# File lib/firehose/client/producer.rb, line 32
def initialize(uri = Firehose::URI, timeout=DEFAULT_TIMEOUT)
  @uri = ::URI.parse(uri.to_s)
  @uri.scheme ||= 'http'
  @timeout = timeout
end
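The constructor's URI handling can be tried in isolation with Ruby's standard `uri` library. The following is a minimal sketch, not Firehose code: `normalize_firehose_uri` is a hypothetical helper name and the host names are made-up examples. It only shows that a scheme-less URI is given the `http` scheme while an explicit scheme is left alone.

```ruby
require 'uri'

# Hypothetical helper mirroring the two URI lines in the constructor.
def normalize_firehose_uri(uri)
  parsed = ::URI.parse(uri.to_s)
  # Fall back to plain HTTP only when the caller gave no scheme.
  parsed.scheme ||= 'http'
  parsed
end

puts normalize_firehose_uri('//localhost:7474')             # http://localhost:7474
puts normalize_firehose_uri('https://firehose.example.com') # https://firehose.example.com
```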
Public Instance Methods
error_handler()
Raise an exception if an error occurs when connecting to the Firehose.
# File lib/firehose/client/producer.rb, line 89
def error_handler
  @error_handler || DEFAULT_ERROR_HANDLER
end
on_error(&block)
Handle errors that could happen while publishing a message.
# File lib/firehose/client/producer.rb, line 84
def on_error(&block)
  @error_handler = block
end
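The interplay between on_error and error_handler is a simple fallback: a block registered with on_error takes precedence, otherwise the default handler is used. The sketch below models only that fallback with a hypothetical stand-in class (`SketchProducer`). The body of its `DEFAULT_ERROR_HANDLER` is an assumption based on the description "raise an exception"; the real constant's definition is not shown on this page.

```ruby
# Hypothetical stand-in class; only the handler fallback is modeled.
class SketchProducer
  # Assumed default behavior: re-raise the error that was passed in.
  DEFAULT_ERROR_HANDLER = ->(e) { raise e }

  # Register a block that receives publishing errors.
  def on_error(&block)
    @error_handler = block
  end

  # Use the registered block if there is one, else the default.
  def error_handler
    @error_handler || DEFAULT_ERROR_HANDLER
  end
end

producer = SketchProducer.new
errors = []
producer.on_error { |e| errors << e.message }

# With a custom handler the error is collected instead of raised.
producer.error_handler.call(RuntimeError.new('publish failed'))
puts errors.inspect
```

Because the handler is stored per instance, registering a block on one producer would not affect any other producer in this sketch.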
publish(message)
A DSL for publishing messages.
# File lib/firehose/client/producer.rb, line 39
def publish(message)
  Builder.new(self, message)
end
put(message, channel, opts, &block)
Publish the message via HTTP.
# File lib/firehose/client/producer.rb, line 44
def put(message, channel, opts, &block)
  ttl = opts[:ttl]
  timeout = opts[:timeout] || @timeout || DEFAULT_TIMEOUT
  buffer_size = opts[:buffer_size]

  response = conn.put do |req|
    req.options[:timeout] = timeout
    if conn.path_prefix.nil? || conn.path_prefix == '/'
      # This avoids a double / if the channel starts with a / too (which is expected).
      req.path = channel
    else
      if conn.path_prefix =~ /\/\Z/ || channel =~ /\A\//
        req.path = [conn.path_prefix, channel].compact.join
      else
        # Add a / so the prefix and channel aren't just rammed together.
        req.path = [conn.path_prefix, channel].compact.join('/')
      end
    end
    req.body = message
    req.headers['Cache-Control'] = "max-age=#{ttl.to_i}" if ttl
    req.headers["X-Firehose-Buffer-Size"] = buffer_size.to_s if buffer_size
  end

  response.on_complete do
    case response.status
    when 202
      # Fire off the callback if everything worked out OK.
      block.call(response) if block
    else
      # don't pass along basic auth header, if present
      response_data = response.inspect.gsub(/"Authorization"=>"Basic \S+"/, '"Authorization" => "Basic [HIDDEN]"')
      endpoint = "#{uri}/#{channel}".gsub(/:\/\/\S+@/, "://")
      error_handler.call PublishError.new("Could not publish #{message.inspect} to '#{endpoint}': #{response_data}")
    end
  end

  # Hide Faraday with this Timeout exception, and through the error handler.
rescue Faraday::Error::TimeoutError => e
  error_handler.call TimeoutError.new(e)
end
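Two pieces of put are easy to misread: how the request path is assembled from the connection's path prefix and the channel, and how credentials are removed from the endpoint before it appears in an error message. The sketch below extracts both into stand-alone functions so they can be tried without a server. The helper names `join_channel_path` and `scrub_credentials` are hypothetical, as are the example prefixes and URLs; the conditions and regular expressions are copied from the method above.

```ruby
# Hypothetical helper: the path-joining rules used inside put.
def join_channel_path(path_prefix, channel)
  # No prefix (or a bare "/"): the channel is used as the whole path.
  return channel if path_prefix.nil? || path_prefix == '/'

  if path_prefix =~ /\/\Z/ || channel =~ /\A\//
    # One side already supplies a slash at the seam, so concatenate directly.
    [path_prefix, channel].compact.join
  else
    # Neither side has a slash at the seam, so insert one.
    [path_prefix, channel].compact.join('/')
  end
end

# Hypothetical helper: drop "user:password@" from a URL before reporting it.
def scrub_credentials(endpoint)
  endpoint.gsub(/:\/\/\S+@/, '://')
end

puts join_channel_path(nil, '/chat')          # /chat
puts join_channel_path('/firehose', '/chat')  # /firehose/chat
puts join_channel_path('/firehose', 'chat')   # /firehose/chat
puts scrub_credentials('http://user:secret@example.com/chat') # http://example.com/chat
```

One consequence of these rules, as written, is that a prefix ending in a slash combined with a channel starting with a slash (for example `/firehose/` and `/chat`) is concatenated as `/firehose//chat`, since either condition alone selects the plain join.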
Private Instance Methods
conn()
Build out a Faraday connection.
# File lib/firehose/client/producer.rb, line 106
def conn
  @conn ||= Faraday.new(:url => uri.to_s) do |builder|
    builder.adapter self.class.adapter
  end
end