class LogStash::Filters::Ciseipdb
Search elasticsearch for matching IPs in Elasticsearch IP database indexes i and add that information into events.
Caches matching IPs in redis.
Example:
ciseipdb {
hosts => [ "elasticsearch" ] indexes => [ "ipdatabase" ] ipaddress => "%{ip_dst}" target => "dst_info"
}
Public Instance Methods
check_redis(ip)
click to toggle source
# File lib/logstash/filters/ciseipdb.rb, line 152 def check_redis(ip) begin output = @redis.get(ip) if output.nil? output else eval(output) end rescue => e @logger.warn("Problem getting key from redis", :ip => ip, :error => e) end end
filter(event)
click to toggle source
# File lib/logstash/filters/ciseipdb.rb, line 82 def filter(event) ipaddress = event.sprintf(@ipaddress) # Check ip address in redis data = check_redis(ipaddress) # IP not in redis, lookup elasticsearch, add to redis if data.nil? data = search(ipaddress) update_redis(ipaddress, data) end # Update event data.each_pair do |k,v| targetname = "#{@target}_#{k}" event[targetname] = v end filter_matched(event) end
register()
click to toggle source
# File lib/logstash/filters/ciseipdb.rb, line 54 def register require "elasticsearch" require "redis" transport_options = {} if @user && @password token = Base64.strict_encode64("#{@user}:#{@password.value}") transport_options[:headers] = { Authorization: "Basic #{token}" } end hosts = if @ssl then @hosts.map {|h| { host: h, scheme: 'https' } } else @hosts end if @ssl && @ca_file transport_options[:ssl] = { ca_file: @ca_file } end @logger.info("New CISE IPDB filter", :hosts => hosts) @client = Elasticsearch::Client.new hosts: hosts, transport_options: transport_options @redis = Redis.new(:host => redis_host) end
search(ip)
click to toggle source
# File lib/logstash/filters/ciseipdb.rb, line 104 def search(ip) output = Hash.new begin query = { query: { filtered: { filter: { and: [ { term: { IPADDRESS: ip } }, { range: { "@timestamp" => { gte: "now-1d/d", lt: "now" } } } ] } } } } results = @client.search index: @indexes, body: query if results['hits']['total'] >= 1 output['databases'] = Array.new output['reputation_score'] = 0 results['hits']['hits'].each do |hit| output['databases'] << hit['_source']['database']['shortname'] output['reputation_score'] += hit['_source']['database']['reputation_score'].to_i # Extra data from nipap if hit['_source']['database']['shortname'] == 'nipap' output['service_slug'] = hit['_source']['service_slug'] output['description'] = hit['_source']['description'] output['router'] = hit['_source']['router'] end # Extra data for pools if hit['_source']['database']['shortname'] == 'pool' output['pool_name'] = hit['_source']['pool_name'] output['pool_description'] = hit['_source']['pool_description'] end end end rescue => e @logger.debug("No hits for ipaddresses", :query => query, :error => e) end #begin..rescue output end
update_redis(ip, data)
click to toggle source
# File lib/logstash/filters/ciseipdb.rb, line 165 def update_redis(ip, data) begin @redis.set(ip, data) @redis.expire(ip, @redis_ttl) rescue => e @logger.warn("Problem updating redis", :ip => ip, :data => data , :error => e) end end