mirror of https://github.com/k3s-io/k3s
commit
4a7b0eecbb
|
@ -27,6 +27,7 @@ NUM_MINIONS=${NUM_MINIONS:-4}
|
|||
AWS_S3_REGION=${AWS_S3_REGION:-us-east-1}
|
||||
|
||||
INSTANCE_PREFIX="${KUBE_AWS_INSTANCE_PREFIX:-kubernetes}"
|
||||
CLUSTER_ID=${INSTANCE_PREFIX}
|
||||
AWS_SSH_KEY=${AWS_SSH_KEY:-$HOME/.ssh/kube_aws_rsa}
|
||||
IAM_PROFILE_MASTER="kubernetes-master"
|
||||
IAM_PROFILE_MINION="kubernetes-minion"
|
||||
|
|
|
@ -23,6 +23,7 @@ NUM_MINIONS=${NUM_MINIONS:-2}
|
|||
AWS_S3_REGION=${AWS_S3_REGION:-us-east-1}
|
||||
|
||||
INSTANCE_PREFIX="${KUBE_AWS_INSTANCE_PREFIX:-e2e-test-${USER}}"
|
||||
CLUSTER_ID=${INSTANCE_PREFIX}
|
||||
AWS_SSH_KEY=${AWS_SSH_KEY:-$HOME/.ssh/kube_aws_rsa}
|
||||
IAM_PROFILE_MASTER="kubernetes-master"
|
||||
IAM_PROFILE_MINION="kubernetes-minion"
|
||||
|
|
|
@ -7,6 +7,21 @@
|
|||
"Resource": [
|
||||
"arn:aws:s3:::kubernetes-*"
|
||||
]
|
||||
},
|
||||
{
|
||||
"Effect": "Allow",
|
||||
"Action": "ec2:Describe*",
|
||||
"Resource": "*"
|
||||
},
|
||||
{
|
||||
"Effect": "Allow",
|
||||
"Action": "ec2:AttachVolume",
|
||||
"Resource": "*"
|
||||
},
|
||||
{
|
||||
"Effect": "Allow",
|
||||
"Action": "ec2:DetachVolume",
|
||||
"Resource": "*"
|
||||
}
|
||||
]
|
||||
}
|
||||
|
|
|
@ -36,12 +36,12 @@ function json_val {
|
|||
|
||||
# TODO (ayurchuk) Refactor the get_* functions to use filters
|
||||
# TODO (bburns) Parameterize this for multiple cluster per project
|
||||
function get_instance_ids {
|
||||
python -c "import json,sys; lst = [str(instance['InstanceId']) for reservation in json.load(sys.stdin)['Reservations'] for instance in reservation['Instances'] for tag in instance.get('Tags', []) if tag['Value'].startswith('${MASTER_TAG}') or tag['Value'].startswith('${MINION_TAG}')]; print ' '.join(lst)"
|
||||
}
|
||||
|
||||
function get_vpc_id {
|
||||
python -c 'import json,sys; lst = [str(vpc["VpcId"]) for vpc in json.load(sys.stdin)["Vpcs"] for tag in vpc.get("Tags", []) if tag["Value"] == "kubernetes-vpc"]; print "".join(lst)'
|
||||
$AWS_CMD --output text describe-vpcs \
|
||||
--filters Name=tag:Name,Values=kubernetes-vpc \
|
||||
Name=tag:KubernetesCluster,Values=${CLUSTER_ID} \
|
||||
--query Vpcs[].VpcId
|
||||
}
|
||||
|
||||
function get_subnet_id {
|
||||
|
@ -69,7 +69,9 @@ function expect_instance_states {
|
|||
function get_instance_public_ip {
|
||||
local tagName=$1
|
||||
$AWS_CMD --output text describe-instances \
|
||||
--filters Name=tag:Name,Values=${tagName} Name=instance-state-name,Values=running \
|
||||
--filters Name=tag:Name,Values=${tagName} \
|
||||
Name=instance-state-name,Values=running \
|
||||
Name=tag:KubernetesCluster,Values=${CLUSTER_ID} \
|
||||
--query Reservations[].Instances[].NetworkInterfaces[0].Association.PublicIp
|
||||
}
|
||||
|
||||
|
@ -371,7 +373,7 @@ function kube-up {
|
|||
|
||||
$AWS_CMD import-key-pair --key-name kubernetes --public-key-material "file://$AWS_SSH_KEY.pub" > $LOG 2>&1 || true
|
||||
|
||||
VPC_ID=$($AWS_CMD describe-vpcs | get_vpc_id)
|
||||
VPC_ID=$(get_vpc_id)
|
||||
|
||||
if [[ -z "$VPC_ID" ]]; then
|
||||
echo "Creating vpc."
|
||||
|
@ -379,6 +381,7 @@ function kube-up {
|
|||
$AWS_CMD modify-vpc-attribute --vpc-id $VPC_ID --enable-dns-support '{"Value": true}' > $LOG
|
||||
$AWS_CMD modify-vpc-attribute --vpc-id $VPC_ID --enable-dns-hostnames '{"Value": true}' > $LOG
|
||||
add-tag $VPC_ID Name kubernetes-vpc
|
||||
add-tag $VPC_ID KubernetesCluster ${CLUSTER_ID}
|
||||
fi
|
||||
|
||||
echo "Using VPC $VPC_ID"
|
||||
|
@ -467,6 +470,7 @@ function kube-up {
|
|||
--user-data file://${KUBE_TEMP}/master-start.sh | json_val '["Instances"][0]["InstanceId"]')
|
||||
add-tag $master_id Name $MASTER_NAME
|
||||
add-tag $master_id Role $MASTER_TAG
|
||||
add-tag $master_id KubernetesCluster ${CLUSTER_ID}
|
||||
|
||||
echo "Waiting for master to be ready"
|
||||
|
||||
|
@ -548,6 +552,7 @@ function kube-up {
|
|||
|
||||
add-tag $minion_id Name ${MINION_NAMES[$i]}
|
||||
add-tag $minion_id Role $MINION_TAG
|
||||
add-tag $minion_id KubernetesCluster ${CLUSTER_ID}
|
||||
|
||||
MINION_IDS[$i]=$minion_id
|
||||
done
|
||||
|
@ -700,25 +705,7 @@ EOF
|
|||
}
|
||||
|
||||
function kube-down {
|
||||
instance_ids=$($AWS_CMD describe-instances | get_instance_ids)
|
||||
if [[ -n ${instance_ids} ]]; then
|
||||
$AWS_CMD terminate-instances --instance-ids $instance_ids > $LOG
|
||||
echo "Waiting for instances deleted"
|
||||
while true; do
|
||||
instance_states=$($AWS_CMD describe-instances --instance-ids $instance_ids | expect_instance_states terminated)
|
||||
if [[ "$instance_states" == "" ]]; then
|
||||
echo "All instances terminated"
|
||||
break
|
||||
else
|
||||
echo "Instances not yet terminated: $instance_states"
|
||||
echo "Sleeping for 3 seconds..."
|
||||
sleep 3
|
||||
fi
|
||||
done
|
||||
fi
|
||||
|
||||
echo "Deleting VPC"
|
||||
vpc_id=$($AWS_CMD describe-vpcs | get_vpc_id)
|
||||
vpc_id=$(get_vpc_id)
|
||||
if [[ -n "${vpc_id}" ]]; then
|
||||
local elb_ids=$(get_elbs_in_vpc ${vpc_id})
|
||||
if [[ -n ${elb_ids} ]]; then
|
||||
|
@ -741,6 +728,27 @@ function kube-down {
|
|||
done
|
||||
fi
|
||||
|
||||
echo "Deleting instances in VPC: ${vpc_id}"
|
||||
instance_ids=$($AWS_CMD --output text describe-instances \
|
||||
--filters Name=vpc-id,Values=${vpc_id} \
|
||||
Name=tag:KubernetesCluster,Values=${CLUSTER_ID} \
|
||||
--query Reservations[].Instances[].InstanceId)
|
||||
if [[ -n ${instance_ids} ]]; then
|
||||
$AWS_CMD terminate-instances --instance-ids $instance_ids > $LOG
|
||||
echo "Waiting for instances to be deleted"
|
||||
while true; do
|
||||
instance_states=$($AWS_CMD describe-instances --instance-ids $instance_ids | expect_instance_states terminated)
|
||||
if [[ "$instance_states" == "" ]]; then
|
||||
echo "All instances deleted"
|
||||
break
|
||||
else
|
||||
echo "Instances not yet deleted: $instance_states"
|
||||
echo "Sleeping for 3 seconds..."
|
||||
sleep 3
|
||||
fi
|
||||
done
|
||||
fi
|
||||
|
||||
echo "Deleting VPC: ${vpc_id}"
|
||||
default_sg_id=$($AWS_CMD --output text describe-security-groups \
|
||||
--filters Name=vpc-id,Values=$vpc_id Name=group-name,Values=default \
|
||||
|
|
|
@ -0,0 +1,14 @@
|
|||
{% if grains['cloud'] is defined and grains['cloud'] == 'aws' %}
|
||||
/usr/share/google:
|
||||
file.directory:
|
||||
- user: root
|
||||
- group: root
|
||||
- dir_mode: 755
|
||||
|
||||
/usr/share/google/safe_format_and_mount:
|
||||
file.managed:
|
||||
- source: salt://helpers/safe_format_and_mount
|
||||
- user: root
|
||||
- group: root
|
||||
- mode: 755
|
||||
{% endif %}
|
|
@ -0,0 +1,145 @@
|
|||
#! /bin/bash
|
||||
# Copyright 2013 Google Inc. All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
# Mount a disk, formatting it if necessary. If the disk looks like it may
|
||||
# have been formatted before, we will not format it.
|
||||
#
|
||||
# This script uses blkid and file to search for magic "formatted" bytes
|
||||
# at the beginning of the disk. Furthermore, it attempts to use fsck to
|
||||
# repair the filesystem before formatting it.
|
||||
|
||||
FSCK=fsck.ext4
|
||||
MOUNT_OPTIONS="discard,defaults"
|
||||
MKFS="mkfs.ext4 -E lazy_itable_init=0,lazy_journal_init=0 -F"
|
||||
if grep -q '6\..' /etc/redhat-release; then
|
||||
# lazy_journal_init is not recognized in redhat 6
|
||||
MKFS="mkfs.ext4 -E lazy_itable_init=0 -F"
|
||||
elif grep -q '7\..' /etc/redhat-release; then
|
||||
FSCK=fsck.xfs
|
||||
MKFS=mkfs.xfs
|
||||
fi
|
||||
|
||||
LOGTAG=safe_format_and_mount
|
||||
LOGFACILITY=user
|
||||
|
||||
function log() {
|
||||
local readonly severity=$1; shift;
|
||||
logger -t ${LOGTAG} -p ${LOGFACILITY}.${severity} -s "$@"
|
||||
}
|
||||
|
||||
function log_command() {
|
||||
local readonly log_file=$(mktemp)
|
||||
local readonly retcode
|
||||
log info "Running: $*"
|
||||
$* > ${log_file} 2>&1
|
||||
retcode=$?
|
||||
# only return the last 1000 lines of the logfile, just in case it's HUGE.
|
||||
tail -1000 ${log_file} | logger -t ${LOGTAG} -p ${LOGFACILITY}.info -s
|
||||
rm -f ${log_file}
|
||||
return ${retcode}
|
||||
}
|
||||
|
||||
function help() {
|
||||
cat >&2 <<EOF
|
||||
$0 [-f fsck_cmd] [-m mkfs_cmd] [-o mount_opts] <device> <mountpoint>
|
||||
EOF
|
||||
exit 0
|
||||
}
|
||||
|
||||
while getopts ":hf:o:m:" opt; do
|
||||
case $opt in
|
||||
h) help;;
|
||||
f) FSCK=$OPTARG;;
|
||||
o) MOUNT_OPTIONS=$OPTARG;;
|
||||
m) MKFS=$OPTARG;;
|
||||
-) break;;
|
||||
\?) log error "Invalid option: -${OPTARG}"; exit 1;;
|
||||
:) log "Option -${OPTARG} requires an argument."; exit 1;;
|
||||
esac
|
||||
done
|
||||
|
||||
shift $(($OPTIND - 1))
|
||||
readonly DISK=$1
|
||||
readonly MOUNTPOINT=$2
|
||||
|
||||
[[ -z ${DISK} ]] && help
|
||||
[[ -z ${MOUNTPOINT} ]] && help
|
||||
|
||||
function disk_looks_unformatted() {
|
||||
blkid ${DISK}
|
||||
if [[ $? == 0 ]]; then
|
||||
return 0
|
||||
fi
|
||||
|
||||
local readonly file_type=$(file --special-files ${DISK})
|
||||
case ${file_type} in
|
||||
*filesystem*)
|
||||
return 0;;
|
||||
esac
|
||||
|
||||
return 1
|
||||
}
|
||||
|
||||
function format_disk() {
|
||||
log_command ${MKFS} ${DISK}
|
||||
}
|
||||
|
||||
function try_repair_disk() {
|
||||
log_command ${FSCK} -a ${DISK}
|
||||
local readonly fsck_return=$?
|
||||
if [[ ${fsck_return} -ge 8 ]]; then
|
||||
log error "Fsck could not correct errors on ${DISK}"
|
||||
return 1
|
||||
fi
|
||||
if [[ ${fsck_return} -gt 0 ]]; then
|
||||
log warning "Fsck corrected errors on ${DISK}"
|
||||
fi
|
||||
return 0
|
||||
}
|
||||
|
||||
function try_mount() {
|
||||
local mount_retcode
|
||||
try_repair_disk
|
||||
|
||||
log_command mount -o ${MOUNT_OPTIONS} ${DISK} ${MOUNTPOINT}
|
||||
mount_retcode=$?
|
||||
if [[ ${mount_retcode} == 0 ]]; then
|
||||
return 0
|
||||
fi
|
||||
|
||||
# Check to see if it looks like a filesystem before formatting it.
|
||||
disk_looks_unformatted ${DISK}
|
||||
if [[ $? == 0 ]]; then
|
||||
log error "Disk ${DISK} looks formatted but won't mount. Giving up."
|
||||
return ${mount_retcode}
|
||||
fi
|
||||
|
||||
# The disk looks like it's not been formatted before.
|
||||
format_disk
|
||||
if [[ $? != 0 ]]; then
|
||||
log error "Format of ${DISK} failed."
|
||||
fi
|
||||
|
||||
log_command mount -o ${MOUNT_OPTIONS} ${DISK} ${MOUNTPOINT}
|
||||
mount_retcode=$?
|
||||
if [[ ${mount_retcode} == 0 ]]; then
|
||||
return 0
|
||||
fi
|
||||
log error "Tried everything we could, but could not mount ${DISK}."
|
||||
return ${mount_retcode}
|
||||
}
|
||||
|
||||
try_mount
|
||||
exit $?
|
|
@ -11,6 +11,7 @@ base:
|
|||
{% else %}
|
||||
- sdn
|
||||
{% endif %}
|
||||
- helpers
|
||||
- cadvisor
|
||||
- kubelet
|
||||
- kube-proxy
|
||||
|
|
|
@ -17,10 +17,13 @@ limitations under the License.
|
|||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
goruntime "runtime"
|
||||
"strings"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/clientcmd"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/test/e2e"
|
||||
"github.com/golang/glog"
|
||||
|
@ -28,8 +31,8 @@ import (
|
|||
)
|
||||
|
||||
var (
|
||||
context = &e2e.TestContextType{}
|
||||
gceConfig = &context.GCEConfig
|
||||
context = &e2e.TestContextType{}
|
||||
cloudConfig = &context.CloudConfig
|
||||
|
||||
orderseed = flag.Int64("orderseed", 0, "If non-zero, seed of random test shuffle order. (Otherwise random.)")
|
||||
reportDir = flag.String("report_dir", "", "Path to the directory where the JUnit XML reports should be saved. Default is empty, which doesn't generate these reports.")
|
||||
|
@ -47,9 +50,11 @@ func init() {
|
|||
flag.StringVar(&context.Host, "host", "", "The host, or apiserver, to connect to")
|
||||
flag.StringVar(&context.RepoRoot, "repo_root", "./", "Root directory of kubernetes repository, for finding test files. Default assumes working directory is repository root")
|
||||
flag.StringVar(&context.Provider, "provider", "", "The name of the Kubernetes provider (gce, gke, local, vagrant, etc.)")
|
||||
flag.StringVar(&gceConfig.MasterName, "kube_master", "", "Name of the kubernetes master. Only required if provider is gce or gke")
|
||||
flag.StringVar(&gceConfig.ProjectID, "gce_project", "", "The GCE project being used, if applicable")
|
||||
flag.StringVar(&gceConfig.Zone, "gce_zone", "", "GCE zone being used, if applicable")
|
||||
|
||||
// TODO: Flags per provider? Rename gce_project/gce_zone?
|
||||
flag.StringVar(&cloudConfig.MasterName, "kube_master", "", "Name of the kubernetes master. Only required if provider is gce or gke")
|
||||
flag.StringVar(&cloudConfig.ProjectID, "gce_project", "", "The GCE project being used, if applicable")
|
||||
flag.StringVar(&cloudConfig.Zone, "gce_zone", "", "GCE zone being used, if applicable")
|
||||
}
|
||||
|
||||
func main() {
|
||||
|
@ -63,5 +68,22 @@ func main() {
|
|||
glog.Error("Invalid --times (negative or no testing requested)!")
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
if context.Provider == "aws" {
|
||||
awsConfig := "[Global]\n"
|
||||
if cloudConfig.Zone == "" {
|
||||
glog.Error("gce_zone must be specified for AWS")
|
||||
os.Exit(1)
|
||||
}
|
||||
awsConfig += fmt.Sprintf("Zone=%s\n", cloudConfig.Zone)
|
||||
|
||||
var err error
|
||||
cloudConfig.Provider, err = cloudprovider.GetCloudProvider(context.Provider, strings.NewReader(awsConfig))
|
||||
if err != nil {
|
||||
glog.Error("Error building AWS provider: ", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
e2e.RunE2ETests(context, *orderseed, *times, *reportDir, testList)
|
||||
}
|
||||
|
|
|
@ -25,6 +25,7 @@ import (
|
|||
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network/exec"
|
||||
// Volume plugins
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/volume"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/volume/aws_ebs"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/volume/empty_dir"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/volume/gce_pd"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/volume/git_repo"
|
||||
|
@ -49,6 +50,7 @@ func ProbeVolumePlugins() []volume.VolumePlugin {
|
|||
// The list of plugins to probe is decided by the kubelet binary, not
|
||||
// by dynamic linking or other "magic". Plugins will be analyzed and
|
||||
// initialized later.
|
||||
allPlugins = append(allPlugins, aws_ebs.ProbeVolumePlugins()...)
|
||||
allPlugins = append(allPlugins, empty_dir.ProbeVolumePlugins()...)
|
||||
allPlugins = append(allPlugins, gce_pd.ProbeVolumePlugins()...)
|
||||
allPlugins = append(allPlugins, git_repo.ProbeVolumePlugins()...)
|
||||
|
|
|
@ -173,7 +173,7 @@ func FuzzerFor(t *testing.T, version string, src rand.Source) *fuzz.Fuzzer {
|
|||
func(vs *api.VolumeSource, c fuzz.Continue) {
|
||||
// Exactly one of the fields should be set.
|
||||
//FIXME: the fuzz can still end up nil. What if fuzz allowed me to say that?
|
||||
fuzzOneOf(c, &vs.HostPath, &vs.EmptyDir, &vs.GCEPersistentDisk, &vs.GitRepo, &vs.Secret, &vs.NFS, &vs.ISCSI, &vs.Glusterfs)
|
||||
fuzzOneOf(c, &vs.HostPath, &vs.EmptyDir, &vs.GCEPersistentDisk, &vs.AWSElasticBlockStore, &vs.GitRepo, &vs.Secret, &vs.NFS, &vs.ISCSI, &vs.Glusterfs)
|
||||
},
|
||||
func(d *api.DNSPolicy, c fuzz.Continue) {
|
||||
policies := []api.DNSPolicy{api.DNSClusterFirst, api.DNSDefault}
|
||||
|
|
|
@ -189,6 +189,9 @@ type VolumeSource struct {
|
|||
// GCEPersistentDisk represents a GCE Disk resource that is attached to a
|
||||
// kubelet's host machine and then exposed to the pod.
|
||||
GCEPersistentDisk *GCEPersistentDiskVolumeSource `json:"gcePersistentDisk"`
|
||||
// AWSElasticBlockStore represents an AWS EBS disk that is attached to a
|
||||
// kubelet's host machine and then exposed to the pod.
|
||||
AWSElasticBlockStore *AWSElasticBlockStoreVolumeSource `json:"awsElasticBlockStore"`
|
||||
// GitRepo represents a git repository at a particular revision.
|
||||
GitRepo *GitRepoVolumeSource `json:"gitRepo"`
|
||||
// Secret represents a secret that should populate this volume.
|
||||
|
@ -208,6 +211,9 @@ type PersistentVolumeSource struct {
|
|||
// GCEPersistentDisk represents a GCE Disk resource that is attached to a
|
||||
// kubelet's host machine and then exposed to the pod.
|
||||
GCEPersistentDisk *GCEPersistentDiskVolumeSource `json:"gcePersistentDisk"`
|
||||
// AWSElasticBlockStore represents an AWS EBS disk that is attached to a
|
||||
// kubelet's host machine and then exposed to the pod.
|
||||
AWSElasticBlockStore *AWSElasticBlockStoreVolumeSource `json:"awsElasticBlockStore"`
|
||||
// HostPath represents a directory on the host.
|
||||
// This is useful for development and testing only.
|
||||
// on-host storage is not supported in any way
|
||||
|
@ -394,6 +400,28 @@ type ISCSIVolumeSource struct {
|
|||
ReadOnly bool `json:"readOnly,omitempty"`
|
||||
}
|
||||
|
||||
// AWSElasticBlockStoreVolumeSource represents a Persistent Disk resource in AWS.
|
||||
//
|
||||
// An AWS EBS disk must exist and be formatted before mounting to a container.
|
||||
// The disk must also be in the same AWS zone as the kubelet.
|
||||
// A AWS EBS disk can only be mounted as read/write once.
|
||||
type AWSElasticBlockStoreVolumeSource struct {
|
||||
// Unique id of the persistent disk resource. Used to identify the disk in AWS
|
||||
VolumeID string `json:"volumeID"`
|
||||
// Required: Filesystem type to mount.
|
||||
// Must be a filesystem type supported by the host operating system.
|
||||
// Ex. "ext4", "xfs", "ntfs"
|
||||
// TODO: how do we prevent errors in the filesystem from compromising the machine
|
||||
FSType string `json:"fsType,omitempty"`
|
||||
// Optional: Partition on the disk to mount.
|
||||
// If omitted, kubelet will attempt to mount the device name.
|
||||
// Ex. For /dev/sda1, this field is "1", for /dev/sda, this field is 0 or empty.
|
||||
Partition int `json:"partition,omitempty"`
|
||||
// Optional: Defaults to false (read/write). ReadOnly here will force
|
||||
// the ReadOnly setting in VolumeMounts.
|
||||
ReadOnly bool `json:"readOnly,omitempty"`
|
||||
}
|
||||
|
||||
// GitRepoVolumeSource represents a volume that is pulled from git when the pod is created.
|
||||
type GitRepoVolumeSource struct {
|
||||
// Repository URL
|
||||
|
|
|
@ -1170,6 +1170,9 @@ func init() {
|
|||
if err := s.Convert(&in.ISCSI, &out.ISCSI, 0); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := s.Convert(&in.AWSElasticBlockStore, &out.AWSElasticBlockStore, 0); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := s.Convert(&in.HostPath, &out.HostDir, 0); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -1197,6 +1200,9 @@ func init() {
|
|||
if err := s.Convert(&in.ISCSI, &out.ISCSI, 0); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := s.Convert(&in.AWSElasticBlockStore, &out.AWSElasticBlockStore, 0); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := s.Convert(&in.HostDir, &out.HostPath, 0); err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -88,7 +88,7 @@ type Volume struct {
|
|||
// Source represents the location and type of a volume to mount.
|
||||
// This is optional for now. If not specified, the Volume is implied to be an EmptyDir.
|
||||
// This implied behavior is deprecated and will be removed in a future version.
|
||||
Source VolumeSource `json:"source,omitempty" description:"location and type of volume to mount; at most one of HostDir, EmptyDir, GCEPersistentDisk, or GitRepo; default is EmptyDir"`
|
||||
Source VolumeSource `json:"source,omitempty" description:"location and type of volume to mount; at most one of HostDir, EmptyDir, GCEPersistentDisk, AWSElasticBlockStore, or GitRepo; default is EmptyDir"`
|
||||
}
|
||||
|
||||
// VolumeSource represents the source location of a volume to mount.
|
||||
|
@ -105,6 +105,9 @@ type VolumeSource struct {
|
|||
// GCEPersistentDisk represents a GCE Disk resource that is attached to a
|
||||
// kubelet's host machine and then exposed to the pod.
|
||||
GCEPersistentDisk *GCEPersistentDiskVolumeSource `json:"persistentDisk" description:"GCE disk resource attached to the host machine on demand"`
|
||||
// AWSElasticBlockStore represents an AWS Disk resource that is attached to a
|
||||
// kubelet's host machine and then exposed to the pod.
|
||||
AWSElasticBlockStore *AWSElasticBlockStoreVolumeSource `json:"awsElasticBlockStore" description:"AWS disk resource attached to the host machine on demand"`
|
||||
// GitRepo represents a git repository at a particular revision.
|
||||
GitRepo *GitRepoVolumeSource `json:"gitRepo" description:"git repository at a particular revision"`
|
||||
// Secret represents a secret to populate the volume with
|
||||
|
@ -124,6 +127,9 @@ type PersistentVolumeSource struct {
|
|||
// GCEPersistentDisk represents a GCE Disk resource that is attached to a
|
||||
// kubelet's host machine and then exposed to the pod.
|
||||
GCEPersistentDisk *GCEPersistentDiskVolumeSource `json:"persistentDisk" description:"GCE disk resource provisioned by an admin"`
|
||||
// AWSElasticBlockStore represents an AWS EBS volume that is attached to a
|
||||
// kubelet's host machine and then exposed to the pod.
|
||||
AWSElasticBlockStore *AWSElasticBlockStoreVolumeSource `json:"awsElasticBlockStore" description:"AWS disk resource provisioned by an admin"`
|
||||
// HostPath represents a directory on the host.
|
||||
// This is useful for development and testing only.
|
||||
// on-host storage is not supported in any way.
|
||||
|
@ -302,6 +308,29 @@ type ISCSIVolumeSource struct {
|
|||
ReadOnly bool `json:"readOnly,omitempty" description:"read-only if true, read-write otherwise (false or unspecified)"`
|
||||
}
|
||||
|
||||
// AWSElasticBlockStoreVolumeSource represents a Persistent Disk resource in AWS.
|
||||
//
|
||||
// An AWS PD must exist and be formatted before mounting to a container.
|
||||
// The disk must also be in the same AWS zone as the kubelet.
|
||||
// A AWS PD can only be mounted on a single machine.
|
||||
type AWSElasticBlockStoreVolumeSource struct {
|
||||
// Unique id of the PD resource. Used to identify the disk in AWS
|
||||
VolumeID string `json:"volumeID" description:"unique id of the PD resource in AWS"`
|
||||
// Required: Filesystem type to mount.
|
||||
// Must be a filesystem type supported by the host operating system.
|
||||
// Ex. "ext4", "xfs", "ntfs"
|
||||
// TODO: how do we prevent errors in the filesystem from compromising the machine
|
||||
// TODO: why omitempty if required?
|
||||
FSType string `json:"fsType,omitempty" description:"file system type to mount, such as ext4, xfs, ntfs"`
|
||||
// Optional: Partition on the disk to mount.
|
||||
// If omitted, kubelet will attempt to mount the device name.
|
||||
// Ex. For /dev/sda1, this field is "1", for /dev/sda, this field 0 or empty.
|
||||
Partition int `json:"partition,omitempty" description:"partition on the disk to mount (e.g., '1' for /dev/sda1); if omitted the plain device name (e.g., /dev/sda) will be mounted"`
|
||||
// Optional: Defaults to false (read/write). ReadOnly here will force
|
||||
// the ReadOnly setting in VolumeMounts.
|
||||
ReadOnly bool `json:"readOnly,omitempty" description:"read-only if true, read-write otherwise (false or unspecified)"`
|
||||
}
|
||||
|
||||
// GitRepoVolumeSource represents a volume that is pulled from git when the pod is created.
|
||||
type GitRepoVolumeSource struct {
|
||||
// Repository URL
|
||||
|
|
|
@ -1097,6 +1097,9 @@ func init() {
|
|||
if err := s.Convert(&in.GCEPersistentDisk, &out.GCEPersistentDisk, 0); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := s.Convert(&in.AWSElasticBlockStore, &out.AWSElasticBlockStore, 0); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := s.Convert(&in.HostPath, &out.HostDir, 0); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -1121,6 +1124,9 @@ func init() {
|
|||
if err := s.Convert(&in.GCEPersistentDisk, &out.GCEPersistentDisk, 0); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := s.Convert(&in.AWSElasticBlockStore, &out.AWSElasticBlockStore, 0); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := s.Convert(&in.ISCSI, &out.ISCSI, 0); err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -55,7 +55,7 @@ type Volume struct {
|
|||
// Source represents the location and type of a volume to mount.
|
||||
// This is optional for now. If not specified, the Volume is implied to be an EmptyDir.
|
||||
// This implied behavior is deprecated and will be removed in a future version.
|
||||
Source VolumeSource `json:"source,omitempty" description:"location and type of volume to mount; at most one of HostDir, EmptyDir, GCEPersistentDisk, or GitRepo; default is EmptyDir"`
|
||||
Source VolumeSource `json:"source,omitempty" description:"location and type of volume to mount; at most one of HostDir, EmptyDir, GCEPersistentDisk, AWSElasticBlockStore, or GitRepo; default is EmptyDir"`
|
||||
}
|
||||
|
||||
// VolumeSource represents the source location of a volume to mount.
|
||||
|
@ -74,6 +74,9 @@ type VolumeSource struct {
|
|||
// A persistent disk that is mounted to the
|
||||
// kubelet's host machine and then exposed to the pod.
|
||||
GCEPersistentDisk *GCEPersistentDiskVolumeSource `json:"persistentDisk" description:"GCE disk resource attached to the host machine on demand"`
|
||||
// An AWS persistent disk that is mounted to the
|
||||
// kubelet's host machine and then exposed to the pod.
|
||||
AWSElasticBlockStore *AWSElasticBlockStoreVolumeSource `json:"awsElasticBlockStore" description:"AWS disk resource attached to the host machine on demand"`
|
||||
// GitRepo represents a git repository at a particular revision.
|
||||
GitRepo *GitRepoVolumeSource `json:"gitRepo" description:"git repository at a particular revision"`
|
||||
// Secret is a secret to populate the volume with
|
||||
|
@ -93,6 +96,9 @@ type PersistentVolumeSource struct {
|
|||
// GCEPersistentDisk represents a GCE Disk resource that is attached to a
|
||||
// kubelet's host machine and then exposed to the pod.
|
||||
GCEPersistentDisk *GCEPersistentDiskVolumeSource `json:"persistentDisk" description:"GCE disk resource provisioned by an admin"`
|
||||
// AWSElasticBlockStore represents an AWS Disk resource that is attached to a
|
||||
// kubelet's host machine and then exposed to the pod.
|
||||
AWSElasticBlockStore *AWSElasticBlockStoreVolumeSource `json:"awsElasticBlockStore" description:"AWS disk resource provisioned by an admin"`
|
||||
// HostPath represents a directory on the host.
|
||||
// This is useful for development and testing only.
|
||||
// on-host storage is not supported in any way.
|
||||
|
@ -284,6 +290,29 @@ type GCEPersistentDiskVolumeSource struct {
|
|||
ReadOnly bool `json:"readOnly,omitempty" description:"read-only if true, read-write otherwise (false or unspecified)"`
|
||||
}
|
||||
|
||||
// AWSElasticBlockStoreVolumeSource represents a Persistent Disk resource in AWS.
|
||||
//
|
||||
// An AWS PD must exist and be formatted before mounting to a container.
|
||||
// The disk must also be in the same AWS zone as the kubelet.
|
||||
// A AWS PD can only be mounted on a single machine.
|
||||
type AWSElasticBlockStoreVolumeSource struct {
|
||||
// Unique id of the PD resource. Used to identify the disk in AWS
|
||||
VolumeID string `json:"volumeID" description:"unique id of the PD resource in AWS"`
|
||||
// Required: Filesystem type to mount.
|
||||
// Must be a filesystem type supported by the host operating system.
|
||||
// Ex. "ext4", "xfs", "ntfs"
|
||||
// TODO: how do we prevent errors in the filesystem from compromising the machine
|
||||
// TODO: why omitempty if required?
|
||||
FSType string `json:"fsType,omitempty" description:"file system type to mount, such as ext4, xfs, ntfs"`
|
||||
// Optional: Partition on the disk to mount.
|
||||
// If omitted, kubelet will attempt to mount the device name.
|
||||
// Ex. For /dev/sda1, this field is "1", for /dev/sda, this field 0 or empty.
|
||||
Partition int `json:"partition,omitempty" description:"partition on the disk to mount (e.g., '1' for /dev/sda1); if omitted the plain device name (e.g., /dev/sda) will be mounted"`
|
||||
// Optional: Defaults to false (read/write). ReadOnly here will force
|
||||
// the ReadOnly setting in VolumeMounts.
|
||||
ReadOnly bool `json:"readOnly,omitempty" description:"read-only if true, read-write otherwise (false or unspecified)"`
|
||||
}
|
||||
|
||||
// GitRepoVolumeSource represents a volume that is pulled from git when the pod is created.
|
||||
type GitRepoVolumeSource struct {
|
||||
// Repository URL
|
||||
|
|
|
@ -206,6 +206,9 @@ type VolumeSource struct {
|
|||
// GCEPersistentDisk represents a GCE Disk resource that is attached to a
|
||||
// kubelet's host machine and then exposed to the pod.
|
||||
GCEPersistentDisk *GCEPersistentDiskVolumeSource `json:"gcePersistentDisk" description:"GCE disk resource attached to the host machine on demand"`
|
||||
// AWSElasticBlockStore represents an AWS Disk resource that is attached to a
|
||||
// kubelet's host machine and then exposed to the pod.
|
||||
AWSElasticBlockStore *AWSElasticBlockStoreVolumeSource `json:"awsElasticBlockStore" description:"AWS disk resource attached to the host machine on demand"`
|
||||
// GitRepo represents a git repository at a particular revision.
|
||||
GitRepo *GitRepoVolumeSource `json:"gitRepo" description:"git repository at a particular revision"`
|
||||
// Secret represents a secret that should populate this volume.
|
||||
|
@ -225,6 +228,9 @@ type PersistentVolumeSource struct {
|
|||
// GCEPersistentDisk represents a GCE Disk resource that is attached to a
|
||||
// kubelet's host machine and then exposed to the pod.
|
||||
GCEPersistentDisk *GCEPersistentDiskVolumeSource `json:"gcePersistentDisk" description:"GCE disk resource provisioned by an admin"`
|
||||
// AWSElasticBlockStore represents an AWS Disk resource that is attached to a
|
||||
// kubelet's host machine and then exposed to the pod.
|
||||
AWSElasticBlockStore *AWSElasticBlockStoreVolumeSource `json:"awsElasticBlockStore" description:"AWS disk resource provisioned by an admin"`
|
||||
// HostPath represents a directory on the host.
|
||||
// This is useful for development and testing only.
|
||||
// on-host storage is not supported in any way.
|
||||
|
@ -400,6 +406,29 @@ type GCEPersistentDiskVolumeSource struct {
|
|||
ReadOnly bool `json:"readOnly,omitempty" description:"read-only if true, read-write otherwise (false or unspecified)"`
|
||||
}
|
||||
|
||||
// AWSElasticBlockStoreVolumeSource represents a Persistent Disk resource in AWS.
|
||||
//
|
||||
// An AWS PD must exist and be formatted before mounting to a container.
|
||||
// The disk must also be in the same AWS zone as the kubelet.
|
||||
// A AWS PD can only be mounted on a single machine.
|
||||
type AWSElasticBlockStoreVolumeSource struct {
|
||||
// Unique id of the PD resource. Used to identify the disk in AWS
|
||||
VolumeID string `json:"volumeID" description:"unique id of the PD resource in AWS"`
|
||||
// Required: Filesystem type to mount.
|
||||
// Must be a filesystem type supported by the host operating system.
|
||||
// Ex. "ext4", "xfs", "ntfs"
|
||||
// TODO: how do we prevent errors in the filesystem from compromising the machine
|
||||
// TODO: why omitempty if required?
|
||||
FSType string `json:"fsType,omitempty" description:"file system type to mount, such as ext4, xfs, ntfs"`
|
||||
// Optional: Partition on the disk to mount.
|
||||
// If omitted, kubelet will attempt to mount the device name.
|
||||
// Ex. For /dev/sda1, this field is "1", for /dev/sda, this field 0 or empty.
|
||||
Partition int `json:"partition,omitempty" description:"partition on the disk to mount (e.g., '1' for /dev/sda1); if omitted the plain device name (e.g., /dev/sda) will be mounted"`
|
||||
// Optional: Defaults to false (read/write). ReadOnly here will force
|
||||
// the ReadOnly setting in VolumeMounts.
|
||||
ReadOnly bool `json:"readOnly,omitempty" description:"read-only if true, read-write otherwise (false or unspecified)"`
|
||||
}
|
||||
|
||||
// GitRepoVolumeSource represents a volume that is pulled from git when the pod is created.
|
||||
type GitRepoVolumeSource struct {
|
||||
// Repository URL
|
||||
|
|
|
@ -299,6 +299,10 @@ func validateSource(source *api.VolumeSource) errs.ValidationErrorList {
|
|||
numVolumes++
|
||||
allErrs = append(allErrs, validateGCEPersistentDiskVolumeSource(source.GCEPersistentDisk).Prefix("persistentDisk")...)
|
||||
}
|
||||
if source.AWSElasticBlockStore != nil {
|
||||
numVolumes++
|
||||
allErrs = append(allErrs, validateAWSElasticBlockStoreVolumeSource(source.AWSElasticBlockStore).Prefix("awsElasticBlockStore")...)
|
||||
}
|
||||
if source.Secret != nil {
|
||||
numVolumes++
|
||||
allErrs = append(allErrs, validateSecretVolumeSource(source.Secret).Prefix("secret")...)
|
||||
|
@ -368,6 +372,20 @@ func validateGCEPersistentDiskVolumeSource(PD *api.GCEPersistentDiskVolumeSource
|
|||
return allErrs
|
||||
}
|
||||
|
||||
func validateAWSElasticBlockStoreVolumeSource(PD *api.AWSElasticBlockStoreVolumeSource) errs.ValidationErrorList {
|
||||
allErrs := errs.ValidationErrorList{}
|
||||
if PD.VolumeID == "" {
|
||||
allErrs = append(allErrs, errs.NewFieldRequired("volumeID"))
|
||||
}
|
||||
if PD.FSType == "" {
|
||||
allErrs = append(allErrs, errs.NewFieldRequired("fsType"))
|
||||
}
|
||||
if PD.Partition < 0 || PD.Partition > 255 {
|
||||
allErrs = append(allErrs, errs.NewFieldInvalid("partition", PD.Partition, pdPartitionErrorMsg))
|
||||
}
|
||||
return allErrs
|
||||
}
|
||||
|
||||
func validateSecretVolumeSource(secretSource *api.SecretVolumeSource) errs.ValidationErrorList {
|
||||
allErrs := errs.ValidationErrorList{}
|
||||
if secretSource.SecretName == "" {
|
||||
|
@ -426,6 +444,10 @@ func ValidatePersistentVolume(pv *api.PersistentVolume) errs.ValidationErrorList
|
|||
numVolumes++
|
||||
allErrs = append(allErrs, validateGCEPersistentDiskVolumeSource(pv.Spec.GCEPersistentDisk).Prefix("persistentDisk")...)
|
||||
}
|
||||
if pv.Spec.AWSElasticBlockStore != nil {
|
||||
numVolumes++
|
||||
allErrs = append(allErrs, validateAWSElasticBlockStoreVolumeSource(pv.Spec.AWSElasticBlockStore).Prefix("awsElasticBlockStore")...)
|
||||
}
|
||||
if numVolumes != 1 {
|
||||
allErrs = append(allErrs, errs.NewFieldInvalid("", pv.Spec.PersistentVolumeSource, "exactly 1 volume type is required"))
|
||||
}
|
||||
|
@ -1021,6 +1043,7 @@ func ValidateReadOnlyPersistentDisks(volumes []api.Volume) errs.ValidationErrorL
|
|||
allErrs = append(allErrs, errs.NewFieldInvalid("GCEPersistentDisk.ReadOnly", false, "ReadOnly must be true for replicated pods > 1, as GCE PD can only be mounted on multiple machines if it is read-only."))
|
||||
}
|
||||
}
|
||||
// TODO: What to do for AWS? It doesn't support replicas
|
||||
}
|
||||
return allErrs
|
||||
}
|
||||
|
|
|
@ -516,6 +516,7 @@ func TestValidateVolumes(t *testing.T) {
|
|||
{Name: "abc-123", VolumeSource: api.VolumeSource{HostPath: &api.HostPathVolumeSource{"/mnt/path3"}}},
|
||||
{Name: "empty", VolumeSource: api.VolumeSource{EmptyDir: &api.EmptyDirVolumeSource{}}},
|
||||
{Name: "gcepd", VolumeSource: api.VolumeSource{GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{"my-PD", "ext4", 1, false}}},
|
||||
{Name: "awsebs", VolumeSource: api.VolumeSource{AWSElasticBlockStore: &api.AWSElasticBlockStoreVolumeSource{"my-PD", "ext4", 1, false}}},
|
||||
{Name: "gitrepo", VolumeSource: api.VolumeSource{GitRepo: &api.GitRepoVolumeSource{"my-repo", "hashstring"}}},
|
||||
{Name: "iscsidisk", VolumeSource: api.VolumeSource{ISCSI: &api.ISCSIVolumeSource{"127.0.0.1", "iqn.2015-02.example.com:test", 1, "ext4", false}}},
|
||||
{Name: "secret", VolumeSource: api.VolumeSource{Secret: &api.SecretVolumeSource{"my-secret"}}},
|
||||
|
|
|
@ -17,11 +17,15 @@ limitations under the License.
|
|||
package aws_cloud
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"net/url"
|
||||
"regexp"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"code.google.com/p/gcfg"
|
||||
"github.com/mitchellh/goamz/aws"
|
||||
|
@ -38,6 +42,18 @@ import (
|
|||
type EC2 interface {
|
||||
// Query EC2 for instances matching the filter
|
||||
Instances(instIds []string, filter *ec2InstanceFilter) (resp *ec2.InstancesResp, err error)
|
||||
|
||||
// Attach a volume to an instance
|
||||
AttachVolume(volumeID string, instanceId string, mountDevice string) (resp *ec2.AttachVolumeResp, err error)
|
||||
// Detach a volume from whatever instance it is attached to
|
||||
// TODO: We should specify the InstanceID and the Device, for safety
|
||||
DetachVolume(volumeID string) (resp *ec2.SimpleResp, err error)
|
||||
// Lists volumes
|
||||
Volumes(volumeIDs []string, filter *ec2.Filter) (resp *ec2.VolumesResp, err error)
|
||||
// Create an EBS volume
|
||||
CreateVolume(request *ec2.CreateVolume) (resp *ec2.CreateVolumeResp, err error)
|
||||
// Delete an EBS volume
|
||||
DeleteVolume(volumeID string) (resp *ec2.SimpleResp, err error)
|
||||
}
|
||||
|
||||
// Abstraction over the AWS metadata service
|
||||
|
@ -46,12 +62,37 @@ type AWSMetadata interface {
|
|||
GetMetaData(key string) ([]byte, error)
|
||||
}
|
||||
|
||||
type VolumeOptions struct {
|
||||
CapacityMB int
|
||||
}
|
||||
|
||||
// Volumes is an interface for managing cloud-provisioned volumes
|
||||
type Volumes interface {
|
||||
// Attach the disk to the specified instance
|
||||
// instanceName can be empty to mean "the instance on which we are running"
|
||||
// Returns the device (e.g. /dev/xvdf) where we attached the volume
|
||||
AttachDisk(instanceName string, volumeName string, readOnly bool) (string, error)
|
||||
// Detach the disk from the specified instance
|
||||
// instanceName can be empty to mean "the instance on which we are running"
|
||||
DetachDisk(instanceName string, volumeName string) error
|
||||
|
||||
// Create a volume with the specified options
|
||||
CreateVolume(volumeOptions *VolumeOptions) (volumeName string, err error)
|
||||
DeleteVolume(volumeName string) error
|
||||
}
|
||||
|
||||
// AWSCloud is an implementation of Interface, TCPLoadBalancer and Instances for Amazon Web Services.
|
||||
type AWSCloud struct {
|
||||
ec2 EC2
|
||||
metadata AWSMetadata
|
||||
cfg *AWSCloudConfig
|
||||
availabilityZone string
|
||||
region aws.Region
|
||||
|
||||
// The AWS instance that we are running on
|
||||
selfAWSInstance *awsInstance
|
||||
|
||||
mutex sync.Mutex
|
||||
}
|
||||
|
||||
type AWSCloudConfig struct {
|
||||
|
@ -76,12 +117,12 @@ func (f *ec2InstanceFilter) Matches(instance ec2.Instance) bool {
|
|||
}
|
||||
|
||||
// goamzEC2 is an implementation of the EC2 interface, backed by goamz
|
||||
type GoamzEC2 struct {
|
||||
type goamzEC2 struct {
|
||||
ec2 *ec2.EC2
|
||||
}
|
||||
|
||||
// Implementation of EC2.Instances
|
||||
func (self *GoamzEC2) Instances(instanceIds []string, filter *ec2InstanceFilter) (resp *ec2.InstancesResp, err error) {
|
||||
func (self *goamzEC2) Instances(instanceIds []string, filter *ec2InstanceFilter) (resp *ec2.InstancesResp, err error) {
|
||||
var goamzFilter *ec2.Filter
|
||||
if filter != nil {
|
||||
goamzFilter = ec2.NewFilter()
|
||||
|
@ -106,6 +147,26 @@ func (self *goamzMetadata) GetMetaData(key string) ([]byte, error) {
|
|||
|
||||
type AuthFunc func() (auth aws.Auth, err error)
|
||||
|
||||
func (s *goamzEC2) AttachVolume(volumeID string, instanceId string, device string) (resp *ec2.AttachVolumeResp, err error) {
|
||||
return s.ec2.AttachVolume(volumeID, instanceId, device)
|
||||
}
|
||||
|
||||
func (s *goamzEC2) DetachVolume(volumeID string) (resp *ec2.SimpleResp, err error) {
|
||||
return s.ec2.DetachVolume(volumeID)
|
||||
}
|
||||
|
||||
func (s *goamzEC2) Volumes(volumeIDs []string, filter *ec2.Filter) (resp *ec2.VolumesResp, err error) {
|
||||
return s.ec2.Volumes(volumeIDs, filter)
|
||||
}
|
||||
|
||||
func (s *goamzEC2) CreateVolume(request *ec2.CreateVolume) (resp *ec2.CreateVolumeResp, err error) {
|
||||
return s.ec2.CreateVolume(request)
|
||||
}
|
||||
|
||||
func (s *goamzEC2) DeleteVolume(volumeID string) (resp *ec2.SimpleResp, err error) {
|
||||
return s.ec2.DeleteVolume(volumeID)
|
||||
}
|
||||
|
||||
func init() {
|
||||
cloudprovider.RegisterCloudProvider("aws", func(config io.Reader) (cloudprovider.Interface, error) {
|
||||
metadata := &goamzMetadata{}
|
||||
|
@ -157,6 +218,7 @@ func getAvailabilityZone(metadata AWSMetadata) (string, error) {
|
|||
}
|
||||
|
||||
// newAWSCloud creates a new instance of AWSCloud.
|
||||
// authFunc and instanceId are primarily for tests
|
||||
func newAWSCloud(config io.Reader, authFunc AuthFunc, metadata AWSMetadata) (*AWSCloud, error) {
|
||||
cfg, err := readAWSCloudConfig(config, metadata)
|
||||
if err != nil {
|
||||
|
@ -179,12 +241,17 @@ func newAWSCloud(config io.Reader, authFunc AuthFunc, metadata AWSMetadata) (*AW
|
|||
return nil, fmt.Errorf("not a valid AWS zone (unknown region): %s", zone)
|
||||
}
|
||||
|
||||
return &AWSCloud{
|
||||
ec2: &GoamzEC2{ec2: ec2.New(auth, region)},
|
||||
ec2 := &goamzEC2{ec2: ec2.New(auth, region)}
|
||||
|
||||
awsCloud := &AWSCloud{
|
||||
ec2: ec2,
|
||||
cfg: cfg,
|
||||
region: region,
|
||||
availabilityZone: zone,
|
||||
}, nil
|
||||
metadata: metadata,
|
||||
}
|
||||
|
||||
return awsCloud, nil
|
||||
}
|
||||
|
||||
func (aws *AWSCloud) Clusters() (cloudprovider.Clusters, bool) {
|
||||
|
@ -377,7 +444,7 @@ func getResourcesByInstanceType(instanceType string) (*api.NodeResources, error)
|
|||
return makeNodeResources("t1", 0.125, 0.615)
|
||||
|
||||
// t2: Burstable
|
||||
// TODO: The ECUs are fake values (because they are burstable), so this is just a guess...
|
||||
// TODO: The ECUs are fake values (because they are burstable), so this is just a guess...
|
||||
case "t2.micro":
|
||||
return makeNodeResources("t2", 0.25, 1)
|
||||
case "t2.small":
|
||||
|
@ -506,3 +573,383 @@ func (self *AWSCloud) GetZone() (cloudprovider.Zone, error) {
|
|||
Region: self.region.Name,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Abstraction around AWS Instance Types
|
||||
// There isn't an API to get information for a particular instance type (that I know of)
|
||||
type awsInstanceType struct {
|
||||
}
|
||||
|
||||
// TODO: Also return number of mounts allowed?
|
||||
func (self *awsInstanceType) getEBSMountDevices() []string {
|
||||
// See: https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/block-device-mapping-concepts.html
|
||||
devices := []string{}
|
||||
for c := 'f'; c <= 'p'; c++ {
|
||||
devices = append(devices, fmt.Sprintf("/dev/sd%c", c))
|
||||
}
|
||||
return devices
|
||||
}
|
||||
|
||||
type awsInstance struct {
|
||||
ec2 EC2
|
||||
|
||||
// id in AWS
|
||||
awsID string
|
||||
|
||||
mutex sync.Mutex
|
||||
|
||||
// We must cache because otherwise there is a race condition,
|
||||
// where we assign a device mapping and then get a second request before we attach the volume
|
||||
deviceMappings map[string]string
|
||||
}
|
||||
|
||||
func newAWSInstance(ec2 EC2, awsID string) *awsInstance {
|
||||
self := &awsInstance{ec2: ec2, awsID: awsID}
|
||||
|
||||
// We lazy-init deviceMappings
|
||||
self.deviceMappings = nil
|
||||
|
||||
return self
|
||||
}
|
||||
|
||||
// Gets the awsInstanceType that models the instance type of this instance
|
||||
func (self *awsInstance) getInstanceType() *awsInstanceType {
|
||||
// TODO: Make this real
|
||||
awsInstanceType := &awsInstanceType{}
|
||||
return awsInstanceType
|
||||
}
|
||||
|
||||
// Gets the full information about this instance from the EC2 API
|
||||
func (self *awsInstance) getInfo() (*ec2.Instance, error) {
|
||||
resp, err := self.ec2.Instances([]string{self.awsID}, nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error querying ec2 for instance info: %v", err)
|
||||
}
|
||||
if len(resp.Reservations) == 0 {
|
||||
return nil, fmt.Errorf("no reservations found for instance: %s", self.awsID)
|
||||
}
|
||||
if len(resp.Reservations) > 1 {
|
||||
return nil, fmt.Errorf("multiple reservations found for instance: %s", self.awsID)
|
||||
}
|
||||
if len(resp.Reservations[0].Instances) == 0 {
|
||||
return nil, fmt.Errorf("no instances found for instance: %s", self.awsID)
|
||||
}
|
||||
if len(resp.Reservations[0].Instances) > 1 {
|
||||
return nil, fmt.Errorf("multiple instances found for instance: %s", self.awsID)
|
||||
}
|
||||
return &resp.Reservations[0].Instances[0], nil
|
||||
}
|
||||
|
||||
// Assigns an unused mount device for the specified volume.
|
||||
// If the volume is already assigned, this will return the existing mount device and true
|
||||
func (self *awsInstance) assignMountDevice(volumeID string) (mountDevice string, alreadyAttached bool, err error) {
|
||||
instanceType := self.getInstanceType()
|
||||
if instanceType == nil {
|
||||
return "", false, fmt.Errorf("could not get instance type for instance: %s", self.awsID)
|
||||
}
|
||||
|
||||
// We lock to prevent concurrent mounts from conflicting
|
||||
// We may still conflict if someone calls the API concurrently,
|
||||
// but the AWS API will then fail one of the two attach operations
|
||||
self.mutex.Lock()
|
||||
defer self.mutex.Unlock()
|
||||
|
||||
// We cache both for efficiency and correctness
|
||||
if self.deviceMappings == nil {
|
||||
info, err := self.getInfo()
|
||||
if err != nil {
|
||||
return "", false, err
|
||||
}
|
||||
deviceMappings := map[string]string{}
|
||||
for _, blockDevice := range info.BlockDevices {
|
||||
deviceMappings[blockDevice.DeviceName] = blockDevice.VolumeId
|
||||
}
|
||||
self.deviceMappings = deviceMappings
|
||||
}
|
||||
|
||||
// Check to see if this volume is already assigned a device on this machine
|
||||
for deviceName, mappingVolumeID := range self.deviceMappings {
|
||||
if volumeID == mappingVolumeID {
|
||||
glog.Warningf("Got assignment call for already-assigned volume: %s@%s", deviceName, mappingVolumeID)
|
||||
return deviceName, true, nil
|
||||
}
|
||||
}
|
||||
|
||||
// Check all the valid mountpoints to see if any of them are free
|
||||
valid := instanceType.getEBSMountDevices()
|
||||
chosen := ""
|
||||
for _, device := range valid {
|
||||
_, found := self.deviceMappings[device]
|
||||
if !found {
|
||||
chosen = device
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if chosen == "" {
|
||||
glog.Warningf("Could not assign a mount device (all in use?). mappings=%v, valid=%v", self.deviceMappings, valid)
|
||||
return "", false, nil
|
||||
}
|
||||
|
||||
self.deviceMappings[chosen] = volumeID
|
||||
glog.V(2).Infof("Assigned mount device %s -> volume %s", chosen, volumeID)
|
||||
|
||||
return chosen, false, nil
|
||||
}
|
||||
|
||||
func (self *awsInstance) releaseMountDevice(volumeID string, mountDevice string) {
|
||||
self.mutex.Lock()
|
||||
defer self.mutex.Unlock()
|
||||
|
||||
existingVolumeID, found := self.deviceMappings[mountDevice]
|
||||
if !found {
|
||||
glog.Errorf("releaseMountDevice on non-allocated device")
|
||||
return
|
||||
}
|
||||
if volumeID != existingVolumeID {
|
||||
glog.Errorf("releaseMountDevice on device assigned to different volume")
|
||||
return
|
||||
}
|
||||
glog.V(2).Infof("Releasing mount device mapping: %s -> volume %s", mountDevice, volumeID)
|
||||
delete(self.deviceMappings, mountDevice)
|
||||
}
|
||||
|
||||
type awsDisk struct {
|
||||
ec2 EC2
|
||||
|
||||
// Name in k8s
|
||||
name string
|
||||
// id in AWS
|
||||
awsID string
|
||||
// az which holds the volume
|
||||
az string
|
||||
}
|
||||
|
||||
func newAWSDisk(ec2 EC2, name string) (*awsDisk, error) {
|
||||
// name looks like aws://availability-zone/id
|
||||
url, err := url.Parse(name)
|
||||
if err != nil {
|
||||
// TODO: Maybe we should pass a URL into the Volume functions
|
||||
return nil, fmt.Errorf("Invalid disk name (%s): %v", name, err)
|
||||
}
|
||||
if url.Scheme != "aws" {
|
||||
return nil, fmt.Errorf("Invalid scheme for AWS volume (%s)", name)
|
||||
}
|
||||
|
||||
awsID := url.Path
|
||||
if len(awsID) > 1 && awsID[0] == '/' {
|
||||
awsID = awsID[1:]
|
||||
}
|
||||
|
||||
// TODO: Regex match?
|
||||
if strings.Contains(awsID, "/") || !strings.HasPrefix(awsID, "vol-") {
|
||||
return nil, fmt.Errorf("Invalid format for AWS volume (%s)", name)
|
||||
}
|
||||
az := url.Host
|
||||
// TODO: Better validation?
|
||||
// TODO: Default to our AZ? Look it up?
|
||||
// TODO: Should this be a region or an AZ?
|
||||
if az == "" {
|
||||
return nil, fmt.Errorf("Invalid format for AWS volume (%s)", name)
|
||||
}
|
||||
disk := &awsDisk{ec2: ec2, name: name, awsID: awsID, az: az}
|
||||
return disk, nil
|
||||
}
|
||||
|
||||
// Gets the full information about this volume from the EC2 API
|
||||
func (self *awsDisk) getInfo() (*ec2.Volume, error) {
|
||||
resp, err := self.ec2.Volumes([]string{self.awsID}, nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error querying ec2 for volume info: %v", err)
|
||||
}
|
||||
if len(resp.Volumes) == 0 {
|
||||
return nil, fmt.Errorf("no volumes found for volume: %s", self.awsID)
|
||||
}
|
||||
if len(resp.Volumes) > 1 {
|
||||
return nil, fmt.Errorf("multiple volumes found for volume: %s", self.awsID)
|
||||
}
|
||||
return &resp.Volumes[0], nil
|
||||
}
|
||||
|
||||
func (self *awsDisk) waitForAttachmentStatus(status string) error {
|
||||
// TODO: There may be a faster way to get this when we're attaching locally
|
||||
attempt := 0
|
||||
maxAttempts := 60
|
||||
|
||||
for {
|
||||
info, err := self.getInfo()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(info.Attachments) > 1 {
|
||||
glog.Warningf("Found multiple attachments for volume: %v", info)
|
||||
}
|
||||
attachmentStatus := ""
|
||||
for _, attachment := range info.Attachments {
|
||||
if attachmentStatus != "" {
|
||||
glog.Warning("Found multiple attachments: ", info)
|
||||
}
|
||||
attachmentStatus = attachment.Status
|
||||
}
|
||||
if attachmentStatus == "" {
|
||||
attachmentStatus = "detached"
|
||||
}
|
||||
if attachmentStatus == status {
|
||||
return nil
|
||||
}
|
||||
|
||||
glog.V(2).Infof("Waiting for volume state: actual=%s, desired=%s", attachmentStatus, status)
|
||||
|
||||
attempt++
|
||||
if attempt > maxAttempts {
|
||||
glog.Warningf("Timeout waiting for volume state: actual=%s, desired=%s", attachmentStatus, status)
|
||||
return errors.New("Timeout waiting for volume state")
|
||||
}
|
||||
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
// Deletes the EBS disk
|
||||
func (self *awsDisk) delete() error {
|
||||
_, err := self.ec2.DeleteVolume(self.awsID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error delete EBS volumes: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Gets the awsInstance for the EC2 instance on which we are running
|
||||
// may return nil in case of error
|
||||
func (aws *AWSCloud) getSelfAWSInstance() (*awsInstance, error) {
|
||||
// Note that we cache some state in awsInstance (mountpoints), so we must preserve the instance
|
||||
|
||||
aws.mutex.Lock()
|
||||
defer aws.mutex.Unlock()
|
||||
|
||||
i := aws.selfAWSInstance
|
||||
if i == nil {
|
||||
instanceIdBytes, err := aws.metadata.GetMetaData("instance-id")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error fetching instance-id from ec2 metadata service: %v", err)
|
||||
}
|
||||
i = newAWSInstance(aws.ec2, string(instanceIdBytes))
|
||||
aws.selfAWSInstance = i
|
||||
}
|
||||
|
||||
return i, nil
|
||||
}
|
||||
|
||||
// Implements Volumes.AttachDisk
|
||||
func (aws *AWSCloud) AttachDisk(instanceName string, diskName string, readOnly bool) (string, error) {
|
||||
disk, err := newAWSDisk(aws.ec2, diskName)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
var awsInstance *awsInstance
|
||||
if instanceName == "" {
|
||||
awsInstance, err = aws.getSelfAWSInstance()
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("Error getting self-instance: %v", err)
|
||||
}
|
||||
} else {
|
||||
instance, err := aws.getInstancesByDnsName(instanceName)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("Error finding instance: %v", err)
|
||||
}
|
||||
|
||||
awsInstance = newAWSInstance(aws.ec2, instance.InstanceId)
|
||||
}
|
||||
|
||||
if readOnly {
|
||||
// TODO: We could enforce this when we mount the volume (?)
|
||||
// TODO: We could also snapshot the volume and attach copies of it
|
||||
return "", errors.New("AWS volumes cannot be mounted read-only")
|
||||
}
|
||||
|
||||
mountDevice, alreadyAttached, err := awsInstance.assignMountDevice(disk.awsID)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
attached := false
|
||||
defer func() {
|
||||
if !attached {
|
||||
awsInstance.releaseMountDevice(disk.awsID, mountDevice)
|
||||
}
|
||||
}()
|
||||
|
||||
if !alreadyAttached {
|
||||
attachResponse, err := aws.ec2.AttachVolume(disk.awsID, awsInstance.awsID, mountDevice)
|
||||
if err != nil {
|
||||
// TODO: Check if the volume was concurrently attached?
|
||||
return "", fmt.Errorf("Error attaching EBS volume: %v", err)
|
||||
}
|
||||
|
||||
glog.V(2).Info("AttachVolume request returned %v", attachResponse)
|
||||
}
|
||||
|
||||
err = disk.waitForAttachmentStatus("attached")
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
attached = true
|
||||
|
||||
hostDevice := mountDevice
|
||||
if strings.HasPrefix(hostDevice, "/dev/sd") {
|
||||
// Inside the instance, the mountpoint /dev/sdf looks like /dev/xvdf
|
||||
hostDevice = "/dev/xvd" + hostDevice[7:]
|
||||
}
|
||||
return hostDevice, nil
|
||||
}
|
||||
|
||||
// Implements Volumes.DetachDisk
|
||||
func (aws *AWSCloud) DetachDisk(instanceName string, diskName string) error {
|
||||
disk, err := newAWSDisk(aws.ec2, diskName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// TODO: We should specify the InstanceID and the Device, for safety
|
||||
response, err := aws.ec2.DetachVolume(disk.awsID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error detaching EBS volume: %v", err)
|
||||
}
|
||||
if response == nil {
|
||||
return errors.New("no response from DetachVolume")
|
||||
}
|
||||
err = disk.waitForAttachmentStatus("detached")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// Implements Volumes.CreateVolume
|
||||
func (aws *AWSCloud) CreateVolume(volumeOptions *VolumeOptions) (string, error) {
|
||||
request := &ec2.CreateVolume{}
|
||||
request.AvailZone = aws.availabilityZone
|
||||
request.Size = (int64(volumeOptions.CapacityMB) + 1023) / 1024
|
||||
response, err := aws.ec2.CreateVolume(request)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
az := response.AvailZone
|
||||
awsID := response.VolumeId
|
||||
|
||||
volumeName := "aws://" + az + "/" + awsID
|
||||
|
||||
return volumeName, nil
|
||||
}
|
||||
|
||||
// Implements Volumes.DeleteVolume
|
||||
func (aws *AWSCloud) DeleteVolume(volumeName string) error {
|
||||
awsDisk, err := newAWSDisk(aws.ec2, volumeName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return awsDisk.delete()
|
||||
}
|
||||
|
|
|
@ -112,17 +112,17 @@ func TestNewAWSCloud(t *testing.T) {
|
|||
}{
|
||||
{
|
||||
"No config reader",
|
||||
nil, fakeAuthFunc, nil,
|
||||
nil, fakeAuthFunc, &FakeMetadata{},
|
||||
true, "",
|
||||
},
|
||||
{
|
||||
"Config specified invalid zone",
|
||||
strings.NewReader("[global]\nzone = blahonga"), fakeAuthFunc, nil,
|
||||
strings.NewReader("[global]\nzone = blahonga"), fakeAuthFunc, &FakeMetadata{},
|
||||
true, "",
|
||||
},
|
||||
{
|
||||
"Config specifies valid zone",
|
||||
strings.NewReader("[global]\nzone = eu-west-1a"), fakeAuthFunc, nil,
|
||||
strings.NewReader("[global]\nzone = eu-west-1a"), fakeAuthFunc, &FakeMetadata{},
|
||||
false, "eu-west-1a",
|
||||
},
|
||||
{
|
||||
|
@ -178,16 +178,39 @@ func (self *FakeEC2) Instances(instanceIds []string, filter *ec2InstanceFilter)
|
|||
|
||||
type FakeMetadata struct {
|
||||
availabilityZone string
|
||||
instanceId string
|
||||
}
|
||||
|
||||
func (self *FakeMetadata) GetMetaData(key string) ([]byte, error) {
|
||||
if key == "placement/availability-zone" {
|
||||
return []byte(self.availabilityZone), nil
|
||||
} else if key == "instance-id" {
|
||||
return []byte(self.instanceId), nil
|
||||
} else {
|
||||
return nil, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (ec2 *FakeEC2) AttachVolume(volumeID string, instanceId string, mountDevice string) (resp *ec2.AttachVolumeResp, err error) {
|
||||
panic("Not implemented")
|
||||
}
|
||||
|
||||
func (ec2 *FakeEC2) DetachVolume(volumeID string) (resp *ec2.SimpleResp, err error) {
|
||||
panic("Not implemented")
|
||||
}
|
||||
|
||||
func (ec2 *FakeEC2) Volumes(volumeIDs []string, filter *ec2.Filter) (resp *ec2.VolumesResp, err error) {
|
||||
panic("Not implemented")
|
||||
}
|
||||
|
||||
func (ec2 *FakeEC2) CreateVolume(request *ec2.CreateVolume) (resp *ec2.CreateVolumeResp, err error) {
|
||||
panic("Not implemented")
|
||||
}
|
||||
|
||||
func (ec2 *FakeEC2) DeleteVolume(volumeID string) (resp *ec2.SimpleResp, err error) {
|
||||
panic("Not implemented")
|
||||
}
|
||||
|
||||
func mockInstancesResp(instances []ec2.Instance) (aws *AWSCloud) {
|
||||
availabilityZone := "us-west-2d"
|
||||
return &AWSCloud{
|
||||
|
|
|
@ -50,16 +50,26 @@ func (nodes ClientNodeInfo) GetNodeInfo(nodeID string) (*api.Node, error) {
|
|||
}
|
||||
|
||||
func isVolumeConflict(volume api.Volume, pod *api.Pod) bool {
|
||||
if volume.GCEPersistentDisk == nil {
|
||||
return false
|
||||
}
|
||||
pdName := volume.GCEPersistentDisk.PDName
|
||||
if volume.GCEPersistentDisk != nil {
|
||||
pdName := volume.GCEPersistentDisk.PDName
|
||||
|
||||
manifest := &(pod.Spec)
|
||||
for ix := range manifest.Volumes {
|
||||
if manifest.Volumes[ix].GCEPersistentDisk != nil &&
|
||||
manifest.Volumes[ix].GCEPersistentDisk.PDName == pdName {
|
||||
return true
|
||||
manifest := &(pod.Spec)
|
||||
for ix := range manifest.Volumes {
|
||||
if manifest.Volumes[ix].GCEPersistentDisk != nil &&
|
||||
manifest.Volumes[ix].GCEPersistentDisk.PDName == pdName {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
if volume.AWSElasticBlockStore != nil {
|
||||
volumeID := volume.AWSElasticBlockStore.VolumeID
|
||||
|
||||
manifest := &(pod.Spec)
|
||||
for ix := range manifest.Volumes {
|
||||
if manifest.Volumes[ix].AWSElasticBlockStore != nil &&
|
||||
manifest.Volumes[ix].AWSElasticBlockStore.VolumeID == volumeID {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
return false
|
||||
|
|
|
@ -321,6 +321,55 @@ func TestDiskConflicts(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestAWSDiskConflicts(t *testing.T) {
|
||||
volState := api.PodSpec{
|
||||
Volumes: []api.Volume{
|
||||
{
|
||||
VolumeSource: api.VolumeSource{
|
||||
AWSElasticBlockStore: &api.AWSElasticBlockStoreVolumeSource{
|
||||
VolumeID: "foo",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
volState2 := api.PodSpec{
|
||||
Volumes: []api.Volume{
|
||||
{
|
||||
VolumeSource: api.VolumeSource{
|
||||
AWSElasticBlockStore: &api.AWSElasticBlockStoreVolumeSource{
|
||||
VolumeID: "bar",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
tests := []struct {
|
||||
pod api.Pod
|
||||
existingPods []api.Pod
|
||||
isOk bool
|
||||
test string
|
||||
}{
|
||||
{api.Pod{}, []api.Pod{}, true, "nothing"},
|
||||
{api.Pod{}, []api.Pod{{Spec: volState}}, true, "one state"},
|
||||
{api.Pod{Spec: volState}, []api.Pod{{Spec: volState}}, false, "same state"},
|
||||
{api.Pod{Spec: volState2}, []api.Pod{{Spec: volState}}, true, "different state"},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
ok, err := NoDiskConflict(test.pod, test.existingPods, "machine")
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if test.isOk && !ok {
|
||||
t.Errorf("expected ok, got none. %v %v %s", test.pod, test.existingPods, test.test)
|
||||
}
|
||||
if !test.isOk && ok {
|
||||
t.Errorf("expected no ok, got one. %v %v %s", test.pod, test.existingPods, test.test)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestPodFitsSelector(t *testing.T) {
|
||||
tests := []struct {
|
||||
pod api.Pod
|
||||
|
|
|
@ -0,0 +1,320 @@
|
|||
/*
|
||||
Copyright 2014 Google Inc. All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package aws_ebs
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/aws"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/exec"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/mount"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/volume"
|
||||
"github.com/golang/glog"
|
||||
)
|
||||
|
||||
// This is the primary entrypoint for volume plugins.
|
||||
func ProbeVolumePlugins() []volume.VolumePlugin {
|
||||
return []volume.VolumePlugin{&awsElasticBlockStorePlugin{nil}}
|
||||
}
|
||||
|
||||
type awsElasticBlockStorePlugin struct {
|
||||
host volume.VolumeHost
|
||||
}
|
||||
|
||||
var _ volume.VolumePlugin = &awsElasticBlockStorePlugin{}
|
||||
|
||||
const (
|
||||
awsElasticBlockStorePluginName = "kubernetes.io/aws-ebs"
|
||||
)
|
||||
|
||||
func (plugin *awsElasticBlockStorePlugin) Init(host volume.VolumeHost) {
|
||||
plugin.host = host
|
||||
}
|
||||
|
||||
func (plugin *awsElasticBlockStorePlugin) Name() string {
|
||||
return awsElasticBlockStorePluginName
|
||||
}
|
||||
|
||||
func (plugin *awsElasticBlockStorePlugin) CanSupport(spec *api.Volume) bool {
|
||||
if spec.AWSElasticBlockStore != nil {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (plugin *awsElasticBlockStorePlugin) GetAccessModes() []api.AccessModeType {
|
||||
return []api.AccessModeType{
|
||||
api.ReadWriteOnce,
|
||||
}
|
||||
}
|
||||
|
||||
func (plugin *awsElasticBlockStorePlugin) NewBuilder(spec *api.Volume, podRef *api.ObjectReference) (volume.Builder, error) {
|
||||
// Inject real implementations here, test through the internal function.
|
||||
return plugin.newBuilderInternal(spec, podRef.UID, &AWSDiskUtil{}, mount.New())
|
||||
}
|
||||
|
||||
func (plugin *awsElasticBlockStorePlugin) newBuilderInternal(spec *api.Volume, podUID types.UID, manager pdManager, mounter mount.Interface) (volume.Builder, error) {
|
||||
volumeID := spec.AWSElasticBlockStore.VolumeID
|
||||
fsType := spec.AWSElasticBlockStore.FSType
|
||||
partition := ""
|
||||
if spec.AWSElasticBlockStore.Partition != 0 {
|
||||
partition = strconv.Itoa(spec.AWSElasticBlockStore.Partition)
|
||||
}
|
||||
readOnly := spec.AWSElasticBlockStore.ReadOnly
|
||||
|
||||
return &awsElasticBlockStore{
|
||||
podUID: podUID,
|
||||
volName: spec.Name,
|
||||
volumeID: volumeID,
|
||||
fsType: fsType,
|
||||
partition: partition,
|
||||
readOnly: readOnly,
|
||||
manager: manager,
|
||||
mounter: mounter,
|
||||
diskMounter: &awsSafeFormatAndMount{mounter, exec.New()},
|
||||
plugin: plugin,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (plugin *awsElasticBlockStorePlugin) NewCleaner(volName string, podUID types.UID) (volume.Cleaner, error) {
|
||||
// Inject real implementations here, test through the internal function.
|
||||
return plugin.newCleanerInternal(volName, podUID, &AWSDiskUtil{}, mount.New())
|
||||
}
|
||||
|
||||
func (plugin *awsElasticBlockStorePlugin) newCleanerInternal(volName string, podUID types.UID, manager pdManager, mounter mount.Interface) (volume.Cleaner, error) {
|
||||
return &awsElasticBlockStore{
|
||||
podUID: podUID,
|
||||
volName: volName,
|
||||
manager: manager,
|
||||
mounter: mounter,
|
||||
diskMounter: &awsSafeFormatAndMount{mounter, exec.New()},
|
||||
plugin: plugin,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Abstract interface to PD operations.
|
||||
type pdManager interface {
|
||||
// Attaches the disk to the kubelet's host machine.
|
||||
AttachAndMountDisk(pd *awsElasticBlockStore, globalPDPath string) error
|
||||
// Detaches the disk from the kubelet's host machine.
|
||||
DetachDisk(pd *awsElasticBlockStore) error
|
||||
}
|
||||
|
||||
// awsElasticBlockStore volumes are disk resources provided by Google Compute Engine
|
||||
// that are attached to the kubelet's host machine and exposed to the pod.
|
||||
type awsElasticBlockStore struct {
|
||||
volName string
|
||||
podUID types.UID
|
||||
// Unique id of the PD, used to find the disk resource in the provider.
|
||||
volumeID string
|
||||
// Filesystem type, optional.
|
||||
fsType string
|
||||
// Specifies the partition to mount
|
||||
partition string
|
||||
// Specifies whether the disk will be attached as read-only.
|
||||
readOnly bool
|
||||
// Utility interface that provides API calls to the provider to attach/detach disks.
|
||||
manager pdManager
|
||||
// Mounter interface that provides system calls to mount the global path to the pod local path.
|
||||
mounter mount.Interface
|
||||
// diskMounter provides the interface that is used to mount the actual block device.
|
||||
diskMounter mount.Interface
|
||||
plugin *awsElasticBlockStorePlugin
|
||||
}
|
||||
|
||||
func detachDiskLogError(pd *awsElasticBlockStore) {
|
||||
err := pd.manager.DetachDisk(pd)
|
||||
if err != nil {
|
||||
glog.Warningf("Failed to detach disk: %v (%v)", pd, err)
|
||||
}
|
||||
}
|
||||
|
||||
// getVolumeProvider returns the AWS Volumes interface
|
||||
func (pd *awsElasticBlockStore) getVolumeProvider() (aws_cloud.Volumes, error) {
|
||||
name := "aws"
|
||||
cloud, err := cloudprovider.GetCloudProvider(name, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
volumes, ok := cloud.(aws_cloud.Volumes)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("Cloud provider does not support volumes")
|
||||
}
|
||||
return volumes, nil
|
||||
}
|
||||
|
||||
// SetUp attaches the disk and bind mounts to the volume path.
|
||||
func (pd *awsElasticBlockStore) SetUp() error {
|
||||
return pd.SetUpAt(pd.GetPath())
|
||||
}
|
||||
|
||||
// SetUpAt attaches the disk and bind mounts to the volume path.
|
||||
func (pd *awsElasticBlockStore) SetUpAt(dir string) error {
|
||||
// TODO: handle failed mounts here.
|
||||
mountpoint, err := pd.mounter.IsMountPoint(dir)
|
||||
glog.V(4).Infof("PersistentDisk set up: %s %v %v", dir, mountpoint, err)
|
||||
if err != nil && !os.IsNotExist(err) {
|
||||
return err
|
||||
}
|
||||
if mountpoint {
|
||||
return nil
|
||||
}
|
||||
|
||||
globalPDPath := makeGlobalPDPath(pd.plugin.host, pd.volumeID)
|
||||
if err := pd.manager.AttachAndMountDisk(pd, globalPDPath); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
flags := uintptr(0)
|
||||
if pd.readOnly {
|
||||
flags = mount.FlagReadOnly
|
||||
}
|
||||
|
||||
if err := os.MkdirAll(dir, 0750); err != nil {
|
||||
// TODO: we should really eject the attach/detach out into its own control loop.
|
||||
detachDiskLogError(pd)
|
||||
return err
|
||||
}
|
||||
|
||||
// Perform a bind mount to the full path to allow duplicate mounts of the same PD.
|
||||
err = pd.mounter.Mount(globalPDPath, dir, "", mount.FlagBind|flags, "")
|
||||
if err != nil {
|
||||
mountpoint, mntErr := pd.mounter.IsMountPoint(dir)
|
||||
if mntErr != nil {
|
||||
glog.Errorf("isMountpoint check failed: %v", mntErr)
|
||||
return err
|
||||
}
|
||||
if mountpoint {
|
||||
if mntErr = pd.mounter.Unmount(dir, 0); mntErr != nil {
|
||||
glog.Errorf("Failed to unmount: %v", mntErr)
|
||||
return err
|
||||
}
|
||||
mountpoint, mntErr := pd.mounter.IsMountPoint(dir)
|
||||
if mntErr != nil {
|
||||
glog.Errorf("isMountpoint check failed: %v", mntErr)
|
||||
return err
|
||||
}
|
||||
if mountpoint {
|
||||
// This is very odd, we don't expect it. We'll try again next sync loop.
|
||||
glog.Errorf("%s is still mounted, despite call to unmount(). Will try again next sync loop.", dir)
|
||||
return err
|
||||
}
|
||||
}
|
||||
os.Remove(dir)
|
||||
// TODO: we should really eject the attach/detach out into its own control loop.
|
||||
detachDiskLogError(pd)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func makeGlobalPDPath(host volume.VolumeHost, volumeID string) string {
|
||||
// Clean up the URI to be more fs-friendly
|
||||
name := volumeID
|
||||
name = strings.Replace(name, "://", "/", -1)
|
||||
return path.Join(host.GetPluginDir(awsElasticBlockStorePluginName), "mounts", name)
|
||||
}
|
||||
|
||||
func getVolumeIDFromGlobalMount(host volume.VolumeHost, globalPath string) (string, error) {
|
||||
basePath := path.Join(host.GetPluginDir(awsElasticBlockStorePluginName), "mounts")
|
||||
rel, err := filepath.Rel(basePath, globalPath)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
if strings.Contains(rel, "../") {
|
||||
return "", fmt.Errorf("Unexpected mount path: " + globalPath)
|
||||
}
|
||||
// Reverse the :// replacement done in makeGlobalPDPath
|
||||
volumeID := rel
|
||||
if strings.HasPrefix(volumeID, "aws/") {
|
||||
volumeID = strings.Replace(volumeID, "aws/", "aws://", 1)
|
||||
}
|
||||
glog.V(2).Info("Mapping mount dir ", globalPath, " to volumeID ", volumeID)
|
||||
return volumeID, nil
|
||||
}
|
||||
|
||||
func (pd *awsElasticBlockStore) GetPath() string {
|
||||
name := awsElasticBlockStorePluginName
|
||||
return pd.plugin.host.GetPodVolumeDir(pd.podUID, util.EscapeQualifiedNameForDisk(name), pd.volName)
|
||||
}
|
||||
|
||||
// Unmounts the bind mount, and detaches the disk only if the PD
|
||||
// resource was the last reference to that disk on the kubelet.
|
||||
func (pd *awsElasticBlockStore) TearDown() error {
|
||||
return pd.TearDownAt(pd.GetPath())
|
||||
}
|
||||
|
||||
// Unmounts the bind mount, and detaches the disk only if the PD
|
||||
// resource was the last reference to that disk on the kubelet.
|
||||
func (pd *awsElasticBlockStore) TearDownAt(dir string) error {
|
||||
mountpoint, err := pd.mounter.IsMountPoint(dir)
|
||||
if err != nil {
|
||||
glog.V(2).Info("Error checking if mountpoint ", dir, ": ", err)
|
||||
return err
|
||||
}
|
||||
if !mountpoint {
|
||||
glog.V(2).Info("Not mountpoint, deleting")
|
||||
return os.Remove(dir)
|
||||
}
|
||||
|
||||
refs, err := mount.GetMountRefs(pd.mounter, dir)
|
||||
if err != nil {
|
||||
glog.V(2).Info("Error getting mountrefs for ", dir, ": ", err)
|
||||
return err
|
||||
}
|
||||
// Unmount the bind-mount inside this pod
|
||||
if err := pd.mounter.Unmount(dir, 0); err != nil {
|
||||
glog.V(2).Info("Error unmounting dir ", dir, ": ", err)
|
||||
return err
|
||||
}
|
||||
// If len(refs) is 1, then all bind mounts have been removed, and the
|
||||
// remaining reference is the global mount. It is safe to detach.
|
||||
if len(refs) == 1 {
|
||||
// pd.volumeID is not initially set for volume-cleaners, so set it here.
|
||||
pd.volumeID, err = getVolumeIDFromGlobalMount(pd.plugin.host, refs[0])
|
||||
if err != nil {
|
||||
glog.V(2).Info("Could not determine volumeID from mountpoint ", refs[0], ": ", err)
|
||||
return err
|
||||
}
|
||||
if err := pd.manager.DetachDisk(pd); err != nil {
|
||||
glog.V(2).Info("Error detaching disk ", pd.volumeID, ": ", err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
mountpoint, mntErr := pd.mounter.IsMountPoint(dir)
|
||||
if mntErr != nil {
|
||||
glog.Errorf("isMountpoint check failed: %v", mntErr)
|
||||
return err
|
||||
}
|
||||
if !mountpoint {
|
||||
if err := os.Remove(dir); err != nil {
|
||||
glog.V(2).Info("Error removing mountpoint ", dir, ": ", err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,156 @@
|
|||
/*
|
||||
Copyright 2014 Google Inc. All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package aws_ebs
|
||||
|
||||
import (
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/mount"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/volume"
|
||||
)
|
||||
|
||||
func TestCanSupport(t *testing.T) {
|
||||
plugMgr := volume.VolumePluginMgr{}
|
||||
plugMgr.InitPlugins(ProbeVolumePlugins(), volume.NewFakeVolumeHost("/tmp/fake", nil, nil))
|
||||
|
||||
plug, err := plugMgr.FindPluginByName("kubernetes.io/aws-ebs")
|
||||
if err != nil {
|
||||
t.Errorf("Can't find the plugin by name")
|
||||
}
|
||||
if plug.Name() != "kubernetes.io/aws-ebs" {
|
||||
t.Errorf("Wrong name: %s", plug.Name())
|
||||
}
|
||||
if !plug.CanSupport(&api.Volume{VolumeSource: api.VolumeSource{AWSElasticBlockStore: &api.AWSElasticBlockStoreVolumeSource{}}}) {
|
||||
t.Errorf("Expected true")
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetAccessModes(t *testing.T) {
|
||||
plugMgr := volume.VolumePluginMgr{}
|
||||
plugMgr.InitPlugins(ProbeVolumePlugins(), volume.NewFakeVolumeHost("/tmp/fake", nil, nil))
|
||||
|
||||
plug, err := plugMgr.FindPersistentPluginByName("kubernetes.io/aws-ebs")
|
||||
if err != nil {
|
||||
t.Errorf("Can't find the plugin by name")
|
||||
}
|
||||
if !contains(plug.GetAccessModes(), api.ReadWriteOnce) {
|
||||
t.Errorf("Expected to find AccessMode: %s", api.ReadWriteOnce)
|
||||
}
|
||||
if len(plug.GetAccessModes()) != 1 {
|
||||
t.Errorf("Expected to find exactly one AccessMode")
|
||||
}
|
||||
}
|
||||
|
||||
func contains(modes []api.AccessModeType, mode api.AccessModeType) bool {
|
||||
for _, m := range modes {
|
||||
if m == mode {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
type fakePDManager struct{}
|
||||
|
||||
// TODO(jonesdl) To fully test this, we could create a loopback device
|
||||
// and mount that instead.
|
||||
func (fake *fakePDManager) AttachAndMountDisk(pd *awsElasticBlockStore, globalPDPath string) error {
|
||||
globalPath := makeGlobalPDPath(pd.plugin.host, pd.volumeID)
|
||||
err := os.MkdirAll(globalPath, 0750)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fake *fakePDManager) DetachDisk(pd *awsElasticBlockStore) error {
|
||||
globalPath := makeGlobalPDPath(pd.plugin.host, pd.volumeID)
|
||||
err := os.RemoveAll(globalPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestPlugin(t *testing.T) {
|
||||
plugMgr := volume.VolumePluginMgr{}
|
||||
plugMgr.InitPlugins(ProbeVolumePlugins(), volume.NewFakeVolumeHost("/tmp/fake", nil, nil))
|
||||
|
||||
plug, err := plugMgr.FindPluginByName("kubernetes.io/aws-ebs")
|
||||
if err != nil {
|
||||
t.Errorf("Can't find the plugin by name")
|
||||
}
|
||||
spec := &api.Volume{
|
||||
Name: "vol1",
|
||||
VolumeSource: api.VolumeSource{
|
||||
AWSElasticBlockStore: &api.AWSElasticBlockStoreVolumeSource{
|
||||
VolumeID: "pd",
|
||||
FSType: "ext4",
|
||||
},
|
||||
},
|
||||
}
|
||||
builder, err := plug.(*awsElasticBlockStorePlugin).newBuilderInternal(spec, types.UID("poduid"), &fakePDManager{}, &mount.FakeMounter{})
|
||||
if err != nil {
|
||||
t.Errorf("Failed to make a new Builder: %v", err)
|
||||
}
|
||||
if builder == nil {
|
||||
t.Errorf("Got a nil Builder")
|
||||
}
|
||||
|
||||
path := builder.GetPath()
|
||||
if path != "/tmp/fake/pods/poduid/volumes/kubernetes.io~aws-ebs/vol1" {
|
||||
t.Errorf("Got unexpected path: %s", path)
|
||||
}
|
||||
|
||||
if err := builder.SetUp(); err != nil {
|
||||
t.Errorf("Expected success, got: %v", err)
|
||||
}
|
||||
if _, err := os.Stat(path); err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
t.Errorf("SetUp() failed, volume path not created: %s", path)
|
||||
} else {
|
||||
t.Errorf("SetUp() failed: %v", err)
|
||||
}
|
||||
}
|
||||
if _, err := os.Stat(path); err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
t.Errorf("SetUp() failed, volume path not created: %s", path)
|
||||
} else {
|
||||
t.Errorf("SetUp() failed: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
cleaner, err := plug.(*awsElasticBlockStorePlugin).newCleanerInternal("vol1", types.UID("poduid"), &fakePDManager{}, &mount.FakeMounter{})
|
||||
if err != nil {
|
||||
t.Errorf("Failed to make a new Cleaner: %v", err)
|
||||
}
|
||||
if cleaner == nil {
|
||||
t.Errorf("Got a nil Cleaner")
|
||||
}
|
||||
|
||||
if err := cleaner.TearDown(); err != nil {
|
||||
t.Errorf("Expected success, got: %v", err)
|
||||
}
|
||||
if _, err := os.Stat(path); err == nil {
|
||||
t.Errorf("TearDown() failed, volume path still exists: %s", path)
|
||||
} else if !os.IsNotExist(err) {
|
||||
t.Errorf("SetUp() failed: %v", err)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,143 @@
|
|||
/*
|
||||
Copyright 2014 Google Inc. All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package aws_ebs
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/exec"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/mount"
|
||||
"github.com/golang/glog"
|
||||
)
|
||||
|
||||
type AWSDiskUtil struct{}
|
||||
|
||||
// Attaches a disk specified by a volume.AWSElasticBlockStore to the current kubelet.
|
||||
// Mounts the disk to it's global path.
|
||||
func (util *AWSDiskUtil) AttachAndMountDisk(pd *awsElasticBlockStore, globalPDPath string) error {
|
||||
volumes, err := pd.getVolumeProvider()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
flags := uintptr(0)
|
||||
if pd.readOnly {
|
||||
flags = mount.FlagReadOnly
|
||||
}
|
||||
devicePath, err := volumes.AttachDisk("", pd.volumeID, pd.readOnly)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if pd.partition != "" {
|
||||
devicePath = devicePath + pd.partition
|
||||
}
|
||||
//TODO(jonesdl) There should probably be better method than busy-waiting here.
|
||||
numTries := 0
|
||||
for {
|
||||
_, err := os.Stat(devicePath)
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
if err != nil && !os.IsNotExist(err) {
|
||||
return err
|
||||
}
|
||||
numTries++
|
||||
if numTries == 10 {
|
||||
return errors.New("Could not attach disk: Timeout after 10s (" + devicePath + ")")
|
||||
}
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
|
||||
// Only mount the PD globally once.
|
||||
mountpoint, err := pd.mounter.IsMountPoint(globalPDPath)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
if err := os.MkdirAll(globalPDPath, 0750); err != nil {
|
||||
return err
|
||||
}
|
||||
mountpoint = false
|
||||
} else {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if !mountpoint {
|
||||
err = pd.diskMounter.Mount(devicePath, globalPDPath, pd.fsType, flags, "")
|
||||
if err != nil {
|
||||
os.Remove(globalPDPath)
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Unmounts the device and detaches the disk from the kubelet's host machine.
|
||||
func (util *AWSDiskUtil) DetachDisk(pd *awsElasticBlockStore) error {
|
||||
// Unmount the global PD mount, which should be the only one.
|
||||
globalPDPath := makeGlobalPDPath(pd.plugin.host, pd.volumeID)
|
||||
if err := pd.mounter.Unmount(globalPDPath, 0); err != nil {
|
||||
glog.V(2).Info("Error unmount dir ", globalPDPath, ": ", err)
|
||||
return err
|
||||
}
|
||||
if err := os.Remove(globalPDPath); err != nil {
|
||||
glog.V(2).Info("Error removing dir ", globalPDPath, ": ", err)
|
||||
return err
|
||||
}
|
||||
// Detach the disk
|
||||
volumes, err := pd.getVolumeProvider()
|
||||
if err != nil {
|
||||
glog.V(2).Info("Error getting volume provider for volumeID ", pd.volumeID, ": ", err)
|
||||
return err
|
||||
}
|
||||
if err := volumes.DetachDisk("", pd.volumeID); err != nil {
|
||||
glog.V(2).Info("Error detaching disk ", pd.volumeID, ": ", err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// safe_format_and_mount is a utility script on AWS VMs that probes a persistent disk, and if
|
||||
// necessary formats it before mounting it.
|
||||
// This eliminates the necessity to format a PD before it is used with a Pod on AWS.
|
||||
// TODO: port this script into Go and use it for all Linux platforms
|
||||
type awsSafeFormatAndMount struct {
|
||||
mount.Interface
|
||||
runner exec.Interface
|
||||
}
|
||||
|
||||
// uses /usr/share/google/safe_format_and_mount to optionally mount, and format a disk
|
||||
func (mounter *awsSafeFormatAndMount) Mount(source string, target string, fstype string, flags uintptr, data string) error {
|
||||
// Don't attempt to format if mounting as readonly. Go straight to mounting.
|
||||
if (flags & mount.FlagReadOnly) != 0 {
|
||||
return mounter.Interface.Mount(source, target, fstype, flags, data)
|
||||
}
|
||||
args := []string{}
|
||||
// ext4 is the default for safe_format_and_mount
|
||||
if len(fstype) > 0 && fstype != "ext4" {
|
||||
args = append(args, "-m", fmt.Sprintf("mkfs.%s", fstype))
|
||||
}
|
||||
args = append(args, source, target)
|
||||
// TODO: Accept other options here?
|
||||
glog.V(5).Infof("exec-ing: /usr/share/google/safe_format_and_mount %v", args)
|
||||
cmd := mounter.runner.Command("/usr/share/google/safe_format_and_mount", args...)
|
||||
dataOut, err := cmd.CombinedOutput()
|
||||
if err != nil {
|
||||
glog.V(5).Infof("error running /usr/share/google/safe_format_and_mount\n%s", string(dataOut))
|
||||
}
|
||||
return err
|
||||
}
|
|
@ -0,0 +1,84 @@
|
|||
/*
|
||||
Copyright 2014 Google Inc. All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package aws_ebs
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/exec"
|
||||
)
|
||||
|
||||
func TestSafeFormatAndMount(t *testing.T) {
|
||||
tests := []struct {
|
||||
fstype string
|
||||
expectedArgs []string
|
||||
err error
|
||||
}{
|
||||
{
|
||||
fstype: "ext4",
|
||||
expectedArgs: []string{"/dev/foo", "/mnt/bar"},
|
||||
},
|
||||
{
|
||||
fstype: "vfat",
|
||||
expectedArgs: []string{"-m", "mkfs.vfat", "/dev/foo", "/mnt/bar"},
|
||||
},
|
||||
{
|
||||
err: fmt.Errorf("test error"),
|
||||
},
|
||||
}
|
||||
for _, test := range tests {
|
||||
|
||||
var cmdOut string
|
||||
var argsOut []string
|
||||
fake := exec.FakeExec{
|
||||
CommandScript: []exec.FakeCommandAction{
|
||||
func(cmd string, args ...string) exec.Cmd {
|
||||
cmdOut = cmd
|
||||
argsOut = args
|
||||
fake := exec.FakeCmd{
|
||||
CombinedOutputScript: []exec.FakeCombinedOutputAction{
|
||||
func() ([]byte, error) { return []byte{}, test.err },
|
||||
},
|
||||
}
|
||||
return exec.InitFakeCmd(&fake, cmd, args...)
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
mounter := awsSafeFormatAndMount{
|
||||
runner: &fake,
|
||||
}
|
||||
|
||||
err := mounter.Mount("/dev/foo", "/mnt/bar", test.fstype, 0, "")
|
||||
if test.err == nil && err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
if test.err != nil {
|
||||
if err == nil {
|
||||
t.Errorf("unexpected non-error")
|
||||
}
|
||||
return
|
||||
}
|
||||
if cmdOut != "/usr/share/google/safe_format_and_mount" {
|
||||
t.Errorf("unexpected command: %s", cmdOut)
|
||||
}
|
||||
if len(argsOut) != len(test.expectedArgs) {
|
||||
t.Errorf("unexpected args: %v, expected: %v", argsOut, test.expectedArgs)
|
||||
}
|
||||
}
|
||||
}
|
|
@ -38,8 +38,8 @@ var _ = Describe("MasterCerts", func() {
|
|||
}
|
||||
|
||||
for _, certFile := range []string{"kubecfg.key", "kubecfg.crt", "ca.crt"} {
|
||||
cmd := exec.Command("gcloud", "compute", "ssh", "--project", testContext.GCEConfig.ProjectID,
|
||||
"--zone", testContext.GCEConfig.Zone, testContext.GCEConfig.MasterName,
|
||||
cmd := exec.Command("gcloud", "compute", "ssh", "--project", testContext.CloudConfig.ProjectID,
|
||||
"--zone", testContext.CloudConfig.Zone, testContext.CloudConfig.MasterName,
|
||||
"--command", fmt.Sprintf("ls /srv/kubernetes/%s", certFile))
|
||||
if _, err := cmd.CombinedOutput(); err != nil {
|
||||
Fail(fmt.Sprintf("Error checking for cert file %s on master: %v", certFile, err))
|
||||
|
|
|
@ -25,6 +25,7 @@ import (
|
|||
"strings"
|
||||
"syscall"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||
"github.com/golang/glog"
|
||||
"github.com/onsi/ginkgo"
|
||||
|
@ -35,10 +36,12 @@ import (
|
|||
|
||||
type testResult bool
|
||||
|
||||
type GCEConfig struct {
|
||||
type CloudConfig struct {
|
||||
ProjectID string
|
||||
Zone string
|
||||
MasterName string
|
||||
|
||||
Provider cloudprovider.Interface
|
||||
}
|
||||
|
||||
func init() {
|
||||
|
|
149
test/e2e/pd.go
149
test/e2e/pd.go
|
@ -24,6 +24,7 @@ import (
|
|||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/aws"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||
|
@ -35,7 +36,6 @@ var _ = Describe("PD", func() {
|
|||
var (
|
||||
c *client.Client
|
||||
podClient client.PodInterface
|
||||
diskName string
|
||||
host0Name string
|
||||
host1Name string
|
||||
)
|
||||
|
@ -51,37 +51,37 @@ var _ = Describe("PD", func() {
|
|||
expectNoError(err, "Failed to list nodes for e2e cluster.")
|
||||
Expect(len(nodes.Items) >= 2).To(BeTrue())
|
||||
|
||||
diskName = fmt.Sprintf("e2e-%s", string(util.NewUUID()))
|
||||
host0Name = nodes.Items[0].ObjectMeta.Name
|
||||
host1Name = nodes.Items[1].ObjectMeta.Name
|
||||
})
|
||||
|
||||
It("should schedule a pod w/ a RW PD, remove it, then schedule it on another host", func() {
|
||||
if testContext.Provider != "gce" {
|
||||
By(fmt.Sprintf("Skipping PD test, which is only supported for provider gce (not %s)",
|
||||
if testContext.Provider != "gce" && testContext.Provider != "aws" {
|
||||
By(fmt.Sprintf("Skipping PD test, which is only supported for providers gce & aws (not %s)",
|
||||
testContext.Provider))
|
||||
return
|
||||
}
|
||||
|
||||
By("creating PD")
|
||||
diskName, err := createPD()
|
||||
expectNoError(err, "Error creating PD")
|
||||
|
||||
host0Pod := testPDPod(diskName, host0Name, false)
|
||||
host1Pod := testPDPod(diskName, host1Name, false)
|
||||
|
||||
By(fmt.Sprintf("creating PD %q", diskName))
|
||||
expectNoError(createPD(diskName, testContext.GCEConfig.Zone), "Error creating PD")
|
||||
|
||||
defer func() {
|
||||
By("cleaning up PD-RW test environment")
|
||||
// Teardown pods, PD. Ignore errors.
|
||||
// Teardown should do nothing unless test failed.
|
||||
podClient.Delete(host0Pod.Name)
|
||||
podClient.Delete(host1Pod.Name)
|
||||
detachPD(host0Name, diskName, testContext.GCEConfig.Zone)
|
||||
detachPD(host1Name, diskName, testContext.GCEConfig.Zone)
|
||||
deletePD(diskName, testContext.GCEConfig.Zone)
|
||||
detachPD(host0Name, diskName)
|
||||
detachPD(host1Name, diskName)
|
||||
deletePD(diskName)
|
||||
}()
|
||||
|
||||
By("submitting host0Pod to kubernetes")
|
||||
_, err := podClient.Create(host0Pod)
|
||||
_, err = podClient.Create(host0Pod)
|
||||
expectNoError(err, fmt.Sprintf("Failed to create host0Pod: %v", err))
|
||||
|
||||
expectNoError(waitForPodRunning(c, host0Pod.Name))
|
||||
|
@ -100,10 +100,11 @@ var _ = Describe("PD", func() {
|
|||
|
||||
By(fmt.Sprintf("deleting PD %q", diskName))
|
||||
for start := time.Now(); time.Since(start) < 180*time.Second; time.Sleep(5 * time.Second) {
|
||||
if err = deletePD(diskName, testContext.GCEConfig.Zone); err != nil {
|
||||
Logf("Couldn't delete PD. Sleeping 5 seconds")
|
||||
if err = deletePD(diskName); err != nil {
|
||||
Logf("Couldn't delete PD. Sleeping 5 seconds (%v)", err)
|
||||
continue
|
||||
}
|
||||
Logf("Deleted PD %v", diskName)
|
||||
break
|
||||
}
|
||||
expectNoError(err, "Error deleting PD")
|
||||
|
@ -118,6 +119,10 @@ var _ = Describe("PD", func() {
|
|||
return
|
||||
}
|
||||
|
||||
By("creating PD")
|
||||
diskName, err := createPD()
|
||||
expectNoError(err, "Error creating PD")
|
||||
|
||||
rwPod := testPDPod(diskName, host0Name, false)
|
||||
host0ROPod := testPDPod(diskName, host0Name, true)
|
||||
host1ROPod := testPDPod(diskName, host1Name, true)
|
||||
|
@ -129,16 +134,14 @@ var _ = Describe("PD", func() {
|
|||
podClient.Delete(rwPod.Name)
|
||||
podClient.Delete(host0ROPod.Name)
|
||||
podClient.Delete(host1ROPod.Name)
|
||||
detachPD(host0Name, diskName, testContext.GCEConfig.Zone)
|
||||
detachPD(host1Name, diskName, testContext.GCEConfig.Zone)
|
||||
deletePD(diskName, testContext.GCEConfig.Zone)
|
||||
|
||||
detachPD(host0Name, diskName)
|
||||
detachPD(host1Name, diskName)
|
||||
deletePD(diskName)
|
||||
}()
|
||||
|
||||
By(fmt.Sprintf("creating PD %q", diskName))
|
||||
expectNoError(createPD(diskName, testContext.GCEConfig.Zone), "Error creating PD")
|
||||
|
||||
By("submitting rwPod to ensure PD is formatted")
|
||||
_, err := podClient.Create(rwPod)
|
||||
_, err = podClient.Create(rwPod)
|
||||
expectNoError(err, "Failed to create rwPod")
|
||||
expectNoError(waitForPodRunning(c, rwPod.Name))
|
||||
expectNoError(podClient.Delete(rwPod.Name), "Failed to delete host0Pod")
|
||||
|
@ -163,7 +166,7 @@ var _ = Describe("PD", func() {
|
|||
|
||||
By(fmt.Sprintf("deleting PD %q", diskName))
|
||||
for start := time.Now(); time.Since(start) < 180*time.Second; time.Sleep(5 * time.Second) {
|
||||
if err = deletePD(diskName, testContext.GCEConfig.Zone); err != nil {
|
||||
if err = deletePD(diskName); err != nil {
|
||||
Logf("Couldn't delete PD. Sleeping 5 seconds")
|
||||
continue
|
||||
}
|
||||
|
@ -173,24 +176,62 @@ var _ = Describe("PD", func() {
|
|||
})
|
||||
})
|
||||
|
||||
func createPD(pdName, zone string) error {
|
||||
// TODO: make this hit the compute API directly instread of shelling out to gcloud.
|
||||
return exec.Command("gcloud", "compute", "disks", "create", "--zone="+zone, "--size=10GB", pdName).Run()
|
||||
func createPD() (string, error) {
|
||||
if testContext.Provider == "gce" {
|
||||
pdName := fmt.Sprintf("e2e-%s", string(util.NewUUID()))
|
||||
|
||||
zone := testContext.CloudConfig.Zone
|
||||
// TODO: make this hit the compute API directly instread of shelling out to gcloud.
|
||||
err := exec.Command("gcloud", "compute", "disks", "create", "--zone="+zone, "--size=10GB", pdName).Run()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return pdName, nil
|
||||
} else {
|
||||
volumes, ok := testContext.CloudConfig.Provider.(aws_cloud.Volumes)
|
||||
if !ok {
|
||||
return "", fmt.Errorf("Provider does not support volumes")
|
||||
}
|
||||
volumeOptions := &aws_cloud.VolumeOptions{}
|
||||
volumeOptions.CapacityMB = 10 * 1024
|
||||
return volumes.CreateVolume(volumeOptions)
|
||||
}
|
||||
}
|
||||
|
||||
func deletePD(pdName, zone string) error {
|
||||
// TODO: make this hit the compute API directly.
|
||||
return exec.Command("gcloud", "compute", "disks", "delete", "--zone="+zone, pdName).Run()
|
||||
func deletePD(pdName string) error {
|
||||
if testContext.Provider == "gce" {
|
||||
zone := testContext.CloudConfig.Zone
|
||||
|
||||
// TODO: make this hit the compute API directly.
|
||||
return exec.Command("gcloud", "compute", "disks", "delete", "--zone="+zone, pdName).Run()
|
||||
} else {
|
||||
volumes, ok := testContext.CloudConfig.Provider.(aws_cloud.Volumes)
|
||||
if !ok {
|
||||
return fmt.Errorf("Provider does not support volumes")
|
||||
}
|
||||
return volumes.DeleteVolume(pdName)
|
||||
}
|
||||
}
|
||||
|
||||
func detachPD(hostName, pdName, zone string) error {
|
||||
instanceName := strings.Split(hostName, ".")[0]
|
||||
// TODO: make this hit the compute API directly.
|
||||
return exec.Command("gcloud", "compute", "instances", "detach-disk", "--zone="+zone, "--disk="+pdName, instanceName).Run()
|
||||
func detachPD(hostName, pdName string) error {
|
||||
if testContext.Provider == "gce" {
|
||||
instanceName := strings.Split(hostName, ".")[0]
|
||||
|
||||
zone := testContext.CloudConfig.Zone
|
||||
|
||||
// TODO: make this hit the compute API directly.
|
||||
return exec.Command("gcloud", "compute", "instances", "detach-disk", "--zone="+zone, "--disk="+pdName, instanceName).Run()
|
||||
} else {
|
||||
volumes, ok := testContext.CloudConfig.Provider.(aws_cloud.Volumes)
|
||||
if !ok {
|
||||
return fmt.Errorf("Provider does not support volumes")
|
||||
}
|
||||
return volumes.DetachDisk(hostName, pdName)
|
||||
}
|
||||
}
|
||||
|
||||
func testPDPod(diskName, targetHost string, readOnly bool) *api.Pod {
|
||||
return &api.Pod{
|
||||
pod := &api.Pod{
|
||||
TypeMeta: api.TypeMeta{
|
||||
Kind: "Pod",
|
||||
APIVersion: "v1beta1",
|
||||
|
@ -199,18 +240,6 @@ func testPDPod(diskName, targetHost string, readOnly bool) *api.Pod {
|
|||
Name: "pd-test-" + string(util.NewUUID()),
|
||||
},
|
||||
Spec: api.PodSpec{
|
||||
Volumes: []api.Volume{
|
||||
{
|
||||
Name: "testpd",
|
||||
VolumeSource: api.VolumeSource{
|
||||
GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{
|
||||
PDName: diskName,
|
||||
FSType: "ext4",
|
||||
ReadOnly: readOnly,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
Containers: []api.Container{
|
||||
{
|
||||
Name: "testpd",
|
||||
|
@ -226,4 +255,36 @@ func testPDPod(diskName, targetHost string, readOnly bool) *api.Pod {
|
|||
Host: targetHost,
|
||||
},
|
||||
}
|
||||
|
||||
if testContext.Provider == "gce" {
|
||||
pod.Spec.Volumes = []api.Volume{
|
||||
{
|
||||
Name: "testpd",
|
||||
VolumeSource: api.VolumeSource{
|
||||
GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{
|
||||
PDName: diskName,
|
||||
FSType: "ext4",
|
||||
ReadOnly: readOnly,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
} else if testContext.Provider == "aws" {
|
||||
pod.Spec.Volumes = []api.Volume{
|
||||
{
|
||||
Name: "testpd",
|
||||
VolumeSource: api.VolumeSource{
|
||||
AWSElasticBlockStore: &api.AWSElasticBlockStoreVolumeSource{
|
||||
VolumeID: diskName,
|
||||
FSType: "ext4",
|
||||
ReadOnly: readOnly,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
} else {
|
||||
panic("Unknown provider: " + testContext.Provider)
|
||||
}
|
||||
|
||||
return pod
|
||||
}
|
||||
|
|
|
@ -49,7 +49,7 @@ type TestContextType struct {
|
|||
Host string
|
||||
RepoRoot string
|
||||
Provider string
|
||||
GCEConfig GCEConfig
|
||||
CloudConfig CloudConfig
|
||||
}
|
||||
|
||||
var testContext TestContextType
|
||||
|
|
Loading…
Reference in New Issue