class Fluent::Plugin::EventCollectorFilter

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/filter_event_collector.rb, line 35
def configure(conf)
  super

  @event_buffer = {}
  @event_buffer_lock = Mutex.new
end
filter(tag, time, record) click to toggle source
# File lib/fluent/plugin/filter_event_collector.rb, line 49
def filter(tag, time, record)
  # Pass through if we can't associate a record with a event
  return record unless record[event_key]
  
  @event_buffer_lock.synchronize do
    if record[end_tag_key] == end_tag_value
      # Complete event object and publish to rest of fluent chain
      publish_event(record)
    else
      # Merge in fluent's tag and time in case we need to manually emit later
      update_event(record.merge({'fd_tag' => tag, 'fd_time' => time}))

      # nil return halts the fluent event chain for this event
      nil
    end
  end
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/filter_event_collector.rb, line 42
def start
  super

  # Set up event timeout
  timer_execute(:event_timeout, event_timeout, repeat: false, &method(:timeout_flush))
end

Private Instance Methods

emit_event(event, reason) click to toggle source
# File lib/fluent/plugin/filter_event_collector.rb, line 121
def emit_event(event, reason)
  router.emit("#{event.delete('fd_tag')}.#{reason}", event.delete('fd_time'), event)
end
merge_records(r1, r2) click to toggle source

Merges record hashes where key fields are concatenated

# File lib/fluent/plugin/filter_event_collector.rb, line 89
def merge_records(r1, r2)
  r1.merge!(r2) do |k, v1, v2|
    if merge_fields.include?(k)
    "#{v1}#{merge_field_delimeter}#{v2}"
    else
      v2
    end
  end
end
publish_event(record) click to toggle source

Return a full formed event

# File lib/fluent/plugin/filter_event_collector.rb, line 70
def publish_event(record)
  return record unless event = @event_buffer.delete(record[event_key])

  # Remove any fluent fields we added
  merge_records(event, record).delete_if { |k, _| k =~ /\Afd_/ }
end
timeout_events(timeout_buffer) click to toggle source

Manually emit a fluent event for all timed out events

# File lib/fluent/plugin/filter_event_collector.rb, line 115
def timeout_events(timeout_buffer)
  timeout_buffer.each do |event_id|
    emit_event(@event_buffer.delete(event_id), 'event_timeout')
  end
end
timeout_flush() click to toggle source

event timeout callback

# File lib/fluent/plugin/filter_event_collector.rb, line 100
def timeout_flush
  now = Time.now.to_i

  # Snapshot the event buffer and timeout a event if it hasn't been updated in 30s
  timeout_buffer = @event_buffer.dup.each_with_object([]) do |(id, event), buffer|
    buffer << id if (now - event['fd_time'].to_i) > event_timeout
  end

  @event_buffer_lock.synchronize { timeout_events(timeout_buffer) } if timeout_buffer.any?

  # To ensure we don't run more than one timer at a time, enqueue at the end
  timer_execute(:event_timeout, event_timeout, repeat: false, &method(:timeout_flush))
end
update_event(record) click to toggle source

Initializes or merges record hashes into a event

# File lib/fluent/plugin/filter_event_collector.rb, line 78
def update_event(record)
  event_id = record[event_key]

  if event = @event_buffer[event_id]
    merge_records(event, record)
  else
    @event_buffer[event_id] = record
  end
end