class Restforce::DB::Worker

Restforce::DB::Worker represents the primary polling loop through which all record synchronization occurs.

Constants

DEFAULT_DELAY
DEFAULT_INTERVAL
GRACEFUL_SHUTDOWN_SIGNALS

TERM and INT signals should trigger a graceful shutdown.

ROTATION_SIGNALS

HUP and USR1 will reopen all files at their original paths, to accommodate log rotation.

Attributes

logger[RW]
tracker[RW]

Public Class Methods

new(options = {}) click to toggle source

Public: Initialize a new Restforce::DB::Worker.

options - A Hash of options to configure the worker’s run. Currently

supported options are:
interval - The maximum polling loop rest time.
delay    - The amount of time by which to offset queries.
config   - The path to a client configuration file.
# File lib/restforce/db/worker.rb, line 36
def initialize(options = {})
  @options = options
  @interval = @options.fetch(:interval) { DEFAULT_INTERVAL }
end

Public Instance Methods

start() click to toggle source

Public: Start the polling loop for this Worker. Synchronizes all registered record types between the database and Salesforce, looping indefinitely until processing is interrupted by a signal.

Returns nothing.

# File lib/restforce/db/worker.rb, line 46
def start
  DB.reset
  DB.configure do |config|
    config.parse(@options[:config])
    config.logger = logger
  end

  GRACEFUL_SHUTDOWN_SIGNALS.each { |signal| trap(signal) { stop } }
  ROTATION_SIGNALS.each { |signal| trap(signal) { Worker.reopen_files } }

  preload

  loop do
    runtime = Benchmark.realtime { perform }
    sleep(@interval - runtime) if runtime < @interval && !stop?

    break if stop?
  end
end
stop() click to toggle source

Public: Instruct the worker to stop running at the end of the current processing loop.

Returns nothing.

# File lib/restforce/db/worker.rb, line 70
def stop
  Thread.new { log "Exiting..." }
  @exit = true
end

Private Instance Methods

perform() click to toggle source

Internal: Perform the synchronization loop, recording the time that the run is performed so that future runs can pick up where the last run left off.

NOTE: In order to keep our long-term memory usage in check, we fork a task manager to process the tasks for each synchronization loop. Once the subprocess dies, its memory can be reclaimed by the OS.

Returns nothing.

# File lib/restforce/db/worker.rb, line 111
def perform
  reset!

  track do
    forked = ForkedProcess.new

    forked.write do |writer|
      Worker.after_fork
      task_manager.perform

      runner.dump_timestamps(writer)
    end

    forked.read do |reader|
      runner.load_timestamps(reader)
    end

    begin
      forked.run
    rescue ForkedProcess::UnsuccessfulExit => e
      # NOTE: Due to thread-safety issues in any of a number of libraries
      # included in the host application (even in ActiveSupport itself),
      # our forked processes may occasionally encounter various annoying
      # and intermittent errors.
      #
      # Retrying here is our way of handling that. It's not great, but
      # it's the best we can do for now without sacrificing the benefits
      # of forking our task manager runs.
      #
      # In the event that the master process has received a kill signal,
      # we can safely crash instead of attempting a retry -- we don't want
      # to fight with intentional user actions.
      stop? ? raise(e) : retry
    end
  end
end
preload() click to toggle source

Internal: Populate the field cache for each Salesforce object in the defined mappings.

NOTE: To work around thread-safety issues with Typheous (and possibly some other HTTP adapters, we need to fork our preloading to prevent intialization of our Client object in the context of the master Worker process.

Returns a Hash.

# File lib/restforce/db/worker.rb, line 86
def preload
  forked = ForkedProcess.new

  forked.write do |writer|
    log "INITIALIZING..."
    FieldProcessor.preload
    YAML.dump(FieldProcessor.field_cache, writer)
  end

  forked.read do |reader|
    FieldProcessor.field_cache.merge!(YAML.load(reader.read))
  end

  forked.run
end
reset!() click to toggle source

Internal: Reset the internal state of the Worker in preparation for a new synchronization loop.

Returns nothing.

# File lib/restforce/db/worker.rb, line 152
def reset!
  runner.tick!
  Worker.before_fork
end
runner() click to toggle source

Internal: Get a Runner object which can be passed to the various workflow objects to scope their record lookups.

Returns a Restforce::DB::Runner.

# File lib/restforce/db/worker.rb, line 193
def runner
  @runner ||= Runner.new(@options.fetch(:delay) { DEFAULT_DELAY })
end
stop?() click to toggle source

Internal: Has this worker been instructed to stop?

Returns a boolean.

# File lib/restforce/db/worker.rb, line 200
def stop?
  @exit == true
end
task_manager() click to toggle source

Internal: Get a new TaskManager instance, which reflects the current runner state.

Returns a Restforce::DB::TaskManager.

# File lib/restforce/db/worker.rb, line 161
def task_manager
  TaskManager.new(runner, logger: logger)
end
track() { || ... } click to toggle source

Internal: Run the passed block, updating the tracker with the time at which the run was initiated.

Yields to a passed block. Returns nothing.

# File lib/restforce/db/worker.rb, line 170
def track
  if tracker
    runtime = Time.now

    if tracker.last_run
      log "SYNCHRONIZING from #{tracker.last_run.iso8601}"
    else
      log "SYNCHRONIZING"
    end

    duration = Benchmark.realtime { yield }
    log format("DONE after %.4f", duration)

    tracker.track(runtime)
  else
    yield
  end
end