# emacs: -*- mode: python; py-indent-offset: 4; tab-width: 4; indent-tabs-mode: nil -*-
# ex: set sts=4 ts=4 sw=4 noet:
# ## ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
#
# See COPYING file distributed along with the datalad package for the
# copyright and license terms.
#
# ## ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
"""
Wrapper for command and function calls, allowing for dry runs and output handling
"""
import subprocess
import sys
import logging
import os
import functools
import tempfile
from locale import getpreferredencoding
import asyncio
from collections import (
namedtuple,
)
from .consts import GIT_SSH_COMMAND
from .dochelpers import (
borrowdoc,
exc_str,
)
from .support import path as op
from .support.exceptions import CommandError
from .utils import (
auto_repr,
ensure_unicode,
generate_file_chunks,
try_multiple,
unlink,
)
lgr = logging.getLogger('datalad.cmd')
# In Python 3, to split a byte stream on newlines, the separator must be bytes too
linesep_bytes = os.linesep.encode()
_TEMP_std = sys.stdout, sys.stderr
# To be used in temp file names to distinguish the ones we create in Runner
# (and hence take care of removing) from those which might be created
# outside and passed into Runner
_MAGICAL_OUTPUT_MARKER = "_runneroutput_"


async def run_async_cmd(loop, cmd, protocol, stdin, protocol_kwargs=None,
**kwargs):
"""Run a command in a subprocess managed by asyncio
This implementation has been inspired by
https://pymotw.com/3/asyncio/subprocesses.html
Parameters
----------
loop : asyncio.AbstractEventLoop
asyncio event loop instance. Must support subprocesses on the
target platform.
cmd : list or str
Command to be executed, passed to `subprocess_exec` (list), or
`subprocess_shell` (str).
protocol : WitlessProtocol
Protocol class to be instantiated for managing communication
with the subprocess.
stdin : file-like or None
Passed to the subprocess as its standard input.
protocol_kwargs : dict, optional
Passed to the Protocol class constructor.
    kwargs :
        Passed to `subprocess_exec`; will typically be parameters
        supported by `subprocess.Popen`.
Returns
-------
undefined
The nature of the return value is determined by the given
protocol class.
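
    Examples
    --------
    A minimal sketch of direct use; normally `WitlessRunner.run()` drives
    this coroutine, and `echo` merely stands in for a real command
    (`StdOutCapture` is defined further below in this module):

    >>> loop = asyncio.SelectorEventLoop()  # doctest: +SKIP
    >>> asyncio.set_event_loop(loop)  # doctest: +SKIP
    >>> res = loop.run_until_complete(
    ...     run_async_cmd(loop, ['echo', 'hi'], StdOutCapture, None))  # doctest: +SKIP
    >>> res['code']  # doctest: +SKIP
    0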
"""
if protocol_kwargs is None:
protocol_kwargs = {}
cmd_done = asyncio.Future(loop=loop)
factory = functools.partial(protocol, cmd_done, **protocol_kwargs)
kwargs.update(
stdin=stdin,
# ask the protocol which streams to capture
stdout=asyncio.subprocess.PIPE if protocol.proc_out else None,
stderr=asyncio.subprocess.PIPE if protocol.proc_err else None,
)
if isinstance(cmd, str):
proc = loop.subprocess_shell(factory, cmd, **kwargs)
else:
proc = loop.subprocess_exec(factory, *cmd, **kwargs)
transport = None
result = None
try:
lgr.debug('Launching process %s', cmd)
transport, protocol = await proc
lgr.debug('Waiting for process %i to complete', transport.get_pid())
# The next wait is a workaround that avoids losing the output of
# quickly exiting commands (https://bugs.python.org/issue41594).
await asyncio.ensure_future(transport._wait())
await cmd_done
result = protocol._prepare_result()
finally:
        # protect against a crash when launching the process
if transport:
transport.close()
return result


class WitlessProtocol(asyncio.SubprocessProtocol):
"""Subprocess communication protocol base class for `run_async_cmd`
    This class implements basic subprocess output handling. Derived classes
    like `StdOutCapture` should be used for subprocess communication that
    needs to capture and return output. In particular, the
    `pipe_data_received()` method can be overwritten to implement "online"
    processing of process output.

    This class defines a default result setup that causes `run_async_cmd()`
    to return a dict with the decoded content of any captured output stream
    ('stdout', 'stderr') and the subprocess's exit status under 'code'.
"""
FD_NAMES = ['stdin', 'stdout', 'stderr']
proc_out = None
proc_err = None
def __init__(self, done_future, encoding=None):
"""
Parameters
----------
done_future : asyncio.Future
Future promise to be fulfilled when process exits.
encoding : str
Encoding to be used for process output bytes decoding. By default,
the preferred system encoding is guessed.
"""
self.done = done_future
# capture output in bytearrays while the process is running
Streams = namedtuple('Streams', ['out', 'err'])
self.buffer = Streams(
out=bytearray() if self.proc_out else None,
err=bytearray() if self.proc_err else None,
)
self.pid = None
super().__init__()
self.encoding = encoding or getpreferredencoding(do_setlocale=False)
self._log_outputs = False
if lgr.isEnabledFor(5):
try:
from . import cfg
self._log_outputs = cfg.getbool('datalad.log', 'outputs', default=False)
except ImportError:
pass
self._log = self._log_summary
else:
self._log = self._log_nolog
def _log_nolog(self, *args):
pass
def _log_summary(self, fd, data):
fd_name = self.FD_NAMES[fd]
lgr.log(5, 'Read %i bytes from %i[%s]%s',
len(data), self.pid, fd_name, ':' if self._log_outputs else '')
if self._log_outputs:
log_data = ensure_unicode(data)
# The way we log is to stay consistent with Runner.
# TODO: later we might just log in a single entry, without
# fd_name prefix
lgr.log(5, "%s| %s ", fd_name, log_data)

    def connection_made(self, transport):
self.transport = transport
self.pid = transport.get_pid()
lgr.debug('Process %i started', self.pid)

    def pipe_data_received(self, fd, data):
self._log(fd, data)
# store received output if stream was to be captured
if self.buffer[fd - 1] is not None:
self.buffer[fd - 1].extend(data)
def _prepare_result(self):
"""Prepares the final result to be returned to the runner
Note for derived classes overwriting this method:
The result must be a dict with keys that do not unintentionally
conflict with the API of CommandError, as the result dict is passed to
this exception class as kwargs on error. The Runner will overwrite
'cmd' and 'cwd' on error, if they are present in the result.
"""
return_code = self.transport.get_returncode()
lgr.debug(
'Process %i exited with return code %i',
self.pid, return_code)
# give captured process output back to the runner as string(s)
results = {
name:
bytes(byt).decode(self.encoding)
if byt else ''
for name, byt in zip(self.FD_NAMES[1:], self.buffer)
}
results['code'] = return_code
return results

    def process_exited(self):
# actually fulfill the future promise and let the execution finish
self.done.set_result(True)


class NoCapture(WitlessProtocol):
"""WitlessProtocol that captures no subprocess output
As this is identical with the behavior of the WitlessProtocol base class,
this class is merely a more readable convenience alias.
"""
pass


class StdOutCapture(WitlessProtocol):
"""WitlessProtocol that only captures and returns stdout of a subprocess"""
proc_out = True


class StdErrCapture(WitlessProtocol):
"""WitlessProtocol that only captures and returns stderr of a subprocess"""
proc_err = True


class StdOutErrCapture(WitlessProtocol):
"""WitlessProtocol that captures and returns stdout/stderr of a subprocess
"""
proc_out = True
proc_err = True


class KillOutput(WitlessProtocol):
"""WitlessProtocol that swallows stdout/stderr of a subprocess
"""
proc_out = True
proc_err = True

    def pipe_data_received(self, fd, data):
if lgr.isEnabledFor(5):
lgr.log(
5,
'Discarded %i bytes from %i[%s]',
len(data), self.pid, self.FD_NAMES[fd])
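

# A minimal sketch of the "online" processing that `WitlessProtocol`
# supports: a hypothetical protocol that counts stdout lines as they
# arrive, in addition to the default buffering. Purely illustrative,
# not part of the module API.
class _ExampleLineCountProtocol(StdOutCapture):
    """Example protocol counting stdout lines as they are received"""
    def __init__(self, done_future, encoding=None):
        super().__init__(done_future, encoding=encoding)
        self.nlines = 0

    def pipe_data_received(self, fd, data):
        # tally newlines in each chunk as it comes in, then let the
        # base class log and buffer the data as usual
        self.nlines += data.count(linesep_bytes)
        super().pipe_data_received(fd, data)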


class WitlessRunner(object):
"""Minimal Runner with support for online command output processing
It aims to be as simple as possible, providing only essential
functionality.
"""
__slots__ = ['cwd', 'env']
    # To work around issues where the parent process did not take care of
    # proper new loop instantiation in a child process
    # https://bugs.python.org/issue21998
_loop_pid = None
_loop_need_new = None
def __init__(self, cwd=None, env=None):
"""
Parameters
----------
cwd : path-like, optional
If given, commands are executed with this path as PWD,
the PWD of the parent process is used otherwise.
env : dict, optional
Environment to be used for command execution. If `cwd`
was given, 'PWD' in the environment is set to its value.
This must be a complete environment definition, no values
from the current environment will be inherited.
"""
self.env = env
# stringify to support Path instances on PY35
self.cwd = str(cwd) if cwd is not None else None
def _get_adjusted_env(self, env=None, cwd=None, copy=True):
"""Return an adjusted copy of an execution environment
Or return an unaltered copy of the environment, if no adjustments
need to be made.
"""
if copy:
env = env.copy() if env else None
if cwd and env is not None:
# if CWD was provided, we must not make it conflict with
# a potential PWD setting
env['PWD'] = cwd
return env

    def run(self, cmd, protocol=None, stdin=None, cwd=None, env=None, **kwargs):
"""Execute a command and communicate with it.
Parameters
----------
cmd : list or str
Sequence of program arguments. Passing a single string causes
execution via the platform shell.
protocol : WitlessProtocol, optional
Protocol class handling interaction with the running process
(e.g. output capture). A number of pre-crafted classes are
            provided (e.g. `KillOutput`, `NoCapture`, `GitProgress`).
        stdin : byte stream, optional
            File-descriptor-like object to be used as stdin for the process.
            Passed verbatim to `subprocess.Popen()`.
cwd : path-like, optional
If given, commands are executed with this path as PWD,
the PWD of the parent process is used otherwise. Overrides
any `cwd` given to the constructor.
env : dict, optional
Environment to be used for command execution. If `cwd`
was given, 'PWD' in the environment is set to its value.
This must be a complete environment definition, no values
from the current environment will be inherited. Overrides
any `env` given to the constructor.
kwargs :
Passed to the Protocol class constructor.
Returns
-------
dict
At minimum there will be keys 'stdout', 'stderr' with
unicode strings of the cumulative standard output and error
of the process as values.
Raises
------
CommandError
On execution failure (non-zero exit code) this exception is
raised which provides the command (cmd), stdout, stderr,
exit code (status), and a message identifying the failed
command, as properties.
FileNotFoundError
When a given executable does not exist.
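
        Examples
        --------
        A minimal usage sketch; `echo` merely stands in for a real command:

        >>> runner = WitlessRunner()
        >>> res = runner.run(['echo', 'hi'], protocol=StdOutCapture)  # doctest: +SKIP
        >>> res['stdout'].strip()  # doctest: +SKIP
        'hi'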
"""
if protocol is None:
            # by default let all subprocess streams pass through
protocol = NoCapture
cwd = cwd or self.cwd
env = self._get_adjusted_env(
env or self.env,
cwd=cwd,
)
# rescue any event-loop to be able to reassign after we are done
# with our own event loop management
# this is how ipython does it
try:
is_new_proc = self._check_if_new_proc()
event_loop = asyncio.get_event_loop()
if is_new_proc:
self._check_if_loop_usable(event_loop, stdin)
if event_loop.is_closed():
raise RuntimeError("the loop was closed - use our own")
new_loop = False
except RuntimeError:
event_loop = self._get_new_event_loop()
new_loop = True
try:
lgr.debug('Async run:\n cwd=%s\n cmd=%s', cwd, cmd)
# include the subprocess manager in the asyncio event loop
results = event_loop.run_until_complete(
run_async_cmd(
event_loop,
cmd,
protocol,
stdin,
protocol_kwargs=kwargs,
cwd=cwd,
env=env,
)
)
finally:
if new_loop:
# be kind to callers and leave asyncio as we found it
asyncio.set_event_loop(None)
                # terminate the event loop, cannot be undone, hence we start a
                # fresh one each time (see BlockingIOError notes in
                # _get_new_event_loop)
event_loop.close()
# log before any exception is raised
lgr.log(8, "Finished running %r with status %s", cmd, results['code'])
        # make sure we always blow up if a protocol did not report
        # a return code at all
if results.get('code', True) not in [0, None]:
# the runner has a better idea, doc string warns Protocol
# implementations not to return these
results.pop('cmd', None)
results.pop('cwd', None)
raise CommandError(
# whatever the results were, we carry them forward
cmd=cmd,
cwd=self.cwd,
**results,
)
# denoise, must be zero at this point
results.pop('code', None)
return results
@classmethod
def _check_if_new_proc(cls):
"""Check if WitlessRunner is used under a new PID
        Note that this is a function that is meant to be called from within a
        particular context only. The RuntimeError is expected to be caught by
        the caller and is meant to be more like a response message than an
        exception.
Returns
-------
bool
Raises
------
RuntimeError
If it is not a new proc and we already know that we need a new loop
in this pid
"""
pid = os.getpid()
is_new_proc = cls._loop_pid is None or cls._loop_pid != pid
if is_new_proc:
# We need to check if we can run any command smoothly
cls._loop_pid = pid
cls._loop_need_new = None
elif cls._loop_need_new:
raise RuntimeError("we know we need a new loop")
return is_new_proc
@classmethod
def _check_if_loop_usable(cls, event_loop, stdin):
"""Check if given event_loop could run a simple command
Sets _loop_need_new variable to a bool depending on what it finds
        Note that this is a function that is meant to be called from within a
        particular context only. The RuntimeError is expected to be caught by
        the caller and is meant to be more like a response message than an
        exception.
Raises
------
RuntimeError
If loop is not reusable
"""
# We need to check if we can run any command smoothly
try:
event_loop.run_until_complete(
run_async_cmd(
event_loop,
[sys.executable, "--version"], # fast! 0.004 sec and to be ran once per process
KillOutput,
stdin,
)
)
cls._loop_need_new = False
except OSError as e:
# due to https://bugs.python.org/issue21998
# exhibits in https://github.com/ReproNim/testkraken/issues/95
lgr.debug("It seems we need a new loop when running our commands: %s", exc_str(e))
cls._loop_need_new = True
raise RuntimeError("the loop is not reusable")
@staticmethod
def _get_new_event_loop():
# start a new event loop, which we will close again further down
        # if this is not done, errors like these will occur:
        #   BlockingIOError: [Errno 11] Resource temporarily unavailable
        #   Exception ignored when trying to write to the signal wakeup fd:
        # It is unclear why it happens from time to time when reusing an event
        # loop that was stopped, but starting fresh and doing a full
        # termination seems to address the issue
if sys.platform == "win32":
# use special event loop that supports subprocesses on windows
event_loop = asyncio.ProactorEventLoop()
else:
event_loop = asyncio.SelectorEventLoop()
asyncio.set_event_loop(event_loop)
return event_loop


class GitRunnerBase(object):
"""
Mix-in class for Runners to be used to run git and git annex commands
    Overloads the runner class to check & update the GIT_DIR and
    GIT_WORK_TREE environment variables, making them absolute paths
    if they are defined and relative
"""
_GIT_PATH = None
@staticmethod
def _check_git_path():
"""If using bundled git-annex, we would like to use bundled with it git
Thus we will store _GIT_PATH a path to git in the same directory as annex
if found. If it is empty (but not None), we do nothing
"""
if GitRunnerBase._GIT_PATH is None:
from distutils.spawn import find_executable
# with all the nesting of config and this runner, cannot use our
# cfg here, so will resort to dark magic of environment options
if (os.environ.get('DATALAD_USE_DEFAULT_GIT', '0').lower()
in ('1', 'on', 'true', 'yes')):
git_fpath = find_executable("git")
if git_fpath:
GitRunnerBase._GIT_PATH = ''
lgr.log(9, "Will use default git %s", git_fpath)
return # we are done - there is a default git avail.
# if not -- we will look for a bundled one
GitRunnerBase._GIT_PATH = GitRunnerBase._get_bundled_path()
lgr.log(9, "Will use git under %r (no adjustments to PATH if empty "
"string)", GitRunnerBase._GIT_PATH)
assert(GitRunnerBase._GIT_PATH is not None) # we made the decision!
@staticmethod
def _get_bundled_path():
from distutils.spawn import find_executable
annex_fpath = find_executable("git-annex")
if not annex_fpath:
# not sure how to live further anyways! ;)
alongside = False
else:
annex_path = op.dirname(op.realpath(annex_fpath))
bundled_git_path = op.join(annex_path, 'git')
# we only need to consider bundled git if it's actually different
# from default. (see issue #5030)
alongside = op.lexists(bundled_git_path) and \
bundled_git_path != op.realpath(find_executable('git'))
return annex_path if alongside else ''

    @staticmethod
def get_git_environ_adjusted(env=None):
"""
        Replaces GIT_DIR and GIT_WORK_TREE with absolute paths, if they are
        defined and relative
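
        A sketch of the effect (exact values depend on the environment):

        >>> adjusted = GitRunnerBase.get_git_environ_adjusted(
        ...     {'GIT_DIR': '.git', 'PATH': '/usr/bin'})  # doctest: +SKIP
        >>> op.isabs(adjusted['GIT_DIR'])  # doctest: +SKIP
        True
        >>> adjusted['LC_MESSAGES']  # doctest: +SKIP
        'C'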
"""
# if env set copy else get os environment
git_env = env.copy() if env else os.environ.copy()
if GitRunnerBase._GIT_PATH:
git_env['PATH'] = op.pathsep.join([GitRunnerBase._GIT_PATH, git_env['PATH']]) \
if 'PATH' in git_env \
else GitRunnerBase._GIT_PATH
for varstring in ['GIT_DIR', 'GIT_WORK_TREE']:
var = git_env.get(varstring)
if var: # if env variable set
if not op.isabs(var): # and it's a relative path
git_env[varstring] = op.abspath(var) # to absolute path
lgr.log(9, "Updated %s to %s", varstring, git_env[varstring])
if 'GIT_SSH_COMMAND' not in git_env:
git_env['GIT_SSH_COMMAND'] = GIT_SSH_COMMAND
git_env['GIT_SSH_VARIANT'] = 'ssh'
        # We are parsing error messages and hints. For those to work more
        # reliably we are doomed to sacrifice the i18n effort of git, and
        # enforce a consistent language for the messages
git_env['LC_MESSAGES'] = 'C'
        # But since LC_ALL takes precedence over LC_MESSAGES, we cannot
        # "leak" that one inside, and are doomed to pop it
git_env.pop('LC_ALL', None)
return git_env


class GitWitlessRunner(WitlessRunner, GitRunnerBase):
"""A WitlessRunner for git and git-annex commands.
    See `GitRunnerBase`, which it mixes in, for more details
"""
@borrowdoc(WitlessRunner)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._check_git_path()
def _get_adjusted_env(self, env=None, cwd=None, copy=True):
env = GitRunnerBase.get_git_environ_adjusted(env=env)
return super()._get_adjusted_env(
env=env,
cwd=cwd,
# git env above is already a copy, so we have no choice,
# but we can prevent duplication
copy=False,
)

    def run_on_filelist_chunks(self, cmd, files, protocol=None,
                               cwd=None, env=None, **kwargs):
"""Run a git-style command multiple times if `files` is too long
Parameters
----------
cmd : list
Sequence of program arguments.
files : list
List of files.
protocol : WitlessProtocol, optional
Protocol class handling interaction with the running process
(e.g. output capture). A number of pre-crafted classes are
            provided (e.g. `KillOutput`, `NoCapture`, `GitProgress`).
cwd : path-like, optional
If given, commands are executed with this path as PWD,
the PWD of the parent process is used otherwise. Overrides
any `cwd` given to the constructor.
env : dict, optional
Environment to be used for command execution. If `cwd`
was given, 'PWD' in the environment is set to its value.
This must be a complete environment definition, no values
from the current environment will be inherited. Overrides
any `env` given to the constructor.
kwargs :
Passed to the Protocol class constructor.
Returns
-------
dict
At minimum there will be keys 'stdout', 'stderr' with
unicode strings of the cumulative standard output and error
of the process as values.
Raises
------
CommandError
On execution failure (non-zero exit code) this exception is
raised which provides the command (cmd), stdout, stderr,
exit code (status), and a message identifying the failed
command, as properties.
FileNotFoundError
When a given executable does not exist.
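
        Examples
        --------
        A sketch of chunked execution; `many_files` is a placeholder for a
        file list that may exceed the platform's command line length limit:

        >>> runner = GitWitlessRunner()
        >>> res = runner.run_on_filelist_chunks(
        ...     ['git', 'ls-files'], many_files,
        ...     protocol=StdOutCapture)  # doctest: +SKIP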
"""
assert isinstance(cmd, list)
file_chunks = generate_file_chunks(files, cmd)
results = None
for i, file_chunk in enumerate(file_chunks):
            # do not pollute the log with a message when there only ever is
            # a single chunk
if len(file_chunk) < len(files):
lgr.debug('Process file list chunk %i (length %i)',
i, len(file_chunk))
res = self.run(
cmd + ['--'] + file_chunk,
protocol=protocol,
cwd=cwd,
env=env,
**kwargs)
if results is None:
results = res
else:
for k, v in res.items():
results[k] += v
return results


def readline_rstripped(stdout):
#return iter(stdout.readline, b'').next().rstrip()
return stdout.readline().rstrip()


class SafeDelCloseMixin(object):
"""A helper class to use where __del__ would call .close() which might
fail if "too late in GC game"
"""
def __del__(self):
try:
self.close()
except TypeError:
if os.fdopen is None or lgr.debug is None:
# if we are late in the game and things already gc'ed in py3,
# it is Ok
return
raise


@auto_repr
class BatchedCommand(SafeDelCloseMixin):
"""Container for a process which would allow for persistent communication
"""
def __init__(self, cmd, path=None, output_proc=None):
if not isinstance(cmd, list):
cmd = [cmd]
self.cmd = cmd
self.path = path
self.output_proc = output_proc if output_proc else readline_rstripped
self._process = None
self._stderr_out = None
self._stderr_out_fname = None
def _initialize(self):
lgr.debug("Initiating a new process for %s", repr(self))
lgr.log(5, "Command: %s", self.cmd)
        # according to the internet wisdom there is no easy way with subprocess
        # while avoiding deadlocks etc. We would need to start a thread/subprocess
        # to implement timeouts etc
# kwargs = dict(bufsize=1, universal_newlines=True) if PY3 else {}
self._stderr_out, self._stderr_out_fname = tempfile.mkstemp()
self._process = subprocess.Popen(
self.cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=self._stderr_out,
env=GitRunnerBase.get_git_environ_adjusted(),
cwd=self.path,
bufsize=1,
universal_newlines=True # **kwargs
)
def _check_process(self, restart=False):
"""Check if the process was terminated and restart if restart
Returns
-------
bool
True if process was alive.
str
stderr if any recorded if was terminated
"""
process = self._process
ret = True
ret_stderr = None
if process and process.poll():
lgr.warning("Process %s was terminated with returncode %s" % (process, process.returncode))
ret_stderr = self.close(return_stderr=True)
ret = False
if self._process is None and restart:
lgr.warning("Restarting the process due to previous failure")
self._initialize()
return ret, ret_stderr
def __call__(self, cmds):
"""
Parameters
----------
cmds : str or tuple or list of (str or tuple)
Returns
-------
        str or list
            Output received from the process. A list, if `cmds` was a list
"""
input_multiple = isinstance(cmds, list)
if not input_multiple:
cmds = [cmds]
        output = list(self.yield_(cmds))
return output if input_multiple else output[0]

    def yield_(self, cmds):
"""Same as __call__, but requires `cmds` to be an iterable
and yields results for each item."""
for entry in cmds:
if not isinstance(entry, str):
entry = ' '.join(entry)
yield self.proc1(entry)

    def proc1(self, arg):
"""Same as __call__, but only takes a single command argument
and returns a single result.
"""
        # TODO: add checks -- maybe the process died and needs to be reinitialized
if not self._process:
self._initialize()
entry = arg + '\n'
lgr.log(5, "Sending %r to batched command %s", entry, self)
# apparently communicate is just a one time show
# stdout, stderr = self._process.communicate(entry)
# according to the internet wisdom there is no easy way with subprocess
self._check_process(restart=True)
process = self._process # _check_process might have restarted it
process.stdin.write(entry)
process.stdin.flush()
lgr.log(5, "Done sending.")
still_alive, stderr = self._check_process(restart=False)
# TODO: we might want to handle still_alive, e.g. to allow for
# a number of restarts/resends, but it should be per command
# since for some we cannot just resend the same query. But if
# it is just a "get"er - we could resend it few times
# The default output_proc expects a single line output.
# TODO: timeouts etc
stdout = ensure_unicode(self.output_proc(process.stdout)) \
if not process.stdout.closed else None
if stderr:
lgr.warning("Received output in stderr: %r", stderr)
lgr.log(5, "Received output: %r", stdout)
return stdout

    def close(self, return_stderr=False):
"""Close communication and wait for process to terminate
Returns
-------
        str
            stderr output, if `return_stderr` was set and a stderr file was
            there. None otherwise
"""
ret = None
process = self._process
if self._stderr_out:
# close possibly still open fd
lgr.debug(
"Closing stderr of %s", process)
os.fdopen(self._stderr_out).close()
self._stderr_out = None
if process:
            lgr.debug(
                "Closing stdin of %s and waiting for process to finish", process)
process.stdin.close()
process.stdout.close()
from . import cfg
cfg_var = 'datalad.runtime.stalled-external'
cfg_val = cfg.obtain(cfg_var)
if cfg_val == 'wait':
process.wait()
elif cfg_val == 'abandon':
# try waiting for the annex process to finish 3 times for 3 sec
# with 1s pause in between
try:
try_multiple(
# ntrials
3,
# exception to catch
subprocess.TimeoutExpired,
# base waiting period
1.0,
# function to run
process.wait,
timeout=3.0,
)
except subprocess.TimeoutExpired:
lgr.warning(
"Batched process %s did not finish, abandoning it without killing it",
process)
else:
raise ValueError(f"Unexpected {cfg_var}={cfg_val!r}")
self._process = None
lgr.debug("Process %s has finished", process)
        # It is hard to debug when something goes wrong. Hopefully logging
        # stderr, if generally asked for, might be of help
if lgr.isEnabledFor(5):
from . import cfg
log_stderr = cfg.getbool('datalad.log', 'outputs', default=False)
else:
log_stderr = False
if self._stderr_out_fname and os.path.exists(self._stderr_out_fname):
if return_stderr or log_stderr:
with open(self._stderr_out_fname, 'r') as f:
stderr = f.read()
if return_stderr:
ret = stderr
if log_stderr:
stderr = ensure_unicode(stderr)
stderr = stderr.splitlines()
lgr.log(5, "stderr of %s had %d lines:", process.pid, len(stderr))
                    for line in stderr:
                        lgr.log(5, "| " + line)
# remove the file where we kept dumping stderr
unlink(self._stderr_out_fname)
self._stderr_out_fname = None
return ret