mirror of https://github.com/k3s-io/k3s
Add network stats for node interfaces
parent
84a7f48cf7
commit
1220025e86
|
@ -0,0 +1,312 @@
|
|||
// +build windows
|
||||
|
||||
/*
|
||||
Copyright 2019 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package winstats
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
cadvisorapi "github.com/google/cadvisor/info/v1"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/klog"
|
||||
)
|
||||
|
||||
const (
|
||||
packetsReceivedPerSecondQuery = "\\Network Adapter(*)\\Packets Received/sec"
|
||||
packetsSentPerSecondQuery = "\\Network Adapter(*)\\Packets Sent/sec"
|
||||
bytesReceivedPerSecondQuery = "\\Network Adapter(*)\\Bytes Received/sec"
|
||||
bytesSentPerSecondQuery = "\\Network Adapter(*)\\Bytes Sent/sec"
|
||||
packetsReceivedDiscardedQuery = "\\Network Adapter(*)\\Packets Received Discarded"
|
||||
packetsReceivedErrorsQuery = "\\Network Adapter(*)\\Packets Received Errors"
|
||||
packetsOutboundDiscardedQuery = "\\Network Adapter(*)\\Packets Outbound Discarded"
|
||||
packetsOutboundErrorsQuery = "\\Network Adapter(*)\\Packets Outbound Errors"
|
||||
)
|
||||
|
||||
// networkCounter contains the counters for network adapters.
|
||||
type networkCounter struct {
|
||||
packetsReceivedPerSecondCounter *perfCounter
|
||||
packetsSentPerSecondCounter *perfCounter
|
||||
bytesReceivedPerSecondCounter *perfCounter
|
||||
bytesSentPerSecondCounter *perfCounter
|
||||
packetsReceivedDiscardedCounter *perfCounter
|
||||
packetsReceivedErrorsCounter *perfCounter
|
||||
packetsOutboundDiscardedCounter *perfCounter
|
||||
packetsOutboundErrorsCounter *perfCounter
|
||||
|
||||
mu sync.RWMutex
|
||||
adapterStats map[string]cadvisorapi.InterfaceStats
|
||||
}
|
||||
|
||||
func newNetworkCounters() (*networkCounter, error) {
|
||||
packetsReceivedPerSecondCounter, err := newPerfCounter(packetsReceivedPerSecondQuery)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
packetsSentPerSecondCounter, err := newPerfCounter(packetsSentPerSecondQuery)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
bytesReceivedPerSecondCounter, err := newPerfCounter(bytesReceivedPerSecondQuery)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
bytesSentPerSecondCounter, err := newPerfCounter(bytesSentPerSecondQuery)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
packetsReceivedDiscardedCounter, err := newPerfCounter(packetsReceivedDiscardedQuery)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
packetsReceivedErrorsCounter, err := newPerfCounter(packetsReceivedErrorsQuery)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
packetsOutboundDiscardedCounter, err := newPerfCounter(packetsOutboundDiscardedQuery)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
packetsOutboundErrorsCounter, err := newPerfCounter(packetsOutboundErrorsQuery)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &networkCounter{
|
||||
packetsReceivedPerSecondCounter: packetsReceivedPerSecondCounter,
|
||||
packetsSentPerSecondCounter: packetsSentPerSecondCounter,
|
||||
bytesReceivedPerSecondCounter: bytesReceivedPerSecondCounter,
|
||||
bytesSentPerSecondCounter: bytesSentPerSecondCounter,
|
||||
packetsReceivedDiscardedCounter: packetsReceivedDiscardedCounter,
|
||||
packetsReceivedErrorsCounter: packetsReceivedErrorsCounter,
|
||||
packetsOutboundDiscardedCounter: packetsOutboundDiscardedCounter,
|
||||
packetsOutboundErrorsCounter: packetsOutboundErrorsCounter,
|
||||
adapterStats: map[string]cadvisorapi.InterfaceStats{},
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (n *networkCounter) getData() ([]cadvisorapi.InterfaceStats, error) {
|
||||
packetsReceivedPerSecondData, err := n.packetsReceivedPerSecondCounter.getDataList()
|
||||
if err != nil {
|
||||
klog.Errorf("Unable to get packetsReceivedPerSecond perf counter data; err: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
packetsSentPerSecondData, err := n.packetsSentPerSecondCounter.getDataList()
|
||||
if err != nil {
|
||||
klog.Errorf("Unable to get packetsSentPerSecond perf counter data; err: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
bytesReceivedPerSecondData, err := n.bytesReceivedPerSecondCounter.getDataList()
|
||||
if err != nil {
|
||||
klog.Errorf("Unable to get bytesReceivedPerSecond perf counter data; err: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
bytesSentPerSecondData, err := n.bytesSentPerSecondCounter.getDataList()
|
||||
if err != nil {
|
||||
klog.Errorf("Unable to get bytesSentPerSecond perf counter data; err: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
packetsReceivedDiscardedData, err := n.packetsReceivedDiscardedCounter.getDataList()
|
||||
if err != nil {
|
||||
klog.Errorf("Unable to get packetsReceivedDiscarded perf counter data; err: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
packetsReceivedErrorsData, err := n.packetsReceivedErrorsCounter.getDataList()
|
||||
if err != nil {
|
||||
klog.Errorf("Unable to get packetsReceivedErrors perf counter data; err: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
packetsOutboundDiscardedData, err := n.packetsOutboundDiscardedCounter.getDataList()
|
||||
if err != nil {
|
||||
klog.Errorf("Unable to get packetsOutboundDiscarded perf counter data; err: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
packetsOutboundErrorsData, err := n.packetsOutboundErrorsCounter.getDataList()
|
||||
if err != nil {
|
||||
klog.Errorf("Unable to get packetsOutboundErrors perf counter data; err: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
n.mu.Lock()
|
||||
defer n.mu.Unlock()
|
||||
n.mergeCollectedData(
|
||||
packetsReceivedPerSecondData,
|
||||
packetsSentPerSecondData,
|
||||
bytesReceivedPerSecondData,
|
||||
bytesSentPerSecondData,
|
||||
packetsReceivedDiscardedData,
|
||||
packetsReceivedErrorsData,
|
||||
packetsOutboundDiscardedData,
|
||||
packetsOutboundErrorsData,
|
||||
)
|
||||
return n.listInterfaceStats(), nil
|
||||
}
|
||||
|
||||
// mergeCollectedData merges the collected data into cache. It should be invoked under lock protected.
|
||||
func (n *networkCounter) mergeCollectedData(packetsReceivedPerSecondData,
|
||||
packetsSentPerSecondData,
|
||||
bytesReceivedPerSecondData,
|
||||
bytesSentPerSecondData,
|
||||
packetsReceivedDiscardedData,
|
||||
packetsReceivedErrorsData,
|
||||
packetsOutboundDiscardedData,
|
||||
packetsOutboundErrorsData map[string]uint64) {
|
||||
adapters := sets.NewString()
|
||||
|
||||
// merge the collected data and list of adapters.
|
||||
adapters.Insert(n.mergePacketsReceivedPerSecondData(packetsReceivedPerSecondData)...)
|
||||
adapters.Insert(n.mergePacketsSentPerSecondData(packetsSentPerSecondData)...)
|
||||
adapters.Insert(n.mergeBytesReceivedPerSecondData(bytesReceivedPerSecondData)...)
|
||||
adapters.Insert(n.mergeBytesSentPerSecondData(bytesSentPerSecondData)...)
|
||||
adapters.Insert(n.mergePacketsReceivedDiscardedData(packetsReceivedDiscardedData)...)
|
||||
adapters.Insert(n.mergePacketsReceivedErrorsData(packetsReceivedErrorsData)...)
|
||||
adapters.Insert(n.mergePacketsOutboundDiscardedData(packetsOutboundDiscardedData)...)
|
||||
adapters.Insert(n.mergePacketsOutboundErrorsData(packetsOutboundErrorsData)...)
|
||||
|
||||
// delete the cache for non-existing adapters.
|
||||
for adapter := range n.adapterStats {
|
||||
if !adapters.Has(adapter) {
|
||||
delete(n.adapterStats, adapter)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (n *networkCounter) mergePacketsReceivedPerSecondData(packetsReceivedPerSecondData map[string]uint64) []string {
|
||||
var adapters []string
|
||||
for adapterName, value := range packetsReceivedPerSecondData {
|
||||
adapters = append(adapters, adapterName)
|
||||
newStat := n.adapterStats[adapterName]
|
||||
newStat.Name = adapterName
|
||||
newStat.RxPackets = newStat.RxPackets + value
|
||||
n.adapterStats[adapterName] = newStat
|
||||
}
|
||||
|
||||
return adapters
|
||||
}
|
||||
|
||||
func (n *networkCounter) mergePacketsSentPerSecondData(packetsSentPerSecondData map[string]uint64) []string {
|
||||
var adapters []string
|
||||
for adapterName, value := range packetsSentPerSecondData {
|
||||
adapters = append(adapters, adapterName)
|
||||
newStat := n.adapterStats[adapterName]
|
||||
newStat.Name = adapterName
|
||||
newStat.TxPackets = newStat.TxPackets + value
|
||||
n.adapterStats[adapterName] = newStat
|
||||
}
|
||||
|
||||
return adapters
|
||||
}
|
||||
|
||||
func (n *networkCounter) mergeBytesReceivedPerSecondData(bytesReceivedPerSecondData map[string]uint64) []string {
|
||||
var adapters []string
|
||||
for adapterName, value := range bytesReceivedPerSecondData {
|
||||
adapters = append(adapters, adapterName)
|
||||
newStat := n.adapterStats[adapterName]
|
||||
newStat.Name = adapterName
|
||||
newStat.RxBytes = newStat.RxBytes + value
|
||||
n.adapterStats[adapterName] = newStat
|
||||
}
|
||||
|
||||
return adapters
|
||||
}
|
||||
|
||||
func (n *networkCounter) mergeBytesSentPerSecondData(bytesSentPerSecondData map[string]uint64) []string {
|
||||
var adapters []string
|
||||
for adapterName, value := range bytesSentPerSecondData {
|
||||
adapters = append(adapters, adapterName)
|
||||
newStat := n.adapterStats[adapterName]
|
||||
newStat.Name = adapterName
|
||||
newStat.TxBytes = newStat.TxBytes + value
|
||||
n.adapterStats[adapterName] = newStat
|
||||
}
|
||||
|
||||
return adapters
|
||||
}
|
||||
|
||||
func (n *networkCounter) mergePacketsReceivedDiscardedData(packetsReceivedDiscardedData map[string]uint64) []string {
|
||||
var adapters []string
|
||||
for adapterName, value := range packetsReceivedDiscardedData {
|
||||
adapters = append(adapters, adapterName)
|
||||
newStat := n.adapterStats[adapterName]
|
||||
newStat.Name = adapterName
|
||||
newStat.RxDropped = value
|
||||
n.adapterStats[adapterName] = newStat
|
||||
}
|
||||
|
||||
return adapters
|
||||
}
|
||||
|
||||
func (n *networkCounter) mergePacketsReceivedErrorsData(packetsReceivedErrorsData map[string]uint64) []string {
|
||||
var adapters []string
|
||||
for adapterName, value := range packetsReceivedErrorsData {
|
||||
adapters = append(adapters, adapterName)
|
||||
newStat := n.adapterStats[adapterName]
|
||||
newStat.Name = adapterName
|
||||
newStat.RxErrors = value
|
||||
n.adapterStats[adapterName] = newStat
|
||||
}
|
||||
|
||||
return adapters
|
||||
}
|
||||
|
||||
func (n *networkCounter) mergePacketsOutboundDiscardedData(packetsOutboundDiscardedData map[string]uint64) []string {
|
||||
var adapters []string
|
||||
for adapterName, value := range packetsOutboundDiscardedData {
|
||||
adapters = append(adapters, adapterName)
|
||||
newStat := n.adapterStats[adapterName]
|
||||
newStat.Name = adapterName
|
||||
newStat.TxDropped = value
|
||||
n.adapterStats[adapterName] = newStat
|
||||
}
|
||||
|
||||
return adapters
|
||||
}
|
||||
|
||||
func (n *networkCounter) mergePacketsOutboundErrorsData(packetsOutboundErrorsData map[string]uint64) []string {
|
||||
var adapters []string
|
||||
for adapterName, value := range packetsOutboundErrorsData {
|
||||
adapters = append(adapters, adapterName)
|
||||
newStat := n.adapterStats[adapterName]
|
||||
newStat.Name = adapterName
|
||||
newStat.TxErrors = value
|
||||
n.adapterStats[adapterName] = newStat
|
||||
}
|
||||
|
||||
return adapters
|
||||
}
|
||||
|
||||
func (n *networkCounter) listInterfaceStats() []cadvisorapi.InterfaceStats {
|
||||
stats := make([]cadvisorapi.InterfaceStats, 0, len(n.adapterStats))
|
||||
for _, stat := range n.adapterStats {
|
||||
stats = append(stats, stat)
|
||||
}
|
||||
return stats
|
||||
}
|
|
@ -101,8 +101,13 @@ func (p *perfCounterNodeStatsClient) startMonitoring() error {
|
|||
return err
|
||||
}
|
||||
|
||||
networkAdapterCounter, err := newNetworkCounters()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
go wait.Forever(func() {
|
||||
p.collectMetricsData(cpuCounter, memWorkingSetCounter, memCommittedBytesCounter)
|
||||
p.collectMetricsData(cpuCounter, memWorkingSetCounter, memCommittedBytesCounter, networkAdapterCounter)
|
||||
}, perfCounterUpdatePeriod)
|
||||
|
||||
return nil
|
||||
|
@ -138,7 +143,7 @@ func (p *perfCounterNodeStatsClient) getNodeInfo() nodeInfo {
|
|||
return p.nodeInfo
|
||||
}
|
||||
|
||||
func (p *perfCounterNodeStatsClient) collectMetricsData(cpuCounter, memWorkingSetCounter, memCommittedBytesCounter *perfCounter) {
|
||||
func (p *perfCounterNodeStatsClient) collectMetricsData(cpuCounter, memWorkingSetCounter, memCommittedBytesCounter *perfCounter, networkAdapterCounter *networkCounter) {
|
||||
cpuValue, err := cpuCounter.getData()
|
||||
if err != nil {
|
||||
klog.Errorf("Unable to get cpu perf counter data; err: %v", err)
|
||||
|
@ -157,12 +162,19 @@ func (p *perfCounterNodeStatsClient) collectMetricsData(cpuCounter, memWorkingSe
|
|||
return
|
||||
}
|
||||
|
||||
networkAdapterStats, err := networkAdapterCounter.getData()
|
||||
if err != nil {
|
||||
klog.Errorf("Unable to get network adapter perf counter data; err: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
p.nodeMetrics = nodeMetrics{
|
||||
cpuUsageCoreNanoSeconds: p.convertCPUValue(cpuValue),
|
||||
memoryPrivWorkingSetBytes: memWorkingSetValue,
|
||||
memoryCommittedBytes: memCommittedBytesValue,
|
||||
interfaceStats: networkAdapterStats,
|
||||
timeStamp: time.Now(),
|
||||
}
|
||||
}
|
||||
|
|
|
@ -71,6 +71,7 @@ func newPerfCounter(counter string) (*perfCounter, error) {
|
|||
}, nil
|
||||
}
|
||||
|
||||
// getData is used for getting data without * in counter name.
|
||||
func (p *perfCounter) getData() (uint64, error) {
|
||||
ret := win_pdh.PdhCollectQueryData(p.queryHandle)
|
||||
if ret != win_pdh.ERROR_SUCCESS {
|
||||
|
@ -100,3 +101,36 @@ func (p *perfCounter) getData() (uint64, error) {
|
|||
|
||||
return data, nil
|
||||
}
|
||||
|
||||
// getData is used for getting data with * in counter name.
|
||||
func (p *perfCounter) getDataList() (map[string]uint64, error) {
|
||||
ret := win_pdh.PdhCollectQueryData(p.queryHandle)
|
||||
if ret != win_pdh.ERROR_SUCCESS {
|
||||
return nil, fmt.Errorf("unable to collect data from counter. Error code is %x", ret)
|
||||
}
|
||||
|
||||
var bufSize, bufCount uint32
|
||||
var size = uint32(unsafe.Sizeof(win_pdh.PDH_FMT_COUNTERVALUE_ITEM_DOUBLE{}))
|
||||
var emptyBuf [1]win_pdh.PDH_FMT_COUNTERVALUE_ITEM_DOUBLE // need at least 1 addressable null ptr.
|
||||
data := map[string]uint64{}
|
||||
|
||||
ret = win_pdh.PdhGetFormattedCounterArrayDouble(p.counterHandle, &bufSize, &bufCount, &emptyBuf[0])
|
||||
if ret != win_pdh.PDH_MORE_DATA {
|
||||
return nil, fmt.Errorf("unable to collect data from counter. Error code is %x", ret)
|
||||
}
|
||||
|
||||
filledBuf := make([]win_pdh.PDH_FMT_COUNTERVALUE_ITEM_DOUBLE, bufCount*size)
|
||||
ret = win_pdh.PdhGetFormattedCounterArrayDouble(p.counterHandle, &bufSize, &bufCount, &filledBuf[0])
|
||||
if ret != win_pdh.ERROR_SUCCESS {
|
||||
return nil, fmt.Errorf("unable to collect data from counter. Error code is %x", ret)
|
||||
}
|
||||
|
||||
for i := 0; i < int(bufCount); i++ {
|
||||
c := filledBuf[i]
|
||||
value := uint64(c.FmtValue.DoubleValue)
|
||||
name := win_pdh.UTF16PtrToString(c.SzName)
|
||||
data[name] = value
|
||||
}
|
||||
|
||||
return data, nil
|
||||
}
|
||||
|
|
|
@ -58,6 +58,7 @@ type nodeMetrics struct {
|
|||
memoryPrivWorkingSetBytes uint64
|
||||
memoryCommittedBytes uint64
|
||||
timeStamp time.Time
|
||||
interfaceStats []cadvisorapi.InterfaceStats
|
||||
}
|
||||
|
||||
type nodeInfo struct {
|
||||
|
@ -109,12 +110,11 @@ func (c *StatsClient) WinVersionInfo() (*cadvisorapi.VersionInfo, error) {
|
|||
|
||||
func (c *StatsClient) createRootContainerInfo() (*cadvisorapiv2.ContainerInfo, error) {
|
||||
nodeMetrics, err := c.client.getNodeMetrics()
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var stats []*cadvisorapiv2.ContainerStats
|
||||
|
||||
var stats []*cadvisorapiv2.ContainerStats
|
||||
stats = append(stats, &cadvisorapiv2.ContainerStats{
|
||||
Timestamp: nodeMetrics.timeStamp,
|
||||
Cpu: &cadvisorapi.CpuStats{
|
||||
|
@ -126,6 +126,9 @@ func (c *StatsClient) createRootContainerInfo() (*cadvisorapiv2.ContainerInfo, e
|
|||
WorkingSet: nodeMetrics.memoryPrivWorkingSetBytes,
|
||||
Usage: nodeMetrics.memoryCommittedBytes,
|
||||
},
|
||||
Network: &cadvisorapiv2.NetworkStats{
|
||||
Interfaces: nodeMetrics.interfaceStats,
|
||||
},
|
||||
})
|
||||
|
||||
nodeInfo := c.client.getNodeInfo()
|
||||
|
@ -134,6 +137,7 @@ func (c *StatsClient) createRootContainerInfo() (*cadvisorapiv2.ContainerInfo, e
|
|||
CreationTime: nodeInfo.startTime,
|
||||
HasCpu: true,
|
||||
HasMemory: true,
|
||||
HasNetwork: true,
|
||||
Memory: cadvisorapiv2.MemorySpec{
|
||||
Limit: nodeInfo.memoryPhysicalCapacityBytes,
|
||||
},
|
||||
|
|
|
@ -88,6 +88,7 @@ func TestWinContainerInfos(t *testing.T) {
|
|||
Spec: cadvisorapiv2.ContainerSpec{
|
||||
HasCpu: true,
|
||||
HasMemory: true,
|
||||
HasNetwork: true,
|
||||
Memory: cadvisorapiv2.MemorySpec{
|
||||
Limit: 1.6e+10,
|
||||
},
|
||||
|
@ -95,7 +96,11 @@ func TestWinContainerInfos(t *testing.T) {
|
|||
Stats: stats,
|
||||
}
|
||||
|
||||
assert.Equal(t, actualRootInfos, infos)
|
||||
assert.Equal(t, len(actualRootInfos), len(infos))
|
||||
assert.Equal(t, actualRootInfos["/"].Spec, infos["/"].Spec)
|
||||
assert.Equal(t, len(actualRootInfos["/"].Stats), len(infos["/"].Stats))
|
||||
assert.Equal(t, actualRootInfos["/"].Stats[0].Cpu, infos["/"].Stats[0].Cpu)
|
||||
assert.Equal(t, actualRootInfos["/"].Stats[0].Memory, infos["/"].Stats[0].Memory)
|
||||
}
|
||||
|
||||
func TestWinMachineInfo(t *testing.T) {
|
||||
|
|
Loading…
Reference in New Issue