From 42e0bff8f7a1d09efbec9c68d5188fd0cab19dc3 Mon Sep 17 00:00:00 2001 From: ssongliu <73214554+ssongliu@users.noreply.github.com> Date: Tue, 10 Oct 2023 17:30:28 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E7=9B=91=E6=8E=A7=E9=87=87=E9=9B=86?= =?UTF-8?q?=E6=96=B9=E5=BC=8F=E4=BF=AE=E6=94=B9=20(#2489)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/app/service/monitor.go | 218 +++++++++++++++++++-------------- backend/app/service/setting.go | 1 + 2 files changed, 129 insertions(+), 90 deletions(-) diff --git a/backend/app/service/monitor.go b/backend/app/service/monitor.go index 09d7b2f03..706bd2412 100644 --- a/backend/app/service/monitor.go +++ b/backend/app/service/monitor.go @@ -1,6 +1,7 @@ package service import ( + "context" "fmt" "strconv" "time" @@ -15,21 +16,28 @@ import ( "github.com/shirou/gopsutil/v3/net" ) -type MonitorService struct{} +type MonitorService struct { + DiskIO chan ([]disk.IOCountersStat) + NetIO chan ([]net.IOCountersStat) +} + +var monitorCancel context.CancelFunc type IMonitorService interface { Run() + + saveIODataToDB(ctx context.Context, interval float64) + saveNetDataToDB(ctx context.Context, interval float64) } func NewIMonitorService() IMonitorService { - return &MonitorService{} + return &MonitorService{ + DiskIO: make(chan []disk.IOCountersStat, 2), + NetIO: make(chan []net.IOCountersStat, 2), + } } func (m *MonitorService) Run() { - monitorStatus, _ := settingRepo.Get(settingRepo.WithByKey("MonitorStatus")) - if monitorStatus.Value == "disable" { - return - } var itemModel model.MonitorBase totalPercent, _ := cpu.Percent(3*time.Second, false) if len(totalPercent) == 1 { @@ -50,8 +58,8 @@ func (m *MonitorService) Run() { global.LOG.Errorf("Insert basic monitoring data failed, err: %v", err) } - go loadDiskIO() - go loadNetIO() + m.loadDiskIO() + m.loadNetIO() MonitorStoreDays, err := settingRepo.Get(settingRepo.WithByKey("MonitorStoreDays")) if err != nil { @@ -64,115 +72,145 @@ func (m *MonitorService) Run() { _ = settingRepo.DelMonitorNet(timeForDelete) } -func loadDiskIO() { +func (m *MonitorService) loadDiskIO() { ioStat, _ := disk.IOCounters() - - time.Sleep(60 * time.Second) - - ioStat2, _ := disk.IOCounters() - var ioList []model.MonitorIO - for _, io2 := range ioStat2 { - for _, io1 := range ioStat { - if io2.Name == io1.Name { - var itemIO model.MonitorIO - itemIO.Name = io1.Name - if io2.ReadBytes != 0 && io1.ReadBytes != 0 && io2.ReadBytes > io1.ReadBytes { - itemIO.Read = uint64(float64(io2.ReadBytes-io1.ReadBytes) / 60) - } - if io2.WriteBytes != 0 && io1.WriteBytes != 0 && io2.WriteBytes > io1.WriteBytes { - itemIO.Write = uint64(float64(io2.WriteBytes-io1.WriteBytes) / 60) - } - - if io2.ReadCount != 0 && io1.ReadCount != 0 && io2.ReadCount > io1.ReadCount { - itemIO.Count = uint64(float64(io2.ReadCount-io1.ReadCount) / 60) - } - writeCount := uint64(0) - if io2.WriteCount != 0 && io1.WriteCount != 0 && io2.WriteCount > io1.WriteCount { - writeCount = uint64(float64(io2.WriteCount-io1.WriteCount) / 60) - } - if writeCount > itemIO.Count { - itemIO.Count = writeCount - } - - if io2.ReadTime != 0 && io1.ReadTime != 0 && io2.ReadTime > io1.ReadTime { - itemIO.Time = uint64(float64(io2.ReadTime-io1.ReadTime) / 60) - } - writeTime := uint64(0) - if io2.WriteTime != 0 && io1.WriteTime != 0 && io2.WriteTime > io1.WriteTime { - writeTime = uint64(float64(io2.WriteTime-io1.WriteTime) / 60) - } - if writeTime > itemIO.Time { - itemIO.Time = writeTime - } - ioList = append(ioList, itemIO) - break - } - } - } - if err := settingRepo.BatchCreateMonitorIO(ioList); err != nil { - global.LOG.Errorf("Insert io monitoring data failed, err: %v", err) + var diskIOList []disk.IOCountersStat + for _, io := range ioStat { + diskIOList = append(diskIOList, io) } + m.DiskIO <- diskIOList } -func loadNetIO() { +func (m *MonitorService) loadNetIO() { netStat, _ := net.IOCounters(true) netStatAll, _ := net.IOCounters(false) + var netList []net.IOCountersStat + netList = append(netList, netStat...) + netList = append(netList, netStatAll...) + m.NetIO <- netList +} - time.Sleep(60 * time.Second) - - netStat2, _ := net.IOCounters(true) - var netList []model.MonitorNetwork - for _, net2 := range netStat2 { - for _, net1 := range netStat { - if net2.Name == net1.Name { - var itemNet model.MonitorNetwork - itemNet.Name = net1.Name - - if net2.BytesSent != 0 && net1.BytesSent != 0 && net2.BytesSent > net1.BytesSent { - itemNet.Up = float64(net2.BytesSent-net1.BytesSent) / 1024 / 60 +func (m *MonitorService) saveIODataToDB(ctx context.Context, interval float64) { + defer close(m.DiskIO) + for { + select { + case <-ctx.Done(): + return + case ioStat := <-m.DiskIO: + select { + case <-ctx.Done(): + return + case ioStat2 := <-m.DiskIO: + var ioList []model.MonitorIO + for _, io2 := range ioStat2 { + for _, io1 := range ioStat { + if io2.Name == io1.Name { + var itemIO model.MonitorIO + itemIO.Name = io1.Name + if io2.ReadBytes != 0 && io1.ReadBytes != 0 && io2.ReadBytes > io1.ReadBytes { + itemIO.Read = uint64(float64(io2.ReadBytes-io1.ReadBytes) / interval / 60) + } + if io2.WriteBytes != 0 && io1.WriteBytes != 0 && io2.WriteBytes > io1.WriteBytes { + itemIO.Write = uint64(float64(io2.WriteBytes-io1.WriteBytes) / interval / 60) + } + + if io2.ReadCount != 0 && io1.ReadCount != 0 && io2.ReadCount > io1.ReadCount { + itemIO.Count = uint64(float64(io2.ReadCount-io1.ReadCount) / interval / 60) + } + writeCount := uint64(0) + if io2.WriteCount != 0 && io1.WriteCount != 0 && io2.WriteCount > io1.WriteCount { + writeCount = uint64(float64(io2.WriteCount-io1.WriteCount) / interval * 60) + } + if writeCount > itemIO.Count { + itemIO.Count = writeCount + } + + if io2.ReadTime != 0 && io1.ReadTime != 0 && io2.ReadTime > io1.ReadTime { + itemIO.Time = uint64(float64(io2.ReadTime-io1.ReadTime) / interval / 60) + } + writeTime := uint64(0) + if io2.WriteTime != 0 && io1.WriteTime != 0 && io2.WriteTime > io1.WriteTime { + writeTime = uint64(float64(io2.WriteTime-io1.WriteTime) / interval / 60) + } + if writeTime > itemIO.Time { + itemIO.Time = writeTime + } + ioList = append(ioList, itemIO) + break + } + } } - if net2.BytesRecv != 0 && net1.BytesRecv != 0 && net2.BytesRecv > net1.BytesRecv { - itemNet.Down = float64(net2.BytesRecv-net1.BytesRecv) / 1024 / 60 + if err := settingRepo.BatchCreateMonitorIO(ioList); err != nil { + global.LOG.Errorf("Insert io monitoring data failed, err: %v", err) } - netList = append(netList, itemNet) - break + m.DiskIO <- ioStat2 } } } - netStatAll2, _ := net.IOCounters(false) - for _, net2 := range netStatAll2 { - for _, net1 := range netStatAll { - if net2.Name == net1.Name { - var itemNet model.MonitorNetwork - itemNet.Name = net1.Name - if net2.BytesSent != 0 && net1.BytesSent != 0 && net2.BytesSent > net1.BytesSent { - itemNet.Up = float64(net2.BytesSent-net1.BytesSent) / 1024 / 60 +} + +func (m *MonitorService) saveNetDataToDB(ctx context.Context, interval float64) { + defer close(m.NetIO) + for { + select { + case <-ctx.Done(): + return + case netStat := <-m.NetIO: + select { + case <-ctx.Done(): + return + case netStat2 := <-m.NetIO: + var netList []model.MonitorNetwork + for _, net2 := range netStat2 { + for _, net1 := range netStat { + if net2.Name == net1.Name { + var itemNet model.MonitorNetwork + itemNet.Name = net1.Name + + if net2.BytesSent != 0 && net1.BytesSent != 0 && net2.BytesSent > net1.BytesSent { + itemNet.Up = float64(net2.BytesSent-net1.BytesSent) / 1024 / interval / 60 + } + if net2.BytesRecv != 0 && net1.BytesRecv != 0 && net2.BytesRecv > net1.BytesRecv { + itemNet.Down = float64(net2.BytesRecv-net1.BytesRecv) / 1024 / interval / 60 + } + netList = append(netList, itemNet) + break + } + } } - if net2.BytesRecv != 0 && net1.BytesRecv != 0 && net2.BytesRecv > net1.BytesRecv { - itemNet.Down = float64(net2.BytesRecv-net1.BytesRecv) / 1024 / 60 + if err := settingRepo.BatchCreateMonitorNet(netList); err != nil { + global.LOG.Errorf("Insert network monitoring data failed, err: %v", err) } - netList = append(netList, itemNet) - break + m.NetIO <- netStat2 } } } - - if err := settingRepo.BatchCreateMonitorNet(netList); err != nil { - global.LOG.Errorf("Insert network monitoring data failed, err: %v", err) - } } func StartMonitor(removeBefore bool, interval string) error { if removeBefore { + monitorCancel() global.Cron.Remove(cron.EntryID(global.MonitorCronID)) } - imservice := NewIMonitorService() - monitorID, err := global.Cron.AddJob(fmt.Sprintf("@every %sm", interval), imservice) + intervalItem, err := strconv.Atoi(interval) + if err != nil { + return err + } + + service := NewIMonitorService() + ctx, cancel := context.WithCancel(context.Background()) + monitorCancel = cancel + monitorID, err := global.Cron.AddJob(fmt.Sprintf("@every %sm", interval), service) if err != nil { return err } - imservice.Run() + + service.Run() + + go service.saveIODataToDB(ctx, float64(intervalItem)) + go service.saveNetDataToDB(ctx, float64(intervalItem)) + global.MonitorCronID = int(monitorID) return nil } diff --git a/backend/app/service/setting.go b/backend/app/service/setting.go index e087afd5a..7decd5caf 100644 --- a/backend/app/service/setting.go +++ b/backend/app/service/setting.go @@ -89,6 +89,7 @@ func (u *SettingService) Update(key, value string) error { } } if value == "disable" && global.MonitorCronID != 0 { + monitorCancel() global.Cron.Remove(cron.EntryID(global.MonitorCronID)) global.MonitorCronID = 0 }