mirror of https://github.com/k3s-io/k3s
456 lines
13 KiB
Go
456 lines
13 KiB
Go
package storage
|
|
|
|
// Copyright (c) Microsoft Corporation. All rights reserved.
|
|
// Licensed under the MIT License. See License.txt in the project root for license information.
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/base64"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"net/http"
|
|
"net/url"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/gofrs/uuid"
|
|
)
|
|
|
|
// Annotating as secure for gas scanning
|
|
/* #nosec */
|
|
const (
	// partitionKeyNode is the JSON member name under which an entity's
	// partition key is written and read.
	partitionKeyNode = "PartitionKey"

	// rowKeyNode is the JSON member name under which an entity's row key
	// is written and read.
	rowKeyNode = "RowKey"

	// etagErrorTemplate is the format used to report a request that was
	// answered with 412 Precondition Failed.
	etagErrorTemplate = "Etag didn't match: %v"
)
|
|
|
|
var (
	// errEmptyPayload is returned by Entity.Get when the caller asks for
	// the EmptyPayload metadata level.
	errEmptyPayload = errors.New("Empty payload is not a valid metadata level for this operation")

	// errNilPreviousResult is not referenced in this part of the file;
	// presumably used by result-paging code elsewhere in the package.
	errNilPreviousResult = errors.New("The previous results page is nil")

	// errNilNextLink is not referenced in this part of the file;
	// presumably used by result-paging code elsewhere in the package.
	errNilNextLink = errors.New("There are no more pages in this query results")
)
|
|
|
|
// Entity represents an entity inside an Azure table.
|
|
type Entity struct {
|
|
Table *Table
|
|
PartitionKey string
|
|
RowKey string
|
|
TimeStamp time.Time
|
|
OdataMetadata string
|
|
OdataType string
|
|
OdataID string
|
|
OdataEtag string
|
|
OdataEditLink string
|
|
Properties map[string]interface{}
|
|
}
|
|
|
|
// GetEntityReference returns an Entity object with the specified
|
|
// partition key and row key.
|
|
func (t *Table) GetEntityReference(partitionKey, rowKey string) *Entity {
|
|
return &Entity{
|
|
PartitionKey: partitionKey,
|
|
RowKey: rowKey,
|
|
Table: t,
|
|
}
|
|
}
|
|
|
|
// EntityOptions groups the optional settings accepted by the entity
// operations that take an *EntityOptions (Insert, Update, Merge, Delete,
// InsertOrReplace and InsertOrMerge). A nil *EntityOptions is valid.
type EntityOptions struct {
	// Timeout is handed to addTimeout when the request query is built.
	Timeout uint
	// RequestID is tagged for the x-ms-client-request-id header.
	RequestID string `header:"x-ms-client-request-id"`
}
|
|
|
|
// GetEntityOptions groups the optional settings accepted by Entity.Get.
type GetEntityOptions struct {
	// Select lists the property names to retrieve; when non-empty the
	// names are joined with commas into the $select query parameter.
	Select []string
	// RequestID is tagged for the x-ms-client-request-id header.
	RequestID string `header:"x-ms-client-request-id"`
}
|
|
|
|
// Get gets the referenced entity. Which properties to get can be
|
|
// specified using the select option.
|
|
// See:
|
|
// https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/query-entities
|
|
// https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/querying-tables-and-entities
|
|
func (e *Entity) Get(timeout uint, ml MetadataLevel, options *GetEntityOptions) error {
|
|
if ml == EmptyPayload {
|
|
return errEmptyPayload
|
|
}
|
|
// RowKey and PartitionKey could be lost if not included in the query
|
|
// As those are the entity identifiers, it is best if they are not lost
|
|
rk := e.RowKey
|
|
pk := e.PartitionKey
|
|
|
|
query := url.Values{
|
|
"timeout": {strconv.FormatUint(uint64(timeout), 10)},
|
|
}
|
|
headers := e.Table.tsc.client.getStandardHeaders()
|
|
headers[headerAccept] = string(ml)
|
|
|
|
if options != nil {
|
|
if len(options.Select) > 0 {
|
|
query.Add("$select", strings.Join(options.Select, ","))
|
|
}
|
|
headers = mergeHeaders(headers, headersFromStruct(*options))
|
|
}
|
|
|
|
uri := e.Table.tsc.client.getEndpoint(tableServiceName, e.buildPath(), query)
|
|
resp, err := e.Table.tsc.client.exec(http.MethodGet, uri, headers, nil, e.Table.tsc.auth)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer drainRespBody(resp)
|
|
|
|
if err = checkRespCode(resp, []int{http.StatusOK}); err != nil {
|
|
return err
|
|
}
|
|
|
|
respBody, err := ioutil.ReadAll(resp.Body)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = json.Unmarshal(respBody, e)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
e.PartitionKey = pk
|
|
e.RowKey = rk
|
|
|
|
return nil
|
|
}
|
|
|
|
// Insert inserts the referenced entity in its table.
|
|
// The function fails if there is an entity with the same
|
|
// PartitionKey and RowKey in the table.
|
|
// ml determines the level of detail of metadata in the operation response,
|
|
// or no data at all.
|
|
// See: https://docs.microsoft.com/rest/api/storageservices/fileservices/insert-entity
|
|
func (e *Entity) Insert(ml MetadataLevel, options *EntityOptions) error {
|
|
query, headers := options.getParameters()
|
|
headers = mergeHeaders(headers, e.Table.tsc.client.getStandardHeaders())
|
|
|
|
body, err := json.Marshal(e)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
headers = addBodyRelatedHeaders(headers, len(body))
|
|
headers = addReturnContentHeaders(headers, ml)
|
|
|
|
uri := e.Table.tsc.client.getEndpoint(tableServiceName, e.Table.buildPath(), query)
|
|
resp, err := e.Table.tsc.client.exec(http.MethodPost, uri, headers, bytes.NewReader(body), e.Table.tsc.auth)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer drainRespBody(resp)
|
|
|
|
if ml != EmptyPayload {
|
|
if err = checkRespCode(resp, []int{http.StatusCreated}); err != nil {
|
|
return err
|
|
}
|
|
data, err := ioutil.ReadAll(resp.Body)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err = e.UnmarshalJSON(data); err != nil {
|
|
return err
|
|
}
|
|
} else {
|
|
if err = checkRespCode(resp, []int{http.StatusNoContent}); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Update updates the contents of an entity. The function fails if there is no entity
|
|
// with the same PartitionKey and RowKey in the table or if the ETag is different
|
|
// than the one in Azure.
|
|
// See: https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/update-entity2
|
|
func (e *Entity) Update(force bool, options *EntityOptions) error {
|
|
return e.updateMerge(force, http.MethodPut, options)
|
|
}
|
|
|
|
// Merge merges the contents of entity specified with PartitionKey and RowKey
|
|
// with the content specified in Properties.
|
|
// The function fails if there is no entity with the same PartitionKey and
|
|
// RowKey in the table or if the ETag is different than the one in Azure.
|
|
// Read more: https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/merge-entity
|
|
func (e *Entity) Merge(force bool, options *EntityOptions) error {
|
|
return e.updateMerge(force, "MERGE", options)
|
|
}
|
|
|
|
// Delete deletes the entity.
|
|
// The function fails if there is no entity with the same PartitionKey and
|
|
// RowKey in the table or if the ETag is different than the one in Azure.
|
|
// See: https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/delete-entity1
|
|
func (e *Entity) Delete(force bool, options *EntityOptions) error {
|
|
query, headers := options.getParameters()
|
|
headers = mergeHeaders(headers, e.Table.tsc.client.getStandardHeaders())
|
|
|
|
headers = addIfMatchHeader(headers, force, e.OdataEtag)
|
|
headers = addReturnContentHeaders(headers, EmptyPayload)
|
|
|
|
uri := e.Table.tsc.client.getEndpoint(tableServiceName, e.buildPath(), query)
|
|
resp, err := e.Table.tsc.client.exec(http.MethodDelete, uri, headers, nil, e.Table.tsc.auth)
|
|
if err != nil {
|
|
if resp != nil && resp.StatusCode == http.StatusPreconditionFailed {
|
|
return fmt.Errorf(etagErrorTemplate, err)
|
|
}
|
|
return err
|
|
}
|
|
defer drainRespBody(resp)
|
|
|
|
if err = checkRespCode(resp, []int{http.StatusNoContent}); err != nil {
|
|
return err
|
|
}
|
|
|
|
return e.updateTimestamp(resp.Header)
|
|
}
|
|
|
|
// InsertOrReplace inserts an entity or replaces the existing one.
|
|
// Read more: https://docs.microsoft.com/rest/api/storageservices/fileservices/insert-or-replace-entity
|
|
func (e *Entity) InsertOrReplace(options *EntityOptions) error {
|
|
return e.insertOr(http.MethodPut, options)
|
|
}
|
|
|
|
// InsertOrMerge inserts an entity or merges the existing one.
|
|
// Read more: https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/insert-or-merge-entity
|
|
func (e *Entity) InsertOrMerge(options *EntityOptions) error {
|
|
return e.insertOr("MERGE", options)
|
|
}
|
|
|
|
func (e *Entity) buildPath() string {
|
|
return fmt.Sprintf("%s(PartitionKey='%s',RowKey='%s')", e.Table.buildPath(), e.PartitionKey, e.RowKey)
|
|
}
|
|
|
|
// MarshalJSON is a custom marshaller for entity
|
|
func (e *Entity) MarshalJSON() ([]byte, error) {
|
|
completeMap := map[string]interface{}{}
|
|
completeMap[partitionKeyNode] = e.PartitionKey
|
|
completeMap[rowKeyNode] = e.RowKey
|
|
for k, v := range e.Properties {
|
|
typeKey := strings.Join([]string{k, OdataTypeSuffix}, "")
|
|
switch t := v.(type) {
|
|
case []byte:
|
|
completeMap[typeKey] = OdataBinary
|
|
completeMap[k] = t
|
|
case time.Time:
|
|
completeMap[typeKey] = OdataDateTime
|
|
completeMap[k] = t.Format(time.RFC3339Nano)
|
|
case uuid.UUID:
|
|
completeMap[typeKey] = OdataGUID
|
|
completeMap[k] = t.String()
|
|
case int64:
|
|
completeMap[typeKey] = OdataInt64
|
|
completeMap[k] = fmt.Sprintf("%v", v)
|
|
case float32, float64:
|
|
completeMap[typeKey] = OdataDouble
|
|
completeMap[k] = fmt.Sprintf("%v", v)
|
|
default:
|
|
completeMap[k] = v
|
|
}
|
|
if strings.HasSuffix(k, OdataTypeSuffix) {
|
|
if !(completeMap[k] == OdataBinary ||
|
|
completeMap[k] == OdataDateTime ||
|
|
completeMap[k] == OdataGUID ||
|
|
completeMap[k] == OdataInt64 ||
|
|
completeMap[k] == OdataDouble) {
|
|
return nil, fmt.Errorf("Odata.type annotation %v value is not valid", k)
|
|
}
|
|
valueKey := strings.TrimSuffix(k, OdataTypeSuffix)
|
|
if _, ok := completeMap[valueKey]; !ok {
|
|
return nil, fmt.Errorf("Odata.type annotation %v defined without value defined", k)
|
|
}
|
|
}
|
|
}
|
|
return json.Marshal(completeMap)
|
|
}
|
|
|
|
// UnmarshalJSON is a custom unmarshaller for entities
|
|
func (e *Entity) UnmarshalJSON(data []byte) error {
|
|
errorTemplate := "Deserializing error: %v"
|
|
|
|
props := map[string]interface{}{}
|
|
err := json.Unmarshal(data, &props)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// deselialize metadata
|
|
e.OdataMetadata = stringFromMap(props, "odata.metadata")
|
|
e.OdataType = stringFromMap(props, "odata.type")
|
|
e.OdataID = stringFromMap(props, "odata.id")
|
|
e.OdataEtag = stringFromMap(props, "odata.etag")
|
|
e.OdataEditLink = stringFromMap(props, "odata.editLink")
|
|
e.PartitionKey = stringFromMap(props, partitionKeyNode)
|
|
e.RowKey = stringFromMap(props, rowKeyNode)
|
|
|
|
// deserialize timestamp
|
|
timeStamp, ok := props["Timestamp"]
|
|
if ok {
|
|
str, ok := timeStamp.(string)
|
|
if !ok {
|
|
return fmt.Errorf(errorTemplate, "Timestamp casting error")
|
|
}
|
|
t, err := time.Parse(time.RFC3339Nano, str)
|
|
if err != nil {
|
|
return fmt.Errorf(errorTemplate, err)
|
|
}
|
|
e.TimeStamp = t
|
|
}
|
|
delete(props, "Timestamp")
|
|
delete(props, "Timestamp@odata.type")
|
|
|
|
// deserialize entity (user defined fields)
|
|
for k, v := range props {
|
|
if strings.HasSuffix(k, OdataTypeSuffix) {
|
|
valueKey := strings.TrimSuffix(k, OdataTypeSuffix)
|
|
str, ok := props[valueKey].(string)
|
|
if !ok {
|
|
return fmt.Errorf(errorTemplate, fmt.Sprintf("%v casting error", v))
|
|
}
|
|
switch v {
|
|
case OdataBinary:
|
|
props[valueKey], err = base64.StdEncoding.DecodeString(str)
|
|
if err != nil {
|
|
return fmt.Errorf(errorTemplate, err)
|
|
}
|
|
case OdataDateTime:
|
|
t, err := time.Parse("2006-01-02T15:04:05Z", str)
|
|
if err != nil {
|
|
return fmt.Errorf(errorTemplate, err)
|
|
}
|
|
props[valueKey] = t
|
|
case OdataGUID:
|
|
props[valueKey] = uuid.FromStringOrNil(str)
|
|
case OdataInt64:
|
|
i, err := strconv.ParseInt(str, 10, 64)
|
|
if err != nil {
|
|
return fmt.Errorf(errorTemplate, err)
|
|
}
|
|
props[valueKey] = i
|
|
case OdataDouble:
|
|
f, err := strconv.ParseFloat(str, 64)
|
|
if err != nil {
|
|
return fmt.Errorf(errorTemplate, err)
|
|
}
|
|
props[valueKey] = f
|
|
default:
|
|
return fmt.Errorf(errorTemplate, fmt.Sprintf("%v is not supported", v))
|
|
}
|
|
delete(props, k)
|
|
}
|
|
}
|
|
|
|
e.Properties = props
|
|
return nil
|
|
}
|
|
|
|
// getAndDelete removes key from props and returns the value that was
// stored under it, or nil when the key is absent.
func getAndDelete(props map[string]interface{}, key string) interface{} {
	// Indexing a missing key yields nil and deleting it is a no-op, so the
	// present and absent cases need no separate handling.
	value := props[key]
	delete(props, key)
	return value
}
|
|
|
|
func addIfMatchHeader(h map[string]string, force bool, etag string) map[string]string {
|
|
if force {
|
|
h[headerIfMatch] = "*"
|
|
} else {
|
|
h[headerIfMatch] = etag
|
|
}
|
|
return h
|
|
}
|
|
|
|
// updates Etag and timestamp
|
|
func (e *Entity) updateEtagAndTimestamp(headers http.Header) error {
|
|
e.OdataEtag = headers.Get(headerEtag)
|
|
return e.updateTimestamp(headers)
|
|
}
|
|
|
|
func (e *Entity) updateTimestamp(headers http.Header) error {
|
|
str := headers.Get(headerDate)
|
|
t, err := time.Parse(time.RFC1123, str)
|
|
if err != nil {
|
|
return fmt.Errorf("Update timestamp error: %v", err)
|
|
}
|
|
e.TimeStamp = t
|
|
return nil
|
|
}
|
|
|
|
func (e *Entity) insertOr(verb string, options *EntityOptions) error {
|
|
query, headers := options.getParameters()
|
|
headers = mergeHeaders(headers, e.Table.tsc.client.getStandardHeaders())
|
|
|
|
body, err := json.Marshal(e)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
headers = addBodyRelatedHeaders(headers, len(body))
|
|
headers = addReturnContentHeaders(headers, EmptyPayload)
|
|
|
|
uri := e.Table.tsc.client.getEndpoint(tableServiceName, e.buildPath(), query)
|
|
resp, err := e.Table.tsc.client.exec(verb, uri, headers, bytes.NewReader(body), e.Table.tsc.auth)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer drainRespBody(resp)
|
|
|
|
if err = checkRespCode(resp, []int{http.StatusNoContent}); err != nil {
|
|
return err
|
|
}
|
|
|
|
return e.updateEtagAndTimestamp(resp.Header)
|
|
}
|
|
|
|
func (e *Entity) updateMerge(force bool, verb string, options *EntityOptions) error {
|
|
query, headers := options.getParameters()
|
|
headers = mergeHeaders(headers, e.Table.tsc.client.getStandardHeaders())
|
|
|
|
body, err := json.Marshal(e)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
headers = addBodyRelatedHeaders(headers, len(body))
|
|
headers = addIfMatchHeader(headers, force, e.OdataEtag)
|
|
headers = addReturnContentHeaders(headers, EmptyPayload)
|
|
|
|
uri := e.Table.tsc.client.getEndpoint(tableServiceName, e.buildPath(), query)
|
|
resp, err := e.Table.tsc.client.exec(verb, uri, headers, bytes.NewReader(body), e.Table.tsc.auth)
|
|
if err != nil {
|
|
if resp != nil && resp.StatusCode == http.StatusPreconditionFailed {
|
|
return fmt.Errorf(etagErrorTemplate, err)
|
|
}
|
|
return err
|
|
}
|
|
defer drainRespBody(resp)
|
|
|
|
if err = checkRespCode(resp, []int{http.StatusNoContent}); err != nil {
|
|
return err
|
|
}
|
|
|
|
return e.updateEtagAndTimestamp(resp.Header)
|
|
}
|
|
|
|
func stringFromMap(props map[string]interface{}, key string) string {
|
|
value := getAndDelete(props, key)
|
|
if value != nil {
|
|
return value.(string)
|
|
}
|
|
return ""
|
|
}
|
|
|
|
func (options *EntityOptions) getParameters() (url.Values, map[string]string) {
|
|
query := url.Values{}
|
|
headers := map[string]string{}
|
|
if options != nil {
|
|
query = addTimeout(query, options.Timeout)
|
|
headers = headersFromStruct(*options)
|
|
}
|
|
return query, headers
|
|
}
|