mirror of https://github.com/k3s-io/k3s
Merge pull request #24625 from pmorie/dapi-volume-atomic
Automatic merge from submit-queue Refactor downward API volume to use AtomicWriter Make the downward API plugin use `AtomicWriter` instead. @thockin @saad-ali @sdminonne <!-- Reviewable:start --> --- This change is [<img src="http://reviewable.k8s.io/review_button.svg" height="35" align="absmiddle" alt="Reviewable"/>](http://reviewable.k8s.io/reviews/kubernetes/kubernetes/24625) <!-- Reviewable:end -->pull/6/head
commit
6fe3498ef9
|
@ -17,13 +17,10 @@ limitations under the License.
|
||||||
package downwardapi
|
package downwardapi
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"io/ioutil"
|
"fmt"
|
||||||
"os"
|
|
||||||
"path"
|
"path"
|
||||||
"path/filepath"
|
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
|
||||||
|
|
||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
"k8s.io/kubernetes/pkg/fieldpath"
|
"k8s.io/kubernetes/pkg/fieldpath"
|
||||||
|
@ -31,6 +28,7 @@ import (
|
||||||
utilerrors "k8s.io/kubernetes/pkg/util/errors"
|
utilerrors "k8s.io/kubernetes/pkg/util/errors"
|
||||||
utilstrings "k8s.io/kubernetes/pkg/util/strings"
|
utilstrings "k8s.io/kubernetes/pkg/util/strings"
|
||||||
"k8s.io/kubernetes/pkg/volume"
|
"k8s.io/kubernetes/pkg/volume"
|
||||||
|
volumeutil "k8s.io/kubernetes/pkg/volume/util"
|
||||||
|
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
)
|
)
|
||||||
|
@ -151,17 +149,18 @@ func (b *downwardAPIVolumeMounter) SetUpAt(dir string, fsGroup *int64) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if !b.isDataChanged(data) {
|
writerContext := fmt.Sprintf("pod %v/%v volume %v", b.pod.Namespace, b.pod.Name, b.volName)
|
||||||
// No data changed: nothing to write
|
writer, err := volumeutil.NewAtomicWriter(dir, writerContext)
|
||||||
return nil
|
if err != nil {
|
||||||
}
|
glog.Errorf("Error creating atomic writer: %v", err)
|
||||||
|
|
||||||
if err := b.writeData(data); err != nil {
|
|
||||||
glog.Errorf("Unable to dump files for downwardAPI volume %v for pod %v/%v: %s", b.volName, b.pod.Namespace, b.pod.Name, err.Error())
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
glog.V(3).Infof("Data dumped for downwardAPI volume %v for pod %v/%v", b.volName, b.pod.Namespace, b.pod.Name)
|
err = writer.Write(data)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Error writing payload to dir: %v", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
volume.SetVolumeOwnership(b, fsGroup)
|
volume.SetVolumeOwnership(b, fsGroup)
|
||||||
|
|
||||||
|
@ -171,179 +170,20 @@ func (b *downwardAPIVolumeMounter) SetUpAt(dir string, fsGroup *int64) error {
|
||||||
// collectData collects requested downwardAPI in data map.
|
// collectData collects requested downwardAPI in data map.
|
||||||
// Map's key is the requested name of file to dump
|
// Map's key is the requested name of file to dump
|
||||||
// Map's value is the (sorted) content of the field to be dumped in the file.
|
// Map's value is the (sorted) content of the field to be dumped in the file.
|
||||||
func (d *downwardAPIVolume) collectData() (map[string]string, error) {
|
func (d *downwardAPIVolume) collectData() (map[string][]byte, error) {
|
||||||
errlist := []error{}
|
errlist := []error{}
|
||||||
data := make(map[string]string)
|
data := make(map[string][]byte)
|
||||||
for fieldReference, fileName := range d.fieldReferenceFileNames {
|
for fieldReference, fileName := range d.fieldReferenceFileNames {
|
||||||
if values, err := fieldpath.ExtractFieldPathAsString(d.pod, fieldReference); err != nil {
|
if values, err := fieldpath.ExtractFieldPathAsString(d.pod, fieldReference); err != nil {
|
||||||
glog.Errorf("Unable to extract field %s: %s", fieldReference, err.Error())
|
glog.Errorf("Unable to extract field %s: %s", fieldReference, err.Error())
|
||||||
errlist = append(errlist, err)
|
errlist = append(errlist, err)
|
||||||
} else {
|
} else {
|
||||||
data[fileName] = sortLines(values)
|
data[fileName] = []byte(sortLines(values))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return data, utilerrors.NewAggregate(errlist)
|
return data, utilerrors.NewAggregate(errlist)
|
||||||
}
|
}
|
||||||
|
|
||||||
// isDataChanged iterate over all the entries to check whether at least one
|
|
||||||
// file needs to be updated.
|
|
||||||
func (d *downwardAPIVolume) isDataChanged(data map[string]string) bool {
|
|
||||||
for fileName, values := range data {
|
|
||||||
if isFileToGenerate(path.Join(d.GetPath(), fileName), values) {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
// isFileToGenerate compares actual file with the new values. If
|
|
||||||
// different (or the file does not exist) return true
|
|
||||||
func isFileToGenerate(fileName, values string) bool {
|
|
||||||
if _, err := os.Lstat(fileName); os.IsNotExist(err) {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
return readFile(fileName) != values
|
|
||||||
}
|
|
||||||
|
|
||||||
const (
|
|
||||||
downwardAPIDir = "..downwardapi"
|
|
||||||
downwardAPITmpDir = "..downwardapi_tmp"
|
|
||||||
// It seems reasonable to allow dot-files in the config, so we reserved double-dot-files for the implementation".
|
|
||||||
)
|
|
||||||
|
|
||||||
// writeData writes requested downwardAPI in specified files.
|
|
||||||
//
|
|
||||||
// The file visible in this volume are symlinks to files in the '..downwardapi'
|
|
||||||
// directory. Actual files are stored in an hidden timestamped directory which is
|
|
||||||
// symlinked to by '..downwardapi'. The timestamped directory and '..downwardapi' symlink
|
|
||||||
// are created in the plugin root dir. This scheme allows the files to be
|
|
||||||
// atomically updated by changing the target of the '..downwardapi' symlink. When new
|
|
||||||
// data is available:
|
|
||||||
//
|
|
||||||
// 1. A new timestamped dir is created by writeDataInTimestampDir and requested data
|
|
||||||
// is written inside new timestamped directory
|
|
||||||
// 2. Symlinks and directory for new files are created (if needed).
|
|
||||||
// For example for files:
|
|
||||||
// <volume-dir>/user_space/labels
|
|
||||||
// <volume-dir>/k8s_space/annotations
|
|
||||||
// <volume-dir>/podName
|
|
||||||
// This structure is created:
|
|
||||||
// <volume-dir>/podName -> ..downwardapi/podName
|
|
||||||
// <volume-dir>/user_space/labels -> ../..downwardapi/user_space/labels
|
|
||||||
// <volume-dir>/k8s_space/annotations -> ../..downwardapi/k8s_space/annotations
|
|
||||||
// <volume-dir>/..downwardapi -> ..downwardapi.12345678
|
|
||||||
// where ..downwardapi.12345678 is a randomly generated directory which contains
|
|
||||||
// the real data. If a file has to be dumped in subdirectory (for example <volume-dir>/user_space/labels)
|
|
||||||
// plugin builds a relative symlink (<volume-dir>/user_space/labels -> ../..downwardapi/user_space/labels)
|
|
||||||
// 3. The previous timestamped directory is detected reading the '..downwardapi' symlink
|
|
||||||
// 4. In case no symlink exists then it's created
|
|
||||||
// 5. In case symlink exists a new temporary symlink is created ..downwardapi_tmp
|
|
||||||
// 6. ..downwardapi_tmp is renamed to ..downwardapi
|
|
||||||
// 7. The previous timestamped directory is removed
|
|
||||||
|
|
||||||
func (d *downwardAPIVolume) writeData(data map[string]string) error {
|
|
||||||
timestampDir, err := d.writeDataInTimestampDir(data)
|
|
||||||
if err != nil {
|
|
||||||
glog.Errorf("Unable to write data in temporary directory: %s", err.Error())
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
// update symbolic links for relative paths
|
|
||||||
if err = d.updateSymlinksToCurrentDir(); err != nil {
|
|
||||||
os.RemoveAll(timestampDir)
|
|
||||||
glog.Errorf("Unable to create symlinks and/or directory: %s", err.Error())
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
_, timestampDirBaseName := filepath.Split(timestampDir)
|
|
||||||
var oldTimestampDirectory string
|
|
||||||
oldTimestampDirectory, err = os.Readlink(path.Join(d.GetPath(), downwardAPIDir))
|
|
||||||
|
|
||||||
if err = os.Symlink(timestampDirBaseName, path.Join(d.GetPath(), downwardAPITmpDir)); err != nil {
|
|
||||||
os.RemoveAll(timestampDir)
|
|
||||||
glog.Errorf("Unable to create symolic link: %s", err.Error())
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Rename the symbolic link downwardAPITmpDir to downwardAPIDir
|
|
||||||
if err = os.Rename(path.Join(d.GetPath(), downwardAPITmpDir), path.Join(d.GetPath(), downwardAPIDir)); err != nil {
|
|
||||||
// in case of error remove latest data and downwardAPITmpDir
|
|
||||||
os.Remove(path.Join(d.GetPath(), downwardAPITmpDir))
|
|
||||||
os.RemoveAll(timestampDir)
|
|
||||||
glog.Errorf("Unable to rename symbolic link: %s", err.Error())
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
// Remove oldTimestampDirectory
|
|
||||||
if len(oldTimestampDirectory) > 0 {
|
|
||||||
if err := os.RemoveAll(path.Join(d.GetPath(), oldTimestampDirectory)); err != nil {
|
|
||||||
glog.Errorf("Unable to remove directory: %s", err.Error())
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// writeDataInTimestampDir writes the latest data into a new temporary directory with a timestamp.
|
|
||||||
func (d *downwardAPIVolume) writeDataInTimestampDir(data map[string]string) (string, error) {
|
|
||||||
errlist := []error{}
|
|
||||||
timestampDir, err := ioutil.TempDir(d.GetPath(), ".."+time.Now().Format("2006_01_02_15_04_05"))
|
|
||||||
for fileName, values := range data {
|
|
||||||
fullPathFile := path.Join(timestampDir, fileName)
|
|
||||||
dir, _ := filepath.Split(fullPathFile)
|
|
||||||
if err = os.MkdirAll(dir, os.ModePerm); err != nil {
|
|
||||||
glog.Errorf("Unable to create directory `%s`: %s", dir, err.Error())
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
if err := ioutil.WriteFile(fullPathFile, []byte(values), 0644); err != nil {
|
|
||||||
glog.Errorf("Unable to write file `%s`: %s", fullPathFile, err.Error())
|
|
||||||
errlist = append(errlist, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return timestampDir, utilerrors.NewAggregate(errlist)
|
|
||||||
}
|
|
||||||
|
|
||||||
// updateSymlinksToCurrentDir creates the relative symlinks for all the files configured in this volume.
|
|
||||||
// If the directory in a file path does not exist, it is created.
|
|
||||||
//
|
|
||||||
// For example for files: "bar", "foo/bar", "baz/bar", "foo/baz/blah"
|
|
||||||
// the following symlinks and subdirectory are created:
|
|
||||||
// bar -> ..downwardapi/bar
|
|
||||||
// baz/bar -> ../..downwardapi/baz/bar
|
|
||||||
// foo/bar -> ../..downwardapi/foo/bar
|
|
||||||
// foo/baz/blah -> ../../..downwardapi/foo/baz/blah
|
|
||||||
func (d *downwardAPIVolume) updateSymlinksToCurrentDir() error {
|
|
||||||
for _, f := range d.fieldReferenceFileNames {
|
|
||||||
dir, _ := filepath.Split(f)
|
|
||||||
nbOfSubdir := 0
|
|
||||||
if len(dir) > 0 {
|
|
||||||
// if dir is not empty f contains at least a subdirectory (for example: f="foo/bar")
|
|
||||||
// since filepath.Split leaves a trailing '/' we have dir="foo/"
|
|
||||||
// and since len(strings.Split"foo/")=2 to count the number
|
|
||||||
// of sub directory you need to remove 1
|
|
||||||
nbOfSubdir = len(strings.Split(dir, "/")) - 1
|
|
||||||
if err := os.MkdirAll(path.Join(d.GetPath(), dir), os.ModePerm); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if _, err := os.Readlink(path.Join(d.GetPath(), f)); err != nil {
|
|
||||||
// link does not exist create it
|
|
||||||
presentedFile := path.Join(strings.Repeat("../", nbOfSubdir), downwardAPIDir, f)
|
|
||||||
actualFile := path.Join(d.GetPath(), f)
|
|
||||||
if err := os.Symlink(presentedFile, actualFile); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// readFile reads the file at the given path and returns the content as a string.
|
|
||||||
func readFile(path string) string {
|
|
||||||
if data, err := ioutil.ReadFile(path); err == nil {
|
|
||||||
return string(data)
|
|
||||||
}
|
|
||||||
return ""
|
|
||||||
}
|
|
||||||
|
|
||||||
// sortLines sorts the strings generated from map based data
|
// sortLines sorts the strings generated from map based data
|
||||||
// (annotations and labels)
|
// (annotations and labels)
|
||||||
func sortLines(values string) string {
|
func sortLines(values string) string {
|
||||||
|
@ -356,7 +196,7 @@ func (d *downwardAPIVolume) GetPath() string {
|
||||||
return d.plugin.host.GetPodVolumeDir(d.podUID, utilstrings.EscapeQualifiedNameForDisk(downwardAPIPluginName), d.volName)
|
return d.plugin.host.GetPodVolumeDir(d.podUID, utilstrings.EscapeQualifiedNameForDisk(downwardAPIPluginName), d.volName)
|
||||||
}
|
}
|
||||||
|
|
||||||
// downwardAPIVolumeCleander handles cleaning up downwardAPI volumes
|
// downwardAPIVolumeCleaner handles cleaning up downwardAPI volumes
|
||||||
type downwardAPIVolumeUnmounter struct {
|
type downwardAPIVolumeUnmounter struct {
|
||||||
*downwardAPIVolume
|
*downwardAPIVolume
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,6 +33,8 @@ import (
|
||||||
volumetest "k8s.io/kubernetes/pkg/volume/testing"
|
volumetest "k8s.io/kubernetes/pkg/volume/testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const downwardAPIDir = "..data"
|
||||||
|
|
||||||
func formatMap(m map[string]string) (fmtstr string) {
|
func formatMap(m map[string]string) (fmtstr string) {
|
||||||
for key, value := range m {
|
for key, value := range m {
|
||||||
fmtstr += fmt.Sprintf("%v=%q\n", key, value)
|
fmtstr += fmt.Sprintf("%v=%q\n", key, value)
|
||||||
|
@ -407,7 +409,7 @@ func TestWriteTwiceNoUpdate(t *testing.T) {
|
||||||
// get the link of the link
|
// get the link of the link
|
||||||
var currentTarget string
|
var currentTarget string
|
||||||
if currentTarget, err = os.Readlink(path.Join(volumePath, downwardAPIDir)); err != nil {
|
if currentTarget, err = os.Readlink(path.Join(volumePath, downwardAPIDir)); err != nil {
|
||||||
t.Errorf(".current should be a link... %s\n", err.Error())
|
t.Errorf(".data should be a link... %s\n", err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
err = mounter.SetUp(nil) // now re-run Setup
|
err = mounter.SetUp(nil) // now re-run Setup
|
||||||
|
@ -418,7 +420,7 @@ func TestWriteTwiceNoUpdate(t *testing.T) {
|
||||||
// get the link of the link
|
// get the link of the link
|
||||||
var currentTarget2 string
|
var currentTarget2 string
|
||||||
if currentTarget2, err = os.Readlink(path.Join(volumePath, downwardAPIDir)); err != nil {
|
if currentTarget2, err = os.Readlink(path.Join(volumePath, downwardAPIDir)); err != nil {
|
||||||
t.Errorf(".current should be a link... %s\n", err.Error())
|
t.Errorf(".data should be a link... %s\n", err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
if currentTarget2 != currentTarget {
|
if currentTarget2 != currentTarget {
|
||||||
|
|
Loading…
Reference in New Issue