From 11bb2f8a958c0bb6fb477b4e7bf0b74f9e4f37bd Mon Sep 17 00:00:00 2001 From: wenlxie Date: Fri, 8 Oct 2021 16:39:26 +0800 Subject: [PATCH] support thread state Signed-off-by: wenlxie --- collector/processes_linux.go | 92 ++++++++++++++++++++++++++----- collector/processes_linux_test.go | 2 +- 2 files changed, 79 insertions(+), 15 deletions(-) diff --git a/collector/processes_linux.go b/collector/processes_linux.go index 04a19547..798aeaeb 100644 --- a/collector/processes_linux.go +++ b/collector/processes_linux.go @@ -20,6 +20,8 @@ import ( "errors" "fmt" "os" + "path" + "strconv" "strings" "syscall" @@ -30,13 +32,14 @@ import ( ) type processCollector struct { - fs procfs.FS - threadAlloc *prometheus.Desc - threadLimit *prometheus.Desc - procsState *prometheus.Desc - pidUsed *prometheus.Desc - pidMax *prometheus.Desc - logger log.Logger + fs procfs.FS + threadAlloc *prometheus.Desc + threadLimit *prometheus.Desc + threadsState *prometheus.Desc + procsState *prometheus.Desc + pidUsed *prometheus.Desc + pidMax *prometheus.Desc + logger log.Logger } func init() { @@ -62,6 +65,11 @@ func NewProcessStatCollector(logger log.Logger) (Collector, error) { "Limit of threads in the system", nil, nil, ), + threadsState: prometheus.NewDesc( + prometheus.BuildFQName(namespace, subsystem, "threads_state"), + "Number of threads in each state.", + []string{"thread_state"}, nil, + ), procsState: prometheus.NewDesc( prometheus.BuildFQName(namespace, subsystem, "state"), "Number of processes in each state.", @@ -77,7 +85,7 @@ func NewProcessStatCollector(logger log.Logger) (Collector, error) { }, nil } func (c *processCollector) Update(ch chan<- prometheus.Metric) error { - pids, states, threads, err := c.getAllocatedThreads() + pids, states, threads, threadStates, err := c.getAllocatedThreads() if err != nil { return fmt.Errorf("unable to retrieve number of allocated threads: %w", err) } @@ -93,6 +101,10 @@ func (c *processCollector) Update(ch chan<- prometheus.Metric) error { ch <- prometheus.MustNewConstMetric(c.procsState, prometheus.GaugeValue, float64(states[state]), state) } + for state := range threadStates { + ch <- prometheus.MustNewConstMetric(c.threadsState, prometheus.GaugeValue, float64(threadStates[state]), state) + } + pidM, err := readUintFromFile(procFilePath("sys/kernel/pid_max")) if err != nil { return fmt.Errorf("unable to retrieve limit number of maximum pids alloved: %w", err) @@ -103,28 +115,80 @@ func (c *processCollector) Update(ch chan<- prometheus.Metric) error { return nil } -func (c *processCollector) getAllocatedThreads() (int, map[string]int32, int, error) { +func (c *processCollector) getAllocatedThreads() (int, map[string]int32, int, map[string]int32, error) { p, err := c.fs.AllProcs() if err != nil { - return 0, nil, 0, fmt.Errorf("unable to list all processes: %w", err) + return 0, nil, 0, nil, fmt.Errorf("unable to list all processes: %w", err) } pids := 0 thread := 0 procStates := make(map[string]int32) + threadStates := make(map[string]int32) + for _, pid := range p { stat, err := pid.Stat() if err != nil { // PIDs can vanish between getting the list and getting stats. - if errors.Is(err, os.ErrNotExist) || strings.Contains(err.Error(), syscall.ESRCH.Error()) { - level.Debug(c.logger).Log("msg", "file not found when retrieving stats for pid", "pid", pid, "err", err) + if c.isIgnoredError(err) { + level.Debug(c.logger).Log("msg", "file not found when retrieving stats for pid", "pid", pid.PID, "err", err) continue } level.Debug(c.logger).Log("msg", "error reading stat for pid", "pid", pid.PID, "err", err) - return 0, nil, 0, fmt.Errorf("error reading stat for pid %d: %w", pid.PID, err) + return 0, nil, 0, nil, fmt.Errorf("error reading stat for pid %d: %w", pid.PID, err) } pids++ procStates[stat.State]++ thread += stat.NumThreads + err = c.getThreadStates(pid.PID, stat, threadStates) + if err != nil { + return 0, nil, 0, nil, err + } } - return pids, procStates, thread, nil + return pids, procStates, thread, threadStates, nil +} + +func (c *processCollector) getThreadStates(pid int, pidStat procfs.ProcStat, threadStates map[string]int32) error { + fs, err := procfs.NewFS(procFilePath(path.Join(strconv.Itoa(pid), "task"))) + if err != nil { + if c.isIgnoredError(err) { + level.Debug(c.logger).Log("msg", "file not found when retrieving tasks for pid", "pid", pid, "err", err) + return nil + } + level.Debug(c.logger).Log("msg", "error reading tasks for pid", "pid", pid, "err", err) + return fmt.Errorf("error reading task for pid %d: %w", pid, err) + } + + t, err := fs.AllProcs() + if err != nil { + if c.isIgnoredError(err) { + level.Debug(c.logger).Log("msg", "file not found when retrieving tasks for pid", "pid", pid, "err", err) + return nil + } + return fmt.Errorf("unable to list all threads for pid: %d %w", pid, err) + } + + for _, thread := range t { + if pid == thread.PID { + threadStates[pidStat.State]++ + continue + } + threadStat, err := thread.Stat() + if err != nil { + if c.isIgnoredError(err) { + level.Debug(c.logger).Log("msg", "file not found when retrieving stats for thread", "pid", pid, "threadId", thread.PID, "err", err) + continue + } + level.Debug(c.logger).Log("msg", "error reading stat for thread", "pid", pid, "threadId", thread.PID, "err", err) + return fmt.Errorf("error reading stat for pid:%d thread:%d err:%w", pid, thread.PID, err) + } + threadStates[threadStat.State]++ + } + return nil +} + +func (c *processCollector) isIgnoredError(err error) bool { + if errors.Is(err, os.ErrNotExist) || strings.Contains(err.Error(), syscall.ESRCH.Error()) { + return true + } + return false } diff --git a/collector/processes_linux_test.go b/collector/processes_linux_test.go index 253f963b..9a5c86f5 100644 --- a/collector/processes_linux_test.go +++ b/collector/processes_linux_test.go @@ -34,7 +34,7 @@ func TestReadProcessStatus(t *testing.T) { t.Errorf("failed to open procfs: %v", err) } c := processCollector{fs: fs, logger: log.NewNopLogger()} - pids, states, threads, err := c.getAllocatedThreads() + pids, states, threads, _, err := c.getAllocatedThreads() if err != nil { t.Fatalf("Cannot retrieve data from procfs getAllocatedThreads function: %v ", err) }