class Shamu::Events::InMemory::Service

Provides an in-memory {EventsService} that dispatches {Message messages} to subscribers within the same process.

Messages are volitale and will be lost if the process crashes.

Attributes

channels[R]
mutex[R]

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/shamu/events/in_memory/service.rb, line 15
def initialize
  @mutex    = Thread::Mutex.new
  @channels = {}

  super
end

Public Instance Methods

channel_stats( name ) click to toggle source

(see ChannelStats#chanel_stats)

# File lib/shamu/events/in_memory/service.rb, line 50
def channel_stats( name )
  channel = fetch_channel( name )

  {
    name: name,
    subscribers_count: channel[ :subscribers ].count,
    queue_size: channel[ :queue ].size,
    dispatching: channel[ :dispatching ]
  }
end
dispatch( *names ) click to toggle source

Dispatch all pending mssages in the given named channels. @param [Array<String>] names of the channels to dispatch. Dispatches

to all queues if empty.

@return [void]

# File lib/shamu/events/in_memory/service.rb, line 41
def dispatch( *names )
  names = channels.keys if names.empty?

  names.each do |name|
    dispatch_channel( fetch_channel( name ) )
  end
end
publish( channel, message ) click to toggle source

(see EventsService#publish)

# File lib/shamu/events/in_memory/service.rb, line 23
def publish( channel, message )
  state = fetch_channel( channel )
  queue = state[ :queue ]
  queue.push serialize( message )
end
subscribe( channel, &callback ) click to toggle source

(see EventsService#subscribe)

# File lib/shamu/events/in_memory/service.rb, line 30
def subscribe( channel, &callback )
  subscribers = fetch_channel( channel )[ :subscribers ]
  mutex.synchronize do
    subscribers << callback
  end
end

Private Instance Methods

create_channel( _ ) click to toggle source
# File lib/shamu/events/in_memory/service.rb, line 66
def create_channel( _ )
  {
    queue: [],
    subscribers: [],
  }
end
dispatch_channel( state ) click to toggle source
# File lib/shamu/events/in_memory/service.rb, line 73
def dispatch_channel( state )
  mutex.synchronize do
    return if state[:dispatching]
    state[ :dispatching ] = true
  end

  dispatch_messages( state )
ensure
  mutex.synchronize do
    state[ :dispatching ] = false
  end
end
dispatch_messages( state ) click to toggle source
# File lib/shamu/events/in_memory/service.rb, line 86
def dispatch_messages( state )
  while raw_message = state[:queue].shift
    message = deserialize( raw_message )
    state[ :subscribers ].each do |subscriber|
      subscriber.call( message )
    end
  end
end