class CZTop::Poller::Aggregated

This poller provides a list of readable sockets and a list of writable sockets. It is useful when you need to process socket events in batches rather than one event per event loop iteration.

In particular, this is needed in Celluloid::ZMQ, where in a call to Celluloid::ZMQ::Reactor#run_once all readable/writable sockets need to be processed.

Implementation

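It wraps a {CZTop::Poller}. To provide arrays of readable/writable sockets, {#wait} waits for the first event, then polls again with a zero timeout until no events remain. Each reported event flag is disabled on its socket so it is not reported twice, and the original event masks are restored at the end.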

Forwarded Methods

Some methods of {CZTop::Poller} are defined on this class too, and calls to them are forwarded directly to the wrapped {CZTop::Poller} instance.

Attributes

poller[R]

@return [CZTop::Poller] the associated (regular) poller

readables[R]

@return [Array<CZTop::Socket>] readable sockets

writables[R]

@return [Array<CZTop::Socket>] writable sockets

Public Class Methods

new(poller = CZTop::Poller.new)

Initializes the aggregated poller.

@param poller [CZTop::Poller] the wrapped poller

# File lib/cztop/poller/aggregated.rb, line 59
def initialize(poller = CZTop::Poller.new)
  @readables = []
  @writables = []
  @poller = poller
end

Public Instance Methods

wait(timeout = -1)

Forgets all previous event information (which sockets are readable/writable) and waits for events anew. After getting the first event, {CZTop::Poller#wait} is called again with a zero-timeout to get all pending events to extract them into the aggregated lists of readable and writable sockets.

For every event, the corresponding event mask flag is disabled for the associated socket, so it won't turn up again. Finally, all event masks are restored to what they were before the call to this method.

@param timeout [Integer] how long to wait in ms, or 0 to avoid blocking, or -1 to wait indefinitely

@return [Boolean] whether there have been any events

# File lib/cztop/poller/aggregated.rb, line 78
def wait(timeout = -1)
  @readables = []
  @writables = []
  @event_masks = {}

  if event = @poller.wait(timeout)
    extract(event)

    # get all other pending events, if any, but no more blocking
    while event = @poller.wait(0)
      extract(event)
    end

    restore_event_masks
    return true
  end
  return false
end
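
The drain-and-restore cycle described above can be tried without ZeroMQ. The following is a minimal sketch, not the library's implementation: FakePoller, FakeEvent and drain are hypothetical stand-ins introduced only for illustration, and the stub never blocks. The flag values 1 and 2 are those of ZMQ_POLLIN and ZMQ_POLLOUT.

```ruby
# Event flags as defined by ZeroMQ (ZMQ_POLLIN = 1, ZMQ_POLLOUT = 2).
POLLIN  = 1
POLLOUT = 2

# Hypothetical stand-in for an event: a socket plus the flags that fired.
FakeEvent = Struct.new(:socket, :flags) do
  def readable?
    (flags & POLLIN) != 0
  end

  def writable?
    (flags & POLLOUT) != 0
  end
end

# Hypothetical stand-in for the wrapped poller. Every socket has a registered
# mask and a set of ready flags. #wait returns one event for the first socket
# whose ready flags intersect its mask, or nil if there is none.
class FakePoller
  def initialize(masks, ready)
    @masks = masks.dup
    @ready = ready
  end

  def event_mask_for_socket(socket)
    @masks.fetch(socket)
  end

  def modify(socket, mask)
    @masks[socket] = mask
  end

  # The timeout is ignored; this stub never blocks.
  def wait(_timeout = -1)
    @masks.each do |socket, mask|
      fired = mask & @ready.fetch(socket, 0)
      return FakeEvent.new(socket, fired) unless fired.zero?
    end
    nil
  end
end

# Collects all pending events into two lists, masking out each reported
# flag so it is not reported again, then restores the saved masks.
def drain(poller)
  readables = []
  writables = []
  saved = {}
  while (event = poller.wait(0))
    mask = poller.event_mask_for_socket(event.socket)
    saved[event.socket] = mask
    if event.readable?
      readables << event.socket
      mask &= 0xFFFF ^ POLLIN
    end
    if event.writable?
      writables << event.socket
      mask &= 0xFFFF ^ POLLOUT
    end
    poller.modify(event.socket, mask)
  end
  saved.each { |socket, original| poller.modify(socket, original) }
  [readables, writables]
end

# Socket :a is ready for both directions, :b for reading, :c for nothing.
poller = FakePoller.new({ a: POLLIN | POLLOUT, b: POLLIN, c: POLLOUT },
                        { a: POLLIN | POLLOUT, b: POLLIN })
readables, writables = drain(poller)
```

Without the masking step, the stub would keep returning the same ready socket and the loop would never end, which is why each reported flag is cleared before polling again.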

Private Instance Methods

extract(event)

Extracts the event information, adds the socket to the correct list(s), and modifies the socket's event mask so that the socket does not turn up again during subsequent calls to {CZTop::Poller#wait} within {#wait}.

@param event [CZTop::Poller::Event]

@return [void]

# File lib/cztop/poller/aggregated.rb, line 105
def extract(event)
  event_mask = poller.event_mask_for_socket(event.socket)
  @event_masks[event.socket] = event_mask
  if event.readable?
    @readables << event.socket
    event_mask &= 0xFFFF ^ CZTop::Poller::ZMQ::POLLIN
  end
  if event.writable?
    @writables << event.socket
    event_mask &= 0xFFFF ^ CZTop::Poller::ZMQ::POLLOUT
  end
  poller.modify(event.socket, event_mask)
end
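
The expression `0xFFFF ^ flag` used above is a 16-bit complement of the flag, so AND-ing it with the mask clears that one flag and leaves the others untouched. A small sketch of the arithmetic follows; MaskDemo and its clear method are hypothetical names used only here, and the flag values are those of ZMQ_POLLIN and ZMQ_POLLOUT.

```ruby
module MaskDemo
  POLLIN  = 1 # value of ZMQ_POLLIN
  POLLOUT = 2 # value of ZMQ_POLLOUT

  # Clears one flag from a 16-bit event mask, leaving other flags as they are.
  def self.clear(mask, flag)
    mask & (0xFFFF ^ flag)
  end
end

both = MaskDemo::POLLIN | MaskDemo::POLLOUT
only_out = MaskDemo.clear(both, MaskDemo::POLLIN) # POLLOUT remains set
none = MaskDemo.clear(only_out, MaskDemo::POLLOUT) # no flags remain
```

Clearing a flag that is already unset leaves the mask unchanged.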

restore_event_masks()

Restores the event masks of all sockets that were modified during {#wait} to the state they were in before the call.

@return [void]

# File lib/cztop/poller/aggregated.rb, line 122
def restore_event_masks
  @event_masks.each { |socket, mask| poller.modify(socket, mask) }
end