mirror of https://github.com/portainer/portainer
215 lines
7.0 KiB
Go
215 lines
7.0 KiB
Go
package cron
|
|
|
|
import (
|
|
"encoding/json"
|
|
"io/ioutil"
|
|
"log"
|
|
"strings"
|
|
|
|
"github.com/portainer/portainer/api"
|
|
)
|
|
|
|
// EndpointSyncJobRunner is used to run a EndpointSyncJob
|
|
type EndpointSyncJobRunner struct {
|
|
schedule *portainer.Schedule
|
|
context *EndpointSyncJobContext
|
|
}
|
|
|
|
// EndpointSyncJobContext represents the context of execution of a EndpointSyncJob
|
|
type EndpointSyncJobContext struct {
|
|
endpointService portainer.EndpointService
|
|
endpointFilePath string
|
|
}
|
|
|
|
// NewEndpointSyncJobContext returns a new context that can be used to execute a EndpointSyncJob
|
|
func NewEndpointSyncJobContext(endpointService portainer.EndpointService, endpointFilePath string) *EndpointSyncJobContext {
|
|
return &EndpointSyncJobContext{
|
|
endpointService: endpointService,
|
|
endpointFilePath: endpointFilePath,
|
|
}
|
|
}
|
|
|
|
// NewEndpointSyncJobRunner returns a new runner that can be scheduled
|
|
func NewEndpointSyncJobRunner(schedule *portainer.Schedule, context *EndpointSyncJobContext) *EndpointSyncJobRunner {
|
|
return &EndpointSyncJobRunner{
|
|
schedule: schedule,
|
|
context: context,
|
|
}
|
|
}
|
|
|
|
type synchronization struct {
|
|
endpointsToCreate []*portainer.Endpoint
|
|
endpointsToUpdate []*portainer.Endpoint
|
|
endpointsToDelete []*portainer.Endpoint
|
|
}
|
|
|
|
// fileEndpoint is the JSON representation of a single endpoint definition as
// decoded from the endpoint file.
type fileEndpoint struct {
	// Name and URL are the endpoint name and URL.
	Name string `json:"Name"`
	URL  string `json:"URL"`

	// TLS and the TLS* fields below hold the optional TLS settings.
	TLS           bool   `json:"TLS,omitempty"`
	TLSSkipVerify bool   `json:"TLSSkipVerify,omitempty"`
	TLSCACert     string `json:"TLSCACert,omitempty"`
	TLSCert       string `json:"TLSCert,omitempty"`
	TLSKey        string `json:"TLSKey,omitempty"`
}
|
|
|
|
// GetSchedule returns the schedule associated to the runner
|
|
func (runner *EndpointSyncJobRunner) GetSchedule() *portainer.Schedule {
|
|
return runner.schedule
|
|
}
|
|
|
|
// Run triggers the execution of the endpoint synchronization process.
|
|
func (runner *EndpointSyncJobRunner) Run() {
|
|
data, err := ioutil.ReadFile(runner.context.endpointFilePath)
|
|
if endpointSyncError(err) {
|
|
return
|
|
}
|
|
|
|
var fileEndpoints []fileEndpoint
|
|
err = json.Unmarshal(data, &fileEndpoints)
|
|
if endpointSyncError(err) {
|
|
return
|
|
}
|
|
|
|
if len(fileEndpoints) == 0 {
|
|
log.Println("background job error (endpoint synchronization). External endpoint source is empty")
|
|
return
|
|
}
|
|
|
|
storedEndpoints, err := runner.context.endpointService.Endpoints()
|
|
if endpointSyncError(err) {
|
|
return
|
|
}
|
|
|
|
convertedFileEndpoints := convertFileEndpoints(fileEndpoints)
|
|
|
|
sync := prepareSyncData(storedEndpoints, convertedFileEndpoints)
|
|
if sync.requireSync() {
|
|
err = runner.context.endpointService.Synchronize(sync.endpointsToCreate, sync.endpointsToUpdate, sync.endpointsToDelete)
|
|
if endpointSyncError(err) {
|
|
return
|
|
}
|
|
log.Printf("Endpoint synchronization ended. [created: %v] [updated: %v] [deleted: %v]", len(sync.endpointsToCreate), len(sync.endpointsToUpdate), len(sync.endpointsToDelete))
|
|
}
|
|
}
|
|
|
|
// endpointSyncError reports whether err is non-nil. A non-nil error is logged
// as an endpoint synchronization failure before true is returned.
func endpointSyncError(err error) bool {
	if err == nil {
		return false
	}

	log.Printf("background job error (endpoint synchronization). Unable to synchronize endpoints (err=%s)\n", err)
	return true
}
|
|
|
|
func isValidEndpoint(endpoint *portainer.Endpoint) bool {
|
|
if endpoint.Name != "" && endpoint.URL != "" {
|
|
if !strings.HasPrefix(endpoint.URL, "unix://") && !strings.HasPrefix(endpoint.URL, "tcp://") {
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
func convertFileEndpoints(fileEndpoints []fileEndpoint) []portainer.Endpoint {
|
|
convertedEndpoints := make([]portainer.Endpoint, 0)
|
|
|
|
for _, e := range fileEndpoints {
|
|
endpoint := portainer.Endpoint{
|
|
Name: e.Name,
|
|
URL: e.URL,
|
|
TLSConfig: portainer.TLSConfiguration{},
|
|
}
|
|
if e.TLS {
|
|
endpoint.TLSConfig.TLS = true
|
|
endpoint.TLSConfig.TLSSkipVerify = e.TLSSkipVerify
|
|
endpoint.TLSConfig.TLSCACertPath = e.TLSCACert
|
|
endpoint.TLSConfig.TLSCertPath = e.TLSCert
|
|
endpoint.TLSConfig.TLSKeyPath = e.TLSKey
|
|
}
|
|
convertedEndpoints = append(convertedEndpoints, endpoint)
|
|
}
|
|
|
|
return convertedEndpoints
|
|
}
|
|
|
|
func endpointExists(endpoint *portainer.Endpoint, endpoints []portainer.Endpoint) int {
|
|
for idx, v := range endpoints {
|
|
if endpoint.Name == v.Name && isValidEndpoint(&v) {
|
|
return idx
|
|
}
|
|
}
|
|
return -1
|
|
}
|
|
|
|
func mergeEndpointIfRequired(original, updated *portainer.Endpoint) *portainer.Endpoint {
|
|
var endpoint *portainer.Endpoint
|
|
if original.URL != updated.URL || original.TLSConfig.TLS != updated.TLSConfig.TLS ||
|
|
(updated.TLSConfig.TLS && original.TLSConfig.TLSSkipVerify != updated.TLSConfig.TLSSkipVerify) ||
|
|
(updated.TLSConfig.TLS && original.TLSConfig.TLSCACertPath != updated.TLSConfig.TLSCACertPath) ||
|
|
(updated.TLSConfig.TLS && original.TLSConfig.TLSCertPath != updated.TLSConfig.TLSCertPath) ||
|
|
(updated.TLSConfig.TLS && original.TLSConfig.TLSKeyPath != updated.TLSConfig.TLSKeyPath) {
|
|
endpoint = original
|
|
endpoint.URL = updated.URL
|
|
if updated.TLSConfig.TLS {
|
|
endpoint.TLSConfig.TLS = true
|
|
endpoint.TLSConfig.TLSSkipVerify = updated.TLSConfig.TLSSkipVerify
|
|
endpoint.TLSConfig.TLSCACertPath = updated.TLSConfig.TLSCACertPath
|
|
endpoint.TLSConfig.TLSCertPath = updated.TLSConfig.TLSCertPath
|
|
endpoint.TLSConfig.TLSKeyPath = updated.TLSConfig.TLSKeyPath
|
|
} else {
|
|
endpoint.TLSConfig.TLS = false
|
|
endpoint.TLSConfig.TLSSkipVerify = false
|
|
endpoint.TLSConfig.TLSCACertPath = ""
|
|
endpoint.TLSConfig.TLSCertPath = ""
|
|
endpoint.TLSConfig.TLSKeyPath = ""
|
|
}
|
|
}
|
|
return endpoint
|
|
}
|
|
|
|
func (sync synchronization) requireSync() bool {
|
|
if len(sync.endpointsToCreate) != 0 || len(sync.endpointsToUpdate) != 0 || len(sync.endpointsToDelete) != 0 {
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
func prepareSyncData(storedEndpoints, fileEndpoints []portainer.Endpoint) *synchronization {
|
|
endpointsToCreate := make([]*portainer.Endpoint, 0)
|
|
endpointsToUpdate := make([]*portainer.Endpoint, 0)
|
|
endpointsToDelete := make([]*portainer.Endpoint, 0)
|
|
|
|
for idx := range storedEndpoints {
|
|
fidx := endpointExists(&storedEndpoints[idx], fileEndpoints)
|
|
if fidx != -1 {
|
|
endpoint := mergeEndpointIfRequired(&storedEndpoints[idx], &fileEndpoints[fidx])
|
|
if endpoint != nil {
|
|
log.Printf("New definition for a stored endpoint found in file, updating database. [name: %v] [url: %v]\n", endpoint.Name, endpoint.URL)
|
|
endpointsToUpdate = append(endpointsToUpdate, endpoint)
|
|
}
|
|
} else {
|
|
log.Printf("Stored endpoint not found in file (definition might be invalid), removing from database. [name: %v] [url: %v]", storedEndpoints[idx].Name, storedEndpoints[idx].URL)
|
|
endpointsToDelete = append(endpointsToDelete, &storedEndpoints[idx])
|
|
}
|
|
}
|
|
|
|
for idx, endpoint := range fileEndpoints {
|
|
if !isValidEndpoint(&endpoint) {
|
|
log.Printf("Invalid file endpoint definition, skipping. [name: %v] [url: %v]", endpoint.Name, endpoint.URL)
|
|
continue
|
|
}
|
|
sidx := endpointExists(&fileEndpoints[idx], storedEndpoints)
|
|
if sidx == -1 {
|
|
log.Printf("File endpoint not found in database, adding to database. [name: %v] [url: %v]", fileEndpoints[idx].Name, fileEndpoints[idx].URL)
|
|
endpointsToCreate = append(endpointsToCreate, &fileEndpoints[idx])
|
|
}
|
|
}
|
|
|
|
return &synchronization{
|
|
endpointsToCreate: endpointsToCreate,
|
|
endpointsToUpdate: endpointsToUpdate,
|
|
endpointsToDelete: endpointsToDelete,
|
|
}
|
|
}
|