class LogStash::Codecs::IdentityMapCodec
Constants
- CLEANER_INTERVAL
time that the cleaner thread sleeps for before it tries to clean out stale mappings
- EVICT_TIMEOUT
time after which a stream is considered stale each time a stream is accessed it is given a new timeout
- MAX_IDENTITIES
maximum size of the mapping hash
Attributes
auto_flusher[RW]
base_codec[RW]
cleaner[RW]
identity_map[R]
Public Class Methods
new(codec)
click to toggle source
# File lib/logstash/codecs/identity_map_codec.rb, line 108 def initialize(codec) @base_codec = codec @base_codecs = [codec] @identity_map = ThreadSafe::Hash.new &method(:codec_builder) @max_identities = MAX_IDENTITIES @evict_timeout = EVICT_TIMEOUT cleaner_interval(CLEANER_INTERVAL) if codec.respond_to?(:use_mapper_auto_flush) && (@auto_flush_interval = codec.use_mapper_auto_flush) @auto_flusher = PeriodicRunner.new(self, 0.5, :auto_flush_mapped) else @auto_flusher = NoopRunner.new end @decode_block = lambda {|*| true } @eviction_block = nil end
Public Instance Methods
accept(listener)
click to toggle source
# File lib/logstash/codecs/identity_map_codec.rb, line 176 def accept(listener) stream_codec(listener.path).accept(listener) end
all_codecs()
click to toggle source
# File lib/logstash/codecs/identity_map_codec.rb, line 232 def all_codecs no_streams? ? @base_codecs : identity_map.values.map(&:codec) end
auto_flush_mapped()
click to toggle source
end Codec API
¶ ↑
# File lib/logstash/codecs/identity_map_codec.rb, line 210 def auto_flush_mapped if !identity_count.zero? nowf = Time.now.to_f identity_map.each do |identity, compo| next if compo.auto_flush_timeout.zero? next unless nowf > compo.auto_flush_timeout compo.codec.auto_flush # at eof (tail and read) no more lines for a while or ever # so reset compo.auto_flush_timeout compo.auto_flush_timeout = 0 end end end
cleaner_interval(interval)
click to toggle source
used to add a non-default cleaner interval
# File lib/logstash/codecs/identity_map_codec.rb, line 143 def cleaner_interval(interval) @cleaner.stop if @cleaner @cleaner = PeriodicRunner.new(self, interval.to_i, :map_cleanup) self end
close()
click to toggle source
# File lib/logstash/codecs/identity_map_codec.rb, line 202 def close() cleaner.stop auto_flusher.stop all_codecs.each(&:close) end
codec_without_usage_update(identity)
click to toggle source
# File lib/logstash/codecs/identity_map_codec.rb, line 283 def codec_without_usage_update(identity) find_codec_value(identity).codec end
current_size_and_limit()
click to toggle source
# File lib/logstash/codecs/identity_map_codec.rb, line 274 def current_size_and_limit [identity_count, max_limit] end
decode(data, identity = nil, &block)
click to toggle source
¶ ↑
Codec API
# File lib/logstash/codecs/identity_map_codec.rb, line 171 def decode(data, identity = nil, &block) @decode_block = block if @decode_block != block stream_codec(identity).decode(data, &block) end
Also aliased as: <<
encode(event, identity = nil)
click to toggle source
# File lib/logstash/codecs/identity_map_codec.rb, line 182 def encode(event, identity = nil) stream_codec(identity).encode(event) end
evict(identity)
click to toggle source
¶ ↑
IdentityMapCodec
API
# File lib/logstash/codecs/identity_map_codec.rb, line 160 def evict(identity) # maybe called more than once if (compo = identity_map.delete(identity)) compo.codec.auto_flush if compo.codec.respond_to?(:auto_flush) end end
evict_flush(codec)
click to toggle source
# File lib/logstash/codecs/identity_map_codec.rb, line 263 def evict_flush(codec) if codec.respond_to?(:auto_flush) codec.auto_flush else if (block = @eviction_block || @decode_block) codec.flush(&block) end # all else - can't do anything end end
evict_timeout(timeout)
click to toggle source
used to add a non-default evict timeout
# File lib/logstash/codecs/identity_map_codec.rb, line 137 def evict_timeout(timeout) @evict_timeout = timeout.to_i self end
eviction_block(block)
click to toggle source
used to add a non-default eviction block
# File lib/logstash/codecs/identity_map_codec.rb, line 150 def eviction_block(block) @eviction_block = block self end
eviction_timestamp_for(identity)
click to toggle source
# File lib/logstash/codecs/identity_map_codec.rb, line 287 def eviction_timestamp_for(identity) find_codec_value(identity).eviction_timeout end
flush(&block)
click to toggle source
# File lib/logstash/codecs/identity_map_codec.rb, line 186 def flush(&block) all_codecs.each do |codec| #let ruby do its default args thing if block_given? codec.flush(&block) else if codec.respond_to?(:auto_flush) codec.auto_flush else #try this, no guarantees codec.flush end end end end
flush_mapped(listener)
click to toggle source
# File lib/logstash/codecs/identity_map_codec.rb, line 224 def flush_mapped(listener) listener_has_path = listener.respond_to?(:path) identity_map.each do |identity, compo| listener.path = identity if listener_has_path compo.codec.auto_flush(listener) end end
identity_count()
click to toggle source
# File lib/logstash/codecs/identity_map_codec.rb, line 240 def identity_count identity_map.size end
logger()
click to toggle source
# File lib/logstash/codecs/identity_map_codec.rb, line 278 def logger # we 'borrow' the codec's logger as we don't have our own @base_codec.logger end
map_cleanup()
click to toggle source
support cleaning of stale stream/codecs a stream is considered stale if it has not been accessed in the last @evict_timeout period (default 1 hour)
# File lib/logstash/codecs/identity_map_codec.rb, line 248 def map_cleanup if !identity_count.zero? nowi = Time.now.to_i # delete_if is atomic # contents should not mutate during this call identity_map.delete_if do |identity, compo| if (flag = compo.eviction_timeout <= nowi) evict_flush(compo.codec) end flag end end current_size_and_limit end
max_identities(max)
click to toggle source
max_limit()
click to toggle source
# File lib/logstash/codecs/identity_map_codec.rb, line 236 def max_limit @max_identities end
Private Instance Methods
auto_flush_timestamp(now = Time.now)
click to toggle source
# File lib/logstash/codecs/identity_map_codec.rb, line 316 def auto_flush_timestamp(now = Time.now) now.to_f + @auto_flush_interval.to_f end
check_map_limits()
click to toggle source
# File lib/logstash/codecs/identity_map_codec.rb, line 324 def check_map_limits UpperLimitReached.visit(self) EightyPercentWarning.visit(self) end
codec_builder(hash, k)
click to toggle source
# File lib/logstash/codecs/identity_map_codec.rb, line 329 def codec_builder(hash, k) codec = hash.empty? ? @base_codec : @base_codec.clone codec.use_mapper_auto_flush if using_mapped_auto_flush? compo = CodecValue.new(codec).tap do |o| now = Time.now o.eviction_timeout = eviction_timestamp(now) o.auto_flush_timeout = auto_flush_timestamp(now) end hash.store(k, compo) end
eviction_timestamp(now = Time.now)
click to toggle source
# File lib/logstash/codecs/identity_map_codec.rb, line 320 def eviction_timestamp(now = Time.now) now.to_i + @evict_timeout end
find_codec_value(identity)
click to toggle source
# File lib/logstash/codecs/identity_map_codec.rb, line 298 def find_codec_value(identity) identity_map[identity] end
no_streams?()
click to toggle source
# File lib/logstash/codecs/identity_map_codec.rb, line 340 def no_streams? identity_map.empty? end
record_codec_usage(identity)
click to toggle source
for nil stream this method is not called
# File lib/logstash/codecs/identity_map_codec.rb, line 303 def record_codec_usage(identity) check_map_limits # only start the cleaner if streams are in use # continuous calls to start are OK cleaner.start auto_flusher.start compo = find_codec_value(identity) now = Time.now compo.eviction_timeout = eviction_timestamp(now) compo.auto_flush_timeout = auto_flush_timestamp(now) compo.codec end
stream_codec(identity)
click to toggle source
# File lib/logstash/codecs/identity_map_codec.rb, line 293 def stream_codec(identity) return base_codec if identity.nil? record_codec_usage(identity) # returns codec end
using_mapped_auto_flush?()
click to toggle source
# File lib/logstash/codecs/identity_map_codec.rb, line 344 def using_mapped_auto_flush? !@auto_flush_interval.nil? end