class LogStash::Outputs::RabbitMQ

Public Instance Methods

close() click to toggle source
# File lib/logstash/outputs/rabbitmq.rb, line 95
def close
  close_connection
end
local_channel() click to toggle source
# File lib/logstash/outputs/rabbitmq.rb, line 86
def local_channel
  channel = @thread_local_channel.get
  if !channel
    channel = @hare_info.connection.create_channel
    @thread_local_channel.set(channel)
  end
  channel
end
local_exchange() click to toggle source
# File lib/logstash/outputs/rabbitmq.rb, line 77
def local_exchange
  exchange = @thread_local_exchange.get
  if !exchange
    exchange = declare_exchange!(local_channel, @exchange, @exchange_type, @durable)
    @thread_local_exchange.set(exchange)
  end
  exchange
end
multi_receive_encoded(events_and_data) click to toggle source
# File lib/logstash/outputs/rabbitmq.rb, line 58
def multi_receive_encoded(events_and_data)
  events_and_data.each do |event, data|
    publish(event, data)
  end
end
publish(event, message) click to toggle source
# File lib/logstash/outputs/rabbitmq.rb, line 64
def publish(event, message)
  raise ArgumentError, "No exchange set in HareInfo!!!" unless @hare_info.exchange
  local_exchange.publish(message, :routing_key => event.sprintf(@key), :properties => symbolize(@message_properties.merge(:persistent => @persistent)))
rescue MarchHare::Exception, IOError, AlreadyClosedException, TimeoutException => e
  @logger.error("Error while publishing. Will retry.",
                :message => e.message,
                :exception => e.class,
                :backtrace => e.backtrace)

  sleep_for_retry
  retry
end
register() click to toggle source
# File lib/logstash/outputs/rabbitmq.rb, line 46
def register
  connect!
  @hare_info.exchange = declare_exchange!(@hare_info.channel, @exchange, @exchange_type, @durable)
  # The connection close should close all channels, so it is safe to store thread locals here without closing them
  @thread_local_channel = java.lang.ThreadLocal.new
  @thread_local_exchange = java.lang.ThreadLocal.new
end
symbolize(myhash) click to toggle source
# File lib/logstash/outputs/rabbitmq.rb, line 54
def symbolize(myhash)
  Hash[myhash.map{|(k,v)| [k.to_sym,v]}]
end