From 3f4c3bd94f1d92c6ee59ccccd2833b27f3d6c5de Mon Sep 17 00:00:00 2001 From: JiaoPL Date: Fri, 10 Nov 2023 16:01:40 +0800 Subject: [PATCH] catch exception of all ranks --- configs/7B_sft.py | 1 + internlm/initialize/launch.py | 7 ++++- internlm/monitor/monitor.py | 48 +++++++++++++++++++++++++++++++---- 3 files changed, 50 insertions(+), 6 deletions(-) diff --git a/configs/7B_sft.py b/configs/7B_sft.py index 865b959..ca2adc6 100644 --- a/configs/7B_sft.py +++ b/configs/7B_sft.py @@ -169,5 +169,6 @@ monitor = dict( enable_feishu_alert=DO_ALERT, feishu_alert_address=None, # feishu webhook to send alert message light_monitor_address=None, # light_monitor address to send heartbeat + alert_file_path=f"llm_alter/{JOB_NAME}_alert.log", ), ) diff --git a/internlm/initialize/launch.py b/internlm/initialize/launch.py index ad404f2..c09e2c1 100644 --- a/internlm/initialize/launch.py +++ b/internlm/initialize/launch.py @@ -311,7 +311,12 @@ def args_sanity_check(): monitor_default_config = { "alert_address": None, # compatible with old alert config "monitor": { # new monitoring config - "alert": {"enable_feishu_alert": False, "feishu_alert_address": None, "light_monitor_address": None} + "alert": { + "enable_feishu_alert": False, + "feishu_alert_address": None, + "light_monitor_address": None, + "alert_file_path": None, + } }, } diff --git a/internlm/monitor/monitor.py b/internlm/monitor/monitor.py index 541b3ef..25ad67f 100644 --- a/internlm/monitor/monitor.py +++ b/internlm/monitor/monitor.py @@ -1,3 +1,4 @@ +import fcntl import os import signal import socket @@ -22,7 +23,8 @@ def send_alert_message(address: str = None, title: str = None, message: str = No message (str): The message body, defaults to None. 
""" - if address is not None and gpc.is_rank_for_log(): + # send alert message only if the alert is enabled + if address is not None and gpc.config.monitor.alert.get("enable_feishu_alert", False) and gpc.is_rank_for_log(): send_feishu_msg_with_webhook( webhook=address, title=title if title else get_job_key(), @@ -133,6 +135,7 @@ class MonitorManager(metaclass=SingletonMeta): self.loss_spike_limit = loss_spike_limit self.last_step_loss = -1 self.send_exception = try_import_send_exception() + self.alert_file_path = None def monitor_loss_spike(self, alert_address: str = None, step_count: int = 0, cur_step_loss: float = 0.0): """Check loss value, if loss spike occurs, send alert message to Feishu.""" @@ -149,6 +152,27 @@ class MonitorManager(metaclass=SingletonMeta): ) self.last_step_loss = cur_step_loss + def exception_should_be_alert(self, msg: str, alert_address: str = None): + enable_alert = gpc.config.monitor.alert.get("enable_feishu_alert", False) + try: + with open(self.alert_file_path, "a+") as f: + fcntl.flock(f, fcntl.LOCK_EX) + + f.seek(0) + if msg in f.read(): + fcntl.flock(f, fcntl.LOCK_UN) + return False + + f.write(msg) + fcntl.flock(f, fcntl.LOCK_UN) + return enable_alert and True + except Exception as err: + send_alert_message( + address=alert_address, + message=f"Failed to open ALERT file: {err}", + ) + return enable_alert and True + def monitor_exception(self, alert_address: str = None, excp_info: str = None): """Catch and format exception information, send alert message to Feishu.""" filtered_trace = excp_info.split("\n")[-10:] @@ -161,10 +185,16 @@ class MonitorManager(metaclass=SingletonMeta): and gpc.config.monitor.alert.get("light_monitor_address", None) ): self.send_exception(format_trace, gpc.get_global_rank()) - send_alert_message( - address=alert_address, - message=f"Catch Exception from {socket.gethostname()} with rank id {gpc.get_global_rank()}:{format_trace}", - ) + message = f"Catch Exception from {socket.gethostname()} with rank id 
{gpc.get_global_rank()}:{format_trace}" + if self.alert_file_path: + if self.exception_should_be_alert(format_trace, alert_address): + send_feishu_msg_with_webhook( + webhook=alert_address, + title=get_job_key(), + message=message, + ) + else: + send_alert_message(alert_address, message) def handle_sigterm(self, alert_address: str = None): """Catch SIGTERM signal, and send alert message to Feishu.""" @@ -207,6 +237,14 @@ class MonitorManager(metaclass=SingletonMeta): # initialize some variables for monitoring set_env_var(key="JOB_NAME", value=job_name) + # initialize alert file + self.alert_file_path = gpc.config.monitor.alert.get("alert_file_path") + if self.alert_file_path and gpc.is_rank_for_log(): + alert_file_dir = os.path.dirname(self.alert_file_path) + os.makedirs(alert_file_dir, exist_ok=True) + if os.path.exists(self.alert_file_path): + os.remove(self.alert_file_path) + # start a monitor thread, periodically check the training status self.monitor_thread = MonitorTracker( alert_address=alert_address,