class ActiveSupport::Notifications::Fanout

This is a default queue implementation that ships with Notifications. It just pushes events to all registered log subscribers.

This class is thread safe. All methods are reentrant.

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/active_support/notifications/fanout.rb, line 27
def initialize
  @string_subscribers = Hash.new { |h, k| h[k] = [] }
  @other_subscribers = []
  @listeners_for = Concurrent::Map.new
  super
end

Public Instance Methods

finish(name, id, payload, listeners = listeners_for(name)) click to toggle source
# File lib/active_support/notifications/fanout.rb, line 75
def finish(name, id, payload, listeners = listeners_for(name))
  iterate_guarding_exceptions(listeners) { |s| s.finish(name, id, payload) }
end
iterate_guarding_exceptions(listeners) { |s| ... } click to toggle source
# File lib/active_support/notifications/fanout.rb, line 87
def iterate_guarding_exceptions(listeners)
  exceptions = nil

  listeners.each do |s|
    yield s
  rescue Exception => e
    exceptions ||= []
    exceptions << e
  end

  if exceptions
    if exceptions.size == 1
      raise exceptions.first
    else
      raise InstrumentationSubscriberError.new(exceptions), cause: exceptions.first
    end
  end

  listeners
end
listeners_for(name) click to toggle source
# File lib/active_support/notifications/fanout.rb, line 108
def listeners_for(name)
  # this is correctly done double-checked locking (Concurrent::Map's lookups have volatile semantics)
  @listeners_for[name] || synchronize do
    # use synchronisation when accessing @subscribers
    @listeners_for[name] ||=
      @string_subscribers[name] + @other_subscribers.select { |s| s.subscribed_to?(name) }
  end
end
listening?(name) click to toggle source
# File lib/active_support/notifications/fanout.rb, line 117
def listening?(name)
  listeners_for(name).any?
end
publish(name, *args) click to toggle source
# File lib/active_support/notifications/fanout.rb, line 79
def publish(name, *args)
  iterate_guarding_exceptions(listeners_for(name)) { |s| s.publish(name, *args) }
end
publish_event(event) click to toggle source
# File lib/active_support/notifications/fanout.rb, line 83
def publish_event(event)
  iterate_guarding_exceptions(listeners_for(event.name)) { |s| s.publish_event(event) }
end
start(name, id, payload) click to toggle source
# File lib/active_support/notifications/fanout.rb, line 71
def start(name, id, payload)
  iterate_guarding_exceptions(listeners_for(name)) { |s| s.start(name, id, payload) }
end
subscribe(pattern = nil, callable = nil, monotonic: false, &block) click to toggle source
# File lib/active_support/notifications/fanout.rb, line 34
def subscribe(pattern = nil, callable = nil, monotonic: false, &block)
  subscriber = Subscribers.new(pattern, callable || block, monotonic)
  synchronize do
    case pattern
    when String
      @string_subscribers[pattern] << subscriber
      @listeners_for.delete(pattern)
    when NilClass, Regexp
      @other_subscribers << subscriber
      @listeners_for.clear
    else
      raise ArgumentError,  "pattern must be specified as a String, Regexp or empty"
    end
  end
  subscriber
end
unsubscribe(subscriber_or_name) click to toggle source
# File lib/active_support/notifications/fanout.rb, line 51
def unsubscribe(subscriber_or_name)
  synchronize do
    case subscriber_or_name
    when String
      @string_subscribers[subscriber_or_name].clear
      @listeners_for.delete(subscriber_or_name)
      @other_subscribers.each { |sub| sub.unsubscribe!(subscriber_or_name) }
    else
      pattern = subscriber_or_name.try(:pattern)
      if String === pattern
        @string_subscribers[pattern].delete(subscriber_or_name)
        @listeners_for.delete(pattern)
      else
        @other_subscribers.delete(subscriber_or_name)
        @listeners_for.clear
      end
    end
  end
end
wait() click to toggle source

This is a sync queue, so there is no waiting.

# File lib/active_support/notifications/fanout.rb, line 122
def wait
end