module Fluent::PluginHelper::ChildProcess

Constants

CHILD_PROCESS_DEFAULT_EXIT_TIMEOUT
CHILD_PROCESS_DEFAULT_KILL_TIMEOUT
CHILD_PROCESS_LOOP_CHECK_INTERVAL
MODE_PARAMS
STDERR_OPTIONS

Attributes

_child_process_processes[R]

stop      : mark callback threads as stopped
shutdown  : close write IO to child processes (STDIN of child processes), send TERM (KILL for Windows) to all child processes
close     : send KILL to all child processes
terminate : [-]

Public Class Methods

new() click to toggle source
Calls superclass method Fluent::PluginHelper::Timer.new
# File lib/fluent/plugin_helper/child_process.rb, line 98
# Sets up the helper's per-instance state after running the superclass
# initializer: the two timeouts (seeded from the module defaults) and the
# mutex used around the child process registry.
def initialize
  super
  # Both timeouts start from the defaults; plugins MAY configure them.
  @_child_process_kill_timeout = CHILD_PROCESS_DEFAULT_KILL_TIMEOUT
  @_child_process_exit_timeout = CHILD_PROCESS_DEFAULT_EXIT_TIMEOUT
  @_child_process_mutex = Mutex.new
end

Public Instance Methods

child_process_execute( title, command, arguments: nil, subprocess_name: nil, interval: nil, immediate: false, parallel: false, mode: [:read, :write], stderr: :discard, env: {}, unsetenv: false, chdir: nil, internal_encoding: 'utf-8', external_encoding: 'ascii-8bit', scrub: true, replace_string: nil, &block ) click to toggle source
# File lib/fluent/plugin_helper/child_process.rb, line 56
# Validates the request and then runs +command+ through
# child_process_execute_once, either a single time or repeatedly on a timer.
#
# title      - Symbol naming this execution (anything else raises).
# command    - the command, forwarded as-is to child_process_execute_once.
# interval   - when set, a repeating timer is registered via timer_execute;
#              when nil, the command is run exactly once, right away.
# immediate  - with an interval, also run once right away.
# parallel   - when false, a timer tick that fires while the block of an
#              earlier run is still executing is skipped with a warning.
# mode       - list drawn from MODE_PARAMS; its size must equal the block arity.
# stderr     - one of STDERR_OPTIONS.
# The remaining keywords are forwarded unchanged to child_process_execute_once.
#
# Returns nil when no interval is given, otherwise whatever timer_execute returns.
# Raises ArgumentError for any invalid argument combination or a missing block.
def child_process_execute(
    title, command,
    arguments: nil, subprocess_name: nil, interval: nil, immediate: false, parallel: false,
    mode: [:read, :write], stderr: :discard, env: {}, unsetenv: false, chdir: nil,
    internal_encoding: 'utf-8', external_encoding: 'ascii-8bit', scrub: true, replace_string: nil,
    &block
)
  # Checks run in a fixed order so the first violated rule decides the message.
  unless title.is_a?(Symbol)
    raise ArgumentError, "BUG: title must be a symbol"
  end
  if subprocess_name && !arguments
    raise ArgumentError, "BUG: arguments required if subprocess name is replaced"
  end

  if mode.any? { |m| !MODE_PARAMS.include?(m) }
    raise ArgumentError, "BUG: invalid mode specification"
  end
  if mode.include?(:read_with_stderr) && (mode.include?(:read) || mode.include?(:stderr))
    raise ArgumentError, "BUG: read_with_stderr is exclusive with :read and :stderr"
  end
  unless STDERR_OPTIONS.include?(stderr)
    raise ArgumentError, "BUG: invalid stderr handling specification"
  end

  if block.nil?
    raise ArgumentError, "BUG: block not specified which receive i/o object"
  end
  if block.arity != mode.size
    raise ArgumentError, "BUG: number of block arguments are different from size of mode"
  end

  # Tracks whether the caller's block is currently executing; shared by every
  # run started from this call so timer ticks can detect an overlap.
  busy = false
  guarded = lambda do |*io_objects|
    busy = true
    begin
      block.call(*io_objects)
    ensure
      busy = false
    end
  end

  # Single place that starts one run with the validated arguments.
  launch = lambda do
    child_process_execute_once(title, command, arguments, subprocess_name, mode, stderr, env, unsetenv, chdir, internal_encoding, external_encoding, scrub, replace_string, &guarded)
  end

  launch.call if immediate || !interval
  return unless interval

  timer_execute(:child_process_execute, interval, repeat: true) do
    if parallel || !busy
      launch.call
    else
      log.warn "previous child process is still running. skipped.", title: title, command: command, arguments: arguments, interval: interval, parallel: parallel
    end
  end
