mirror of https://github.com/hashicorp/consul
68 lines
2.1 KiB
Go
68 lines
2.1 KiB
Go
|
package proxycfgglue
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
|
||
|
"github.com/hashicorp/go-memdb"
|
||
|
|
||
|
"github.com/hashicorp/consul/agent/consul/watch"
|
||
|
"github.com/hashicorp/consul/agent/proxycfg"
|
||
|
"github.com/hashicorp/consul/agent/structs"
|
||
|
"github.com/hashicorp/consul/agent/structs/aclfilter"
|
||
|
)
|
||
|
|
||
|
// ServerIntentionUpstreams satisfies the proxycfg.IntentionUpstreams interface
|
||
|
// by sourcing data from a blocking query against the server's state store.
|
||
|
func ServerIntentionUpstreams(deps ServerDataSourceDeps) proxycfg.IntentionUpstreams {
|
||
|
return serverIntentionUpstreams{deps}
|
||
|
}
|
||
|
|
||
|
type serverIntentionUpstreams struct {
|
||
|
deps ServerDataSourceDeps
|
||
|
}
|
||
|
|
||
|
func (s serverIntentionUpstreams) Notify(ctx context.Context, req *structs.ServiceSpecificRequest, correlationID string, ch chan<- proxycfg.UpdateEvent) error {
|
||
|
target := structs.NewServiceName(req.ServiceName, &req.EnterpriseMeta)
|
||
|
|
||
|
return watch.ServerLocalNotify(ctx, correlationID, s.deps.GetStore,
|
||
|
func(ws memdb.WatchSet, store Store) (uint64, *structs.IndexedServiceList, error) {
|
||
|
authz, err := s.deps.ACLResolver.ResolveTokenAndDefaultMeta(req.Token, &req.EnterpriseMeta, nil)
|
||
|
if err != nil {
|
||
|
return 0, nil, err
|
||
|
}
|
||
|
defaultDecision := authz.IntentionDefaultAllow(nil)
|
||
|
|
||
|
index, services, err := store.IntentionTopology(ws, target, false, defaultDecision, structs.IntentionTargetService)
|
||
|
if err != nil {
|
||
|
return 0, nil, err
|
||
|
}
|
||
|
|
||
|
result := &structs.IndexedServiceList{
|
||
|
Services: services,
|
||
|
QueryMeta: structs.QueryMeta{
|
||
|
Index: index,
|
||
|
Backend: structs.QueryBackendBlocking,
|
||
|
},
|
||
|
}
|
||
|
aclfilter.New(authz, s.deps.Logger).Filter(result)
|
||
|
|
||
|
return index, result, nil
|
||
|
},
|
||
|
dispatchBlockingQueryUpdate[*structs.IndexedServiceList](ch),
|
||
|
)
|
||
|
}
|
||
|
|
||
|
func dispatchBlockingQueryUpdate[ResultType any](ch chan<- proxycfg.UpdateEvent) func(context.Context, string, ResultType, error) {
|
||
|
return func(ctx context.Context, correlationID string, result ResultType, err error) {
|
||
|
event := proxycfg.UpdateEvent{
|
||
|
CorrelationID: correlationID,
|
||
|
Result: result,
|
||
|
Err: err,
|
||
|
}
|
||
|
select {
|
||
|
case ch <- event:
|
||
|
case <-ctx.Done():
|
||
|
}
|
||
|
}
|
||
|
}
|