k3s/vendor/github.com/clusterhq/flocker-go/client.go

377 lines
10 KiB
Go
Raw Normal View History

package flocker
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"time"
)
// defaultVolumeSize is the maximum size used for a dataset when the caller
// does not request one. The value mirrors the default of the Flocker Docker
// plugin, see
// https://github.com/ClusterHQ/flocker-docker-plugin/blob/master/flockerdockerplugin/adapter.py#L18
const defaultVolumeSize json.Number = "107374182400"
// Polling parameters and sentinel errors shared by the client.
var (
// timeoutWaitingForVolume is the maximum time CreateDataset keeps polling for
// a freshly created dataset to appear in the cluster state. A volume can take
// a long time to become available; this bound keeps callers such as
// Kubernetes from waiting forever.
timeoutWaitingForVolume = 2 * time.Minute
// tickerWaitingForVolume is the interval between two polls in CreateDataset.
tickerWaitingForVolume = 5 * time.Second
// errStateNotFound is returned by GetDatasetState when no state entry matches
// the requested dataset ID. CreateDataset treats it as "not ready yet".
errStateNotFound = errors.New("State not found by Dataset ID")
// errConfigurationNotFound is returned when no dataset configuration matches
// the requested metadata name.
errConfigurationNotFound = errors.New("Configuration not found by Name")
// errFlockerControlServiceHost and errFlockerControlServicePort are not
// referenced in this file; presumably they are used by configuration
// validation elsewhere in the package — verify before removing.
errFlockerControlServiceHost = errors.New("The volume config must have a key CONTROL_SERVICE_HOST defined in the OtherAttributes field")
errFlockerControlServicePort = errors.New("The volume config must have a key CONTROL_SERVICE_PORT defined in the OtherAttributes field")
// errVolumeAlreadyExists is returned by CreateDataset when the control
// service answers the creation request with 409 Conflict.
errVolumeAlreadyExists = errors.New("The volume already exists")
// errVolumeDoesNotExist is not referenced in this file.
errVolumeDoesNotExist = errors.New("The volume does not exist")
// errUpdatingDataset is returned by UpdatePrimaryForDataset when the control
// service answers with a status code of 300 or above.
errUpdatingDataset = errors.New("It was impossible to update the dataset")
)
// Clientable exposes the needed methods to implement your own Flocker Client.
// The descriptions below reflect what the default Client in this file does.
type Clientable interface {
// CreateDataset asks the control service to create a dataset described by
// options and returns its state once it shows up in the cluster state.
CreateDataset(options *CreateDatasetOptions) (*DatasetState, error)
// DeleteDataset requests deletion of the dataset with the given ID.
DeleteDataset(datasetID string) error
// GetDatasetState returns the state of the dataset with the given ID, or an
// error when it cannot be found.
GetDatasetState(datasetID string) (*DatasetState, error)
// GetDatasetID returns the ID of the non-deleted dataset whose metadata name
// equals metaName.
GetDatasetID(metaName string) (datasetID string, err error)
// GetPrimaryUUID returns the UUID of the node whose host matches the IP the
// client was configured with.
GetPrimaryUUID() (primaryUUID string, err error)
// ListNodes returns the nodes reported by the control service.
ListNodes() (nodes []NodeState, err error)
// UpdatePrimaryForDataset sets primaryUUID as the primary of the dataset and
// returns the dataset state sent back by the control service.
UpdatePrimaryForDataset(primaryUUID, datasetID string) (*DatasetState, error)
}
// Client is a default Flocker Client.
// It embeds *http.Client, whose Do method is used to issue every request to
// the control service.
type Client struct {
*http.Client
// schema is the URL scheme; NewClient sets it to "https".
schema string
// host and port locate the control service and are used by getURL.
host string
port int
// version is the API version path segment; NewClient sets it to "v1".
version string
// clientIP is compared against NodeState.Host in GetPrimaryUUID to find the
// UUID of the local node.
clientIP string
// maximumSize is the size applied by CreateDataset when the caller leaves
// CreateDatasetOptions.MaximumSize at zero; NewClient sets it to
// defaultVolumeSize.
maximumSize json.Number
}
// Compile-time assertion that *Client satisfies Clientable.
var _ Clientable = (*Client)(nil)
// NewClient builds a Client that talks to the Flocker control service at
// host:port over HTTPS, using API version "v1".
//
// The underlying HTTP client is created by newTLSClient from caCertPath,
// keyPath and certPath; any error it reports is returned unchanged. clientIP
// is the address later used by GetPrimaryUUID to identify the local node.
func NewClient(host string, port int, clientIP string, caCertPath, keyPath, certPath string) (*Client, error) {
	httpClient, err := newTLSClient(caCertPath, keyPath, certPath)
	if err != nil {
		return nil, err
	}

	c := new(Client)
	c.Client = httpClient
	c.schema = "https"
	c.host = host
	c.port = port
	c.version = "v1"
	c.maximumSize = defaultVolumeSize
	c.clientIP = clientIP
	return c, nil
}
// request sends an HTTP request to the control service through the embedded
// http.Client and returns the raw response.
//
// payload is JSON-encoded and used as the request body only when method is
// "POST"; for every other method it is ignored and the body is empty. The
// Content-Type header is always set to application/json.
//
// The caller owns the returned response and must close its Body.
func (c Client) request(method, url string, payload interface{}) (*http.Response, error) {
	var encoded []byte
	if method == http.MethodPost {
		// Only POST requests are allowed to carry a payload.
		data, err := json.Marshal(payload)
		if err != nil {
			return nil, err
		}
		encoded = data
	}

	req, err := http.NewRequest(method, url, bytes.NewBuffer(encoded))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/json")

	// The response body is left open on purpose: closing it is the caller's job.
	return c.Do(req)
}
// post sends payload, JSON-encoded, to url in a POST request. The caller must
// close the body of the returned response.
func (c Client) post(url string, payload interface{}) (*http.Response, error) {
	return c.request(http.MethodPost, url, payload)
}
// delete sends a DELETE request to url. payload is handed to request, which
// only encodes a body for POST, so it is not transmitted. The caller must
// close the body of the returned response.
func (c Client) delete(url string, payload interface{}) (*http.Response, error) {
	return c.request(http.MethodDelete, url, payload)
}
// get sends a GET request without a body to url. The caller must close the
// body of the returned response.
func (c Client) get(url string) (*http.Response, error) {
	return c.request(http.MethodGet, url, nil)
}
// getURL builds the absolute control-service URL for path, in the form
// <schema>://<host>:<port>/<version>/<path>.
func (c Client) getURL(path string) string {
	const layout = "%s://%s:%d/%s/%s"
	return fmt.Sprintf(layout, c.schema, c.host, c.port, c.version, path)
}
// configurationPayload is the JSON shape of a dataset configuration as
// decoded from the responses of the configuration/datasets endpoints.
type configurationPayload struct {
// Deleted marks configurations that GetDatasetID must skip.
Deleted bool `json:"deleted"`
Primary string `json:"primary"`
DatasetID string `json:"dataset_id,omitempty"`
MaximumSize json.Number `json:"maximum_size,omitempty"`
// Metadata carries the name that lookups by name are matched against.
Metadata metadataPayload `json:"metadata,omitempty"`
}
// CreateDatasetOptions is the request body that CreateDataset posts to the
// control service.
type CreateDatasetOptions struct {
// Primary is the UUID of the primary node. When empty, CreateDataset fills
// it in with the result of GetPrimaryUUID.
Primary string `json:"primary"`
DatasetID string `json:"dataset_id,omitempty"`
// MaximumSize is the requested size. When zero, CreateDataset replaces it
// with the client's default. Presumably expressed in bytes — confirm
// against the Flocker API documentation.
MaximumSize int64 `json:"maximum_size,omitempty"`
Metadata map[string]string `json:"metadata,omitempty"`
}
// metadataPayload is the metadata object nested in a configurationPayload.
// Only the name is decoded; it is the key used for lookups by name.
type metadataPayload struct {
Name string `json:"name,omitempty"`
}
// DatasetState describes a dataset as reported by the control service. It is
// the value returned by GetDatasetState, CreateDataset and
// UpdatePrimaryForDataset.
type DatasetState struct {
// Path is decoded from the "path" key; presumably the location where the
// dataset is mounted on its node — confirm against the Flocker API.
Path string `json:"path"`
DatasetID string `json:"dataset_id"`
Primary string `json:"primary,omitempty"`
MaximumSize json.Number `json:"maximum_size,omitempty"`
}
// datasetStatePayload is the element type used by GetDatasetState to decode
// the state/datasets response. It embeds *DatasetState, so the DatasetState
// fields are promoted and the pointer can be returned to the caller directly.
type datasetStatePayload struct {
*DatasetState
}
// NodeState is one entry of the node list returned by ListNodes.
type NodeState struct {
// UUID is the value GetPrimaryUUID returns for the matching node.
UUID string `json:"uuid"`
// Host is compared against the client's IP in GetPrimaryUUID.
Host string `json:"host"`
}
// findIDInConfigurationsPayload decodes body as a JSON list of dataset
// configurations and returns the dataset ID of the first entry whose metadata
// name equals name.
//
// It returns the decoding error if body is not a valid list, and
// errConfigurationNotFound if no entry matches. body is not closed.
func (c Client) findIDInConfigurationsPayload(body io.ReadCloser, name string) (datasetID string, err error) {
	var entries []configurationPayload
	if err = json.NewDecoder(body).Decode(&entries); err != nil {
		return "", err
	}

	for i := range entries {
		if entries[i].Metadata.Name == name {
			return entries[i].DatasetID, nil
		}
	}
	return "", errConfigurationNotFound
}
// ListNodes fetches state/nodes from the Flocker control service and returns
// the decoded node list.
//
// On any failure (transport error, status code of 300 or above, or an
// undecodable body) it returns an empty, non-nil slice together with the
// error.
func (c *Client) ListNodes() (nodes []NodeState, err error) {
	resp, err := c.get(c.getURL("state/nodes"))
	if err != nil {
		return []NodeState{}, err
	}
	defer resp.Body.Close()

	if code := resp.StatusCode; code >= 300 {
		return []NodeState{}, fmt.Errorf("Expected: {1,2}xx listing nodes, got: %d", code)
	}

	if err = json.NewDecoder(resp.Body).Decode(&nodes); err != nil {
		return []NodeState{}, err
	}
	return nodes, nil
}
// GetPrimaryUUID returns the UUID of the node whose host equals the client IP
// this Client was created with.
//
// It lists the nodes through ListNodes and returns the first match. An error
// is returned when the listing fails or when no node has that host; in the
// latter case the message includes the nodes that were seen.
func (c Client) GetPrimaryUUID() (uuid string, err error) {
	nodes, err := c.ListNodes()
	if err != nil {
		return "", err
	}

	for i := range nodes {
		if nodes[i].Host == c.clientIP {
			return nodes[i].UUID, nil
		}
	}
	return "", fmt.Errorf("No node found with IP '%s', available nodes %+v", c.clientIP, nodes)
}
// DeleteDataset sends a DELETE request for configuration/datasets/<datasetID>
// to the control service.
//
// It returns the transport error if the request fails, or an error naming the
// dataset and the status code when the service answers with 300 or above.
func (c *Client) DeleteDataset(datasetID string) error {
	endpoint := c.getURL("configuration/datasets/" + datasetID)

	resp, err := c.delete(endpoint, nil)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if code := resp.StatusCode; code >= 300 {
		return fmt.Errorf("Expected: {1,2}xx deleting the dataset %s, got: %d", datasetID, code)
	}
	return nil
}
// GetDatasetState fetches state/datasets from the control service and returns
// the state of the dataset identified by datasetID.
//
// Errors:
//   - the transport error, if the request cannot be performed;
//   - a status error, if the service answers with a code of 300 or above;
//   - the decoding error, if the body is not a JSON list of dataset states;
//   - errStateNotFound, if the list contains no entry for datasetID.
func (c Client) GetDatasetState(datasetID string) (*DatasetState, error) {
	resp, err := c.get(c.getURL("state/datasets"))
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	// Reject error responses explicitly, as ListNodes and DeleteDataset do,
	// instead of trying to decode an error body as a state list.
	if resp.StatusCode >= 300 {
		return nil, fmt.Errorf("Expected: {1,2}xx getting the dataset state, got: %d", resp.StatusCode)
	}

	var states []datasetStatePayload
	if err := json.NewDecoder(resp.Body).Decode(&states); err != nil {
		return nil, err
	}

	for _, s := range states {
		// encoding/json only allocates the embedded *DatasetState when the
		// element sets one of its fields; a `null` or `{}` element leaves it
		// nil, and reading a promoted field would panic.
		if s.DatasetState == nil {
			continue
		}
		if s.DatasetID == datasetID {
			return s.DatasetState, nil
		}
	}
	return nil, errStateNotFound
}
// CreateDataset creates a dataset (volume) in Flocker, waits for it to appear
// in the cluster state and returns that state.
//
// The flow is:
//  1. If options.Primary is empty, look up the local node UUID with
//     GetPrimaryUUID. If options.MaximumSize is zero, use the client default.
//     Both values are written back into options.
//  2. POST the options to configuration/datasets. A 409 Conflict yields
//     errVolumeAlreadyExists; any other status of 300 or above yields an
//     error carrying the code.
//  3. Poll GetDatasetState every tickerWaitingForVolume until the dataset is
//     reported. If polling fails with anything other than errStateNotFound,
//     or timeoutWaitingForVolume elapses, the dataset is deleted again and an
//     error is returned; a failed deletion is appended to that error.
func (c *Client) CreateDataset(options *CreateDatasetOptions) (datasetState *DatasetState, err error) {
	// 1) Find the primary Flocker UUID.
	// Note: it could be cached, but doing this query also health checks the
	// control service.
	if options.Primary == "" {
		options.Primary, err = c.GetPrimaryUUID()
		if err != nil {
			return nil, err
		}
	}

	if options.MaximumSize == 0 {
		// The conversion error is deliberately ignored to keep the existing
		// behaviour: NewClient always sets maximumSize to defaultVolumeSize,
		// which is a valid integer.
		options.MaximumSize, _ = c.maximumSize.Int64()
	}

	resp, err := c.post(c.getURL("configuration/datasets"), options)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	// 2) Return if the dataset was previously created.
	if resp.StatusCode == http.StatusConflict {
		return nil, errVolumeAlreadyExists
	}

	if resp.StatusCode >= 300 {
		return nil, fmt.Errorf("Expected: {1,2}xx creating the volume, got: %d", resp.StatusCode)
	}

	var p configurationPayload
	if err := json.NewDecoder(resp.Body).Decode(&p); err != nil {
		return nil, err
	}

	// 3) Wait until the dataset is ready for usage. In case it never gets
	// ready the timeout fires and an error is returned.
	//
	// Keep the Timer and Ticker themselves (not only their channels) so they
	// can be stopped on every return path and their resources released.
	timeout := time.NewTimer(timeoutWaitingForVolume)
	defer timeout.Stop()
	ticker := time.NewTicker(tickerWaitingForVolume)
	defer ticker.Stop()

	for {
		var strErrDel string

		s, err := c.GetDatasetState(p.DatasetID)
		if err == nil {
			return s, nil
		}
		if err != errStateNotFound {
			// A real API failure: roll the dataset back before reporting it.
			errDel := c.DeleteDataset(p.DatasetID)
			if errDel != nil {
				strErrDel = fmt.Sprintf(", deletion of dataset failed with %s", errDel)
			}
			return nil, fmt.Errorf("Flocker API error during dataset creation (datasetID %s): %s%s", p.DatasetID, err, strErrDel)
		}

		select {
		case <-timeout.C:
			// The dataset never showed up: roll it back and give up.
			errDel := c.DeleteDataset(p.DatasetID)
			if errDel != nil {
				strErrDel = fmt.Sprintf(", deletion of dataset failed with %s", errDel)
			}
			return nil, fmt.Errorf("Flocker API timeout during dataset creation (datasetID %s): %s%s", p.DatasetID, err, strErrDel)
		case <-ticker.C:
			// Next polling round.
		}
	}
}
// UpdatePrimaryForDataset posts newPrimaryUUID as the new primary of the
// dataset identified by datasetID and returns the dataset state decoded from
// the response.
//
// It returns the transport error if the request fails, errUpdatingDataset if
// the service answers with a status code of 300 or above, and the decoding
// error if the response body cannot be parsed.
func (c Client) UpdatePrimaryForDataset(newPrimaryUUID, datasetID string) (*DatasetState, error) {
	type primaryUpdate struct {
		Primary string `json:"primary"`
	}

	endpoint := c.getURL(fmt.Sprintf("configuration/datasets/%s", datasetID))
	resp, err := c.post(endpoint, primaryUpdate{Primary: newPrimaryUUID})
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	if resp.StatusCode >= 300 {
		return nil, errUpdatingDataset
	}

	state := new(DatasetState)
	if err := json.NewDecoder(resp.Body).Decode(state); err != nil {
		return nil, err
	}
	return state, nil
}
// GetDatasetID fetches configuration/datasets from the control service and
// returns the ID of the first configuration that is not marked as deleted and
// whose metadata name equals metaName.
//
// It returns the transport or decoding error if the list cannot be obtained,
// and errConfigurationNotFound if no configuration matches.
func (c Client) GetDatasetID(metaName string) (datasetID string, err error) {
	resp, err := c.get(c.getURL("configuration/datasets"))
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()

	var entries []configurationPayload
	if err = json.NewDecoder(resp.Body).Decode(&entries); err != nil {
		return "", err
	}

	for _, entry := range entries {
		if !entry.Deleted && entry.Metadata.Name == metaName {
			return entry.DatasetID, nil
		}
	}
	return "", errConfigurationNotFound
}