mirror of https://github.com/k3s-io/k3s
154 lines
3.4 KiB
Go
154 lines
3.4 KiB
Go
|
package server
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"fmt"
|
||
|
|
||
|
"github.com/sirupsen/logrus"
|
||
|
"go.etcd.io/etcd/etcdserver/etcdserverpb"
|
||
|
"go.etcd.io/etcd/mvcc/mvccpb"
|
||
|
"google.golang.org/grpc"
|
||
|
"google.golang.org/grpc/health"
|
||
|
healthpb "google.golang.org/grpc/health/grpc_health_v1"
|
||
|
)
|
||
|
|
||
|
var (
|
||
|
_ etcdserverpb.KVServer = (*KVServerBridge)(nil)
|
||
|
_ etcdserverpb.WatchServer = (*KVServerBridge)(nil)
|
||
|
)
|
||
|
|
||
|
type KVServerBridge struct {
|
||
|
limited *LimitedServer
|
||
|
}
|
||
|
|
||
|
func New(backend Backend) *KVServerBridge {
|
||
|
return &KVServerBridge{
|
||
|
limited: &LimitedServer{
|
||
|
backend: backend,
|
||
|
},
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (k *KVServerBridge) Register(server *grpc.Server) {
|
||
|
etcdserverpb.RegisterLeaseServer(server, k)
|
||
|
etcdserverpb.RegisterWatchServer(server, k)
|
||
|
etcdserverpb.RegisterKVServer(server, k)
|
||
|
|
||
|
hsrv := health.NewServer()
|
||
|
hsrv.SetServingStatus("", healthpb.HealthCheckResponse_SERVING)
|
||
|
healthpb.RegisterHealthServer(server, hsrv)
|
||
|
}
|
||
|
|
||
|
func (k *KVServerBridge) Range(ctx context.Context, r *etcdserverpb.RangeRequest) (*etcdserverpb.RangeResponse, error) {
|
||
|
if r.KeysOnly {
|
||
|
return nil, unsupported("keysOnly")
|
||
|
}
|
||
|
|
||
|
if r.MaxCreateRevision != 0 {
|
||
|
return nil, unsupported("maxCreateRevision")
|
||
|
}
|
||
|
|
||
|
if r.SortOrder != 0 {
|
||
|
return nil, unsupported("sortOrder")
|
||
|
}
|
||
|
|
||
|
if r.SortTarget != 0 {
|
||
|
return nil, unsupported("sortTarget")
|
||
|
}
|
||
|
|
||
|
if r.Serializable {
|
||
|
return nil, unsupported("serializable")
|
||
|
}
|
||
|
|
||
|
if r.KeysOnly {
|
||
|
return nil, unsupported("keysOnly")
|
||
|
}
|
||
|
|
||
|
if r.MinModRevision != 0 {
|
||
|
return nil, unsupported("minModRevision")
|
||
|
}
|
||
|
|
||
|
if r.MinCreateRevision != 0 {
|
||
|
return nil, unsupported("minCreateRevision")
|
||
|
}
|
||
|
|
||
|
if r.MaxCreateRevision != 0 {
|
||
|
return nil, unsupported("maxCreateRevision")
|
||
|
}
|
||
|
|
||
|
if r.MaxModRevision != 0 {
|
||
|
return nil, unsupported("maxModRevision")
|
||
|
}
|
||
|
|
||
|
resp, err := k.limited.Range(ctx, r)
|
||
|
if err != nil {
|
||
|
logrus.Errorf("error while range on %s %s: %v", r.Key, r.RangeEnd, err)
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
rangeResponse := &etcdserverpb.RangeResponse{
|
||
|
More: resp.More,
|
||
|
Count: resp.Count,
|
||
|
Header: resp.Header,
|
||
|
Kvs: toKVs(resp.Kvs...),
|
||
|
}
|
||
|
|
||
|
return rangeResponse, nil
|
||
|
}
|
||
|
|
||
|
func toKVs(kvs ...*KeyValue) []*mvccpb.KeyValue {
|
||
|
if len(kvs) == 0 || kvs[0] == nil {
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
ret := make([]*mvccpb.KeyValue, 0, len(kvs))
|
||
|
for _, kv := range kvs {
|
||
|
newKV := toKV(kv)
|
||
|
if newKV != nil {
|
||
|
ret = append(ret, newKV)
|
||
|
}
|
||
|
}
|
||
|
return ret
|
||
|
}
|
||
|
|
||
|
func toKV(kv *KeyValue) *mvccpb.KeyValue {
|
||
|
if kv == nil {
|
||
|
return nil
|
||
|
}
|
||
|
return &mvccpb.KeyValue{
|
||
|
Key: []byte(kv.Key),
|
||
|
Value: kv.Value,
|
||
|
Lease: kv.Lease,
|
||
|
CreateRevision: kv.CreateRevision,
|
||
|
ModRevision: kv.ModRevision,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (k *KVServerBridge) Put(ctx context.Context, r *etcdserverpb.PutRequest) (*etcdserverpb.PutResponse, error) {
|
||
|
return nil, fmt.Errorf("put is not supported")
|
||
|
}
|
||
|
|
||
|
func (k *KVServerBridge) DeleteRange(ctx context.Context, r *etcdserverpb.DeleteRangeRequest) (*etcdserverpb.DeleteRangeResponse, error) {
|
||
|
return nil, fmt.Errorf("delete is not supported")
|
||
|
}
|
||
|
|
||
|
func (k *KVServerBridge) Txn(ctx context.Context, r *etcdserverpb.TxnRequest) (*etcdserverpb.TxnResponse, error) {
|
||
|
res, err := k.limited.Txn(ctx, r)
|
||
|
if err != nil {
|
||
|
logrus.Errorf("error in txn: %v", err)
|
||
|
}
|
||
|
return res, err
|
||
|
}
|
||
|
|
||
|
func (k *KVServerBridge) Compact(ctx context.Context, r *etcdserverpb.CompactionRequest) (*etcdserverpb.CompactionResponse, error) {
|
||
|
return &etcdserverpb.CompactionResponse{
|
||
|
Header: &etcdserverpb.ResponseHeader{
|
||
|
Revision: r.Revision,
|
||
|
},
|
||
|
}, nil
|
||
|
}
|
||
|
|
||
|
func unsupported(field string) error {
|
||
|
return fmt.Errorf("%s is unsupported", field)
|
||
|
}
|