From f63e91d2803186550011ba2843d4f84cf1ffa2ba Mon Sep 17 00:00:00 2001
From: FrankLeeeee
Date: Tue, 19 Apr 2022 15:14:54 +0800
Subject: [PATCH] [cli] fixed a bug in user args and refactored the module
 structure

---
 colossalai/cli/cli.py               |  65 +++------------
 colossalai/cli/launcher/__init__.py |  70 ++++++++++++++++
 colossalai/cli/launcher/run.py      | 123 ++++++----------------------
 setup.py                            |   2 +-
 4 files changed, 108 insertions(+), 152 deletions(-)

diff --git a/colossalai/cli/cli.py b/colossalai/cli/cli.py
index 67dbf967c..abbb6cae2 100644
--- a/colossalai/cli/cli.py
+++ b/colossalai/cli/cli.py
@@ -1,77 +1,34 @@
 import click
-from colossalai.cli.launcher.run import main as col_launch
+from .launcher import run
 from colossalai.cli.benchmark.utils import BATCH_SIZE, SEQ_LENGTH, HIDDEN_DIM, ITER_TIMES
 from colossalai.cli.benchmark.run import launch as col_benchmark
 
+
 class Arguments():
+
     def __init__(self, arg_dict):
         for k, v in arg_dict.items():
             self.__dict__[k] = v
 
+
 @click.group()
 def cli():
     pass
 
+
 @click.command()
-@click.option("--num_gpus",
-              type=int,
-              default=-1)
-@click.option("--bs",
-              type=int,
-              default=BATCH_SIZE)
-@click.option("--seq_len",
-              type=int,
-              default=SEQ_LENGTH)
-@click.option("--hid_dim",
-              type=int,
-              default=HIDDEN_DIM)
-@click.option("--num_steps",
-              type=int,
-              default=ITER_TIMES)
+@click.option("--num_gpus", type=int, default=-1)
+@click.option("--bs", type=int, default=BATCH_SIZE)
+@click.option("--seq_len", type=int, default=SEQ_LENGTH)
+@click.option("--hid_dim", type=int, default=HIDDEN_DIM)
+@click.option("--num_steps", type=int, default=ITER_TIMES)
 def benchmark(num_gpus, bs, seq_len, hid_dim, num_steps):
     args_dict = locals()
     args = Arguments(args_dict)
     col_benchmark(args)
 
-@click.command()
-@click.option("--hostfile",
-              type=str,
-              default="")
-@click.option("--include",
-              type=str,
-              default="")
-@click.option("--exclude",
-              type=str,
-              default="")
-@click.option("--num_nodes",
-              type=int,
-              default=-1)
-@click.option("--num_gpus",
-              type=int,
-              default=-1)
-@click.option("--master_port",
-              type=int,
-              default=29500)
-@click.option("--master_addr",
-              type=str,
-              default="127.0.0.1")
-@click.option("--launcher",
-              type=str,
-              default="torch")
-@click.option("--launcher_args",
-              type=str,
-              default="")
-@click.argument("user_script",
-                type=str)
-@click.argument('user_args', nargs=-1)
-def launch(hostfile, num_nodes, num_gpus, include, exclude, master_addr, master_port,
-           launcher, launcher_args, user_script, user_args):
-    args_dict = locals()
-    args = Arguments(args_dict)
-    args.user_args = list(args.user_args)
-    col_launch(args)
 
-cli.add_command(launch)
+cli.add_command(run)
 cli.add_command(benchmark)
 
 if __name__ == '__main__':
diff --git a/colossalai/cli/launcher/__init__.py b/colossalai/cli/launcher/__init__.py
index e69de29bb..21decd492 100644
--- a/colossalai/cli/launcher/__init__.py
+++ b/colossalai/cli/launcher/__init__.py
@@ -0,0 +1,70 @@
+import click
+from .run import launch_multi_processes
+from colossalai.context import Config
+
+
+@click.command(help="Launch distributed training on a single node or multiple nodes",
+               context_settings=dict(ignore_unknown_options=True))
+@click.option("-H", "-host", "--host", type=str, default=None, help="the list of machines to launch")
+@click.option("--hostfile",
+              type=str,
+              default=None,
+              help="Hostfile path that defines the device pool available to the job (e.g. worker-name:number of slots)")
+@click.option(
+    "--include",
+    type=str,
+    default=None,
+    help=
+    "Specify computing devices to use during execution. String format is NODE_SPEC@NODE_SPEC where NODE_SPEC=:"
+)
+@click.option(
+    "--exclude",
+    type=str,
+    default=None,
+    help=
+    "Specify computing devices to NOT use during execution. Mutually exclusive with --include. Formatting is the same as --include."
+)
+@click.option("--num_nodes", type=int, default=-1, help="Total number of worker nodes to use.")
+@click.option("--nprocs_per_node", type=int, default=-1, help="Number of GPUs to use on each node.")
+@click.option("--master_port",
+              type=int,
+              default=29500,
+              help="(optional) Port used by PyTorch distributed for communication during distributed training.")
+@click.option("--master_addr",
+              type=str,
+              default="127.0.0.1",
+              help="(optional) IP address of node 0, will be inferred via 'hostname -I' if not specified.")
+@click.option(
+    "--launcher",
+    type=click.Choice(['torch', 'openmpi', 'slurm'], case_sensitive=False),
+    default="torch",
+    help="(optional) choose launcher backend for multi-node training. Options currently include PDSH, OpenMPI, SLURM.")
+@click.option("--launcher_args",
+              type=str,
+              default=None,
+              help="(optional) pass launcher specific arguments as a single quoted argument.")
+@click.argument("user_script", type=str)
+@click.argument('user_args', nargs=-1)
+def run(host: str, hostfile: str, num_nodes: int, nprocs_per_node: int, include: str, exclude: str, master_addr: str,
+        master_port: int, launcher: str, launcher_args: str, user_script: str, user_args: str):
+    """
+    To launch multiple processes on a single node or multiple nodes via command line.
+
+    Usage::
+        # run on the current node with all available GPUs
+        colossalai run train.py
+
+        # run with only 2 GPUs on the current node
+        colossalai run --nprocs_per_node 2 train.py
+
+        # run on two nodes
+        colossalai run --host , train.py
+
+        # run with hostfile
+        colossalai run --hostfile train.py
+    """
+    args_dict = locals()
+    args = Config(args_dict)
+    args.user_args = list(args.user_args)
+    # (lsg) TODO: fix this function
+    # launch_multi_processes(args)
diff --git a/colossalai/cli/launcher/run.py b/colossalai/cli/launcher/run.py
index 8097f1b56..699118770 100644
--- a/colossalai/cli/launcher/run.py
+++ b/colossalai/cli/launcher/run.py
@@ -6,79 +6,10 @@ import sys
 import os
 import torch
 from colossalai.logging import get_dist_logger
+from colossalai.context import Config
 from .multinode_runner import PDSHRunner, OpenMPIRunner, SLURMRunner
+from copy import deepcopy
 
-def build_args_parser() -> ArgumentParser:
-    """Helper function parsing the command line options."""
-
-    parser = ArgumentParser(description="colossal distributed training launcher")
-
-    parser.add_argument("-H",
-                        "--hostfile",
-                        type=str,
-                        default="",
-                        help="Hostfile path that defines the "
-                        "device pool available to the job (e.g., "
-                        "worker-name:number of slots)")
-
-    parser.add_argument("-i",
-                        "--include",
-                        type=str,
-                        default="",
-                        help="Specify computing devices to use during execution."
-                        "String format is NODE_SPEC@NODE_SPEC"
-                        "where NODE_SPEC=:")
-
-    parser.add_argument("-e",
-                        "--exclude",
-                        type=str,
-                        default="",
-                        help="Specify computing devices to NOT use during execution."
-                        "Mutually exclusive with --include. Formatting"
-                        "is the same as --include.")
-
-    parser.add_argument("--num_nodes",
-                        type=int,
-                        default=-1,
-                        help="Total number of worker nodes to use.")
-
-    parser.add_argument("--num_gpus",
-                        type=int,
-                        default=-1,
-                        help="Number of GPUs to use on each node.")
-
-    parser.add_argument("--master_port",
-                        default=29500,
-                        type=int,
-                        help="(optional) Port used by PyTorch distributed for "
-                        "communication during distributed training.")
-
-    parser.add_argument("--master_addr",
-                        default="127.0.0.1",
-                        type=str,
-                        help="(optional) IP address of node 0, will be "
-                        "inferred via 'hostname -I' if not specified.")
-
-    parser.add_argument("--launcher",
-                        default="torch",
-                        type=str,
-                        help="(optional) choose launcher backend for multi-node "
-                        "training. Options currently include PDSH, OpenMPI, SLURM.")
-
-    parser.add_argument("--launcher_args",
-                        default="",
-                        type=str,
-                        help="(optional) pass launcher specific arguments as a "
-                        "single quoted argument.")
-
-    parser.add_argument("user_script",
-                        type=str,
-                        help="User script to launch, followed by any required "
-                        "arguments.")
-
-    parser.add_argument('user_args', nargs=argparse.REMAINDER)
-
-    return parser
 
 def fetch_hostfile(hostfile_path):
     logger = get_dist_logger()
@@ -106,6 +37,7 @@ def fetch_hostfile(hostfile_path):
 
     return device_pool
 
+
 def _stable_remove_duplicates(data):
     # Create a new list in the same order as original but with duplicates
     # removed, should never be more than ~16 elements so simple is best
@@ -115,6 +47,7 @@ def _stable_remove_duplicates(data):
             new_list.append(x)
     return new_list
 
+
 def parse_device_filter(host_info, include_str="", exclude_str=""):
     '''Parse an inclusion or exclusion string and filter a hostfile dictionary.
 
@@ -202,26 +135,27 @@ def parse_device_filter(host_info, include_str="", exclude_str=""):
 
     return ordered_hosts
 
+
 def parse_inclusion_exclusion(device_pool, inclusion, exclusion):
     active_devices = collections.OrderedDict()
     for hostname, slots in device_pool.items():
         active_devices[hostname] = list(range(slots))
-    return parse_device_filter(active_devices,
-                               include_str=inclusion,
-                               exclude_str=exclusion)
+    return parse_device_filter(active_devices, include_str=inclusion, exclude_str=exclusion)
 
-def main(args=None):
-    logger = get_dist_logger()
-    assert args is not None, "args should not be None."
-
-    device_pool = fetch_hostfile(args.hostfile)
+
+def launch_multi_processes(args):
+    assert isinstance(args, Config), f'expected args to be of type Config, but got {type(args)}'
+
+    # check
+    if args.hostfile:
+        device_pool = fetch_hostfile(args.hostfile)
+    else:
+        device_pool = None
 
     active_devices = None
 
     if device_pool:
-        active_devices = parse_inclusion_exclusion(device_pool,
-                                                   args.include,
-                                                   args.exclude)
+        active_devices = parse_inclusion_exclusion(device_pool, args.include, args.exclude)
         if args.num_nodes > 0:
             updated_active_devices = collections.OrderedDict()
             for count, hostname in enumerate(active_devices.keys()):
@@ -244,16 +178,15 @@ def main(args=None):
         else:
             nproc_per_node = args.num_gpus
         if torch.__version__ <= "1.09":
-            cmd = [sys.executable, "-u", "-m",
-                   "torch.distributed.launch",
-                   f"--nproc_per_node={nproc_per_node}",
-                   f"--master_addr={args.master_addr}",
-                   f"--master_port={args.master_port}"] + [args.user_script] + args.user_args
+            cmd = [
+                sys.executable, "-u", "-m", "torch.distributed.launch", f"--nproc_per_node={nproc_per_node}",
+                f"--master_addr={args.master_addr}", f"--master_port={args.master_port}"
+            ] + [args.user_script] + args.user_args
         else:
-            cmd = ["torchrun",
-                   f"--nproc_per_node={nproc_per_node}",
-                   f"--master_addr={args.master_addr}",
-                   f"--master_port={args.master_port}"] + [args.user_script] + args.user_args
+            cmd = [
+                "torchrun", f"--nproc_per_node={nproc_per_node}", f"--master_addr={args.master_addr}",
+                f"--master_port={args.master_port}"
+            ] + [args.user_script] + args.user_args
     else:
         if args.launcher == "torch":
             runner = PDSHRunner(args)
@@ -272,14 +205,10 @@ def main(args=None):
             env['PYTHONPATH'] = curr_path + ":" + env['PYTHONPATH']
         else:
            env['PYTHONPATH'] = curr_path
-
+
         cmd = runner.get_cmd(env, active_devices, args)
-
+
     result = subprocess.Popen(cmd, env=env)
     result.wait()
     if result.returncode > 0:
         sys.exit(result.returncode)
-
-
-if __name__ == "__main__":
-    main()
diff --git a/setup.py b/setup.py
index 12b12c31d..a4308f0f7 100644
--- a/setup.py
+++ b/setup.py
@@ -215,7 +215,7 @@ setup(
     install_requires=fetch_requirements('requirements/requirements.txt'),
     entry_points='''
        [console_scripts]
-        colossal=colossalai.cli:cli
+        colossalai=colossalai.cli:cli
    ''',
     python_requires='>=3.7',
     classifiers=[
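
Note on the user-args handling in the new `run` command above (this note and the sketch below are not part of the patch): `context_settings=dict(ignore_unknown_options=True)` lets options that click does not recognise fall through to the trailing `user_args` argument, and `nargs=-1` collects them as a tuple, which the command then converts with `args.user_args = list(args.user_args)` before handing them to the launcher. A minimal standalone sketch of the same click pattern, where the `demo` command name and the example flags are made up for illustration:

import click


@click.command(context_settings=dict(ignore_unknown_options=True))
@click.argument("user_script", type=str)
@click.argument("user_args", nargs=-1)
def demo(user_script, user_args):
    # click hands the unrecognised options over as a tuple of strings;
    # turn it into a list so it can be appended to a launcher command line
    user_args = list(user_args)
    click.echo(f"script: {user_script}")
    click.echo(f"forwarded args: {user_args}")


if __name__ == "__main__":
    # e.g. `python demo.py train.py --lr 0.1 --batch_size 32` echoes the two
    # unknown flags instead of rejecting them
    demo()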