class LogStash::Inputs::CrowdStrikeFDR
Fetches log files from AWS S3 buckets, as announced by object-created events delivered via SQS.
This plugin is based on the logstash-input-sqs plugin, but it does not turn the SQS message itself into an event. Instead it assumes that each message is an S3 object-created event, then downloads and processes the referenced file.
Some issues of logstash-input-sqs, such as Logstash not shutting down properly, have been fixed in this plugin.
In contrast to logstash-input-sqs, this plugin uses the “Receive Message Wait Time” configured for the SQS queue in question; a value of around 10 seconds ensures a reasonable shutdown time for Logstash. Also use a “Default Visibility Timeout” that is high enough for log files to be downloaded and processed (5-10 minutes should cover most use cases). The plugin will not remove an event from the queue if the associated log file could not be passed correctly to the processing stage of Logstash (e.g. the downloaded content size does not match the SQS event).
This plugin is meant for high-availability setups. In contrast to logstash-input-s3, you can safely run multiple Logstash nodes: SQS ensures that each log file is processed only once, and no file is lost on node failure or when an auto-scaling group scales down. (Use a “Message Retention Period” of at least 4 days for your SQS queue so you can survive a weekend of faulty log file processing.) The plugin will not delete objects from S3 buckets, so make sure your buckets have a reasonable “Lifecycle” configuration that keeps files for at least the “Message Retention Period”.
A typical setup contains some S3 buckets holding ELB, CloudTrail or other log files. These buckets are configured to send object-created events to an SQS queue, which is then configured as the source queue for this plugin. (The plugin supports gzipped content if it is marked with “content-encoding: gzip”, as is the case for CloudTrail logs.)
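To sketch what handling that encoding involves, here is a minimal illustration in Ruby. This is not the plugin's actual code; the helper name and the way the encoding value reaches it are assumptions:

[source,ruby]
----
require 'zlib'

# Hypothetical helper, for illustration only: return the text of a
# downloaded object, gunzipping it when its metadata says "gzip".
def read_log_content(path, content_encoding)
  if content_encoding == 'gzip'
    Zlib::GzipReader.open(path) { |gz| gz.read }
  else
    File.read(path)
  end
end
----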
The Logstash node therefore needs SQS permissions plus permission to download objects from the S3 buckets that send events to the queue. (If your Logstash nodes run on EC2, you should attach these permissions via an EC2 instance role.) An example policy:
[source,json]
----
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "sqs:Get*",
        "sqs:List*",
        "sqs:ReceiveMessage",
        "sqs:ChangeMessageVisibility*",
        "sqs:DeleteMessage*"
      ],
      "Resource": [
        "arn:aws:sqs:us-east-1:123456789012:my-elb-log-queue"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "s3:Get*",
        "s3:List*",
        "s3:DeleteObject"
      ],
      "Resource": [
        "arn:aws:s3:::my-elb-logs",
        "arn:aws:s3:::my-elb-logs/*"
      ]
    }
  ]
}
----
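A minimal pipeline configuration could then look like the sketch below. The option names are inferred from the instance variables used in `register` (`@queue`, `@region`, `@visibility_timeout`, `@consumer_threads`) and should be verified against the plugin's actual option list; the queue name and region are example values:

[source,logstash]
----
input {
  crowdstrike_fdr {
    id                 => "fdr"                # used in worker thread names
    queue              => "my-elb-log-queue"   # SQS queue receiving object-created events
    region             => "us-east-1"
    visibility_timeout => 600                  # seconds; long enough to download + process a file
    consumer_threads   => 4                    # number of polling worker threads
  }
}
----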
Public Instance Methods
initialisation
# File lib/logstash/inputs/crowdstrike_fdr.rb, line 159
def register
  # prepare system
  FileUtils.mkdir_p(@temporary_directory) unless Dir.exist?(@temporary_directory)
  @id ||= "Unknown" # use input { id => name } for the thread identifier
  @credentials_by_bucket = hash_key_is_regex({})
  @region_by_bucket = hash_key_is_regex({})
  # create the bucket=>folder=>codec lookup from config options
  @codec_by_folder = hash_key_is_regex({})
  @type_by_folder = hash_key_is_regex({})

  # use deprecated settings only if new config is missing:
  if @s3_options_by_bucket.nil?
    # We don't know any bucket name, so we must rely on a "catch-all" regex
    s3_options = {
      'bucket_name' => '.*',
      'folders' => @set_codec_by_folder.map { |key, codec| { 'key' => key, 'codec' => codec } }
    }
    if @s3_role_arn.nil?
      # access key/secret key pair needed
      unless @s3_access_key_id.nil? or @s3_secret_access_key.nil?
        s3_options['credentials'] = {
          'access_key_id' => @s3_access_key_id,
          'secret_access_key' => @s3_secret_access_key
        }
      end
    else
      s3_options['credentials'] = { 'role' => @s3_role_arn }
    end
    @s3_options_by_bucket = [s3_options]
  end

  @s3_options_by_bucket.each do |options|
    bucket = options['bucket_name']
    if options.key?('credentials')
      @credentials_by_bucket[bucket] = options['credentials']
    end
    if options.key?('region')
      @region_by_bucket[bucket] = options['region']
    end
    if options.key?('folders')
      # make these hashes do key lookups using regex matching
      folders = hash_key_is_regex({})
      types = hash_key_is_regex({})
      options['folders'].each do |entry|
        @logger.debug("options for folder ", :folder => entry)
        folders[entry['key']] = entry['codec'] if entry.key?('codec')
        types[entry['key']] = entry['type'] if entry.key?('type')
      end
      @codec_by_folder[bucket] = folders unless folders.empty?
      @type_by_folder[bucket] = types unless types.empty?
    end
  end

  @received_stop = Concurrent::AtomicBoolean.new(false)

  # instantiate helpers
  @sqs_poller = SqsPoller.new(@logger, @received_stop,
    {
      visibility_timeout: @visibility_timeout,
      skip_delete: @sqs_skip_delete,
      wait_time_seconds: @sqs_wait_time_seconds
    },
    {
      sqs_queue: @queue,
      queue_owner_aws_account_id: @queue_owner_aws_account_id,
      from_sns: @from_sns,
      max_processing_time: @max_processing_time,
      sqs_delete_on_failure: @sqs_delete_on_failure
    },
    aws_options_hash)
  @s3_client_factory = S3ClientFactory.new(@logger, {
      aws_region: @region,
      s3_default_options: @s3_default_options,
      s3_credentials_by_bucket: @credentials_by_bucket,
      s3_region_by_bucket: @region_by_bucket,
      s3_role_session_name: @s3_role_session_name
    }, aws_options_hash)
  @s3_downloader = S3Downloader.new(@logger, @received_stop, {
      s3_client_factory: @s3_client_factory,
      delete_on_success: @delete_on_success,
      include_object_properties: @include_object_properties
    })
  @codec_factory = CodecFactory.new(@logger, {
      default_codec: @codec,
      codec_by_folder: @codec_by_folder
    })
  #@log_processor = LogProcessor.new(self)

  # administrative stuff
  @worker_threads = []
end
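For illustration, the per-bucket options consumed above could be configured roughly as follows. This is a sketch only: the bucket, role ARN and folder values are invented, the exact syntax should be checked against the plugin's documentation, and note that `bucket_name` is treated as a regex key by the lookup helpers:

[source,logstash]
----
s3_options_by_bucket => [
  {
    "bucket_name" => "my-elb-logs",    # interpreted as a regex
    "credentials" => { "role" => "arn:aws:iam::123456789012:role/logstash-read" },
    "folders" => [
      { "key" => "elb",        "codec" => "plain",      "type" => "elb" },
      { "key" => "cloudtrail", "codec" => "cloudtrail", "type" => "cloudtrail" }
    ]
  }
]
----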
startup
# File lib/logstash/inputs/crowdstrike_fdr.rb, line 256
def run(logstash_event_queue)
  @control_threads = @consumer_threads.times.map do |thread_id|
    Thread.new do
      restart_count = 0
      while not stop?
        # make thread start async to prevent polling the same message from sqs
        sleep 0.5
        worker_thread = run_worker_thread(logstash_event_queue, thread_id)
        worker_thread.join
        restart_count += 1
        thread_id = "#{thread_id}_#{restart_count}"
        @logger.info("[control_thread] restarting a thread #{thread_id}... ", :thread => worker_thread.inspect)
      end
    end
  end
  @control_threads.each { |t| t.join }
end
shutdown
# File lib/logstash/inputs/crowdstrike_fdr.rb, line 275
def stop
  @received_stop.make_true
  unless @worker_threads.nil?
    @worker_threads.each do |worker|
      begin
        @logger.info("Stopping thread ... ", :thread => worker.inspect)
        worker.wakeup
      rescue
        @logger.error("Cannot stop thread ... trying to kill it", :thread => worker.inspect)
        worker.kill
      end
    end
  end
end
# File lib/logstash/inputs/crowdstrike_fdr.rb, line 291
def stop?
  @received_stop.value
end
Private Instance Methods
Will be removed in future releases:
# File lib/logstash/inputs/crowdstrike_fdr.rb, line 321
def get_object_folder(key)
  if match = /#{s3_key_prefix}\/?(?<type_folder>.*?)\/.*/.match(key)
    return match['type_folder']
  else
    return ""
  end
end
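A standalone worked example of this extraction, with a made-up `s3_key_prefix`:

[source,ruby]
----
s3_key_prefix = "AWSLogs" # assumed prefix, for illustration only

# Same regex idea as get_object_folder above, as a free-standing helper:
def type_folder_for(key, prefix)
  match = /#{prefix}\/?(?<type_folder>.*?)\/.*/.match(key)
  match ? match['type_folder'] : ""
end

type_folder_for("AWSLogs/123456789012/CloudTrail/eu-west-1/log.json.gz", s3_key_prefix)
# => "123456789012" (first path segment after the prefix)
type_folder_for("unrelated.log", s3_key_prefix)
# => "" (no match)
----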
# File lib/logstash/inputs/crowdstrike_fdr.rb, line 329
def hash_key_is_regex(myhash)
  myhash.default_proc = lambda do |hash, lookup|
    result = nil
    hash.each_pair do |key, value|
      if %r[#{key}] =~ lookup
        result = value
        break
      end
    end
    result
  end
  # return input hash (convenience)
  return myhash
end
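The effect: an exact key hit behaves normally, while a miss falls back to matching each stored key as a regex against the lookup string. Using the helper above:

[source,ruby]
----
folders = hash_key_is_regex({})
folders['cloudtrail/.*'] = 'cloudtrail'   # the key is a regex source, not a literal

folders['cloudtrail/us-east-1/x.json.gz'] # => "cloudtrail" (found via default_proc regex scan)
folders['nothing/matches/this']           # => nil (no stored regex matches)
----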
# File lib/logstash/inputs/crowdstrike_fdr.rb, line 298
def run_worker_thread(queue, thread_id)
  Thread.new do
    LogStash::Util.set_thread_name("Worker #{@id}/#{thread_id}")
    @logger.info("[#{Thread.current[:name]}] started (#{Time.now})") #PROFILING
    temporary_directory = Dir.mktmpdir("#{@temporary_directory}/")
    @sqs_poller.run do |record|
      throw :skip_delete if stop?
      # record is a valid object with the keys ":bucket", ":key", ":size"
      record[:local_file] = File.join(temporary_directory, File.basename(record[:key]))
      if @s3_downloader.copy_s3object_to_disk(record)
        completed = catch(:skip_delete) do
          process(record, queue)
        end
        @s3_downloader.cleanup_local_object(record)
        # re-throw if necessary:
        throw :skip_delete unless completed
        @s3_downloader.cleanup_s3object(record)
      end
    end
  end
end
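The `:skip_delete` flow relies on Ruby's `catch`/`throw`: when processing aborts, the symbol propagates out to the poller, which then leaves the message on the queue (so it becomes visible again after the visibility timeout) instead of deleting it. A minimal standalone sketch of that pattern (not plugin code):

[source,ruby]
----
# The poller wraps each message handler in catch(:skip_delete); a handler
# throws :skip_delete to veto deletion of the SQS message.
def poll_once(message)
  completed = catch(:skip_delete) do
    yield message # the handler may throw :skip_delete at any depth
    true          # reached only if the handler finished normally
  end
  if completed
    puts "deleting #{message} from the queue"
  else
    puts "keeping #{message} visible for another attempt"
  end
end

poll_once("msg-1") { |m| puts "processed #{m}" } # processed, then deleted
poll_once("msg-2") { |m| throw :skip_delete }    # kept on the queue
----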