class LogjamAgent::ZMQForwarder

Constants

SEQUENCE_START
VALID_RESPONSE_CODES

Attributes

app[R]
env[R]

Public Class Methods

context() click to toggle source
# File lib/logjam_agent/zmq_forwarder.rb, line 42
def self.context
  @@context_mutex.synchronize do
    @@zmq_context ||=
      begin
        require 'ffi-rzmq'
        context = ZMQ::Context.new(1)
        at_exit { context.terminate }
        context
      end
  end
end
new(*args) click to toggle source
# File lib/logjam_agent/zmq_forwarder.rb, line 9
def initialize(*args)
  opts = args.extract_options!
  @app = args[0] || LogjamAgent.application_name
  @env = args[1] || LogjamAgent.environment_name
  @app_env = "#{@app}-#{@env}"
  @config = default_options.merge!(opts)
  @config[:host] = "localhost" if @config[:host].blank?
  @sequence = SEQUENCE_START
  @socket = nil
  @ping_ensured = false
  @socket_mutex = Mutex.new
end

Public Instance Methods

connection_specs() click to toggle source
# File lib/logjam_agent/zmq_forwarder.rb, line 22
def connection_specs
  @connection_specs ||= @config[:host].split(',').map do |host|
    augment_connection_spec(host, @config[:port])
  end
end
default_options() click to toggle source
# File lib/logjam_agent/zmq_forwarder.rb, line 28
def default_options
  {
    :port       => 9604,
    :linger     => 1000,
    :snd_hwm    => 1000,
    :rcv_hwm    => 1000,
    :rcv_timeo  => 5000,
    :snd_timeo  => 5000
  }
end
ensure_ping_at_exit() click to toggle source
# File lib/logjam_agent/zmq_forwarder.rb, line 60
def ensure_ping_at_exit
  return if @ping_ensured
  at_exit { ping; reset }
  @ping_ensured = true
end
forward(data, options={}) click to toggle source
# File lib/logjam_agent/zmq_forwarder.rb, line 66
def forward(data, options={})
  app_env = options[:app_env] || @app_env
  key = options[:routing_key] || "logs.#{app_env.sub('-','.')}"
  if engine = options[:engine]
    key += ".#{engine}"
  end
  msg = LogjamAgent.encode_payload(data)
  @socket_mutex.synchronize do
    if options[:sync]
      send_receive(app_env, key, msg)
    else
      publish(app_env, key, msg)
    end
  end
rescue => error
  reraise_expectation_errors!
  raise ForwardingError.new(error.message)
end
reset() click to toggle source
# File lib/logjam_agent/zmq_forwarder.rb, line 54
def reset
  @socket_mutex.synchronize do
    reset_without_locking
  end
end

Private Instance Methods

log_warning(message) click to toggle source
# File lib/logjam_agent/zmq_forwarder.rb, line 124
def log_warning(message)
  LogjamAgent.error_handler.call ForwardingWarning.new(message)
end
ping() click to toggle source
# File lib/logjam_agent/zmq_forwarder.rb, line 150
def ping
  @socket_mutex.synchronize do
    if @socket && !send_receive("ping", @app_env, "{}", NO_COMPRESSION)
      log_warning "failed to receive pong"
    end
  end
end
publish(app_env, key, data) click to toggle source
# File lib/logjam_agent/zmq_forwarder.rb, line 114
def publish(app_env, key, data)
  info = pack_info(@sequence = next_fixnum(@sequence))
  parts = [app_env, key, data, info]
  if socket.send_strings(parts, ZMQ::DONTWAIT) < 0
    error = ZMQ::Util.error_string
    reset if connection_specs.size > 1
    raise "ZMQ error on publishing: #{error}"
  end
end
reset_without_locking() click to toggle source
# File lib/logjam_agent/zmq_forwarder.rb, line 87
def reset_without_locking
  if @socket
    @socket.close
    @socket = nil
  end
end
send_receive(app_env, key, data, compression_method = LogjamAgent.compression_method) click to toggle source
# File lib/logjam_agent/zmq_forwarder.rb, line 130
def send_receive(app_env, key, data, compression_method = LogjamAgent.compression_method)
  info = pack_info(@sequence = next_fixnum(@sequence), compression_method)
  request_parts = ["", app_env, key, data, info]
  answer_parts = []
  if socket.send_strings(request_parts) < 0
    log_warning "ZMQ error on sending: #{ZMQ::Util.error_string}"
    reset_without_locking
    return nil
  end
  if socket.recv_strings(answer_parts) < 0
    log_warning "ZMQ error on receiving: #{ZMQ::Util.error_string}"
    reset_without_locking
    return nil
  end
  if answer_parts.first != "" || !VALID_RESPONSE_CODES.include?(answer_parts.second.to_s.to_i)
    log_warning "unexpected answer from logjam broker: #{answer_parts.inspect}"
  end
  answer_parts.second
end
socket() click to toggle source

this method assumes the caller holds the socket mutex

# File lib/logjam_agent/zmq_forwarder.rb, line 95
def socket
  return @socket if @socket
  @socket = self.class.context.socket(ZMQ::DEALER)
  raise "ZMQ error on socket creation: #{ZMQ::Util.error_string}" if @socket.nil?
  if LogjamAgent.ensure_ping_at_exit
    ensure_ping_at_exit
  else
    at_exit { reset }
  end
  @socket.setsockopt(ZMQ::LINGER, @config[:linger])
  @socket.setsockopt(ZMQ::SNDHWM, @config[:snd_hwm])
  @socket.setsockopt(ZMQ::RCVHWM, @config[:rcv_hwm])
  @socket.setsockopt(ZMQ::RCVTIMEO, @config[:rcv_timeo])
  @socket.setsockopt(ZMQ::SNDTIMEO, @config[:snd_timeo])
  spec = connection_specs.sort_by{rand}.first
  @socket.connect(spec)
  @socket
end