mirror of https://github.com/InternLM/InternLM
feat(monitor): send exception to light monitor (#420)
* send exception to light monitor * update try_import_send_exceptionpull/424/head
parent
30f610b1fa
commit
4b5bdedff2
|
@ -9,7 +9,7 @@ from internlm.core.context import global_context as gpc
|
||||||
from internlm.monitor.alert import send_feishu_msg_with_webhook
|
from internlm.monitor.alert import send_feishu_msg_with_webhook
|
||||||
from internlm.utils.common import SingletonMeta
|
from internlm.utils.common import SingletonMeta
|
||||||
|
|
||||||
from .utils import get_job_key, set_env_var
|
from .utils import get_job_key, set_env_var, try_import_send_exception
|
||||||
|
|
||||||
|
|
||||||
def send_alert_message(address: str = None, title: str = None, message: str = None):
|
def send_alert_message(address: str = None, title: str = None, message: str = None):
|
||||||
|
@ -132,6 +132,7 @@ class MonitorManager(metaclass=SingletonMeta):
|
||||||
self.monitor_thread = None
|
self.monitor_thread = None
|
||||||
self.loss_spike_limit = loss_spike_limit
|
self.loss_spike_limit = loss_spike_limit
|
||||||
self.last_step_loss = -1
|
self.last_step_loss = -1
|
||||||
|
self.send_exception = try_import_send_exception()
|
||||||
|
|
||||||
def monitor_loss_spike(self, alert_address: str = None, step_count: int = 0, cur_step_loss: float = 0.0):
|
def monitor_loss_spike(self, alert_address: str = None, step_count: int = 0, cur_step_loss: float = 0.0):
|
||||||
"""Check loss value, if loss spike occurs, send alert message to Feishu."""
|
"""Check loss value, if loss spike occurs, send alert message to Feishu."""
|
||||||
|
@ -154,6 +155,8 @@ class MonitorManager(metaclass=SingletonMeta):
|
||||||
format_trace = ""
|
format_trace = ""
|
||||||
for line in filtered_trace:
|
for line in filtered_trace:
|
||||||
format_trace += "\n" + line
|
format_trace += "\n" + line
|
||||||
|
if self.send_exception:
|
||||||
|
self.send_exception(format_trace, gpc.get_global_rank())
|
||||||
send_alert_message(
|
send_alert_message(
|
||||||
address=alert_address,
|
address=alert_address,
|
||||||
message=f"Catch Exception from {socket.gethostname()} with rank id {gpc.get_global_rank()}:{format_trace}",
|
message=f"Catch Exception from {socket.gethostname()} with rank id {gpc.get_global_rank()}:{format_trace}",
|
||||||
|
@ -165,9 +168,12 @@ class MonitorManager(metaclass=SingletonMeta):
|
||||||
def sigterm_handler(sys_signal, frame):
|
def sigterm_handler(sys_signal, frame):
|
||||||
print("receive frame: ", frame)
|
print("receive frame: ", frame)
|
||||||
print("receive signal: ", sys_signal)
|
print("receive signal: ", sys_signal)
|
||||||
|
message = f"Process received signal {signal} and exited."
|
||||||
|
if self.send_exception:
|
||||||
|
self.send_exception(message, gpc.get_global_rank())
|
||||||
send_alert_message(
|
send_alert_message(
|
||||||
address=alert_address,
|
address=alert_address,
|
||||||
message=f"Process received signal {signal} and exited.",
|
message=message,
|
||||||
)
|
)
|
||||||
|
|
||||||
signal.signal(signal.SIGTERM, sigterm_handler)
|
signal.signal(signal.SIGTERM, sigterm_handler)
|
||||||
|
|
|
@ -32,3 +32,16 @@ def get_job_name():
|
||||||
|
|
||||||
def get_job_key():
|
def get_job_key():
|
||||||
return f"{get_job_id()}_{get_job_name()}"
|
return f"{get_job_id()}_{get_job_name()}"
|
||||||
|
|
||||||
|
|
||||||
|
def try_import_send_exception():
|
||||||
|
"""
|
||||||
|
Try import send_exception from uniscale_monitoring, if failed, return None
|
||||||
|
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
from uniscale_monitoring import send_exception_msg as send_exception
|
||||||
|
|
||||||
|
return send_exception
|
||||||
|
except ImportError:
|
||||||
|
return None
|
||||||
|
|
Loading…
Reference in New Issue