class LogStash::Codecs::IdentityMapCodec

Constants

CLEANER_INTERVAL

time that the cleaner thread sleeps for before it tries to clean out stale mappings

EVICT_TIMEOUT

time after which a stream is considered stale each time a stream is accessed it is given a new timeout

MAX_IDENTITIES

maximum size of the mapping hash

Attributes

auto_flusher[RW]
base_codec[RW]
cleaner[RW]
identity_map[R]

Public Class Methods

new(codec) click to toggle source
# File lib/logstash/codecs/identity_map_codec.rb, line 108
def initialize(codec)
  @base_codec = codec
  @base_codecs = [codec]
  @identity_map = ThreadSafe::Hash.new &method(:codec_builder)
  @max_identities = MAX_IDENTITIES
  @evict_timeout = EVICT_TIMEOUT
  cleaner_interval(CLEANER_INTERVAL)
  if codec.respond_to?(:use_mapper_auto_flush) &&
      (@auto_flush_interval = codec.use_mapper_auto_flush)
    @auto_flusher = PeriodicRunner.new(self, 0.5, :auto_flush_mapped)
  else
    @auto_flusher = NoopRunner.new
  end

  @decode_block = lambda {|*| true }
  @eviction_block = nil
end

Public Instance Methods

<<(data, identity = nil, &block)
Alias for: decode
accept(listener) click to toggle source
# File lib/logstash/codecs/identity_map_codec.rb, line 176
def accept(listener)
  stream_codec(listener.path).accept(listener)
end
all_codecs() click to toggle source
# File lib/logstash/codecs/identity_map_codec.rb, line 232
def all_codecs
  no_streams? ? @base_codecs : identity_map.values.map(&:codec)
end
auto_flush_mapped() click to toggle source

end Codec API

# File lib/logstash/codecs/identity_map_codec.rb, line 210
def auto_flush_mapped
  if !identity_count.zero?
    nowf = Time.now.to_f
    identity_map.each do |identity, compo|
      next if compo.auto_flush_timeout.zero?
      next unless nowf > compo.auto_flush_timeout
      compo.codec.auto_flush
      # at eof (tail and read) no more lines for a while or ever
      # so reset compo.auto_flush_timeout
      compo.auto_flush_timeout = 0
    end
  end
end
cleaner_interval(interval) click to toggle source

used to add a non-default cleaner interval

# File lib/logstash/codecs/identity_map_codec.rb, line 143
def cleaner_interval(interval)
  @cleaner.stop if @cleaner
  @cleaner = PeriodicRunner.new(self, interval.to_i, :map_cleanup)
  self
end
close() click to toggle source
# File lib/logstash/codecs/identity_map_codec.rb, line 202
def close()
  cleaner.stop
  auto_flusher.stop
  all_codecs.each(&:close)
end
codec_without_usage_update(identity) click to toggle source
# File lib/logstash/codecs/identity_map_codec.rb, line 283
def codec_without_usage_update(identity)
  find_codec_value(identity).codec
end
current_size_and_limit() click to toggle source
# File lib/logstash/codecs/identity_map_codec.rb, line 274
def current_size_and_limit
  [identity_count, max_limit]
end
decode(data, identity = nil, &block) click to toggle source

Codec API

# File lib/logstash/codecs/identity_map_codec.rb, line 171
def decode(data, identity = nil, &block)
  @decode_block = block if @decode_block != block
  stream_codec(identity).decode(data, &block)
end
Also aliased as: <<
encode(event, identity = nil) click to toggle source
# File lib/logstash/codecs/identity_map_codec.rb, line 182
def encode(event, identity = nil)
  stream_codec(identity).encode(event)
end
evict(identity) click to toggle source

IdentityMapCodec API

# File lib/logstash/codecs/identity_map_codec.rb, line 160
def evict(identity)
  # maybe called more than once
  if (compo = identity_map.delete(identity))
    compo.codec.auto_flush if compo.codec.respond_to?(:auto_flush)
  end
end
evict_flush(codec) click to toggle source
# File lib/logstash/codecs/identity_map_codec.rb, line 263
def evict_flush(codec)
  if codec.respond_to?(:auto_flush)
    codec.auto_flush
  else
    if (block = @eviction_block || @decode_block)
      codec.flush(&block)
    end
    # all else - can't do anything
  end
end
evict_timeout(timeout) click to toggle source

used to add a non-default evict timeout

# File lib/logstash/codecs/identity_map_codec.rb, line 137
def evict_timeout(timeout)
  @evict_timeout = timeout.to_i
  self
end
eviction_block(block) click to toggle source

used to add a non-default eviction block

# File lib/logstash/codecs/identity_map_codec.rb, line 150
def eviction_block(block)
  @eviction_block = block
  self
