aboutsummaryrefslogtreecommitdiff
path: root/tun2/server.go
diff options
context:
space:
mode:
Diffstat (limited to 'tun2/server.go')
-rw-r--r--tun2/server.go480
1 files changed, 0 insertions, 480 deletions
diff --git a/tun2/server.go b/tun2/server.go
deleted file mode 100644
index cb2e4e4..0000000
--- a/tun2/server.go
+++ /dev/null
@@ -1,480 +0,0 @@
-package tun2
-
-import (
- "bytes"
- "context"
- "encoding/json"
- "errors"
- "expvar"
- "fmt"
- "io"
- "io/ioutil"
- "math/rand"
- "net"
- "net/http"
- "os"
- "sync"
- "time"
-
- failure "github.com/dgryski/go-failure"
- "github.com/pborman/uuid"
- cmap "github.com/streamrail/concurrent-map"
- "github.com/xtaci/smux"
- "within.website/ln"
- "within.website/ln/opname"
-)
-
-// Error values
-var (
- ErrNoSuchBackend = errors.New("tun2: there is no such backend")
- ErrAuthMismatch = errors.New("tun2: authenication doesn't match database records")
- ErrCantRemoveWhatDoesntExist = errors.New("tun2: this connection does not exist, cannot remove it")
-)
-
-// gen502Page creates the page that is shown when a backend is not connected to a given route.
-func gen502Page(req *http.Request) *http.Response {
- template := `<html><head><title>no backends connected</title></head><body><h1>no backends connected</h1><p>Please ensure a backend is running for ${HOST}. This is request ID ${REQ_ID}.</p></body></html>`
-
- resbody := []byte(os.Expand(template, func(in string) string {
- switch in {
- case "HOST":
- return req.Host
- case "REQ_ID":
- return req.Header.Get("X-Request-Id")
- }
-
- return "<unknown>"
- }))
- reshdr := req.Header
- reshdr.Set("Content-Type", "text/html; charset=utf-8")
-
- resp := &http.Response{
- Status: fmt.Sprintf("%d Bad Gateway", http.StatusBadGateway),
- StatusCode: http.StatusBadGateway,
- Body: ioutil.NopCloser(bytes.NewBuffer(resbody)),
-
- Proto: req.Proto,
- ProtoMajor: req.ProtoMajor,
- ProtoMinor: req.ProtoMinor,
- Header: reshdr,
- ContentLength: int64(len(resbody)),
- Close: true,
- Request: req,
- }
-
- return resp
-}
-
-// ServerConfig ...
-type ServerConfig struct {
- SmuxConf *smux.Config
- Storage Storage
-}
-
-// Storage is the minimal subset of features that tun2's Server needs out of a
-// persistence layer.
-type Storage interface {
- HasToken(ctx context.Context, token string) (user string, scopes []string, err error)
- HasRoute(ctx context.Context, domain string) (user string, err error)
-}
-
-// Server routes frontend HTTP traffic to backend TCP traffic.
-type Server struct {
- cfg *ServerConfig
- ctx context.Context
- cancel context.CancelFunc
-
- connlock sync.Mutex
- conns map[net.Conn]*Connection
-
- domains cmap.ConcurrentMap
-}
-
-// NewServer creates a new Server instance with a given config, acquiring all
-// relevant resources.
-func NewServer(cfg *ServerConfig) (*Server, error) {
- if cfg == nil {
- return nil, errors.New("tun2: config must be specified")
- }
-
- if cfg.SmuxConf == nil {
- cfg.SmuxConf = smux.DefaultConfig()
-
- cfg.SmuxConf.KeepAliveInterval = time.Second
- cfg.SmuxConf.KeepAliveTimeout = 15 * time.Second
- }
-
- ctx, cancel := context.WithCancel(context.Background())
- ctx = opname.With(ctx, "tun2.Server")
-
- server := &Server{
- cfg: cfg,
-
- conns: map[net.Conn]*Connection{},
- domains: cmap.New(),
- ctx: ctx,
- cancel: cancel,
- }
-
- go server.phiDetectionLoop(ctx)
-
- return server, nil
-}
-
-// Close stops the background tasks for this Server.
-func (s *Server) Close() {
- s.cancel()
-}
-
-// Wait blocks until the server context is cancelled.
-func (s *Server) Wait() {
- for {
- select {
- case <-s.ctx.Done():
- return
- }
- }
-}
-
-// Listen passes this Server a given net.Listener to accept backend connections.
-func (s *Server) Listen(l net.Listener) {
- ctx := opname.With(s.ctx, "Listen")
-
- f := ln.F{
- "listener_addr": l.Addr(),
- "listener_network": l.Addr().Network(),
- }
-
- for {
- select {
- case <-ctx.Done():
- return
- default:
- }
-
- conn, err := l.Accept()
- if err != nil {
- ln.Error(ctx, err, f, ln.Action("accept connection"))
- continue
- }
-
- ln.Log(ctx, f, ln.Action("new backend client connected"), ln.F{
- "conn_addr": conn.RemoteAddr(),
- "conn_network": conn.RemoteAddr().Network(),
- })
-
- go s.HandleConn(ctx, conn)
- }
-}
-
-// phiDetectionLoop is an infinite loop that will run the [phi accrual failure detector]
-// for each of the backends connected to the Server. This is fairly experimental and
-// may be removed.
-//
-// [phi accrual failure detector]: https://dspace.jaist.ac.jp/dspace/handle/10119/4784
-func (s *Server) phiDetectionLoop(ctx context.Context) {
- ctx = opname.With(ctx, "phiDetectionLoop")
- t := time.NewTicker(5 * time.Second)
- defer t.Stop()
-
- for {
- select {
- case <-ctx.Done():
- return
- case <-t.C:
- now := time.Now()
-
- s.connlock.Lock()
- for _, c := range s.conns {
- failureChance := c.detector.Phi(now)
- const thresh = 0.9 // the threshold for phi failure detection causing logs
-
- if failureChance > thresh {
- ln.Log(ctx, c, ln.Info("phi failure detection"), ln.F{
- "value": failureChance,
- "threshold": thresh,
- })
- }
- }
- s.connlock.Unlock()
- }
- }
-}
-
-// backendAuthv1 runs a simple backend authentication check. It expects the
-// client to write a json-encoded instance of Auth. This is then checked
-// for token validity and domain matching.
-//
-// This returns the user that was authenticated and the domain they identified
-// with.
-func (s *Server) backendAuthv1(ctx context.Context, st io.Reader) (string, *Auth, error) {
- ctx = opname.With(ctx, "backendAuthv1")
- f := ln.F{
- "backend_auth_version": 1,
- }
-
- f["stage"] = "json decoding"
-
- d := json.NewDecoder(st)
- var auth Auth
- err := d.Decode(&auth)
- if err != nil {
- ln.Error(ctx, err, f)
- return "", nil, err
- }
-
- f["auth_domain"] = auth.Domain
- f["stage"] = "checking domain"
-
- routeUser, err := s.cfg.Storage.HasRoute(ctx, auth.Domain)
- if err != nil {
- ln.Error(ctx, err, f)
- return "", nil, err
- }
-
- f["route_user"] = routeUser
- f["stage"] = "checking token"
-
- tokenUser, scopes, err := s.cfg.Storage.HasToken(ctx, auth.Token)
- if err != nil {
- ln.Error(ctx, err, f)
- return "", nil, err
- }
-
- f["token_user"] = tokenUser
- f["stage"] = "checking token scopes"
-
- ok := false
- for _, sc := range scopes {
- if sc == "connect" {
- ok = true
- break
- }
- }
-
- if !ok {
- ln.Error(ctx, ErrAuthMismatch, f)
- return "", nil, ErrAuthMismatch
- }
-
- f["stage"] = "user verification"
-
- if routeUser != tokenUser {
- ln.Error(ctx, ErrAuthMismatch, f)
- return "", nil, ErrAuthMismatch
- }
-
- return routeUser, &auth, nil
-}
-
-// HandleConn starts up the needed mechanisms to relay HTTP traffic to/from
-// the currently connected backend.
-func (s *Server) HandleConn(ctx context.Context, c net.Conn) {
- ctx = opname.With(ctx, "HandleConn")
- var cancel context.CancelFunc
- ctx, cancel = context.WithCancel(ctx)
- defer cancel()
-
- f := ln.F{
- "local": c.LocalAddr().String(),
- "remote": c.RemoteAddr().String(),
- }
-
- session, err := smux.Server(c, s.cfg.SmuxConf)
- if err != nil {
- ln.Error(ctx, err, f, ln.Action("establish server side of smux"))
-
- return
- }
- defer session.Close()
-
- controlStream, err := session.OpenStream()
- if err != nil {
- ln.Error(ctx, err, f, ln.Action("opening control stream"))
-
- return
- }
- defer controlStream.Close()
-
- user, auth, err := s.backendAuthv1(ctx, controlStream)
- if err != nil {
- return
- }
-
- connection := &Connection{
- id: uuid.New(),
- conn: c,
- session: session,
- user: user,
- domain: auth.Domain,
- cf: cancel,
- detector: failure.New(15, 1),
- Auth: auth,
- }
- connection.counter = expvar.NewInt("http.backend." + connection.id + ".hits")
-
- defer func() {
- if r := recover(); r != nil {
- ln.Log(ctx, connection, ln.F{"action": "connection handler panic", "err": r})
- }
- }()
-
- ln.Log(ctx, connection, ln.Action("backend successfully connected"))
- s.addConn(ctx, connection)
-
- connection.Lock()
- connection.usable = true // XXX set this to true once health checks pass?
- connection.Unlock()
-
- ticker := time.NewTicker(5 * time.Second)
- defer ticker.Stop()
-
- for {
- select {
- case <-ticker.C:
- err := connection.Ping()
- if err != nil {
- connection.cancel()
- }
- case <-s.ctx.Done():
- ln.Log(ctx, connection, ln.Action("server context finished"))
- s.removeConn(ctx, connection)
- connection.Close()
-
- return
- case <-ctx.Done():
- ln.Log(ctx, connection, ln.Action("client context finished"))
- s.removeConn(ctx, connection)
- connection.Close()
-
- return
- }
- }
-}
-
-// addConn adds a connection to the pool of backend connections.
-func (s *Server) addConn(ctx context.Context, connection *Connection) {
- s.connlock.Lock()
- s.conns[connection.conn] = connection
- s.connlock.Unlock()
-
- var conns []*Connection
-
- connection.Lock()
- val, ok := s.domains.Get(connection.domain)
- connection.Unlock()
- if ok {
- conns, ok = val.([]*Connection)
- if !ok {
- conns = nil
-
- connection.Lock()
- s.domains.Remove(connection.domain)
- connection.Unlock()
- }
- }
-
- conns = append(conns, connection)
-
- connection.Lock()
- s.domains.Set(connection.domain, conns)
- connection.Unlock()
-}
-
-// removeConn removes a connection from pool of backend connections.
-func (s *Server) removeConn(ctx context.Context, connection *Connection) {
- ctx = opname.With(ctx, "removeConn")
- s.connlock.Lock()
- delete(s.conns, connection.conn)
- s.connlock.Unlock()
-
- auth := connection.Auth
-
- var conns []*Connection
-
- val, ok := s.domains.Get(auth.Domain)
- if ok {
- conns, ok = val.([]*Connection)
- if !ok {
- ln.Error(ctx, ErrCantRemoveWhatDoesntExist, connection, ln.Info("looking up for disconnect removal"))
-
- return
- }
- }
-
- for i, cntn := range conns {
- if cntn.id == connection.id {
- conns[i] = conns[len(conns)-1]
- conns = conns[:len(conns)-1]
- }
- }
-
- if len(conns) != 0 {
- s.domains.Set(auth.Domain, conns)
- } else {
- s.domains.Remove(auth.Domain)
- }
-}
-
-// RoundTrip sends a HTTP request to a backend and then returns its response.
-func (s *Server) RoundTrip(req *http.Request) (*http.Response, error) {
- var conns []*Connection
- ctx := req.Context()
- ctx = opname.With(ctx, "tun2.Server.RoundTrip")
-
- f := ln.F{
- "req_remote": req.RemoteAddr,
- "req_host": req.Host,
- "req_uri": req.RequestURI,
- "req_method": req.Method,
- "req_content_length": req.ContentLength,
- }
-
- val, ok := s.domains.Get(req.Host)
- if ok {
- conns, ok = val.([]*Connection)
- if !ok {
- ln.Error(ctx, ErrNoSuchBackend, f, ln.Action("no backend available"))
-
- return gen502Page(req), nil
- }
- }
-
- var goodConns []*Connection
- for _, conn := range conns {
- conn.Lock()
- if conn.usable {
- goodConns = append(goodConns, conn)
- }
- conn.Unlock()
- }
-
- if len(goodConns) == 0 {
- ln.Error(ctx, ErrNoSuchBackend, f, ln.Action("no good backends available"))
-
- return gen502Page(req), nil
- }
-
- c := goodConns[rand.Intn(len(goodConns))]
-
- resp, err := c.RoundTrip(req)
- if err != nil {
- ln.Error(ctx, err, c, f, ln.Action("connection roundtrip"))
-
- defer c.cancel()
- return nil, err
- }
-
- ln.Log(ctx, c, ln.Action("http traffic"), f, ln.F{
- "resp_status_code": resp.StatusCode,
- "resp_content_length": resp.ContentLength,
- })
-
- return resp, nil
-}
-
-// Auth is the authentication info the client passes to the server.
-type Auth struct {
- Token string `json:"token"`
- Domain string `json:"domain"`
-}