class ActiveSupport::Notifications::Fanout

This is a default queue implementation that ships with Notifications. It just pushes events to all registered log subscribers.

This class is thread safe. All methods are reentrant.

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/active_support/notifications/fanout.rb, line 15
def initialize
  @string_subscribers = Hash.new { |h, k| h[k] = [] }
  @other_subscribers = []
  @listeners_for = Concurrent::Map.new
  super
end

Public Instance Methods

finish(name, id, payload, listeners = listeners_for(name)) click to toggle source
# File lib/active_support/notifications/fanout.rb, line 60
def finish(name, id, payload, listeners = listeners_for(name))
  listeners.each { |s| s.finish(name, id, payload) }
end
listeners_for(name) click to toggle source
# File lib/active_support/notifications/fanout.rb, line 68
def listeners_for(name)
  # this is correctly done double-checked locking (Concurrent::Map's lookups have volatile semantics)
  @listeners_for[name] || synchronize do
    # use synchronisation when accessing @subscribers
    @listeners_for[name] ||=
      @string_subscribers[name] + @other_subscribers.select { |s| s.subscribed_to?(name) }
  end
end
listening?(name) click to toggle source
# File lib/active_support/notifications/fanout.rb, line 77
def listening?(name)
  listeners_for(name).any?
end
publish(name, *args) click to toggle source
# File lib/active_support/notifications/fanout.rb, line 64
def publish(name, *args)
  listeners_for(name).each { |s| s.publish(name, *args) }
end
start(name, id, payload) click to toggle source
# File lib/active_support/notifications/fanout.rb, line 56
def start(name, id, payload)
  listeners_for(name).each { |s| s.start(name, id, payload) }
end
subscribe(pattern = nil, callable = nil, &block) click to toggle source
# File lib/active_support/notifications/fanout.rb, line 22
def subscribe(pattern = nil, callable = nil, &block)
  subscriber = Subscribers.new(pattern, callable || block)
  synchronize do
    if String === pattern
      @string_subscribers[pattern] << subscriber
      @listeners_for.delete(pattern)
    else
      @other_subscribers << subscriber
      @listeners_for.clear
    end
  end
  subscriber
end
unsubscribe(subscriber_or_name) click to toggle source
# File lib/active_support/notifications/fanout.rb, line 36
def unsubscribe(subscriber_or_name)
  synchronize do
    case subscriber_or_name
    when String
      @string_subscribers[subscriber_or_name].clear
      @listeners_for.delete(subscriber_or_name)
      @other_subscribers.each { |sub| sub.unsubscribe!(subscriber_or_name) }
    else
      pattern = subscriber_or_name.try(:pattern)
      if String === pattern
        @string_subscribers[pattern].delete(subscriber_or_name)
        @listeners_for.delete(pattern)
      else
        @other_subscribers.delete(subscriber_or_name)
        @listeners_for.clear
      end
    end
  end
end
wait() click to toggle source

This is a sync queue, so there is no waiting.

# File lib/active_support/notifications/fanout.rb, line 82
def wait
end