Source code for scripts.pipeline

from runners.utils import load_yaml
from . import cmd, document_parser
from runners.script_runner_pool import ScriptRunnerPool
from src import logging
from multiprocessing import cpu_count
from argparse import ArgumentParser

@document_parser('pipeline', 'scripts.pipeline.parallel_job_execution')
def build_parser():
    parser = ArgumentParser()
    parser.add_argument(
        '--script', type=str, help="Path to script to run.")
    parser.add_argument(
        '--config', type=str, help="Path to config for script.")
    parser.add_argument(
        '--run_in', type=str, default='host',
        help="Run in host or container.")
    parser.add_argument(
        '--num_gpus', type=int, default=0,
        help="How many GPUs to use.")
    parser.add_argument(
        '--blocking', action='store_true',
        help="Finish this job before proceeding to next.")
    return parser
def parallel_job_execution(script_func, jobs, num_jobs=1):
    """
    Executes jobs parsed from a .yml file with structure as follows::

        script: name of script in scripts/ folder
        config: path/to/yml/config.yml
        run_in: 'host' or 'container' (default: host)
        num_gpus: how many gpus (default: 0)
        blocking: whether to block on this job or not (default: false)

    Could also be multiple jobs::

        num_jobs: how many jobs to run in parallel (default: 1)
        jobs:
          - script: script1.py
            config: config1.yml
          - script: script2.py
            config: config2.yml
          ...

    The jobs get executed in sequence or in parallel.

    Args:
        script_func (callable): Function associated with this script by the
            command-line wrapper; unused by this function.
        jobs (list): List of job dictionaries, each with the structure
            described above.
        num_jobs (int): How many jobs to run in parallel. Capped at the
            number of available CPUs. Defaults to 1.
    """
    num_jobs = min(cpu_count(), num_jobs)
    logging.info(
        f"\n Executing scripts with num_jobs: {num_jobs}"
    )
    pool = ScriptRunnerPool(max_workers=num_jobs)
    pool.submit(jobs)
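# A minimal sketch (not part of the original module) of driving
# parallel_job_execution directly from a jobs file, under the assumption
# that load_yaml returns the parsed YAML as a dict. The helper name
# _example_run_from_yml and the path 'pipeline.yml' are hypothetical.
def _example_run_from_yml(path_to_yml_file='pipeline.yml'):
    data = load_yaml(path_to_yml_file)
    # A file may describe a single job at the top level, or a list of
    # job dictionaries under the 'jobs' key.
    jobs = data.get('jobs', [data])
    parallel_job_execution(
        lambda x: x, jobs, num_jobs=data.get('num_jobs', 1))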
if __name__ == "__main__":
    cmd(lambda x: x, build_parser, parallel_job_execution)
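# Command-line usage, as a sketch: this assumes the module is run as
# scripts.pipeline, and the flag values below are hypothetical. The flags
# correspond to the single-job fields accepted by build_parser above.
#
#   python -m scripts.pipeline --script train.py --config config.yml \
#       --run_in container --num_gpus 1 --blocking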