Merge pull request #1672 from galal-hussein/helm_update

helm-controller and kine update
Erik Wilson 2020-04-23 14:09:41 -07:00 committed by GitHub
commit b2cab73d27
62 changed files with 1660 additions and 883 deletions


@ -29,7 +29,7 @@ ENV SELINUX $SELINUX
ARG DQLITE=true
ENV DQLITE $DQLITE
COPY --from=rancher/dqlite-build:v1.3.1-r1 /dist/artifacts /usr/src/
COPY --from=rancher/dqlite-build:v1.4.1-r1 /dist/artifacts /usr/src/
RUN if [ "$DQLITE" = true ]; then \
tar xzf /usr/src/dqlite.tgz -C / && \
apk add --allow-untrusted /usr/local/packages/*.apk \

go.mod

@ -61,12 +61,13 @@ replace (
)
require (
github.com/Azure/go-autorest v14.0.1+incompatible // indirect
github.com/NYTimes/gziphandler v1.1.1 // indirect
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 // indirect
github.com/bhendo/go-powershell v0.0.0-20190719160123-219e7fb4e41e // indirect
github.com/bronze1man/goStrongswanVici v0.0.0-20190828090544-27d02f80ba40 // indirect
github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23 // indirect
github.com/canonical/go-dqlite v1.3.0
github.com/canonical/go-dqlite v1.5.1
github.com/containerd/cgroups v0.0.0-00010101000000-000000000000 // indirect
github.com/containerd/containerd v1.3.0-beta.2.0.20190828155532-0293cbd26c69
github.com/containerd/continuity v0.0.0-20190827140505-75bee3e2ccb6 // indirect
@ -100,11 +101,11 @@ require (
github.com/pkg/errors v0.9.1
github.com/rakelkar/gonetsh v0.0.0-20190719023240-501daadcadf8 // indirect
github.com/rancher/dynamiclistener v0.2.0
github.com/rancher/helm-controller v0.4.2-0.20200326195131-eb51d4fa9d8d
github.com/rancher/kine v0.3.5
github.com/rancher/helm-controller v0.5.0
github.com/rancher/kine v0.4.0
github.com/rancher/remotedialer v0.2.0
github.com/rancher/wrangler v0.5.4-0.20200326191509-4054411d9736
github.com/rancher/wrangler-api v0.5.1-0.20200326194427-c13310506d04
github.com/rancher/wrangler v0.6.1
github.com/rancher/wrangler-api v0.6.0
github.com/rootless-containers/rootlesskit v0.7.2
github.com/sirupsen/logrus v1.4.2
github.com/spf13/pflag v1.0.5

go.sum

@ -7,6 +7,10 @@ github.com/Azure/azure-sdk-for-go v35.0.0+incompatible h1:PkmdmQUmeSdQQ5258f4SyC
github.com/Azure/azure-sdk-for-go v35.0.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78 h1:w+iIsaOQNcT7OZ575w+acHgRric5iCyQh+xv+KJ4HB8=
github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78/go.mod h1:LmzpDX56iTiv29bbRTIsUNlaFfuhWRQBWjQdVyAevI8=
github.com/Azure/go-autorest v11.1.2+incompatible h1:viZ3tV5l4gE2Sw0xrasFHytCGtzYCrT+um/rrSQ1BfA=
github.com/Azure/go-autorest v11.1.2+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24=
github.com/Azure/go-autorest v14.0.1+incompatible h1:YhojO9jolWIvvTW7ORhz2ZSNF6Q1TbLqUunKd3jrtyw=
github.com/Azure/go-autorest v14.0.1+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24=
github.com/Azure/go-autorest/autorest v0.9.0 h1:MRvx8gncNaXJqOoLmhNjUAKh33JJF8LyxPhomEtOsjs=
github.com/Azure/go-autorest/autorest v0.9.0/go.mod h1:xyHB1BMZT0cuDHU7I0+g046+BFDTQ8rEZB0s4Yfa6bI=
github.com/Azure/go-autorest/autorest/adal v0.5.0 h1:q2gDruN08/guU9vAjuPWff0+QIrpH6ediguzdAzXAUU=
@ -95,6 +99,8 @@ github.com/caddyserver/caddy v1.0.3/go.mod h1:G+ouvOY32gENkJC+jhgl62TyhvqEsFaDiZ
github.com/canonical/go-dqlite v1.2.0/go.mod h1:wp00vfMvPYgNCyxcPdHB5XExmDoCGoPUGymloAQT17Y=
github.com/canonical/go-dqlite v1.3.0 h1:c+7eGZfh0K7yCmGrBkNRGZdY8R8+2jSSkz6Zr3YCjJE=
github.com/canonical/go-dqlite v1.3.0/go.mod h1:wp00vfMvPYgNCyxcPdHB5XExmDoCGoPUGymloAQT17Y=
github.com/canonical/go-dqlite v1.5.1 h1:1YjtIrFsC1A3XlgsX38ARAiKhvkZS63PqsEd8z3T4yU=
github.com/canonical/go-dqlite v1.5.1/go.mod h1:wp00vfMvPYgNCyxcPdHB5XExmDoCGoPUGymloAQT17Y=
github.com/cenkalti/backoff v2.1.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/prettybench v0.0.0-20150116022406-03b8cfe5406c/go.mod h1:Xe6ZsFhtM8HrDku0pxJ3/Lr51rwykrzgFwpmTzleatY=
@ -640,8 +646,14 @@ github.com/rancher/flannel v0.11.0-k3s.2 h1:0GVr5ORAIvcri1LYTE8eMQ+NrRbuPeIniPaW
github.com/rancher/flannel v0.11.0-k3s.2/go.mod h1:Hn4ZV+eq0LhLZP63xZnxdGwXEoRSxs5sxELxu27M3UA=
github.com/rancher/helm-controller v0.4.2-0.20200326195131-eb51d4fa9d8d h1:6w5gCRgJzWEGdGi/0Xv4XXuGZY8wgWduRA9A+4c1N8I=
github.com/rancher/helm-controller v0.4.2-0.20200326195131-eb51d4fa9d8d/go.mod h1:3jCGmvjp3bFnbeuHL4HiODje9ZYJ/ujUBNtXHFXrwlM=
github.com/rancher/helm-controller v0.5.0 h1:BY5PG3dz6GWct2O9r8mFv73tZ7E5U9uI89QwMBXV83E=
github.com/rancher/helm-controller v0.5.0/go.mod h1:kEtAI/0AylXIplxWkIRR2xl3nhd4jZ6Wke1nvE/sKUs=
github.com/rancher/kine v0.3.5 h1:Tm4eOtejpnzs1WFBrXj76lCLvX9czLlTkgqUk9luCQk=
github.com/rancher/kine v0.3.5/go.mod h1:xEMl0tLCva9/9me7mXJ3m9Vo6yqHgC4OU3NiK4CPrGQ=
github.com/rancher/kine v0.3.6-0.20200422224205-0a0f5b924129 h1:4HYvCG8+pfkBlpYLv/5ZOdxagg3jTszMOqMjamMQ0hA=
github.com/rancher/kine v0.3.6-0.20200422224205-0a0f5b924129/go.mod h1:xEMl0tLCva9/9me7mXJ3m9Vo6yqHgC4OU3NiK4CPrGQ=
github.com/rancher/kine v0.4.0 h1:1IhWy3TzjExG8xnj46eyUEWdzqNAD1WrgL4eEBKm6Uc=
github.com/rancher/kine v0.4.0/go.mod h1:IImtCJ68AIkE+VY/kUI0NkyJL5q5WzO8QvMsSXqbrpA=
github.com/rancher/kubernetes v1.18.2-k3s.1 h1:LhWNObWF7dL/+T57LkYpuRKtsCBpt0P5G6dRVFG+Ncs=
github.com/rancher/kubernetes v1.18.2-k3s.1/go.mod h1:z8xjOOO1Ljz+TaHpOxVGC7cxtF32TesIamoQ+BZrVS0=
github.com/rancher/kubernetes/staging/src/k8s.io/api v1.18.2-k3s.1 h1:tYDY9g8+xLwUcsG9T6Xg7cBkO/vgU6yv7cQKqUN6NDE=
@ -693,9 +705,14 @@ github.com/rancher/wrangler v0.4.0 h1:iLvuJcZkd38E3RGG74dFMMNEju0PeTzfT1PQiv5okV
github.com/rancher/wrangler v0.4.0/go.mod h1:1cR91WLhZgkZ+U4fV9nVuXqKurWbgXcIReU4wnQvTN8=
github.com/rancher/wrangler v0.5.4-0.20200326191509-4054411d9736 h1:hqpVLgNUxU5sQUV6SzJPMY8Fy7T9Qht2QkA2Q7O/SH0=
github.com/rancher/wrangler v0.5.4-0.20200326191509-4054411d9736/go.mod h1:L4HtjPeX8iqLgsxfJgz+JjKMcX2q3qbRXSeTlC/CSd4=
github.com/rancher/wrangler v0.6.0/go.mod h1:L4HtjPeX8iqLgsxfJgz+JjKMcX2q3qbRXSeTlC/CSd4=
github.com/rancher/wrangler v0.6.1 h1:7tyLk/FV2zCQkYg5SEtT4lSlsHNwa5yMOa797/VJhiQ=
github.com/rancher/wrangler v0.6.1/go.mod h1:L4HtjPeX8iqLgsxfJgz+JjKMcX2q3qbRXSeTlC/CSd4=
github.com/rancher/wrangler-api v0.2.0/go.mod h1:zTPdNLZO07KvRaVOx6XQbKBSV55Fnn4s7nqmrMPJqd8=
github.com/rancher/wrangler-api v0.5.1-0.20200326194427-c13310506d04 h1:y55e+kUaz/UswjN/oJdqHWMuoCG1FxwZJkxJEUONZZE=
github.com/rancher/wrangler-api v0.5.1-0.20200326194427-c13310506d04/go.mod h1:R3nemXoECcrDqXDSHdY7yJay4j42TeEkU79Hep0rdJ8=
github.com/rancher/wrangler-api v0.6.0 h1:d/b0AkgZ+x41EYLIcUJiogtU3Y11Mqss2zr9VEKycRk=
github.com/rancher/wrangler-api v0.6.0/go.mod h1:RbuDkPNHhxcXuwAbLVvEAhH+UPAh+MIkpEd2fcGc0MM=
github.com/remyoudompheng/bigfft v0.0.0-20170806203942-52369c62f446/go.mod h1:uYEyJGbgTkfkS4+E/PavXkNJcbFIpEtjt2B0KDQ5+9M=
github.com/robfig/cron v1.1.0 h1:jk4/Hud3TTdcrJgUOBgsqrZBarcxl6ADIjSC2iniwLY=
github.com/robfig/cron v1.1.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k=


@ -52,18 +52,23 @@ func NewFactoryFromConfigOrDie(config *rest.Config) *Factory {
}
func NewFactoryFromConfig(config *rest.Config) (*Factory, error) {
cs, err := clientset.NewForConfig(config)
if err != nil {
return nil, err
}
informerFactory := informers.NewSharedInformerFactory(cs, 2*time.Hour)
return NewFactory(cs, informerFactory), nil
return NewFactoryFromConfigWithOptions(config, nil)
}
func NewFactoryFromConfigWithNamespace(config *rest.Config, namespace string) (*Factory, error) {
if namespace == "" {
return NewFactoryFromConfig(config)
return NewFactoryFromConfigWithOptions(config, &FactoryOptions{
Namespace: namespace,
})
}
type FactoryOptions struct {
Namespace string
Resync time.Duration
}
func NewFactoryFromConfigWithOptions(config *rest.Config, opts *FactoryOptions) (*Factory, error) {
if opts == nil {
opts = &FactoryOptions{}
}
cs, err := clientset.NewForConfig(config)
@ -71,7 +76,17 @@ func NewFactoryFromConfigWithNamespace(config *rest.Config, namespace string) (*
return nil, err
}
informerFactory := informers.NewSharedInformerFactoryWithOptions(cs, 2*time.Hour, informers.WithNamespace(namespace))
resync := opts.Resync
if resync == 0 {
resync = 2 * time.Hour
}
if opts.Namespace == "" {
informerFactory := informers.NewSharedInformerFactory(cs, resync)
return NewFactory(cs, informerFactory), nil
}
informerFactory := informers.NewSharedInformerFactoryWithOptions(cs, resync, informers.WithNamespace(opts.Namespace))
return NewFactory(cs, informerFactory), nil
}
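
For context, a minimal sketch of how the new options-based constructor introduced in this hunk could be called; the namespace and resync values are illustrative assumptions, not taken from the PR:

```go
// Sketch only: cfg is assumed to be an existing *rest.Config.
factory, err := NewFactoryFromConfigWithOptions(cfg, &FactoryOptions{
	Namespace: "kube-system",    // restrict informers to a single namespace
	Resync:    30 * time.Minute, // override the 2*time.Hour default resync
})
if err != nil {
	return nil, err
}
return factory, nil
```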


@ -29,6 +29,7 @@ import (
"github.com/rancher/wrangler/pkg/apply"
"github.com/rancher/wrangler/pkg/condition"
"github.com/rancher/wrangler/pkg/generic"
"github.com/rancher/wrangler/pkg/kv"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -267,6 +268,7 @@ func RegisterAddonGeneratingHandler(ctx context.Context, controller AddonControl
if opts != nil {
statusHandler.opts = *opts
}
controller.OnChange(ctx, name, statusHandler.Remove)
RegisterAddonStatusHandler(ctx, controller, condition, name, statusHandler.Handle)
}
@ -281,7 +283,7 @@ func (a *addonStatusHandler) sync(key string, obj *v1.Addon) (*v1.Addon, error)
return obj, nil
}
origStatus := obj.Status
origStatus := obj.Status.DeepCopy()
obj = obj.DeepCopy()
newStatus, err := a.handler(obj, obj.Status)
if err != nil {
@ -289,16 +291,16 @@ func (a *addonStatusHandler) sync(key string, obj *v1.Addon) (*v1.Addon, error)
newStatus = *origStatus.DeepCopy()
}
obj.Status = newStatus
if a.condition != "" {
if errors.IsConflict(err) {
a.condition.SetError(obj, "", nil)
a.condition.SetError(&newStatus, "", nil)
} else {
a.condition.SetError(obj, "", err)
a.condition.SetError(&newStatus, "", err)
}
}
if !equality.Semantic.DeepEqual(origStatus, obj.Status) {
if !equality.Semantic.DeepEqual(origStatus, &newStatus) {
var newErr error
obj.Status = newStatus
obj, newErr = a.client.UpdateStatus(obj)
if err == nil {
err = newErr
@ -315,29 +317,28 @@ type addonGeneratingHandler struct {
name string
}
func (a *addonGeneratingHandler) Remove(key string, obj *v1.Addon) (*v1.Addon, error) {
if obj != nil {
return obj, nil
}
obj = &v1.Addon{}
obj.Namespace, obj.Name = kv.RSplit(key, "/")
obj.SetGroupVersionKind(a.gvk)
return nil, generic.ConfigureApplyForObject(a.apply, obj, &a.opts).
WithOwner(obj).
WithSetID(a.name).
ApplyObjects()
}
func (a *addonGeneratingHandler) Handle(obj *v1.Addon, status v1.AddonStatus) (v1.AddonStatus, error) {
objs, newStatus, err := a.AddonGeneratingHandler(obj, status)
if err != nil {
return newStatus, err
}
apply := a.apply
if !a.opts.DynamicLookup {
apply = apply.WithStrictCaching()
}
if !a.opts.AllowCrossNamespace && !a.opts.AllowClusterScoped {
apply = apply.WithSetOwnerReference(true, false).
WithDefaultNamespace(obj.GetNamespace()).
WithListerNamespace(obj.GetNamespace())
}
if !a.opts.AllowClusterScoped {
apply = apply.WithRestrictClusterScoped()
}
return newStatus, apply.
return newStatus, generic.ConfigureApplyForObject(a.apply, obj, &a.opts).
WithOwner(obj).
WithSetID(a.name).
ApplyObjects(objs...)


@ -1,5 +1,5 @@
docker.io/rancher/coredns-coredns:1.6.3
docker.io/rancher/klipper-helm:v0.2.3
docker.io/rancher/klipper-helm:v0.2.5
docker.io/rancher/klipper-lb:v0.1.2
docker.io/rancher/library-traefik:1.7.19
docker.io/rancher/local-path-provisioner:v0.0.11


@ -1,4 +1,6 @@
.sqlite
cmd/dqlite/dqlite
cmd/dqlite-demo/dqlite-demo
demo
profile.coverprofile
overalls.coverprofile


@ -10,9 +10,32 @@ Usage
The best way to understand how to use the ```go-dqlite``` package is probably by
looking at the source code of the [demo
program](https://github.com/canonical/go-dqlite/tree/master/cmd/dqlite-demo) and
program](https://github.com/canonical/go-dqlite/tree/master/cmd/dqlite-demo/main.go) and
use it as an example.
In general your application will use code such as:
```go
dir := "/path/to/data/directory"
address := "1.2.3.4:666" // Unique node address
cluster := []string{...} // Optional list of existing nodes, when starting a new node
app, err := app.New(dir, app.WithAddress(address), app.WithCluster(cluster))
if err != nil {
// ...
}
db, err := app.Open(context.Background(), "my-database")
if err != nil {
// ...
}
// db is a *sql.DB object
if _, err := db.Exec("CREATE TABLE my_table (n INT)"); err != nil {
// ...
}
```
Build
-----
@ -50,42 +73,61 @@ go install -tags libsqlite3 ./cmd/dqlite-demo
from the top-level directory of this repository.
Once the ```dqlite-demo``` binary is installed, start three nodes of the demo
application, respectively with IDs ```1```, ```2,``` and ```3```:
This builds a demo dqlite application, which exposes a simple key/value store
over an HTTP API.
Once the `dqlite-demo` binary is installed (normally under `~/go/bin`),
start three nodes of the demo application:
```bash
dqlite-demo start 1 &
dqlite-demo start 2 &
dqlite-demo start 3 &
dqlite-demo --api 127.0.0.1:8001 --db 127.0.0.1:9001 &
dqlite-demo --api 127.0.0.1:8002 --db 127.0.0.1:9002 --join 127.0.0.1:9001 &
dqlite-demo --api 127.0.0.1:8003 --db 127.0.0.1:9003 --join 127.0.0.1:9001 &
```
The node with ID ```1``` automatically becomes the leader of a single node
cluster, while the nodes with IDs ```2``` and ```3``` are waiting to be notified
what cluster they belong to. Let's make nodes ```2``` and ```3``` join the
cluster:
The `--api` flag tells the demo program where to expose its HTTP API.
The `--db` flag tells the demo program to use the given address for internal
database replication.
The `--join` flag is optional and should be used only for additional nodes after
the first one. It informs them about the existing cluster, so they can
automatically join it.
Now we can start using the cluster. Let's insert a key pair:
```bash
dqlite-demo add 2
dqlite-demo add 3
```
Now we can start using the cluster. The demo application is just a simple
key/value store that stores data in a SQLite table. Let's insert a key pair:
```bash
dqlite-demo update my-key my-value
curl -X PUT -d my-key http://127.0.0.1:8001/my-value
```
and then retrieve it from the database:
```bash
dqlite-demo query my-key
curl http://127.0.0.1:8001/my-value
```
Currently node ```1``` is the leader. If we stop it and then try to query the
key again we'll notice that the ```query``` command hangs for a bit waiting for
the failover to occur and for another node to step up as leader:
Currently the first node is the leader. If we stop it and then try to query the
key again curl will fail, but we can simply change the endpoint to another node
and things will work since an automatic failover has taken place:
```bash
kill -TERM %1; curl http://127.0.0.1:8002/my-value
```
Shell
------
A basic SQLite-like dqlite shell can be built with:
```
kill -TERM %1; sleep 0.1; dqlite-demo query my-key; dqlite-demo cluster
go install -tags libsqlite3 ./cmd/dqlite
```
You can test it against the `dqlite-demo` cluster with:
```
dqlite -s 127.0.0.1:9001
```
It supports normal SQL queries plus the special `.cluster` and `.leader`
commands to inspect the cluster members and the current leader.


@ -2,10 +2,6 @@ package client
import (
"context"
"encoding/binary"
"io"
"net"
"strings"
"github.com/canonical/go-dqlite/internal/protocol"
"github.com/pkg/errors"
@ -57,22 +53,13 @@ func New(ctx context.Context, address string, options ...Option) (*Client, error
return nil, errors.Wrap(err, "failed to establish network connection")
}
// Latest protocol version.
proto := make([]byte, 8)
binary.LittleEndian.PutUint64(proto, protocol.VersionOne)
// Perform the protocol handshake.
n, err := conn.Write(proto)
protocol, err := protocol.Handshake(ctx, conn, protocol.VersionOne)
if err != nil {
conn.Close()
return nil, errors.Wrap(err, "failed to send handshake")
}
if n != 8 {
conn.Close()
return nil, errors.Wrap(io.ErrShortWrite, "failed to send handshake")
return nil, err
}
client := &Client{protocol: protocol.NewProtocol(protocol.VersionOne, conn)}
client := &Client{protocol: protocol}
return client, nil
}
@ -274,10 +261,3 @@ func defaultOptions() *options {
LogFunc: DefaultLogFunc,
}
}
func DefaultDialFunc(ctx context.Context, address string) (net.Conn, error) {
if strings.HasPrefix(address, "@") {
return protocol.UnixDial(ctx, address)
}
return protocol.TCPDial(ctx, address)
}

vendor/github.com/canonical/go-dqlite/client/dial.go (new vendored file)

@ -0,0 +1,33 @@
package client
import (
"context"
"crypto/tls"
"net"
"strings"
"github.com/canonical/go-dqlite/internal/protocol"
)
// DefaultDialFunc is the default dial function, which can handle plain TCP and
// Unix socket endpoints. You can customize it with WithDialFunc()
func DefaultDialFunc(ctx context.Context, address string) (net.Conn, error) {
if strings.HasPrefix(address, "@") {
return protocol.UnixDial(ctx, address)
}
return protocol.TCPDial(ctx, address)
}
// DialFuncWithTLS returns a dial function that uses TLS encryption.
//
// The given dial function will be used to establish the network connection,
// and the given TLS config will be used for encryption.
func DialFuncWithTLS(dial DialFunc, config *tls.Config) DialFunc {
return func(ctx context.Context, addr string) (net.Conn, error) {
conn, err := dial(ctx, addr)
if err != nil {
return nil, err
}
return tls.Client(conn, config), nil
}
}
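
As a usage sketch (not part of the diff), the new helper can wrap the existing DefaultDialFunc and be passed to the client via its WithDialFunc option; the TLS config and address below are placeholder assumptions:

```go
// Sketch only: tlsConfig is assumed to be prepared elsewhere (certificates, CA, etc.).
dial := client.DialFuncWithTLS(client.DefaultDialFunc, tlsConfig)
cli, err := client.New(ctx, "10.0.0.1:9001", client.WithDialFunc(dial))
if err != nil {
	return err
}
defer cli.Close()
```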


@ -2,10 +2,7 @@ package client
import (
"context"
"time"
"github.com/Rican7/retry/backoff"
"github.com/Rican7/retry/strategy"
"github.com/canonical/go-dqlite/internal/protocol"
)
@ -19,9 +16,6 @@ func FindLeader(ctx context.Context, store NodeStore, options ...Option) (*Clien
config := protocol.Config{
Dial: o.DialFunc,
AttemptTimeout: time.Second,
RetryStrategies: []strategy.Strategy{
strategy.Backoff(backoff.BinaryExponential(time.Millisecond))},
}
connector := protocol.NewConnector(0, store, config, o.LogFunc)
protocol, err := connector.Connect(ctx)


@ -1,10 +1,6 @@
package client
import (
"fmt"
"log"
"os"
"github.com/canonical/go-dqlite/internal/logging"
)
@ -22,11 +18,5 @@ const (
LogError = logging.Error
)
var (
logger = log.New(os.Stdout, "", log.LstdFlags|log.Lmicroseconds)
)
// DefaultLogFunc emits messages using the stdlib's logger.
func DefaultLogFunc(l LogLevel, format string, a ...interface{}) {
logger.Output(2, fmt.Sprintf("[%s]: %s", l.String(), fmt.Sprintf(format, a...)))
}
// DefaultLogFunc doesn't emit any message.
func DefaultLogFunc(l LogLevel, format string, a ...interface{}) {}


@ -4,7 +4,11 @@ import (
"context"
"database/sql"
"fmt"
"io/ioutil"
"os"
"sync"
"github.com/ghodss/yaml"
"github.com/pkg/errors"
"github.com/canonical/go-dqlite/internal/protocol"
@ -163,3 +167,65 @@ func (d *DatabaseNodeStore) Set(ctx context.Context, servers []NodeInfo) error {
return nil
}
// YamlNodeStore persists a list of addresses of dqlite nodes in a YAML file.
type YamlNodeStore struct {
path string
servers []NodeInfo
mu sync.RWMutex
}
// NewYamlNodeStore creates a new YamlNodeStore backed by the given YAML file.
func NewYamlNodeStore(path string) (*YamlNodeStore, error) {
servers := []NodeInfo{}
_, err := os.Stat(path)
if err != nil {
if !os.IsNotExist(err) {
return nil, err
}
} else {
data, err := ioutil.ReadFile(path)
if err != nil {
return nil, err
}
if err := yaml.Unmarshal(data, &servers); err != nil {
return nil, err
}
}
store := &YamlNodeStore{
path: path,
servers: servers,
}
return store, nil
}
// Get the current servers.
func (s *YamlNodeStore) Get(ctx context.Context) ([]NodeInfo, error) {
s.mu.RLock()
defer s.mu.RUnlock()
return s.servers, nil
}
// Set the servers addresses.
func (s *YamlNodeStore) Set(ctx context.Context, servers []NodeInfo) error {
s.mu.Lock()
defer s.mu.Unlock()
data, err := yaml.Marshal(servers)
if err != nil {
return err
}
if err := ioutil.WriteFile(s.path, data, 0600); err != nil {
return err
}
s.servers = servers
return nil
}
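
A short usage sketch for the new YAML-backed store; the file path and node address are illustrative assumptions:

```go
// Sketch only: persist a single known node address to disk.
store, err := client.NewYamlNodeStore("/var/lib/myapp/cluster.yaml")
if err != nil {
	return err
}
nodes := []client.NodeInfo{{ID: 1, Address: "10.0.0.1:9001"}}
if err := store.Set(context.Background(), nodes); err != nil {
	return err
}
```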


@ -5,6 +5,7 @@ import (
"os"
"github.com/canonical/go-dqlite/internal/bindings"
"github.com/canonical/go-dqlite/internal/protocol"
"github.com/pkg/errors"
)
@ -26,7 +27,7 @@ import (
// from setting Single-thread mode at all.
func ConfigMultiThread() error {
if err := bindings.ConfigMultiThread(); err != nil {
if err, ok := err.(bindings.Error); ok && err.Code == 21 /* SQLITE_MISUSE */ {
if err, ok := err.(protocol.Error); ok && err.Code == 21 /* SQLITE_MISUSE */ {
return fmt.Errorf("SQLite is already initialized")
}
return errors.Wrap(err, "unknown error")


@ -24,12 +24,9 @@ import (
"syscall"
"time"
"github.com/Rican7/retry/backoff"
"github.com/Rican7/retry/strategy"
"github.com/pkg/errors"
"github.com/canonical/go-dqlite/client"
"github.com/canonical/go-dqlite/internal/bindings"
"github.com/canonical/go-dqlite/internal/protocol"
)
@ -44,7 +41,7 @@ type Driver struct {
}
// Error is returned in case of database errors.
type Error = bindings.Error
type Error = protocol.Error
// Error codes. Values here mostly overlap with native SQLite codes.
const (
@ -87,6 +84,9 @@ func WithDialFunc(dial DialFunc) Option {
// WithConnectionTimeout sets the connection timeout.
//
// If not used, the default is 5 seconds.
//
// DEPRECATED: Connection cancellation is supported via the driver.Connector
// interface, which is used internally by the stdlib sql package.
func WithConnectionTimeout(timeout time.Duration) Option {
return func(options *options) {
options.ConnectionTimeout = timeout
@ -96,7 +96,7 @@ func WithConnectionTimeout(timeout time.Duration) Option {
// WithConnectionBackoffFactor sets the exponential backoff factor for retrying
// failed connection attempts.
//
// If not used, the default is 50 milliseconds.
// If not used, the default is 100 milliseconds.
func WithConnectionBackoffFactor(factor time.Duration) Option {
return func(options *options) {
options.ConnectionBackoffFactor = factor
@ -113,7 +113,28 @@ func WithConnectionBackoffCap(cap time.Duration) Option {
}
}
// WithAttemptTimeout sets the timeout for each individual connection attempt.
//
// If not used, the default is 60 seconds.
func WithAttemptTimeout(timeout time.Duration) Option {
return func(options *options) {
options.AttemptTimeout = timeout
}
}
// WithRetryLimit sets the maximum number of connection retries.
//
// If not used, the default is 0 (unlimited retries)
func WithRetryLimit(limit uint) Option {
return func(options *options) {
options.RetryLimit = limit
}
}
// WithContext sets a global cancellation context.
//
// DEPRECATED: This API is now a no-op. Users should explicitly pass a context
// if they wish to cancel their requests.
func WithContext(context context.Context) Option {
return func(options *options) {
options.Context = context
@ -123,7 +144,8 @@ func WithContext(context context.Context) Option {
// WithContextTimeout sets the default client context timeout when no context
// deadline is provided.
//
// If not used, the default is 5 seconds.
// DEPRECATED: This API is now a no-op. Users should explicitly pass a context
// if they wish to cancel their requests.
func WithContextTimeout(timeout time.Duration) Option {
return func(options *options) {
options.ContextTimeout = timeout
@ -148,13 +170,10 @@ func New(store client.NodeStore, options ...Option) (*Driver, error) {
}
driver.clientConfig.Dial = o.Dial
driver.clientConfig.AttemptTimeout = 5 * time.Second
driver.clientConfig.RetryStrategies = []strategy.Strategy{
driverConnectionRetryStrategy(
o.ConnectionBackoffFactor,
o.ConnectionBackoffCap,
),
}
driver.clientConfig.AttemptTimeout = o.AttemptTimeout
driver.clientConfig.BackoffFactor = o.ConnectionBackoffFactor
driver.clientConfig.BackoffCap = o.ConnectionBackoffCap
driver.clientConfig.RetryLimit = o.RetryLimit
return driver, nil
}
@ -163,10 +182,12 @@ func New(store client.NodeStore, options ...Option) (*Driver, error) {
type options struct {
Log client.LogFunc
Dial protocol.DialFunc
AttemptTimeout time.Duration
ConnectionTimeout time.Duration
ContextTimeout time.Duration
ConnectionBackoffFactor time.Duration
ConnectionBackoffCap time.Duration
RetryLimit uint
Context context.Context
}
@ -175,51 +196,34 @@ func defaultOptions() *options {
return &options{
Log: client.DefaultLogFunc,
Dial: client.DefaultDialFunc,
ConnectionTimeout: 15 * time.Second,
ContextTimeout: 2 * time.Second,
ConnectionBackoffFactor: 50 * time.Millisecond,
ConnectionBackoffCap: time.Second,
Context: context.Background(),
}
}
// Return a retry strategy with jittered exponential backoff, capped at the
// given amount of time.
func driverConnectionRetryStrategy(factor, cap time.Duration) strategy.Strategy {
backoff := backoff.BinaryExponential(factor)
return func(attempt uint) bool {
if attempt > 0 {
duration := backoff(attempt)
if duration > cap {
duration = cap
}
time.Sleep(duration)
// A Connector represents a driver in a fixed configuration and can create any
// number of equivalent Conns for use by multiple goroutines.
type Connector struct {
uri string
driver *Driver
}
return true
}
// Connect returns a connection to the database.
func (c *Connector) Connect(ctx context.Context) (driver.Conn, error) {
if c.driver.context != nil {
ctx = c.driver.context
}
// Open establishes a new connection to a SQLite database on the dqlite server.
//
// The given name must be a pure file name without any directory segment,
// dqlite will connect to a database with that name in its data directory.
//
// Query parameters are always valid except for "mode=memory".
//
// If this node is not the leader, or the leader is unknown an ErrNotLeader
// error is returned.
func (d *Driver) Open(uri string) (driver.Conn, error) {
ctx, cancel := context.WithTimeout(d.context, d.connectionTimeout)
if c.driver.connectionTimeout != 0 {
var cancel func()
ctx, cancel = context.WithTimeout(ctx, c.driver.connectionTimeout)
defer cancel()
}
// TODO: generate a client ID.
connector := protocol.NewConnector(0, d.store, d.clientConfig, d.log)
connector := protocol.NewConnector(0, c.driver.store, c.driver.clientConfig, c.driver.log)
conn := &Conn{
log: d.log,
contextTimeout: d.contextTimeout,
log: c.driver.log,
contextTimeout: c.driver.contextTimeout,
}
var err error
@ -227,15 +231,11 @@ func (d *Driver) Open(uri string) (driver.Conn, error) {
if err != nil {
return nil, errors.Wrap(err, "failed to create dqlite connection")
}
conn.protocol.SetContextTimeout(d.contextTimeout)
conn.request.Init(4096)
conn.response.Init(4096)
defer conn.request.Reset()
defer conn.response.Reset()
protocol.EncodeOpen(&conn.request, uri, 0, "volatile")
protocol.EncodeOpen(&conn.request, c.uri, 0, "volatile")
if err := conn.protocol.Call(ctx, &conn.request, &conn.response); err != nil {
conn.protocol.Close()
@ -251,11 +251,45 @@ func (d *Driver) Open(uri string) (driver.Conn, error) {
return conn, nil
}
// Driver returns the underlying Driver of the Connector.
func (c *Connector) Driver() driver.Driver {
return c.driver
}
// OpenConnector must parse the name in the same format that Driver.Open
// parses the name parameter.
func (d *Driver) OpenConnector(name string) (driver.Connector, error) {
connector := &Connector{
uri: name,
driver: d,
}
return connector, nil
}
// Open establishes a new connection to a SQLite database on the dqlite server.
//
// The given name must be a pure file name without any directory segment,
// dqlite will connect to a database with that name in its data directory.
//
// Query parameters are always valid except for "mode=memory".
//
// If this node is not the leader, or the leader is unknown an ErrNotLeader
// error is returned.
func (d *Driver) Open(uri string) (driver.Conn, error) {
connector, err := d.OpenConnector(uri)
if err != nil {
return nil, err
}
return connector.Connect(context.Background())
}
// SetContextTimeout sets the default client timeout when no context deadline
// is provided.
func (d *Driver) SetContextTimeout(timeout time.Duration) {
d.contextTimeout = timeout
}
//
// DEPRECATED: This API is now a no-op. Users should explicitly pass a context
// if they wish to cancel their requests.
func (d *Driver) SetContextTimeout(timeout time.Duration) {}
// ErrNoAvailableLeader is returned as root cause of Open() if there's no
// leader available in the cluster.
@ -275,9 +309,6 @@ type Conn struct {
// context is for the preparation of the statement, it must not store the
// context within the statement itself.
func (c *Conn) PrepareContext(ctx context.Context, query string) (driver.Stmt, error) {
defer c.request.Reset()
defer c.response.Reset()
stmt := &Stmt{
protocol: c.protocol,
request: &c.request,
@ -306,9 +337,6 @@ func (c *Conn) Prepare(query string) (driver.Stmt, error) {
// ExecContext is an optional interface that may be implemented by a Conn.
func (c *Conn) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error) {
defer c.request.Reset()
defer c.response.Reset()
protocol.EncodeExecSQL(&c.request, uint64(c.id), query, args)
if err := c.protocol.Call(ctx, &c.request, &c.response); err != nil {
@ -330,18 +358,14 @@ func (c *Conn) Query(query string, args []driver.Value) (driver.Rows, error) {
// QueryContext is an optional interface that may be implemented by a Conn.
func (c *Conn) QueryContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Rows, error) {
defer c.request.Reset()
protocol.EncodeQuerySQL(&c.request, uint64(c.id), query, args)
if err := c.protocol.Call(ctx, &c.request, &c.response); err != nil {
c.response.Reset()
return nil, driverError(err)
}
rows, err := protocol.DecodeRows(&c.response)
if err != nil {
c.response.Reset()
return nil, driverError(err)
}
@ -391,7 +415,15 @@ func (c *Conn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, e
//
// Deprecated: Drivers should implement ConnBeginTx instead (or additionally).
func (c *Conn) Begin() (driver.Tx, error) {
return c.BeginTx(context.Background(), driver.TxOptions{})
ctx := context.Background()
if c.contextTimeout > 0 {
var cancel func()
ctx, cancel = context.WithTimeout(context.Background(), c.contextTimeout)
defer cancel()
}
return c.BeginTx(ctx, driver.TxOptions{})
}
// Tx is a transaction.
@ -401,8 +433,7 @@ type Tx struct {
// Commit the transaction.
func (tx *Tx) Commit() error {
ctx, cancel := context.WithTimeout(context.Background(), tx.conn.contextTimeout)
defer cancel()
ctx := context.Background()
if _, err := tx.conn.ExecContext(ctx, "COMMIT", nil); err != nil {
return driverError(err)
@ -413,8 +444,7 @@ func (tx *Tx) Commit() error {
// Rollback the transaction.
func (tx *Tx) Rollback() error {
ctx, cancel := context.WithTimeout(context.Background(), tx.conn.contextTimeout)
defer cancel()
ctx := context.Background()
if _, err := tx.conn.ExecContext(ctx, "ROLLBACK", nil); err != nil {
return driverError(err)
@ -436,9 +466,6 @@ type Stmt struct {
// Close closes the statement.
func (s *Stmt) Close() error {
defer s.request.Reset()
defer s.response.Reset()
protocol.EncodeFinalize(s.request, s.db, s.id)
ctx := context.Background()
@ -464,9 +491,6 @@ func (s *Stmt) NumInput() int {
//
// ExecContext must honor the context timeout and return when it is canceled.
func (s *Stmt) ExecContext(ctx context.Context, args []driver.NamedValue) (driver.Result, error) {
defer s.request.Reset()
defer s.response.Reset()
protocol.EncodeExec(s.request, s.db, s.id, args)
if err := s.protocol.Call(ctx, s.request, s.response); err != nil {
@ -491,22 +515,14 @@ func (s *Stmt) Exec(args []driver.Value) (driver.Result, error) {
//
// QueryContext must honor the context timeout and return when it is canceled.
func (s *Stmt) QueryContext(ctx context.Context, args []driver.NamedValue) (driver.Rows, error) {
defer s.request.Reset()
// FIXME: this shouldn't be needed but we have hit a few panics
// probably due to the response object not being fully reset.
s.response.Reset()
protocol.EncodeQuery(s.request, s.db, s.id, args)
if err := s.protocol.Call(ctx, s.request, s.response); err != nil {
s.response.Reset()
return nil, driverError(err)
}
rows, err := protocol.DecodeRows(s.response)
if err != nil {
s.response.Reset()
return nil, driverError(err)
}
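
To show how the reworked retry options and the new Connector path in this file fit together, here is a hedged usage sketch; the driver registration name and the node store are assumptions, not dictated by the diff:

```go
// Sketch only: build a dqlite driver with the new retry-related options and
// hand it to database/sql. Because Driver now implements driver.DriverContext,
// database/sql will go through OpenConnector/Connect automatically.
drv, err := driver.New(store, // store is an existing client.NodeStore
	driver.WithAttemptTimeout(10*time.Second), // new per-attempt timeout option
	driver.WithRetryLimit(20),                 // new cap on connection retries
)
if err != nil {
	return err
}
sql.Register("dqlite", drv)
db, err := sql.Open("dqlite", "my-database")
```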


@ -1,19 +0,0 @@
package bindings
/*
#include <sqlite3.h>
*/
import "C"
// Error holds information about a SQLite error.
type Error struct {
Code int
Message string
}
func (e Error) Error() string {
if e.Message != "" {
return e.Message
}
return C.GoString(C.sqlite3_errstr(C.int(e.Code)))
}


@ -90,14 +90,14 @@ func init() {
func ConfigSingleThread() error {
if rc := C.sqlite3ConfigSingleThread(); rc != 0 {
return Error{Code: int(rc)}
return protocol.Error{Message: C.GoString(C.sqlite3_errstr(rc)), Code: int(rc)}
}
return nil
}
func ConfigMultiThread() error {
if rc := C.sqlite3ConfigMultiThread(); rc != 0 {
return Error{Code: int(rc)}
return protocol.Error{Message: C.GoString(C.sqlite3_errstr(rc)), Code: int(rc)}
}
return nil
}
@ -197,6 +197,14 @@ func (s *Node) Recover(cluster []protocol.NodeInfo) error {
return nil
}
// GenerateID generates a unique ID for a server.
func GenerateID(address string) uint64 {
caddress := C.CString(address)
defer C.free(unsafe.Pointer(caddress))
id := C.dqlite_generate_node_id(caddress)
return uint64(id)
}
// Extract the underlying socket from a connection.
func connToSocket(conn net.Conn) (C.int, error) {
file, err := conn.(fileConn).File()


@ -2,13 +2,14 @@ package protocol
import (
"time"
"github.com/Rican7/retry/strategy"
)
// Config holds various configuration parameters for a dqlite client.
type Config struct {
Dial DialFunc // Network dialer.
AttemptTimeout time.Duration // Timeout for each individual Dial attempt.
RetryStrategies []strategy.Strategy // Strategies used for retrying to connect to a leader.
DialTimeout time.Duration // Timeout for establishing a network connection .
AttemptTimeout time.Duration // Timeout for each individual attempt to probe a server's leadership.
BackoffFactor time.Duration // Exponential backoff factor for retries.
BackoffCap time.Duration // Maximum connection retry backoff value,
RetryLimit uint // Maximum number of retries, or 0 for unlimited.
}


@ -6,8 +6,11 @@ import (
"fmt"
"io"
"net"
"time"
"github.com/Rican7/retry"
"github.com/Rican7/retry/backoff"
"github.com/Rican7/retry/strategy"
"github.com/canonical/go-dqlite/internal/logging"
"github.com/pkg/errors"
)
@ -27,6 +30,22 @@ type Connector struct {
// NewConnector returns a new connector that can be used by a dqlite driver to
// create new clients connected to a leader dqlite server.
func NewConnector(id uint64, store NodeStore, config Config, log logging.Func) *Connector {
if config.DialTimeout == 0 {
config.DialTimeout = 10 * time.Second
}
if config.AttemptTimeout == 0 {
config.AttemptTimeout = 60 * time.Second
}
if config.BackoffFactor == 0 {
config.BackoffFactor = 100 * time.Millisecond
}
if config.BackoffCap == 0 {
config.BackoffCap = time.Second
}
connector := &Connector{
id: id,
store: store,
@ -43,6 +62,8 @@ func NewConnector(id uint64, store NodeStore, config Config, log logging.Func) *
func (c *Connector) Connect(ctx context.Context) (*Protocol, error) {
var protocol *Protocol
strategies := makeRetryStrategies(c.config.BackoffFactor, c.config.BackoffCap, c.config.RetryLimit)
// The retry strategy should be configured to retry indefinitely, until
// the given context is done.
err := retry.Retry(func(attempt uint) error {
@ -61,17 +82,16 @@ func (c *Connector) Connect(ctx context.Context) (*Protocol, error) {
var err error
protocol, err = c.connectAttemptAll(ctx, log)
if err != nil {
log(logging.Debug, "connection failed err=%v", err)
return err
}
return nil
}, c.config.RetryStrategies...)
}, strategies...)
if err != nil {
// The retry strategy should never give up until success or
// context expiration.
panic("connect retry aborted unexpectedly")
// We exhausted the number of retries allowed by the configured
// strategy.
return nil, ErrNoAvailableLeader
}
if ctx.Err() != nil {
@ -92,7 +112,7 @@ func (c *Connector) connectAttemptAll(ctx context.Context, log logging.Func) (*P
// Make an attempt for each address until we find the leader.
for _, server := range servers {
log := func(l logging.Level, format string, a ...interface{}) {
format += fmt.Sprintf(" address=%s id=%d", server.Address, server.ID)
format += fmt.Sprintf(" address=%s", server.Address)
log(l, format, a...)
}
@ -107,68 +127,71 @@ func (c *Connector) connectAttemptAll(ctx context.Context, log logging.Func) (*P
}
if err != nil {
// This server is unavailable, try with the next target.
log(logging.Debug, "server connection failed err=%v", err)
log(logging.Warn, "server unavailable err=%v", err)
continue
}
if protocol != nil {
// We found the leader
log(logging.Info, "connected")
log(logging.Debug, "connected")
return protocol, nil
}
if leader == "" {
// This server does not know who the current leader is,
// try with the next target.
log(logging.Warn, "no known leader")
continue
}
// If we get here, it means this server reported that another
// server is the leader, let's close the connection to this
// server and try with the suggested one.
//logger = logger.With(zap.String("leader", leader))
log(logging.Debug, "connect to reported leader %s", leader)
ctx, cancel = context.WithTimeout(ctx, c.config.AttemptTimeout)
defer cancel()
protocol, leader, err = c.connectAttemptOne(ctx, leader, version)
if err != nil {
// The leader reported by the previous server is
// unavailable, try with the next target.
//logger.Info("leader server connection failed", zap.String("err", err.Error()))
log(logging.Warn, "reported leader unavailable err=%v", err)
continue
}
if protocol == nil {
// The leader reported by the target server does not consider itself
// the leader, try with the next target.
//logger.Info("reported leader server is not the leader")
log(logging.Warn, "reported leader server is not the leader")
continue
}
log(logging.Info, "connected")
log(logging.Debug, "connected")
return protocol, nil
}
return nil, ErrNoAvailableLeader
}
// Connect establishes a connection with a dqlite node.
func Connect(ctx context.Context, dial DialFunc, address string, version uint64) (*Protocol, error) {
// Establish the connection.
conn, err := dial(ctx, address)
if err != nil {
return nil, errors.Wrap(err, "failed to establish network connection")
}
// Perform the initial handshake using the given protocol version.
func Handshake(ctx context.Context, conn net.Conn, version uint64) (*Protocol, error) {
// Latest protocol version.
protocol := make([]byte, 8)
binary.LittleEndian.PutUint64(protocol, version)
// Honor the ctx deadline, if present.
if deadline, ok := ctx.Deadline(); ok {
conn.SetDeadline(deadline)
defer conn.SetDeadline(time.Time{})
}
// Perform the protocol handshake.
n, err := conn.Write(protocol)
if err != nil {
conn.Close()
return nil, errors.Wrap(err, "failed to send handshake")
}
if n != 8 {
conn.Close()
return nil, errors.Wrap(io.ErrShortWrite, "failed to send handshake")
}
return NewProtocol(version, conn), nil
return newProtocol(version, conn), nil
}
// Connect to the given dqlite server and check if it's the leader.
@ -181,8 +204,18 @@ func Connect(ctx context.Context, dial DialFunc, address string, version uint64)
// - Target is the leader: -> server, "", nil
//
func (c *Connector) connectAttemptOne(ctx context.Context, address string, version uint64) (*Protocol, string, error) {
protocol, err := Connect(ctx, c.config.Dial, address, version)
dialCtx, cancel := context.WithTimeout(ctx, c.config.DialTimeout)
defer cancel()
// Establish the connection.
conn, err := c.config.Dial(dialCtx, address)
if err != nil {
return nil, "", errors.Wrap(err, "failed to establish network connection")
}
protocol, err := Handshake(ctx, conn, version)
if err != nil {
conn.Close()
return nil, "", err
}
@ -219,8 +252,8 @@ func (c *Connector) connectAttemptOne(ctx context.Context, address string, versi
return nil, "", nil
case address:
// This server is the leader, register ourselves and return.
request.Reset()
response.Reset()
request.reset()
response.reset()
EncodeClient(&request, c.id)
@ -247,4 +280,33 @@ func (c *Connector) connectAttemptOne(ctx context.Context, address string, versi
}
}
// Return a retry strategy with exponential backoff, capped at the given amount
// of time and possibly with a maximum number of retries.
func makeRetryStrategies(factor, cap time.Duration, limit uint) []strategy.Strategy {
backoff := backoff.BinaryExponential(factor)
strategies := []strategy.Strategy{}
if limit > 0 {
strategies = append(strategies, strategy.Limit(limit))
}
strategies = append(strategies,
func(attempt uint) bool {
if attempt > 0 {
duration := backoff(attempt)
// Duration might be negative in case of integer overflow.
if duration > cap || duration <= 0 {
duration = cap
}
time.Sleep(duration)
}
return true
},
)
return strategies
}
var errBadProtocol = fmt.Errorf("bad protocol")


@ -2,6 +2,7 @@ package protocol
import (
"context"
"crypto/tls"
"net"
)
@ -18,3 +19,15 @@ func UnixDial(ctx context.Context, address string) (net.Conn, error) {
dialer := net.Dialer{}
return dialer.DialContext(ctx, "unix", address)
}
// TLSCipherSuites are the cipher suites used by the go-dqlite TLS helpers.
var TLSCipherSuites = []uint16{
tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
tls.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA,
tls.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA,
tls.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA,
tls.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA,
}


@ -27,3 +27,13 @@ func (e ErrRequest) Error() string {
// ErrRowsPart is returned when the first batch of a multi-response result
// batch is done.
var ErrRowsPart = fmt.Errorf("not all rows were returned in this response")
// Error holds information about a SQLite error.
type Error struct {
Code int
Message string
}
func (e Error) Error() string {
return e.Message
}


@ -26,24 +26,23 @@ type Message struct {
flags uint8
extra uint16
header []byte // Statically allocated header buffer
body1 buffer // Statically allocated body data, using bytes
body2 buffer // Dynamically allocated body data
body buffer // Message body data.
}
// Init initializes the message using the given size of the statically
// allocated buffer (i.e. a buffer which is re-used across requests or
// responses encoded or decoded using this message object).
func (m *Message) Init(staticSize int) {
if (staticSize % messageWordSize) != 0 {
panic("static size is not aligned to word boundary")
// Init initializes the message using the given initial size for the data
// buffer, which is re-used across requests or responses encoded or decoded
// using this message object.
func (m *Message) Init(initialBufferSize int) {
if (initialBufferSize % messageWordSize) != 0 {
panic("initial buffer size is not aligned to word boundary")
}
m.header = make([]byte, messageHeaderSize)
m.body1.Bytes = make([]byte, staticSize)
m.Reset()
m.body.Bytes = make([]byte, initialBufferSize)
m.reset()
}
// Reset the state of the message so it can be used to encode or decode again.
func (m *Message) Reset() {
func (m *Message) reset() {
m.words = 0
m.mtype = 0
m.flags = 0
@ -51,9 +50,7 @@ func (m *Message) Reset() {
for i := 0; i < messageHeaderSize; i++ {
m.header[i] = 0
}
m.body1.Offset = 0
m.body2.Bytes = nil
m.body2.Offset = 0
m.body.Offset = 0
}
// Append a byte slice to the message.
@ -232,11 +229,11 @@ func (m *Message) putNamedValues(values NamedValues) {
// Finalize the message by setting the message type and the number
// of words in the body (calculated from the body size).
func (m *Message) putHeader(mtype uint8) {
if m.body1.Offset <= 0 {
if m.body.Offset <= 0 {
panic("static offset is not positive")
}
if (m.body1.Offset % messageWordSize) != 0 {
if (m.body.Offset % messageWordSize) != 0 {
panic("static body is not aligned")
}
@ -244,22 +241,7 @@ func (m *Message) putHeader(mtype uint8) {
m.flags = 0
m.extra = 0
m.words = uint32(m.body1.Offset) / messageWordSize
if m.body2.Bytes == nil {
m.finalize()
return
}
if m.body2.Offset <= 0 {
panic("dynamic offset is not positive")
}
if (m.body2.Offset % messageWordSize) != 0 {
panic("dynamic body is not aligned")
}
m.words += uint32(m.body2.Offset) / messageWordSize
m.words = uint32(m.body.Offset) / messageWordSize
m.finalize()
}
@ -276,27 +258,14 @@ func (m *Message) finalize() {
}
func (m *Message) bufferForPut(size int) *buffer {
if m.body2.Bytes != nil {
if (m.body2.Offset + size) > len(m.body2.Bytes) {
// Grow body2.
//
// TODO: find a good grow strategy.
bytes := make([]byte, m.body2.Offset+size)
copy(bytes, m.body2.Bytes)
m.body2.Bytes = bytes
for (m.body.Offset + size) > len(m.body.Bytes) {
// Grow message buffer.
bytes := make([]byte, len(m.body.Bytes)*2)
copy(bytes, m.body.Bytes)
m.body.Bytes = bytes
}
return &m.body2
}
if (m.body1.Offset + size) > len(m.body1.Bytes) {
m.body2.Bytes = make([]byte, size)
m.body2.Offset = 0
return &m.body2
}
return &m.body1
return &m.body
}
// Return the message type and its flags.
@ -310,31 +279,6 @@ func (m *Message) getString() string {
index := bytes.IndexByte(b.Bytes[b.Offset:], 0)
if index == -1 {
// Check if the string overflows in the dynamic buffer.
if b == &m.body1 && m.body2.Bytes != nil {
// Assert that this is the first read of the dynamic buffer.
if m.body2.Offset != 0 {
panic("static buffer read after dynamic buffer one")
}
index = bytes.IndexByte(m.body2.Bytes[0:], 0)
if index != -1 {
// We found the trailing part of the string.
data := b.Bytes[b.Offset:]
data = append(data, m.body2.Bytes[0:index]...)
index++
if trailing := index % messageWordSize; trailing != 0 {
// Account for padding, moving index to the next word boundary.
index += messageWordSize - trailing
}
m.body1.Offset = len(m.body1.Bytes)
m.body2.Advance(index)
return string(data)
}
}
panic("no string found")
}
s := string(b.Bytes[b.Offset : b.Offset+index])
@ -465,31 +409,23 @@ func (m *Message) getFiles() Files {
func (m *Message) hasBeenConsumed() bool {
size := int(m.words * messageWordSize)
return (m.body1.Offset == size || m.body1.Offset == len(m.body1.Bytes)) &&
m.body1.Offset+m.body2.Offset == size
return m.body.Offset == size
}
func (m *Message) lastByte() byte {
size := int(m.words * messageWordSize)
if size > len(m.body1.Bytes) {
size = size - m.body1.Offset
return m.body2.Bytes[size-1]
}
return m.body1.Bytes[size-1]
return m.body.Bytes[size-1]
}
func (m *Message) bufferForGet() *buffer {
size := int(m.words * messageWordSize)
if m.body1.Offset == size || m.body1.Offset == len(m.body1.Bytes) {
// The static body has been exahusted, use the dynamic one.
if m.body1.Offset+m.body2.Offset == size {
err := fmt.Errorf("short message: type=%d words=%d off=%d", m.mtype, m.words, m.body1.Offset)
if m.body.Offset == size {
err := fmt.Errorf("short message: type=%d words=%d off=%d", m.mtype, m.words, m.body.Offset)
panic(err)
}
return &m.body2
}
return &m.body1
return &m.body
}
// Result holds the result of a statement.
@ -639,7 +575,7 @@ func (r *Rows) Close() error {
err = fmt.Errorf("unexpected end of message")
}
}
r.message.Reset()
r.message.reset()
return err
}
@ -664,7 +600,7 @@ func (f *Files) Next() (string, []byte) {
}
func (f *Files) Close() {
f.message.Reset()
f.message.reset()
}
const (


@ -15,29 +15,21 @@ import (
type Protocol struct {
version uint64 // Protocol version
conn net.Conn // Underlying network connection.
contextTimeout time.Duration // Default context timeout.
closeCh chan struct{} // Stops the heartbeat when the connection gets closed
mu sync.Mutex // Serialize requests
netErr error // A network error occurred
}
func NewProtocol(version uint64, conn net.Conn) *Protocol {
func newProtocol(version uint64, conn net.Conn) *Protocol {
protocol := &Protocol{
version: version,
conn: conn,
closeCh: make(chan struct{}),
contextTimeout: 5 * time.Second,
}
return protocol
}
// SetContextTimeout sets the default context timeout when no deadline is
// provided.
func (p *Protocol) SetContextTimeout(timeout time.Duration) {
p.contextTimeout = timeout
}
// Call invokes a dqlite RPC, sending a request message and receiving a
// response message.
func (p *Protocol) Call(ctx context.Context, request, response *Message) (err error) {
@ -50,21 +42,22 @@ func (p *Protocol) Call(ctx context.Context, request, response *Message) (err er
return p.netErr
}
// Honor the ctx deadline, if present, or use a default.
deadline, ok := ctx.Deadline()
if !ok {
deadline = time.Now().Add(p.contextTimeout)
var budget time.Duration
// Honor the ctx deadline, if present.
if deadline, ok := ctx.Deadline(); ok {
p.conn.SetDeadline(deadline)
budget = time.Until(deadline)
defer p.conn.SetDeadline(time.Time{})
}
p.conn.SetDeadline(deadline)
if err = p.send(request); err != nil {
err = errors.Wrap(err, "failed to send request")
err = errors.Wrapf(err, "send request (budget=%s)", budget)
goto err
}
if err = p.recv(response); err != nil {
err = errors.Wrap(err, "failed to receive response")
err = errors.Wrapf(err, "receive response (budget=%s)", budget)
goto err
}
@ -91,14 +84,11 @@ func (p *Protocol) Interrupt(ctx context.Context, request *Message, response *Me
p.mu.Lock()
defer p.mu.Unlock()
// Honor the ctx deadline, if present, or use a default.
deadline, ok := ctx.Deadline()
if !ok {
deadline = time.Now().Add(2 * time.Second)
}
// Honor the ctx deadline, if present.
if deadline, ok := ctx.Deadline(); ok {
p.conn.SetDeadline(deadline)
defer request.Reset()
defer p.conn.SetDeadline(time.Time{})
}
EncodeInterrupt(request, 0)
@ -108,12 +98,10 @@ func (p *Protocol) Interrupt(ctx context.Context, request *Message, response *Me
for {
if err := p.recv(response); err != nil {
response.Reset()
return errors.Wrap(err, "failed to receive response")
}
mtype, _ := response.getHeader()
response.Reset()
if mtype == ResponseEmpty {
break
@ -155,7 +143,7 @@ func (p *Protocol) sendHeader(req *Message) error {
}
func (p *Protocol) sendBody(req *Message) error {
buf := req.body1.Bytes[:req.body1.Offset]
buf := req.body.Bytes[:req.body.Offset]
n, err := p.conn.Write(buf)
if err != nil {
return errors.Wrap(err, "failed to send static body")
@ -165,24 +153,12 @@ func (p *Protocol) sendBody(req *Message) error {
return errors.Wrap(io.ErrShortWrite, "failed to write body")
}
if req.body2.Bytes == nil {
return nil
}
buf = req.body2.Bytes[:req.body2.Offset]
n, err = p.conn.Write(buf)
if err != nil {
return errors.Wrap(err, "failed to send dynamic body")
}
if n != len(buf) {
return errors.Wrap(io.ErrShortWrite, "failed to write body")
}
return nil
}
func (p *Protocol) recv(res *Message) error {
res.reset()
if err := p.recvHeader(res); err != nil {
return errors.Wrap(err, "failed to receive header")
}
@ -209,30 +185,19 @@ func (p *Protocol) recvHeader(res *Message) error {
func (p *Protocol) recvBody(res *Message) error {
n := int(res.words) * messageWordSize
n1 := n
n2 := 0
if n1 > len(res.body1.Bytes) {
// We need to allocate the dynamic buffer.
n1 = len(res.body1.Bytes)
n2 = n - n1
for n > len(res.body.Bytes) {
// Grow message buffer.
bytes := make([]byte, len(res.body.Bytes)*2)
res.body.Bytes = bytes
}
buf := res.body1.Bytes[:n1]
buf := res.body.Bytes[:n]
if err := p.recvPeek(buf); err != nil {
return errors.Wrap(err, "failed to read body")
}
if n2 > 0 {
res.body2.Bytes = make([]byte, n2)
res.body2.Offset = 0
buf = res.body2.Bytes
if err := p.recvPeek(buf); err != nil {
return errors.Wrap(err, "failed to read body")
}
}
return nil
}


@ -7,6 +7,7 @@ package protocol
// EncodeLeader encodes a Leader request.
func EncodeLeader(request *Message) {
request.reset()
request.putUint64(0)
request.putHeader(RequestLeader)
@ -14,6 +15,7 @@ func EncodeLeader(request *Message) {
// EncodeClient encodes a Client request.
func EncodeClient(request *Message, id uint64) {
request.reset()
request.putUint64(id)
request.putHeader(RequestClient)
@ -21,6 +23,7 @@ func EncodeClient(request *Message, id uint64) {
// EncodeHeartbeat encodes a Heartbeat request.
func EncodeHeartbeat(request *Message, timestamp uint64) {
request.reset()
request.putUint64(timestamp)
request.putHeader(RequestHeartbeat)
@ -28,6 +31,7 @@ func EncodeHeartbeat(request *Message, timestamp uint64) {
// EncodeOpen encodes a Open request.
func EncodeOpen(request *Message, name string, flags uint64, vfs string) {
request.reset()
request.putString(name)
request.putUint64(flags)
request.putString(vfs)
@ -37,6 +41,7 @@ func EncodeOpen(request *Message, name string, flags uint64, vfs string) {
// EncodePrepare encodes a Prepare request.
func EncodePrepare(request *Message, db uint64, sql string) {
request.reset()
request.putUint64(db)
request.putString(sql)
@ -45,6 +50,7 @@ func EncodePrepare(request *Message, db uint64, sql string) {
// EncodeExec encodes a Exec request.
func EncodeExec(request *Message, db uint32, stmt uint32, values NamedValues) {
request.reset()
request.putUint32(db)
request.putUint32(stmt)
request.putNamedValues(values)
@ -54,6 +60,7 @@ func EncodeExec(request *Message, db uint32, stmt uint32, values NamedValues) {
// EncodeQuery encodes a Query request.
func EncodeQuery(request *Message, db uint32, stmt uint32, values NamedValues) {
request.reset()
request.putUint32(db)
request.putUint32(stmt)
request.putNamedValues(values)
@ -63,6 +70,7 @@ func EncodeQuery(request *Message, db uint32, stmt uint32, values NamedValues) {
// EncodeFinalize encodes a Finalize request.
func EncodeFinalize(request *Message, db uint32, stmt uint32) {
request.reset()
request.putUint32(db)
request.putUint32(stmt)
@ -71,6 +79,7 @@ func EncodeFinalize(request *Message, db uint32, stmt uint32) {
// EncodeExecSQL encodes a ExecSQL request.
func EncodeExecSQL(request *Message, db uint64, sql string, values NamedValues) {
request.reset()
request.putUint64(db)
request.putString(sql)
request.putNamedValues(values)
@ -80,6 +89,7 @@ func EncodeExecSQL(request *Message, db uint64, sql string, values NamedValues)
// EncodeQuerySQL encodes a QuerySQL request.
func EncodeQuerySQL(request *Message, db uint64, sql string, values NamedValues) {
request.reset()
request.putUint64(db)
request.putString(sql)
request.putNamedValues(values)
@ -89,6 +99,7 @@ func EncodeQuerySQL(request *Message, db uint64, sql string, values NamedValues)
// EncodeInterrupt encodes a Interrupt request.
func EncodeInterrupt(request *Message, db uint64) {
request.reset()
request.putUint64(db)
request.putHeader(RequestInterrupt)
@ -96,6 +107,7 @@ func EncodeInterrupt(request *Message, db uint64) {
// EncodeAdd encodes a Add request.
func EncodeAdd(request *Message, id uint64, address string) {
request.reset()
request.putUint64(id)
request.putString(address)
@ -104,6 +116,7 @@ func EncodeAdd(request *Message, id uint64, address string) {
// EncodeAssign encodes a Assign request.
func EncodeAssign(request *Message, id uint64, role uint64) {
request.reset()
request.putUint64(id)
request.putUint64(role)
@ -112,6 +125,7 @@ func EncodeAssign(request *Message, id uint64, role uint64) {
// EncodeRemove encodes a Remove request.
func EncodeRemove(request *Message, id uint64) {
request.reset()
request.putUint64(id)
request.putHeader(RequestRemove)
@ -119,6 +133,7 @@ func EncodeRemove(request *Message, id uint64) {
// EncodeDump encodes a Dump request.
func EncodeDump(request *Message, name string) {
request.reset()
request.putString(name)
request.putHeader(RequestDump)
@ -126,6 +141,7 @@ func EncodeDump(request *Message, name string) {
// EncodeCluster encodes a Cluster request.
func EncodeCluster(request *Message, format uint64) {
request.reset()
request.putUint64(format)
request.putHeader(RequestCluster)
@ -133,6 +149,7 @@ func EncodeCluster(request *Message, format uint64) {
// EncodeTransfer encodes a Transfer request.
func EncodeTransfer(request *Message, id uint64) {
request.reset()
request.putUint64(id)
request.putHeader(RequestTransfer)


@ -54,6 +54,7 @@ if [ "$entity" = "--request" ]; then
// Encode${cmd} encodes a $cmd request.
func Encode${cmd}(request *Message${args}) {
request.reset()
EOF
for i in "${@}"


@ -118,6 +118,16 @@ func (s *Node) Close() error {
return nil
}
// BootstrapID is a magic ID that should be used for the first node in a
// cluster. Alternatively ID 1 can be used as well.
const BootstrapID = 0x2dc171858c3155be
// GenerateID generates a unique ID for a new node, based on a hash of its
// address and the current time.
func GenerateID(address string) uint64 {
return bindings.GenerateID(address)
}
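
As a usage sketch for the new BootstrapID and GenerateID helpers; the data directory, address, and the New/Start/Close calls reflect the package's public API as commonly used and should be treated as assumptions here:

```go
// Sketch only: derive a node ID for a joining node from its address.
addr := "10.0.0.2:9001"
id := dqlite.GenerateID(addr) // hash of the address plus the current time
node, err := dqlite.New(id, addr, "/var/lib/dqlite")
if err != nil {
	return err
}
if err := node.Start(); err != nil {
	return err
}
defer node.Close()
```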
// Create an options object with sane defaults.
func defaultOptions() *options {
return &options{


@ -39,9 +39,10 @@ type Controller struct {
}
const (
image = "rancher/klipper-helm:v0.2.3"
label = "helmcharts.helm.cattle.io/chart"
name = "helm-controller"
image = "rancher/klipper-helm:v0.2.5"
Label = "helmcharts.helm.cattle.io/chart"
CRDName = "helmcharts.helm.cattle.io"
Name = "helm-controller"
)
func Register(ctx context.Context, apply apply.Apply,
@ -50,7 +51,7 @@ func Register(ctx context.Context, apply apply.Apply,
crbs rbaccontroller.ClusterRoleBindingController,
sas corecontroller.ServiceAccountController,
cm corecontroller.ConfigMapController) {
apply = apply.WithSetID(name).
apply = apply.WithSetID(Name).
WithCacheTypes(helms, jobs, crbs, sas, cm).
WithStrictCaching().WithPatcher(batch.SchemeGroupVersion.WithKind("Job"), func(namespace, name string, pt types.PatchType, data []byte) (runtime.Object, error) {
err := jobs.Delete(namespace, name, &metav1.DeleteOptions{})
@ -63,7 +64,7 @@ func Register(ctx context.Context, apply apply.Apply,
relatedresource.Watch(ctx, "helm-pod-watch",
func(namespace, name string, obj runtime.Object) ([]relatedresource.Key, error) {
if job, ok := obj.(*batch.Job); ok {
name := job.Labels[label]
name := job.Labels[Label]
if name != "" {
return []relatedresource.Key{
{
@ -84,8 +85,8 @@ func Register(ctx context.Context, apply apply.Apply,
apply: apply,
}
helms.OnChange(ctx, name, controller.OnHelmChanged)
helms.OnRemove(ctx, name, controller.OnHelmRemove)
helms.OnChange(ctx, Name, controller.OnHelmChanged)
helms.OnRemove(ctx, Name, controller.OnHelmRemove)
}
func (c *Controller) OnHelmChanged(key string, chart *helmv1.HelmChart) (*helmv1.HelmChart, error) {
@ -163,7 +164,7 @@ func job(chart *helmv1.HelmChart) (*batch.Job, *core.ConfigMap) {
Name: fmt.Sprintf("helm-%s-%s", action, chart.Name),
Namespace: chart.Namespace,
Labels: map[string]string{
label: chart.Name,
Label: chart.Name,
},
},
Spec: batch.JobSpec{
@ -171,7 +172,7 @@ func job(chart *helmv1.HelmChart) (*batch.Job, *core.ConfigMap) {
Template: core.PodTemplateSpec{
ObjectMeta: meta.ObjectMeta{
Labels: map[string]string{
label: chart.Name,
Label: chart.Name,
},
},
Spec: core.PodSpec{

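As a small aside, the newly exported Label constant is what lets other packages map a klipper-helm Job back to its owning HelmChart, as the watch handler above does. A self-contained sketch of that lookup, using plain maps rather than the real controller types; the constant value matches the diff, everything else is illustrative.

package main

import "fmt"

const chartLabel = "helmcharts.helm.cattle.io/chart"

// chartKeyForJob derives the namespace/name key of the HelmChart that owns a
// job, based on the label the controller stamps onto its install jobs.
func chartKeyForJob(jobNamespace string, jobLabels map[string]string) (string, bool) {
	name := jobLabels[chartLabel]
	if name == "" {
		return "", false
	}
	return jobNamespace + "/" + name, true
}

func main() {
	labels := map[string]string{chartLabel: "traefik"}
	if key, ok := chartKeyForJob("kube-system", labels); ok {
		fmt.Println("re-enqueue HelmChart", key)
	}
}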
View File

@ -35,6 +35,7 @@ var (
);`,
}
nameIdx = "create index kine_name_index on kine (name)"
nameIDIdx = "create index kine_name_id_index on kine (name,id)"
revisionIdx = "create unique index kine_name_prev_revision_uindex on kine (name, prev_revision)"
createDB = "create database if not exists "
)
@ -87,6 +88,7 @@ func setup(db *sql.DB) error {
// check if duplicate indexes
indexes := []string{
nameIdx,
nameIDIdx,
revisionIdx}
for _, idx := range indexes {

View File

@ -35,6 +35,7 @@ var (
old_value bytea
);`,
`CREATE INDEX IF NOT EXISTS kine_name_index ON kine (name)`,
`CREATE INDEX IF NOT EXISTS kine_name_id_index ON kine (name,id)`,
`CREATE UNIQUE INDEX IF NOT EXISTS kine_name_prev_revision_uindex ON kine (name, prev_revision)`,
}
createDB = "create database "

View File

@ -295,6 +295,7 @@ func (l *LogStructured) ttlEvents(ctx context.Context) chan *server.Event {
func (l *LogStructured) ttl(ctx context.Context) {
// very naive TTL support
mutex := &sync.Mutex{}
for event := range l.ttlEvents(ctx) {
go func(event *server.Event) {
select {
@ -302,7 +303,9 @@ func (l *LogStructured) ttl(ctx context.Context) {
return
case <-time.After(time.Duration(event.KV.Lease) * time.Second):
}
mutex.Lock()
l.Delete(ctx, event.KV.Key, event.KV.ModRevision)
mutex.Unlock()
}(event)
}
}
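The pattern above is easy to miss in diff form: one goroutine per expiring key, with the newly added shared mutex serializing the deletes. A stripped-down, runnable sketch of the same idea, using illustrative types rather than kine's.

package main

import (
	"fmt"
	"sync"
	"time"
)

type event struct {
	key   string
	lease time.Duration
}

// expire spawns one goroutine per event; the shared mutex ensures the deletes
// never run concurrently, mirroring the change above.
func expire(events <-chan event, del func(key string)) {
	var mu sync.Mutex
	for ev := range events {
		go func(ev event) {
			time.Sleep(ev.lease)
			mu.Lock()
			del(ev.key)
			mu.Unlock()
		}(ev)
	}
}

func main() {
	ch := make(chan event, 1)
	ch <- event{key: "/registry/leases/foo", lease: 10 * time.Millisecond}
	close(ch)
	expire(ch, func(k string) { fmt.Println("expired:", k) })
	time.Sleep(50 * time.Millisecond) // give the expiry goroutine time to fire
}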

View File

@ -52,18 +52,23 @@ func NewFactoryFromConfigOrDie(config *rest.Config) *Factory {
}
func NewFactoryFromConfig(config *rest.Config) (*Factory, error) {
cs, err := clientset.NewForConfig(config)
if err != nil {
return nil, err
}
informerFactory := informers.NewSharedInformerFactory(cs, 2*time.Hour)
return NewFactory(cs, informerFactory), nil
return NewFactoryFromConfigWithOptions(config, nil)
}
func NewFactoryFromConfigWithNamespace(config *rest.Config, namespace string) (*Factory, error) {
if namespace == "" {
return NewFactoryFromConfig(config)
return NewFactoryFromConfigWithOptions(config, &FactoryOptions{
Namespace: namespace,
})
}
type FactoryOptions struct {
Namespace string
Resync time.Duration
}
func NewFactoryFromConfigWithOptions(config *rest.Config, opts *FactoryOptions) (*Factory, error) {
if opts == nil {
opts = &FactoryOptions{}
}
cs, err := clientset.NewForConfig(config)
@ -71,7 +76,17 @@ func NewFactoryFromConfigWithNamespace(config *rest.Config, namespace string) (*
return nil, err
}
informerFactory := informers.NewSharedInformerFactoryWithOptions(cs, 2*time.Hour, informers.WithNamespace(namespace))
resync := opts.Resync
if resync == 0 {
resync = 2 * time.Hour
}
if opts.Namespace == "" {
informerFactory := informers.NewSharedInformerFactory(cs, resync)
return NewFactory(cs, informerFactory), nil
}
informerFactory := informers.NewSharedInformerFactoryWithOptions(cs, resync, informers.WithNamespace(opts.Namespace))
return NewFactory(cs, informerFactory), nil
}
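For reference, a hedged usage sketch of the new options-based constructor. It assumes the code sits in the same generated factory package as NewFactoryFromConfigWithOptions; the namespace and resync values are purely illustrative.

package controllers

import (
	"time"

	"k8s.io/client-go/rest"
)

// newScopedFactory builds a factory whose informers are limited to one
// namespace and resync more often than the 2h default.
func newScopedFactory(cfg *rest.Config) (*Factory, error) {
	return NewFactoryFromConfigWithOptions(cfg, &FactoryOptions{
		Namespace: "kube-system",    // restrict informers to a single namespace
		Resync:    15 * time.Minute, // override the 2h default resync period
	})
}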

View File

@ -25,6 +25,7 @@ import (
"github.com/rancher/wrangler/pkg/apply"
"github.com/rancher/wrangler/pkg/condition"
"github.com/rancher/wrangler/pkg/generic"
"github.com/rancher/wrangler/pkg/kv"
v1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
@ -267,6 +268,7 @@ func RegisterDaemonSetGeneratingHandler(ctx context.Context, controller DaemonSe
if opts != nil {
statusHandler.opts = *opts
}
controller.OnChange(ctx, name, statusHandler.Remove)
RegisterDaemonSetStatusHandler(ctx, controller, condition, name, statusHandler.Handle)
}
@ -281,7 +283,7 @@ func (a *daemonSetStatusHandler) sync(key string, obj *v1.DaemonSet) (*v1.Daemon
return obj, nil
}
origStatus := obj.Status
origStatus := obj.Status.DeepCopy()
obj = obj.DeepCopy()
newStatus, err := a.handler(obj, obj.Status)
if err != nil {
@ -289,16 +291,16 @@ func (a *daemonSetStatusHandler) sync(key string, obj *v1.DaemonSet) (*v1.Daemon
newStatus = *origStatus.DeepCopy()
}
obj.Status = newStatus
if a.condition != "" {
if errors.IsConflict(err) {
a.condition.SetError(obj, "", nil)
a.condition.SetError(&newStatus, "", nil)
} else {
a.condition.SetError(obj, "", err)
a.condition.SetError(&newStatus, "", err)
}
}
if !equality.Semantic.DeepEqual(origStatus, obj.Status) {
if !equality.Semantic.DeepEqual(origStatus, &newStatus) {
var newErr error
obj.Status = newStatus
obj, newErr = a.client.UpdateStatus(obj)
if err == nil {
err = newErr
@ -315,29 +317,28 @@ type daemonSetGeneratingHandler struct {
name string
}
func (a *daemonSetGeneratingHandler) Remove(key string, obj *v1.DaemonSet) (*v1.DaemonSet, error) {
if obj != nil {
return obj, nil
}
obj = &v1.DaemonSet{}
obj.Namespace, obj.Name = kv.RSplit(key, "/")
obj.SetGroupVersionKind(a.gvk)
return nil, generic.ConfigureApplyForObject(a.apply, obj, &a.opts).
WithOwner(obj).
WithSetID(a.name).
ApplyObjects()
}
func (a *daemonSetGeneratingHandler) Handle(obj *v1.DaemonSet, status v1.DaemonSetStatus) (v1.DaemonSetStatus, error) {
objs, newStatus, err := a.DaemonSetGeneratingHandler(obj, status)
if err != nil {
return newStatus, err
}
apply := a.apply
if !a.opts.DynamicLookup {
apply = apply.WithStrictCaching()
}
if !a.opts.AllowCrossNamespace && !a.opts.AllowClusterScoped {
apply = apply.WithSetOwnerReference(true, false).
WithDefaultNamespace(obj.GetNamespace()).
WithListerNamespace(obj.GetNamespace())
}
if !a.opts.AllowClusterScoped {
apply = apply.WithRestrictClusterScoped()
}
return newStatus, apply.
return newStatus, generic.ConfigureApplyForObject(a.apply, obj, &a.opts).
WithOwner(obj).
WithSetID(a.name).
ApplyObjects(objs...)
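One detail worth calling out in the generated handlers above is the switch from a plain assignment to DeepCopy() when snapshotting the status. A standalone illustration, with made-up types, of why a shallow snapshot can hide changes once the status contains slices or other reference fields; it is an aside on the general pitfall, not a claim about the exact upstream bug.

package main

import (
	"fmt"
	"reflect"
)

type status struct{ Conditions []string }

func main() {
	obj := struct{ Status status }{Status: status{Conditions: []string{"Pending"}}}

	shallow := obj.Status // shares the Conditions backing array with obj
	deep := status{Conditions: append([]string(nil), obj.Status.Conditions...)} // what DeepCopy does

	obj.Status.Conditions[0] = "Ready" // the handler mutates the status in place

	fmt.Println(reflect.DeepEqual(shallow, obj.Status)) // true  -> change missed
	fmt.Println(reflect.DeepEqual(deep, obj.Status))    // false -> status update triggered
}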

View File

@ -25,6 +25,7 @@ import (
"github.com/rancher/wrangler/pkg/apply"
"github.com/rancher/wrangler/pkg/condition"
"github.com/rancher/wrangler/pkg/generic"
"github.com/rancher/wrangler/pkg/kv"
v1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
@ -267,6 +268,7 @@ func RegisterDeploymentGeneratingHandler(ctx context.Context, controller Deploym
if opts != nil {
statusHandler.opts = *opts
}
controller.OnChange(ctx, name, statusHandler.Remove)
RegisterDeploymentStatusHandler(ctx, controller, condition, name, statusHandler.Handle)
}
@ -281,7 +283,7 @@ func (a *deploymentStatusHandler) sync(key string, obj *v1.Deployment) (*v1.Depl
return obj, nil
}
origStatus := obj.Status
origStatus := obj.Status.DeepCopy()
obj = obj.DeepCopy()
newStatus, err := a.handler(obj, obj.Status)
if err != nil {
@ -289,16 +291,16 @@ func (a *deploymentStatusHandler) sync(key string, obj *v1.Deployment) (*v1.Depl
newStatus = *origStatus.DeepCopy()
}
obj.Status = newStatus
if a.condition != "" {
if errors.IsConflict(err) {
a.condition.SetError(obj, "", nil)
a.condition.SetError(&newStatus, "", nil)
} else {
a.condition.SetError(obj, "", err)
a.condition.SetError(&newStatus, "", err)
}
}
if !equality.Semantic.DeepEqual(origStatus, obj.Status) {
if !equality.Semantic.DeepEqual(origStatus, &newStatus) {
var newErr error
obj.Status = newStatus
obj, newErr = a.client.UpdateStatus(obj)
if err == nil {
err = newErr
@ -315,29 +317,28 @@ type deploymentGeneratingHandler struct {
name string
}
func (a *deploymentGeneratingHandler) Remove(key string, obj *v1.Deployment) (*v1.Deployment, error) {
if obj != nil {
return obj, nil
}
obj = &v1.Deployment{}
obj.Namespace, obj.Name = kv.RSplit(key, "/")
obj.SetGroupVersionKind(a.gvk)
return nil, generic.ConfigureApplyForObject(a.apply, obj, &a.opts).
WithOwner(obj).
WithSetID(a.name).
ApplyObjects()
}
func (a *deploymentGeneratingHandler) Handle(obj *v1.Deployment, status v1.DeploymentStatus) (v1.DeploymentStatus, error) {
objs, newStatus, err := a.DeploymentGeneratingHandler(obj, status)
if err != nil {
return newStatus, err
}
apply := a.apply
if !a.opts.DynamicLookup {
apply = apply.WithStrictCaching()
}
if !a.opts.AllowCrossNamespace && !a.opts.AllowClusterScoped {
apply = apply.WithSetOwnerReference(true, false).
WithDefaultNamespace(obj.GetNamespace()).
WithListerNamespace(obj.GetNamespace())
}
if !a.opts.AllowClusterScoped {
apply = apply.WithRestrictClusterScoped()
}
return newStatus, apply.
return newStatus, generic.ConfigureApplyForObject(a.apply, obj, &a.opts).
WithOwner(obj).
WithSetID(a.name).
ApplyObjects(objs...)

View File

@ -25,6 +25,7 @@ import (
"github.com/rancher/wrangler/pkg/apply"
"github.com/rancher/wrangler/pkg/condition"
"github.com/rancher/wrangler/pkg/generic"
"github.com/rancher/wrangler/pkg/kv"
v1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
@ -267,6 +268,7 @@ func RegisterStatefulSetGeneratingHandler(ctx context.Context, controller Statef
if opts != nil {
statusHandler.opts = *opts
}
controller.OnChange(ctx, name, statusHandler.Remove)
RegisterStatefulSetStatusHandler(ctx, controller, condition, name, statusHandler.Handle)
}
@ -281,7 +283,7 @@ func (a *statefulSetStatusHandler) sync(key string, obj *v1.StatefulSet) (*v1.St
return obj, nil
}
origStatus := obj.Status
origStatus := obj.Status.DeepCopy()
obj = obj.DeepCopy()
newStatus, err := a.handler(obj, obj.Status)
if err != nil {
@ -289,16 +291,16 @@ func (a *statefulSetStatusHandler) sync(key string, obj *v1.StatefulSet) (*v1.St
newStatus = *origStatus.DeepCopy()
}
obj.Status = newStatus
if a.condition != "" {
if errors.IsConflict(err) {
a.condition.SetError(obj, "", nil)
a.condition.SetError(&newStatus, "", nil)
} else {
a.condition.SetError(obj, "", err)
a.condition.SetError(&newStatus, "", err)
}
}
if !equality.Semantic.DeepEqual(origStatus, obj.Status) {
if !equality.Semantic.DeepEqual(origStatus, &newStatus) {
var newErr error
obj.Status = newStatus
obj, newErr = a.client.UpdateStatus(obj)
if err == nil {
err = newErr
@ -315,29 +317,28 @@ type statefulSetGeneratingHandler struct {
name string
}
func (a *statefulSetGeneratingHandler) Remove(key string, obj *v1.StatefulSet) (*v1.StatefulSet, error) {
if obj != nil {
return obj, nil
}
obj = &v1.StatefulSet{}
obj.Namespace, obj.Name = kv.RSplit(key, "/")
obj.SetGroupVersionKind(a.gvk)
return nil, generic.ConfigureApplyForObject(a.apply, obj, &a.opts).
WithOwner(obj).
WithSetID(a.name).
ApplyObjects()
}
func (a *statefulSetGeneratingHandler) Handle(obj *v1.StatefulSet, status v1.StatefulSetStatus) (v1.StatefulSetStatus, error) {
objs, newStatus, err := a.StatefulSetGeneratingHandler(obj, status)
if err != nil {
return newStatus, err
}
apply := a.apply
if !a.opts.DynamicLookup {
apply = apply.WithStrictCaching()
}
if !a.opts.AllowCrossNamespace && !a.opts.AllowClusterScoped {
apply = apply.WithSetOwnerReference(true, false).
WithDefaultNamespace(obj.GetNamespace()).
WithListerNamespace(obj.GetNamespace())
}
if !a.opts.AllowClusterScoped {
apply = apply.WithRestrictClusterScoped()
}
return newStatus, apply.
return newStatus, generic.ConfigureApplyForObject(a.apply, obj, &a.opts).
WithOwner(obj).
WithSetID(a.name).
ApplyObjects(objs...)

View File

@ -52,18 +52,23 @@ func NewFactoryFromConfigOrDie(config *rest.Config) *Factory {
}
func NewFactoryFromConfig(config *rest.Config) (*Factory, error) {
cs, err := clientset.NewForConfig(config)
if err != nil {
return nil, err
}
informerFactory := informers.NewSharedInformerFactory(cs, 2*time.Hour)
return NewFactory(cs, informerFactory), nil
return NewFactoryFromConfigWithOptions(config, nil)
}
func NewFactoryFromConfigWithNamespace(config *rest.Config, namespace string) (*Factory, error) {
if namespace == "" {
return NewFactoryFromConfig(config)
return NewFactoryFromConfigWithOptions(config, &FactoryOptions{
Namespace: namespace,
})
}
type FactoryOptions struct {
Namespace string
Resync time.Duration
}
func NewFactoryFromConfigWithOptions(config *rest.Config, opts *FactoryOptions) (*Factory, error) {
if opts == nil {
opts = &FactoryOptions{}
}
cs, err := clientset.NewForConfig(config)
@ -71,7 +76,17 @@ func NewFactoryFromConfigWithNamespace(config *rest.Config, namespace string) (*
return nil, err
}
informerFactory := informers.NewSharedInformerFactoryWithOptions(cs, 2*time.Hour, informers.WithNamespace(namespace))
resync := opts.Resync
if resync == 0 {
resync = 2 * time.Hour
}
if opts.Namespace == "" {
informerFactory := informers.NewSharedInformerFactory(cs, resync)
return NewFactory(cs, informerFactory), nil
}
informerFactory := informers.NewSharedInformerFactoryWithOptions(cs, resync, informers.WithNamespace(opts.Namespace))
return NewFactory(cs, informerFactory), nil
}

View File

@ -25,6 +25,7 @@ import (
"github.com/rancher/wrangler/pkg/apply"
"github.com/rancher/wrangler/pkg/condition"
"github.com/rancher/wrangler/pkg/generic"
"github.com/rancher/wrangler/pkg/kv"
v1 "k8s.io/api/batch/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
@ -267,6 +268,7 @@ func RegisterJobGeneratingHandler(ctx context.Context, controller JobController,
if opts != nil {
statusHandler.opts = *opts
}
controller.OnChange(ctx, name, statusHandler.Remove)
RegisterJobStatusHandler(ctx, controller, condition, name, statusHandler.Handle)
}
@ -281,7 +283,7 @@ func (a *jobStatusHandler) sync(key string, obj *v1.Job) (*v1.Job, error) {
return obj, nil
}
origStatus := obj.Status
origStatus := obj.Status.DeepCopy()
obj = obj.DeepCopy()
newStatus, err := a.handler(obj, obj.Status)
if err != nil {
@ -289,16 +291,16 @@ func (a *jobStatusHandler) sync(key string, obj *v1.Job) (*v1.Job, error) {
newStatus = *origStatus.DeepCopy()
}
obj.Status = newStatus
if a.condition != "" {
if errors.IsConflict(err) {
a.condition.SetError(obj, "", nil)
a.condition.SetError(&newStatus, "", nil)
} else {
a.condition.SetError(obj, "", err)
a.condition.SetError(&newStatus, "", err)
}
}
if !equality.Semantic.DeepEqual(origStatus, obj.Status) {
if !equality.Semantic.DeepEqual(origStatus, &newStatus) {
var newErr error
obj.Status = newStatus
obj, newErr = a.client.UpdateStatus(obj)
if err == nil {
err = newErr
@ -315,29 +317,28 @@ type jobGeneratingHandler struct {
name string
}
func (a *jobGeneratingHandler) Remove(key string, obj *v1.Job) (*v1.Job, error) {
if obj != nil {
return obj, nil
}
obj = &v1.Job{}
obj.Namespace, obj.Name = kv.RSplit(key, "/")
obj.SetGroupVersionKind(a.gvk)
return nil, generic.ConfigureApplyForObject(a.apply, obj, &a.opts).
WithOwner(obj).
WithSetID(a.name).
ApplyObjects()
}
func (a *jobGeneratingHandler) Handle(obj *v1.Job, status v1.JobStatus) (v1.JobStatus, error) {
objs, newStatus, err := a.JobGeneratingHandler(obj, status)
if err != nil {
return newStatus, err
}
apply := a.apply
if !a.opts.DynamicLookup {
apply = apply.WithStrictCaching()
}
if !a.opts.AllowCrossNamespace && !a.opts.AllowClusterScoped {
apply = apply.WithSetOwnerReference(true, false).
WithDefaultNamespace(obj.GetNamespace()).
WithListerNamespace(obj.GetNamespace())
}
if !a.opts.AllowClusterScoped {
apply = apply.WithRestrictClusterScoped()
}
return newStatus, apply.
return newStatus, generic.ConfigureApplyForObject(a.apply, obj, &a.opts).
WithOwner(obj).
WithSetID(a.name).
ApplyObjects(objs...)

View File

@ -52,18 +52,23 @@ func NewFactoryFromConfigOrDie(config *rest.Config) *Factory {
}
func NewFactoryFromConfig(config *rest.Config) (*Factory, error) {
cs, err := clientset.NewForConfig(config)
if err != nil {
return nil, err
}
informerFactory := informers.NewSharedInformerFactory(cs, 2*time.Hour)
return NewFactory(cs, informerFactory), nil
return NewFactoryFromConfigWithOptions(config, nil)
}
func NewFactoryFromConfigWithNamespace(config *rest.Config, namespace string) (*Factory, error) {
if namespace == "" {
return NewFactoryFromConfig(config)
return NewFactoryFromConfigWithOptions(config, &FactoryOptions{
Namespace: namespace,
})
}
type FactoryOptions struct {
Namespace string
Resync time.Duration
}
func NewFactoryFromConfigWithOptions(config *rest.Config, opts *FactoryOptions) (*Factory, error) {
if opts == nil {
opts = &FactoryOptions{}
}
cs, err := clientset.NewForConfig(config)
@ -71,7 +76,17 @@ func NewFactoryFromConfigWithNamespace(config *rest.Config, namespace string) (*
return nil, err
}
informerFactory := informers.NewSharedInformerFactoryWithOptions(cs, 2*time.Hour, informers.WithNamespace(namespace))
resync := opts.Resync
if resync == 0 {
resync = 2 * time.Hour
}
if opts.Namespace == "" {
informerFactory := informers.NewSharedInformerFactory(cs, resync)
return NewFactory(cs, informerFactory), nil
}
informerFactory := informers.NewSharedInformerFactoryWithOptions(cs, resync, informers.WithNamespace(opts.Namespace))
return NewFactory(cs, informerFactory), nil
}

View File

@ -25,6 +25,7 @@ import (
"github.com/rancher/wrangler/pkg/apply"
"github.com/rancher/wrangler/pkg/condition"
"github.com/rancher/wrangler/pkg/generic"
"github.com/rancher/wrangler/pkg/kv"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
@ -267,6 +268,7 @@ func RegisterNamespaceGeneratingHandler(ctx context.Context, controller Namespac
if opts != nil {
statusHandler.opts = *opts
}
controller.OnChange(ctx, name, statusHandler.Remove)
RegisterNamespaceStatusHandler(ctx, controller, condition, name, statusHandler.Handle)
}
@ -281,7 +283,7 @@ func (a *namespaceStatusHandler) sync(key string, obj *v1.Namespace) (*v1.Namesp
return obj, nil
}
origStatus := obj.Status
origStatus := obj.Status.DeepCopy()
obj = obj.DeepCopy()
newStatus, err := a.handler(obj, obj.Status)
if err != nil {
@ -289,16 +291,16 @@ func (a *namespaceStatusHandler) sync(key string, obj *v1.Namespace) (*v1.Namesp
newStatus = *origStatus.DeepCopy()
}
obj.Status = newStatus
if a.condition != "" {
if errors.IsConflict(err) {
a.condition.SetError(obj, "", nil)
a.condition.SetError(&newStatus, "", nil)
} else {
a.condition.SetError(obj, "", err)
a.condition.SetError(&newStatus, "", err)
}
}
if !equality.Semantic.DeepEqual(origStatus, obj.Status) {
if !equality.Semantic.DeepEqual(origStatus, &newStatus) {
var newErr error
obj.Status = newStatus
obj, newErr = a.client.UpdateStatus(obj)
if err == nil {
err = newErr
@ -315,29 +317,28 @@ type namespaceGeneratingHandler struct {
name string
}
func (a *namespaceGeneratingHandler) Remove(key string, obj *v1.Namespace) (*v1.Namespace, error) {
if obj != nil {
return obj, nil
}
obj = &v1.Namespace{}
obj.Namespace, obj.Name = kv.RSplit(key, "/")
obj.SetGroupVersionKind(a.gvk)
return nil, generic.ConfigureApplyForObject(a.apply, obj, &a.opts).
WithOwner(obj).
WithSetID(a.name).
ApplyObjects()
}
func (a *namespaceGeneratingHandler) Handle(obj *v1.Namespace, status v1.NamespaceStatus) (v1.NamespaceStatus, error) {
objs, newStatus, err := a.NamespaceGeneratingHandler(obj, status)
if err != nil {
return newStatus, err
}
apply := a.apply
if !a.opts.DynamicLookup {
apply = apply.WithStrictCaching()
}
if !a.opts.AllowCrossNamespace && !a.opts.AllowClusterScoped {
apply = apply.WithSetOwnerReference(true, false).
WithDefaultNamespace(obj.GetNamespace()).
WithListerNamespace(obj.GetNamespace())
}
if !a.opts.AllowClusterScoped {
apply = apply.WithRestrictClusterScoped()
}
return newStatus, apply.
return newStatus, generic.ConfigureApplyForObject(a.apply, obj, &a.opts).
WithOwner(obj).
WithSetID(a.name).
ApplyObjects(objs...)

View File

@ -25,6 +25,7 @@ import (
"github.com/rancher/wrangler/pkg/apply"
"github.com/rancher/wrangler/pkg/condition"
"github.com/rancher/wrangler/pkg/generic"
"github.com/rancher/wrangler/pkg/kv"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
@ -267,6 +268,7 @@ func RegisterNodeGeneratingHandler(ctx context.Context, controller NodeControlle
if opts != nil {
statusHandler.opts = *opts
}
controller.OnChange(ctx, name, statusHandler.Remove)
RegisterNodeStatusHandler(ctx, controller, condition, name, statusHandler.Handle)
}
@ -281,7 +283,7 @@ func (a *nodeStatusHandler) sync(key string, obj *v1.Node) (*v1.Node, error) {
return obj, nil
}
origStatus := obj.Status
origStatus := obj.Status.DeepCopy()
obj = obj.DeepCopy()
newStatus, err := a.handler(obj, obj.Status)
if err != nil {
@ -289,16 +291,16 @@ func (a *nodeStatusHandler) sync(key string, obj *v1.Node) (*v1.Node, error) {
newStatus = *origStatus.DeepCopy()
}
obj.Status = newStatus
if a.condition != "" {
if errors.IsConflict(err) {
a.condition.SetError(obj, "", nil)
a.condition.SetError(&newStatus, "", nil)
} else {
a.condition.SetError(obj, "", err)
a.condition.SetError(&newStatus, "", err)
}
}
if !equality.Semantic.DeepEqual(origStatus, obj.Status) {
if !equality.Semantic.DeepEqual(origStatus, &newStatus) {
var newErr error
obj.Status = newStatus
obj, newErr = a.client.UpdateStatus(obj)
if err == nil {
err = newErr
@ -315,29 +317,28 @@ type nodeGeneratingHandler struct {
name string
}
func (a *nodeGeneratingHandler) Remove(key string, obj *v1.Node) (*v1.Node, error) {
if obj != nil {
return obj, nil
}
obj = &v1.Node{}
obj.Namespace, obj.Name = kv.RSplit(key, "/")
obj.SetGroupVersionKind(a.gvk)
return nil, generic.ConfigureApplyForObject(a.apply, obj, &a.opts).
WithOwner(obj).
WithSetID(a.name).
ApplyObjects()
}
func (a *nodeGeneratingHandler) Handle(obj *v1.Node, status v1.NodeStatus) (v1.NodeStatus, error) {
objs, newStatus, err := a.NodeGeneratingHandler(obj, status)
if err != nil {
return newStatus, err
}
apply := a.apply
if !a.opts.DynamicLookup {
apply = apply.WithStrictCaching()
}
if !a.opts.AllowCrossNamespace && !a.opts.AllowClusterScoped {
apply = apply.WithSetOwnerReference(true, false).
WithDefaultNamespace(obj.GetNamespace()).
WithListerNamespace(obj.GetNamespace())
}
if !a.opts.AllowClusterScoped {
apply = apply.WithRestrictClusterScoped()
}
return newStatus, apply.
return newStatus, generic.ConfigureApplyForObject(a.apply, obj, &a.opts).
WithOwner(obj).
WithSetID(a.name).
ApplyObjects(objs...)

View File

@ -25,6 +25,7 @@ import (
"github.com/rancher/wrangler/pkg/apply"
"github.com/rancher/wrangler/pkg/condition"
"github.com/rancher/wrangler/pkg/generic"
"github.com/rancher/wrangler/pkg/kv"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
@ -267,6 +268,7 @@ func RegisterPersistentVolumeClaimGeneratingHandler(ctx context.Context, control
if opts != nil {
statusHandler.opts = *opts
}
controller.OnChange(ctx, name, statusHandler.Remove)
RegisterPersistentVolumeClaimStatusHandler(ctx, controller, condition, name, statusHandler.Handle)
}
@ -281,7 +283,7 @@ func (a *persistentVolumeClaimStatusHandler) sync(key string, obj *v1.Persistent
return obj, nil
}
origStatus := obj.Status
origStatus := obj.Status.DeepCopy()
obj = obj.DeepCopy()
newStatus, err := a.handler(obj, obj.Status)
if err != nil {
@ -289,16 +291,16 @@ func (a *persistentVolumeClaimStatusHandler) sync(key string, obj *v1.Persistent
newStatus = *origStatus.DeepCopy()
}
obj.Status = newStatus
if a.condition != "" {
if errors.IsConflict(err) {
a.condition.SetError(obj, "", nil)
a.condition.SetError(&newStatus, "", nil)
} else {
a.condition.SetError(obj, "", err)
a.condition.SetError(&newStatus, "", err)
}
}
if !equality.Semantic.DeepEqual(origStatus, obj.Status) {
if !equality.Semantic.DeepEqual(origStatus, &newStatus) {
var newErr error
obj.Status = newStatus
obj, newErr = a.client.UpdateStatus(obj)
if err == nil {
err = newErr
@ -315,29 +317,28 @@ type persistentVolumeClaimGeneratingHandler struct {
name string
}
func (a *persistentVolumeClaimGeneratingHandler) Remove(key string, obj *v1.PersistentVolumeClaim) (*v1.PersistentVolumeClaim, error) {
if obj != nil {
return obj, nil
}
obj = &v1.PersistentVolumeClaim{}
obj.Namespace, obj.Name = kv.RSplit(key, "/")
obj.SetGroupVersionKind(a.gvk)
return nil, generic.ConfigureApplyForObject(a.apply, obj, &a.opts).
WithOwner(obj).
WithSetID(a.name).
ApplyObjects()
}
func (a *persistentVolumeClaimGeneratingHandler) Handle(obj *v1.PersistentVolumeClaim, status v1.PersistentVolumeClaimStatus) (v1.PersistentVolumeClaimStatus, error) {
objs, newStatus, err := a.PersistentVolumeClaimGeneratingHandler(obj, status)
if err != nil {
return newStatus, err
}
apply := a.apply
if !a.opts.DynamicLookup {
apply = apply.WithStrictCaching()
}
if !a.opts.AllowCrossNamespace && !a.opts.AllowClusterScoped {
apply = apply.WithSetOwnerReference(true, false).
WithDefaultNamespace(obj.GetNamespace()).
WithListerNamespace(obj.GetNamespace())
}
if !a.opts.AllowClusterScoped {
apply = apply.WithRestrictClusterScoped()
}
return newStatus, apply.
return newStatus, generic.ConfigureApplyForObject(a.apply, obj, &a.opts).
WithOwner(obj).
WithSetID(a.name).
ApplyObjects(objs...)

View File

@ -25,6 +25,7 @@ import (
"github.com/rancher/wrangler/pkg/apply"
"github.com/rancher/wrangler/pkg/condition"
"github.com/rancher/wrangler/pkg/generic"
"github.com/rancher/wrangler/pkg/kv"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
@ -267,6 +268,7 @@ func RegisterPodGeneratingHandler(ctx context.Context, controller PodController,
if opts != nil {
statusHandler.opts = *opts
}
controller.OnChange(ctx, name, statusHandler.Remove)
RegisterPodStatusHandler(ctx, controller, condition, name, statusHandler.Handle)
}
@ -281,7 +283,7 @@ func (a *podStatusHandler) sync(key string, obj *v1.Pod) (*v1.Pod, error) {
return obj, nil
}
origStatus := obj.Status
origStatus := obj.Status.DeepCopy()
obj = obj.DeepCopy()
newStatus, err := a.handler(obj, obj.Status)
if err != nil {
@ -289,16 +291,16 @@ func (a *podStatusHandler) sync(key string, obj *v1.Pod) (*v1.Pod, error) {
newStatus = *origStatus.DeepCopy()
}
obj.Status = newStatus
if a.condition != "" {
if errors.IsConflict(err) {
a.condition.SetError(obj, "", nil)
a.condition.SetError(&newStatus, "", nil)
} else {
a.condition.SetError(obj, "", err)
a.condition.SetError(&newStatus, "", err)
}
}
if !equality.Semantic.DeepEqual(origStatus, obj.Status) {
if !equality.Semantic.DeepEqual(origStatus, &newStatus) {
var newErr error
obj.Status = newStatus
obj, newErr = a.client.UpdateStatus(obj)
if err == nil {
err = newErr
@ -315,29 +317,28 @@ type podGeneratingHandler struct {
name string
}
func (a *podGeneratingHandler) Remove(key string, obj *v1.Pod) (*v1.Pod, error) {
if obj != nil {
return obj, nil
}
obj = &v1.Pod{}
obj.Namespace, obj.Name = kv.RSplit(key, "/")
obj.SetGroupVersionKind(a.gvk)
return nil, generic.ConfigureApplyForObject(a.apply, obj, &a.opts).
WithOwner(obj).
WithSetID(a.name).
ApplyObjects()
}
func (a *podGeneratingHandler) Handle(obj *v1.Pod, status v1.PodStatus) (v1.PodStatus, error) {
objs, newStatus, err := a.PodGeneratingHandler(obj, status)
if err != nil {
return newStatus, err
}
apply := a.apply
if !a.opts.DynamicLookup {
apply = apply.WithStrictCaching()
}
if !a.opts.AllowCrossNamespace && !a.opts.AllowClusterScoped {
apply = apply.WithSetOwnerReference(true, false).
WithDefaultNamespace(obj.GetNamespace()).
WithListerNamespace(obj.GetNamespace())
}
if !a.opts.AllowClusterScoped {
apply = apply.WithRestrictClusterScoped()
}
return newStatus, apply.
return newStatus, generic.ConfigureApplyForObject(a.apply, obj, &a.opts).
WithOwner(obj).
WithSetID(a.name).
ApplyObjects(objs...)

View File

@ -25,6 +25,7 @@ import (
"github.com/rancher/wrangler/pkg/apply"
"github.com/rancher/wrangler/pkg/condition"
"github.com/rancher/wrangler/pkg/generic"
"github.com/rancher/wrangler/pkg/kv"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
@ -267,6 +268,7 @@ func RegisterServiceGeneratingHandler(ctx context.Context, controller ServiceCon
if opts != nil {
statusHandler.opts = *opts
}
controller.OnChange(ctx, name, statusHandler.Remove)
RegisterServiceStatusHandler(ctx, controller, condition, name, statusHandler.Handle)
}
@ -281,7 +283,7 @@ func (a *serviceStatusHandler) sync(key string, obj *v1.Service) (*v1.Service, e
return obj, nil
}
origStatus := obj.Status
origStatus := obj.Status.DeepCopy()
obj = obj.DeepCopy()
newStatus, err := a.handler(obj, obj.Status)
if err != nil {
@ -289,16 +291,16 @@ func (a *serviceStatusHandler) sync(key string, obj *v1.Service) (*v1.Service, e
newStatus = *origStatus.DeepCopy()
}
obj.Status = newStatus
if a.condition != "" {
if errors.IsConflict(err) {
a.condition.SetError(obj, "", nil)
a.condition.SetError(&newStatus, "", nil)
} else {
a.condition.SetError(obj, "", err)
a.condition.SetError(&newStatus, "", err)
}
}
if !equality.Semantic.DeepEqual(origStatus, obj.Status) {
if !equality.Semantic.DeepEqual(origStatus, &newStatus) {
var newErr error
obj.Status = newStatus
obj, newErr = a.client.UpdateStatus(obj)
if err == nil {
err = newErr
@ -315,29 +317,28 @@ type serviceGeneratingHandler struct {
name string
}
func (a *serviceGeneratingHandler) Remove(key string, obj *v1.Service) (*v1.Service, error) {
if obj != nil {
return obj, nil
}
obj = &v1.Service{}
obj.Namespace, obj.Name = kv.RSplit(key, "/")
obj.SetGroupVersionKind(a.gvk)
return nil, generic.ConfigureApplyForObject(a.apply, obj, &a.opts).
WithOwner(obj).
WithSetID(a.name).
ApplyObjects()
}
func (a *serviceGeneratingHandler) Handle(obj *v1.Service, status v1.ServiceStatus) (v1.ServiceStatus, error) {
objs, newStatus, err := a.ServiceGeneratingHandler(obj, status)
if err != nil {
return newStatus, err
}
apply := a.apply
if !a.opts.DynamicLookup {
apply = apply.WithStrictCaching()
}
if !a.opts.AllowCrossNamespace && !a.opts.AllowClusterScoped {
apply = apply.WithSetOwnerReference(true, false).
WithDefaultNamespace(obj.GetNamespace()).
WithListerNamespace(obj.GetNamespace())
}
if !a.opts.AllowClusterScoped {
apply = apply.WithRestrictClusterScoped()
}
return newStatus, apply.
return newStatus, generic.ConfigureApplyForObject(a.apply, obj, &a.opts).
WithOwner(obj).
WithSetID(a.name).
ApplyObjects(objs...)

View File

@ -52,18 +52,23 @@ func NewFactoryFromConfigOrDie(config *rest.Config) *Factory {
}
func NewFactoryFromConfig(config *rest.Config) (*Factory, error) {
cs, err := clientset.NewForConfig(config)
if err != nil {
return nil, err
}
informerFactory := informers.NewSharedInformerFactory(cs, 2*time.Hour)
return NewFactory(cs, informerFactory), nil
return NewFactoryFromConfigWithOptions(config, nil)
}
func NewFactoryFromConfigWithNamespace(config *rest.Config, namespace string) (*Factory, error) {
if namespace == "" {
return NewFactoryFromConfig(config)
return NewFactoryFromConfigWithOptions(config, &FactoryOptions{
Namespace: namespace,
})
}
type FactoryOptions struct {
Namespace string
Resync time.Duration
}
func NewFactoryFromConfigWithOptions(config *rest.Config, opts *FactoryOptions) (*Factory, error) {
if opts == nil {
opts = &FactoryOptions{}
}
cs, err := clientset.NewForConfig(config)
@ -71,7 +76,17 @@ func NewFactoryFromConfigWithNamespace(config *rest.Config, namespace string) (*
return nil, err
}
informerFactory := informers.NewSharedInformerFactoryWithOptions(cs, 2*time.Hour, informers.WithNamespace(namespace))
resync := opts.Resync
if resync == 0 {
resync = 2 * time.Hour
}
if opts.Namespace == "" {
informerFactory := informers.NewSharedInformerFactory(cs, resync)
return NewFactory(cs, informerFactory), nil
}
informerFactory := informers.NewSharedInformerFactoryWithOptions(cs, resync, informers.WithNamespace(opts.Namespace))
return NewFactory(cs, informerFactory), nil
}

View File

@ -28,18 +28,45 @@ type Reconciler func(oldObj runtime.Object, newObj runtime.Object) (bool, error)
type ClientFactory func(gvr schema.GroupVersionResource) (dynamic.NamespaceableResourceInterface, error)
type InformerFactory interface {
Get(gvk schema.GroupVersionKind, gvr schema.GroupVersionResource) (cache.SharedIndexInformer, error)
}
type InformerGetter interface {
Informer() cache.SharedIndexInformer
GroupVersionKind() schema.GroupVersionKind
}
type PatchByGVK map[schema.GroupVersionKind]map[objectset.ObjectKey]string
func (p PatchByGVK) Add(gvk schema.GroupVersionKind, namespace, name, patch string) {
d, ok := p[gvk]
if !ok {
d = map[objectset.ObjectKey]string{}
p[gvk] = d
}
d[objectset.ObjectKey{
Name: name,
Namespace: namespace,
}] = patch
}
type Plan struct {
Create objectset.ObjectKeyByGVK
Delete objectset.ObjectKeyByGVK
Update PatchByGVK
Objects []runtime.Object
}
type Apply interface {
Apply(set *objectset.ObjectSet) error
ApplyObjects(objs ...runtime.Object) error
WithContext(ctx context.Context) Apply
WithCacheTypes(igs ...InformerGetter) Apply
WithCacheTypeFactory(factory InformerFactory) Apply
WithSetID(id string) Apply
WithOwner(obj runtime.Object) Apply
WithOwnerKey(key string, gvk schema.GroupVersionKind) Apply
WithInjector(injs ...injectors.ConfigInjector) Apply
WithInjectorName(injs ...string) Apply
WithPatcher(gvk schema.GroupVersionKind, patchers Patcher) Apply
@ -53,6 +80,10 @@ type Apply interface {
WithNoDelete() Apply
WithGVK(gvks ...schema.GroupVersionKind) Apply
WithSetOwnerReference(controller, block bool) Apply
FindOwner(obj runtime.Object) (runtime.Object, error)
PurgeOrphan(obj runtime.Object) error
DryRun(objs ...runtime.Object) (Plan, error)
}
func NewForConfig(cfg *rest.Config) (Apply, error) {
@ -70,6 +101,7 @@ func New(discovery discovery.DiscoveryInterface, cf ClientFactory, igs ...Inform
clientFactory: cf,
discovery: discovery,
namespaced: map[schema.GroupVersionKind]bool{},
gvkToGVR: map[schema.GroupVersionKind]schema.GroupVersionResource{},
clients: map[schema.GroupVersionKind]dynamic.NamespaceableResourceInterface{},
},
informers: map[schema.GroupVersionKind]cache.SharedIndexInformer{},
@ -93,6 +125,7 @@ type clients struct {
clientFactory ClientFactory
discovery discovery.DiscoveryInterface
namespaced map[schema.GroupVersionKind]bool
gvkToGVR map[schema.GroupVersionKind]schema.GroupVersionResource
clients map[schema.GroupVersionKind]dynamic.NamespaceableResourceInterface
}
@ -102,6 +135,12 @@ func (c *clients) IsNamespaced(gvk schema.GroupVersionKind) bool {
return c.namespaced[gvk]
}
func (c *clients) gvr(gvk schema.GroupVersionKind) schema.GroupVersionResource {
c.Lock()
defer c.Unlock()
return c.gvkToGVR[gvk]
}
func (c *clients) client(gvk schema.GroupVersionKind) (dynamic.NamespaceableResourceInterface, error) {
c.Lock()
defer c.Unlock()
@ -127,6 +166,7 @@ func (c *clients) client(gvk schema.GroupVersionKind) (dynamic.NamespaceableReso
c.namespaced[gvk] = resource.Namespaced
c.clients[gvk] = client
c.gvkToGVR[gvk] = gvk.GroupVersion().WithResource(resource.Name)
return client, nil
}
@ -144,6 +184,10 @@ func (a *apply) newDesiredSet() desiredSet {
}
}
func (a *apply) DryRun(objs ...runtime.Object) (Plan, error) {
return a.newDesiredSet().DryRun(objs...)
}
func (a *apply) Apply(set *objectset.ObjectSet) error {
return a.newDesiredSet().Apply(set)
}
@ -162,6 +206,10 @@ func (a *apply) WithOwner(obj runtime.Object) Apply {
return a.newDesiredSet().WithOwner(obj)
}
func (a *apply) WithOwnerKey(key string, gvk schema.GroupVersionKind) Apply {
return a.newDesiredSet().WithOwnerKey(key, gvk)
}
func (a *apply) WithInjector(injs ...injectors.ConfigInjector) Apply {
return a.newDesiredSet().WithInjector(injs...)
}
@ -174,6 +222,10 @@ func (a *apply) WithCacheTypes(igs ...InformerGetter) Apply {
return a.newDesiredSet().WithCacheTypes(igs...)
}
func (a *apply) WithCacheTypeFactory(factory InformerFactory) Apply {
return a.newDesiredSet().WithCacheTypeFactory(factory)
}
func (a *apply) WithGVK(gvks ...schema.GroupVersionKind) Apply {
return a.newDesiredSet().WithGVK(gvks...)
}
@ -221,3 +273,11 @@ func (a *apply) WithSetOwnerReference(controller, block bool) Apply {
func (a *apply) WithContext(ctx context.Context) Apply {
return a.newDesiredSet().WithContext(ctx)
}
func (a *apply) FindOwner(obj runtime.Object) (runtime.Object, error) {
return a.newDesiredSet().FindOwner(obj)
}
func (a *apply) PurgeOrphan(obj runtime.Object) error {
return a.newDesiredSet().PurgeOrphan(obj)
}
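A short usage sketch of the new DryRun entry point. NewForConfig, WithSetID, DryRun and the Plan fields come from the interface above; the wrapper function and the printed output are illustrative.

package main

import (
	"fmt"

	"github.com/rancher/wrangler/pkg/apply"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/rest"
)

// previewChanges reports what Apply would create, delete and patch for the
// desired objects, without touching the cluster.
func previewChanges(cfg *rest.Config, desired []runtime.Object) error {
	a, err := apply.NewForConfig(cfg)
	if err != nil {
		return err
	}
	plan, err := a.WithSetID("example").DryRun(desired...)
	if err != nil {
		return err
	}
	fmt.Println("create:", plan.Create)
	fmt.Println("delete:", plan.Delete)
	for gvk, patches := range plan.Update {
		for key, patch := range patches {
			fmt.Printf("patch %v %v: %s\n", gvk, key, patch)
		}
	}
	return nil
}

func main() {}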

View File

@ -3,8 +3,10 @@ package apply
import (
"context"
"github.com/rancher/wrangler/pkg/apply/injectors"
"github.com/rancher/wrangler/pkg/kv"
"github.com/rancher/wrangler/pkg/merr"
"github.com/rancher/wrangler/pkg/objectset"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/tools/cache"
@ -23,6 +25,7 @@ type desiredSet struct {
pruneTypes map[schema.GroupVersionKind]cache.SharedIndexInformer
patchers map[schema.GroupVersionKind]Patcher
reconcilers map[schema.GroupVersionKind]Reconciler
informerFactory InformerFactory
remove bool
noDelete bool
setID string
@ -33,6 +36,9 @@ type desiredSet struct {
ratelimitingQps float32
injectorNames []string
errs []error
createPlan bool
plan Plan
}
func (o *desiredSet) err(err error) error {
@ -44,6 +50,12 @@ func (o desiredSet) Err() error {
return merr.NewErrors(append(o.errs, o.objs.Err())...)
}
func (o desiredSet) DryRun(objs ...runtime.Object) (Plan, error) {
o.objs = objectset.NewObjectSet()
o.objs.Add(objs...)
return o.dryRun()
}
func (o desiredSet) Apply(set *objectset.ObjectSet) error {
if set == nil {
set = objectset.NewObjectSet()
@ -76,6 +88,14 @@ func (o desiredSet) WithSetID(id string) Apply {
return o
}
func (o desiredSet) WithOwnerKey(key string, gvk schema.GroupVersionKind) Apply {
obj := &v1.PartialObjectMetadata{}
obj.Namespace, obj.Name = kv.RSplit(key, "/")
obj.SetGroupVersionKind(gvk)
o.owner = obj
return o
}
func (o desiredSet) WithOwner(obj runtime.Object) Apply {
o.owner = obj
return o
@ -98,6 +118,11 @@ func (o desiredSet) WithInjectorName(injs ...string) Apply {
return o
}
func (o desiredSet) WithCacheTypeFactory(factory InformerFactory) Apply {
o.informerFactory = factory
return o
}
func (o desiredSet) WithCacheTypes(igs ...InformerGetter) Apply {
pruneTypes := make(map[schema.GroupVersionKind]cache.SharedIndexInformer, len(igs))
for k, v := range o.pruneTypes {
@ -147,7 +172,11 @@ func (o desiredSet) WithRestrictClusterScoped() Apply {
}
func (o desiredSet) WithDefaultNamespace(ns string) Apply {
if ns == "" {
o.defaultNamespace = defaultNamespace
} else {
o.defaultNamespace = ns
}
return o
}

View File

@ -27,6 +27,7 @@ const (
LabelName = "objectset.rio.cattle.io/owner-name"
LabelNamespace = "objectset.rio.cattle.io/owner-namespace"
LabelHash = "objectset.rio.cattle.io/hash"
LabelPrefix = "objectset.rio.cattle.io/"
)
var (
@ -58,6 +59,15 @@ func (o *desiredSet) getRateLimit(labelHash string) flowcontrol.RateLimiter {
return rl
}
func (o *desiredSet) dryRun() (Plan, error) {
o.createPlan = true
o.plan.Create = objectset.ObjectKeyByGVK{}
o.plan.Update = PatchByGVK{}
o.plan.Delete = objectset.ObjectKeyByGVK{}
err := o.apply()
return o.plan, err
}
func (o *desiredSet) apply() error {
if o.objs == nil || o.objs.Len() == 0 {
o.remove = true
@ -67,7 +77,7 @@ func (o *desiredSet) apply() error {
return err
}
labelSet, annotationSet, err := o.getLabelsAndAnnotations()
labelSet, annotationSet, err := GetLabelsAndAnnotations(o.setID, o.owner)
if err != nil {
return o.err(err)
}
@ -90,13 +100,13 @@ func (o *desiredSet) apply() error {
objs := o.collect(objList)
debugID := o.debugID()
req, err := labels.NewRequirement(LabelHash, selection.Equals, []string{labelSet[LabelHash]})
sel, err := GetSelector(labelSet)
if err != nil {
return o.err(err)
}
for _, gvk := range o.objs.GVKOrder(o.knownGVK()...) {
o.process(debugID, labels.NewSelector().Add(*req), gvk, objs[gvk])
o.process(debugID, sel, gvk, objs[gvk])
}
return o.Err()
@ -161,18 +171,26 @@ func (o *desiredSet) runInjectors(objList []runtime.Object) ([]runtime.Object, e
return objList, nil
}
func (o *desiredSet) getLabelsAndAnnotations() (map[string]string, map[string]string, error) {
annotations := map[string]string{
LabelID: o.setID,
func GetSelector(labelSet map[string]string) (labels.Selector, error) {
req, err := labels.NewRequirement(LabelHash, selection.Equals, []string{labelSet[LabelHash]})
if err != nil {
return nil, err
}
return labels.NewSelector().Add(*req), nil
}
if o.owner != nil {
gvk, err := gvk2.Get(o.owner)
func GetLabelsAndAnnotations(setID string, owner runtime.Object) (map[string]string, map[string]string, error) {
annotations := map[string]string{
LabelID: setID,
}
if owner != nil {
gvk, err := gvk2.Get(owner)
if err != nil {
return nil, nil, err
}
annotations[LabelGVK] = gvk.String()
metadata, err := meta.Accessor(o.owner)
metadata, err := meta.Accessor(owner)
if err != nil {
return nil, nil, fmt.Errorf("failed to get metadata for %s", gvk)
}

View File

@ -5,6 +5,9 @@ import (
"compress/gzip"
"encoding/base64"
"io/ioutil"
"strings"
data2 "github.com/rancher/wrangler/pkg/data"
"github.com/pkg/errors"
"github.com/rancher/wrangler/pkg/data/convert"
@ -95,7 +98,7 @@ func emptyMaps(data map[string]interface{}, keys ...string) bool {
return true
}
func sanitizePatch(patch []byte) ([]byte, error) {
func sanitizePatch(patch []byte, removeObjectSetAnnotation bool) ([]byte, error) {
mod := false
data := map[string]interface{}{}
err := json.Unmarshal(patch, &data)
@ -117,6 +120,23 @@ func sanitizePatch(patch []byte) ([]byte, error) {
mod = true
}
if removeObjectSetAnnotation {
metadata := convert.ToMapInterface(data2.GetValueN(data, "metadata"))
annotations := convert.ToMapInterface(data2.GetValueN(data, "metadata", "annotations"))
for k := range annotations {
if strings.HasPrefix(k, LabelPrefix) {
mod = true
delete(annotations, k)
}
}
if mod && len(annotations) == 0 {
delete(metadata, "annotations")
if len(metadata) == 0 {
delete(data, "metadata")
}
}
}
if emptyMaps(data, "metadata", "annotations") {
return []byte("{}"), nil
}
@ -152,7 +172,7 @@ func applyPatch(gvk schema.GroupVersionKind, reconciler Reconciler, patcher Patc
return false, nil
}
patch, err = sanitizePatch(patch)
patch, err = sanitizePatch(patch, false)
if err != nil {
return false, err
}
@ -172,6 +192,9 @@ func applyPatch(gvk schema.GroupVersionKind, reconciler Reconciler, patcher Patc
if err != nil {
return false, err
}
if originalObject == nil {
originalObject = oldObject
}
handled, err := reconciler(originalObject, newObject)
if err != nil {
return false, err
@ -187,13 +210,17 @@ func applyPatch(gvk schema.GroupVersionKind, reconciler Reconciler, patcher Patc
return true, err
}
func (o *desiredSet) compareObjects(gvk schema.GroupVersionKind, patcher Patcher, client dynamic.NamespaceableResourceInterface, debugID string, oldObject, newObject runtime.Object, force bool) error {
func (o *desiredSet) compareObjects(gvk schema.GroupVersionKind, reconciler Reconciler, patcher Patcher, client dynamic.NamespaceableResourceInterface, debugID string, oldObject, newObject runtime.Object, force bool) error {
oldMetadata, err := meta.Accessor(oldObject)
if err != nil {
return err
}
if ran, err := applyPatch(gvk, o.reconcilers[gvk], patcher, debugID, oldObject, newObject); err != nil {
if o.createPlan {
o.plan.Objects = append(o.plan.Objects, oldObject)
}
if ran, err := applyPatch(gvk, reconciler, patcher, debugID, oldObject, newObject); err != nil {
return err
} else if !ran {
logrus.Debugf("DesiredSet - No change(2) %s %s/%s for %s", gvk, oldMetadata.GetNamespace(), oldMetadata.GetName(), debugID)

View File
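To make the new removeObjectSetAnnotation flag concrete: when a computed patch only touches the objectset.rio.cattle.io/ bookkeeping annotations, stripping them collapses the patch to "{}", so it never shows up in a dry-run Plan. A simplified, self-contained version of that transformation, not the function above.

package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// stripObjectSetAnnotations drops the apply bookkeeping annotations from a
// JSON patch and prunes the now-empty metadata, returning "{}" if nothing
// meaningful remains.
func stripObjectSetAnnotations(patch []byte) ([]byte, error) {
	data := map[string]interface{}{}
	if err := json.Unmarshal(patch, &data); err != nil {
		return nil, err
	}
	meta, _ := data["metadata"].(map[string]interface{})
	ann, _ := meta["annotations"].(map[string]interface{})
	for k := range ann {
		if strings.HasPrefix(k, "objectset.rio.cattle.io/") {
			delete(ann, k)
		}
	}
	if len(ann) == 0 {
		delete(meta, "annotations")
	}
	if len(meta) == 0 {
		delete(data, "metadata")
	}
	if len(data) == 0 {
		return []byte("{}"), nil
	}
	return json.Marshal(data)
}

func main() {
	in := []byte(`{"metadata":{"annotations":{"objectset.rio.cattle.io/applied":"..."}}}`)
	out, _ := stripObjectSetAnnotations(in)
	fmt.Println(string(out)) // {}
}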

@ -0,0 +1,152 @@
package apply
import (
"fmt"
"strings"
"github.com/rancher/wrangler/pkg/gvk"
"github.com/rancher/wrangler/pkg/kv"
"github.com/pkg/errors"
namer "github.com/rancher/wrangler/pkg/name"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/cache"
)
var (
ErrOwnerNotFound = errors.New("owner not found")
)
func notFound(name string, gvk schema.GroupVersionKind) error {
// this is not proper, but does it really matter that much? If you find this
// line while researching a bug, then the answer is probably yes.
resource := namer.GuessPluralName(strings.ToLower(gvk.Kind))
return apierrors.NewNotFound(schema.GroupResource{
Group: gvk.Group,
Resource: resource,
}, name)
}
func getGVK(gvkLabel string, gvk *schema.GroupVersionKind) error {
parts := strings.Split(gvkLabel, ", Kind=")
if len(parts) != 2 {
return fmt.Errorf("invalid GVK format: %s", gvkLabel)
}
gvk.Group, gvk.Version = kv.Split(parts[0], "/")
gvk.Kind = parts[1]
return nil
}
func (o desiredSet) FindOwner(obj runtime.Object) (runtime.Object, error) {
if obj == nil {
return nil, ErrOwnerNotFound
}
meta, err := meta.Accessor(obj)
if err != nil {
return nil, err
}
var (
debugID = fmt.Sprintf("%s/%s", meta.GetNamespace(), meta.GetName())
gvkLabel = meta.GetAnnotations()[LabelGVK]
namespace = meta.GetAnnotations()[LabelNamespace]
name = meta.GetAnnotations()[LabelName]
gvk schema.GroupVersionKind
)
if gvkLabel == "" {
return nil, ErrOwnerNotFound
}
if err := getGVK(gvkLabel, &gvk); err != nil {
return nil, err
}
cache, client, err := o.getControllerAndClient(debugID, gvk)
if err != nil {
return nil, err
}
if cache != nil {
return o.fromCache(cache, namespace, name, gvk)
}
return o.fromClient(client, namespace, name, gvk)
}
func (o *desiredSet) fromClient(client dynamic.NamespaceableResourceInterface, namespace, name string, gvk schema.GroupVersionKind) (runtime.Object, error) {
var (
err error
obj interface{}
)
if namespace == "" {
obj, err = client.Get(o.ctx, name, metav1.GetOptions{})
} else {
obj, err = client.Namespace(namespace).Get(o.ctx, name, metav1.GetOptions{})
}
if err != nil {
return nil, err
}
if ro, ok := obj.(runtime.Object); ok {
return ro, nil
}
return nil, notFound(name, gvk)
}
func (o *desiredSet) fromCache(cache cache.SharedInformer, namespace, name string, gvk schema.GroupVersionKind) (runtime.Object, error) {
var key string
if namespace == "" {
key = name
} else {
key = namespace + "/" + name
}
item, ok, err := cache.GetStore().GetByKey(key)
if err != nil {
return nil, err
} else if !ok {
return nil, notFound(name, gvk)
} else if ro, ok := item.(runtime.Object); ok {
return ro, nil
}
return nil, notFound(name, gvk)
}
func (o desiredSet) PurgeOrphan(obj runtime.Object) error {
if obj == nil {
return nil
}
meta, err := meta.Accessor(obj)
if err != nil {
return err
}
if _, err := o.FindOwner(obj); apierrors.IsNotFound(err) {
gvk, err := gvk.Get(obj)
if err != nil {
return err
}
o.strictCaching = false
_, client, err := o.getControllerAndClient(meta.GetName(), gvk)
if err != nil {
return err
}
if meta.GetNamespace() == "" {
return client.Delete(o.ctx, meta.GetName(), metav1.DeleteOptions{})
} else {
return client.Namespace(meta.GetNamespace()).Delete(o.ctx, meta.GetName(), metav1.DeleteOptions{})
}
} else if err == ErrOwnerNotFound {
return nil
} else if err != nil {
return err
}
return nil
}
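The gvkLabel parsed by getGVK above is the String() form of a GroupVersionKind, for example "apps/v1, Kind=Deployment" (core-group objects look like "/v1, Kind=ConfigMap"). A standalone parser mirroring that format, with kv.Split replaced by strings.SplitN for the sake of a self-contained example.

package main

import (
	"fmt"
	"strings"
)

// parseGVK splits a GroupVersionKind string of the form
// "<group>/<version>, Kind=<kind>" into its three parts.
func parseGVK(s string) (group, version, kind string, err error) {
	parts := strings.Split(s, ", Kind=")
	if len(parts) != 2 {
		return "", "", "", fmt.Errorf("invalid GVK format: %s", s)
	}
	gv := strings.SplitN(parts[0], "/", 2)
	if len(gv) == 2 {
		group, version = gv[0], gv[1]
	} else {
		version = gv[0]
	}
	return group, version, parts[1], nil
}

func main() {
	g, v, k, _ := parseGVK("apps/v1, Kind=Deployment")
	fmt.Println(g, v, k) // apps v1 Deployment
	g, v, k, _ = parseGVK("/v1, Kind=ConfigMap")
	fmt.Println(g, v, k) // "" v1 ConfigMap
}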

View File

@ -28,19 +28,27 @@ var (
)
func (o *desiredSet) getControllerAndClient(debugID string, gvk schema.GroupVersionKind) (cache.SharedIndexInformer, dynamic.NamespaceableResourceInterface, error) {
// client needs to be accessed first so that the gvk->gvr mapping gets cached
client, err := o.a.clients.client(gvk)
if err != nil {
return nil, nil, err
}
informer, ok := o.pruneTypes[gvk]
if !ok {
informer = o.a.informers[gvk]
}
if informer == nil && o.informerFactory != nil {
newInformer, err := o.informerFactory.Get(gvk, o.a.clients.gvr(gvk))
if err != nil {
return nil, nil, errors.Wrapf(err, "failed to construct informer for %v for %s", gvk, debugID)
}
informer = newInformer
}
if informer == nil && o.strictCaching {
return nil, nil, fmt.Errorf("failed to find informer for %s for %s", gvk, debugID)
}
client, err := o.a.clients.client(gvk)
if err != nil {
return nil, nil, err
}
return informer, client, nil
}
@ -206,6 +214,8 @@ func (o *desiredSet) process(debugID string, set labels.Selector, gvk schema.Gro
patcher = o.createPatcher(client)
}
reconciler := o.reconcilers[gvk]
existing, err := o.list(controller, client, set)
if err != nil {
o.err(errors.Wrapf(err, "failed to list %s for %s", gvk, debugID))
@ -214,6 +224,26 @@ func (o *desiredSet) process(debugID string, set labels.Selector, gvk schema.Gro
toCreate, toDelete, toUpdate := compareSets(existing, objs)
if o.createPlan {
o.plan.Create[gvk] = toCreate
o.plan.Delete[gvk] = toDelete
reconciler = nil
patcher = func(namespace, name string, pt types2.PatchType, data []byte) (runtime.Object, error) {
data, err := sanitizePatch(data, true)
if err != nil {
return nil, err
}
if string(data) != "{}" {
o.plan.Update.Add(gvk, namespace, name, string(data))
}
return nil, nil
}
toCreate = nil
toDelete = nil
}
createF := func(k objectset.ObjectKey) {
obj := objs[k]
obj, err := prepareObjectForCreate(gvk, obj)
@ -248,7 +278,7 @@ func (o *desiredSet) process(debugID string, set labels.Selector, gvk schema.Gro
}
updateF := func(k objectset.ObjectKey) {
err := o.compareObjects(gvk, patcher, client, debugID, existing[k], objs[k], len(toCreate) > 0 || len(toDelete) > 0)
err := o.compareObjects(gvk, reconciler, patcher, client, debugID, existing[k], objs[k], len(toCreate) > 0 || len(toDelete) > 0)
if err == ErrReplace {
deleteF(k, true)
o.err(fmt.Errorf("DesiredSet - Replace Wait %s %s for %s", gvk, k, debugID))

View File

@ -4,6 +4,7 @@ import (
"reflect"
"time"
"github.com/rancher/wrangler/pkg/generic"
"github.com/sirupsen/logrus"
)
@ -14,7 +15,7 @@ func (c Cond) GetStatus(obj interface{}) string {
}
func (c Cond) SetError(obj interface{}, reason string, err error) {
if err == nil {
if err == nil || err == generic.ErrSkip {
c.True(obj)
c.Message(obj, "")
c.Reason(obj, reason)
@ -153,6 +154,9 @@ func getTS(obj interface{}, condName string) string {
}
func setStatus(obj interface{}, condName, status string) {
if reflect.TypeOf(obj).Kind() != reflect.Ptr {
panic("obj passed must be a pointer")
}
cond := findOrCreateCond(obj, condName)
setValue(cond, "Status", status)
}
@ -167,6 +171,9 @@ func setValue(cond reflect.Value, fieldName, newValue string) {
func findOrNotCreateCond(obj interface{}, condName string) *reflect.Value {
condSlice := getValue(obj, "Status", "Conditions")
if !condSlice.IsValid() {
condSlice = getValue(obj, "Conditions")
}
return findCond(obj, condSlice, condName)
}
@ -224,6 +231,9 @@ func getValue(obj interface{}, name ...string) reflect.Value {
}
func getFieldValue(v reflect.Value, name ...string) reflect.Value {
if !v.IsValid() {
return v
}
field := v.FieldByName(name[0])
if len(name) == 1 {
return field

View File

@ -85,18 +85,23 @@ func NewFactoryFromConfigOrDie(config *rest.Config) *Factory {
}
func NewFactoryFromConfig(config *rest.Config) (*Factory, error) {
cs, err := clientset.NewForConfig(config)
if err != nil {
return nil, err
}
informerFactory := informers.NewSharedInformerFactory(cs, 2*time.Hour)
return NewFactory(cs, informerFactory), nil
return NewFactoryFromConfigWithOptions(config, nil)
}
func NewFactoryFromConfigWithNamespace(config *rest.Config, namespace string) (*Factory, error) {
if namespace == "" {
return NewFactoryFromConfig(config)
return NewFactoryFromConfigWithOptions(config, &FactoryOptions{
Namespace: namespace,
})
}
type FactoryOptions struct {
Namespace string
Resync time.Duration
}
func NewFactoryFromConfigWithOptions(config *rest.Config, opts *FactoryOptions) (*Factory, error) {
if opts == nil {
opts = &FactoryOptions{}
}
cs, err := clientset.NewForConfig(config)
@ -104,7 +109,17 @@ func NewFactoryFromConfigWithNamespace(config *rest.Config, namespace string) (*
return nil, err
}
informerFactory := informers.NewSharedInformerFactoryWithOptions(cs, 2*time.Hour, informers.WithNamespace(namespace))
resync := opts.Resync
if resync == 0 {
resync = 2 * time.Hour

}
if opts.Namespace == "" {
informerFactory := informers.NewSharedInformerFactory(cs, resync)
return NewFactory(cs, informerFactory), nil
}
informerFactory := informers.NewSharedInformerFactoryWithOptions(cs, resync, informers.WithNamespace(opts.Namespace))
return NewFactory(cs, informerFactory), nil
}

View File

@ -333,6 +333,7 @@ func Register{{.type}}GeneratingHandler(ctx context.Context, controller {{.type}
if opts != nil {
statusHandler.opts = *opts
}
controller.OnChange(ctx, name, statusHandler.Remove)
Register{{.type}}StatusHandler(ctx, controller, condition, name, statusHandler.Handle)
}
@ -347,7 +348,7 @@ func (a *{{.lowerName}}StatusHandler) sync(key string, obj *{{.version}}.{{.type
return obj, nil
}
origStatus := obj.Status
origStatus := obj.Status.DeepCopy()
obj = obj.DeepCopy()
newStatus, err := a.handler(obj, obj.Status)
if err != nil {
@ -355,16 +356,16 @@ func (a *{{.lowerName}}StatusHandler) sync(key string, obj *{{.version}}.{{.type
newStatus = *origStatus.DeepCopy()
}
obj.Status = newStatus
if a.condition != "" {
if errors.IsConflict(err) {
a.condition.SetError(obj, "", nil)
a.condition.SetError(&newStatus, "", nil)
} else {
a.condition.SetError(obj, "", err)
a.condition.SetError(&newStatus, "", err)
}
}
if !equality.Semantic.DeepEqual(origStatus, obj.Status) {
if !equality.Semantic.DeepEqual(origStatus, &newStatus) {
var newErr error
obj.Status = newStatus
obj, newErr = a.client.UpdateStatus(obj)
if err == nil {
err = newErr
@ -381,29 +382,28 @@ type {{.lowerName}}GeneratingHandler struct {
name string
}
func (a *{{.lowerName}}GeneratingHandler) Remove(key string, obj *{{.version}}.{{.type}}) (*{{.version}}.{{.type}}, error) {
if obj != nil {
return obj, nil
}
obj = &{{.version}}.{{.type}}{}
obj.Namespace, obj.Name = kv.RSplit(key, "/")
obj.SetGroupVersionKind(a.gvk)
return nil, generic.ConfigureApplyForObject(a.apply, obj, &a.opts).
WithOwner(obj).
WithSetID(a.name).
ApplyObjects()
}
func (a *{{.lowerName}}GeneratingHandler) Handle(obj *{{.version}}.{{.type}}, status {{.version}}.{{.statusType}}) ({{.version}}.{{.statusType}}, error) {
objs, newStatus, err := a.{{.type}}GeneratingHandler(obj, status)
if err != nil {
return newStatus, err
}
apply := a.apply
if !a.opts.DynamicLookup {
apply = apply.WithStrictCaching()
}
if !a.opts.AllowCrossNamespace && !a.opts.AllowClusterScoped {
apply = apply.WithSetOwnerReference(true, false).
WithDefaultNamespace(obj.GetNamespace()).
WithListerNamespace(obj.GetNamespace())
}
if !a.opts.AllowClusterScoped {
apply = apply.WithRestrictClusterScoped()
}
return newStatus, apply.
return newStatus, generic.ConfigureApplyForObject(a.apply, obj, &a.opts).
WithOwner(obj).
WithSetID(a.name).
ApplyObjects(objs...)

View File

@ -3,6 +3,7 @@ package crd
import (
"context"
"reflect"
"strconv"
"strings"
"sync"
"time"
@ -16,7 +17,7 @@ import (
apiext "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
"k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
@ -52,8 +53,107 @@ func (c CRD) WithSchemaFromStruct(obj interface{}) CRD {
return c
}
func (c CRD) WithCustomColumn(columns []v1beta1.CustomResourceColumnDefinition) CRD {
c.Columns = columns
func (c CRD) WithColumn(name, path string) CRD {
c.Columns = append(c.Columns, v1beta1.CustomResourceColumnDefinition{
Name: name,
Type: "string",
Priority: 0,
JSONPath: path,
})
return c
}
func getType(obj interface{}) reflect.Type {
if t, ok := obj.(reflect.Type); ok {
return t
}
t := reflect.TypeOf(obj)
if t.Kind() == reflect.Ptr {
t = t.Elem()
}
return t
}
func (c CRD) WithColumnsFromStruct(obj interface{}) CRD {
c.Columns = append(c.Columns, readCustomColumns(getType(obj), ".")...)
return c
}
func fieldName(f reflect.StructField) string {
jsonTag := f.Tag.Get("json")
if jsonTag == "-" {
return ""
}
name := strings.Split(jsonTag, ",")[0]
if name == "" {
return f.Name
}
return name
}
func tagToColumn(f reflect.StructField) (v1beta1.CustomResourceColumnDefinition, bool) {
c := v1beta1.CustomResourceColumnDefinition{
Name: f.Name,
Type: "string",
}
columnDef, ok := f.Tag.Lookup("column")
if !ok {
return c, false
}
for k, v := range kv.SplitMap(columnDef, ",") {
switch k {
case "name":
c.Name = v
case "type":
c.Type = v
case "format":
c.Format = v
case "description":
c.Description = v
case "priority":
p, _ := strconv.Atoi(v)
c.Priority = int32(p)
case "jsonpath":
c.JSONPath = v
}
}
return c, true
}
func readCustomColumns(t reflect.Type, path string) (result []v1beta1.CustomResourceColumnDefinition) {
for i := 0; i < t.NumField(); i++ {
f := t.Field(i)
fieldName := fieldName(f)
if fieldName == "" {
continue
}
t := f.Type
if t.Kind() == reflect.Ptr {
t = t.Elem()
}
if t.Kind() == reflect.Struct {
if f.Anonymous {
result = append(result, readCustomColumns(t, path)...)
} else {
result = append(result, readCustomColumns(t, path+"."+fieldName)...)
}
} else {
if col, ok := tagToColumn(f); ok {
result = append(result, col)
}
}
}
return result
}
func (c CRD) WithCustomColumn(columns ...v1beta1.CustomResourceColumnDefinition) CRD {
c.Columns = append(c.Columns, columns...)
return c
}
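Usage sketch (not part of the diff): the new WithColumn and WithColumnsFromStruct helpers let a CRD declare extra printer columns either directly or through `column:` struct tags, using the tag keys handled in tagToColumn above. The ExampleChart type, the column values, and the import path below are illustrative assumptions, not code from this change.

package main

import (
	"fmt"

	"github.com/rancher/wrangler/pkg/crd" // assumed import path for the package shown above
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// ExampleChart is a hypothetical CRD type; only the `column:` tag matters here.
type ExampleChart struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`
	Status            ExampleChartStatus `json:"status,omitempty"`
}

type ExampleChartStatus struct {
	// Picked up by WithColumnsFromStruct via readCustomColumns/tagToColumn.
	JobName string `json:"jobName" column:"name=Job,type=string,jsonpath=.status.jobName"`
}

func main() {
	c := crd.CRD{SchemaObject: ExampleChart{}}.
		WithColumnsFromStruct(ExampleChart{}).
		WithColumn("Age", ".metadata.creationTimestamp")
	fmt.Printf("printer columns: %d\n", len(c.Columns))
}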
@ -79,10 +179,7 @@ func (c CRD) WithShortNames(shortNames ...string) CRD {
func (c CRD) ToCustomResourceDefinition() (apiext.CustomResourceDefinition, error) {
if c.SchemaObject != nil && c.GVK.Kind == "" {
t := reflect.TypeOf(c.SchemaObject)
if t.Kind() == reflect.Ptr {
t = t.Elem()
}
t := getType(c.SchemaObject)
c.GVK.Kind = t.Name()
}
@ -250,6 +347,13 @@ func (f *Factory) CreateCRDs(ctx context.Context, crds ...CRD) (map[schema.Group
return nil, nil
}
if ok, err := f.ensureAccess(ctx); err != nil {
return nil, err
} else if !ok {
logrus.Infof("No access to list CRDs, assuming CRDs are pre-created.")
return nil, err
}
crdStatus := map[schema.GroupVersionKind]*apiext.CustomResourceDefinition{}
ready, err := f.getReadyCRDs(ctx)
@ -341,7 +445,7 @@ func (f *Factory) createCRD(ctx context.Context, crdDef CRD, ready map[string]*a
}
logrus.Infof("Creating CRD %s", crd.Name)
if newCrd, err := f.CRDClient.ApiextensionsV1beta1().CustomResourceDefinitions().Create(ctx, &crd, metav1.CreateOptions{}); errors.IsAlreadyExists(err) {
if newCrd, err := f.CRDClient.ApiextensionsV1beta1().CustomResourceDefinitions().Create(ctx, &crd, metav1.CreateOptions{}); apierrors.IsAlreadyExists(err) {
return f.CRDClient.ApiextensionsV1beta1().CustomResourceDefinitions().Get(ctx, crd.Name, metav1.GetOptions{})
} else if err != nil {
return nil, err
@ -350,6 +454,14 @@ func (f *Factory) createCRD(ctx context.Context, crdDef CRD, ready map[string]*a
}
}
func (f *Factory) ensureAccess(ctx context.Context) (bool, error) {
_, err := f.CRDClient.ApiextensionsV1beta1().CustomResourceDefinitions().List(ctx, metav1.ListOptions{})
if apierrors.IsForbidden(err) {
return false, nil
}
return true, err
}
func (f *Factory) getReadyCRDs(ctx context.Context) (map[string]*apiext.CustomResourceDefinition, error) {
list, err := f.CRDClient.ApiextensionsV1beta1().CustomResourceDefinitions().List(ctx, metav1.ListOptions{})
if err != nil {

24
vendor/github.com/rancher/wrangler/pkg/data/merge.go generated vendored Normal file

@ -0,0 +1,24 @@
package data
func MergeMaps(base, overlay map[string]interface{}) map[string]interface{} {
result := map[string]interface{}{}
for k, v := range base {
result[k] = v
}
for k, v := range overlay {
if baseMap, overlayMap, bothMaps := bothMaps(result[k], v); bothMaps {
v = MergeMaps(baseMap, overlayMap)
}
result[k] = v
}
return result
}
func bothMaps(left, right interface{}) (map[string]interface{}, map[string]interface{}, bool) {
leftMap, ok := left.(map[string]interface{})
if !ok {
return nil, nil, false
}
rightMap, ok := right.(map[string]interface{})
return leftMap, rightMap, ok
}
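A minimal usage sketch of the new MergeMaps helper (assuming nothing beyond the MergeMaps/bothMaps code above): nested maps are merged recursively and overlay values win on conflicting keys.

package main

import (
	"fmt"

	"github.com/rancher/wrangler/pkg/data"
)

func main() {
	base := map[string]interface{}{
		"spec": map[string]interface{}{"image": "nginx", "replicas": 1},
	}
	overlay := map[string]interface{}{
		"spec": map[string]interface{}{"replicas": 3},
	}
	merged := data.MergeMaps(base, overlay)
	// Keys from both inputs survive; the overlay's replicas value wins.
	fmt.Println(merged["spec"]) // map[image:nginx replicas:3]
}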


@ -1,7 +1,39 @@
package generic
import (
"github.com/rancher/wrangler/pkg/apply"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
type GeneratingHandlerOptions struct {
AllowCrossNamespace bool
AllowClusterScoped bool
NoOwnerReference bool
DynamicLookup bool
}
func ConfigureApplyForObject(apply apply.Apply, obj metav1.Object, opts *GeneratingHandlerOptions) apply.Apply {
if opts == nil {
opts = &GeneratingHandlerOptions{}
}
if opts.DynamicLookup {
apply = apply.WithDynamicLookup()
}
if opts.NoOwnerReference {
apply = apply.WithSetOwnerReference(true, false)
}
if opts.AllowCrossNamespace && !opts.AllowClusterScoped {
apply = apply.
WithDefaultNamespace(obj.GetNamespace()).
WithListerNamespace(obj.GetNamespace())
}
if !opts.AllowClusterScoped {
apply = apply.WithRestrictClusterScoped()
}
return apply
}
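Sketch of how a handler could call the new ConfigureApplyForObject helper, mirroring the regenerated controller template above; the function name, the set ID, and the generic import path are assumptions for illustration.

package example

import (
	"github.com/rancher/wrangler/pkg/apply"
	"github.com/rancher/wrangler/pkg/generic" // assumed path of the package shown above
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
)

// ownerObject combines the interfaces needed by ConfigureApplyForObject and WithOwner.
type ownerObject interface {
	metav1.Object
	runtime.Object
}

// applyGenerated applies the objects produced for owner; with the zero-value
// options only WithRestrictClusterScoped is added, per the option handling above.
func applyGenerated(a apply.Apply, owner ownerObject, objs ...runtime.Object) error {
	opts := &generic.GeneratingHandlerOptions{}
	return generic.ConfigureApplyForObject(a, owner, opts).
		WithOwner(owner).
		WithSetID("example-generating-handler").
		ApplyObjects(objs...)
}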


@ -5,6 +5,7 @@ import (
"reflect"
"strings"
errors2 "github.com/pkg/errors"
"k8s.io/apimachinery/pkg/runtime"
)
@ -25,7 +26,7 @@ func (h *Handlers) Handle(key string, obj runtime.Object) (runtime.Object, error
for _, handler := range h.handlers {
newObj, err := handler.handler(key, obj)
if err != nil {
if err != nil && errors2.Cause(err) != ErrSkip {
errs = append(errs, &handlerError{
HandlerName: handler.name,
Err: err,


@ -27,7 +27,16 @@ func Get(obj runtime.Object) (schema.GroupVersionKind, error) {
return gvks[0], nil
}
func Set(obj runtime.Object) error {
func Set(objs ...runtime.Object) error {
for _, obj := range objs {
if err := setObject(obj); err != nil {
return err
}
}
return nil
}
func setObject(obj runtime.Object) error {
gvk := obj.GetObjectKind().GroupVersionKind()
if gvk.Kind != "" {
return nil


@ -34,6 +34,8 @@ func (o ObjectKey) String() string {
return fmt.Sprintf("%s/%s", o.Namespace, o.Name)
}
type ObjectKeyByGVK map[schema.GroupVersionKind][]ObjectKey
type ObjectByGVK map[schema.GroupVersionKind]map[ObjectKey]runtime.Object
func (o ObjectByGVK) Add(obj runtime.Object) (schema.GroupVersionKind, error) {
@ -69,14 +71,19 @@ type ObjectSet struct {
gvkSeen map[schema.GroupVersionKind]bool
}
func NewObjectSet() *ObjectSet {
return &ObjectSet{
func NewObjectSet(objs ...runtime.Object) *ObjectSet {
os := &ObjectSet{
objects: ObjectByGVK{},
gvkSeen: map[schema.GroupVersionKind]bool{},
}
os.Add(objs...)
return os
}
func (o *ObjectSet) ObjectsByGVK() ObjectByGVK {
if o == nil {
return nil
}
return o.objects
}
@ -126,6 +133,10 @@ func (o *ObjectSet) Len() int {
return len(o.objects)
}
func (o *ObjectSet) GVKs() []schema.GroupVersionKind {
return o.GVKOrder()
}
func (o *ObjectSet) GVKOrder(known ...schema.GroupVersionKind) []schema.GroupVersionKind {
var rest []schema.GroupVersionKind


@ -36,7 +36,7 @@ func applyStrategicMergePatch(original, patch []byte, lookup strategicpatch.Look
if err := json.Unmarshal(patch, &patchMap); err != nil {
return nil, err
}
patchedMap, err := strategicpatch.StrategicMergeMapPatch(originalMap, patchMap, lookup)
patchedMap, err := strategicpatch.StrategicMergeMapPatchUsingLookupPatchMeta(originalMap, patchMap, lookup)
if err != nil {
return nil, err
}


@ -1,6 +1,7 @@
package openapi
import (
"encoding/json"
"fmt"
types "github.com/rancher/wrangler/pkg/schemas"
@ -8,6 +9,14 @@ import (
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
)
var (
blacklistFields = map[string]bool{
"kind": true,
"apiVersion": true,
"metadata": true,
}
)
func MustGenerate(obj interface{}) *v1beta1.JSONSchemaProps {
if obj == nil {
return nil
@ -42,28 +51,27 @@ func toOpenAPI(name string, schemas *types.Schemas) (*v1beta1.JSONSchemaProps, e
delete(newSchema.ResourceFields, "kind")
delete(newSchema.ResourceFields, "apiVersion")
delete(newSchema.ResourceFields, "metadata")
return parseSchema(newSchema, schemas)
return schemaToProps(newSchema, schemas, map[string]bool{})
}
func parseSchema(schema *types.Schema, schemas *types.Schemas) (*v1beta1.JSONSchemaProps, error) {
jsp := &v1beta1.JSONSchemaProps{
Description: schema.Description,
Type: "object",
Properties: map[string]v1beta1.JSONSchemaProps{},
func populateField(fieldJSP *v1beta1.JSONSchemaProps, f *types.Field) error {
fieldJSP.Description = f.Description
// don't reset this to not nullable
if f.Nullable {
fieldJSP.Nullable = f.Nullable
}
fieldJSP.MinLength = f.MinLength
fieldJSP.MaxLength = f.MaxLength
for name, f := range schema.ResourceFields {
fieldJSP := v1beta1.JSONSchemaProps{
Description: f.Description,
Nullable: f.Nullable,
MinLength: f.MinLength,
MaxLength: f.MaxLength,
if f.Type == "string" && len(f.Options) > 0 {
for _, opt := range append(f.Options, "") {
bytes, err := json.Marshal(&opt)
if err != nil {
return err
}
if len(f.Options) > 0 {
for _, opt := range f.Options {
fieldJSP.Enum = append(fieldJSP.Enum, v1beta1.JSON{
Raw: []byte(opt),
Raw: bytes,
})
}
}
@ -86,134 +94,126 @@ func parseSchema(schema *types.Schema, schemas *types.Schemas) (*v1beta1.JSONSch
fieldJSP.Maximum = &fl
}
// default is not supported by k8s
//
//if f.Default != nil {
// bytes, err := json.Marshal(f.Default)
// if err != nil {
// return nil, err
// }
// fieldJSP.Default = &v1beta1.JSON{
// Raw: bytes,
// }
//}
if f.Required {
fieldJSP.Required = append(fieldJSP.Required, name)
return nil
}
if definition.IsMapType(f.Type) {
fieldJSP.Type = "object"
subType := definition.SubType(f.Type)
subType, schema, err := typeAndSchema(subType, schemas)
func typeToProps(typeName string, schemas *types.Schemas, inflight map[string]bool) (*v1beta1.JSONSchemaProps, error) {
t, subType, schema, err := typeAndSchema(typeName, schemas)
if err != nil {
return nil, err
}
if schema == nil {
fieldJSP.AdditionalProperties = &v1beta1.JSONSchemaPropsOrBool{
Schema: &v1beta1.JSONSchemaProps{
Type: subType,
},
if schema != nil {
return schemaToProps(schema, schemas, inflight)
}
} else {
subObject, err := parseSchema(schema, schemas)
jsp := &v1beta1.JSONSchemaProps{}
switch t {
case "map":
additionalProps, err := typeToProps(subType, schemas, inflight)
if err != nil {
return nil, err
}
fieldJSP.AdditionalProperties = &v1beta1.JSONSchemaPropsOrBool{
Schema: subObject,
jsp.Type = "object"
jsp.Nullable = true
jsp.AdditionalProperties = &v1beta1.JSONSchemaPropsOrBool{
Schema: additionalProps,
}
}
} else if definition.IsArrayType(f.Type) {
fieldJSP.Type = "array"
subType := definition.SubType(f.Type)
subType, schema, err := typeAndSchema(subType, schemas)
case "array":
items, err := typeToProps(subType, schemas, inflight)
if err != nil {
return nil, err
}
if schema == nil {
fieldJSP.Items = &v1beta1.JSONSchemaPropsOrArray{
Schema: &v1beta1.JSONSchemaProps{
Type: subType,
},
jsp.Type = "array"
jsp.Nullable = true
jsp.Items = &v1beta1.JSONSchemaPropsOrArray{
Schema: items,
}
} else {
subObject, err := parseSchema(schema, schemas)
if err != nil {
return nil, err
}
fieldJSP.Items = &v1beta1.JSONSchemaPropsOrArray{
Schema: subObject,
}
}
} else {
typeName, schema, err := typeAndSchema(f.Type, schemas)
if err != nil {
return nil, err
}
if schema == nil {
fieldJSP.Type = typeName
} else {
fieldJSP.Type = "object"
subObject, err := parseSchema(schema, schemas)
if err != nil {
return nil, err
}
fieldJSP.Properties = subObject.Properties
}
}
jsp.Properties[name] = fieldJSP
default:
jsp.Type = t
}
return jsp, nil
}
func typeAndSchema(typeName string, schemas *types.Schemas) (string, *types.Schema, error) {
switch typeName {
// TODO: in v1 set the x- header for this
case "intOrString":
return "string", nil, nil
case "int":
return "integer", nil, nil
case "float":
return "number", nil, nil
case "string":
return "string", nil, nil
case "date":
return "string", nil, nil
case "enum":
return "string", nil, nil
case "password":
return "string", nil, nil
case "hostname":
return "string", nil, nil
case "boolean":
return "boolean", nil, nil
case "json":
return "object", nil, nil
func schemaToProps(schema *types.Schema, schemas *types.Schemas, inflight map[string]bool) (*v1beta1.JSONSchemaProps, error) {
jsp := &v1beta1.JSONSchemaProps{
Description: schema.Description,
Type: "object",
}
if inflight[schema.ID] {
return jsp, nil
}
inflight[schema.ID] = true
defer delete(inflight, schema.ID)
jsp.Properties = map[string]v1beta1.JSONSchemaProps{}
for name, f := range schema.ResourceFields {
fieldJSP, err := typeToProps(f.Type, schemas, inflight)
if err != nil {
return nil, err
}
if err := populateField(fieldJSP, &f); err != nil {
return nil, err
}
if f.Required {
jsp.Required = append(jsp.Required, name)
}
jsp.Properties[name] = *fieldJSP
}
return jsp, nil
}
func typeAndSchema(typeName string, schemas *types.Schemas) (string, string, *types.Schema, error) {
if definition.IsReferenceType(typeName) {
return "string", nil, nil
return "string", "", nil, nil
}
if definition.IsArrayType(typeName) {
return "array", nil, nil
return "array", definition.SubType(typeName), nil, nil
}
if definition.IsMapType(typeName) {
return "map", definition.SubType(typeName), nil, nil
}
switch typeName {
// TODO: in v1 set the x- header for this
case "intOrString":
return "string", "", nil, nil
case "int":
return "integer", "", nil, nil
case "float":
return "number", "", nil, nil
case "string":
return "string", "", nil, nil
case "date":
return "string", "", nil, nil
case "enum":
return "string", "", nil, nil
case "base64":
return "string", "", nil, nil
case "password":
return "string", "", nil, nil
case "hostname":
return "string", "", nil, nil
case "boolean":
return "boolean", "", nil, nil
case "json":
return "object", "", nil, nil
}
schema := schemas.Schema(typeName)
if schema == nil {
return "", nil, fmt.Errorf("failed to find schema %s", typeName)
return "", "", nil, fmt.Errorf("failed to find schema %s", typeName)
}
if schema.InternalSchema != nil {
return "", schema.InternalSchema, nil
return "", "", schema.InternalSchema, nil
}
return "", schema, nil
return "", "", schema, nil
}
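Sketch illustrating the inflight guard added to schemaToProps: converting a self-referencing type now terminates instead of recursing without bound, with the repeated schema emitted as a bare object. The Node type and the import path are assumptions for illustration.

package main

import (
	"fmt"

	"github.com/rancher/wrangler/pkg/schemas/openapi" // assumed path of the package shown above
)

// Node references itself through Children; the inflight map cuts the cycle
// once the same schema ID is seen again during conversion.
type Node struct {
	Name     string `json:"name"`
	Children []Node `json:"children,omitempty"`
}

func main() {
	props := openapi.MustGenerate(Node{}) // returns instead of overflowing the stack
	fmt.Println(props.Type, len(props.Properties))
}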


@ -64,23 +64,25 @@ func (s *Schemas) MustImportAndCustomize(obj interface{}, f func(*Schema), exter
MustCustomizeType(obj, f)
}
func getType(obj interface{}) reflect.Type {
if t, ok := obj.(reflect.Type); ok {
return t
}
t := reflect.TypeOf(obj)
if t.Kind() == reflect.Ptr {
t = t.Elem()
}
return t
}
func (s *Schemas) Import(obj interface{}, externalOverrides ...interface{}) (*Schema, error) {
var types []reflect.Type
for _, override := range externalOverrides {
types = append(types, reflect.TypeOf(override))
}
var (
v = reflect.ValueOf(obj)
t reflect.Type
)
if v.Kind() == reflect.Ptr {
t = v.Elem().Type()
} else {
t = v.Type()
types = append(types, getType(override))
}
t := getType(obj)
return s.importType(t, types...)
}
@ -320,6 +322,9 @@ func (s *Schemas) readFields(schema *Schema, t reflect.Type) error {
}
if hasType && hasMeta {
delete(schema.ResourceFields, "kind")
delete(schema.ResourceFields, "apiVersion")
delete(schema.ResourceFields, "metadata")
schema.CollectionMethods = []string{"GET", "POST"}
schema.ResourceMethods = []string{"GET", "PUT", "DELETE"}
}
@ -357,7 +362,11 @@ func (s *Schemas) processFieldsMappers(t reflect.Type, fieldName string, schema
}
func applyTag(structField *reflect.StructField, field *Field) error {
for _, part := range strings.Split(structField.Tag.Get("norman"), ",") {
t, ok := structField.Tag.Lookup("wrangler")
if !ok {
t = structField.Tag.Get("norman")
}
for _, part := range strings.Split(t, ",") {
if part == "" {
continue
}

10
vendor/modules.txt vendored

@ -135,7 +135,7 @@ github.com/blang/semver
github.com/bronze1man/goStrongswanVici
# github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23
github.com/buger/jsonparser
# github.com/canonical/go-dqlite v1.3.0
# github.com/canonical/go-dqlite v1.5.1
github.com/canonical/go-dqlite
github.com/canonical/go-dqlite/client
github.com/canonical/go-dqlite/driver
@ -716,7 +716,7 @@ github.com/rancher/dynamiclistener/factory
github.com/rancher/dynamiclistener/storage/file
github.com/rancher/dynamiclistener/storage/kubernetes
github.com/rancher/dynamiclistener/storage/memory
# github.com/rancher/helm-controller v0.4.2-0.20200326195131-eb51d4fa9d8d
# github.com/rancher/helm-controller v0.5.0
github.com/rancher/helm-controller/pkg/apis/helm.cattle.io
github.com/rancher/helm-controller/pkg/apis/helm.cattle.io/v1
github.com/rancher/helm-controller/pkg/generated/clientset/versioned
@ -730,7 +730,7 @@ github.com/rancher/helm-controller/pkg/generated/informers/externalversions/helm
github.com/rancher/helm-controller/pkg/generated/informers/externalversions/internalinterfaces
github.com/rancher/helm-controller/pkg/generated/listers/helm.cattle.io/v1
github.com/rancher/helm-controller/pkg/helm
# github.com/rancher/kine v0.3.5
# github.com/rancher/kine v0.4.0
github.com/rancher/kine/pkg/broadcaster
github.com/rancher/kine/pkg/client
github.com/rancher/kine/pkg/drivers/dqlite
@ -745,7 +745,7 @@ github.com/rancher/kine/pkg/server
github.com/rancher/kine/pkg/tls
# github.com/rancher/remotedialer v0.2.0
github.com/rancher/remotedialer
# github.com/rancher/wrangler v0.5.4-0.20200326191509-4054411d9736
# github.com/rancher/wrangler v0.6.1
github.com/rancher/wrangler/pkg/apply
github.com/rancher/wrangler/pkg/apply/injectors
github.com/rancher/wrangler/pkg/cleanup
@ -773,7 +773,7 @@ github.com/rancher/wrangler/pkg/schemes
github.com/rancher/wrangler/pkg/signals
github.com/rancher/wrangler/pkg/slice
github.com/rancher/wrangler/pkg/start
# github.com/rancher/wrangler-api v0.5.1-0.20200326194427-c13310506d04
# github.com/rancher/wrangler-api v0.6.0
github.com/rancher/wrangler-api/pkg/generated/controllers/apps
github.com/rancher/wrangler-api/pkg/generated/controllers/apps/v1
github.com/rancher/wrangler-api/pkg/generated/controllers/batch