class FFWD::Core
Public Class Methods
new(plugins, opts={})
click to toggle source
# File lib/ffwd/core.rb, line 40
#
# Builds the core and wires together channels, processors, the emitter and
# the optional debug and statistics components, then instantiates the
# configured input and output plugins.
#
# plugins - Hash read for the keys :tunnel, :input and :output. A missing
#           (or nil) entry falls back to an empty list. Every :input and
#           :output entry is invoked with #call and receives the interface.
# opts    - Hash read for the keys :statistics, :debug, :core and
#           :processor (default: {}).
#
# Raises RuntimeError when the configured memory limit is negative.
def initialize(plugins, opts = {})
  @tunnel_plugins = plugins[:tunnel] || []
  @input_plugins = plugins[:input] || []
  @output_plugins = plugins[:output] || []

  @statistics_opts = opts[:statistics]
  @debug_opts = opts[:debug]
  @core_opts = opts[:core] || {}
  @processors = FFWD::Processor.load_processors(opts[:processor] || {})

  @output_channel = FFWD::PluginChannel.build('core.output')
  @input_channel = FFWD::PluginChannel.build('core.input')
  @system_channel = Channel.new(log, "system_channel")

  # The limit is read from (:core -> :memory -> :limit) and defaults to 1000.
  # A second threshold at 95% of the limit is kept alongside it.
  configured_limit = (@core_opts[:memory] || {})[:limit] || 1000
  @memory_limit = configured_limit.to_f.round(3)
  @memory_limit95 = @memory_limit * 0.95

  raise "memory limit must be non-negative number" if @memory_limit < 0

  @emitter = Core::Emitter.build(@output_channel, @core_opts)
  @processor = Core::Processor.build(@input_channel, @emitter, @processors)

  # Debugging is only set up when :debug options were given.
  @debug = nil

  if @debug_opts
    @debug = FFWD::Debug.setup(@debug_opts)
    @debug.monitor(@input_channel, FFWD::Debug::Input)
    @debug.monitor(@output_channel, FFWD::Debug::Output)
    @debug.depend_on(self)
  end

  # Configuration for statistics module.
  @statistics = nil

  if @statistics_opts
    @statistics = FFWD::Statistics::Collector.build(
      @emitter, @system_channel, @statistics_opts
    )
    @statistics.depend_on(self)
  end

  @interface = Core::Interface.new(
    @input_channel, @output_channel,
    @tunnel_plugins, @statistics, @debug, @processors, @core_opts
  )
  @interface.depend_on(self)

  @input_instances = @input_plugins.map { |factory| factory.call(@interface) }
  @output_instances = @output_plugins.map { |factory| factory.call(@interface) }

  unless @statistics.nil?
    # Channels and the processor always report; plugin instances only when
    # FFWD.is_reporter? accepts them (inputs first, then outputs).
    plugin_reporters = (@input_instances + @output_instances).select do |instance|
      FFWD.is_reporter?(instance)
    end
    reporters = [@input_channel, @output_channel, @processor] + plugin_reporters
    @statistics.register(self, "core", Core::Reporter.new(reporters))
  end

  # Make the core-related channels depend on core.
  # They will then be orchestrated with core when it's being
  # started/stopped.
  [@input_channel, @output_channel].each { |channel| channel.depend_on(self) }
end
Public Instance Methods
run()
click to toggle source
Main entry point.
Since all components are governed by the lifecycle of core, it should mostly be a matter of calling ‘start’.
# File lib/ffwd/core.rb, line 117
#
# Main entry point.
#
# Runs the EventMachine reactor, installs INT/TERM handlers that shut the
# core down, then calls +start+ and +setup_memory_monitor+ inside the
# reactor block. Returns whatever the final +stopping+ call returns.
def run
  # What to do when we receive a shutdown signal?
  # Kept as a plain (non-lambda) proc: it is handed to Signal.trap as the
  # handler block and declares no parameters.
  shutdown_handler = proc do
    # Hack to get out of trap context and into EM land.
    EM.add_timer(0) do
      log.info("Shutting down")
      stop
      EM.stop
    end
  end

  EM.run do
    %w[INT TERM].each { |signal_name| Signal.trap(signal_name, &shutdown_handler) }

    start
    setup_memory_monitor
  end

  # NOTE(review): called with an empty block, exactly as before; presumably
  # this registers a no-op stopping hook - confirm against the lifecycle code.
  stopping {}
end
Private Instance Methods
setup_memory_monitor()
click to toggle source
Sets up a memory monitor based on :core -> :memory -> :limit. Will warn at least once before shutting down.
# File lib/ffwd/core.rb, line 144
#
# Sets up a memory monitor based on (:core -> :memory -> :limit).
#
# With a limit of zero only two warnings are logged and nothing is
# subscribed. Otherwise every message on the system channel is inspected:
# a reading above the 95% threshold logs a warning and arms the monitor,
# and a later reading above the full limit while armed logs an error and
# stops EventMachine. So at least one warning precedes a shutdown.
def setup_memory_monitor
  if @memory_limit == 0
    log.warning("WARNING!!! YOU ARE RUNNING FFWD WITHOUT A MEMORY LIMIT, THIS COULD DAMAGE YOUR SYSTEM")
    log.warning("To configure it, set the (:core -> :memory -> :limit) option to a non-zero number!")
    return
  end

  log.info("Memory limited to #{@memory_limit} MB (:core -> :memory -> :limit)")

  # True once the previous reading was above the 95% threshold.
  warned = false

  @system_channel.subscribe do |system|
    # NOTE(review): :resident is presumably a byte count (it is divided by
    # 1,000,000 and reported as MB) - confirm against the publisher.
    resident = system[:memory][:resident].to_f
    resident_mb = (resident / 1_000_000).round(3)

    if warned && resident_mb > @memory_limit
      log.error("Memory limit exceeded (#{resident_mb}/#{@memory_limit} MB): SHUTTING DOWN")
      EM.stop
      next
    end

    near_limit = resident_mb > @memory_limit95
    log.warning("Memory limit almost reached (#{resident_mb}/#{@memory_limit} MB)") if near_limit
    warned = near_limit
  end
end