end
eviction_timestamp_for(identity) click to toggle source
# File lib/logstash/codecs/identity_map_codec.rb, line 287
def eviction_timestamp_for(identity)
  find_codec_value(identity).eviction_timeout
end
flush(&block) click to toggle source
# File lib/logstash/codecs/identity_map_codec.rb, line 186
def flush(&block)
  all_codecs.each do |codec|
    #let ruby do its default args thing
    if block_given?
      codec.flush(&block)
    else
      if codec.respond_to?(:auto_flush)
        codec.auto_flush
      else
        #try this, no guarantees
        codec.flush
      end
    end
  end
end
flush_mapped(listener) click to toggle source
# File lib/logstash/codecs/identity_map_codec.rb, line 224
def flush_mapped(listener)
  listener_has_path = listener.respond_to?(:path)
  identity_map.each do |identity, compo|
    listener.path = identity if listener_has_path
    compo.codec.auto_flush(listener)
  end
end
identity_count() click to toggle source
# File lib/logstash/codecs/identity_map_codec.rb, line 240
def identity_count
  identity_map.size
end
logger() click to toggle source
# File lib/logstash/codecs/identity_map_codec.rb, line 278
def logger
  # we 'borrow' the codec's logger as we don't have our own
  @base_codec.logger
end
map_cleanup() click to toggle source

support cleaning of stale stream/codecs a stream is considered stale if it has not been accessed in the last @evict_timeout period (default 1 hour)

# File lib/logstash/codecs/identity_map_codec.rb, line 248
def map_cleanup
  if !identity_count.zero?
    nowi = Time.now.to_i
    # delete_if is atomic
    # contents should not mutate during this call
    identity_map.delete_if do |identity, compo|
      if (flag = compo.eviction_timeout <= nowi)
        evict_flush(compo.codec)
      end
      flag
    end
  end
  current_size_and_limit
end
max_identities(max) click to toggle source

Constructional/builder methods chain this method off of new

used to add a non-default maximum identities

# File lib/logstash/codecs/identity_map_codec.rb, line 131
def max_identities(max)
  @max_identities = max.to_i
  self
end
max_limit() click to toggle source
# File lib/logstash/codecs/identity_map_codec.rb, line 236
def max_limit
  @max_identities
end

Private Instance Methods

auto_flush_timestamp(now = Time.now) click to toggle source
# File lib/logstash/codecs/identity_map_codec.rb, line 316
def auto_flush_timestamp(now = Time.now)
  now.to_f + @auto_flush_interval.to_f
end
check_map_limits() click to toggle source
# File lib/logstash/codecs/identity_map_codec.rb, line 324
def check_map_limits
  UpperLimitReached.visit(self)
  EightyPercentWarning.visit(self)
end
codec_builder(hash, k) click to toggle source
# File lib/logstash/codecs/identity_map_codec.rb, line 329
def codec_builder(hash, k)
  codec = hash.empty? ? @base_codec : @base_codec.clone
  codec.use_mapper_auto_flush if using_mapped_auto_flush?
  compo = CodecValue.new(codec).tap do |o|
    now = Time.now
    o.eviction_timeout = eviction_timestamp(now)
    o.auto_flush_timeout = auto_flush_timestamp(now)
  end
  hash.store(k, compo)
end
eviction_timestamp(now = Time.now) click to toggle source
# File lib/logstash/codecs/identity_map_codec.rb, line 320
def eviction_timestamp(now = Time.now)
  now.to_i + @evict_timeout
end
find_codec_value(identity) click to toggle source
# File lib/logstash/codecs/identity_map_codec.rb, line 298
def find_codec_value(identity)
  identity_map[identity]
end
no_streams?() click to toggle source
# File lib/logstash/codecs/identity_map_codec.rb, line 340
def no_streams?
  identity_map.empty?
end
record_codec_usage(identity) click to toggle source

for nil stream this method is not called

# File lib/logstash/codecs/identity_map_codec.rb, line 303
def record_codec_usage(identity)
  check_map_limits
  # only start the cleaner if streams are in use
  # continuous calls to start are OK
  cleaner.start
  auto_flusher.start
  compo = find_codec_value(identity)
  now = Time.now
  compo.eviction_timeout = eviction_timestamp(now)
  compo.auto_flush_timeout = auto_flush_timestamp(now)
  compo.codec
end
stream_codec(identity) click to toggle source
# File lib/logstash/codecs/identity_map_codec.rb, line 293
def stream_codec(identity)
  return base_codec if identity.nil?
  record_codec_usage(identity) # returns codec
end
using_mapped_auto_flush?() click to toggle source
# File lib/logstash/codecs/identity_map_codec.rb, line 344
def using_mapped_auto_flush?
  !@auto_flush_interval.nil?
end