class Sidekiq::EventBus::Adapters::Buffered
Public Class Methods
new(adapter: Sidekiq::EventBus::Adapters::Default.new)
click to toggle source
# File lib/sidekiq/event_bus/adapters/buffered.rb, line 4 def initialize adapter: Sidekiq::EventBus::Adapters::Default.new @adapter = adapter @bufferes = Concurrent::Map.new end
Public Instance Methods
buffer!()
click to toggle source
# File lib/sidekiq/event_bus/adapters/buffered.rb, line 36 def buffer! @bufferes[buffer_key] ||= Array.new end
buffer_key()
click to toggle source
# File lib/sidekiq/event_bus/adapters/buffered.rb, line 50 def buffer_key Thread.current.object_id end
buffered() { || ... }
click to toggle source
# File lib/sidekiq/event_bus/adapters/buffered.rb, line 18 def buffered buffer! begin yield flush! ensure clear! end end
clear!()
click to toggle source
# File lib/sidekiq/event_bus/adapters/buffered.rb, line 46 def clear! @bufferes.delete(buffer_key) end
event_buffer()
click to toggle source
# File lib/sidekiq/event_bus/adapters/buffered.rb, line 28 def event_buffer @bufferes[buffer_key] ||= Array.new end
flush!()
click to toggle source
# File lib/sidekiq/event_bus/adapters/buffered.rb, line 40 def flush! event_buffer.map do |event, payload| @adapter.push(event, payload) end end
is_buffered?()
click to toggle source
# File lib/sidekiq/event_bus/adapters/buffered.rb, line 32 def is_buffered? @bufferes.key?(buffer_key) end
push(event, payload)
click to toggle source
# File lib/sidekiq/event_bus/adapters/buffered.rb, line 9 def push event, payload if is_buffered? event_buffer << [ event, payload ] nil else @adapter.push(event, payload) end end