k3s/vendor/github.com/google/cadvisor/container/mesos/client.go

191 lines
4.6 KiB
Go

// Copyright 2018 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mesos
import (
"fmt"
"github.com/Rican7/retry"
"github.com/Rican7/retry/strategy"
"github.com/mesos/mesos-go/api/v1/lib"
"github.com/mesos/mesos-go/api/v1/lib/agent"
"github.com/mesos/mesos-go/api/v1/lib/agent/calls"
mclient "github.com/mesos/mesos-go/api/v1/lib/client"
"github.com/mesos/mesos-go/api/v1/lib/encoding/codecs"
"github.com/mesos/mesos-go/api/v1/lib/httpcli"
"net/url"
"sync"
)
const (
maxRetryAttempts = 3
invalidPID = -1
)
var (
mesosClientOnce sync.Once
mesosClient *client
)
type client struct {
hc *httpcli.Client
}
type mesosAgentClient interface {
ContainerInfo(id string) (*containerInfo, error)
ContainerPid(id string) (int, error)
}
type containerInfo struct {
cntr *mContainer
labels map[string]string
}
// Client is an interface to query mesos agent http endpoints
func Client() (mesosAgentClient, error) {
mesosClientOnce.Do(func() {
// Start Client
apiURL := url.URL{
Scheme: "http",
Host: *MesosAgentAddress,
Path: "/api/v1",
}
mesosClient = &client{
hc: httpcli.New(
httpcli.Endpoint(apiURL.String()),
httpcli.Codec(codecs.ByMediaType[codecs.MediaTypeProtobuf]),
httpcli.Do(httpcli.With(httpcli.Timeout(*MesosAgentTimeout))),
),
}
})
return mesosClient, nil
}
// ContainerInfo returns the container information of the given container id
func (self *client) ContainerInfo(id string) (*containerInfo, error) {
c, err := self.getContainer(id)
if err != nil {
return nil, err
}
// Get labels of the container
l, err := self.getLabels(c)
if err != nil {
return nil, err
}
return &containerInfo{
cntr: c,
labels: l,
}, nil
}
// Get the Pid of the container
func (self *client) ContainerPid(id string) (int, error) {
var pid int
var err error
err = retry.Retry(
func(attempt uint) error {
c, err := self.ContainerInfo(id)
if err != nil {
return err
}
if c.cntr.ContainerStatus != nil {
pid = int(*c.cntr.ContainerStatus.ExecutorPID)
} else {
err = fmt.Errorf("error fetching Pid")
}
return err
},
strategy.Limit(maxRetryAttempts),
)
if err != nil {
return invalidPID, fmt.Errorf("failed to fetch pid")
}
return pid, err
}
func (self *client) getContainer(id string) (*mContainer, error) {
// Get all containers
cntrs, err := self.getContainers()
if err != nil {
return nil, err
}
// Check if there is a container with given id and return the container
for _, c := range cntrs.Containers {
if c.ContainerID.Value == id {
return &c, nil
}
}
return nil, fmt.Errorf("can't locate container %s", id)
}
func (self *client) getContainers() (mContainers, error) {
req := calls.NonStreaming(calls.GetContainers())
result, err := self.fetchAndDecode(req)
if err != nil {
return nil, fmt.Errorf("failed to get mesos containers: %v", err)
}
cntrs := result.GetContainers
return cntrs, nil
}
func (self *client) getLabels(c *mContainer) (map[string]string, error) {
// Get mesos agent state which contains all containers labels
var s state
req := calls.NonStreaming(calls.GetState())
result, err := self.fetchAndDecode(req)
if err != nil {
return map[string]string{}, fmt.Errorf("failed to get mesos agent state: %v", err)
}
s.st = result.GetState
// Fetch labels from state object
labels, err := s.FetchLabels(c.FrameworkID.Value, c.ExecutorID.Value)
if err != nil {
return labels, fmt.Errorf("error while fetching labels from executor: %v", err)
}
return labels, nil
}
func (self *client) fetchAndDecode(req calls.RequestFunc) (*agent.Response, error) {
var res mesos.Response
var err error
// Send request
err = retry.Retry(
func(attempt uint) error {
res, err = mesosClient.hc.Send(req, mclient.ResponseClassSingleton, nil)
return err
},
strategy.Limit(maxRetryAttempts),
)
if err != nil {
return nil, fmt.Errorf("error fetching %s: %s", req.Call(), err)
}
// Decode the result
var target agent.Response
err = res.Decode(&target)
if err != nil {
return nil, fmt.Errorf("error while decoding response body from %s: %s", res, err)
}
return &target, nil
}