class Hoodoo::Communicators::Pool

Maintains a pool of object instances which are expected to be communicating with “the outside world” in some way. A message sent to the pool is replicated to all the communicators in that pool. Some communicators are fast, which means they are called synchronously and expected to return very quickly. Some communicators are slow, which means they are called asynchronously through a work queue.

See add for more information.

Constants

MAX_SLOW_QUEUE_SIZE

Hoodoo::Communicators::Slow subclass communicators are called in their own Threads via a processing Queue. There is the potential for a flood of communications to cause the queue to back up considerably, so a maximum number of messages is defined. If the queue size is _equal to or greater_ than this amount when a message arrives, it will be dropped and a 'dropped message' count incremented.

THREAD_EXIT_TIMEOUT

When asking slow communicator threads to exit, a timeout must be used in case the thread doesn't seem to be responsive. This is the timeout value in seconds - it can take a floating point or integer value.

THREAD_WAIT_TIMEOUT

Analogous to THREAD_WAIT_TIMEOUT but used when waiting for a processing Thread to drain its Queue, without asking it to exit.

Attributes

group[RW]

Retrieve the ThreadGroup instance managing the collection of slow communicator threads. This is mostly used for testing purposes and has little general purpose utility.

Public Class Methods

new() click to toggle source

Create a new pool of communicators - instances of subclasses of Hoodoo::Communicators::Fast or Hoodoo::Communicators::Slow, are added with add and called with communicate.

# File lib/hoodoo/communicators/pool.rb, line 59
def initialize
  @pool  = {}
  @group = ::ThreadGroup.new
end

Public Instance Methods

add( communicator ) click to toggle source

Add a communicator instance to the pool. Future calls to communicate will call the same-named method in that instance.

Subclasses of Hoodoo::Communicators::Slow are called within a processing Thread. Subclasses of Hoodoo::Communicators::Fast are called inline. The instances are called in the order of addition, but since each slow communicator runs in its own Thread, the execution order is indeterminate for such instances.

If a slow communicator's inbound message queue length matches or exceeds MAX_SLOW_QUEUE_SIZE, messages for that specific communicator will start being dropped until the communicator clears the backlog and at last one space opens on the queue. Slow communicators can detect when this has happened by implementing Hoodoo::Communicators::Slow#dropped in the subclass.

If you pass the same instance more than once, the subsequent calls are ignored. You can add many instances of the same class if that's useful for any reason.

Returns the passed-in communicator instance parameter, for convenience.

communicator

Instance is to be added to the pool. Must be either a Hoodoo::Communicators::Fast or Hoodoo::Communicators::Slow subclass instance.

# File lib/hoodoo/communicators/pool.rb, line 90
def add( communicator )
  unless ( communicator.class < Hoodoo::Communicators::Fast ||
           communicator.class < Hoodoo::Communicators::Slow )
    raise "Hoodoo::Communicators::Pool\#add must be called with an instance of a subclass of Hoodoo::Communicators::Fast or Hoodoo::Communicators::Slow only"
  end

  return if @pool.has_key?( communicator )

  if communicator.is_a?( Hoodoo::Communicators::Fast )
    add_fast_communicator( communicator )
  else
    add_slow_communicator( communicator )
  end

  return communicator
end
communicate( object ) click to toggle source

Call the communicate method on each communicator instance added via add. Each instance is called in the same order as corresponding calls are made to the pool. Across instances, fast communicators are called in the order they were added to the pool, but since each slow communicator runs in its own Thread, execution order is indeterminate.

object

Parameter passed to the communicator subclass instance communicate methods.

# File lib/hoodoo/communicators/pool.rb, line 149
def communicate( object )
  @pool.each do | communicator, item |

    if item.has_key?( :fast )
      begin
        communicator.communicate( object )
      rescue => exception
        handle_exception( exception, communicator, object )
      end

    else
      data       = item[ :slow       ]
      thread     = data[ :thread     ]
      work_queue = data[ :work_queue ]

      # This is inaccurate if one or more "dropped messages" reports are
      # on the queue, but since some communicators might report them in
      # the same way as other messages, it's not necessarily incorrect
      # either.
      #
      if work_queue.size < MAX_SLOW_QUEUE_SIZE
        dropped = thread[ :dropped_messages ]

        if dropped > 0
          thread[ :dropped_messages ] = 0

          # Opposite of comment above on MAX_SLOW_QUEUE_SIZE check...
          # Yes, this takes up a queue entry and the payload addition
          # afterwards might take it one above max size, but that's OK
          # since this is just a "dropped messages" report and though
          # some communicators might deal with them slowly, others may
          # just ignore them.
          #
          work_queue << QueueEntry.new( dropped: dropped )
        end

        work_queue << QueueEntry.new( payload: object )

      else
        thread[ :dropped_messages ] += 1

      end
    end

  end
end
remove( communicator ) click to toggle source

Remove a communicator previously added by add. See that for details.

It is harmless to try and remove communicator instances more than once or to try to remove something that was never added in the first place; the call simply has no side effects.

