Switch windows runtime endpoints to npipe

pull/58/head
Pengfei Ni 2018-10-08 17:16:41 +08:00
parent cf3a930938
commit 053b71d5d2
7 changed files with 205 additions and 37 deletions

View File

@ -212,7 +212,7 @@ func NewKubeletFlags() *KubeletFlags {
if runtime.GOOS == "linux" {
remoteRuntimeEndpoint = "unix:///var/run/dockershim.sock"
} else if runtime.GOOS == "windows" {
remoteRuntimeEndpoint = "tcp://localhost:3735"
remoteRuntimeEndpoint = "npipe:////./pipe/dockershim"
}
return &KubeletFlags{
@ -376,8 +376,8 @@ func (f *KubeletFlags) AddFlags(mainfs *pflag.FlagSet) {
fs.StringVar(&f.ExperimentalMounterPath, "experimental-mounter-path", f.ExperimentalMounterPath, "[Experimental] Path of mounter binary. Leave empty to use the default mount.")
fs.StringSliceVar(&f.AllowedUnsafeSysctls, "allowed-unsafe-sysctls", f.AllowedUnsafeSysctls, "Comma-separated whitelist of unsafe sysctls or unsafe sysctl patterns (ending in *). Use these at your own risk. Sysctls feature gate is enabled by default.")
fs.BoolVar(&f.ExperimentalKernelMemcgNotification, "experimental-kernel-memcg-notification", f.ExperimentalKernelMemcgNotification, "If enabled, the kubelet will integrate with the kernel memcg notification to determine if memory eviction thresholds are crossed rather than polling.")
fs.StringVar(&f.RemoteRuntimeEndpoint, "container-runtime-endpoint", f.RemoteRuntimeEndpoint, "[Experimental] The endpoint of remote runtime service. Currently unix socket is supported on Linux, and tcp is supported on windows. Examples:'unix:///var/run/dockershim.sock', 'tcp://localhost:3735'")
fs.StringVar(&f.RemoteImageEndpoint, "image-service-endpoint", f.RemoteImageEndpoint, "[Experimental] The endpoint of remote image service. If not specified, it will be the same with container-runtime-endpoint by default. Currently unix socket is supported on Linux, and tcp is supported on windows. Examples:'unix:///var/run/dockershim.sock', 'tcp://localhost:3735'")
fs.StringVar(&f.RemoteRuntimeEndpoint, "container-runtime-endpoint", f.RemoteRuntimeEndpoint, "[Experimental] The endpoint of remote runtime service. Currently unix socket and tcp endpoints are supported on Linux, while npipe and tcp endpoints are supported on windows. Examples:'unix:///var/run/dockershim.sock', 'npipe:////./pipe/dockershim'")
fs.StringVar(&f.RemoteImageEndpoint, "image-service-endpoint", f.RemoteImageEndpoint, "[Experimental] The endpoint of remote image service. If not specified, it will be the same with container-runtime-endpoint by default. Currently unix socket and tcp endpoints are supported on Linux, while npipe and tcp endpoints are supported on windows. Examples:'unix:///var/run/dockershim.sock', 'npipe:////./pipe/dockershim'")
fs.BoolVar(&f.ExperimentalCheckNodeCapabilitiesBeforeMount, "experimental-check-node-capabilities-before-mount", f.ExperimentalCheckNodeCapabilitiesBeforeMount, "[Experimental] if set true, the kubelet will check the underlying node for required components (binaries, etc.) before performing the mount")
fs.BoolVar(&f.ExperimentalNodeAllocatableIgnoreEvictionThreshold, "experimental-allocatable-ignore-eviction", f.ExperimentalNodeAllocatableIgnoreEvictionThreshold, "When set to 'true', Hard Eviction Thresholds will be ignored while calculating Node Allocatable. See https://kubernetes.io/docs/tasks/administer-cluster/reserve-compute-resources/ for more details. [default=false]")
bindableNodeLabels := flag.ConfigurationMap(f.NodeLabels)

View File

@ -8,11 +8,27 @@ load(
go_test(
name = "go_default_test",
srcs = ["util_test.go"],
embed = [":go_default_library"],
deps = [
"//vendor/github.com/stretchr/testify/assert:go_default_library",
srcs = [
"util_unix_test.go",
"util_windows_test.go",
],
embed = [":go_default_library"],
deps = select({
"@io_bazel_rules_go//go/platform:darwin": [
"//vendor/github.com/stretchr/testify/assert:go_default_library",
],
"@io_bazel_rules_go//go/platform:freebsd": [
"//vendor/github.com/stretchr/testify/assert:go_default_library",
],
"@io_bazel_rules_go//go/platform:linux": [
"//vendor/github.com/stretchr/testify/assert:go_default_library",
],
"@io_bazel_rules_go//go/platform:windows": [
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/github.com/stretchr/testify/require:go_default_library",
],
"//conditions:default": [],
}),
)
go_library(
@ -40,6 +56,9 @@ go_library(
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/golang.org/x/sys/unix:go_default_library",
],
"@io_bazel_rules_go//go/platform:windows": [
"//vendor/github.com/Microsoft/go-winio:go_default_library",
],
"//conditions:default": [],
}),
)

View File

@ -17,9 +17,6 @@ limitations under the License.
package util
import (
"fmt"
"net/url"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
@ -28,20 +25,3 @@ import (
func FromApiserverCache(opts *metav1.GetOptions) {
opts.ResourceVersion = "0"
}
func parseEndpoint(endpoint string) (string, string, error) {
u, err := url.Parse(endpoint)
if err != nil {
return "", "", err
}
if u.Scheme == "tcp" {
return "tcp", u.Host, nil
} else if u.Scheme == "unix" {
return "unix", u.Path, nil
} else if u.Scheme == "" {
return "", "", fmt.Errorf("Using %q as endpoint is deprecated, please consider using full url format", endpoint)
} else {
return u.Scheme, "", fmt.Errorf("protocol %q not supported", u.Scheme)
}
}

View File

@ -21,6 +21,7 @@ package util
import (
"fmt"
"net"
"net/url"
"os"
"time"
@ -77,3 +78,24 @@ func parseEndpointWithFallbackProtocol(endpoint string, fallbackProtocol string)
}
return
}
func parseEndpoint(endpoint string) (string, string, error) {
u, err := url.Parse(endpoint)
if err != nil {
return "", "", err
}
switch u.Scheme {
case "tcp":
return "tcp", u.Host, nil
case "unix":
return "unix", u.Path, nil
case "":
return "", "", fmt.Errorf("Using %q as endpoint is deprecated, please consider using full url format", endpoint)
default:
return u.Scheme, "", fmt.Errorf("protocol %q not supported", u.Scheme)
}
}

View File

@ -1,5 +1,7 @@
// +build freebsd linux darwin
/*
Copyright 2017 The Kubernetes Authors.
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@ -39,6 +41,11 @@ func TestParseEndpoint(t *testing.T) {
expectedProtocol: "tcp",
expectedAddr: "localhost:15880",
},
{
endpoint: "npipe://./pipe/mypipe",
expectedProtocol: "npipe",
expectError: true,
},
{
endpoint: "tcp1://abc",
expectedProtocol: "tcp1",

View File

@ -21,11 +21,16 @@ package util
import (
"fmt"
"net"
"net/url"
"strings"
"time"
"github.com/Microsoft/go-winio"
)
const (
tcpProtocol = "tcp"
tcpProtocol = "tcp"
npipeProtocol = "npipe"
)
func CreateListener(endpoint string) (net.Listener, error) {
@ -33,11 +38,17 @@ func CreateListener(endpoint string) (net.Listener, error) {
if err != nil {
return nil, err
}
if protocol != tcpProtocol {
return nil, fmt.Errorf("only support tcp endpoint")
}
return net.Listen(protocol, addr)
switch protocol {
case tcpProtocol:
return net.Listen(tcpProtocol, addr)
case npipeProtocol:
return winio.ListenPipe(addr, nil)
default:
return nil, fmt.Errorf("only support tcp and npipe endpoint")
}
}
func GetAddressAndDialer(endpoint string) (string, func(addr string, timeout time.Duration) (net.Conn, error), error) {
@ -45,13 +56,50 @@ func GetAddressAndDialer(endpoint string) (string, func(addr string, timeout tim
if err != nil {
return "", nil, err
}
if protocol != tcpProtocol {
return "", nil, fmt.Errorf("only support tcp endpoint")
if protocol == tcpProtocol {
return addr, tcpDial, nil
}
return addr, dial, nil
if protocol == npipeProtocol {
return addr, npipeDial, nil
}
return "", nil, fmt.Errorf("only support tcp and npipe endpoint")
}
func dial(addr string, timeout time.Duration) (net.Conn, error) {
func tcpDial(addr string, timeout time.Duration) (net.Conn, error) {
return net.DialTimeout(tcpProtocol, addr, timeout)
}
func npipeDial(addr string, timeout time.Duration) (net.Conn, error) {
return winio.DialPipe(addr, &timeout)
}
func parseEndpoint(endpoint string) (string, string, error) {
// url.Parse doesn't recognize \, so replace with / first.
endpoint = strings.Replace(endpoint, "\\", "/", -1)
u, err := url.Parse(endpoint)
if err != nil {
return "", "", err
}
if u.Scheme == "tcp" {
return "tcp", u.Host, nil
} else if u.Scheme == "npipe" {
if strings.HasPrefix(u.Path, "//./pipe") {
return "npipe", u.Path, nil
}
// fallback host if not provided.
host := u.Host
if host == "" {
host = "."
}
return "npipe", fmt.Sprintf("//%s%s", host, u.Path), nil
} else if u.Scheme == "" {
return "", "", fmt.Errorf("Using %q as endpoint is deprecated, please consider using full url format", endpoint)
} else {
return u.Scheme, "", fmt.Errorf("protocol %q not supported", u.Scheme)
}
}

View File

@ -0,0 +1,92 @@
// +build windows
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestParseEndpoint(t *testing.T) {
tests := []struct {
endpoint string
expectError bool
expectedProtocol string
expectedAddr string
}{
{
endpoint: "unix:///tmp/s1.sock",
expectedProtocol: "unix",
expectError: true,
},
{
endpoint: "tcp://localhost:15880",
expectedProtocol: "tcp",
expectedAddr: "localhost:15880",
},
{
endpoint: "npipe://./pipe/mypipe",
expectedProtocol: "npipe",
expectedAddr: "//./pipe/mypipe",
},
{
endpoint: "npipe:////./pipe/mypipe2",
expectedProtocol: "npipe",
expectedAddr: "//./pipe/mypipe2",
},
{
endpoint: "npipe:/pipe/mypipe3",
expectedProtocol: "npipe",
expectedAddr: "//./pipe/mypipe3",
},
{
endpoint: "npipe:\\\\.\\pipe\\mypipe4",
expectedProtocol: "npipe",
expectedAddr: "//./pipe/mypipe4",
},
{
endpoint: "npipe:\\pipe\\mypipe5",
expectedProtocol: "npipe",
expectedAddr: "//./pipe/mypipe5",
},
{
endpoint: "tcp1://abc",
expectedProtocol: "tcp1",
expectError: true,
},
{
endpoint: "a b c",
expectError: true,
},
}
for _, test := range tests {
protocol, addr, err := parseEndpoint(test.endpoint)
assert.Equal(t, test.expectedProtocol, protocol)
if test.expectError {
assert.NotNil(t, err, "Expect error during parsing %q", test.endpoint)
continue
}
require.Nil(t, err, "Expect no error during parsing %q", test.endpoint)
assert.Equal(t, test.expectedAddr, addr)
}
}