mirror of https://github.com/prometheus/prometheus
Julien Pivotto
4 years ago
committed by
GitHub
47 changed files with 1425 additions and 209 deletions
@ -0,0 +1,136 @@
|
||||
// Code generated by golang.org/x/tools/cmd/bundle. DO NOT EDIT.
|
||||
// $ bundle -pkg godo -prefix ./dev/dist/godo
|
||||
|
||||
package godo |
||||
|
||||
import () |
||||
|
||||
// AppDatabaseSpec struct for AppDatabaseSpec
|
||||
type AppDatabaseSpec struct { |
||||
Name string `json:"name"` |
||||
Engine AppDatabaseSpecEngine `json:"engine,omitempty"` |
||||
Version string `json:"version,omitempty"` |
||||
Size string `json:"size,omitempty"` |
||||
NumNodes int64 `json:"num_nodes,omitempty"` |
||||
} |
||||
|
||||
// AppDatabaseSpecEngine the model 'AppDatabaseSpecEngine'
|
||||
type AppDatabaseSpecEngine string |
||||
|
||||
// List of AppDatabaseSpecEngine
|
||||
const ( |
||||
APPDATABASESPECENGINE_UNSET AppDatabaseSpecEngine = "UNSET" |
||||
APPDATABASESPECENGINE_MYSQL AppDatabaseSpecEngine = "MYSQL" |
||||
APPDATABASESPECENGINE_PG AppDatabaseSpecEngine = "PG" |
||||
APPDATABASESPECENGINE_REDIS AppDatabaseSpecEngine = "REDIS" |
||||
) |
||||
|
||||
// AppDomainSpec struct for AppDomainSpec
|
||||
type AppDomainSpec struct { |
||||
Domain string `json:"domain"` |
||||
} |
||||
|
||||
// AppRouteSpec struct for AppRouteSpec
|
||||
type AppRouteSpec struct { |
||||
Path string `json:"path,omitempty"` |
||||
} |
||||
|
||||
// AppServiceSpec struct for AppServiceSpec
|
||||
type AppServiceSpec struct { |
||||
Name string `json:"name"` |
||||
RunCommand string `json:"run_command,omitempty"` |
||||
BuildCommand string `json:"build_command,omitempty"` |
||||
HTTPPort int64 `json:"http_port,omitempty"` |
||||
DockerfilePath string `json:"dockerfile_path,omitempty"` |
||||
Git GitSourceSpec `json:"git,omitempty"` |
||||
GitHub GitHubSourceSpec `json:"github,omitempty"` |
||||
Envs []AppVariableDefinition `json:"envs,omitempty"` |
||||
InstanceSizeSlug string `json:"instance_size_slug,omitempty"` |
||||
InstanceCount int64 `json:"instance_count,omitempty"` |
||||
Routes []AppRouteSpec `json:"routes,omitempty"` |
||||
SourceDir string `json:"source_dir,omitempty"` |
||||
EnvironmentSlug string `json:"environment_slug,omitempty"` |
||||
} |
||||
|
||||
// AppSpec struct for AppSpec
|
||||
type AppSpec struct { |
||||
Services []AppServiceSpec `json:"services,omitempty"` |
||||
StaticSites []AppStaticSiteSpec `json:"static_sites,omitempty"` |
||||
Databases []AppDatabaseSpec `json:"databases,omitempty"` |
||||
Workers []AppWorkerSpec `json:"workers,omitempty"` |
||||
Region string `json:"region,omitempty"` |
||||
Name string `json:"name"` |
||||
Domains []AppDomainSpec `json:"domains,omitempty"` |
||||
} |
||||
|
||||
// AppStaticSiteSpec struct for AppStaticSiteSpec
|
||||
type AppStaticSiteSpec struct { |
||||
Name string `json:"name"` |
||||
BuildCommand string `json:"build_command,omitempty"` |
||||
Git GitSourceSpec `json:"git,omitempty"` |
||||
GitHub GitHubSourceSpec `json:"github,omitempty"` |
||||
Envs []AppVariableDefinition `json:"envs,omitempty"` |
||||
Routes []AppRouteSpec `json:"routes,omitempty"` |
||||
SourceDir string `json:"source_dir,omitempty"` |
||||
EnvironmentSlug string `json:"environment_slug,omitempty"` |
||||
} |
||||
|
||||
// AppVariableDefinition struct for AppVariableDefinition
|
||||
type AppVariableDefinition struct { |
||||
Value string `json:"value,omitempty"` |
||||
Scope VariableScope `json:"scope,omitempty"` |
||||
// POSIX allows a broader env var definition, but we restrict to what is allowed by bash. http://git.savannah.gnu.org/cgit/bash.git/tree/general.h?h=bash-5.0#n124 Based on the POSIX spec and some casting to unsigned char in bash code I think this is restricted to ASCII (not unicode).
|
||||
Key string `json:"key"` |
||||
Type VariableType `json:"type,omitempty"` |
||||
EncryptedValue string `json:"encrypted_value,omitempty"` |
||||
} |
||||
|
||||
// AppWorkerSpec struct for AppWorkerSpec
|
||||
type AppWorkerSpec struct { |
||||
Name string `json:"name"` |
||||
RunCommand string `json:"run_command,omitempty"` |
||||
BuildCommand string `json:"build_command,omitempty"` |
||||
DockerfilePath string `json:"dockerfile_path,omitempty"` |
||||
Git GitSourceSpec `json:"git,omitempty"` |
||||
GitHub GitHubSourceSpec `json:"github,omitempty"` |
||||
Envs []AppVariableDefinition `json:"envs,omitempty"` |
||||
InstanceSizeSlug string `json:"instance_size_slug,omitempty"` |
||||
InstanceCount int64 `json:"instance_count,omitempty"` |
||||
SourceDir string `json:"source_dir,omitempty"` |
||||
EnvironmentSlug string `json:"environment_slug,omitempty"` |
||||
} |
||||
|
||||
// GitHubSourceSpec struct for GitHubSourceSpec
|
||||
type GitHubSourceSpec struct { |
||||
Repo string `json:"repo"` |
||||
Branch string `json:"branch"` |
||||
DeployOnPush bool `json:"deploy_on_push,omitempty"` |
||||
} |
||||
|
||||
// GitSourceSpec struct for GitSourceSpec
|
||||
type GitSourceSpec struct { |
||||
Repo string `json:"repo,omitempty"` |
||||
RequiresAuth bool `json:"requires_auth,omitempty"` |
||||
Branch string `json:"branch,omitempty"` |
||||
RepoCloneURL string `json:"repo_clone_url,omitempty"` |
||||
} |
||||
|
||||
// VariableScope the model 'VariableScope'
|
||||
type VariableScope string |
||||
|
||||
// List of VariableScope
|
||||
const ( |
||||
VARIABLESCOPE_UNSET VariableScope = "UNSET" |
||||
VARIABLESCOPE_RUN_TIME VariableScope = "RUN_TIME" |
||||
VARIABLESCOPE_BUILD_TIME VariableScope = "BUILD_TIME" |
||||
VARIABLESCOPE_RUN_AND_BUILD_TIME VariableScope = "RUN_AND_BUILD_TIME" |
||||
) |
||||
|
||||
// VariableType the model 'VariableType'
|
||||
type VariableType string |
||||
|
||||
// List of VariableType
|
||||
const ( |
||||
VARIABLETYPE_GENERAL VariableType = "GENERAL" |
||||
VARIABLETYPE_SECRET VariableType = "SECRET" |
||||
) |
@ -0,0 +1,278 @@
|
||||
package godo |
||||
|
||||
import ( |
||||
"context" |
||||
"fmt" |
||||
"net/http" |
||||
"time" |
||||
) |
||||
|
||||
const ( |
||||
appsBasePath = "/v2/apps" |
||||
) |
||||
|
||||
// AppLogType is the type of app logs.
|
||||
type AppLogType string |
||||
|
||||
const ( |
||||
// AppLogTypeBuild represents build logs.
|
||||
AppLogTypeBuild AppLogType = "BUILD" |
||||
// AppLogTypeDeploy represents deploy logs.
|
||||
AppLogTypeDeploy AppLogType = "DEPLOY" |
||||
// AppLogTypeRun represents run logs.
|
||||
AppLogTypeRun AppLogType = "RUN" |
||||
) |
||||
|
||||
// AppsService is an interface for interfacing with the App Platform endpoints
|
||||
// of the DigitalOcean API.
|
||||
type AppsService interface { |
||||
Create(ctx context.Context, create *AppCreateRequest) (*App, *Response, error) |
||||
Get(ctx context.Context, appID string) (*App, *Response, error) |
||||
List(ctx context.Context, opts *ListOptions) ([]*App, *Response, error) |
||||
Update(ctx context.Context, appID string, update *AppUpdateRequest) (*App, *Response, error) |
||||
Delete(ctx context.Context, appID string) (*Response, error) |
||||
|
||||
GetDeployment(ctx context.Context, appID, deploymentID string) (*Deployment, *Response, error) |
||||
ListDeployments(ctx context.Context, appID string, opts *ListOptions) ([]*Deployment, *Response, error) |
||||
CreateDeployment(ctx context.Context, appID string) (*Deployment, *Response, error) |
||||
|
||||
GetLogs(ctx context.Context, appID, deploymentID, component string, logType AppLogType, follow bool) (*AppLogs, *Response, error) |
||||
} |
||||
|
||||
// App represents an app.
|
||||
type App struct { |
||||
ID string `json:"id"` |
||||
Spec *AppSpec `json:"spec"` |
||||
DefaultIngress string `json:"default_ingress"` |
||||
CreatedAt time.Time `json:"created_at"` |
||||
UpdatedAt time.Time `json:"updated_at,omitempty"` |
||||
ActiveDeployment *Deployment `json:"active_deployment,omitempty"` |
||||
InProgressDeployment *Deployment `json:"in_progress_deployment,omitempty"` |
||||
} |
||||
|
||||
// Deployment represents a deployment for an app.
|
||||
type Deployment struct { |
||||
ID string `json:"id"` |
||||
Spec *AppSpec `json:"spec"` |
||||
Services []*DeploymentService `json:"services,omitempty"` |
||||
Workers []*DeploymentWorker `json:"workers,omitempty"` |
||||
StaticSites []*DeploymentStaticSite `json:"static_sites,omitempty"` |
||||
|
||||
Cause string `json:"cause"` |
||||
Progress *DeploymentProgress `json:"progress"` |
||||
|
||||
CreatedAt time.Time `json:"created_at"` |
||||
UpdatedAt time.Time `json:"updated_at,omitempty"` |
||||
} |
||||
|
||||
// DeploymentService represents a service component in a deployment.
|
||||
type DeploymentService struct { |
||||
Name string `json:"name,omitempty"` |
||||
SourceCommitHash string `json:"source_commit_hash"` |
||||
} |
||||
|
||||
// DeploymentWorker represents a worker component in a deployment.
|
||||
type DeploymentWorker struct { |
||||
Name string `json:"name,omitempty"` |
||||
SourceCommitHash string `json:"source_commit_hash"` |
||||
} |
||||
|
||||
// DeploymentStaticSite represents a static site component in a deployment.
|
||||
type DeploymentStaticSite struct { |
||||
Name string `json:"name,omitempty"` |
||||
SourceCommitHash string `json:"source_commit_hash"` |
||||
} |
||||
|
||||
// DeploymentProgress represents the total progress of a deployment.
|
||||
type DeploymentProgress struct { |
||||
PendingSteps int `json:"pending_steps"` |
||||
RunningSteps int `json:"running_steps"` |
||||
SuccessSteps int `json:"success_steps"` |
||||
ErrorSteps int `json:"error_steps"` |
||||
TotalSteps int `json:"total_steps"` |
||||
|
||||
Steps []*DeploymentProgressStep `json:"steps"` |
||||
} |
||||
|
||||
// DeploymentProgressStep represents the progress of a deployment step.
|
||||
type DeploymentProgressStep struct { |
||||
Name string `json:"name"` |
||||
Status string `json:"status"` |
||||
Steps []*DeploymentProgressStep `json:"steps,omitempty"` |
||||
Attempts uint32 `json:"attempts"` |
||||
StartedAt time.Time `json:"started_at,omitempty"` |
||||
EndedAt time.Time `json:"ended_at,omitempty"` |
||||
} |
||||
|
||||
// AppLogs represent app logs.
|
||||
type AppLogs struct { |
||||
LiveURL string `json:"live_url"` |
||||
HistoricURLs []string `json:"historic_urls"` |
||||
} |
||||
|
||||
// AppCreateRequest represents a request to create an app.
|
||||
type AppCreateRequest struct { |
||||
Spec *AppSpec `json:"spec"` |
||||
} |
||||
|
||||
// AppUpdateRequest represents a request to update an app.
|
||||
type AppUpdateRequest struct { |
||||
Spec *AppSpec `json:"spec"` |
||||
} |
||||
|
||||
type appRoot struct { |
||||
App *App `json:"app"` |
||||
} |
||||
|
||||
type appsRoot struct { |
||||
Apps []*App `json:"apps"` |
||||
} |
||||
|
||||
type deploymentRoot struct { |
||||
Deployment *Deployment `json:"deployment"` |
||||
} |
||||
|
||||
type deploymentsRoot struct { |
||||
Deployments []*Deployment `json:"deployments"` |
||||
} |
||||
|
||||
// AppsServiceOp handles communication with Apps methods of the DigitalOcean API.
|
||||
type AppsServiceOp struct { |
||||
client *Client |
||||
} |
||||
|
||||
// Creates an app.
|
||||
func (s *AppsServiceOp) Create(ctx context.Context, create *AppCreateRequest) (*App, *Response, error) { |
||||
path := appsBasePath |
||||
req, err := s.client.NewRequest(ctx, http.MethodPost, path, create) |
||||
if err != nil { |
||||
return nil, nil, err |
||||
} |
||||
|
||||
root := new(appRoot) |
||||
resp, err := s.client.Do(ctx, req, root) |
||||
if err != nil { |
||||
return nil, resp, err |
||||
} |
||||
return root.App, resp, nil |
||||
} |
||||
|
||||
// Get an app.
|
||||
func (s *AppsServiceOp) Get(ctx context.Context, appID string) (*App, *Response, error) { |
||||
path := fmt.Sprintf("%s/%s", appsBasePath, appID) |
||||
req, err := s.client.NewRequest(ctx, http.MethodGet, path, nil) |
||||
if err != nil { |
||||
return nil, nil, err |
||||
} |
||||
root := new(appRoot) |
||||
resp, err := s.client.Do(ctx, req, root) |
||||
if err != nil { |
||||
return nil, resp, err |
||||
} |
||||
return root.App, resp, nil |
||||
} |
||||
|
||||
// List apps.
|
||||
func (s *AppsServiceOp) List(ctx context.Context, opts *ListOptions) ([]*App, *Response, error) { |
||||
path := appsBasePath |
||||
req, err := s.client.NewRequest(ctx, http.MethodGet, path, nil) |
||||
if err != nil { |
||||
return nil, nil, err |
||||
} |
||||
root := new(appsRoot) |
||||
resp, err := s.client.Do(ctx, req, root) |
||||
if err != nil { |
||||
return nil, resp, err |
||||
} |
||||
return root.Apps, resp, nil |
||||
} |
||||
|
||||
// Update an app.
|
||||
func (s *AppsServiceOp) Update(ctx context.Context, appID string, update *AppUpdateRequest) (*App, *Response, error) { |
||||
path := fmt.Sprintf("%s/%s", appsBasePath, appID) |
||||
req, err := s.client.NewRequest(ctx, http.MethodPut, path, update) |
||||
if err != nil { |
||||
return nil, nil, err |
||||
} |
||||
|
||||
root := new(appRoot) |
||||
resp, err := s.client.Do(ctx, req, root) |
||||
if err != nil { |
||||
return nil, resp, err |
||||
} |
||||
return root.App, resp, nil |
||||
} |
||||
|
||||
// Delete an app.
|
||||
func (s *AppsServiceOp) Delete(ctx context.Context, appID string) (*Response, error) { |
||||
path := fmt.Sprintf("%s/%s", appsBasePath, appID) |
||||
req, err := s.client.NewRequest(ctx, http.MethodDelete, path, nil) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
resp, err := s.client.Do(ctx, req, nil) |
||||
if err != nil { |
||||
return resp, err |
||||
} |
||||
return resp, nil |
||||
} |
||||
|
||||
// GetDeployment gets an app deployment.
|
||||
func (s *AppsServiceOp) GetDeployment(ctx context.Context, appID, deploymentID string) (*Deployment, *Response, error) { |
||||
path := fmt.Sprintf("%s/%s/deployments/%s", appsBasePath, appID, deploymentID) |
||||
req, err := s.client.NewRequest(ctx, http.MethodGet, path, nil) |
||||
if err != nil { |
||||
return nil, nil, err |
||||
} |
||||
root := new(deploymentRoot) |
||||
resp, err := s.client.Do(ctx, req, root) |
||||
if err != nil { |
||||
return nil, resp, err |
||||
} |
||||
return root.Deployment, resp, nil |
||||
} |
||||
|
||||
// ListDeployments lists an app deployments.
|
||||
func (s *AppsServiceOp) ListDeployments(ctx context.Context, appID string, opts *ListOptions) ([]*Deployment, *Response, error) { |
||||
path := fmt.Sprintf("%s/%s/deployments", appsBasePath, appID) |
||||
req, err := s.client.NewRequest(ctx, http.MethodGet, path, nil) |
||||
if err != nil { |
||||
return nil, nil, err |
||||
} |
||||
root := new(deploymentsRoot) |
||||
resp, err := s.client.Do(ctx, req, root) |
||||
if err != nil { |
||||
return nil, resp, err |
||||
} |
||||
return root.Deployments, resp, nil |
||||
} |
||||
|
||||
// CreateDeployment creates an app deployment.
|
||||
func (s *AppsServiceOp) CreateDeployment(ctx context.Context, appID string) (*Deployment, *Response, error) { |
||||
path := fmt.Sprintf("%s/%s/deployments", appsBasePath, appID) |
||||
req, err := s.client.NewRequest(ctx, http.MethodPost, path, nil) |
||||
if err != nil { |
||||
return nil, nil, err |
||||
} |
||||
root := new(deploymentRoot) |
||||
resp, err := s.client.Do(ctx, req, root) |
||||
if err != nil { |
||||
return nil, resp, err |
||||
} |
||||
return root.Deployment, resp, nil |
||||
} |
||||
|
||||
// GetLogs retrieves app logs.
|
||||
func (s *AppsServiceOp) GetLogs(ctx context.Context, appID, deploymentID, component string, logType AppLogType, follow bool) (*AppLogs, *Response, error) { |
||||
url := fmt.Sprintf("%s/%s/deployments/%s/components/%s/logs?type=%s&follow=%t", appsBasePath, appID, deploymentID, component, logType, follow) |
||||
req, err := s.client.NewRequest(ctx, http.MethodGet, url, nil) |
||||
if err != nil { |
||||
return nil, nil, err |
||||
} |
||||
logs := new(AppLogs) |
||||
resp, err := s.client.Do(ctx, req, logs) |
||||
if err != nil { |
||||
return nil, resp, err |
||||
} |
||||
return logs, resp, nil |
||||
} |
@ -0,0 +1,189 @@
|
||||
// Copyright (c) 2020 The Jaeger Authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package utils |
||||
|
||||
import ( |
||||
"fmt" |
||||
"net" |
||||
"sync" |
||||
"sync/atomic" |
||||
"time" |
||||
|
||||
"github.com/uber/jaeger-client-go/log" |
||||
) |
||||
|
||||
// reconnectingUDPConn is an implementation of udpConn that resolves hostPort every resolveTimeout, if the resolved address is
|
||||
// different than the current conn then the new address is dialed and the conn is swapped.
|
||||
type reconnectingUDPConn struct { |
||||
hostPort string |
||||
resolveFunc resolveFunc |
||||
dialFunc dialFunc |
||||
logger log.Logger |
||||
bufferBytes int64 |
||||
|
||||
connMtx sync.RWMutex |
||||
conn *net.UDPConn |
||||
destAddr *net.UDPAddr |
||||
closeChan chan struct{} |
||||
} |
||||
|
||||
type resolveFunc func(network string, hostPort string) (*net.UDPAddr, error) |
||||
type dialFunc func(network string, laddr, raddr *net.UDPAddr) (*net.UDPConn, error) |
||||
|
||||
// newReconnectingUDPConn returns a new udpConn that resolves hostPort every resolveTimeout, if the resolved address is
|
||||
// different than the current conn then the new address is dialed and the conn is swapped.
|
||||
func newReconnectingUDPConn(hostPort string, resolveTimeout time.Duration, resolveFunc resolveFunc, dialFunc dialFunc, logger log.Logger) (*reconnectingUDPConn, error) { |
||||
conn := &reconnectingUDPConn{ |
||||
hostPort: hostPort, |
||||
resolveFunc: resolveFunc, |
||||
dialFunc: dialFunc, |
||||
logger: logger, |
||||
closeChan: make(chan struct{}), |
||||
} |
||||
|
||||
if err := conn.attemptResolveAndDial(); err != nil { |
||||
logger.Error(fmt.Sprintf("failed resolving destination address on connection startup, with err: %q. retrying in %s", err.Error(), resolveTimeout)) |
||||
} |
||||
|
||||
go conn.reconnectLoop(resolveTimeout) |
||||
|
||||
return conn, nil |
||||
} |
||||
|
||||
func (c *reconnectingUDPConn) reconnectLoop(resolveTimeout time.Duration) { |
||||
ticker := time.NewTicker(resolveTimeout) |
||||
defer ticker.Stop() |
||||
|
||||
for { |
||||
select { |
||||
case <-c.closeChan: |
||||
return |
||||
case <-ticker.C: |
||||
if err := c.attemptResolveAndDial(); err != nil { |
||||
c.logger.Error(err.Error()) |
||||
} |
||||
} |
||||
} |
||||
} |
||||
|
||||
func (c *reconnectingUDPConn) attemptResolveAndDial() error { |
||||
newAddr, err := c.resolveFunc("udp", c.hostPort) |
||||
if err != nil { |
||||
return fmt.Errorf("failed to resolve new addr for host %q, with err: %w", c.hostPort, err) |
||||
} |
||||
|
||||
c.connMtx.RLock() |
||||
curAddr := c.destAddr |
||||
c.connMtx.RUnlock() |
||||
|
||||
// dont attempt dial if an addr was successfully dialed previously and, resolved addr is the same as current conn
|
||||
if curAddr != nil && newAddr.String() == curAddr.String() { |
||||
return nil |
||||
} |
||||
|
||||
if err := c.attemptDialNewAddr(newAddr); err != nil { |
||||
return fmt.Errorf("failed to dial newly resolved addr '%s', with err: %w", newAddr, err) |
||||
} |
||||
|
||||
return nil |
||||
} |
||||
|
||||
func (c *reconnectingUDPConn) attemptDialNewAddr(newAddr *net.UDPAddr) error { |
||||
connUDP, err := c.dialFunc(newAddr.Network(), nil, newAddr) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
if bufferBytes := int(atomic.LoadInt64(&c.bufferBytes)); bufferBytes != 0 { |
||||
if err = connUDP.SetWriteBuffer(bufferBytes); err != nil { |
||||
return err |
||||
} |
||||
} |
||||
|
||||
c.connMtx.Lock() |
||||
c.destAddr = newAddr |
||||
// store prev to close later
|
||||
prevConn := c.conn |
||||
c.conn = connUDP |
||||
c.connMtx.Unlock() |
||||
|
||||
if prevConn != nil { |
||||
return prevConn.Close() |
||||
} |
||||
|
||||
return nil |
||||
} |
||||
|
||||
// Write calls net.udpConn.Write, if it fails an attempt is made to connect to a new addr, if that succeeds the write is retried before returning
|
||||
func (c *reconnectingUDPConn) Write(b []byte) (int, error) { |
||||
var bytesWritten int |
||||
var err error |
||||
|
||||
c.connMtx.RLock() |
||||
if c.conn == nil { |
||||
// if connection is not initialized indicate this with err in order to hook into retry logic
|
||||
err = fmt.Errorf("UDP connection not yet initialized, an address has not been resolved") |
||||
} else { |
||||
bytesWritten, err = c.conn.Write(b) |
||||
} |
||||
c.connMtx.RUnlock() |
||||
|
||||
if err == nil { |
||||
return bytesWritten, nil |
||||
} |
||||
|
||||
// attempt to resolve and dial new address in case that's the problem, if resolve and dial succeeds, try write again
|
||||
if reconnErr := c.attemptResolveAndDial(); reconnErr == nil { |
||||
c.connMtx.RLock() |
||||
defer c.connMtx.RUnlock() |
||||
return c.conn.Write(b) |
||||
} |
||||
|
||||
// return original error if reconn fails
|
||||
return bytesWritten, err |
||||
} |
||||
|
||||
// Close stops the reconnectLoop, then closes the connection via net.udpConn 's implementation
|
||||
func (c *reconnectingUDPConn) Close() error { |
||||
close(c.closeChan) |
||||
|
||||
// acquire rw lock before closing conn to ensure calls to Write drain
|
||||
c.connMtx.Lock() |
||||
defer c.connMtx.Unlock() |
||||
|
||||
if c.conn != nil { |
||||
return c.conn.Close() |
||||
} |
||||
|
||||
return nil |
||||
} |
||||
|
||||
// SetWriteBuffer defers to the net.udpConn SetWriteBuffer implementation wrapped with a RLock. if no conn is currently held
|
||||
// and SetWriteBuffer is called store bufferBytes to be set for new conns
|
||||
func (c *reconnectingUDPConn) SetWriteBuffer(bytes int) error { |
||||
var err error |
||||
|
||||
c.connMtx.RLock() |
||||
if c.conn != nil { |
||||
err = c.conn.SetWriteBuffer(bytes) |
||||
} |
||||
c.connMtx.RUnlock() |
||||
|
||||
if err == nil { |
||||
atomic.StoreInt64(&c.bufferBytes, int64(bytes)) |
||||
} |
||||
|
||||
return err |
||||
} |
Loading…
Reference in new issue