If removing a slow communicator, its thread will be terminated with default timeout value of THREAD_EXIT_TIMEOUT seconds. For this reason, removing a slow communicator may take a long time.

Returns the passed-in communicator instance parameter, for convenience.

communicator

Instance is to be removed from the pool. Must be either a Hoodoo::Communicators::Fast or Hoodoo::Communicators::Slow subclass instance.

# File lib/hoodoo/communicators/pool.rb, line 123
def remove( communicator )
  unless ( communicator.class < Hoodoo::Communicators::Fast ||
           communicator.class < Hoodoo::Communicators::Slow )
    raise "Hoodoo::Communicators::Pool\#remove must be called with an instance of a subclass of Hoodoo::Communicators::Fast or Hoodoo::Communicators::Slow only"
  end

  return unless @pool.has_key?( communicator )

  if communicator.is_a?( Hoodoo::Communicators::Fast )
    remove_fast_communicator( communicator )
  else
    remove_slow_communicator( communicator )
  end

  return communicator
end
terminate( per_instance_timeout: THREAD_EXIT_TIMEOUT ) click to toggle source

The communication pool is “emptied” by this call, going back to a clean state as if just initialised. New workers can be added via add and then called via communicate if you so wish.

Hoodoo::Communciators::Fast subclass instances are removed immediately without complications.

Hoodoo::Communicators::Slow subclass instances in the communication pool are called via a worker Thread; this method shuts down all such worker Threads, clearing their work queues and asking each one to exit (politely). There is no mechanism (other than overall Ruby process exit) available to shut down the Threads by force, so some Threads may not respond and time out.

When this method exits, all workers will have either exited or timed out and possibly still be running, but are considered too slow or dead. No further communications are made to them.

The following named parameters are supported:

per_instance_timeout

Timeout for each slow communicator Thread in seconds. Optional. Default is the value in THREAD_EXIT_TIMEOUT. For example, with three slow communicators in the pool and all three reached a 5 second timeout, the termination method would not return for 15 seconds (3 * 5 seconds full timeout).

# File lib/hoodoo/communicators/pool.rb, line 277
def terminate( per_instance_timeout: THREAD_EXIT_TIMEOUT )
  loop do
    klass, item = @pool.shift() # Hash#shift -> remove a key/value pair.
    break if klass.nil?

    next unless item.has_key?( :slow )
    data = item[ :slow ]

    request_termination_for(
      thread:     data[ :thread     ],
      work_queue: data[ :work_queue ],
      timeout:    per_instance_timeout
    )
  end
end
wait( per_instance_timeout: THREAD_WAIT_TIMEOUT, communicator: nil ) click to toggle source

This method is only useful if there are any Hoodoo::Communicators::Slow subclass instances in the communication pool. Each instance is called via a worker Thread; this method waits for each communicator to drain its queue before returning. This is useful if you have a requirement to wait for all communications to finish on all threads, presumably for wider synchronisation reasons.

Since fast communicators are called synchronously there is never any need to wait for them, so this call ignores such pool entries.

The following named parameters are supported:

per_instance_timeout

Timeout for each slow communicator Thread in seconds. Optional. Default is the value in THREAD_WAIT_TIMEOUT.

communicator

If you want to wait for specific instance only (see add), pass it here. If the instance is a fast communicator, or any object not added to the pool, then there is no error raised. The method simply returns immediately.

# File lib/hoodoo/communicators/pool.rb, line 218
def wait( per_instance_timeout: THREAD_WAIT_TIMEOUT,
          communicator:         nil )

  if communicator.nil?
    @pool.each do | communicator, item |
      next unless item.has_key?( :slow )
      data = item[ :slow ]

      wait_for(
        work_queue: data[ :work_queue ],
        sync_queue: data[ :sync_queue ],
        timeout:    per_instance_timeout
      )
    end

  else
    return unless @pool.has_key?( communicator )
    item = @pool[ communicator ]

    return unless item.has_key?( :slow )
    data = item[ :slow ]

    wait_for(
      work_queue: data[ :work_queue ],
      sync_queue: data[ :sync_queue ],
      timeout:    per_instance_timeout
    )

  end
end

Private Instance Methods

add_fast_communicator( communicator ) click to toggle source

Add a fast communicator to the pool. Requires no thread or queue.

Trusted internal interface - pass the correct subclass and don't pass it more than once unless terminate has cleared the pool beforehand.

communicator

The Hoodoo::Communicators::Fast subclass instance to add to the pool.

# File lib/hoodoo/communicators/pool.rb, line 303
def add_fast_communicator( communicator )
  @pool[ communicator ] = { :fast => true }
end
add_slow_communicator( communicator ) click to toggle source

Add a slow communicator to the pool. Requires a thread and queue.

Trusted internal interface - pass the correct subclass and don't pass it more than once unless terminate has cleared the pool beforehand.

communicator

The Hoodoo::Communicators::Slow subclass instance to add to the pool.

