#!/usr/bin/env python3 import asyncio import argparse import os import re import signal import sys DEFAULT_TIMEOUT = 30 # TODO: signal handler to terminate spawned process group when wait-for is killed # TODO: better return codes esp. when matches are found # TODO: multiple patterns (multiple out, err, both) # TODO: print unmatched patterns async def terminate(p): # Terminate the process group (shell, crowdsec plugins) try: os.killpg(os.getpgid(p.pid), signal.SIGTERM) except ProcessLookupError: pass async def monitor(cmd, args, want_out, want_err, timeout): """Monitor a process and terminate it if a pattern is matched in stdout or stderr. Args: cmd: The command to run. args: A list of arguments to pass to the command. stdout: A regular expression pattern to search for in stdout. stderr: A regular expression pattern to search for in stderr. timeout: The maximum number of seconds to wait for the process to terminate. Returns: The exit code of the process. """ status = None async def read_stream(p, stream, outstream, pattern): nonlocal status if stream is None: return while True: line = await stream.readline() if line: line = line.decode('utf-8') outstream.write(line) if pattern and pattern.search(line): await terminate(process) # this is nasty. # if we timeout, we want to return a different exit code # in case of a match, so that the caller can tell # if the application was still running. # XXX: still not good for match found, but return code != 0 if timeout != DEFAULT_TIMEOUT: status = 128 else: status = 0 break else: break process = await asyncio.create_subprocess_exec( cmd, *args, # capture stdout stdout=asyncio.subprocess.PIPE, # capture stderr stderr=asyncio.subprocess.PIPE, # disable buffering bufsize=0, # create a new process group # (required to kill child processes when cmd is a shell) preexec_fn=os.setsid) out_regex = re.compile(want_out) if want_out else None err_regex = re.compile(want_err) if want_err else None # Apply a timeout try: await asyncio.wait_for( asyncio.wait([ asyncio.create_task(process.wait()), asyncio.create_task(read_stream(process, process.stdout, sys.stdout, out_regex)), asyncio.create_task(read_stream(process, process.stderr, sys.stderr, err_regex)) ]), timeout) if status is None: status = process.returncode except asyncio.TimeoutError: await terminate(process) status = 241 # Return the same exit code, stdout and stderr as the spawned process return status async def main(): parser = argparse.ArgumentParser( description='Monitor a process and terminate it if a pattern is matched in stdout or stderr.') parser.add_argument('cmd', help='The command to run.') parser.add_argument('args', nargs=argparse.REMAINDER, help='A list of arguments to pass to the command.') parser.add_argument('--out', default='', help='A regular expression pattern to search for in stdout.') parser.add_argument('--err', default='', help='A regular expression pattern to search for in stderr.') parser.add_argument('--timeout', type=float, default=DEFAULT_TIMEOUT) args = parser.parse_args() exit_code = await monitor(args.cmd, args.args, args.out, args.err, args.timeout) sys.exit(exit_code) if __name__ == '__main__': asyncio.run(main())