diff --git a/pkg/kubelet/winstats/network_stats.go b/pkg/kubelet/winstats/network_stats.go
new file mode 100644
index 0000000000..5ff1d2a7be
--- /dev/null
+++ b/pkg/kubelet/winstats/network_stats.go
@@ -0,0 +1,273 @@
+// +build windows
+
+/*
+Copyright 2019 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package winstats
+
+import (
+	"sync"
+
+	cadvisorapi "github.com/google/cadvisor/info/v1"
+	"k8s.io/apimachinery/pkg/util/sets"
+	"k8s.io/klog"
+)
+
+// Performance counter paths for network adapters. Each path uses the (*)
+// instance, so the counters are read with getDataList, which returns one
+// value per instance name.
+const (
+	packetsReceivedPerSecondQuery = "\\Network Adapter(*)\\Packets Received/sec"
+	packetsSentPerSecondQuery     = "\\Network Adapter(*)\\Packets Sent/sec"
+	bytesReceivedPerSecondQuery   = "\\Network Adapter(*)\\Bytes Received/sec"
+	bytesSentPerSecondQuery       = "\\Network Adapter(*)\\Bytes Sent/sec"
+	packetsReceivedDiscardedQuery = "\\Network Adapter(*)\\Packets Received Discarded"
+	packetsReceivedErrorsQuery    = "\\Network Adapter(*)\\Packets Received Errors"
+	packetsOutboundDiscardedQuery = "\\Network Adapter(*)\\Packets Outbound Discarded"
+	packetsOutboundErrorsQuery    = "\\Network Adapter(*)\\Packets Outbound Errors"
+)
+
+// networkCounter contains the counters for network adapters, plus a cache of
+// the per-adapter stats merged from them.
+type networkCounter struct {
+	packetsReceivedPerSecondCounter *perfCounter
+	packetsSentPerSecondCounter     *perfCounter
+	bytesReceivedPerSecondCounter   *perfCounter
+	bytesSentPerSecondCounter       *perfCounter
+	packetsReceivedDiscardedCounter *perfCounter
+	packetsReceivedErrorsCounter    *perfCounter
+	packetsOutboundDiscardedCounter *perfCounter
+	packetsOutboundErrorsCounter    *perfCounter
+
+	// mu guards adapterStats, the cache of merged stats keyed by adapter name.
+	mu           sync.RWMutex
+	adapterStats map[string]cadvisorapi.InterfaceStats
+}
+
+// newNetworkCounters opens one perf counter per network adapter query and
+// returns a networkCounter with an empty stats cache. It returns the first
+// error reported by newPerfCounter.
+func newNetworkCounters() (*networkCounter, error) {
+	n := &networkCounter{
+		adapterStats: map[string]cadvisorapi.InterfaceStats{},
+	}
+
+	// Each query is paired with the field that receives its counter. The
+	// counters are opened in this order, stopping at the first failure.
+	counters := []struct {
+		query   string
+		counter **perfCounter
+	}{
+		{packetsReceivedPerSecondQuery, &n.packetsReceivedPerSecondCounter},
+		{packetsSentPerSecondQuery, &n.packetsSentPerSecondCounter},
+		{bytesReceivedPerSecondQuery, &n.bytesReceivedPerSecondCounter},
+		{bytesSentPerSecondQuery, &n.bytesSentPerSecondCounter},
+		{packetsReceivedDiscardedQuery, &n.packetsReceivedDiscardedCounter},
+		{packetsReceivedErrorsQuery, &n.packetsReceivedErrorsCounter},
+		{packetsOutboundDiscardedQuery, &n.packetsOutboundDiscardedCounter},
+		{packetsOutboundErrorsQuery, &n.packetsOutboundErrorsCounter},
+	}
+	for _, c := range counters {
+		counter, err := newPerfCounter(c.query)
+		if err != nil {
+			return nil, err
+		}
+		*c.counter = counter
+	}
+
+	return n, nil
+}
+
+// getData reads every network adapter counter, merges the values into the
+// cache while holding n.mu, and returns the cached stats of all adapters.
+// If any counter cannot be read, the error is logged and returned and the
+// cache is left untouched.
+func (n *networkCounter) getData() ([]cadvisorapi.InterfaceStats, error) {
+	packetsReceivedPerSecondData, err := n.packetsReceivedPerSecondCounter.getDataList()
+	if err != nil {
+		klog.Errorf("Unable to get packetsReceivedPerSecond perf counter data; err: %v", err)
+		return nil, err
+	}
+
+	packetsSentPerSecondData, err := n.packetsSentPerSecondCounter.getDataList()
+	if err != nil {
+		klog.Errorf("Unable to get packetsSentPerSecond perf counter data; err: %v", err)
+		return nil, err
+	}
+
+	bytesReceivedPerSecondData, err := n.bytesReceivedPerSecondCounter.getDataList()
+	if err != nil {
+		klog.Errorf("Unable to get bytesReceivedPerSecond perf counter data; err: %v", err)
+		return nil, err
+	}
+
+	bytesSentPerSecondData, err := n.bytesSentPerSecondCounter.getDataList()
+	if err != nil {
+		klog.Errorf("Unable to get bytesSentPerSecond perf counter data; err: %v", err)
+		return nil, err
+	}
+
+	packetsReceivedDiscardedData, err := n.packetsReceivedDiscardedCounter.getDataList()
+	if err != nil {
+		klog.Errorf("Unable to get packetsReceivedDiscarded perf counter data; err: %v", err)
+		return nil, err
+	}
+
+	packetsReceivedErrorsData, err := n.packetsReceivedErrorsCounter.getDataList()
+	if err != nil {
+		klog.Errorf("Unable to get packetsReceivedErrors perf counter data; err: %v", err)
+		return nil, err
+	}
+
+	packetsOutboundDiscardedData, err := n.packetsOutboundDiscardedCounter.getDataList()
+	if err != nil {
+		klog.Errorf("Unable to get packetsOutboundDiscarded perf counter data; err: %v", err)
+		return nil, err
+	}
+
+	packetsOutboundErrorsData, err := n.packetsOutboundErrorsCounter.getDataList()
+	if err != nil {
+		klog.Errorf("Unable to get packetsOutboundErrors perf counter data; err: %v", err)
+		return nil, err
+	}
+
+	n.mu.Lock()
+	defer n.mu.Unlock()
+	n.mergeCollectedData(
+		packetsReceivedPerSecondData,
+		packetsSentPerSecondData,
+		bytesReceivedPerSecondData,
+		bytesSentPerSecondData,
+		packetsReceivedDiscardedData,
+		packetsReceivedErrorsData,
+		packetsOutboundDiscardedData,
+		packetsOutboundErrorsData,
+	)
+	return n.listInterfaceStats(), nil
+}
+
+// mergeCollectedData merges the collected data into the cache and evicts
+// adapters that none of the counters reported. It must be called with n.mu
+// held.
+func (n *networkCounter) mergeCollectedData(packetsReceivedPerSecondData,
+	packetsSentPerSecondData,
+	bytesReceivedPerSecondData,
+	bytesSentPerSecondData,
+	packetsReceivedDiscardedData,
+	packetsReceivedErrorsData,
+	packetsOutboundDiscardedData,
+	packetsOutboundErrorsData map[string]uint64) {
+	adapters := sets.NewString()
+
+	// merge the collected data and list of adapters.
+	adapters.Insert(n.mergePacketsReceivedPerSecondData(packetsReceivedPerSecondData)...)
+	adapters.Insert(n.mergePacketsSentPerSecondData(packetsSentPerSecondData)...)
+	adapters.Insert(n.mergeBytesReceivedPerSecondData(bytesReceivedPerSecondData)...)
+	adapters.Insert(n.mergeBytesSentPerSecondData(bytesSentPerSecondData)...)
+	adapters.Insert(n.mergePacketsReceivedDiscardedData(packetsReceivedDiscardedData)...)
+	adapters.Insert(n.mergePacketsReceivedErrorsData(packetsReceivedErrorsData)...)
+	adapters.Insert(n.mergePacketsOutboundDiscardedData(packetsOutboundDiscardedData)...)
+	adapters.Insert(n.mergePacketsOutboundErrorsData(packetsOutboundErrorsData)...)
+
+	// delete the cache for non-existing adapters.
+	for adapter := range n.adapterStats {
+		if !adapters.Has(adapter) {
+			delete(n.adapterStats, adapter)
+		}
+	}
+}
+
+// mergeAdapterData applies update to the cached stats of every adapter in
+// data, creating the cache entry when the adapter is new, and returns the
+// names of the adapters in data. It must be called with n.mu held.
+func (n *networkCounter) mergeAdapterData(data map[string]uint64, update func(stat *cadvisorapi.InterfaceStats, value uint64)) []string {
+	var adapters []string
+	for adapterName, value := range data {
+		adapters = append(adapters, adapterName)
+		newStat := n.adapterStats[adapterName]
+		newStat.Name = adapterName
+		update(&newStat, value)
+		n.adapterStats[adapterName] = newStat
+	}
+
+	return adapters
+}
+
+// mergePacketsReceivedPerSecondData adds each sampled value to the cached RxPackets.
+func (n *networkCounter) mergePacketsReceivedPerSecondData(packetsReceivedPerSecondData map[string]uint64) []string {
+	return n.mergeAdapterData(packetsReceivedPerSecondData, func(stat *cadvisorapi.InterfaceStats, value uint64) {
+		stat.RxPackets += value
+	})
+}
+
+// mergePacketsSentPerSecondData adds each sampled value to the cached TxPackets.
+func (n *networkCounter) mergePacketsSentPerSecondData(packetsSentPerSecondData map[string]uint64) []string {
+	return n.mergeAdapterData(packetsSentPerSecondData, func(stat *cadvisorapi.InterfaceStats, value uint64) {
+		stat.TxPackets += value
+	})
+}
+
+// mergeBytesReceivedPerSecondData adds each sampled value to the cached RxBytes.
+func (n *networkCounter) mergeBytesReceivedPerSecondData(bytesReceivedPerSecondData map[string]uint64) []string {
+	return n.mergeAdapterData(bytesReceivedPerSecondData, func(stat *cadvisorapi.InterfaceStats, value uint64) {
+		stat.RxBytes += value
+	})
+}
+
+// mergeBytesSentPerSecondData adds each sampled value to the cached TxBytes.
+func (n *networkCounter) mergeBytesSentPerSecondData(bytesSentPerSecondData map[string]uint64) []string {
+	return n.mergeAdapterData(bytesSentPerSecondData, func(stat *cadvisorapi.InterfaceStats, value uint64) {
+		stat.TxBytes += value
+	})
+}
+
+// mergePacketsReceivedDiscardedData overwrites the cached RxDropped with the sampled value.
+func (n *networkCounter) mergePacketsReceivedDiscardedData(packetsReceivedDiscardedData map[string]uint64) []string {
+	return n.mergeAdapterData(packetsReceivedDiscardedData, func(stat *cadvisorapi.InterfaceStats, value uint64) {
+		stat.RxDropped = value
+	})
+}
+
+// mergePacketsReceivedErrorsData overwrites the cached RxErrors with the sampled value.
+func (n *networkCounter) mergePacketsReceivedErrorsData(packetsReceivedErrorsData map[string]uint64) []string {
+	return n.mergeAdapterData(packetsReceivedErrorsData, func(stat *cadvisorapi.InterfaceStats, value uint64) {
+		stat.RxErrors = value
+	})
+}
+
+// mergePacketsOutboundDiscardedData overwrites the cached TxDropped with the sampled value.
+func (n *networkCounter) mergePacketsOutboundDiscardedData(packetsOutboundDiscardedData map[string]uint64) []string {
+	return n.mergeAdapterData(packetsOutboundDiscardedData, func(stat *cadvisorapi.InterfaceStats, value uint64) {
+		stat.TxDropped = value
+	})
+}
+
+// mergePacketsOutboundErrorsData overwrites the cached TxErrors with the sampled value.
+func (n *networkCounter) mergePacketsOutboundErrorsData(packetsOutboundErrorsData map[string]uint64) []string {
+	return n.mergeAdapterData(packetsOutboundErrorsData, func(stat *cadvisorapi.InterfaceStats, value uint64) {
+		stat.TxErrors = value
+	})
+}
+
+// listInterfaceStats returns the cached stats of all adapters, in map
+// iteration order. It must be called with n.mu held.
+func (n *networkCounter) listInterfaceStats() []cadvisorapi.InterfaceStats {
+	stats := make([]cadvisorapi.InterfaceStats, 0, len(n.adapterStats))
+	for _, stat := range n.adapterStats {
+		stats = append(stats, stat)
+	}
+	return stats
+}
diff --git a/pkg/kubelet/winstats/perfcounter_nodestats.go b/pkg/kubelet/winstats/perfcounter_nodestats.go
index 51fb15ae93..578f635881 100644
--- a/pkg/kubelet/winstats/perfcounter_nodestats.go
+++ b/pkg/kubelet/winstats/perfcounter_nodestats.go
@@ -101,8 +101,13 @@ func (p *perfCounterNodeStatsClient) startMonitoring() error {
 		return err
 	}
 
+	networkAdapterCounter, err := newNetworkCounters()
+	if err != nil {
+		return err
+	}
+
 	go wait.Forever(func() {
-		p.collectMetricsData(cpuCounter, memWorkingSetCounter, memCommittedBytesCounter)
+		p.collectMetricsData(cpuCounter, memWorkingSetCounter, memCommittedBytesCounter, networkAdapterCounter)
 	}, perfCounterUpdatePeriod)
 
 	return nil
@@ -138,7 +143,7 @@ func (p *perfCounterNodeStatsClient) getNodeInfo() nodeInfo {
 	return p.nodeInfo
 }
 
-func (p *perfCounterNodeStatsClient) collectMetricsData(cpuCounter, memWorkingSetCounter, memCommittedBytesCounter *perfCounter) {
+func (p *perfCounterNodeStatsClient) collectMetricsData(cpuCounter, memWorkingSetCounter, memCommittedBytesCounter *perfCounter, networkAdapterCounter *networkCounter) {
 	cpuValue, err := cpuCounter.getData()
 	if err != nil {
 		klog.Errorf("Unable to get cpu perf counter data; err: %v", err)
@@ -157,12 +162,19 @@ func (p *perfCounterNodeStatsClient) collectMetricsData(cpuCounter, memWorkingSe
 		return
 	}
 
+	networkAdapterStats, err := networkAdapterCounter.getData()
+	if err != nil {
+		klog.Errorf("Unable to get network adapter perf counter data; err: %v", err)
+		return
+	}
+
 	p.mu.Lock()
 	defer p.mu.Unlock()
 	p.nodeMetrics = nodeMetrics{
 		cpuUsageCoreNanoSeconds:   p.convertCPUValue(cpuValue),
 		memoryPrivWorkingSetBytes: memWorkingSetValue,
 		memoryCommittedBytes:      memCommittedBytesValue,
+		interfaceStats:            networkAdapterStats,
 		timeStamp:                 time.Now(),
 	}
 }
diff --git a/pkg/kubelet/winstats/perfcounters.go b/pkg/kubelet/winstats/perfcounters.go
index e5ed450b78..91d91a5f80 100644
--- a/pkg/kubelet/winstats/perfcounters.go
+++ b/pkg/kubelet/winstats/perfcounters.go
@@ -71,6 +71,7 @@ func newPerfCounter(counter string) (*perfCounter, error) {
 	}, nil
 }
 
+// getData is used for getting data without * in counter name.
 func (p *perfCounter) getData() (uint64, error) {
 	ret := win_pdh.PdhCollectQueryData(p.queryHandle)
 	if ret != win_pdh.ERROR_SUCCESS {
@@ -100,3 +101,46 @@ func (p *perfCounter) getData() (uint64, error) {
 
 	return data, nil
 }
+
+// getDataList is used for getting data with * in counter name. It returns the
+// formatted value of every counter instance, keyed by instance name.
+func (p *perfCounter) getDataList() (map[string]uint64, error) {
+	ret := win_pdh.PdhCollectQueryData(p.queryHandle)
+	if ret != win_pdh.ERROR_SUCCESS {
+		return nil, fmt.Errorf("unable to collect data from counter. Error code is %x", ret)
+	}
+
+	var bufSize, bufCount uint32
+	var size = uint32(unsafe.Sizeof(win_pdh.PDH_FMT_COUNTERVALUE_ITEM_DOUBLE{}))
+	var emptyBuf [1]win_pdh.PDH_FMT_COUNTERVALUE_ITEM_DOUBLE // need at least 1 addressable null ptr.
+	data := map[string]uint64{}
+
+	// The first call passes a zero bufSize only to learn the required buffer
+	// size (bufSize) and the number of items (bufCount).
+	ret = win_pdh.PdhGetFormattedCounterArrayDouble(p.counterHandle, &bufSize, &bufCount, &emptyBuf[0])
+	if ret != win_pdh.PDH_MORE_DATA {
+		return nil, fmt.Errorf("unable to collect data from counter. Error code is %x", ret)
+	}
+
+	// Per the PDH documentation bufSize is a byte count, so round it up to
+	// whole items instead of allocating bufCount*size items. Keep at least
+	// one item so that &filledBuf[0] below is always addressable.
+	itemCount := (bufSize + size - 1) / size
+	if itemCount == 0 {
+		itemCount = 1
+	}
+	filledBuf := make([]win_pdh.PDH_FMT_COUNTERVALUE_ITEM_DOUBLE, itemCount)
+	ret = win_pdh.PdhGetFormattedCounterArrayDouble(p.counterHandle, &bufSize, &bufCount, &filledBuf[0])
+	if ret != win_pdh.ERROR_SUCCESS {
+		return nil, fmt.Errorf("unable to collect data from counter. Error code is %x", ret)
+	}
+
+	for i := 0; i < int(bufCount) && i < len(filledBuf); i++ {
+		c := filledBuf[i]
+		value := uint64(c.FmtValue.DoubleValue)
+		name := win_pdh.UTF16PtrToString(c.SzName)
+		data[name] = value
+	}
+
+	return data, nil
+}
diff --git a/pkg/kubelet/winstats/winstats.go b/pkg/kubelet/winstats/winstats.go
index 2c483f8967..055ba84675 100644
--- a/pkg/kubelet/winstats/winstats.go
+++ b/pkg/kubelet/winstats/winstats.go
@@ -58,6 +58,7 @@ type nodeMetrics struct {
 	memoryPrivWorkingSetBytes uint64
 	memoryCommittedBytes      uint64
 	timeStamp                 time.Time
+	interfaceStats            []cadvisorapi.InterfaceStats
 }
 
 type nodeInfo struct {
@@ -109,12 +110,11 @@ func (c *StatsClient) WinVersionInfo() (*cadvisorapi.VersionInfo, error) {
 
 func (c *StatsClient) createRootContainerInfo() (*cadvisorapiv2.ContainerInfo, error) {
 	nodeMetrics, err := c.client.getNodeMetrics()
-
 	if err != nil {
 		return nil, err
 	}
-	var stats []*cadvisorapiv2.ContainerStats
 
+	var stats []*cadvisorapiv2.ContainerStats
 	stats = append(stats, &cadvisorapiv2.ContainerStats{
 		Timestamp: nodeMetrics.timeStamp,
 		Cpu: &cadvisorapi.CpuStats{
@@ -126,6 +126,9 @@ func (c *StatsClient) createRootContainerInfo() (*cadvisorapiv2.ContainerInfo, e
 			WorkingSet: nodeMetrics.memoryPrivWorkingSetBytes,
 			Usage:      nodeMetrics.memoryCommittedBytes,
 		},
+		Network: &cadvisorapiv2.NetworkStats{
+			Interfaces: nodeMetrics.interfaceStats,
+		},
 	})
 
 	nodeInfo := c.client.getNodeInfo()
@@ -134,6 +137,7 @@ func (c *StatsClient) createRootContainerInfo() (*cadvisorapiv2.ContainerInfo, e
 			CreationTime: nodeInfo.startTime,
 			HasCpu:       true,
 			HasMemory:    true,
+			HasNetwork:   true,
 			Memory: cadvisorapiv2.MemorySpec{
 				Limit: nodeInfo.memoryPhysicalCapacityBytes,
 			},
diff --git a/pkg/kubelet/winstats/winstats_test.go b/pkg/kubelet/winstats/winstats_test.go
index 830f76af03..51f7a290d1 100644
--- a/pkg/kubelet/winstats/winstats_test.go
+++ b/pkg/kubelet/winstats/winstats_test.go
@@ -86,8 +86,9 @@ func TestWinContainerInfos(t *testing.T) {
 	infos := make(map[string]cadvisorapiv2.ContainerInfo)
 	infos["/"] = cadvisorapiv2.ContainerInfo{
 		Spec: cadvisorapiv2.ContainerSpec{
-			HasCpu:    true,
-			HasMemory: true,
+			HasCpu:     true,
+			HasMemory:  true,
+			HasNetwork: true,
 			Memory: cadvisorapiv2.MemorySpec{
 				Limit: 1.6e+10,
 			},
@@ -95,7 +96,11 @@ func TestWinContainerInfos(t *testing.T) {
 		Stats: stats,
 	}
 
-	assert.Equal(t, actualRootInfos, infos)
+	assert.Equal(t, len(actualRootInfos), len(infos))
+	assert.Equal(t, actualRootInfos["/"].Spec, infos["/"].Spec)
+	assert.Equal(t, len(actualRootInfos["/"].Stats), len(infos["/"].Stats))
+	assert.Equal(t, actualRootInfos["/"].Stats[0].Cpu, infos["/"].Stats[0].Cpu)
+	assert.Equal(t, actualRootInfos["/"].Stats[0].Memory, infos["/"].Stats[0].Memory)
 }
 
 func TestWinMachineInfo(t *testing.T) {