class Sidekiq::EventBus::Adapters::Buffered

Public Class Methods

new(adapter: Sidekiq::EventBus::Adapters::Default.new) click to toggle source
# File lib/sidekiq/event_bus/adapters/buffered.rb, line 4
# Builds a buffering wrapper around another adapter.
#
# @param adapter the adapter that events are forwarded to; defaults to a new
#   Sidekiq::EventBus::Adapters::Default instance
def initialize(adapter: Sidekiq::EventBus::Adapters::Default.new)
  # One entry per buffer key (see #buffer_key); starts out empty.
  @bufferes = Concurrent::Map.new
  @adapter = adapter
end

Public Instance Methods

buffer!() click to toggle source
# File lib/sidekiq/event_bus/adapters/buffered.rb, line 36
# Starts buffering for the current buffer key by making sure a buffer exists.
# An existing buffer is kept as it is.
#
# @return [Array] the buffer stored under the current key
def buffer!
  key = buffer_key
  @bufferes[key] ||= []
end
buffer_key() click to toggle source
# File lib/sidekiq/event_bus/adapters/buffered.rb, line 50
# Key under which the calling thread's buffer is stored.
#
# @return [Integer] the object id of the current thread
def buffer_key
  current_thread = Thread.current
  current_thread.object_id
end
buffered() { || ... } click to toggle source
# File lib/sidekiq/event_bus/adapters/buffered.rb, line 18
# Runs the given block with event buffering enabled for the current thread.
#
# Events pushed inside the block are collected and forwarded to the wrapped
# adapter only once the block has finished without raising. If the block
# raises, the collected events are discarded and the exception propagates.
#
# The method is re-entrant: when the current thread is already buffering, the
# block is simply run inside the existing buffer. Flushing and cleanup are
# left to whoever started the buffering, so a nested call can neither send the
# outer events early nor switch buffering off for the rest of the outer block.
#
# @yield the work whose events should be buffered
# @return [Array] the results of #flush! for the call that started buffering;
#   an empty array for a nested call, which flushes nothing itself
def buffered
  if is_buffered?
    yield
    return []
  end

  buffer!
  begin
    yield
    flush!
  ensure
    # Always drop the buffer, so a failed block leaves no events behind and
    # the thread goes back to pushing directly.
    clear!
  end
end
clear!() click to toggle source
# File lib/sidekiq/event_bus/adapters/buffered.rb, line 46
# Removes the buffer stored under the current buffer key, which also ends
# buffering for that key (see #is_buffered?).
#
# @return the result of deleting the key from the buffer map
def clear!
  key = buffer_key
  @bufferes.delete(key)
end
event_buffer() click to toggle source
# File lib/sidekiq/event_bus/adapters/buffered.rb, line 28
# Returns the buffer stored under the current buffer key, creating an empty
# one first when none exists yet.
#
# @return [Array] the buffer of [event, payload] pairs
def event_buffer
  key = buffer_key
  @bufferes[key] ||= []
end
flush!() click to toggle source
# File lib/sidekiq/event_bus/adapters/buffered.rb, line 40
# Pushes every event buffered under the current buffer key to the wrapped
# adapter, in the order the events were buffered.
#
# The buffer is only read, never created: creating one here would make
# #is_buffered? true, so a flush outside a buffered section would silently
# switch the thread into buffering mode with nothing left to flush it.
# The buffer is not emptied here; removing it is done by #clear!.
#
# @return [Array] the results of the adapter's +push+ calls; empty when
#   nothing is buffered
def flush!
  pending = @bufferes[buffer_key] || []
  pending.map do |event, payload|
    @adapter.push(event, payload)
  end
end
is_buffered?() click to toggle source
# File lib/sidekiq/event_bus/adapters/buffered.rb, line 32
# Tells whether a buffer currently exists for the current buffer key.
#
# @return [Boolean] true when the buffer map has an entry for #buffer_key
def is_buffered?
  key = buffer_key
  @bufferes.key?(key)
end
push(event, payload) click to toggle source
# File lib/sidekiq/event_bus/adapters/buffered.rb, line 9
# Pushes an event, honouring the buffering state of the current buffer key.
#
# While buffering is active the event is appended to the buffer as an
# [event, payload] pair and nothing is sent; otherwise it is handed straight
# to the wrapped adapter.
#
# @param event the event to push
# @param payload the payload that goes with the event
# @return [nil] when the event was buffered; otherwise the result of the
#   adapter's +push+
def push(event, payload)
  return @adapter.push(event, payload) unless is_buffered?

  event_buffer << [event, payload]
  nil
end