portainer/pkg/snapshot/kubernetes.go

243 lines
7.6 KiB
Go

package snapshot
import (
"context"
"errors"
"fmt"
"io"
"math"
"os"
"reflect"
"strings"
"time"
"github.com/segmentio/encoding/json"
"github.com/aws/smithy-go/ptr"
portainer "github.com/portainer/portainer/api"
edgeutils "github.com/portainer/portainer/pkg/edge"
networkingutils "github.com/portainer/portainer/pkg/networking"
"github.com/rs/zerolog/log"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
)
func CreateKubernetesSnapshot(cli *kubernetes.Clientset) (*portainer.KubernetesSnapshot, error) {
kubernetesSnapshot := &portainer.KubernetesSnapshot{}
err := kubernetesSnapshotVersion(kubernetesSnapshot, cli)
if err != nil {
log.Warn().Err(err).Msg("unable to snapshot cluster version")
}
err = kubernetesSnapshotNodes(kubernetesSnapshot, cli)
if err != nil {
log.Warn().Err(err).Msg("unable to snapshot cluster nodes")
}
kubernetesSnapshot.Time = time.Now().Unix()
return kubernetesSnapshot, nil
}
func kubernetesSnapshotVersion(snapshot *portainer.KubernetesSnapshot, cli kubernetes.Interface) error {
versionInfo, err := cli.Discovery().ServerVersion()
if err != nil {
return err
}
snapshot.KubernetesVersion = versionInfo.GitVersion
return nil
}
func kubernetesSnapshotNodes(snapshot *portainer.KubernetesSnapshot, cli kubernetes.Interface) error {
nodeList, err := cli.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
if err != nil {
return err
}
if len(nodeList.Items) == 0 {
return nil
}
var totalCPUs, totalMemory int64
for _, node := range nodeList.Items {
totalCPUs += node.Status.Capacity.Cpu().Value()
totalMemory += node.Status.Capacity.Memory().Value()
}
snapshot.TotalCPU = totalCPUs
snapshot.TotalMemory = totalMemory
snapshot.NodeCount = len(nodeList.Items)
// Collect performance metrics if we have a real client, otherwise use zero values
if clientset, ok := cli.(*kubernetes.Clientset); ok {
kubernetesSnapshotPerformanceMetricsWithClient(nodeList, clientset, snapshot)
} else {
snapshot.PerformanceMetrics = &portainer.PerformanceMetrics{
CPUUsage: 0,
MemoryUsage: 0,
NetworkUsage: 0,
}
}
return nil
}
func kubernetesSnapshotPerformanceMetricsWithClient(
nodeList *corev1.NodeList,
cli *kubernetes.Clientset,
snapshot *portainer.KubernetesSnapshot,
) {
performanceMetrics := &portainer.PerformanceMetrics{
CPUUsage: 0,
MemoryUsage: 0,
NetworkUsage: 0,
}
for _, node := range nodeList.Items {
nodeMetrics, err := kubernetesSnapshotNodePerformanceMetrics(cli, node, nil)
if err != nil {
log.Warn().Err(err).Msgf("failed to snapshot performance metrics for node %s", node.Name)
continue
}
if nodeMetrics != nil {
performanceMetrics.CPUUsage += nodeMetrics.CPUUsage
performanceMetrics.MemoryUsage += nodeMetrics.MemoryUsage
performanceMetrics.NetworkUsage += nodeMetrics.NetworkUsage
}
}
snapshot.PerformanceMetrics = performanceMetrics
}
// KubernetesSnapshotDiagnostics returns the diagnostics data for the agent
func KubernetesSnapshotDiagnostics(cli *kubernetes.Clientset, edgeKey string) (*portainer.DiagnosticsData, error) {
podID := os.Getenv("HOSTNAME")
snapshot := &portainer.KubernetesSnapshot{
DiagnosticsData: &portainer.DiagnosticsData{
DNS: make(map[string]string),
Telnet: make(map[string]string),
},
}
err := kubernetesSnapshotPodErrorLogs(snapshot, cli, "portainer", podID)
if err != nil {
return nil, fmt.Errorf("failed to snapshot pod error logs: %w", err)
}
if edgeKey != "" {
url, err := edgeutils.GetPortainerURLFromEdgeKey(edgeKey)
if err != nil {
return nil, fmt.Errorf("failed to get portainer URL from edge key: %w", err)
}
snapshot.DiagnosticsData.DNS["edge-to-portainer"] = networkingutils.ProbeDNSConnection(url)
snapshot.DiagnosticsData.Telnet["edge-to-portainer"] = networkingutils.ProbeTelnetConnection(url)
}
return snapshot.DiagnosticsData, nil
}
// KubernetesSnapshotPodErrorLogs returns 0 to 10 lines of the most recent error logs of the agent container
// this will primarily be used for agent snapshot
func kubernetesSnapshotPodErrorLogs(snapshot *portainer.KubernetesSnapshot, cli *kubernetes.Clientset, namespace, podID string) error {
if namespace == "" || podID == "" {
return errors.New("both namespace and podID are required to capture pod error logs in the snapshot")
}
logsStream, err := cli.CoreV1().Pods(namespace).GetLogs(podID, &corev1.PodLogOptions{TailLines: ptr.Int64(10), Timestamps: true}).Stream(context.TODO())
if err != nil {
return fmt.Errorf("failed to stream logs: %w", err)
}
defer logsStream.Close()
logBytes, err := io.ReadAll(logsStream)
if err != nil {
return fmt.Errorf("failed to read error logs: %w", err)
}
logs := filterLogsByPattern(logBytes, []string{"error", "err", "level=error", "exception", "fatal", "panic"})
jsonLogs, err := json.Marshal(logs)
if err != nil {
return fmt.Errorf("failed to marshal logs: %w", err)
}
snapshot.DiagnosticsData.Log = string(jsonLogs)
return nil
}
func kubernetesSnapshotNodePerformanceMetrics(cli *kubernetes.Clientset, node corev1.Node, _ *portainer.PerformanceMetrics) (*portainer.PerformanceMetrics, error) {
result := cli.RESTClient().Get().AbsPath(fmt.Sprintf("/api/v1/nodes/%s/proxy/stats/summary", node.Name)).Do(context.TODO())
if result.Error() != nil {
return nil, fmt.Errorf("failed to get node performance metrics: %w", result.Error())
}
raw, err := result.Raw()
if err != nil {
return nil, fmt.Errorf("failed to get node performance metrics: %w", err)
}
stats := statsapi.Summary{}
err = json.Unmarshal(raw, &stats)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal node performance metrics: %w", err)
}
nodeStats := stats.Node
metrics := calculateNodeMetrics(nodeStats, node)
return metrics, nil
}
// calculateNodeMetrics calculates performance metrics from node stats - extracted for testability
func calculateNodeMetrics(nodeStats statsapi.NodeStats, node corev1.Node) *portainer.PerformanceMetrics {
if reflect.DeepEqual(nodeStats, statsapi.NodeStats{}) {
return nil
}
metrics := &portainer.PerformanceMetrics{}
// Calculate CPU usage percentage
if nodeStats.CPU != nil && nodeStats.CPU.UsageNanoCores != nil {
totalCapacityNanoCores := node.Status.Capacity.Cpu().Value() * 1_000_000_000
metrics.CPUUsage = math.Round(float64(*nodeStats.CPU.UsageNanoCores) / float64(totalCapacityNanoCores) * 100)
}
// Calculate Memory usage percentage
if nodeStats.Memory != nil && nodeStats.Memory.WorkingSetBytes != nil {
totalCapacityBytes := node.Status.Capacity.Memory().Value()
metrics.MemoryUsage = math.Round(float64(*nodeStats.Memory.WorkingSetBytes) / float64(totalCapacityBytes) * 100)
}
// Calculate Network usage in MB
if nodeStats.Network != nil && nodeStats.Network.RxBytes != nil && nodeStats.Network.TxBytes != nil {
totalBytes := float64(*nodeStats.Network.RxBytes) + float64(*nodeStats.Network.TxBytes)
const bytesToMB = 1024 * 1024
metrics.NetworkUsage = math.Round(totalBytes / bytesToMB)
}
return metrics
}
func filterLogsByPattern(logBytes []byte, patterns []string) []map[string]string {
logs := []map[string]string{}
for _, line := range strings.Split(strings.TrimSpace(string(logBytes)), "\n") {
if line == "" {
continue
}
if parts := strings.SplitN(line, " ", 2); len(parts) == 2 {
messageLower := strings.ToLower(parts[1])
for _, pattern := range patterns {
if strings.Contains(messageLower, pattern) {
logs = append(logs, map[string]string{
"timestamp": parts[0],
"message": parts[1],
})
break
}
}
}
}
return logs
}