diff options
| author | Xe <me@christine.website> | 2022-11-23 21:56:18 -0500 |
|---|---|---|
| committer | Xe <me@christine.website> | 2022-11-23 21:56:18 -0500 |
| commit | c62e73f9f8145655485ecf36e8187a5652521855 (patch) | |
| tree | bfd9119316785a2ea7d7fd0bdf889f62087dd9b2 /tun2 | |
| parent | 58385c715cd31ce4c89e555167b5ae2cf5306ad8 (diff) | |
| download | x-1.3.0.tar.xz x-1.3.0.zip | |
remove tun2v1.3.0
Signed-off-by: Xe <me@christine.website>
Diffstat (limited to 'tun2')
| -rw-r--r-- | tun2/backend.go | 78 | ||||
| -rw-r--r-- | tun2/backend_test.go | 212 | ||||
| -rw-r--r-- | tun2/client.go | 171 | ||||
| -rw-r--r-- | tun2/client_test.go | 21 | ||||
| -rw-r--r-- | tun2/connection.go | 162 | ||||
| -rw-r--r-- | tun2/doc.go | 11 | ||||
| -rw-r--r-- | tun2/server.go | 480 | ||||
| -rw-r--r-- | tun2/server_test.go | 171 | ||||
| -rw-r--r-- | tun2/storage_test.go | 100 |
9 files changed, 0 insertions, 1406 deletions
diff --git a/tun2/backend.go b/tun2/backend.go deleted file mode 100644 index d94a1a8..0000000 --- a/tun2/backend.go +++ /dev/null @@ -1,78 +0,0 @@ -package tun2 - -import "time" - -// Backend is the public state of an individual Connection. -type Backend struct { - ID string - Proto string - User string - Domain string - Phi float32 - Host string - Usable bool -} - -type backendMatcher func(*Connection) bool - -func (s *Server) getBackendsForMatcher(bm backendMatcher) []Backend { - s.connlock.Lock() - defer s.connlock.Unlock() - - var result []Backend - - for _, c := range s.conns { - if !bm(c) { - continue - } - - result = append(result, Backend{ - ID: c.id, - Proto: c.conn.LocalAddr().Network(), - User: c.user, - Domain: c.domain, - Phi: float32(c.detector.Phi(time.Now())), - Host: c.conn.RemoteAddr().String(), - Usable: c.usable, - }) - } - - return result -} - -// KillBackend forcibly disconnects a given backend but doesn't offer a way to -// "ban" it from reconnecting. -func (s *Server) KillBackend(id string) error { - s.connlock.Lock() - defer s.connlock.Unlock() - - for _, c := range s.conns { - if c.id == id { - c.cancel() - return nil - } - } - - return ErrNoSuchBackend -} - -// GetBackendsForDomain fetches all backends connected to this server associated -// to a single public domain name. -func (s *Server) GetBackendsForDomain(domain string) []Backend { - return s.getBackendsForMatcher(func(c *Connection) bool { - return c.domain == domain - }) -} - -// GetBackendsForUser fetches all backends connected to this server owned by a -// given user by username. -func (s *Server) GetBackendsForUser(uname string) []Backend { - return s.getBackendsForMatcher(func(c *Connection) bool { - return c.user == uname - }) -} - -// GetAllBackends fetches every backend connected to this server. -func (s *Server) GetAllBackends() []Backend { - return s.getBackendsForMatcher(func(*Connection) bool { return true }) -} diff --git a/tun2/backend_test.go b/tun2/backend_test.go deleted file mode 100644 index 3e808d1..0000000 --- a/tun2/backend_test.go +++ /dev/null @@ -1,212 +0,0 @@ -package tun2 - -import ( - "bytes" - "context" - "encoding/json" - "net" - "net/http" - "net/http/httptest" - "os" - "testing" - "time" -) - -func TestBackendAuthV1(t *testing.T) { - st := MockStorage() - - s, err := NewServer(&ServerConfig{ - Storage: st, - }) - if err != nil { - t.Fatal(err) - } - defer s.Close() - - st.AddRoute(domain, user) - st.AddToken(token, user, []string{"connect"}) - st.AddToken(noPermToken, user, nil) - st.AddToken(otherUserToken, "cadey", []string{"connect"}) - - cases := []struct { - name string - auth Auth - wantErr bool - }{ - { - name: "basic everything should work", - auth: Auth{ - Token: token, - Domain: domain, - }, - wantErr: false, - }, - { - name: "invalid domain", - auth: Auth{ - Token: token, - Domain: "aw.heck", - }, - wantErr: true, - }, - { - name: "invalid token", - auth: Auth{ - Token: "asdfwtweg", - Domain: domain, - }, - wantErr: true, - }, - { - name: "invalid token scopes", - auth: Auth{ - Token: noPermToken, - Domain: domain, - }, - wantErr: true, - }, - { - name: "user token doesn't match domain owner", - auth: Auth{ - Token: otherUserToken, - Domain: domain, - }, - wantErr: true, - }, - } - - for _, cs := range cases { - t.Run(cs.name, func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - data, err := json.Marshal(cs.auth) - if err != nil { - t.Fatal(err) - } - - _, _, err = s.backendAuthv1(ctx, bytes.NewBuffer(data)) - - if cs.wantErr && err == nil { - t.Fatalf("auth did not err as expected") - } - - if !cs.wantErr && err != nil { - t.Fatalf("unexpected auth err: %v", err) - } - }) - } -} - -func TestBackendRouting(t *testing.T) { - t.Skip() - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - st := MockStorage() - - st.AddRoute(domain, user) - st.AddToken(token, user, []string{"connect"}) - - s, err := NewServer(&ServerConfig{ - Storage: st, - }) - if err != nil { - t.Fatal(err) - } - defer s.Close() - - l, err := net.Listen("tcp", "127.0.0.1:0") - if err != nil { - t.Fatal(err) - } - defer l.Close() - - go s.Listen(l) - - cases := []struct { - name string - wantStatusCode int - handler http.HandlerFunc - }{ - { - name: "200 everything's okay", - wantStatusCode: http.StatusOK, - handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - http.Error(w, "HTTP 200, everything is okay :)", http.StatusOK) - }), - }, - { - name: "500 internal error", - wantStatusCode: http.StatusInternalServerError, - handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - http.Error(w, "HTTP 500, the world is on fire", http.StatusInternalServerError) - }), - }, - } - - for _, cs := range cases { - t.Run(cs.name, func(t *testing.T) { - ts := httptest.NewServer(cs.handler) - defer ts.Close() - - cc := &ClientConfig{ - ConnType: "tcp", - ServerAddr: l.Addr().String(), - Token: token, - BackendURL: ts.URL, - Domain: domain, - - forceTCPClear: true, - } - - c, err := NewClient(cc) - if err != nil { - t.Fatal(err) - } - - go c.Connect(ctx) // TODO: fix the client library so this ends up actually getting cleaned up - - time.Sleep(125 * time.Millisecond) - - req, err := http.NewRequest("GET", "http://cetacean.club/", nil) - if err != nil { - t.Fatal(err) - } - - resp, err := s.RoundTrip(req) - if err != nil { - t.Fatalf("error in doing round trip: %v", err) - } - - if cs.wantStatusCode != resp.StatusCode { - resp.Write(os.Stdout) - t.Fatalf("got status %d instead of %d", resp.StatusCode, cs.wantStatusCode) - } - }) - } -} - -func setupTestServer() (*Server, *mockStorage, net.Listener, error) { - st := MockStorage() - - st.AddRoute(domain, user) - st.AddToken(token, user, []string{"connect"}) - - s, err := NewServer(&ServerConfig{ - Storage: st, - }) - if err != nil { - return nil, nil, nil, err - } - defer s.Close() - - l, err := net.Listen("tcp", "127.0.0.1:0") - if err != nil { - return nil, nil, nil, err - } - - go s.Listen(l) - - return s, st, l, nil -} diff --git a/tun2/client.go b/tun2/client.go deleted file mode 100644 index d7a3982..0000000 --- a/tun2/client.go +++ /dev/null @@ -1,171 +0,0 @@ -package tun2 - -import ( - "context" - "crypto/tls" - "encoding/json" - "errors" - "io" - "net" - "net/http" - "net/http/httputil" - "net/url" - - "within.website/ln" - "within.website/ln/opname" - kcp "github.com/xtaci/kcp-go" - "github.com/xtaci/smux" -) - -// Client connects to a remote tun2 server and sets up authentication before routing -// individual HTTP requests to discrete streams that are reverse proxied to the eventual -// backend. -type Client struct { - cfg *ClientConfig -} - -// ClientConfig configures client with settings that the user provides. -type ClientConfig struct { - TLSConfig *tls.Config - ConnType string - ServerAddr string - Token string - Domain string - BackendURL string - - // internal use only - forceTCPClear bool -} - -// NewClient constructs an instance of Client with a given ClientConfig. -func NewClient(cfg *ClientConfig) (*Client, error) { - if cfg == nil { - return nil, errors.New("tun2: client config needed") - } - - c := &Client{ - cfg: cfg, - } - - return c, nil -} - -// Connect dials the remote server and negotiates a client session with its -// configured server address. This will then continuously proxy incoming HTTP -// requests to the backend HTTP server. -// -// This is a blocking function. -func (c *Client) Connect(ctx context.Context) error { - ctx = opname.With(ctx, "tun2.Client.connect") - return c.connect(ctx, c.cfg.ServerAddr) -} - -func closeLater(ctx context.Context, clo io.Closer) { - <-ctx.Done() - clo.Close() -} - -func (c *Client) connect(ctx context.Context, serverAddr string) error { - target, err := url.Parse(c.cfg.BackendURL) - if err != nil { - return err - } - - s := &http.Server{ - Handler: httputil.NewSingleHostReverseProxy(target), - } - go closeLater(ctx, s) - - f := ln.F{ - "server_addr": serverAddr, - "conn_type": c.cfg.ConnType, - } - - var conn net.Conn - - switch c.cfg.ConnType { - case "tcp": - if c.cfg.forceTCPClear { - ln.Log(ctx, f, ln.Info("connecting over plain TCP")) - conn, err = net.Dial("tcp", serverAddr) - } else { - conn, err = tls.Dial("tcp", serverAddr, c.cfg.TLSConfig) - } - - if err != nil { - return err - } - - case "kcp": - kc, err := kcp.Dial(serverAddr) - if err != nil { - return err - } - defer kc.Close() - - serverHost, _, _ := net.SplitHostPort(serverAddr) - - tc := c.cfg.TLSConfig.Clone() - tc.ServerName = serverHost - conn = tls.Client(kc, tc) - } - go closeLater(ctx, conn) - - ln.Log(ctx, f, ln.Info("connected")) - - session, err := smux.Client(conn, smux.DefaultConfig()) - if err != nil { - return err - } - go closeLater(ctx, session) - - controlStream, err := session.AcceptStream() - if err != nil { - return err - } - go closeLater(ctx, controlStream) - - authData, err := json.Marshal(&Auth{ - Token: c.cfg.Token, - Domain: c.cfg.Domain, - }) - if err != nil { - return err - } - - _, err = controlStream.Write(authData) - if err != nil { - return err - } - - err = s.Serve(&smuxListener{ - conn: conn, - session: session, - }) - if err != nil { - return err - } - - if err := ctx.Err(); err != nil { - ln.Error(ctx, err, f, ln.Info("context error")) - } - return nil -} - -// smuxListener wraps a smux session as a net.Listener. -type smuxListener struct { - conn net.Conn - session *smux.Session -} - -func (sl *smuxListener) Accept() (net.Conn, error) { - return sl.session.AcceptStream() -} - -func (sl *smuxListener) Addr() net.Addr { - return sl.conn.LocalAddr() -} - -func (sl *smuxListener) Close() error { - return sl.session.Close() -} diff --git a/tun2/client_test.go b/tun2/client_test.go deleted file mode 100644 index d3127a7..0000000 --- a/tun2/client_test.go +++ /dev/null @@ -1,21 +0,0 @@ -package tun2 - -import ( - "net" - "testing" -) - -func TestNewClientNullConfig(t *testing.T) { - _, err := NewClient(nil) - if err == nil { - t.Fatalf("expected NewClient(nil) to fail, got non-failure") - } -} - -func TestSmuxListenerIsNetListener(t *testing.T) { - var sl interface{} = &smuxListener{} - _, ok := sl.(net.Listener) - if !ok { - t.Fatalf("smuxListener does not implement net.Listener") - } -} diff --git a/tun2/connection.go b/tun2/connection.go deleted file mode 100644 index ff03e96..0000000 --- a/tun2/connection.go +++ /dev/null @@ -1,162 +0,0 @@ -package tun2 - -import ( - "bufio" - "context" - "expvar" - "net" - "net/http" - "sync" - "time" - - failure "github.com/dgryski/go-failure" - "github.com/pkg/errors" - "github.com/xtaci/smux" - "within.website/ln" - "within.website/ln/opname" -) - -// Connection is a single active client -> server connection and session -// containing many streams over TCP+TLS or KCP+TLS. Every stream beyond the -// control stream is assumed to be passed to the underlying backend server. -// -// All Connection methods assume this is locked externally. -type Connection struct { - id string - conn net.Conn - session *smux.Session - controlStream *smux.Stream - user string - domain string - cf context.CancelFunc - detector *failure.Detector - Auth *Auth - usable bool - - sync.Mutex - counter *expvar.Int -} - -func (c *Connection) cancel() { - c.cf() - c.usable = false -} - -// F logs key->value pairs as an ln.Fer -func (c *Connection) F() ln.F { - return map[string]interface{}{ - "id": c.id, - "remote": c.conn.RemoteAddr(), - "local": c.conn.LocalAddr(), - "kind": c.conn.LocalAddr().Network(), - "user": c.user, - "domain": c.domain, - } -} - -// Ping ends a "ping" to the client. If the client doesn't respond or the connection -// dies, then the connection needs to be cleaned up. -func (c *Connection) Ping() error { - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() - ctx = opname.With(ctx, "tun2.Connection.Ping") - ctx = ln.WithF(ctx, ln.F{"timeout": time.Second}) - - req, err := http.NewRequest("GET", "http://backend/health", nil) - if err != nil { - panic(err) - } - req = req.WithContext(ctx) - - _, err = c.RoundTrip(req) - if err != nil { - ln.Error(ctx, err, c, ln.Action("pinging the backend")) - return err - } - - c.detector.Ping(time.Now()) - - return nil -} - -// OpenStream creates a new stream (connection) to the backend server. -func (c *Connection) OpenStream(ctx context.Context) (net.Conn, error) { - ctx = opname.With(ctx, "OpenStream") - if !c.usable { - return nil, ErrNoSuchBackend - } - ctx = ln.WithF(ctx, ln.F{"timeout": time.Second}) - - err := c.conn.SetDeadline(time.Now().Add(time.Second)) - if err != nil { - ln.Error(ctx, err, c) - return nil, err - } - - stream, err := c.session.OpenStream() - if err != nil { - ln.Error(ctx, err, c) - return nil, err - } - - return stream, c.conn.SetDeadline(time.Time{}) -} - -// Close destroys resouces specific to the connection. -func (c *Connection) Close() error { - err := c.controlStream.Close() - if err != nil { - return err - } - - err = c.session.Close() - if err != nil { - return err - } - - err = c.conn.Close() - if err != nil { - return err - } - - return nil -} - -// Connection-specific errors -var ( - ErrCantOpenSessionStream = errors.New("tun2: connection can't open session stream") - ErrCantWriteRequest = errors.New("tun2: connection stream can't write request") - ErrCantReadResponse = errors.New("tun2: connection stream can't read response") -) - -// RoundTrip forwards a HTTP request to the remote backend and then returns the -// response, if any. -func (c *Connection) RoundTrip(req *http.Request) (*http.Response, error) { - ctx := req.Context() - ctx = opname.With(ctx, "tun2.Connection.RoundTrip") - stream, err := c.OpenStream(ctx) - if err != nil { - return nil, errors.Wrap(err, ErrCantOpenSessionStream.Error()) - } - - go func() { - <-req.Context().Done() - stream.Close() - }() - - err = req.Write(stream) - if err != nil { - return nil, errors.Wrap(err, ErrCantWriteRequest.Error()) - } - - buf := bufio.NewReader(stream) - - resp, err := http.ReadResponse(buf, req) - if err != nil { - return nil, errors.Wrap(err, ErrCantReadResponse.Error()) - } - - c.counter.Add(1) - - return resp, nil -} diff --git a/tun2/doc.go b/tun2/doc.go deleted file mode 100644 index e1a1fa5..0000000 --- a/tun2/doc.go +++ /dev/null @@ -1,11 +0,0 @@ -/* -Package tun2 tunnels HTTP requests over existing, long-lived connections using -smux[1] and optionally kcp[2] to enable more reliable transport. - -Currently this only works on a per-domain basis, but it is designed to be -flexible enough to support path-based routing as an addition in the future. - -[1]: https://github.com/xtaci/smux -[2]: https://github.com/xtaci/kcp-go -*/ -package tun2 diff --git a/tun2/server.go b/tun2/server.go deleted file mode 100644 index cb2e4e4..0000000 --- a/tun2/server.go +++ /dev/null @@ -1,480 +0,0 @@ -package tun2 - -import ( - "bytes" - "context" - "encoding/json" - "errors" - "expvar" - "fmt" - "io" - "io/ioutil" - "math/rand" - "net" - "net/http" - "os" - "sync" - "time" - - failure "github.com/dgryski/go-failure" - "github.com/pborman/uuid" - cmap "github.com/streamrail/concurrent-map" - "github.com/xtaci/smux" - "within.website/ln" - "within.website/ln/opname" -) - -// Error values -var ( - ErrNoSuchBackend = errors.New("tun2: there is no such backend") - ErrAuthMismatch = errors.New("tun2: authenication doesn't match database records") - ErrCantRemoveWhatDoesntExist = errors.New("tun2: this connection does not exist, cannot remove it") -) - -// gen502Page creates the page that is shown when a backend is not connected to a given route. -func gen502Page(req *http.Request) *http.Response { - template := `<html><head><title>no backends connected</title></head><body><h1>no backends connected</h1><p>Please ensure a backend is running for ${HOST}. This is request ID ${REQ_ID}.</p></body></html>` - - resbody := []byte(os.Expand(template, func(in string) string { - switch in { - case "HOST": - return req.Host - case "REQ_ID": - return req.Header.Get("X-Request-Id") - } - - return "<unknown>" - })) - reshdr := req.Header - reshdr.Set("Content-Type", "text/html; charset=utf-8") - - resp := &http.Response{ - Status: fmt.Sprintf("%d Bad Gateway", http.StatusBadGateway), - StatusCode: http.StatusBadGateway, - Body: ioutil.NopCloser(bytes.NewBuffer(resbody)), - - Proto: req.Proto, - ProtoMajor: req.ProtoMajor, - ProtoMinor: req.ProtoMinor, - Header: reshdr, - ContentLength: int64(len(resbody)), - Close: true, - Request: req, - } - - return resp -} - -// ServerConfig ... -type ServerConfig struct { - SmuxConf *smux.Config - Storage Storage -} - -// Storage is the minimal subset of features that tun2's Server needs out of a -// persistence layer. -type Storage interface { - HasToken(ctx context.Context, token string) (user string, scopes []string, err error) - HasRoute(ctx context.Context, domain string) (user string, err error) -} - -// Server routes frontend HTTP traffic to backend TCP traffic. -type Server struct { - cfg *ServerConfig - ctx context.Context - cancel context.CancelFunc - - connlock sync.Mutex - conns map[net.Conn]*Connection - - domains cmap.ConcurrentMap -} - -// NewServer creates a new Server instance with a given config, acquiring all -// relevant resources. -func NewServer(cfg *ServerConfig) (*Server, error) { - if cfg == nil { - return nil, errors.New("tun2: config must be specified") - } - - if cfg.SmuxConf == nil { - cfg.SmuxConf = smux.DefaultConfig() - - cfg.SmuxConf.KeepAliveInterval = time.Second - cfg.SmuxConf.KeepAliveTimeout = 15 * time.Second - } - - ctx, cancel := context.WithCancel(context.Background()) - ctx = opname.With(ctx, "tun2.Server") - - server := &Server{ - cfg: cfg, - - conns: map[net.Conn]*Connection{}, - domains: cmap.New(), - ctx: ctx, - cancel: cancel, - } - - go server.phiDetectionLoop(ctx) - - return server, nil -} - -// Close stops the background tasks for this Server. -func (s *Server) Close() { - s.cancel() -} - -// Wait blocks until the server context is cancelled. -func (s *Server) Wait() { - for { - select { - case <-s.ctx.Done(): - return - } - } -} - -// Listen passes this Server a given net.Listener to accept backend connections. -func (s *Server) Listen(l net.Listener) { - ctx := opname.With(s.ctx, "Listen") - - f := ln.F{ - "listener_addr": l.Addr(), - "listener_network": l.Addr().Network(), - } - - for { - select { - case <-ctx.Done(): - return - default: - } - - conn, err := l.Accept() - if err != nil { - ln.Error(ctx, err, f, ln.Action("accept connection")) - continue - } - - ln.Log(ctx, f, ln.Action("new backend client connected"), ln.F{ - "conn_addr": conn.RemoteAddr(), - "conn_network": conn.RemoteAddr().Network(), - }) - - go s.HandleConn(ctx, conn) - } -} - -// phiDetectionLoop is an infinite loop that will run the [phi accrual failure detector] -// for each of the backends connected to the Server. This is fairly experimental and -// may be removed. -// -// [phi accrual failure detector]: https://dspace.jaist.ac.jp/dspace/handle/10119/4784 -func (s *Server) phiDetectionLoop(ctx context.Context) { - ctx = opname.With(ctx, "phiDetectionLoop") - t := time.NewTicker(5 * time.Second) - defer t.Stop() - - for { - select { - case <-ctx.Done(): - return - case <-t.C: - now := time.Now() - - s.connlock.Lock() - for _, c := range s.conns { - failureChance := c.detector.Phi(now) - const thresh = 0.9 // the threshold for phi failure detection causing logs - - if failureChance > thresh { - ln.Log(ctx, c, ln.Info("phi failure detection"), ln.F{ - "value": failureChance, - "threshold": thresh, - }) - } - } - s.connlock.Unlock() - } - } -} - -// backendAuthv1 runs a simple backend authentication check. It expects the -// client to write a json-encoded instance of Auth. This is then checked -// for token validity and domain matching. -// -// This returns the user that was authenticated and the domain they identified -// with. -func (s *Server) backendAuthv1(ctx context.Context, st io.Reader) (string, *Auth, error) { - ctx = opname.With(ctx, "backendAuthv1") - f := ln.F{ - "backend_auth_version": 1, - } - - f["stage"] = "json decoding" - - d := json.NewDecoder(st) - var auth Auth - err := d.Decode(&auth) - if err != nil { - ln.Error(ctx, err, f) - return "", nil, err - } - - f["auth_domain"] = auth.Domain - f["stage"] = "checking domain" - - routeUser, err := s.cfg.Storage.HasRoute(ctx, auth.Domain) - if err != nil { - ln.Error(ctx, err, f) - return "", nil, err - } - - f["route_user"] = routeUser - f["stage"] = "checking token" - - tokenUser, scopes, err := s.cfg.Storage.HasToken(ctx, auth.Token) - if err != nil { - ln.Error(ctx, err, f) - return "", nil, err - } - - f["token_user"] = tokenUser - f["stage"] = "checking token scopes" - - ok := false - for _, sc := range scopes { - if sc == "connect" { - ok = true - break - } - } - - if !ok { - ln.Error(ctx, ErrAuthMismatch, f) - return "", nil, ErrAuthMismatch - } - - f["stage"] = "user verification" - - if routeUser != tokenUser { - ln.Error(ctx, ErrAuthMismatch, f) - return "", nil, ErrAuthMismatch - } - - return routeUser, &auth, nil -} - -// HandleConn starts up the needed mechanisms to relay HTTP traffic to/from -// the currently connected backend. -func (s *Server) HandleConn(ctx context.Context, c net.Conn) { - ctx = opname.With(ctx, "HandleConn") - var cancel context.CancelFunc - ctx, cancel = context.WithCancel(ctx) - defer cancel() - - f := ln.F{ - "local": c.LocalAddr().String(), - "remote": c.RemoteAddr().String(), - } - - session, err := smux.Server(c, s.cfg.SmuxConf) - if err != nil { - ln.Error(ctx, err, f, ln.Action("establish server side of smux")) - - return - } - defer session.Close() - - controlStream, err := session.OpenStream() - if err != nil { - ln.Error(ctx, err, f, ln.Action("opening control stream")) - - return - } - defer controlStream.Close() - - user, auth, err := s.backendAuthv1(ctx, controlStream) - if err != nil { - return - } - - connection := &Connection{ - id: uuid.New(), - conn: c, - session: session, - user: user, - domain: auth.Domain, - cf: cancel, - detector: failure.New(15, 1), - Auth: auth, - } - connection.counter = expvar.NewInt("http.backend." + connection.id + ".hits") - - defer func() { - if r := recover(); r != nil { - ln.Log(ctx, connection, ln.F{"action": "connection handler panic", "err": r}) - } - }() - - ln.Log(ctx, connection, ln.Action("backend successfully connected")) - s.addConn(ctx, connection) - - connection.Lock() - connection.usable = true // XXX set this to true once health checks pass? - connection.Unlock() - - ticker := time.NewTicker(5 * time.Second) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - err := connection.Ping() - if err != nil { - connection.cancel() - } - case <-s.ctx.Done(): - ln.Log(ctx, connection, ln.Action("server context finished")) - s.removeConn(ctx, connection) - connection.Close() - - return - case <-ctx.Done(): - ln.Log(ctx, connection, ln.Action("client context finished")) - s.removeConn(ctx, connection) - connection.Close() - - return - } - } -} - -// addConn adds a connection to the pool of backend connections. -func (s *Server) addConn(ctx context.Context, connection *Connection) { - s.connlock.Lock() - s.conns[connection.conn] = connection - s.connlock.Unlock() - - var conns []*Connection - - connection.Lock() - val, ok := s.domains.Get(connection.domain) - connection.Unlock() - if ok { - conns, ok = val.([]*Connection) - if !ok { - conns = nil - - connection.Lock() - s.domains.Remove(connection.domain) - connection.Unlock() - } - } - - conns = append(conns, connection) - - connection.Lock() - s.domains.Set(connection.domain, conns) - connection.Unlock() -} - -// removeConn removes a connection from pool of backend connections. -func (s *Server) removeConn(ctx context.Context, connection *Connection) { - ctx = opname.With(ctx, "removeConn") - s.connlock.Lock() - delete(s.conns, connection.conn) - s.connlock.Unlock() - - auth := connection.Auth - - var conns []*Connection - - val, ok := s.domains.Get(auth.Domain) - if ok { - conns, ok = val.([]*Connection) - if !ok { - ln.Error(ctx, ErrCantRemoveWhatDoesntExist, connection, ln.Info("looking up for disconnect removal")) - - return - } - } - - for i, cntn := range conns { - if cntn.id == connection.id { - conns[i] = conns[len(conns)-1] - conns = conns[:len(conns)-1] - } - } - - if len(conns) != 0 { - s.domains.Set(auth.Domain, conns) - } else { - s.domains.Remove(auth.Domain) - } -} - -// RoundTrip sends a HTTP request to a backend and then returns its response. -func (s *Server) RoundTrip(req *http.Request) (*http.Response, error) { - var conns []*Connection - ctx := req.Context() - ctx = opname.With(ctx, "tun2.Server.RoundTrip") - - f := ln.F{ - "req_remote": req.RemoteAddr, - "req_host": req.Host, - "req_uri": req.RequestURI, - "req_method": req.Method, - "req_content_length": req.ContentLength, - } - - val, ok := s.domains.Get(req.Host) - if ok { - conns, ok = val.([]*Connection) - if !ok { - ln.Error(ctx, ErrNoSuchBackend, f, ln.Action("no backend available")) - - return gen502Page(req), nil - } - } - - var goodConns []*Connection - for _, conn := range conns { - conn.Lock() - if conn.usable { - goodConns = append(goodConns, conn) - } - conn.Unlock() - } - - if len(goodConns) == 0 { - ln.Error(ctx, ErrNoSuchBackend, f, ln.Action("no good backends available")) - - return gen502Page(req), nil - } - - c := goodConns[rand.Intn(len(goodConns))] - - resp, err := c.RoundTrip(req) - if err != nil { - ln.Error(ctx, err, c, f, ln.Action("connection roundtrip")) - - defer c.cancel() - return nil, err - } - - ln.Log(ctx, c, ln.Action("http traffic"), f, ln.F{ - "resp_status_code": resp.StatusCode, - "resp_content_length": resp.ContentLength, - }) - - return resp, nil -} - -// Auth is the authentication info the client passes to the server. -type Auth struct { - Token string `json:"token"` - Domain string `json:"domain"` -} diff --git a/tun2/server_test.go b/tun2/server_test.go deleted file mode 100644 index abae615..0000000 --- a/tun2/server_test.go +++ /dev/null @@ -1,171 +0,0 @@ -package tun2 - -import ( - "context" - "fmt" - "io/ioutil" - "net/http" - "net/http/httptest" - "strings" - "testing" - "time" - - "github.com/pborman/uuid" - "within.website/ln/opname" -) - -// testing constants -const ( - user = "shachi" - token = "orcaz r kewl" - noPermToken = "aw heck" - otherUserToken = "even more heck" - domain = "cetacean.club" -) - -func TestNewServerNullConfig(t *testing.T) { - _, err := NewServer(nil) - if err == nil { - t.Fatalf("expected NewServer(nil) to fail, got non-failure") - } -} - -func TestGen502Page(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - req, err := http.NewRequest("GET", "http://cetacean.club", nil) - if err != nil { - t.Fatal(err) - } - - substring := uuid.New() - - req = req.WithContext(ctx) - req.Header.Add("X-Request-Id", substring) - req.Host = "cetacean.club" - - resp := gen502Page(req) - if resp == nil { - t.Fatalf("expected response to be non-nil") - } - - if resp.Body != nil { - defer resp.Body.Close() - data, err := ioutil.ReadAll(resp.Body) - if err != nil { - t.Fatal(err) - } - - if !strings.Contains(string(data), substring) { - fmt.Println(string(data)) - t.Fatalf("502 page did not contain needed substring %q", substring) - } - } -} - -func TestConnectionsCloseOnServerContextClose(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - s, _, l, err := setupTestServer() - if err != nil { - t.Fatal(err) - } - defer s.Close() - defer l.Close() - - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {})) - defer ts.Close() |
