From 05bfe7b29275451b694e55a5b763dd6fb288f627 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E9=BB=84=E5=A9=B7?=
Date: Wed, 12 Jul 2023 17:47:30 +0800
Subject: [PATCH] feat(utils/writer.py): support tensorboard writer

---
 internlm/initialize/launch.py |   8 ++
 internlm/utils/writer.py      | 134 ++++++++++++++++++++++++++++++++++
 train.py                      |  26 +++++--
 3 files changed, 163 insertions(+), 5 deletions(-)
 create mode 100644 internlm/utils/writer.py

diff --git a/internlm/initialize/launch.py b/internlm/initialize/launch.py
index d6eda24..d2d61b1 100644
--- a/internlm/initialize/launch.py
+++ b/internlm/initialize/launch.py
@@ -127,6 +127,14 @@ def args_sanity_check():
         logger.info(f"save_ckpt_folder: {gpc.config.ckpt.save_ckpt_folder}")
         logger.info(f"checkpoint_every: {gpc.config.ckpt.checkpoint_every}")
 
+    # tensorboard writer config
+    if "enable_tb" not in gpc.config:
+        gpc.config._add_item("enable_tb", True)
+    if "tensorboard_folder" not in gpc.config:
+        gpc.config._add_item("tensorboard_folder", None)
+    if "resume_tb_folder" not in gpc.config:
+        gpc.config._add_item("resume_tb_folder", None)
+
     # cudnn
     torch.backends.cudnn.benchmark = gpc.config.get("cudnn_benchmark", False)
     torch.backends.cudnn.deterministic = gpc.config.get("cudnn_deterministic", False)
diff --git a/internlm/utils/writer.py b/internlm/utils/writer.py
new file mode 100644
index 0000000..16514c7
--- /dev/null
+++ b/internlm/utils/writer.py
@@ -0,0 +1,134 @@
+import logging
+import os
+import socket
+import sys
+import traceback
+from functools import partial
+
+import torch
+from torch.utils.tensorboard import SummaryWriter
+
+from internlm.core.context import ParallelMode
+from internlm.core.context import global_context as gpc
+
+
+def get_tb_log_file_name():
+    if gpc.is_rank_for_log():
+        tb_prefix = "main_"  # Indicates a rank with more output information
+    else:
+        tb_prefix = ""
+
+    tb_log_file_name = (
+        f"{tb_prefix}dp={gpc.get_local_rank(ParallelMode.DATA)}_"
+        f"tp={gpc.get_local_rank(ParallelMode.TENSOR)}_pp={gpc.get_local_rank(ParallelMode.PIPELINE)}"
+    )
+    return tb_log_file_name
+
+
+def copy_ignore_folder(source_path, target_path):
+    os.system(f"cp -r {source_path}/* {target_path}/")  # copy folder contents, not the folder itself
+
+
+def tb_save_run_info(writer, config_lines, global_step=0):
+    writer.add_text(tag="cmd", text_string=" ".join(sys.argv[:]), global_step=global_step)
+    lines = []
+    for line in config_lines:
+        if line.strip().startswith("#"):
+            continue
+        lines.append(line)
+    writer.add_text(tag="config", text_string="\n".join(lines), global_step=global_step)
+
+
+def init_tb_writer(
+    launch_time,
+    tensorboard_folder: str,
+    resume_tb_folder: str,
+    step_count: int,
+    config: str,
+    logger: logging.Logger,
+):
+    tb_log_file_name = get_tb_log_file_name()
+    if not tensorboard_folder:
+        tb_folder = os.path.join(gpc.config.JOB_NAME, launch_time)
+    else:
+        tb_folder = tensorboard_folder
+
+    if gpc.get_global_rank() == 0:
+        if resume_tb_folder is not None:
+            logger.info(f"Copying tensorboard logs from {resume_tb_folder} to {tb_folder}...")
+            copy_ignore_folder(resume_tb_folder, tb_folder)
+        else:
+            logger.info(f"Logging tensorboard logs to: {tb_folder}")
+
+        tb_logdir = os.path.join(tb_folder, tb_log_file_name)
+        writer = SummaryWriter(log_dir=tb_logdir, max_queue=5, purge_step=step_count, flush_secs=3)
+        writer.add_text(tag="job_name", text_string=gpc.config.JOB_NAME, global_step=step_count)
+        writer.add_text(tag="tensorboard_folder", text_string=tb_logdir, global_step=step_count)
+
+        torch.distributed.broadcast_object_list([tb_folder], src=0)
+    else:
+        objects = [None]
+        torch.distributed.broadcast_object_list(objects, src=0)
+        tb_folder = objects[0]
+        tb_logdir = os.path.join(tb_folder, tb_log_file_name)
+        writer = SummaryWriter(log_dir=tb_logdir, max_queue=5, purge_step=step_count, flush_secs=3)
+
+    if gpc.is_rank_for_log():
+        tb_save_run_info(
+            writer=writer,
+            config_lines=config,
+            global_step=step_count,
+        )
+
+    writer.add_text(
+        tag=f"mapping_{tb_log_file_name}",
+        text_string=f"file_path={tb_logdir} hostname={socket.gethostname()} device={torch.cuda.current_device()}",
+        global_step=step_count,
+    )
+    writer.add_scalar = partial(writer.add_scalar, new_style=True)
+
+    return writer, tb_logdir
+
+
+class Writer:
+    """
+    Customized writer based on tensorboard for recording training metrics.
+
+    Args:
+        enable_tb (bool): When False, add_scalar and add_text are no-ops;
+            all other arguments are forwarded to init_tb_writer.
+    """
+
+    def __init__(
+        self,
+        launch_time,
+        tensorboard_folder: str = None,
+        resume_tb_folder: str = None,
+        step_count: int = 0,
+        config: str = None,
+        logger: logging.Logger = None,
+        enable_tb: bool = True,
+    ) -> None:
+        self.enable_tb = enable_tb
+        self.tb_writer, self.tb_logdir = init_tb_writer(
+            launch_time=launch_time,
+            tensorboard_folder=tensorboard_folder,
+            resume_tb_folder=resume_tb_folder,
+            step_count=step_count,
+            config=config,
+            logger=logger,
+        )
+
+    def add_scalar(self, key, value, step):
+        try:
+            if self.enable_tb and self.tb_writer is not None:
+                self.tb_writer.add_scalar(tag=key, scalar_value=value, global_step=step)
+        except Exception:
+            traceback.print_exc()
+
+    def add_text(self, key, value, step):
+        try:
+            if self.enable_tb and self.tb_writer is not None:
+                self.tb_writer.add_text(tag=key, text_string=value, global_step=step)
+        except Exception:
+            traceback.print_exc()
diff --git a/train.py b/train.py
index 8e0c4ba..f68a309 100644
--- a/train.py
+++ b/train.py
@@ -54,6 +54,7 @@ from internlm.utils.parallel import (
     sync_model_param_within_tp,
 )
 from internlm.utils.registry import MODEL_INITIALIZER
+from internlm.utils.writer import Writer
 
 # global llm logger
 logger = get_logger(__file__)
@@ -246,6 +247,7 @@ def initialize_optimizer(model: nn.Module):
 def record_current_batch_training_metrics(
     get_tflops_func,
     logger,
+    writer,
     success_update,
     batch_count,
     batch,
@@ -307,12 +309,13 @@
         infos["smallest_batch"] = min_samples_in_batch
         infos["adam_beta2"] = beta2_scheduler.get_beta2()
 
-        line = ""
-        for k, v in infos.items():
-            line += f"{k}={v},"
-
         fwd_bwd_time = round(timer("fwd-bwd").elapsed(), 2)
-        line += f"fwd_bwd_time={fwd_bwd_time}"
+        infos["fwd_bwd_time"] = fwd_bwd_time
+
+        line = ""
+        for key, value in infos.items():
+            line += f"{key}={value},"
+            writer.add_scalar(key=key, value=value, step=train_state.step_count)
 
         logger.info(line)
 
@@ -355,6 +358,18 @@ def main(args):
     dist.broadcast_object_list(objs, src=0)
     current_time = objs[0]
 
+    # initialize customized llm writer
+    with open(args.config, "r") as f:
+        config_lines = f.readlines()
+    writer = Writer(
+        launch_time=current_time,
+        tensorboard_folder=gpc.config.tensorboard_folder,
+        resume_tb_folder=gpc.config.resume_tb_folder,
+        config=config_lines,
+        logger=logger,
+        enable_tb=gpc.config.enable_tb,
+    )
+
     model_load_path = None
     if load_resume_ckpt_folder is not None:
         logger.info(
@@ -469,6 +484,7 @@
             record_current_batch_training_metrics(
                 get_tflops_func=get_tflops_func,
                 logger=logger,
+                writer=writer,
                 success_update=success_update,
                 batch_count=batch_count,
                 batch=batch,
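
Usage note: with the defaults registered in args_sanity_check above, all three
new config keys are optional. A minimal sketch of the corresponding entries in
a Python-style training config (folder names here are hypothetical examples;
JOB_NAME is assumed to be defined elsewhere in the config):

    enable_tb = True              # default True; set False to make the writer a no-op
    tensorboard_folder = None     # None -> logs go to <JOB_NAME>/<launch_time>
    resume_tb_folder = None       # or e.g. "tb_logs/prev_run" to copy a previous run's events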
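Each rank writes its events into its own subfolder named by
get_tb_log_file_name() (e.g. "main_dp=0_tp=0_pp=0" on the logging rank), so a
whole run can be inspected by pointing tensorboard at the parent folder
(path hypothetical, following the default layout above):

    tensorboard --logdir <JOB_NAME>/<launch_time>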