monitor task only if DO_ALERT is True

pull/491/head
JiaoPL 2023-11-10 16:39:59 +08:00
parent a8ff9acbfd
commit d680876a9a
1 changed file with 60 additions and 60 deletions

View File

@ -136,24 +136,26 @@ class MonitorManager(metaclass=SingletonMeta):
self.last_step_loss = -1 self.last_step_loss = -1
self.send_exception = try_import_send_exception() self.send_exception = try_import_send_exception()
self.alert_file_path = None self.alert_file_path = None
self.enable_alert = False
self.light_monitor_address = None
def monitor_loss_spike(self, alert_address: str = None, step_count: int = 0, cur_step_loss: float = 0.0): def monitor_loss_spike(self, alert_address: str = None, step_count: int = 0, cur_step_loss: float = 0.0):
"""Check loss value, if loss spike occurs, send alert message to Feishu.""" """Check loss value, if loss spike occurs, send alert message to Feishu."""
set_env_var(key="LOSS", value=cur_step_loss) if self.enable_alert:
set_env_var(key="STEP_ID", value=step_count) set_env_var(key="LOSS", value=cur_step_loss)
set_env_var(key="STEP_ID", value=step_count)
if self.last_step_loss != -1 and cur_step_loss > self.loss_spike_limit * self.last_step_loss: if self.last_step_loss != -1 and cur_step_loss > self.loss_spike_limit * self.last_step_loss:
send_alert_message( send_alert_message(
address=alert_address, address=alert_address,
message=( message=(
f"Checking step by step: Loss spike may be happened in step {step_count}, " f"Checking step by step: Loss spike may be happened in step {step_count}, "
f"loss value from {self.last_step_loss} to {cur_step_loss}, please check it." f"loss value from {self.last_step_loss} to {cur_step_loss}, please check it."
), ),
) )
self.last_step_loss = cur_step_loss self.last_step_loss = cur_step_loss
def exception_should_be_alert(self, msg: str, alert_address: str = None): def exception_should_be_alert(self, msg: str, alert_address: str = None):
enable_alert = gpc.config.monitor.alert.get("enable_feishu_alert", False)
try: try:
with open(self.alert_file_path, "a+") as f: with open(self.alert_file_path, "a+") as f:
fcntl.flock(f, fcntl.LOCK_EX) fcntl.flock(f, fcntl.LOCK_EX)
@ -165,54 +167,49 @@ class MonitorManager(metaclass=SingletonMeta):
f.write(msg) f.write(msg)
fcntl.flock(f, fcntl.LOCK_UN) fcntl.flock(f, fcntl.LOCK_UN)
return enable_alert and True return True
except Exception as err: except Exception as err:
send_alert_message( send_alert_message(
address=alert_address, address=alert_address,
message=f"Failed to open ALERT file: {err}", message=f"Failed to open ALERT file: {err}",
) )
return enable_alert and True return True
def monitor_exception(self, alert_address: str = None, excp_info: str = None): def monitor_exception(self, alert_address: str = None, excp_info: str = None):
"""Catch and format exception information, send alert message to Feishu.""" """Catch and format exception information, send alert message to Feishu."""
filtered_trace = excp_info.split("\n")[-10:] if self.enable_alert:
format_trace = "" filtered_trace = excp_info.split("\n")[-10:]
for line in filtered_trace: format_trace = ""
format_trace += "\n" + line for line in filtered_trace:
if ( format_trace += "\n" + line
self.send_exception
and gpc.config.monitor.alert.get("enable_feishu_alert", False) if self.send_exception and self.light_monitor_address:
and gpc.config.monitor.alert.get("light_monitor_address", None) self.send_exception(format_trace, gpc.get_global_rank())
): message = f"Catch Exception from {socket.gethostname()} with rank id {gpc.get_global_rank()}:{format_trace}"
self.send_exception(format_trace, gpc.get_global_rank()) if self.alert_file_path:
message = f"Catch Exception from {socket.gethostname()} with rank id {gpc.get_global_rank()}:{format_trace}" if self.exception_should_be_alert(format_trace, alert_address):
if self.alert_file_path: send_feishu_msg_with_webhook(
if self.exception_should_be_alert(format_trace, alert_address): webhook=alert_address,
send_feishu_msg_with_webhook( title=get_job_key(),
webhook=alert_address, message=message,
title=get_job_key(), )
message=message, else:
) send_alert_message(alert_address, message)
else:
send_alert_message(alert_address, message)
def handle_sigterm(self, alert_address: str = None): def handle_sigterm(self, alert_address: str = None):
"""Catch SIGTERM signal, and send alert message to Feishu.""" """Catch SIGTERM signal, and send alert message to Feishu."""
def sigterm_handler(sys_signal, frame): def sigterm_handler(sys_signal, frame):
print("receive frame: ", frame) if self.enable_alert:
print("receive signal: ", sys_signal) print("receive frame: ", frame)
message = f"Process received signal {signal} and exited." print("receive signal: ", sys_signal)
if ( message = f"Process received signal {signal} and exited."
self.send_exception if self.send_exception and self.light_monitor_address:
and gpc.config.monitor.alert.get("enable_feishu_alert", False) self.send_exception(message, gpc.get_global_rank())
and gpc.config.monitor.alert.get("light_monitor_address", None) send_alert_message(
): address=alert_address,
self.send_exception(message, gpc.get_global_rank()) message=message,
send_alert_message( )
address=alert_address,
message=message,
)
signal.signal(signal.SIGTERM, sigterm_handler) signal.signal(signal.SIGTERM, sigterm_handler)
@ -236,21 +233,24 @@ class MonitorManager(metaclass=SingletonMeta):
# initialize some variables for monitoring # initialize some variables for monitoring
set_env_var(key="JOB_NAME", value=job_name) set_env_var(key="JOB_NAME", value=job_name)
self.enable_alert = gpc.config.monitor.alert.get("enable_feishu_alert", False)
# initialize alert file if self.enable_alert:
self.alert_file_path = gpc.config.monitor.alert.get("alert_file_path") self.light_monitor_address = gpc.config.monitor.alert.get("light_monitor_address", None)
if self.alert_file_path and gpc.is_rank_for_log(): # initialize alert file
alert_file_dir = os.path.dirname(self.alert_file_path) self.alert_file_path = gpc.config.monitor.alert.get("alert_file_path")
os.makedirs(alert_file_dir, exist_ok=True) if self.alert_file_path and gpc.is_rank_for_log():
if os.path.exists(self.alert_file_path): alert_file_dir = os.path.dirname(self.alert_file_path)
os.remove(self.alert_file_path) os.makedirs(alert_file_dir, exist_ok=True)
if os.path.exists(self.alert_file_path):
os.remove(self.alert_file_path)
# start a monitor thread, periodically check the training status # start a monitor thread, periodically check the training status
self.monitor_thread = MonitorTracker( self.monitor_thread = MonitorTracker(
alert_address=alert_address, alert_address=alert_address,
check_interval=monitor_interval_seconds, check_interval=monitor_interval_seconds,
loss_spike_limit=loss_spike_limit, loss_spike_limit=loss_spike_limit,
) )
def stop_monitor(self): def stop_monitor(self):
"""Stop the monitor and alert thread.""" """Stop the monitor and alert thread."""