diff options
| author | Xe <me@christine.website> | 2022-11-27 11:56:37 -0500 |
|---|---|---|
| committer | Xe <me@christine.website> | 2022-11-27 11:56:37 -0500 |
| commit | 0058d81d5e8aab017c5b30aa60eb406602f998f7 (patch) | |
| tree | 99e0dbdd7b20d359f26d9343e59dcbd1952e5169 /cmd/xedn/main.go | |
| parent | 1413160f07bf178009fd7f076503f80896a1c0b3 (diff) | |
| download | x-0058d81d5e8aab017c5b30aa60eb406602f998f7.tar.xz x-0058d81d5e8aab017c5b30aa60eb406602f998f7.zip | |
xedn: don't crash as much
Signed-off-by: Xe <me@christine.website>
Diffstat (limited to 'cmd/xedn/main.go')
| -rw-r--r-- | cmd/xedn/main.go | 263 |
1 files changed, 196 insertions, 67 deletions
diff --git a/cmd/xedn/main.go b/cmd/xedn/main.go index ca0f7ca..17cf99e 100644 --- a/cmd/xedn/main.go +++ b/cmd/xedn/main.go @@ -2,10 +2,11 @@ package main import ( - "bytes" "context" + "crypto/md5" _ "embed" - "encoding/gob" + "encoding/json" + "errors" "expvar" "flag" "fmt" @@ -13,13 +14,14 @@ import ( "log" "net/http" "os" + "path/filepath" "strconv" "sync" "time" - "github.com/golang/groupcache" "github.com/rs/cors" "github.com/sebest/xff" + "go.etcd.io/bbolt" "tailscale.com/tsnet" "tailscale.com/tsweb" "within.website/ln" @@ -30,78 +32,215 @@ import ( ) var ( - b2Backend = flag.String("b2-backend", "https://f001.backblazeb2.com", "Backblaze B2 base URL") + b2Backend = flag.String("b2-backend", "f001.backblazeb2.com", "Backblaze B2 base host") addr = flag.String("addr", ":8080", "server address") + dir = flag.String("dir", os.Getenv("XEDN_STATE"), "where XeDN should store cached data") //go:embed index.html indexHTML []byte ) -const cacheSize = 386 * 1024 * 1024 // 386 mebibytes +type Cache struct { + ActualHost string + Client *http.Client + DB *bbolt.DB +} -type CacheData struct { - Headers http.Header - Body []byte +func Hash(data string) string { + output := md5.Sum([]byte(data)) + return fmt.Sprintf("%x", output) } -var Group = groupcache.NewGroup("b2-bucket", cacheSize, groupcache.GetterFunc( - func(ctx groupcache.Context, key string, dest groupcache.Sink) error { - ln.Log(context.Background(), ln.F{"key": key}) +func (dc *Cache) ListFiles(w http.ResponseWriter, r *http.Request) { + var result []string - fileHits.Add(key, 1) + err := dc.DB.View(func(tx *bbolt.Tx) error { + return tx.ForEach(func(name []byte, b *bbolt.Bucket) error { + result = append(result, string(name)) + return nil + }) + }) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } - resp, err := http.Get(*b2Backend + key) - if err != nil { - return fmt.Errorf("can't fetch from b2: %v", err) + json.NewEncoder(w).Encode(result) +} + +func (dc *Cache) Purge(w http.ResponseWriter, r *http.Request) { + var files []string + + defer r.Body.Close() + if err := json.NewDecoder(r.Body).Decode(&files); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + if err := dc.DB.Update(func(tx *bbolt.Tx) error { + for _, fname := range files { + bkt := tx.Bucket([]byte(fname)) + if bkt == nil { + continue + } + + if err := tx.DeleteBucket([]byte(fname)); err != nil { + return err + } } - defer resp.Body.Close() - if resp.StatusCode != http.StatusOK { - return web.NewError(http.StatusOK, resp) + return nil + }); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } +} + +func (dc *Cache) Save(dir string, resp *http.Response) error { + return dc.DB.Update(func(tx *bbolt.Tx) error { + bkt, err := tx.CreateBucketIfNotExists([]byte(dir)) + if err != nil { + return err } etag := fmt.Sprintf("%q", resp.Header.Get("x-bz-content-sha1")) resp.Header.Set("ETag", etag) etagLock.Lock() - etags[key] = etag + etags[dir] = etag etagLock.Unlock() + data, err := json.Marshal(resp.Header) + if err != nil { + return err + } + + if err := bkt.Put([]byte("header"), data); err != nil { + return err + } + + data, err = io.ReadAll(resp.Body) + if err != nil { + return err + } + + if err := bkt.Put([]byte("body"), data); err != nil { + return err + } + + diesAt := time.Now().Add(604800 * time.Second).Format(http.TimeFormat) + + if err := bkt.Put([]byte("diesAt"), []byte(diesAt)); err != nil { + return err + } + // cache control headers resp.Header.Set("Cache-Control", "max-age:604800") - resp.Header.Set("Expires", time.Now().Add(604800*time.Second).Format(http.TimeFormat)) + resp.Header.Set("Expires", diesAt) - body, err := io.ReadAll(resp.Body) - if err != nil { - return fmt.Errorf("can't read from b2: %v", err) + return nil + }) +} + +var ErrNotCached = errors.New("data is not cached") + +func (dc *Cache) Load(dir string, w http.ResponseWriter) error { + return dc.DB.View(func(tx *bbolt.Tx) error { + bkt := tx.Bucket([]byte(dir)) + if bkt == nil { + return ErrNotCached } - result := &CacheData{ - Headers: resp.Header, - Body: body, + diesAtBytes := bkt.Get([]byte("diesAt")) + if diesAtBytes == nil { + return ErrNotCached } - var buf bytes.Buffer - err = gob.NewEncoder(&buf).Encode(result) + t, err := time.Parse(http.TimeFormat, string(diesAtBytes)) if err != nil { return err } - dest.SetBytes(buf.Bytes()) + if t.Before(time.Now()) { + tx.DeleteBucket([]byte(dir)) + return ErrNotCached + } + + h := http.Header{} + + data := bkt.Get([]byte("header")) + if data == nil { + return ErrNotCached + } + if err := json.Unmarshal(data, &h); err != nil { + return err + } + + data = bkt.Get([]byte("body")) + if data == nil { + return ErrNotCached + } + + for k, vs := range h { + for _, v := range vs { + w.Header().Add(k, v) + } + } + + w.Write(data) + cacheHits.Add(1) + fileHits.Add(dir, 1) return nil - }, -)) + }) +} + +func (dc *Cache) GetFile(w http.ResponseWriter, r *http.Request) error { + dir := filepath.Join(r.URL.Path) + + err := dc.Load(dir, w) + if err != nil { + if err == ErrNotCached { + r.URL.Host = dc.ActualHost + r.URL.Scheme = "https" + resp, err := dc.Client.Get(r.URL.String()) + if err != nil { + cacheErrors.Add(1) + return err + } + + if resp.StatusCode != http.StatusOK { + cacheErrors.Add(1) + return web.NewError(http.StatusOK, resp) + } + + err = dc.Save(dir, resp) + if err != nil { + cacheErrors.Add(1) + return err + } + + cacheLoads.Add(1) + } else { + cacheErrors.Add(1) + return err + } + } + + return dc.Load(dir, w) +} + +func (dc *Cache) ServeHTTP(w http.ResponseWriter, r *http.Request) { +} var ( - cacheGets = expvar.NewInt("cache_gets") cacheHits = expvar.NewInt("cache_hits") cacheErrors = expvar.NewInt("cache_errors") cacheLoads = expvar.NewInt("cache_loads") etagMatches = expvar.NewInt("etag_matches") - fileHits = expvar.NewMap("file_hits") referers = expvar.NewMap("referers") + fileHits = expvar.NewMap("filehits") etags map[string]string etagLock sync.RWMutex @@ -111,29 +250,29 @@ func init() { etags = map[string]string{} } -func refreshMetrics() { - t := time.NewTicker(10 * time.Second) - defer t.Stop() - - for range t.C { - cacheGets.Set(Group.Stats.Gets.Get()) - cacheHits.Set(Group.Stats.CacheHits.Get()) - cacheErrors.Set(int64(Group.Stats.LocalLoadErrs)) - cacheLoads.Set(int64(Group.Stats.LocalLoads)) - } -} - func main() { internal.HandleStartup() ctx := opname.With(context.Background(), "startup") - go refreshMetrics() + os.MkdirAll(filepath.Join(*dir, "tsnet"), 0700) + + db, err := bbolt.Open(filepath.Join(*dir, "data"), 0600, &bbolt.Options{}) + if err != nil { + ln.FatalErr(ctx, err) + } + + dc := &Cache{ + ActualHost: *b2Backend, + Client: &http.Client{}, + DB: db, + } go func() { srv := &tsnet.Server{ Hostname: "xedn-" + os.Getenv("FLY_REGION"), Logf: log.New(io.Discard, "", 0).Printf, AuthKey: os.Getenv("TS_AUTHKEY"), + Dir: filepath.Join(*dir, "tsnet"), } lis, err := srv.Listen("tcp", ":80") @@ -142,6 +281,8 @@ func main() { } http.DefaultServeMux.HandleFunc("/debug/varz", tsweb.VarzHandler) + http.DefaultServeMux.HandleFunc("/xedn/files", dc.ListFiles) + http.DefaultServeMux.HandleFunc("/xedn/purge", dc.Purge) defer srv.Close() defer lis.Close() @@ -153,8 +294,11 @@ func main() { ln.FatalErr(ctx, err) } + os.MkdirAll(*dir, 0700) + mux := http.NewServeMux() mux.HandleFunc("/.within/metrics", tsweb.VarzHandler) + mux.Handle("/.within/metrics/json", expvar.Handler()) mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { if r.URL.Path != "/" { @@ -167,7 +311,7 @@ func main() { w.Write(indexHTML) }) - mux.HandleFunc("/file/christine-static/", func(w http.ResponseWriter, r *http.Request) { + hdlr := func(w http.ResponseWriter, r *http.Request) { etagLock.RLock() etag, ok := etags[r.URL.Path] etagLock.RUnlock() @@ -180,29 +324,14 @@ func main() { referers.Add(r.Header.Get("Referer"), 1) - var b []byte - err := Group.Get(nil, r.URL.Path, groupcache.AllocatingByteSliceSink(&b)) - if err != nil { - http.Error(w, err.Error(), http.StatusNotFound) - return - } - - var result CacheData - err = gob.NewDecoder(bytes.NewBuffer(b)).Decode(&result) - if err != nil { - ln.Error(r.Context(), err) - http.Error(w, "internal cache error", http.StatusInternalServerError) + if err := dc.GetFile(w, r); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) return } + } - for k, vs := range result.Headers { - for _, v := range vs { - w.Header().Add(k, v) - } - } - w.WriteHeader(http.StatusOK) - w.Write(result.Body) - }) + mux.HandleFunc("/file/christine-static/", hdlr) + mux.HandleFunc("/file/xeserv-akko/", hdlr) ln.Log(context.Background(), ln.F{"addr": *addr}) http.ListenAndServe(*addr, cors.Default().Handler(xffMW.Handler(ex.HTTPLog(mux)))) |