# File lib/hoodoo/communicators/pool.rb, line 324
def add_slow_communicator( communicator )

  work_queue = ::Queue.new
  sync_queue = QueueWithTimeout.new

  # Start (and keep a reference to) a thread that just loops around
  # processing queue messages until asked to exit.

  thread = ::Thread.new do

    # Outer infinite loop restarts queue processing if exceptions occur.
    #
    loop do

      obj = nil
      # Exception handler block.
      #
      begin

        # Inner infinite loop processes queue objects until asked to exit
        # via a +nil+ queue entry.
        #
        loop do
          entry = work_queue.shift() # ".shift" => FIFO, ".pop" would be LIFO
          obj = nil

          if entry.terminate?
            ::Thread.exit
          elsif entry.sync?

            sync_queue << :sync
          elsif entry.dropped?
            communicator.dropped( entry.dropped )
          else
            obj = entry.payload
            communicator.communicate( entry.payload )
          end
        end

      rescue => exception
        handle_exception( exception, communicator, obj )

      end
    end
  end

  thread[ :dropped_messages ] = 0

  @group.add( thread )
  @pool[ communicator ] = {
    :slow => {
      :thread     => thread,
      :work_queue => work_queue,
      :sync_queue => sync_queue,
    }
  }
end
handle_exception( exception, communicator, obj ) click to toggle source

Intended for cases where a communicator raised an exception - print details to $stderr. This is all we can do; the logging engine runs through the communications pool so attempting to log an exception might cause an exception that we then attempt to log - and so-on.

exception

Exception (or Exception subclass) instance to print.

communicator

Communicator instance that raised the exception.

obj

Parameter passed to the communicator subclass instance communicate methods

# File lib/hoodoo/communicators/pool.rb, line 465
def handle_exception( exception, communicator, obj )
  begin
    report = "Communicator class #{ communicator.class.name } raised exception '#{ exception }': #{ exception.backtrace }"
    $stderr.puts( report )
    pp( obj, $stderr )
  rescue
    # If the above fails then everything else is probably about to
    # collapse, but optimistically try to ignore the error and keep
    # the wider processing code alive.

  end
end
remove_fast_communicator( communicator ) click to toggle source

Remove a fast communicator from the pool. See add_fast_communicator.

communicator

The Hoodoo::Communicators::Fast subclass instance to remove from the pool.

# File lib/hoodoo/communicators/pool.rb, line 312
def remove_fast_communicator( communicator )
  @pool.delete( communicator )
end
remove_slow_communicator( communicator ) click to toggle source

Remove a slow communicator from the pool. See add_slow_communicator.

May take a while to return, as it must request thread shutdown via request_termination_for (and uses the default timeout for that).

communicator

The Hoodoo::Communicators::Slow subclass instance to remove from the pool.

# File lib/hoodoo/communicators/pool.rb, line 390
def remove_slow_communicator( communicator )
  item = @pool[ communicator ]
  data = item[ :slow ]

  request_termination_for(
    thread:     data[ :thread     ],
    work_queue: data[ :work_queue ]
  )

  @pool.delete( communicator )
end
request_termination_for( thread:, work_queue:, timeout: THREAD_EXIT_TIMEOUT ) click to toggle source

Ask a slow communicator Thread to exit. Existing work on any Queues is cleared first, so only the current in-process message for a given communicator has to finish prior to exit.

Named parameters are:

:thread

Mandatory. Worker Thread for the communicator.

:work_queue

Mandatory. Queue used to send work to the Thread.

timeout

Optional timeout in seconds - default is THREAD_EXIT_TIMEOUT.

The method returns if the timeout threshold is exceeded, without raising any exceptions.

# File lib/hoodoo/communicators/pool.rb, line 416
def request_termination_for( thread:, work_queue:, timeout: THREAD_EXIT_TIMEOUT )
  work_queue.clear()
  work_queue << QueueEntry.new( terminate: true )

  thread.join( timeout )
end
wait_for( work_queue:, sync_queue:, timeout: THREAD_WAIT_TIMEOUT ) click to toggle source

Wait for a slow communicator Thread to empty its work Queue. Named parameters are:

:work_queue

Mandatory. Queue used to send work to the Thread.

:sync_queue

Mandatory. Queue used by that Thread to send back a sync notification to the pool.

timeout

Optional timeout in seconds - default is THREAD_WAIT_TIMEOUT.

The method returns if the timeout threshold is exceeded, without raising any exceptions.

# File lib/hoodoo/communicators/pool.rb, line 435
def wait_for( work_queue:, sync_queue:, timeout: THREAD_WAIT_TIMEOUT )

  # Push a 'sync' entry onto the work Queue. Once the worker Thread gets
  # through other Queue items and reaches this entry, it'll respond
  # by pushing an item onto its sync Queue.

  work_queue << QueueEntry.new( sync: true )

  # Wait on the sync Queue for the worker Thread to send the requested
  # message indicating that we're in sync.

  begin
    sync_queue.shift( timeout )

  rescue ThreadError
    # Do nothing

  end
end