module Fluent::PluginHelper

Constants

ProcessInfo

Public Instance Methods

child_process_execute_once(title, command, arguments, subprocess_name, mode, stderr, env, unsetenv, chdir, internal_encoding, external_encoding, scrub, replace_string, &block)
# File lib/fluent/plugin_helper/child_process.rb, line 195
# Spawns +command+ once through Open3, hands the requested pipe ends to +block+
# on a helper-managed thread, registers the process in
# @_child_process_processes, and returns the child's pid.
#
# title            - label used in log records, passed to ProcessInfo
# command          - program to run
# arguments        - optional array of arguments (nil for none)
# subprocess_name  - optional name; when given, command is spawned as
#                    [command, subprocess_name]
# mode             - collection of :read, :write, :stderr, :read_with_stderr;
#                    its order decides the order of the IO objects yielded to
#                    +block+ (assumed to be validated by the caller)
# stderr           - :discard sends the child's stderr to the null device when
#                    stderr is not captured by +mode+; any other value leaves
#                    it attached as Open3.popen2 does by default
# env              - environment passed as the first spawn argument
# unsetenv         - value for the :unsetenv_others spawn option
# chdir            - working directory for the child, or nil
# internal_encoding, external_encoding
#                  - encodings applied to every pipe end that is in use
# scrub            - when truthy, invalid/undefined characters are replaced
# replace_string   - replacement string used when +scrub+ is truthy (optional)
#
# Raises whatever Open3 / Process.spawn raise when the command cannot be started.
def child_process_execute_once(
    title, command, arguments, subprocess_name, mode, stderr, env, unsetenv, chdir,
    internal_encoding, external_encoding, scrub, replace_string, &block
)
  spawn_args = if arguments || subprocess_name
                 [ env, (subprocess_name ? [command, subprocess_name] : command), *(arguments || []) ]
               else
                 [ env, command ]
               end
  spawn_opts = {
    unsetenv_others: unsetenv,
  }
  if chdir
    spawn_opts[:chdir] = chdir
  end

  encoding_options = {}
  if scrub
    encoding_options[:invalid] = encoding_options[:undef] = :replace
    if replace_string
      encoding_options[:replace] = replace_string
    end
  end

  log.debug "Executing command", title: title, spawn: spawn_args, mode: mode, stderr: stderr

  readio = writeio = stderrio = wait_thread = nil
  readio_in_use = writeio_in_use = stderrio_in_use = false

  if !mode.include?(:stderr) && !mode.include?(:read_with_stderr)
    # stderr is not captured. To discard it, redirect the child's stderr to the
    # null device at spawn time. Re-opening the parent's end of a popen3 pipe
    # would only close the read side, leaving the child writing into a broken pipe.
    spawn_opts[:err] = IO::NULL if stderr == :discard
    writeio, readio, wait_thread = *Open3.popen2(*spawn_args, spawn_opts)
  elsif mode.include?(:read_with_stderr)
    writeio, readio, wait_thread = *Open3.popen2e(*spawn_args, spawn_opts)
  else
    writeio, readio, stderrio, wait_thread = *Open3.popen3(*spawn_args, spawn_opts)
  end

  # IO#set_encoding takes its options as keyword arguments; passing the Hash
  # positionally raises ArgumentError on Ruby 3.0+.
  if mode.include?(:write)
    writeio.set_encoding(external_encoding, internal_encoding, **encoding_options)
    writeio_in_use = true
  end
  if mode.include?(:read) || mode.include?(:read_with_stderr)
    readio.set_encoding(external_encoding, internal_encoding, **encoding_options)
    readio_in_use = true
  end
  if mode.include?(:stderr)
    stderrio.set_encoding(external_encoding, internal_encoding, **encoding_options)
    stderrio_in_use = true
  end

  pid = wait_thread.pid # wait_thread => Process::Waiter

  # Build the block arguments in the same order as the requested modes.
  io_objects = []
  mode.each do |m|
    io_objects << case m
                  when :read then readio
                  when :write then writeio
                  when :read_with_stderr then readio
                  when :stderr then stderrio
                  else
                    raise "BUG: invalid mode must be checked before here: '#{m}'"
                  end
  end

  # The mutex is held until the thread-local values and the ProcessInfo entry
  # are in place, so the callback thread cannot start its work before then.
  m = Mutex.new
  m.lock
  thread = thread_create :child_process_callback do
    m.lock # run after plugin thread get pid, thread instance and i/o
    m.unlock
    begin
      block.call(*io_objects)
    rescue EOFError
      log.debug "Process exit and I/O closed", title: title, pid: pid, command: command, arguments: arguments
    rescue IOError => e
      if e.message == 'stream closed'
        log.debug "Process I/O stream closed", title: title, pid: pid, command: command, arguments: arguments
      else
        log.error "Unexpected I/O error for child process", title: title, pid: pid, command: command, arguments: arguments, error: e
      end
    rescue => e
      log.warn "Unexpected error while processing I/O for child process", title: title, pid: pid, command: command, error: e
    end
    # The callback is done: unregister the process and force-kill it if it is
    # still marked alive and this thread is still flagged as running.
    process_info = @_child_process_mutex.synchronize do
      process_info = @_child_process_processes[pid]
      @_child_process_processes.delete(pid)
      process_info
    end
    child_process_kill(process_info, force: true) if process_info && process_info.alive && ::Thread.current[:_fluentd_plugin_helper_child_process_running]
  end
  thread[:_fluentd_plugin_helper_child_process_running] = true
  thread[:_fluentd_plugin_helper_child_process_pid] = pid
  pinfo = ProcessInfo.new(title, thread, pid, readio, readio_in_use, writeio, writeio_in_use, stderrio, stderrio_in_use, wait_thread, true, nil)
  @_child_process_mutex.synchronize do
    @_child_process_processes[pid] = pinfo
  end
  m.unlock
  pid
