class Fluent::Plugin::EventCollectorFilter
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/filter_event_collector.rb, line 35
# Runs the superclass configuration, then initializes the state this filter
# keeps between records: an empty event buffer and the mutex that `filter`
# and `timeout_flush` synchronize on.
#
# @param conf the plugin configuration; forwarded unchanged to the superclass
def configure(conf)
  super

  # Partially collected events, stored under each record's event-key value.
  @event_buffer = Hash.new
  @event_buffer_lock = Mutex.new
end
filter(tag, time, record)
click to toggle source
# File lib/fluent/plugin/filter_event_collector.rb, line 49
# Collects records that belong to the same event until the end-tag record
# is seen.
#
# @param tag    tag of the incoming record
# @param time   time of the incoming record
# @param record [Hash] the incoming record
# @return [Hash, nil] the untouched record when it carries no event key, the
#   result of `publish_event` when the record closes an event, and nil
#   otherwise
def filter(tag, time, record)
  # Records without an event key cannot be associated with an event, so
  # they pass straight through.
  return record unless record[event_key]

  @event_buffer_lock.synchronize do
    # End-of-event marker: complete the event and hand it to the rest of
    # the fluent chain.
    next publish_event(record) if record[end_tag_key] == end_tag_value

    # Keep fluent's tag and time with the buffered data in case the event
    # has to be emitted manually later.
    fluent_meta = { 'fd_tag' => tag, 'fd_time' => time }
    update_event(record.merge(fluent_meta))

    # A nil return halts the fluent event chain for this record.
    nil
  end
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/filter_event_collector.rb, line 42
# Starts the plugin through the superclass and schedules the first event
# timeout check.
def start
  super

  # One-shot timer; `timeout_flush` schedules the next run itself.
  flush_callback = method(:timeout_flush)
  timer_execute(:event_timeout, event_timeout, repeat: false, &flush_callback)
end
Private Instance Methods
emit_event(event, reason)
click to toggle source
# File lib/fluent/plugin/filter_event_collector.rb, line 121
# Emits a buffered event through the router under "<original tag>.<reason>".
#
# The 'fd_tag' and 'fd_time' bookkeeping fields are removed from the event
# (the hash is mutated) and used as the tag prefix and the emit time.
#
# @param event  [Hash] buffered event that still carries 'fd_tag'/'fd_time'
# @param reason [String] suffix appended to the original tag
def emit_event(event, reason)
  original_tag = event.delete('fd_tag')
  event_time = event.delete('fd_time')

  router.emit("#{original_tag}.#{reason}", event_time, event)
end
merge_records(r1, r2)
click to toggle source
Merges record hashes where key fields are concatenated
# File lib/fluent/plugin/filter_event_collector.rb, line 89
# Merges r2 into r1 in place and returns r1.
#
# When both hashes contain a key, the value from r2 wins unless the key is
# listed in `merge_fields`; in that case both values are joined with
# `merge_field_delimeter`, the existing value first.
#
# @param r1 [Hash] hash that receives the merge (mutated)
# @param r2 [Hash] hash whose entries are merged in
# @return [Hash] r1
def merge_records(r1, r2)
  r1.merge!(r2) do |key, current, incoming|
    next incoming unless merge_fields.include?(key)

    "#{current}#{merge_field_delimeter}#{incoming}"
  end
end
publish_event(record)
click to toggle source
Return a fully formed event
# File lib/fluent/plugin/filter_event_collector.rb, line 70
# Builds the complete event for a closing record.
#
# The buffered event for the record's event key is removed from the buffer,
# merged with the closing record, and stripped of the 'fd_'-prefixed fields
# this plugin added. When nothing is buffered for that key, the record is
# returned unchanged.
#
# @param record [Hash] the record that closes the event
# @return [Hash] the completed event, or the record itself
def publish_event(record)
  buffered = @event_buffer.delete(record[event_key])
  return record unless buffered

  completed = merge_records(buffered, record)
  # Drop the fluent bookkeeping fields before the event continues downstream.
  completed.delete_if { |key, _value| key =~ /\Afd_/ }
end
timeout_events(timeout_buffer)
click to toggle source
Manually emit a fluent event for all timed out events
# File lib/fluent/plugin/filter_event_collector.rb, line 115
# Removes each listed event from the buffer and emits it with the
# 'event_timeout' reason.
#
# Ids that are no longer in the buffer are skipped: the caller collects the
# ids before this method runs, so an event may already have been published
# and removed in the meantime. Passing nil on to `emit_event` would raise.
#
# @param timeout_buffer [Array] ids of the events to time out
# @return [Array] timeout_buffer
def timeout_events(timeout_buffer)
  timeout_buffer.each do |event_id|
    event = @event_buffer.delete(event_id)
    next unless event

    emit_event(event, 'event_timeout')
  end
end
timeout_flush()
click to toggle source
event timeout callback
# File lib/fluent/plugin/filter_event_collector.rb, line 100
# Timer callback: emits every buffered event that has not been updated for
# more than `event_timeout` seconds, then schedules the next run.
#
# The expired ids are collected and flushed while holding the buffer lock,
# so `filter` cannot publish or modify an event between the scan and the
# flush.
def timeout_flush
  now = Time.now.to_i

  @event_buffer_lock.synchronize do
    expired_ids = @event_buffer.each_with_object([]) do |(id, event), ids|
      ids << id if (now - event['fd_time'].to_i) > event_timeout
    end

    timeout_events(expired_ids) if expired_ids.any?
  end
ensure
  # The timer is one-shot so that only one run is pending at a time; it is
  # re-armed here, after the work is done. Doing it in `ensure` keeps the
  # timeout check alive even when emitting an event raised.
  timer_execute(:event_timeout, event_timeout, repeat: false, &method(:timeout_flush))
end
update_event(record)
click to toggle source
Initializes or merges record hashes into an event
# File lib/fluent/plugin/filter_event_collector.rb, line 78
# Adds a record to the buffered event it belongs to.
#
# The first record seen for an event key becomes the buffered event; later
# records are merged into it through `merge_records`.
#
# @param record [Hash] record carrying the event key
# @return [Hash] the buffered event after the update
def update_event(record)
  id = record[event_key]
  existing = @event_buffer[id]

  # First record for this event: store it as-is.
  return @event_buffer[id] = record unless existing

  merge_records(existing, record)
end