class FFWD::Core

Public Class Methods

new(plugins, opts={}) click to toggle source
# File lib/ffwd/core.rb, line 40
def initialize plugins, opts={}
  @tunnel_plugins = plugins[:tunnel] || []
  @input_plugins = plugins[:input] || []
  @output_plugins = plugins[:output] || []

  @statistics_opts = opts[:statistics]
  @debug_opts = opts[:debug]
  @core_opts = opts[:core] || {}
  @processors = FFWD::Processor.load_processors(opts[:processor] || {})

  @output_channel = FFWD::PluginChannel.build 'core.output'
  @input_channel = FFWD::PluginChannel.build 'core.input'

  @system_channel = Channel.new log, "system_channel"

  memory_config = (@core_opts[:memory] || {})
  @memory_limit = (memory_config[:limit] || 1000).to_f.round(3)
  @memory_limit95 = @memory_limit * 0.95

  if @memory_limit < 0
    raise "memory limit must be non-negative number"
  end

  @emitter = Core::Emitter.build @output_channel, @core_opts
  @processor = Core::Processor.build @input_channel, @emitter, @processors

  @debug = nil

  if @debug_opts
    @debug = FFWD::Debug.setup @debug_opts
    @debug.monitor @input_channel, FFWD::Debug::Input
    @debug.monitor @output_channel, FFWD::Debug::Output
    @debug.depend_on self
  end

  # Configuration for statistics module.
  @statistics = nil

  if config = @statistics_opts
    @statistics = FFWD::Statistics::Collector.build(
      @emitter, @system_channel, config)
    @statistics.depend_on self
  end

  @interface = Core::Interface.new(
    @input_channel, @output_channel,
    @tunnel_plugins, @statistics, @debug, @processors, @core_opts
  )

  @interface.depend_on self

  @input_instances = @input_plugins.map do |factory|
    factory.call @interface
  end

  @output_instances = @output_plugins.map do |factory|
    factory.call @interface
  end

  unless @statistics.nil?
    reporters = [@input_channel, @output_channel, @processor]
    reporters += @input_instances.select{|i| FFWD.is_reporter?(i)}
    reporters += @output_instances.select{|i| FFWD.is_reporter?(i)}
    @statistics.register self, "core", Core::Reporter.new(reporters)
  end

  # Make the core-related channels depend on core.
  # They will then be orchestrated with core when it's being
  # started/stopped.
  @input_channel.depend_on self
  @output_channel.depend_on self
end

Public Instance Methods

run() click to toggle source

Main entry point.

Since all components are governed by the lifecycle of core, it should mostly be a matter of calling ‘start’.

# File lib/ffwd/core.rb, line 117
def run
  # What to do when we receive a shutdown signal?
  shutdown_handler = proc do
    # Hack to get out of trap context and into EM land.
    EM.add_timer(0) do
      log.info "Shutting down"
      stop
      EM.stop
    end
  end

  EM.run do
    Signal.trap("INT", &shutdown_handler)
    Signal.trap("TERM", &shutdown_handler)

    start
    setup_memory_monitor
  end

  stopping do
  end
end

Private Instance Methods

setup_memory_monitor() click to toggle source

Sets up a memory monitor based of :core -> :memory -> :limit. Will warn at least once before shutting down.

# File lib/ffwd/core.rb, line 144
def setup_memory_monitor
  if @memory_limit == 0
    log.warning "WARNING!!! YOU ARE RUNNING FFWD WITHOUT A MEMORY LIMIT, THIS COULD DAMAGE YOUR SYSTEM"
    log.warning "To configure it, set the (:core -> :memory -> :limit) option to a non-zero number!"
    return
  end

  log.info "Memory limited to #{@memory_limit} MB (:core -> :memory -> :limit)"

  memory_one_warning = false

  @system_channel.subscribe do |system|
    memory = system[:memory]

    mb = (memory[:resident].to_f / 1000000).round(3)

    if memory_one_warning and mb > @memory_limit
      log.error "Memory limit exceeded (#{mb}/#{@memory_limit} MB): SHUTTING DOWN"
      EM.stop
      next
    end

    if mb > @memory_limit95
      log.warning "Memory limit almost reached (#{mb}/#{@memory_limit} MB)"
      memory_one_warning = true
    else
      memory_one_warning = false
    end
  end
end