class Kraftwerk::Telemetry

Based on

Constants

HandlerIdAlreadyUsed

Public Class Methods

new()
# File lib/kraftwerk/telemetry.rb, line 10
# Builds an empty telemetry registry.
#
# State created here:
# * @used_ids  - map from handler id to the event key it is attached to
# * @handlers  - map from event key to the list of attached handler entries
# * @semaphore - single-permit semaphore taken while the id cache is rebuilt
def initialize
  @used_ids = Concurrent::Map.new
  @handlers = Concurrent::Map.new
  @semaphore = Concurrent::Semaphore.new(1)
end

Public Instance Methods

attach(id, event_key, &handler)
# File lib/kraftwerk/telemetry.rb, line 16
# Registers +handler+ under +event_key+, identified by +id+.
#
# Raises HandlerIdAlreadyUsed when +id+ is already present in the id cache.
# Returns the result of sync_used_ids_cache.
def attach(id, event_key, &handler)
  raise HandlerIdAlreadyUsed, id if @used_ids.key?(id)

  # compute_if_absent creates the per-event list atomically. `||=` on a
  # Concurrent::Map is a separate read followed by a write, so two threads
  # attaching the first handlers for one event could each install their own
  # list and one registration would be lost.
  handlers = @handlers.compute_if_absent(event_key) { Concurrent::Array.new }
  handlers << { id: id, handler: handler }
  sync_used_ids_cache
end
detach(id)
# File lib/kraftwerk/telemetry.rb, line 24
# Removes the handler registered under +id+.
#
# The event key is looked up in the id cache, every entry with a matching id
# is deleted from that event's handler list, and the id cache is rebuilt.
# Returns the result of sync_used_ids_cache, or nil when nothing is
# registered for +id+.
def detach(id)
  event_key = @used_ids[id]
  handlers = @handlers[event_key]
  # An unknown id resolves to no handler list; treat that as a no-op instead
  # of calling delete_if on nil and raising NoMethodError.
  return if handlers.nil?

  handlers.delete_if { |handler| handler[:id] == id }
  sync_used_ids_cache
end
execute(event_key, values = {}, meta = {})
# File lib/kraftwerk/telemetry.rb, line 30
# Calls every handler attached to +event_key+.
#
# Each handler receives +values+ and a metadata hash built fresh for that
# call: it starts with +_event_key+ set to +event_key+ and is then merged with
# +meta+, so a key supplied in +meta+ takes precedence on collision.
# Returns nil when no handler list exists for +event_key+; otherwise the
# result of iterating that list.
def execute(event_key, values = {}, meta = {})
  registered = @handlers[event_key]
  unless registered.nil?
    registered.each do |entry|
      callback = entry[:handler]
      callback.call(values, { _event_key: event_key }.merge(meta))
    end
  end
end

Private Instance Methods

sync_used_ids_cache()
# File lib/kraftwerk/telemetry.rb, line 41
# Rebuilds the id cache from the current contents of @handlers.
#
# A fresh map from handler id to event key is filled in and then assigned to
# @used_ids in a single step, while holding @semaphore. Returns the new map.
def sync_used_ids_cache
  # Acquire before entering the protected region: if acquiring itself fails,
  # the ensure clause must not release a permit that was never taken.
  @semaphore.acquire
  begin
    ids = Concurrent::Map.new
    @handlers.each do |key, handlers|
      handlers.each { |handler| ids[handler[:id]] = key }
    end
    @used_ids = ids
  ensure
    @semaphore.release
  end
end