end
child_process_exit_status() click to toggle source
# File lib/fluent/plugin_helper/child_process.rb, line 52
# Returns whatever is stored on the current thread under
# :_fluentd_plugin_helper_child_process_exit_status (nil when nothing is set).
# NOTE(review): the code that records this value is not visible here.
def child_process_exit_status
  current_thread = ::Thread.current
  current_thread[:_fluentd_plugin_helper_child_process_exit_status]
end
child_process_id() click to toggle source
# File lib/fluent/plugin_helper/child_process.rb, line 48
# Returns whatever is stored on the current thread under
# :_fluentd_plugin_helper_child_process_pid (nil when nothing is set).
# NOTE(review): the code that records this value is not visible here.
def child_process_id
  current_thread = ::Thread.current
  current_thread[:_fluentd_plugin_helper_child_process_pid]
end
child_process_running?() click to toggle source
# File lib/fluent/plugin_helper/child_process.rb, line 43
# Checker intended for code inside the callback of child_process_execute.
# Returns the current thread's :_fluentd_plugin_helper_child_process_running
# value when it is truthy, and false otherwise (never nil).
def child_process_running?
  flag = ::Thread.current[:_fluentd_plugin_helper_child_process_running]
  flag ? flag : false
end
close() click to toggle source
# File lib/fluent/plugin_helper/child_process.rb, line 138
# Loops until the child process registry is empty, force-killing whatever is
# still alive once its kill timeout has elapsed.
#
# On every pass, for each registered pid:
# * entries that are missing or no longer alive are dropped from the registry;
# * an entry without a killed_at timestamp gets one (now);
# * an entry whose kill timeout (@_child_process_kill_timeout seconds after
#   killed_at) has not elapsed yet is left for a later pass;
# * otherwise child_process_kill is called with force: true and the entry is
#   removed.
# Between passes the method sleeps CHILD_PROCESS_LOOP_CHECK_INTERVAL seconds.
#
# NOTE(review): the excerpt this method was taken from ended without the
# method's closing `end`; only that terminator has been restored. Confirm
# against the upstream file whether a trailing statement (e.g. a `super`
# call) was also lost.
def close
  while (pids = @_child_process_mutex.synchronize{ @_child_process_processes.keys }).size > 0
    pids.each do |pid|
      process_info = @_child_process_processes[pid]
      if !process_info || !process_info.alive
        @_child_process_mutex.synchronize{ @_child_process_processes.delete(pid) }
        next
      end

      process_info.killed_at ||= Time.now # for irregular cases (e.g., created after shutdown)
      next if Time.now < process_info.killed_at + @_child_process_kill_timeout

      child_process_kill(process_info, force: true)
      @_child_process_mutex.synchronize{ @_child_process_processes.delete(pid) }
    end

    sleep CHILD_PROCESS_LOOP_CHECK_INTERVAL
  end
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin_helper/child_process.rb, line 120
# For every registered child process whose write IO is in use: closes that IO
# (bounded by @_child_process_exit_timeout seconds) and then calls
# child_process_kill on it. Entries that are missing or whose write IO is not
# in use are skipped. Finally delegates to the superclass shutdown.
def shutdown
  tracked_pids = @_child_process_mutex.synchronize { @_child_process_processes.keys }

  tracked_pids.each do |pid|
    info = @_child_process_processes[pid]
    next unless info && info.writeio_in_use

    begin
      Timeout.timeout(@_child_process_exit_timeout) { info.writeio.close }
    rescue Timeout::Error
      # Closing took too long; note it and carry on to the kill below.
      log.debug "External process #{info.title} doesn't exist after STDIN close in timeout #{@_child_process_exit_timeout}sec"
    end

    child_process_kill(info)
  end

  super
end
start() click to toggle source
Calls superclass method Fluent::PluginHelper::Timer#start
# File lib/fluent/plugin_helper/child_process.rb, line 106
# Runs the superclass start hook first, then installs a fresh, empty child
# process registry (replacing any previous one).
def start
  super
  # Registry layout: pid => ProcessInfo
  @_child_process_processes = {}
end
stop() click to toggle source
# File lib/fluent/plugin_helper/child_process.rb, line 111
# Marks the callback thread of every registered child process as stopped by
# setting its :_fluentd_plugin_helper_child_process_running entry to false
# (the same key child_process_running? reads from the current thread).
# Pids whose entry has disappeared from the registry are skipped.
def stop
  tracked_pids = @_child_process_mutex.synchronize { @_child_process_processes.keys }

  tracked_pids.each do |pid|
    info = @_child_process_processes[pid]
    next unless info

    info.thread[:_fluentd_plugin_helper_child_process_running] = false
  end
end