class Krakow::Producer::Http

HTTP based producer

Attributes

uri[R]

Public Class Methods

new(args={}) click to toggle source

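Create a new HTTP producer. Builds the SSL context when one is configured, then parses the endpoint into `uri`.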

Calls superclass method
# File lib/krakow/producer/http.rb, line 45
# Set up the HTTP producer.
#
# Arguments are forwarded untouched to the superclass initializer (bare
# `super`). Afterwards the SSL context is built when one is configured
# and the endpoint is parsed into the `uri` attribute.
#
# @param args [Hash] options handed to the superclass
def initialize(args={})
  super
  if ssl_context
    build_ssl_context
  end
  @uri = URI.parse(endpoint)
end

Public Instance Methods

build_ssl_context() click to toggle source

Create a new SSL context

@return [OpenSSL::SSL::SSLContext]

# File lib/krakow/producer/http.rb, line 54
# Create a new SSL context from the configured certificate and key files
# and store it in `config[:ssl_context]`.
#
# Reads the paths from `ssl_context[:certificate]` and `ssl_context[:key]`.
# The PEM data is loaded with `File.read` so that no file handle is left
# open once the certificate and key have been parsed.
#
# @return [OpenSSL::SSL::SSLContext]
def build_ssl_context
  # Loaded lazily so OpenSSL is only required when SSL is actually in use.
  require 'openssl'
  context = OpenSSL::SSL::SSLContext.new
  context.cert = OpenSSL::X509::Certificate.new(File.read(ssl_context[:certificate]))
  context.key = OpenSSL::PKey::RSA.new(File.read(ssl_context[:key]))
  config[:ssl_context] = context
end
create_channel(chan) click to toggle source

Create channel on topic

@param chan [String] channel name
@return [Response]

# File lib/krakow/producer/http.rb, line 126
# Create a channel on the producer's topic.
#
# Issues a POST to the `create_channel` path with the topic and channel
# passed as request params.
#
# @param chan [String] channel name
# @return [Response]
def create_channel(chan)
  query = {:topic => topic, :channel => chan}
  send_message(:post, :create_channel, :params => query)
end
create_topic() click to toggle source

Create the topic

@return [Response]

# File lib/krakow/producer/http.rb, line 107
# Create the producer's topic.
#
# Issues a POST to the `create_topic` path with the topic as a request param.
#
# @return [Response]
def create_topic
  query = {:topic => topic}
  send_message(:post, :create_topic, :params => query)
end
delete_channel(chan) click to toggle source

Delete channel on topic

@param chan [String] channel name
@return [Response]

# File lib/krakow/producer/http.rb, line 139
# Delete a channel from the producer's topic.
#
# Issues a POST to the `delete_channel` path with the topic and channel
# passed as request params.
#
# @param chan [String] channel name
# @return [Response]
def delete_channel(chan)
  query = {:topic => topic, :channel => chan}
  send_message(:post, :delete_channel, :params => query)
end
delete_topic() click to toggle source

Delete the topic

@return [Response]

# File lib/krakow/producer/http.rb, line 116
# Delete the producer's topic.
#
# Issues a POST to the `delete_topic` path with the topic as a request param.
#
# @return [Response]
def delete_topic
  query = {:topic => topic}
  send_message(:post, :delete_topic, :params => query)
end
empty_channel(chan) click to toggle source

Remove all messages from given channel on topic

@param chan [String] channel name
@return [Response]

# File lib/krakow/producer/http.rb, line 161
# Remove all messages from the given channel on the producer's topic.
#
# Issues a POST to the `empty_channel` path with the topic and channel
# passed as request params.
#
# @param chan [String] channel name
# @return [Response]
def empty_channel(chan)
  query = {:topic => topic, :channel => chan}
  send_message(:post, :empty_channel, :params => query)
end
empty_topic() click to toggle source

Remove all messages from topic

@return [Response]

# File lib/krakow/producer/http.rb, line 151
# Remove all messages from the producer's topic.
#
# Issues a POST to the `empty_topic` path with the topic as a request param.
#
# @return [Response]
def empty_topic
  query = {:topic => topic}
  send_message(:post, :empty_topic, :params => query)
end
info() click to toggle source

Server information

@return [Response]

# File lib/krakow/producer/http.rb, line 218
# Fetch server information.
#
# Issues a GET to the `info` path with no extra request arguments.
#
# @return [Response]
def info
  send_message(:get, :info, {})
end
pause_channel(chan) click to toggle source

Pause messages on given channel

@param chan [String] channel name
@return [Response]

# File lib/krakow/producer/http.rb, line 174
# Pause messages on the given channel of the producer's topic.
#
# Issues a POST to the `pause_channel` path with the topic and channel
# passed as request params.
#
# @param chan [String] channel name
# @return [Response]
def pause_channel(chan)
  query = {:topic => topic, :channel => chan}
  send_message(:post, :pause_channel, :params => query)
end
ping() click to toggle source

Ping the server

@return [Response]

# File lib/krakow/producer/http.rb, line 211
# Ping the server.
#
# Issues a GET to the `ping` path with no extra request arguments.
#
# @return [Response]
def ping
  send_message(:get, :ping, {})
end
send_message(method, path, args={}) click to toggle source

Send a message via HTTP

@param method [String, Symbol] HTTP method to use (:get, :put, etc)
@param path [String] URI path
@param args [Hash] payload hash
@return [Response]

# File lib/krakow/producer/http.rb, line 68
# Send a message via HTTP.
#
# The request URL is a copy of `uri` with its path replaced by "/<path>".
# `args` is merged with `config` (config values win on key clashes) and
# handed to the HTTP client. The response body is parsed as JSON; when it
# is not valid JSON a fallback hash is built from the raw body and the
# response code instead.
#
# @param method [String, Symbol] HTTP method to use (:get, :put, etc)
# @param path [String] URI path
# @param args [Hash] payload hash
# @return [Response]
def send_message(method, path, args={})
  target = uri.dup
  target.path = "/#{path}"
  http_response = HTTP.send(method, target.to_s, args.merge(config))
  raw_body = http_response.body.to_s
  parsed =
    begin
      MultiJson.load(raw_body)
    rescue MultiJson::LoadError
      # Non-JSON reply: wrap the raw text in the same hash shape.
      {
        'status_code' => http_response.code,
        'status_txt' => raw_body,
        'response' => raw_body,
        'data' => nil,
      }
    end
  Response.new(parsed)
end
stats(format='json') click to toggle source

Server stats

@param format [String] format of data
@return [Response]

# File lib/krakow/producer/http.rb, line 200
# Fetch server stats.
#
# Issues a GET to the `stats` path with the requested format as a
# request param.
#
# @param format [String] format of data (defaults to 'json')
# @return [Response]
def stats(format='json')
  query = {:format => format}
  send_message(:get, :stats, :params => query)
end
unpause_channel(chan) click to toggle source

Resume messages on a given channel

@param chan [String] channel name
@return [Response]

# File lib/krakow/producer/http.rb, line 187
# Resume messages on the given channel of the producer's topic.
#
# Issues a POST to the `unpause_channel` path with the topic and channel
# passed as request params.
#
# @param chan [String] channel name
# @return [Response]
def unpause_channel(chan)
  query = {:topic => topic, :channel => chan}
  send_message(:post, :unpause_channel, :params => query)
end
write(*payload) click to toggle source

Send messages

@param payload [String] one or more messages (a single message is sent via pub, any other count via mpub joined by newlines)
@return [Response]

# File lib/krakow/producer/http.rb, line 89
# Send messages to the producer's topic.
#
# Exactly one message is POSTed as-is to the `pub` path. Any other number
# of messages is joined with newlines and POSTed to the `mpub` path.
#
# @param payload [String] one or more messages
# @return [Response]
def write(*payload)
  single = payload.size == 1
  command = single ? :pub : :mpub
  body = single ? payload.first : payload.join("\n")
  send_message(:post, command,
    :body => body,
    :params => {:topic => topic}
  )
end