Skip to content

Commit

Permalink
Avoid resource leaks with os.devnull
Browse files Browse the repository at this point in the history
  • Loading branch information
ssbarnea committed Sep 6, 2021
1 parent 955aea7 commit 3e3c902
Showing 1 changed file with 68 additions and 68 deletions.
136 changes: 68 additions & 68 deletions src/subprocess_tee/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,78 38,78 @@ async def _stream_subprocess(args: str, **kwargs: Any) -> CompletedProcess:
# this part keeps behavior backwards compatible with subprocess.run
tee = kwargs.get("tee", True)
stdout = kwargs.get("stdout", sys.stdout)
if stdout == subprocess.DEVNULL or not tee:
# pylint: disable=consider-using-with
stdout = open(os.devnull, "w", encoding="UTF-8")
stderr = kwargs.get("stderr", sys.stderr)
if stderr == subprocess.DEVNULL or not tee:
# pylint: disable=consider-using-with
stderr = open(os.devnull, "w", encoding="UTF-8")

# We need to tell subprocess which shell to use when running shell-like
# commands.
# * SHELL is not always defined
# * /bin/bash does not exit on alpine, /bin/sh seems bit more portable
if "executable" not in kwargs and isinstance(args, str) and " " in args:
platform_settings["executable"] = os.environ.get("SHELL", "/bin/sh")

# pass kwargs we know to be supported
for arg in ["cwd", "env"]:
if arg in kwargs:
platform_settings[arg] = kwargs[arg]

# Some users are reporting that default (undocumented) limit 64k is too
# low
process = await asyncio.create_subprocess_shell(
args,
limit=STREAM_LIMIT,
stdin=kwargs.get("stdin", False),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
**platform_settings,
)
out: List[str] = []
err: List[str] = []

def tee_func(line: bytes, sink: List[str], pipe: Optional[Any]) -> None:
line_str = line.decode("utf-8").rstrip()
sink.append(line_str)
if not kwargs.get("quiet", False):
print(line_str, file=pipe)

loop = asyncio.get_event_loop()
tasks = []
if process.stdout:
tasks.append(
loop.create_task(
_read_stream(process.stdout, lambda l: tee_func(l, out, stdout))
)
with open(os.devnull, "w", encoding="UTF-8") as devnull:
if stdout == subprocess.DEVNULL or not tee:
stdout = devnull
stderr = kwargs.get("stderr", sys.stderr)
if stderr == subprocess.DEVNULL or not tee:
stderr = devnull

# We need to tell subprocess which shell to use when running shell-like
# commands.
# * SHELL is not always defined
# * /bin/bash does not exit on alpine, /bin/sh seems bit more portable
if "executable" not in kwargs and isinstance(args, str) and " " in args:
platform_settings["executable"] = os.environ.get("SHELL", "/bin/sh")

# pass kwargs we know to be supported
for arg in ["cwd", "env"]:
if arg in kwargs:
platform_settings[arg] = kwargs[arg]

# Some users are reporting that default (undocumented) limit 64k is too
# low
process = await asyncio.create_subprocess_shell(
args,
limit=STREAM_LIMIT,
stdin=kwargs.get("stdin", False),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
**platform_settings,
)
if process.stderr:
tasks.append(
loop.create_task(
_read_stream(process.stderr, lambda l: tee_func(l, err, stderr))
out: List[str] = []
err: List[str] = []

def tee_func(line: bytes, sink: List[str], pipe: Optional[Any]) -> None:
line_str = line.decode("utf-8").rstrip()
sink.append(line_str)
if not kwargs.get("quiet", False):
print(line_str, file=pipe)

loop = asyncio.get_event_loop()
tasks = []
if process.stdout:
tasks.append(
loop.create_task(
_read_stream(process.stdout, lambda l: tee_func(l, out, stdout))
)
)
if process.stderr:
tasks.append(
loop.create_task(
_read_stream(process.stderr, lambda l: tee_func(l, err, stderr))
)
)
)

await asyncio.wait(set(tasks))

# We need to be sure we keep the stdout/stderr output identical with
# the ones procued by subprocess.run(), at least when in text mode.
check = kwargs.get("check", False)
stdout = None if check else ""
stderr = None if check else ""
if out:
stdout = os.linesep.join(out) os.linesep
if err:
stderr = os.linesep.join(err) os.linesep

return CompletedProcess(
args=args,
returncode=await process.wait(),
stdout=stdout,
stderr=stderr,
)
await asyncio.wait(set(tasks))

# We need to be sure we keep the stdout/stderr output identical with
# the ones procued by subprocess.run(), at least when in text mode.
check = kwargs.get("check", False)
stdout = None if check else ""
stderr = None if check else ""
if out:
stdout = os.linesep.join(out) os.linesep
if err:
stderr = os.linesep.join(err) os.linesep

return CompletedProcess(
args=args,
returncode=await process.wait(),
stdout=stdout,
stderr=stderr,
)


def run(args: Union[str, List[str]], **kwargs: Any) -> CompletedProcess:
Expand Down

0 comments on commit 3e3c902

Please sign in to comment.