From 8d8d811e107ee3223a2ab6ea09d7e80bf5e67ee8 Mon Sep 17 00:00:00 2001 From: jiaopenglong <44927264+JiaoPL@users.noreply.github.com> Date: Tue, 5 Sep 2023 19:24:01 +0800 Subject: [PATCH] feat(monitor): add light monitor (#275) * add light monitor * filter key of metrics dict * test no light_monitor case * mv init_light_monitor to initialize_distributed_env --- internlm/initialize/launch.py | 9 +++++ internlm/monitor/__init__.py | 9 ++++- internlm/monitor/alert.py | 51 +++++++++++++++++++++++++++++ internlm/train/training_internlm.py | 5 ++- 4 files changed, 72 insertions(+), 2 deletions(-) diff --git a/internlm/initialize/launch.py b/internlm/initialize/launch.py index b446934..bd45183 100644 --- a/internlm/initialize/launch.py +++ b/internlm/initialize/launch.py @@ -10,6 +10,7 @@ import torch from internlm.core.context import Config from internlm.core.context import global_context as gpc +from internlm.monitor import initialize_light_monitor from internlm.utils.common import get_master_node from internlm.utils.logger import get_logger @@ -332,6 +333,14 @@ def launch( f"tensor parallel size: {gpc.tensor_parallel_size}", ) + # init light monitor client + light_monitor_address = gpc.config.get("light_monitor_address", None) + if light_monitor_address is None: + if gpc.is_rank_for_log(): + logger.warning("monitor address is none, monitor could not be used!") + else: + initialize_light_monitor(light_monitor_address) + def launch_from_slurm( config: Union[str, Path, Config, Dict], diff --git a/internlm/monitor/__init__.py b/internlm/monitor/__init__.py index b100cde..2501d66 100644 --- a/internlm/monitor/__init__.py +++ b/internlm/monitor/__init__.py @@ -1,4 +1,11 @@ +from .alert import initialize_light_monitor, send_heartbeat from .monitor import initialize_monitor_manager, send_alert_message from .utils import set_env_var -__all__ = ["send_alert_message", "initialize_monitor_manager", "set_env_var"] +__all__ = [ + "send_alert_message", + "initialize_monitor_manager", + "set_env_var", + "initialize_light_monitor", + "send_heartbeat", +] diff --git a/internlm/monitor/alert.py b/internlm/monitor/alert.py index 78b6040..1772e7f 100644 --- a/internlm/monitor/alert.py +++ b/internlm/monitor/alert.py @@ -1,8 +1,59 @@ import json +import math +import os +import re import time +from typing import Dict import requests +from internlm.utils.logger import get_logger + +logger = get_logger(__file__) + + +def initialize_light_monitor(monitor_address: str = None): + try: + from uniscale_monitoring import init_monitor + + init_monitor(monitor_address) + except Exception as e: + logger.warning(f"init monitor meet error: {e}") + + +def send_heartbeat(msg_type: str, msg: Dict): + def nan2none(v): + if isinstance(v, float) and math.isnan(v): + return None + return v + + try: + from uniscale_monitoring import send_meta + + data = {} + for k, v in msg.items(): + if isinstance(v, Dict): + for k1, v1 in v.items(): + new_k = f"{k}_{k1}".split(" ")[0] + new_k = re.sub(r"[^a-zA-Z0-9_]", "_", new_k) + data[new_k] = nan2none(v1) + else: + new_k = k.split(" ")[0] + new_k = re.sub(r"[^a-zA-Z0-9_]", "_", new_k) + data[new_k] = nan2none(v) + + if os.getenv("CLUSTER_NAME"): + data.update({"cluster": os.getenv("CLUSTER_NAME")}) + if msg_type == "train_metrics": + data.update({"msg_type": "train_metrics"}) + elif msg_type == "init_time": + data.update({"msg_type": "init_time"}) + elif msg_type == "stage_time": + data.update({"msg_type": "stage_time"}) + send_meta(data, timeout=0.1) + except Exception as e: + logger.warning(f"send heartbeat meet error: {e}") + def send_feishu_msg_with_webhook(webhook: str, title: str, message: str): """ diff --git a/internlm/train/training_internlm.py b/internlm/train/training_internlm.py index 9c2ded0..a42758a 100644 --- a/internlm/train/training_internlm.py +++ b/internlm/train/training_internlm.py @@ -24,7 +24,7 @@ from internlm.data.packed_dataset import ( get_packed_dataset_without_short_length, ) from internlm.data.utils import DATASET_TYPE_IDS_MAP, unpack_data -from internlm.monitor import set_env_var +from internlm.monitor import send_heartbeat, set_env_var from internlm.monitor.monitor import monitor_manager as mm from internlm.solver.beta2_scheduler import Beta2Scheduler from internlm.solver.lr_scheduler import FineTuneCosineAnnealingWarmupLR @@ -394,6 +394,9 @@ def record_current_batch_training_metrics( else: writer.add_scalar(key=key, value=value, step=train_state.step_count) + if gpc.config.get("light_monitor_address", None) and batch_count % 50 == 0: + send_heartbeat("train_metrics", infos) + if update_panel: # metrics shown with dashboard panels panel_metrics = {