mirror of https://github.com/k3s-io/k3s
Merge pull request #11968 from brendandburns/flow
Optionally limit flow on all upgraded proxy connectionspull/6/head
commit
85781b674d
|
@ -425,6 +425,10 @@
|
|||
"ImportPath": "github.com/mitchellh/mapstructure",
|
||||
"Rev": "740c764bc6149d3f1806231418adb9f52c11bcbf"
|
||||
},
|
||||
{
|
||||
"ImportPath": "github.com/mxk/go-flowrate/flowrate",
|
||||
"Rev": "cca7078d478f8520f85629ad7c68962d31ed7682"
|
||||
},
|
||||
{
|
||||
"ImportPath": "github.com/onsi/ginkgo",
|
||||
"Comment": "v1.2.0-6-gd981d36",
|
||||
|
|
267
Godeps/_workspace/src/github.com/mxk/go-flowrate/flowrate/flowrate.go
generated
vendored
Normal file
267
Godeps/_workspace/src/github.com/mxk/go-flowrate/flowrate/flowrate.go
generated
vendored
Normal file
|
@ -0,0 +1,267 @@
|
|||
//
|
||||
// Written by Maxim Khitrov (November 2012)
|
||||
//
|
||||
|
||||
// Package flowrate provides the tools for monitoring and limiting the flow rate
|
||||
// of an arbitrary data stream.
|
||||
package flowrate
|
||||
|
||||
import (
|
||||
"math"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Monitor monitors and limits the transfer rate of a data stream.
|
||||
type Monitor struct {
|
||||
mu sync.Mutex // Mutex guarding access to all internal fields
|
||||
active bool // Flag indicating an active transfer
|
||||
start time.Duration // Transfer start time (clock() value)
|
||||
bytes int64 // Total number of bytes transferred
|
||||
samples int64 // Total number of samples taken
|
||||
|
||||
rSample float64 // Most recent transfer rate sample (bytes per second)
|
||||
rEMA float64 // Exponential moving average of rSample
|
||||
rPeak float64 // Peak transfer rate (max of all rSamples)
|
||||
rWindow float64 // rEMA window (seconds)
|
||||
|
||||
sBytes int64 // Number of bytes transferred since sLast
|
||||
sLast time.Duration // Most recent sample time (stop time when inactive)
|
||||
sRate time.Duration // Sampling rate
|
||||
|
||||
tBytes int64 // Number of bytes expected in the current transfer
|
||||
tLast time.Duration // Time of the most recent transfer of at least 1 byte
|
||||
}
|
||||
|
||||
// New creates a new flow control monitor. Instantaneous transfer rate is
|
||||
// measured and updated for each sampleRate interval. windowSize determines the
|
||||
// weight of each sample in the exponential moving average (EMA) calculation.
|
||||
// The exact formulas are:
|
||||
//
|
||||
// sampleTime = currentTime - prevSampleTime
|
||||
// sampleRate = byteCount / sampleTime
|
||||
// weight = 1 - exp(-sampleTime/windowSize)
|
||||
// newRate = weight*sampleRate + (1-weight)*oldRate
|
||||
//
|
||||
// The default values for sampleRate and windowSize (if <= 0) are 100ms and 1s,
|
||||
// respectively.
|
||||
func New(sampleRate, windowSize time.Duration) *Monitor {
|
||||
if sampleRate = clockRound(sampleRate); sampleRate <= 0 {
|
||||
sampleRate = 5 * clockRate
|
||||
}
|
||||
if windowSize <= 0 {
|
||||
windowSize = 1 * time.Second
|
||||
}
|
||||
now := clock()
|
||||
return &Monitor{
|
||||
active: true,
|
||||
start: now,
|
||||
rWindow: windowSize.Seconds(),
|
||||
sLast: now,
|
||||
sRate: sampleRate,
|
||||
tLast: now,
|
||||
}
|
||||
}
|
||||
|
||||
// Update records the transfer of n bytes and returns n. It should be called
|
||||
// after each Read/Write operation, even if n is 0.
|
||||
func (m *Monitor) Update(n int) int {
|
||||
m.mu.Lock()
|
||||
m.update(n)
|
||||
m.mu.Unlock()
|
||||
return n
|
||||
}
|
||||
|
||||
// IO is a convenience method intended to wrap io.Reader and io.Writer method
|
||||
// execution. It calls m.Update(n) and then returns (n, err) unmodified.
|
||||
func (m *Monitor) IO(n int, err error) (int, error) {
|
||||
return m.Update(n), err
|
||||
}
|
||||
|
||||
// Done marks the transfer as finished and prevents any further updates or
|
||||
// limiting. Instantaneous and current transfer rates drop to 0. Update, IO, and
|
||||
// Limit methods become NOOPs. It returns the total number of bytes transferred.
|
||||
func (m *Monitor) Done() int64 {
|
||||
m.mu.Lock()
|
||||
if now := m.update(0); m.sBytes > 0 {
|
||||
m.reset(now)
|
||||
}
|
||||
m.active = false
|
||||
m.tLast = 0
|
||||
n := m.bytes
|
||||
m.mu.Unlock()
|
||||
return n
|
||||
}
|
||||
|
||||
// timeRemLimit is the maximum Status.TimeRem value.
|
||||
const timeRemLimit = 999*time.Hour + 59*time.Minute + 59*time.Second
|
||||
|
||||
// Status represents the current Monitor status. All transfer rates are in bytes
|
||||
// per second rounded to the nearest byte.
|
||||
type Status struct {
|
||||
Active bool // Flag indicating an active transfer
|
||||
Start time.Time // Transfer start time
|
||||
Duration time.Duration // Time period covered by the statistics
|
||||
Idle time.Duration // Time since the last transfer of at least 1 byte
|
||||
Bytes int64 // Total number of bytes transferred
|
||||
Samples int64 // Total number of samples taken
|
||||
InstRate int64 // Instantaneous transfer rate
|
||||
CurRate int64 // Current transfer rate (EMA of InstRate)
|
||||
AvgRate int64 // Average transfer rate (Bytes / Duration)
|
||||
PeakRate int64 // Maximum instantaneous transfer rate
|
||||
BytesRem int64 // Number of bytes remaining in the transfer
|
||||
TimeRem time.Duration // Estimated time to completion
|
||||
Progress Percent // Overall transfer progress
|
||||
}
|
||||
|
||||
// Status returns current transfer status information. The returned value
|
||||
// becomes static after a call to Done.
|
||||
func (m *Monitor) Status() Status {
|
||||
m.mu.Lock()
|
||||
now := m.update(0)
|
||||
s := Status{
|
||||
Active: m.active,
|
||||
Start: clockToTime(m.start),
|
||||
Duration: m.sLast - m.start,
|
||||
Idle: now - m.tLast,
|
||||
Bytes: m.bytes,
|
||||
Samples: m.samples,
|
||||
PeakRate: round(m.rPeak),
|
||||
BytesRem: m.tBytes - m.bytes,
|
||||
Progress: percentOf(float64(m.bytes), float64(m.tBytes)),
|
||||
}
|
||||
if s.BytesRem < 0 {
|
||||
s.BytesRem = 0
|
||||
}
|
||||
if s.Duration > 0 {
|
||||
rAvg := float64(s.Bytes) / s.Duration.Seconds()
|
||||
s.AvgRate = round(rAvg)
|
||||
if s.Active {
|
||||
s.InstRate = round(m.rSample)
|
||||
s.CurRate = round(m.rEMA)
|
||||
if s.BytesRem > 0 {
|
||||
if tRate := 0.8*m.rEMA + 0.2*rAvg; tRate > 0 {
|
||||
ns := float64(s.BytesRem) / tRate * 1e9
|
||||
if ns > float64(timeRemLimit) {
|
||||
ns = float64(timeRemLimit)
|
||||
}
|
||||
s.TimeRem = clockRound(time.Duration(ns))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
m.mu.Unlock()
|
||||
return s
|
||||
}
|
||||
|
||||
// Limit restricts the instantaneous (per-sample) data flow to rate bytes per
|
||||
// second. It returns the maximum number of bytes (0 <= n <= want) that may be
|
||||
// transferred immediately without exceeding the limit. If block == true, the
|
||||
// call blocks until n > 0. want is returned unmodified if want < 1, rate < 1,
|
||||
// or the transfer is inactive (after a call to Done).
|
||||
//
|
||||
// At least one byte is always allowed to be transferred in any given sampling
|
||||
// period. Thus, if the sampling rate is 100ms, the lowest achievable flow rate
|
||||
// is 10 bytes per second.
|
||||
//
|
||||
// For usage examples, see the implementation of Reader and Writer in io.go.
|
||||
func (m *Monitor) Limit(want int, rate int64, block bool) (n int) {
|
||||
if want < 1 || rate < 1 {
|
||||
return want
|
||||
}
|
||||
m.mu.Lock()
|
||||
|
||||
// Determine the maximum number of bytes that can be sent in one sample
|
||||
limit := round(float64(rate) * m.sRate.Seconds())
|
||||
if limit <= 0 {
|
||||
limit = 1
|
||||
}
|
||||
|
||||
// If block == true, wait until m.sBytes < limit
|
||||
if now := m.update(0); block {
|
||||
for m.sBytes >= limit && m.active {
|
||||
now = m.waitNextSample(now)
|
||||
}
|
||||
}
|
||||
|
||||
// Make limit <= want (unlimited if the transfer is no longer active)
|
||||
if limit -= m.sBytes; limit > int64(want) || !m.active {
|
||||
limit = int64(want)
|
||||
}
|
||||
m.mu.Unlock()
|
||||
|
||||
if limit < 0 {
|
||||
limit = 0
|
||||
}
|
||||
return int(limit)
|
||||
}
|
||||
|
||||
// SetTransferSize specifies the total size of the data transfer, which allows
|
||||
// the Monitor to calculate the overall progress and time to completion.
|
||||
func (m *Monitor) SetTransferSize(bytes int64) {
|
||||
if bytes < 0 {
|
||||
bytes = 0
|
||||
}
|
||||
m.mu.Lock()
|
||||
m.tBytes = bytes
|
||||
m.mu.Unlock()
|
||||
}
|
||||
|
||||
// update accumulates the transferred byte count for the current sample until
|
||||
// clock() - m.sLast >= m.sRate. The monitor status is updated once the current
|
||||
// sample is done.
|
||||
func (m *Monitor) update(n int) (now time.Duration) {
|
||||
if !m.active {
|
||||
return
|
||||
}
|
||||
if now = clock(); n > 0 {
|
||||
m.tLast = now
|
||||
}
|
||||
m.sBytes += int64(n)
|
||||
if sTime := now - m.sLast; sTime >= m.sRate {
|
||||
t := sTime.Seconds()
|
||||
if m.rSample = float64(m.sBytes) / t; m.rSample > m.rPeak {
|
||||
m.rPeak = m.rSample
|
||||
}
|
||||
|
||||
// Exponential moving average using a method similar to *nix load
|
||||
// average calculation. Longer sampling periods carry greater weight.
|
||||
if m.samples > 0 {
|
||||
w := math.Exp(-t / m.rWindow)
|
||||
m.rEMA = m.rSample + w*(m.rEMA-m.rSample)
|
||||
} else {
|
||||
m.rEMA = m.rSample
|
||||
}
|
||||
m.reset(now)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// reset clears the current sample state in preparation for the next sample.
|
||||
func (m *Monitor) reset(sampleTime time.Duration) {
|
||||
m.bytes += m.sBytes
|
||||
m.samples++
|
||||
m.sBytes = 0
|
||||
m.sLast = sampleTime
|
||||
}
|
||||
|
||||
// waitNextSample sleeps for the remainder of the current sample. The lock is
|
||||
// released and reacquired during the actual sleep period, so it's possible for
|
||||
// the transfer to be inactive when this method returns.
|
||||
func (m *Monitor) waitNextSample(now time.Duration) time.Duration {
|
||||
const minWait = 5 * time.Millisecond
|
||||
current := m.sLast
|
||||
|
||||
// sleep until the last sample time changes (ideally, just one iteration)
|
||||
for m.sLast == current && m.active {
|
||||
d := current + m.sRate - now
|
||||
m.mu.Unlock()
|
||||
if d < minWait {
|
||||
d = minWait
|
||||
}
|
||||
time.Sleep(d)
|
||||
m.mu.Lock()
|
||||
now = m.update(0)
|
||||
}
|
||||
return now
|
||||
}
|
|
@ -0,0 +1,133 @@
|
|||
//
|
||||
// Written by Maxim Khitrov (November 2012)
|
||||
//
|
||||
|
||||
package flowrate
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"io"
|
||||
)
|
||||
|
||||
// ErrLimit is returned by the Writer when a non-blocking write is short due to
|
||||
// the transfer rate limit.
|
||||
var ErrLimit = errors.New("flowrate: flow rate limit exceeded")
|
||||
|
||||
// Limiter is implemented by the Reader and Writer to provide a consistent
|
||||
// interface for monitoring and controlling data transfer.
|
||||
type Limiter interface {
|
||||
Done() int64
|
||||
Status() Status
|
||||
SetTransferSize(bytes int64)
|
||||
SetLimit(new int64) (old int64)
|
||||
SetBlocking(new bool) (old bool)
|
||||
}
|
||||
|
||||
// Reader implements io.ReadCloser with a restriction on the rate of data
|
||||
// transfer.
|
||||
type Reader struct {
|
||||
io.Reader // Data source
|
||||
*Monitor // Flow control monitor
|
||||
|
||||
limit int64 // Rate limit in bytes per second (unlimited when <= 0)
|
||||
block bool // What to do when no new bytes can be read due to the limit
|
||||
}
|
||||
|
||||
// NewReader restricts all Read operations on r to limit bytes per second.
|
||||
func NewReader(r io.Reader, limit int64) *Reader {
|
||||
return &Reader{r, New(0, 0), limit, true}
|
||||
}
|
||||
|
||||
// Read reads up to len(p) bytes into p without exceeding the current transfer
|
||||
// rate limit. It returns (0, nil) immediately if r is non-blocking and no new
|
||||
// bytes can be read at this time.
|
||||
func (r *Reader) Read(p []byte) (n int, err error) {
|
||||
p = p[:r.Limit(len(p), r.limit, r.block)]
|
||||
if len(p) > 0 {
|
||||
n, err = r.IO(r.Reader.Read(p))
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// SetLimit changes the transfer rate limit to new bytes per second and returns
|
||||
// the previous setting.
|
||||
func (r *Reader) SetLimit(new int64) (old int64) {
|
||||
old, r.limit = r.limit, new
|
||||
return
|
||||
}
|
||||
|
||||
// SetBlocking changes the blocking behavior and returns the previous setting. A
|
||||
// Read call on a non-blocking reader returns immediately if no additional bytes
|
||||
// may be read at this time due to the rate limit.
|
||||
func (r *Reader) SetBlocking(new bool) (old bool) {
|
||||
old, r.block = r.block, new
|
||||
return
|
||||
}
|
||||
|
||||
// Close closes the underlying reader if it implements the io.Closer interface.
|
||||
func (r *Reader) Close() error {
|
||||
defer r.Done()
|
||||
if c, ok := r.Reader.(io.Closer); ok {
|
||||
return c.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Writer implements io.WriteCloser with a restriction on the rate of data
|
||||
// transfer.
|
||||
type Writer struct {
|
||||
io.Writer // Data destination
|
||||
*Monitor // Flow control monitor
|
||||
|
||||
limit int64 // Rate limit in bytes per second (unlimited when <= 0)
|
||||
block bool // What to do when no new bytes can be written due to the limit
|
||||
}
|
||||
|
||||
// NewWriter restricts all Write operations on w to limit bytes per second. The
|
||||
// transfer rate and the default blocking behavior (true) can be changed
|
||||
// directly on the returned *Writer.
|
||||
func NewWriter(w io.Writer, limit int64) *Writer {
|
||||
return &Writer{w, New(0, 0), limit, true}
|
||||
}
|
||||
|
||||
// Write writes len(p) bytes from p to the underlying data stream without
|
||||
// exceeding the current transfer rate limit. It returns (n, ErrLimit) if w is
|
||||
// non-blocking and no additional bytes can be written at this time.
|
||||
func (w *Writer) Write(p []byte) (n int, err error) {
|
||||
var c int
|
||||
for len(p) > 0 && err == nil {
|
||||
s := p[:w.Limit(len(p), w.limit, w.block)]
|
||||
if len(s) > 0 {
|
||||
c, err = w.IO(w.Writer.Write(s))
|
||||
} else {
|
||||
return n, ErrLimit
|
||||
}
|
||||
p = p[c:]
|
||||
n += c
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// SetLimit changes the transfer rate limit to new bytes per second and returns
|
||||
// the previous setting.
|
||||
func (w *Writer) SetLimit(new int64) (old int64) {
|
||||
old, w.limit = w.limit, new
|
||||
return
|
||||
}
|
||||
|
||||
// SetBlocking changes the blocking behavior and returns the previous setting. A
|
||||
// Write call on a non-blocking writer returns as soon as no additional bytes
|
||||
// may be written at this time due to the rate limit.
|
||||
func (w *Writer) SetBlocking(new bool) (old bool) {
|
||||
old, w.block = w.block, new
|
||||
return
|
||||
}
|
||||
|
||||
// Close closes the underlying writer if it implements the io.Closer interface.
|
||||
func (w *Writer) Close() error {
|
||||
defer w.Done()
|
||||
if c, ok := w.Writer.(io.Closer); ok {
|
||||
return c.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
146
Godeps/_workspace/src/github.com/mxk/go-flowrate/flowrate/io_test.go
generated
vendored
Normal file
146
Godeps/_workspace/src/github.com/mxk/go-flowrate/flowrate/io_test.go
generated
vendored
Normal file
|
@ -0,0 +1,146 @@
|
|||
//
|
||||
// Written by Maxim Khitrov (November 2012)
|
||||
//
|
||||
|
||||
package flowrate
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
_50ms = 50 * time.Millisecond
|
||||
_100ms = 100 * time.Millisecond
|
||||
_200ms = 200 * time.Millisecond
|
||||
_300ms = 300 * time.Millisecond
|
||||
_400ms = 400 * time.Millisecond
|
||||
_500ms = 500 * time.Millisecond
|
||||
)
|
||||
|
||||
func nextStatus(m *Monitor) Status {
|
||||
samples := m.samples
|
||||
for i := 0; i < 30; i++ {
|
||||
if s := m.Status(); s.Samples != samples {
|
||||
return s
|
||||
}
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
}
|
||||
return m.Status()
|
||||
}
|
||||
|
||||
func TestReader(t *testing.T) {
|
||||
in := make([]byte, 100)
|
||||
for i := range in {
|
||||
in[i] = byte(i)
|
||||
}
|
||||
b := make([]byte, 100)
|
||||
r := NewReader(bytes.NewReader(in), 100)
|
||||
start := time.Now()
|
||||
|
||||
// Make sure r implements Limiter
|
||||
_ = Limiter(r)
|
||||
|
||||
// 1st read of 10 bytes is performed immediately
|
||||
if n, err := r.Read(b); n != 10 || err != nil {
|
||||
t.Fatalf("r.Read(b) expected 10 (<nil>); got %v (%v)", n, err)
|
||||
} else if rt := time.Since(start); rt > _50ms {
|
||||
t.Fatalf("r.Read(b) took too long (%v)", rt)
|
||||
}
|
||||
|
||||
// No new Reads allowed in the current sample
|
||||
r.SetBlocking(false)
|
||||
if n, err := r.Read(b); n != 0 || err != nil {
|
||||
t.Fatalf("r.Read(b) expected 0 (<nil>); got %v (%v)", n, err)
|
||||
} else if rt := time.Since(start); rt > _50ms {
|
||||
t.Fatalf("r.Read(b) took too long (%v)", rt)
|
||||
}
|
||||
|
||||
status := [6]Status{0: r.Status()} // No samples in the first status
|
||||
|
||||
// 2nd read of 10 bytes blocks until the next sample
|
||||
r.SetBlocking(true)
|
||||
if n, err := r.Read(b[10:]); n != 10 || err != nil {
|
||||
t.Fatalf("r.Read(b[10:]) expected 10 (<nil>); got %v (%v)", n, err)
|
||||
} else if rt := time.Since(start); rt < _100ms {
|
||||
t.Fatalf("r.Read(b[10:]) returned ahead of time (%v)", rt)
|
||||
}
|
||||
|
||||
status[1] = r.Status() // 1st sample
|
||||
status[2] = nextStatus(r.Monitor) // 2nd sample
|
||||
status[3] = nextStatus(r.Monitor) // No activity for the 3rd sample
|
||||
|
||||
if n := r.Done(); n != 20 {
|
||||
t.Fatalf("r.Done() expected 20; got %v", n)
|
||||
}
|
||||
|
||||
status[4] = r.Status()
|
||||
status[5] = nextStatus(r.Monitor) // Timeout
|
||||
start = status[0].Start
|
||||
|
||||
// Active, Start, Duration, Idle, Bytes, Samples, InstRate, CurRate, AvgRate, PeakRate, BytesRem, TimeRem, Progress
|
||||
want := []Status{
|
||||
Status{true, start, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
|
||||
Status{true, start, _100ms, 0, 10, 1, 100, 100, 100, 100, 0, 0, 0},
|
||||
Status{true, start, _200ms, _100ms, 20, 2, 100, 100, 100, 100, 0, 0, 0},
|
||||
Status{true, start, _300ms, _200ms, 20, 3, 0, 90, 67, 100, 0, 0, 0},
|
||||
Status{false, start, _300ms, 0, 20, 3, 0, 0, 67, 100, 0, 0, 0},
|
||||
Status{false, start, _300ms, 0, 20, 3, 0, 0, 67, 100, 0, 0, 0},
|
||||
}
|
||||
for i, s := range status {
|
||||
if !reflect.DeepEqual(&s, &want[i]) {
|
||||
t.Errorf("r.Status(%v) expected %v; got %v", i, want[i], s)
|
||||
}
|
||||
}
|
||||
if !bytes.Equal(b[:20], in[:20]) {
|
||||
t.Errorf("r.Read() input doesn't match output")
|
||||
}
|
||||
}
|
||||
|
||||
func TestWriter(t *testing.T) {
|
||||
b := make([]byte, 100)
|
||||
for i := range b {
|
||||
b[i] = byte(i)
|
||||
}
|
||||
w := NewWriter(&bytes.Buffer{}, 200)
|
||||
start := time.Now()
|
||||
|
||||
// Make sure w implements Limiter
|
||||
_ = Limiter(w)
|
||||
|
||||
// Non-blocking 20-byte write for the first sample returns ErrLimit
|
||||
w.SetBlocking(false)
|
||||
if n, err := w.Write(b); n != 20 || err != ErrLimit {
|
||||
t.Fatalf("w.Write(b) expected 20 (ErrLimit); got %v (%v)", n, err)
|
||||
} else if rt := time.Since(start); rt > _50ms {
|
||||
t.Fatalf("w.Write(b) took too long (%v)", rt)
|
||||
}
|
||||
|
||||
// Blocking 80-byte write
|
||||
w.SetBlocking(true)
|
||||
if n, err := w.Write(b[20:]); n != 80 || err != nil {
|
||||
t.Fatalf("w.Write(b[20:]) expected 80 (<nil>); got %v (%v)", n, err)
|
||||
} else if rt := time.Since(start); rt < _400ms {
|
||||
t.Fatalf("w.Write(b[20:]) returned ahead of time (%v)", rt)
|
||||
}
|
||||
|
||||
w.SetTransferSize(100)
|
||||
status := []Status{w.Status(), nextStatus(w.Monitor)}
|
||||
start = status[0].Start
|
||||
|
||||
// Active, Start, Duration, Idle, Bytes, Samples, InstRate, CurRate, AvgRate, PeakRate, BytesRem, TimeRem, Progress
|
||||
want := []Status{
|
||||
Status{true, start, _400ms, 0, 80, 4, 200, 200, 200, 200, 20, _100ms, 80000},
|
||||
Status{true, start, _500ms, _100ms, 100, 5, 200, 200, 200, 200, 0, 0, 100000},
|
||||
}
|
||||
for i, s := range status {
|
||||
if !reflect.DeepEqual(&s, &want[i]) {
|
||||
t.Errorf("w.Status(%v) expected %v; got %v", i, want[i], s)
|
||||
}
|
||||
}
|
||||
if !bytes.Equal(b, w.Writer.(*bytes.Buffer).Bytes()) {
|
||||
t.Errorf("w.Write() input doesn't match output")
|
||||
}
|
||||
}
|
|
@ -0,0 +1,67 @@
|
|||
//
|
||||
// Written by Maxim Khitrov (November 2012)
|
||||
//
|
||||
|
||||
package flowrate
|
||||
|
||||
import (
|
||||
"math"
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
|
||||
// clockRate is the resolution and precision of clock().
|
||||
const clockRate = 20 * time.Millisecond
|
||||
|
||||
// czero is the process start time rounded down to the nearest clockRate
|
||||
// increment.
|
||||
var czero = time.Duration(time.Now().UnixNano()) / clockRate * clockRate
|
||||
|
||||
// clock returns a low resolution timestamp relative to the process start time.
|
||||
func clock() time.Duration {
|
||||
return time.Duration(time.Now().UnixNano())/clockRate*clockRate - czero
|
||||
}
|
||||
|
||||
// clockToTime converts a clock() timestamp to an absolute time.Time value.
|
||||
func clockToTime(c time.Duration) time.Time {
|
||||
return time.Unix(0, int64(czero+c))
|
||||
}
|
||||
|
||||
// clockRound returns d rounded to the nearest clockRate increment.
|
||||
func clockRound(d time.Duration) time.Duration {
|
||||
return (d + clockRate>>1) / clockRate * clockRate
|
||||
}
|
||||
|
||||
// round returns x rounded to the nearest int64 (non-negative values only).
|
||||
func round(x float64) int64 {
|
||||
if _, frac := math.Modf(x); frac >= 0.5 {
|
||||
return int64(math.Ceil(x))
|
||||
}
|
||||
return int64(math.Floor(x))
|
||||
}
|
||||
|
||||
// Percent represents a percentage in increments of 1/1000th of a percent.
|
||||
type Percent uint32
|
||||
|
||||
// percentOf calculates what percent of the total is x.
|
||||
func percentOf(x, total float64) Percent {
|
||||
if x < 0 || total <= 0 {
|
||||
return 0
|
||||
} else if p := round(x / total * 1e5); p <= math.MaxUint32 {
|
||||
return Percent(p)
|
||||
}
|
||||
return Percent(math.MaxUint32)
|
||||
}
|
||||
|
||||
func (p Percent) Float() float64 {
|
||||
return float64(p) * 1e-3
|
||||
}
|
||||
|
||||
func (p Percent) String() string {
|
||||
var buf [12]byte
|
||||
b := strconv.AppendUint(buf[:0], uint64(p)/1000, 10)
|
||||
n := len(b)
|
||||
b = strconv.AppendUint(b, 1000+uint64(p)%1000, 10)
|
||||
b[n] = '.'
|
||||
return string(append(b, '%'))
|
||||
}
|
|
@ -101,6 +101,7 @@ type APIServer struct {
|
|||
LongRunningRequestRE string
|
||||
SSHUser string
|
||||
SSHKeyfile string
|
||||
MaxConnectionBytesPerSec int64
|
||||
}
|
||||
|
||||
// NewAPIServer creates a new APIServer object with default parameters
|
||||
|
@ -205,6 +206,7 @@ func (s *APIServer) AddFlags(fs *pflag.FlagSet) {
|
|||
fs.StringVar(&s.LongRunningRequestRE, "long-running-request-regexp", defaultLongRunningRequestRE, "A regular expression matching long running requests which should be excluded from maximum inflight request handling.")
|
||||
fs.StringVar(&s.SSHUser, "ssh-user", "", "If non-empty, use secure SSH proxy to the nodes, using this user name")
|
||||
fs.StringVar(&s.SSHKeyfile, "ssh-keyfile", "", "If non-empty, use secure SSH proxy to the nodes, using this user keyfile")
|
||||
fs.Int64Var(&s.MaxConnectionBytesPerSec, "max-connection-bytes-per-sec", 0, "If non-zero, throttle each user connection to this number of bytes/sec. Currently only applies to long-running requests")
|
||||
}
|
||||
|
||||
// TODO: Longer term we should read this from some config store, rather than a flag.
|
||||
|
@ -255,7 +257,8 @@ func (s *APIServer) Run(_ []string) error {
|
|||
capabilities.Initialize(capabilities.Capabilities{
|
||||
AllowPrivileged: s.AllowPrivileged,
|
||||
// TODO(vmarmol): Implement support for HostNetworkSources.
|
||||
HostNetworkSources: []string{},
|
||||
HostNetworkSources: []string{},
|
||||
PerConnectionBandwidthLimitBytesPerSec: s.MaxConnectionBytesPerSec,
|
||||
})
|
||||
|
||||
cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
|
||||
|
|
|
@ -604,7 +604,7 @@ func RunKubelet(kcfg *KubeletConfig, builder KubeletBuilder) error {
|
|||
} else {
|
||||
glog.Warning("No api server defined - no events will be sent to API server.")
|
||||
}
|
||||
capabilities.Setup(kcfg.AllowPrivileged, kcfg.HostNetworkSources)
|
||||
capabilities.Setup(kcfg.AllowPrivileged, kcfg.HostNetworkSources, 0)
|
||||
|
||||
credentialprovider.SetPreferredDockercfgPath(kcfg.RootDirectory)
|
||||
|
||||
|
|
|
@ -27,6 +27,9 @@ type Capabilities struct {
|
|||
|
||||
// List of pod sources for which using host network is allowed.
|
||||
HostNetworkSources []string
|
||||
|
||||
// PerConnectionBandwidthLimitBytesPerSec limits the throughput of each connection (currently only used for proxy, exec, attach)
|
||||
PerConnectionBandwidthLimitBytesPerSec int64
|
||||
}
|
||||
|
||||
// TODO: Clean these up into a singleton
|
||||
|
@ -43,10 +46,11 @@ func Initialize(c Capabilities) {
|
|||
}
|
||||
|
||||
// Setup the capability set. It wraps Initialize for improving usibility.
|
||||
func Setup(allowPrivileged bool, hostNetworkSources []string) {
|
||||
func Setup(allowPrivileged bool, hostNetworkSources []string, perConnectionBytesPerSec int64) {
|
||||
Initialize(Capabilities{
|
||||
AllowPrivileged: allowPrivileged,
|
||||
HostNetworkSources: hostNetworkSources,
|
||||
AllowPrivileged: allowPrivileged,
|
||||
HostNetworkSources: hostNetworkSources,
|
||||
PerConnectionBandwidthLimitBytesPerSec: perConnectionBytesPerSec,
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
@ -34,6 +34,7 @@ import (
|
|||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/third_party/golang/netutil"
|
||||
"github.com/golang/glog"
|
||||
"github.com/mxk/go-flowrate/flowrate"
|
||||
)
|
||||
|
||||
// UpgradeAwareProxyHandler is a handler for proxy requests that may require an upgrade
|
||||
|
@ -42,6 +43,7 @@ type UpgradeAwareProxyHandler struct {
|
|||
Location *url.URL
|
||||
Transport http.RoundTripper
|
||||
FlushInterval time.Duration
|
||||
MaxBytesPerSec int64
|
||||
err error
|
||||
}
|
||||
|
||||
|
@ -152,7 +154,13 @@ func (h *UpgradeAwareProxyHandler) tryUpgrade(w http.ResponseWriter, req *http.R
|
|||
wg.Add(2)
|
||||
|
||||
go func() {
|
||||
_, err := io.Copy(backendConn, requestHijackedConn)
|
||||
var writer io.WriteCloser
|
||||
if h.MaxBytesPerSec > 0 {
|
||||
writer = flowrate.NewWriter(backendConn, h.MaxBytesPerSec)
|
||||
} else {
|
||||
writer = backendConn
|
||||
}
|
||||
_, err := io.Copy(writer, requestHijackedConn)
|
||||
if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
|
||||
glog.Errorf("Error proxying data from client to backend: %v", err)
|
||||
}
|
||||
|
@ -160,7 +168,13 @@ func (h *UpgradeAwareProxyHandler) tryUpgrade(w http.ResponseWriter, req *http.R
|
|||
}()
|
||||
|
||||
go func() {
|
||||
_, err := io.Copy(requestHijackedConn, backendConn)
|
||||
var reader io.ReadCloser
|
||||
if h.MaxBytesPerSec > 0 {
|
||||
reader = flowrate.NewReader(backendConn, h.MaxBytesPerSec)
|
||||
} else {
|
||||
reader = backendConn
|
||||
}
|
||||
_, err := io.Copy(requestHijackedConn, reader)
|
||||
if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
|
||||
glog.Errorf("Error proxying data from backend to client: %v", err)
|
||||
}
|
||||
|
|
|
@ -26,6 +26,7 @@ import (
|
|||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
|
||||
etcderr "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors/etcd"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
|
||||
|
@ -277,7 +278,7 @@ func (r *ProxyREST) Connect(ctx api.Context, id string, opts runtime.Object) (re
|
|||
return nil, err
|
||||
}
|
||||
location.Path = path.Join(location.Path, proxyOpts.Path)
|
||||
return genericrest.NewUpgradeAwareProxyHandler(location, nil, false), nil
|
||||
return newUpgradeAwareProxyHandler(location, nil, false), nil
|
||||
}
|
||||
|
||||
// Support both GET and POST methods. Over time, we want to move all clients to start using POST and then stop supporting GET.
|
||||
|
@ -307,7 +308,7 @@ func (r *ExecREST) Connect(ctx api.Context, name string, opts runtime.Object) (r
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return genericrest.NewUpgradeAwareProxyHandler(location, transport, true), nil
|
||||
return newUpgradeAwareProxyHandler(location, transport, true), nil
|
||||
}
|
||||
|
||||
// NewConnectOptions returns the versioned object that represents exec parameters
|
||||
|
@ -350,5 +351,11 @@ func (r *PortForwardREST) Connect(ctx api.Context, name string, opts runtime.Obj
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return genericrest.NewUpgradeAwareProxyHandler(location, transport, true), nil
|
||||
return newUpgradeAwareProxyHandler(location, transport, true), nil
|
||||
}
|
||||
|
||||
func newUpgradeAwareProxyHandler(location *url.URL, transport http.RoundTripper, upgradeRequired bool) *genericrest.UpgradeAwareProxyHandler {
|
||||
handler := genericrest.NewUpgradeAwareProxyHandler(location, transport, upgradeRequired)
|
||||
handler.MaxBytesPerSec = capabilities.Get().PerConnectionBandwidthLimitBytesPerSec
|
||||
return handler
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue