agent: SCADA HTTP integration

pull/711/head
Armon Dadgar 10 years ago
parent 456645f2fb
commit 32aaee5185

@ -2,11 +2,14 @@ package agent
import ( import (
"crypto/tls" "crypto/tls"
"errors"
"fmt" "fmt"
"io" "io"
"log" "log"
"net" "net"
"strconv" "strconv"
"sync"
"time"
"github.com/hashicorp/scada-client" "github.com/hashicorp/scada-client"
) )
@ -60,13 +63,126 @@ func NewProvider(c *Config, logOutput io.Writer) (*client.Provider, net.Listener
InsecureSkipVerify: true, InsecureSkipVerify: true,
} }
// TODO: Setup the handlers // TODO: AtlasACLToken
config.Handlers["http"] = nil
// Create an HTTP listener and handler
list := newScadaListener(c.AtlasCluster)
config.Handlers["http"] = func(capability string, meta map[string]string,
conn io.ReadWriteCloser) error {
return list.PushRWC(conn)
}
// Create the provider // Create the provider
provider, err := client.NewProvider(config) provider, err := client.NewProvider(config)
if err != nil { if err != nil {
list.Close()
return nil, nil, err return nil, nil, err
} }
return provider, nil, nil return provider, list, nil
}
// scadaListener is used to return a net.Listener for
// incoming SCADA connections
type scadaListener struct {
addr *scadaAddr
pending chan net.Conn
closed bool
closedCh chan struct{}
l sync.Mutex
}
// newScadaListener returns a new listener
func newScadaListener(cluster string) *scadaListener {
l := &scadaListener{
addr: &scadaAddr{cluster},
pending: make(chan net.Conn),
closedCh: make(chan struct{}),
}
return l
}
// PushRWC is used to push a io.ReadWriteCloser as a net.Conn
func (s *scadaListener) PushRWC(conn io.ReadWriteCloser) error {
// Check if this already implements net.Conn
if nc, ok := conn.(net.Conn); ok {
return s.Push(nc)
}
// Wrap to implement the interface
wrapped := &scadaRWC{conn, s.addr}
return s.Push(wrapped)
}
// Push is used to add a connection to the queu
func (s *scadaListener) Push(conn net.Conn) error {
select {
case s.pending <- conn:
return nil
case <-s.closedCh:
return fmt.Errorf("scada listener closed")
}
}
func (s *scadaListener) Accept() (net.Conn, error) {
select {
case conn := <-s.pending:
return conn, nil
case <-s.closedCh:
return nil, fmt.Errorf("scada listener closed")
}
}
func (s *scadaListener) Close() error {
s.l.Lock()
defer s.l.Unlock()
if s.closed {
return nil
}
s.closed = true
close(s.pending)
close(s.closedCh)
return nil
}
func (s *scadaListener) Addr() net.Addr {
return s.addr
}
// scadaAddr is used to return a net.Addr for SCADA
type scadaAddr struct {
cluster string
}
func (s *scadaAddr) Network() string {
return "SCADA"
}
func (s *scadaAddr) String() string {
return fmt.Sprintf("SCADA::Atlas::%s", s.cluster)
}
type scadaRWC struct {
io.ReadWriteCloser
addr *scadaAddr
}
func (s *scadaRWC) LocalAddr() net.Addr {
return s.addr
}
func (s *scadaRWC) RemoteAddr() net.Addr {
return s.addr
}
func (s *scadaRWC) SetDeadline(t time.Time) error {
return errors.New("SCADA.Conn does not support deadlines")
}
func (s *scadaRWC) SetReadDeadline(t time.Time) error {
return errors.New("SCADA.Conn does not support deadlines")
}
func (s *scadaRWC) SetWriteDeadline(t time.Time) error {
return errors.New("SCADA.Conn does not support deadlines")
} }

Loading…
Cancel
Save