class LogStash::Agent
Collect logs, ship them out.
Attributes
config[R]
filters[R]
inputs[R]
outputs[R]
Public Class Methods
new(config)
click to toggle source
# File lib/logstash/agent.rb, line 17 def initialize(config) log_to(STDERR) @config = config @outputs = [] @inputs = [] @filters = [] # Config should have: # - list of logs to monitor # - log config # - where to ship to end
Public Instance Methods
log_to(target)
click to toggle source
# File lib/logstash/agent.rb, line 31 def log_to(target) @logger = LogStash::Logger.new(target) end
register()
click to toggle source
Register any event handlers with EventMachine
Technically, this agent could listen for anything (files, sockets, amqp, stomp, etc).
# File lib/logstash/agent.rb, line 39 def register # TODO(sissel): warn when no inputs and no outputs are defined. # TODO(sissel): Refactor this madness into a config lib if (["inputs", "outputs"] & @config.keys).length == 0 $stderr.puts "No inputs or no outputs configured. This probably isn't what you want." end # Register input and output stuff inputs = @config["inputs"] inputs.each do |value| # If 'url' is an array, then inputs is a hash and the key is the type if inputs.is_a?(Hash) type, urls = value else raise "config error, no type for url #{urls.inspect}" end # url could be a string or an array. urls = [urls] if !urls.is_a?(Array) urls.each do |url| @logger.debug("Using input #{url} of type #{type}") input = LogStash::Inputs.from_url(url, type) { |event| receive(event) } input.logger = @logger input.register @inputs << input end end # each input if @config.include?("filters") filters = @config["filters"] filters.collect { |x| x.to_a[0] }.each do |filter| name, value = filter @logger.debug("Using filter #{name} => #{value.inspect}") filter = LogStash::Filters.from_name(name, value) filter.logger = @logger filter.register @filters << filter end # each filter end # if we have filters @config["outputs"].each do |url| @logger.debug("Using output #{url}") output = LogStash::Outputs.from_url(url) output.logger = @logger output.register @outputs << output end # each output # Register any signal handlers register_signal_handler end
register_signal_handler()
click to toggle source
# File lib/logstash/agent.rb, line 137 def register_signal_handler @sigchannel = EventMachine::Channel.new Signal.trap("USR1") do @sigchannel.push(:USR1) end Signal.trap("INT") do @sigchannel.push(:INT) end @sigchannel.subscribe do |msg| case msg when :USR1 counts = Hash.new { |h,k| h[k] = 0 } ObjectSpace.each_object do |obj| counts[obj.class] += 1 end @logger.info("SIGUSR1 received. Dumping state") @logger.info("#{self.class.name} config") @logger.info([" Inputs:", @inputs]) @logger.info([" Filters:", @filters]) @logger.info([" Outputs:", @outputs]) @logger.info("Dumping counts of objects by class") counts.sort { |a,b| a[1] <=> b[1] or a[0] <=> b[0] }.each do |key, value| @logger.info("Class: [#{value}] #{key}") end when :INT @logger.warn("SIGINT received. Shutting down.") EventMachine::stop_event_loop # TODO(sissel): Should have input/output/filter register shutdown # hooks. end # case msg end # @sigchannel.subscribe end
run() { || ... }
click to toggle source
# File lib/logstash/agent.rb, line 94 def run(&block) EventMachine.run do self.register yield if block_given? end # EventMachine.run end
stop()
click to toggle source
# File lib/logstash/agent.rb, line 102 def stop # TODO(sissel): Stop inputs, fluch outputs, wait for finish, # then stop the event loop EventMachine.stop_event_loop # EventMachine has no default way to indicate a 'stopping' state. $EVENTMACHINE_STOPPING = true end
Protected Instance Methods
filter(event)
click to toggle source
# File lib/logstash/agent.rb, line 112 def filter(event) @filters.each do |f| f.filter(event) break if event.cancelled? end end
output(event)
click to toggle source
# File lib/logstash/agent.rb, line 120 def output(event) @outputs.each do |o| o.receive(event) end # each output end
receive(event)
click to toggle source
Process a message
# File lib/logstash/agent.rb, line 128 def receive(event) filter(event) if !event.cancelled? output(event) end end