Make tunneler hold tunnels open and healthcheck vs. reopening every 5 minutes.

Also add a test for the Update() logic.
Reordered tunnels vs. storage initialization (prevent a nil ptr panic)
This commit is contained in:
CJ Cullen
2015-12-04 18:01:29 -08:00
parent 4a9b046515
commit 04eb90a5d4
9 changed files with 331 additions and 221 deletions

View File

@@ -20,6 +20,7 @@ import (
"bytes"
"crypto/rand"
"crypto/rsa"
"crypto/tls"
"crypto/x509"
"encoding/pem"
"errors"
@@ -28,12 +29,17 @@ import (
"io/ioutil"
mathrand "math/rand"
"net"
"net/http"
"net/url"
"os"
"sync"
"time"
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/crypto/ssh"
utilnet "k8s.io/kubernetes/pkg/util/net"
"k8s.io/kubernetes/pkg/util/runtime"
)
@@ -108,9 +114,8 @@ func (s *SSHTunnel) Open() error {
tunnelOpenCounter.Inc()
if err != nil {
tunnelOpenFailCounter.Inc()
return err
}
return nil
return err
}
func (s *SSHTunnel) Dial(network, address string) (net.Conn, error) {
@@ -240,95 +245,193 @@ func ParsePublicKeyFromFile(keyFile string) (*rsa.PublicKey, error) {
return rsaKey, nil
}
// Should be thread safe.
type SSHTunnelEntry struct {
type tunnel interface {
Open() error
Close() error
Dial(network, address string) (net.Conn, error)
}
type sshTunnelEntry struct {
Address string
Tunnel *SSHTunnel
Tunnel tunnel
}
type sshTunnelCreator interface {
NewSSHTunnel(user, keyFile, healthCheckURL string) (tunnel, error)
}
type realTunnelCreator struct{}
func (*realTunnelCreator) NewSSHTunnel(user, keyFile, healthCheckURL string) (tunnel, error) {
return NewSSHTunnel(user, keyFile, healthCheckURL)
}
// Not thread safe!
type SSHTunnelList struct {
entries []SSHTunnelEntry
entries []sshTunnelEntry
adding map[string]bool
tunnelCreator sshTunnelCreator
tunnelsLock sync.Mutex
user string
keyfile string
healthCheckURL *url.URL
}
func MakeSSHTunnels(user, keyfile string, addresses []string) *SSHTunnelList {
tunnels := []SSHTunnelEntry{}
for ix := range addresses {
addr := addresses[ix]
tunnel, err := NewSSHTunnel(user, keyfile, addr)
if err != nil {
glog.Errorf("Failed to create tunnel for %q: %v", addr, err)
continue
}
tunnels = append(tunnels, SSHTunnelEntry{addr, tunnel})
func NewSSHTunnelList(user, keyfile string, healthCheckURL *url.URL, stopChan chan struct{}) *SSHTunnelList {
l := &SSHTunnelList{
adding: make(map[string]bool),
tunnelCreator: &realTunnelCreator{},
user: user,
keyfile: keyfile,
healthCheckURL: healthCheckURL,
}
return &SSHTunnelList{tunnels}
healthCheckPoll := 1 * time.Minute
go Until(func() {
l.tunnelsLock.Lock()
defer l.tunnelsLock.Unlock()
// Healthcheck each tunnel every minute
numTunnels := len(l.entries)
for i, entry := range l.entries {
// Stagger healthchecks evenly across duration of healthCheckPoll.
delay := healthCheckPoll * time.Duration(i) / time.Duration(numTunnels)
l.delayedHealthCheck(entry, delay)
}
}, healthCheckPoll, stopChan)
return l
}
// Open attempts to open all tunnels in the list, and removes any tunnels that
// failed to open.
func (l *SSHTunnelList) Open() error {
var openTunnels []SSHTunnelEntry
for ix := range l.entries {
if err := l.entries[ix].Tunnel.Open(); err != nil {
glog.Errorf("Failed to open tunnel %v: %v", l.entries[ix], err)
} else {
openTunnels = append(openTunnels, l.entries[ix])
func (l *SSHTunnelList) delayedHealthCheck(e sshTunnelEntry, delay time.Duration) {
go func() {
defer runtime.HandleCrash()
time.Sleep(delay)
if err := l.healthCheck(e); err != nil {
glog.Errorf("Healthcheck failed for tunnel to %q: %v", e.Address, err)
glog.Infof("Attempting once to re-establish tunnel to %q", e.Address)
l.removeAndReAdd(e)
}
}()
}
func (l *SSHTunnelList) healthCheck(e sshTunnelEntry) error {
// GET the healthcheck path using the provided tunnel's dial function.
transport := utilnet.SetTransportDefaults(&http.Transport{
Dial: e.Tunnel.Dial,
// TODO(cjcullen): Plumb real TLS options through.
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
})
client := &http.Client{Transport: transport}
_, err := client.Get(l.healthCheckURL.String())
return err
}
func (l *SSHTunnelList) removeAndReAdd(e sshTunnelEntry) {
// Find the entry to replace.
l.tunnelsLock.Lock()
defer l.tunnelsLock.Unlock()
for i, entry := range l.entries {
if entry.Tunnel == e.Tunnel {
l.entries = append(l.entries[:i], l.entries[i+1:]...)
l.adding[e.Address] = true
go l.createAndAddTunnel(e.Address)
return
}
}
l.entries = openTunnels
}
func (l *SSHTunnelList) Dial(net, addr string) (net.Conn, error) {
start := time.Now()
id := mathrand.Int63() // So you can match begins/ends in the log.
glog.Infof("[%x: %v] Dialing...", id, addr)
defer func() {
glog.Infof("[%x: %v] Dialed in %v.", id, addr, time.Now().Sub(start))
}()
tunnel, err := l.pickRandomTunnel()
if err != nil {
return nil, err
}
return tunnel.Dial(net, addr)
}
func (l *SSHTunnelList) pickRandomTunnel() (tunnel, error) {
l.tunnelsLock.Lock()
defer l.tunnelsLock.Unlock()
if len(l.entries) == 0 {
return errors.New("Failed to open any tunnels.")
return nil, fmt.Errorf("No SSH tunnels currently open. Were the targets able to accept an ssh-key for user %q?", l.user)
}
return nil
n := mathrand.Intn(len(l.entries))
return l.entries[n].Tunnel, nil
}
// Close asynchronously closes all tunnels in the list after waiting for 1
// minute. Tunnels will still be open upon this function's return, but should
// no longer be used.
func (l *SSHTunnelList) Close() {
for ix := range l.entries {
entry := l.entries[ix]
go func() {
defer runtime.HandleCrash()
time.Sleep(1 * time.Minute)
if err := entry.Tunnel.Close(); err != nil {
glog.Errorf("Failed to close tunnel %v: %v", entry, err)
// Update reconciles the list's entries with the specified addresses. Existing
// tunnels that are not in addresses are removed from entries and closed in a
// background goroutine. New tunnels specified in addresses are opened in a
// background goroutine and then added to entries.
func (l *SSHTunnelList) Update(addrs []string) {
haveAddrsMap := make(map[string]bool)
wantAddrsMap := make(map[string]bool)
func() {
l.tunnelsLock.Lock()
defer l.tunnelsLock.Unlock()
// Build a map of what we currently have.
for i := range l.entries {
haveAddrsMap[l.entries[i].Address] = true
}
// Determine any necessary additions.
for i := range addrs {
// Add tunnel if it is not in l.entries or l.adding
if _, ok := haveAddrsMap[addrs[i]]; !ok {
if _, ok := l.adding[addrs[i]]; !ok {
l.adding[addrs[i]] = true
addr := addrs[i]
go func() {
defer runtime.HandleCrash()
// Actually adding tunnel to list will block until lock
// is released after deletions.
l.createAndAddTunnel(addr)
}()
}
}
}()
}
}
/* this will make sense if we move the lock into SSHTunnelList.
func (l *SSHTunnelList) Dial(network, addr string) (net.Conn, error) {
if len(l.entries) == 0 {
return nil, fmt.Errorf("empty tunnel list.")
}
n := mathrand.Intn(len(l.entries))
return l.entries[n].Tunnel.Dial(network, addr)
}
*/
// Returns a random tunnel, xor an error if there are none.
func (l *SSHTunnelList) PickRandomTunnel() (SSHTunnelEntry, error) {
if len(l.entries) == 0 {
return SSHTunnelEntry{}, fmt.Errorf("empty tunnel list.")
}
n := mathrand.Intn(len(l.entries))
return l.entries[n], nil
}
func (l *SSHTunnelList) Has(addr string) bool {
for ix := range l.entries {
if l.entries[ix].Address == addr {
return true
wantAddrsMap[addrs[i]] = true
}
}
return false
// Determine any necessary deletions.
var newEntries []sshTunnelEntry
for i := range l.entries {
if _, ok := wantAddrsMap[l.entries[i].Address]; !ok {
tunnelEntry := l.entries[i]
glog.Infof("Removing tunnel to deleted node at %q", tunnelEntry.Address)
go func() {
defer runtime.HandleCrash()
if err := tunnelEntry.Tunnel.Close(); err != nil {
glog.Errorf("Failed to close tunnel to %q: %v", tunnelEntry.Address, err)
}
}()
} else {
newEntries = append(newEntries, l.entries[i])
}
}
l.entries = newEntries
}()
}
func (l *SSHTunnelList) Len() int {
return len(l.entries)
func (l *SSHTunnelList) createAndAddTunnel(addr string) {
glog.Infof("Trying to add tunnel to %q", addr)
tunnel, err := l.tunnelCreator.NewSSHTunnel(l.user, l.keyfile, addr)
if err != nil {
glog.Errorf("Failed to create tunnel for %q: %v", addr, err)
return
}
if err := tunnel.Open(); err != nil {
glog.Errorf("Failed to open tunnel to %q: %v", addr, err)
l.tunnelsLock.Lock()
delete(l.adding, addr)
l.tunnelsLock.Unlock()
return
}
l.tunnelsLock.Lock()
l.entries = append(l.entries, sshTunnelEntry{addr, tunnel})
delete(l.adding, addr)
l.tunnelsLock.Unlock()
glog.Infof("Successfully added tunnel for %q", addr)
}
func EncodePrivateKey(private *rsa.PrivateKey) []byte {