class LogStash::Outputs::RabbitMQ
Public Instance Methods
close()
click to toggle source
# File lib/logstash/outputs/rabbitmq.rb, line 95
#
# Shuts this output down. All of the work is delegated to +close_connection+,
# which is defined outside this snippet.
def close
  close_connection
end
local_channel()
click to toggle source
# File lib/logstash/outputs/rabbitmq.rb, line 86
#
# Returns the channel held in +@thread_local_channel+, creating it on first use.
#
# When the holder has no channel yet, a new one is opened on
# +@hare_info.connection+ via +create_channel+, stored in the holder, and
# returned. Later calls return the stored channel without opening another.
# (+register+ assigns a +java.lang.ThreadLocal+ to +@thread_local_channel+,
# so the stored value is per calling thread.)
#
# @return [Object] the cached or newly created channel
def local_channel
  cached = @thread_local_channel.get
  return cached if cached

  created = @hare_info.connection.create_channel
  @thread_local_channel.set(created)
  created
end
local_exchange()
click to toggle source
# File lib/logstash/outputs/rabbitmq.rb, line 77
#
# Returns the exchange held in +@thread_local_exchange+, declaring it on first use.
#
# When the holder has no exchange yet, one is declared with
# +declare_exchange!+ on +local_channel+ using the configured +@exchange+,
# +@exchange_type+ and +@durable+ values, stored in the holder, and returned.
# Later calls return the stored exchange without declaring again.
#
# @return [Object] the cached or newly declared exchange
def local_exchange
  cached = @thread_local_exchange.get
  return cached if cached

  declared = declare_exchange!(local_channel, @exchange, @exchange_type, @durable)
  @thread_local_exchange.set(declared)
  declared
end
multi_receive_encoded(events_and_data)
click to toggle source
# File lib/logstash/outputs/rabbitmq.rb, line 58
#
# Publishes every (event, encoded data) pair, in the order given, by
# handing each pair to +publish+.
#
# @param events_and_data [Enumerable] pairs of event and its encoded payload
# @return [Object] +events_and_data+ itself (the result of +each+)
def multi_receive_encoded(events_and_data)
  events_and_data.each { |event, payload| publish(event, payload) }
end
publish(event, message)
click to toggle source
# File lib/logstash/outputs/rabbitmq.rb, line 64
#
# Publishes one message to the exchange returned by +local_exchange+.
#
# The routing key is produced by +event.sprintf(@key)+. The message
# properties are +@message_properties+ merged with the +:persistent+ flag and
# passed through +symbolize+.
#
# If publishing raises one of the rescued connection/IO errors, the error is
# logged, +sleep_for_retry+ is called, and the whole method body runs again.
# There is no retry limit in this method.
#
# @param event [Object] must respond to +sprintf+
# @param message [Object] the payload handed to the exchange
# @raise [ArgumentError] when +@hare_info.exchange+ is not set
def publish(event, message)
  unless @hare_info.exchange
    raise ArgumentError, "No exchange set in HareInfo!!!"
  end

  # Resolve the exchange before building the arguments so the evaluation
  # order matches a direct `local_exchange.publish(...)` call.
  target = local_exchange
  routing_key = event.sprintf(@key)
  properties = symbolize(@message_properties.merge(:persistent => @persistent))

  target.publish(message, :routing_key => routing_key, :properties => properties)
rescue MarchHare::Exception, IOError, AlreadyClosedException, TimeoutException => e
  @logger.error("Error while publishing. Will retry.",
                :message => e.message,
                :exception => e.class,
                :backtrace => e.backtrace)

  sleep_for_retry
  retry
end
register()
click to toggle source
# File lib/logstash/outputs/rabbitmq.rb, line 46
#
# Prepares the output for use: connects, declares the configured exchange on
# +@hare_info.channel+ and records it on +@hare_info+, then creates the two
# thread-local holders read by +local_channel+ and +local_exchange+.
def register
  connect!

  declared = declare_exchange!(@hare_info.channel, @exchange, @exchange_type, @durable)
  @hare_info.exchange = declared

  # The connection close should close all channels, so it is safe to store
  # thread locals here without closing them.
  @thread_local_channel = java.lang.ThreadLocal.new
  @thread_local_exchange = java.lang.ThreadLocal.new
end
symbolize(myhash)
click to toggle source
# File lib/logstash/outputs/rabbitmq.rb, line 54
#
# Returns a new Hash with every key of +myhash+ converted with +to_sym+.
# Values are carried over unchanged and +myhash+ itself is not modified.
# If two keys convert to the same symbol, the later entry wins.
#
# @param myhash [Hash] hash whose keys all respond to +to_sym+
# @return [Hash] a copy of +myhash+ keyed by symbols
def symbolize(myhash)
  # Build the result directly rather than via an intermediate array of pairs.
  myhash.each_with_object({}) do |(key, value), symbolized|
    symbolized[key.to_sym] = value
  end
end