end
child_process_kill(process_info, force: false)
# File lib/fluent/plugin_helper/child_process.rb, line 160
# Signals the child process described by +process_info+, unless it is already
# known to be gone.
#
# process_info - object exposing pid, alive, killed_at and thread; nil (or an
#                entry whose +alive+ is falsy) makes this a no-op
# force        - when true, KILL is sent and the entry is marked dead right
#                away; when false, +killed_at+ is stamped and TERM is sent
#                (KILL on Windows)
def child_process_kill(process_info, force: false)
  return unless process_info && process_info.alive

  process_info.killed_at = Time.now unless force

  # Non-blocking reap: if the child has already exited, record its status on
  # the callback thread and skip signalling.
  begin
    reaped_pid, exit_status = Process.waitpid2(process_info.pid, Process::WNOHANG)
    if reaped_pid && exit_status
      process_info.thread[:_fluentd_plugin_helper_child_process_exit_status] = exit_status
      process_info.alive = false
    end
  rescue Errno::ECHILD, Errno::ESRCH, Errno::EPERM
    process_info.alive = false
  rescue
    # ignore
  end
  return unless process_info.alive

  begin
    signal = if Fluent.windows? || force
               :KILL
             else
               :TERM
             end
    Process.kill(signal, process_info.pid)
    process_info.alive = false if force
  rescue Errno::ECHILD, Errno::ESRCH
    process_info.alive = false
  end
end
helpers(*snake_case_symbols)
# File lib/fluent/plugin_helper.rb, line 38
# Includes the plugin helper modules named by +snake_case_symbols+ into the
# receiver. Each name is converted from snake_case to CamelCase and looked up
# under Fluent::PluginHelper (e.g. :child_process -> ChildProcess).
#
# Returns the receiver. Calling it with no names is a no-op.
# Raises NameError when a name does not resolve to a constant.
def helpers(*snake_case_symbols)
  # Module#include requires at least one module, so bail out early on an empty list.
  return self if snake_case_symbols.empty?

  helper_modules = snake_case_symbols.map do |name|
    const_name = name.to_s.split('_').map(&:capitalize).join
    begin
      Fluent::PluginHelper.const_get(const_name)
    rescue NameError => e
      # Same exception class as before, but naming the helper the caller asked for.
      raise NameError, "Unknown plugin helper: #{name} (#{e.message})"
    end
  end
  include(*helper_modules)
end