class Krakow::Producer::Http
HTTP based producer
Attributes
Public Class Methods
@!endgroup
# File lib/krakow/producer/http.rb, line 45 def initialize(args={}) super build_ssl_context if ssl_context @uri = URI.parse(endpoint) end
Public Instance Methods
Create a new SSL context
@return [OpenSSL::SSL::SSLContext]
# File lib/krakow/producer/http.rb, line 54 def build_ssl_context require 'openssl' context = OpenSSL::SSL::SSLContext.new context.cert = OpenSSL::X509::Certificate.new(File.open(ssl_context[:certificate])) context.key = OpenSSL::PKey::RSA.new(File.open(ssl_context[:key])) config[:ssl_context] = context end
Create channel on topic
@param chan [String] channel name @return [Response]
# File lib/krakow/producer/http.rb, line 126 def create_channel(chan) send_message(:post, :create_channel, :params => { :topic => topic, :channel => chan } ) end
Create the topic
@return [Response]
# File lib/krakow/producer/http.rb, line 107 def create_topic send_message(:post, :create_topic, :params => {:topic => topic} ) end
Delete channel on topic
@param chan [String] channel name @return [Response]
# File lib/krakow/producer/http.rb, line 139 def delete_channel(chan) send_message(:post, :delete_channel, :params => { :topic => topic, :channel => chan } ) end
Delete the topic
@return [Response]
# File lib/krakow/producer/http.rb, line 116 def delete_topic send_message(:post, :delete_topic, :params => {:topic => topic} ) end
Remove all messages from given channel on topic
@param chan [String] channel name @return [Response]
# File lib/krakow/producer/http.rb, line 161 def empty_channel(chan) send_message(:post, :empty_channel, :params => { :topic => topic, :channel => chan } ) end
Remove all messages from topic
@return [Response]
# File lib/krakow/producer/http.rb, line 151 def empty_topic send_message(:post, :empty_topic, :params => {:topic => topic} ) end
Server information
@return [Response]
# File lib/krakow/producer/http.rb, line 218 def info send_message(:get, :info) end
Pause messages on given channel
@param chan [String] channel name @return [Response]
# File lib/krakow/producer/http.rb, line 174 def pause_channel(chan) send_message(:post, :pause_channel, :params => { :topic => topic, :channel => chan } ) end
Ping the server
@return [Response]
# File lib/krakow/producer/http.rb, line 211 def ping send_message(:get, :ping) end
Send a message via HTTP
@param method [String, Symbol] HTTP method to use (:get, :put, etc) @param path [String] URI path @param args [Hash] payload hash @return [Response]
# File lib/krakow/producer/http.rb, line 68 def send_message(method, path, args={}) build = uri.dup build.path = "/#{path}" response = HTTP.send(method, build.to_s, args.merge(config)) begin response = MultiJson.load(response.body.to_s) rescue MultiJson::LoadError response = { 'status_code' => response.code, 'status_txt' => response.body.to_s, 'response' => response.body.to_s, 'data' => nil, } end Response.new(response) end
Server stats
@param format [String] format of data @return [Response]
# File lib/krakow/producer/http.rb, line 200 def stats(format='json') send_message(:get, :stats, :params => { :format => format } ) end
Resume messages on a given channel
@param chan [String] channel name @return [Response]
# File lib/krakow/producer/http.rb, line 187 def unpause_channel(chan) send_message(:post, :unpause_channel, :params => { :topic => topic, :channel => chan } ) end
Send messages
@param payload [String] message @return [Response]
# File lib/krakow/producer/http.rb, line 89 def write(*payload) if(payload.size == 1) payload = payload.first send_message(:post, :pub, :body => payload, :params => {:topic => topic} ) else send_message(:post, :mpub, :body => payload.join("\n"), :params => {:topic => topic} ) end end