class Shamu::Events::InMemory::Service
Provides an in-memory {EventsService} that dispatches {Message messages} to subscribers within the same process.
Messages are volitale and will be lost if the process crashes.
Attributes
channels[R]
mutex[R]
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/shamu/events/in_memory/service.rb, line 15 def initialize @mutex = Thread::Mutex.new @channels = {} super end
Public Instance Methods
channel_stats( name )
click to toggle source
(see ChannelStats#chanel_stats)
# File lib/shamu/events/in_memory/service.rb, line 50 def channel_stats( name ) channel = fetch_channel( name ) { name: name, subscribers_count: channel[ :subscribers ].count, queue_size: channel[ :queue ].size, dispatching: channel[ :dispatching ] } end
dispatch( *names )
click to toggle source
Dispatch all pending mssages in the given named channels. @param [Array<String>] names of the channels to dispatch. Dispatches
to all queues if empty.
@return [void]
# File lib/shamu/events/in_memory/service.rb, line 41 def dispatch( *names ) names = channels.keys if names.empty? names.each do |name| dispatch_channel( fetch_channel( name ) ) end end
publish( channel, message )
click to toggle source
(see EventsService#publish
)
# File lib/shamu/events/in_memory/service.rb, line 23 def publish( channel, message ) state = fetch_channel( channel ) queue = state[ :queue ] queue.push serialize( message ) end
subscribe( channel, &callback )
click to toggle source
(see EventsService#subscribe
)
# File lib/shamu/events/in_memory/service.rb, line 30 def subscribe( channel, &callback ) subscribers = fetch_channel( channel )[ :subscribers ] mutex.synchronize do subscribers << callback end end
Private Instance Methods
create_channel( _ )
click to toggle source
# File lib/shamu/events/in_memory/service.rb, line 66 def create_channel( _ ) { queue: [], subscribers: [], } end
dispatch_channel( state )
click to toggle source
# File lib/shamu/events/in_memory/service.rb, line 73 def dispatch_channel( state ) mutex.synchronize do return if state[:dispatching] state[ :dispatching ] = true end dispatch_messages( state ) ensure mutex.synchronize do state[ :dispatching ] = false end end
dispatch_messages( state )
click to toggle source
# File lib/shamu/events/in_memory/service.rb, line 86 def dispatch_messages( state ) while raw_message = state[:queue].shift message = deserialize( raw_message ) state[ :subscribers ].each do |subscriber| subscriber.call( message ) end end end