def child_process_execute_once(
title, command, arguments, subprocess_name, mode, stderr, env, unsetenv, chdir,
internal_encoding, external_encoding, scrub, replace_string, &block
)
spawn_args = if arguments || subprocess_name
[ env, (subprocess_name ? [command, subprocess_name] : command), *(arguments || []) ]
else
[ env, command ]
end
spawn_opts = {
unsetenv_others: unsetenv,
}
if chdir
spawn_opts[:chdir] = chdir
end
encoding_options = {}
if scrub
encoding_options[:invalid] = encoding_options[:undef] = :replace
if replace_string
encoding_options[:replace] = replace_string
end
end
log.debug "Executing command", title: title, spawn: spawn_args, mode: mode, stderr: stderr
readio = writeio = stderrio = wait_thread = nil
readio_in_use = writeio_in_use = stderrio_in_use = false
if !mode.include?(:stderr) && !mode.include?(:read_with_stderr) && stderr != :discard
writeio, readio, wait_thread = *Open3.popen2(*spawn_args, spawn_opts)
elsif mode.include?(:read_with_stderr)
writeio, readio, wait_thread = *Open3.popen2e(*spawn_args, spawn_opts)
else
writeio, readio, stderrio, wait_thread = *Open3.popen3(*spawn_args, spawn_opts)
if !mode.include?(:stderr)
stderrio.reopen(IO::NULL)
end
end
if mode.include?(:write)
writeio.set_encoding(external_encoding, internal_encoding, encoding_options)
writeio_in_use = true
end
if mode.include?(:read) || mode.include?(:read_with_stderr)
readio.set_encoding(external_encoding, internal_encoding, encoding_options)
readio_in_use = true
end
if mode.include?(:stderr)
stderrio.set_encoding(external_encoding, internal_encoding, encoding_options)
stderrio_in_use = true
end
pid = wait_thread.pid
io_objects = []
mode.each do |m|
io_objects << case m
when :read then readio
when :write then writeio
when :read_with_stderr then readio
when :stderr then stderrio
else
raise "BUG: invalid mode must be checked before here: '#{m}'"
end
end
m = Mutex.new
m.lock
thread = thread_create :child_process_callback do
m.lock
m.unlock
begin
block.call(*io_objects)
rescue EOFError => e
log.debug "Process exit and I/O closed", title: title, pid: pid, command: command, arguments: arguments
rescue IOError => e
if e.message == 'stream closed'
log.debug "Process I/O stream closed", title: title, pid: pid, command: command, arguments: arguments
else
log.error "Unexpected I/O error for child process", title: title, pid: pid, command: command, arguments: arguments, error: e
end
rescue => e
log.warn "Unexpected error while processing I/O for child process", title: title, pid: pid, command: command, error: e
end
process_info = @_child_process_mutex.synchronize do
process_info = @_child_process_processes[pid]
@_child_process_processes.delete(pid)
process_info
end
child_process_kill(process_info, force: true) if process_info && process_info.alive && ::Thread.current[:_fluentd_plugin_helper_child_process_running]
end
thread[:_fluentd_plugin_helper_child_process_running] = true
thread[:_fluentd_plugin_helper_child_process_pid] = pid
pinfo = ProcessInfo.new(title, thread, pid, readio, readio_in_use, writeio, writeio_in_use, stderrio, stderrio_in_use, wait_thread, true, nil)
@_child_process_mutex.synchronize do
@_child_process_processes[pid] = pinfo
end
m.unlock
pid
end