diff options
| author | Xe Iaso <me@xeiaso.net> | 2023-10-27 15:50:19 -0400 |
|---|---|---|
| committer | Xe Iaso <me@xeiaso.net> | 2023-10-27 15:50:40 -0400 |
| commit | 4f77dc44186ae5826c3cf4f0e83ba8eae21af1be (patch) | |
| tree | a3fda31972144dca7c8d4e8770ec0d4ab70291ad /cmd | |
| parent | 97d1efae1e5acb3d39a6803eb3138812fe2ad91b (diff) | |
| download | x-4f77dc44186ae5826c3cf4f0e83ba8eae21af1be.tar.xz x-4f77dc44186ae5826c3cf4f0e83ba8eae21af1be.zip | |
cmd/xedn: start preparing for xesite v4's advent
Signed-off-by: Xe Iaso <me@xeiaso.net>
Diffstat (limited to 'cmd')
| -rw-r--r-- | cmd/xedn/cache.go | 311 |
| -rw-r--r-- | cmd/xedn/internal/xesite/xesite.go | 70 |
| -rw-r--r-- | cmd/xedn/main.go | 335 |
3 files changed, 402 insertions, 314 deletions
diff --git a/cmd/xedn/cache.go b/cmd/xedn/cache.go new file mode 100644 index 0000000..10a7d6e --- /dev/null +++ b/cmd/xedn/cache.go @@ -0,0 +1,311 @@ +package main + +import ( + "bytes" + "crypto/md5" + "encoding/json" + "errors" + "fmt" + "io" + "log/slog" + "net/http" + "path/filepath" + "time" + + "go.etcd.io/bbolt" + "golang.org/x/sync/singleflight" + "within.website/x/web" +) + +type Cache struct { + ActualHost string + Client *http.Client + DB *bbolt.DB + cacheGroup *singleflight.Group +} + +func Hash(data string) string { + output := md5.Sum([]byte(data)) + return fmt.Sprintf("%x", output) +} + +func (dc *Cache) ListFiles(w http.ResponseWriter, r *http.Request) { + var result []string + + err := dc.DB.View(func(tx *bbolt.Tx) error { + return tx.ForEach(func(name []byte, b *bbolt.Bucket) error { + result = append(result, string(name)) + return nil + }) + }) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + json.NewEncoder(w).Encode(result) +} + +func (dc *Cache) Purge(w http.ResponseWriter, r *http.Request) { + var files []string + + defer r.Body.Close() + if err := json.NewDecoder(r.Body).Decode(&files); err != nil { + slog.Error("can't read files to be purged", "err", err) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + slog.Info("purging files", "files", files) + + if err := dc.DB.Update(func(tx *bbolt.Tx) error { + for _, fname := range files { + bkt := tx.Bucket([]byte(fname)) + if bkt == nil { + continue + } + + if err := tx.DeleteBucket([]byte(fname)); err != nil { + return err + } + } + + return nil + }); err != nil { + slog.Error("can't purge files", "err", err, "files", files) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } +} + +func (dc *Cache) Save(dir string, resp *http.Response) error { + return dc.DB.Update(func(tx *bbolt.Tx) error { + bkt, err := tx.CreateBucketIfNotExists([]byte(dir)) + if err != nil { + return err + } + + etag := fmt.Sprintf("%q", 
resp.Header.Get("x-bz-content-sha1")) + resp.Header.Set("ETag", etag) + etagLock.Lock() + etags[dir] = etag + etagLock.Unlock() + + data, err := json.Marshal(resp.Header) + if err != nil { + return err + } + + if err := bkt.Put([]byte("header"), data); err != nil { + return err + } + + data, err = io.ReadAll(resp.Body) + if err != nil { + return err + } + + if err := bkt.Put([]byte("body"), data); err != nil { + return err + } + + diesAt := time.Now().AddDate(0, 0, 7).Format(http.TimeFormat) + + if err := bkt.Put([]byte("diesAt"), []byte(diesAt)); err != nil { + return err + } + + // cache control headers + resp.Header.Set("Cache-Control", "max-age:604800") // one week + resp.Header.Set("Expires", diesAt) + + return nil + }) +} + +var ErrNotCached = errors.New("data is not cached") + +func (dc *Cache) Load(dir string, w io.Writer) error { + return dc.DB.Update(func(tx *bbolt.Tx) error { + bkt := tx.Bucket([]byte(dir)) + if bkt == nil { + return ErrNotCached + } + + diesAtBytes := bkt.Get([]byte("diesAt")) + if diesAtBytes == nil { + return ErrNotCached + } + + t, err := time.Parse(http.TimeFormat, string(diesAtBytes)) + if err != nil { + return err + } + + now := time.Now() + + if t.Before(now) { + tx.DeleteBucket([]byte(dir)) + fileDeaths.Get(dir).Add(1) + return ErrNotCached + } + + if err := bkt.Put([]byte("diesAt"), []byte(now.AddDate(0, 0, 7).Format(http.TimeFormat))); err != nil { + return err + } + + h := http.Header{} + + data := bkt.Get([]byte("header")) + if data == nil { + return ErrNotCached + } + if err := json.Unmarshal(data, &h); err != nil { + return err + } + + if h.Get("Content-Type") == "" && filepath.Ext(dir) == ".svg" { + h.Set("Content-Type", "image/svg+xml") + } + + data = bkt.Get([]byte("body")) + if data == nil { + return ErrNotCached + } + + if rw, ok := w.(http.ResponseWriter); ok { + for k, vs := range h { + for _, v := range vs { + rw.Header().Add(k, v) + } + } + } + + w.Write(data) + cacheHits.Add(1) + fileHits.Add(dir, 1) + + return 
nil + }) +} + +func (dc *Cache) LoadBytesOrFetch(path string) ([]byte, error) { + buf := bytes.NewBuffer(nil) + err := dc.Load(path, buf) + if err != nil { + if err == ErrNotCached { + _, err, _ := dc.cacheGroup.Do(path, func() (interface{}, error) { + resp, err := dc.Client.Get(fmt.Sprintf("https://%s%s", dc.ActualHost, path)) + if err != nil { + cacheErrors.Add(1) + return nil, err + } + + if resp.StatusCode != http.StatusOK { + cacheErrors.Add(1) + return nil, web.NewError(http.StatusOK, resp) + } + + err = dc.Save(path, resp) + if err != nil { + cacheErrors.Add(1) + return nil, err + } + + return nil, nil + }) + if err != nil { + return nil, err + } + + return dc.LoadBytesOrFetch(path) + } + return nil, err + } + return buf.Bytes(), nil +} + +func (dc *Cache) GetFile(w http.ResponseWriter, r *http.Request) error { + dir := filepath.Join(r.URL.Path) + + err := dc.Load(dir, w) + if err != nil { + if err == ErrNotCached { + _, err, _ := dc.cacheGroup.Do(r.URL.Path, func() (interface{}, error) { + r.URL.Host = dc.ActualHost + r.URL.Scheme = "https" + resp, err := dc.Client.Get(r.URL.String()) + if err != nil { + cacheErrors.Add(1) + return nil, err + } + + if resp.StatusCode != http.StatusOK { + cacheErrors.Add(1) + return nil, web.NewError(http.StatusOK, resp) + } + + err = dc.Save(dir, resp) + if err != nil { + cacheErrors.Add(1) + return nil, err + } + cacheLoads.Add(1) + return nil, nil + }) + if err != nil { + return err + } + } else { + cacheErrors.Add(1) + return err + } + } + + return dc.Load(dir, w) +} + +func (dc *Cache) CronPurgeDead() { + lg := slog.Default().With("job", "purgeDead") + + for range time.Tick(30 * time.Minute) { + lg.Info("starting") + + if err := dc.DB.Update(func(tx *bbolt.Tx) error { + if err := tx.ForEach(func(name []byte, b *bbolt.Bucket) error { + if string(name) == "sticker_cache" { + return nil + } + + lg := lg.With("path", string(name)) + diesAtBytes := b.Get([]byte("diesAt")) + if diesAtBytes == nil { + lg.Error("no diesAt key") 
+ return nil + } + + diesAt, err := time.Parse(http.TimeFormat, string(diesAtBytes)) + if err != nil { + return fmt.Errorf("when parsing diesAt for %s (%q): %w", string(name), string(diesAtBytes), err) + } + + if diesAt.Before(time.Now()) { + if err := tx.DeleteBucket(name); err != nil { + return fmt.Errorf("when trying to delete bucket %s: %w", string(name), err) + } + + fileDeaths.Add(string(name), 1) + lg.Info("deleted", "diesAt", diesAt) + } + + return nil + }); err != nil { + return err + } + + return nil + }); err != nil { + lg.Info("can't update database: %v", "err", err) + } + } +} diff --git a/cmd/xedn/internal/xesite/xesite.go b/cmd/xedn/internal/xesite/xesite.go new file mode 100644 index 0000000..4c25e4a --- /dev/null +++ b/cmd/xedn/internal/xesite/xesite.go @@ -0,0 +1,70 @@ +package xesite + +import ( + "archive/zip" + "compress/gzip" + "io" + "log/slog" + "net/http" + "sync" +) + +const ( + compressionGZIP = 0x69 +) + +func init() { + zip.RegisterCompressor(compressionGZIP, func(w io.Writer) (io.WriteCloser, error) { + return gzip.NewWriterLevel(w, gzip.BestCompression) + }) + zip.RegisterDecompressor(compressionGZIP, func(r io.Reader) io.ReadCloser { + rdr, err := gzip.NewReader(r) + if err != nil { + slog.Error("can't read from gzip stream", "err", err) + panic(err) + } + return rdr + }) +} + +type ZipServer struct { + lock sync.RWMutex + zip *zip.ReadCloser +} + +func NewZipServer(zipPath string) (*ZipServer, error) { + file, err := zip.OpenReader(zipPath) + if err != nil { + return nil, err + } + + result := &ZipServer{ + zip: file, + } + + return result, nil +} + +func (zs *ZipServer) Update(fname string) error { + zs.lock.Lock() + defer zs.lock.Unlock() + + old := zs.zip + + file, err := zip.OpenReader(fname) + if err != nil { + return err + } + + zs.zip = file + + old.Close() + return nil +} + +func (zs *ZipServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { + zs.lock.RLock() + defer zs.lock.RUnlock() + + 
http.FileServer(http.FS(zs.zip)).ServeHTTP(w, r) +} diff --git a/cmd/xedn/main.go b/cmd/xedn/main.go index 48da62f..fa773a1 100644 --- a/cmd/xedn/main.go +++ b/cmd/xedn/main.go @@ -2,14 +2,9 @@ package main import ( - "bytes" - "crypto/md5" _ "embed" - "encoding/json" - "errors" "expvar" "flag" - "fmt" "image/png" "io" "log" @@ -19,8 +14,8 @@ import ( "path/filepath" "strconv" "sync" - "time" + "github.com/gorilla/mux" "github.com/rs/cors" "github.com/sebest/xff" "go.etcd.io/bbolt" @@ -30,7 +25,6 @@ import ( "tailscale.com/tsnet" "tailscale.com/tsweb" "within.website/x/internal" - "within.website/x/web" "within.website/x/web/stablediffusion" ) @@ -54,299 +48,6 @@ func envOr(name, def string) string { return def } -type Cache struct { - ActualHost string - Client *http.Client - DB *bbolt.DB - cacheGroup *singleflight.Group -} - -func Hash(data string) string { - output := md5.Sum([]byte(data)) - return fmt.Sprintf("%x", output) -} - -func (dc *Cache) ListFiles(w http.ResponseWriter, r *http.Request) { - var result []string - - err := dc.DB.View(func(tx *bbolt.Tx) error { - return tx.ForEach(func(name []byte, b *bbolt.Bucket) error { - result = append(result, string(name)) - return nil - }) - }) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - json.NewEncoder(w).Encode(result) -} - -func (dc *Cache) Purge(w http.ResponseWriter, r *http.Request) { - var files []string - - defer r.Body.Close() - if err := json.NewDecoder(r.Body).Decode(&files); err != nil { - slog.Error("can't read files to be purged", "err", err) - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - slog.Info("purging files", "files", files) - - if err := dc.DB.Update(func(tx *bbolt.Tx) error { - for _, fname := range files { - bkt := tx.Bucket([]byte(fname)) - if bkt == nil { - continue - } - - if err := tx.DeleteBucket([]byte(fname)); err != nil { - return err - } - } - - return nil - }); err != nil { - slog.Error("can't purge files", 
"err", err, "files", files) - http.Error(w, err.Error(), http.StatusBadRequest) - return - } -} - -func (dc *Cache) Save(dir string, resp *http.Response) error { - return dc.DB.Update(func(tx *bbolt.Tx) error { - bkt, err := tx.CreateBucketIfNotExists([]byte(dir)) - if err != nil { - return err - } - - etag := fmt.Sprintf("%q", resp.Header.Get("x-bz-content-sha1")) - resp.Header.Set("ETag", etag) - etagLock.Lock() - etags[dir] = etag - etagLock.Unlock() - - data, err := json.Marshal(resp.Header) - if err != nil { - return err - } - - if err := bkt.Put([]byte("header"), data); err != nil { - return err - } - - data, err = io.ReadAll(resp.Body) - if err != nil { - return err - } - - if err := bkt.Put([]byte("body"), data); err != nil { - return err - } - - diesAt := time.Now().AddDate(0, 0, 7).Format(http.TimeFormat) - - if err := bkt.Put([]byte("diesAt"), []byte(diesAt)); err != nil { - return err - } - - // cache control headers - resp.Header.Set("Cache-Control", "max-age:604800") // one week - resp.Header.Set("Expires", diesAt) - - return nil - }) -} - -var ErrNotCached = errors.New("data is not cached") - -func (dc *Cache) Load(dir string, w io.Writer) error { - return dc.DB.Update(func(tx *bbolt.Tx) error { - bkt := tx.Bucket([]byte(dir)) - if bkt == nil { - return ErrNotCached - } - - diesAtBytes := bkt.Get([]byte("diesAt")) - if diesAtBytes == nil { - return ErrNotCached - } - - t, err := time.Parse(http.TimeFormat, string(diesAtBytes)) - if err != nil { - return err - } - - now := time.Now() - - if t.Before(now) { - tx.DeleteBucket([]byte(dir)) - fileDeaths.Get(dir).Add(1) - return ErrNotCached - } - - if err := bkt.Put([]byte("diesAt"), []byte(now.AddDate(0, 0, 7).Format(http.TimeFormat))); err != nil { - return err - } - - h := http.Header{} - - data := bkt.Get([]byte("header")) - if data == nil { - return ErrNotCached - } - if err := json.Unmarshal(data, &h); err != nil { - return err - } - - if h.Get("Content-Type") == "" && filepath.Ext(dir) == ".svg" { 
- h.Set("Content-Type", "image/svg+xml") - } - - data = bkt.Get([]byte("body")) - if data == nil { - return ErrNotCached - } - - if rw, ok := w.(http.ResponseWriter); ok { - for k, vs := range h { - for _, v := range vs { - rw.Header().Add(k, v) - } - } - } - - w.Write(data) - cacheHits.Add(1) - fileHits.Add(dir, 1) - - return nil - }) -} - -func (dc *Cache) LoadBytesOrFetch(path string) ([]byte, error) { - buf := bytes.NewBuffer(nil) - err := dc.Load(path, buf) - if err != nil { - if err == ErrNotCached { - _, err, _ := dc.cacheGroup.Do(path, func() (interface{}, error) { - resp, err := dc.Client.Get(fmt.Sprintf("https://%s%s", dc.ActualHost, path)) - if err != nil { - cacheErrors.Add(1) - return nil, err - } - - if resp.StatusCode != http.StatusOK { - cacheErrors.Add(1) - return nil, web.NewError(http.StatusOK, resp) - } - - err = dc.Save(path, resp) - if err != nil { - cacheErrors.Add(1) - return nil, err - } - - return nil, nil - }) - if err != nil { - return nil, err - } - - return dc.LoadBytesOrFetch(path) - } - return nil, err - } - return buf.Bytes(), nil -} - -func (dc *Cache) GetFile(w http.ResponseWriter, r *http.Request) error { - dir := filepath.Join(r.URL.Path) - - err := dc.Load(dir, w) - if err != nil { - if err == ErrNotCached { - _, err, _ := dc.cacheGroup.Do(r.URL.Path, func() (interface{}, error) { - r.URL.Host = dc.ActualHost - r.URL.Scheme = "https" - resp, err := dc.Client.Get(r.URL.String()) - if err != nil { - cacheErrors.Add(1) - return nil, err - } - - if resp.StatusCode != http.StatusOK { - cacheErrors.Add(1) - return nil, web.NewError(http.StatusOK, resp) - } - - err = dc.Save(dir, resp) - if err != nil { - cacheErrors.Add(1) - return nil, err - } - cacheLoads.Add(1) - return nil, nil - }) - if err != nil { - return err - } - } else { - cacheErrors.Add(1) - return err - } - } - - return dc.Load(dir, w) -} - -func (dc *Cache) CronPurgeDead() { - lg := slog.Default().With("job", "purgeDead") - - for range time.Tick(30 * time.Minute) { - 
lg.Info("starting") - - if err := dc.DB.Update(func(tx *bbolt.Tx) error { - if err := tx.ForEach(func(name []byte, b *bbolt.Bucket) error { - if string(name) == "sticker_cache" { - return nil - } - - lg := lg.With("path", string(name)) - diesAtBytes := b.Get([]byte("diesAt")) - if diesAtBytes == nil { - lg.Error("no diesAt key") - return nil - } - - diesAt, err := time.Parse(http.TimeFormat, string(diesAtBytes)) - if err != nil { - return fmt.Errorf("when parsing diesAt for %s (%q): %w", string(name), string(diesAtBytes), err) - } - - if diesAt.Before(time.Now()) { - if err := tx.DeleteBucket(name); err != nil { - return fmt.Errorf("when trying to delete bucket %s: %w", string(name), err) - } - - fileDeaths.Add(string(name), 1) - lg.Info("deleted", "diesAt", diesAt) - } - - return nil - }); err != nil { - return err - } - - return nil - }); err != nil { - lg.Info("can't update database: %v", "err", err) - } - } -} - var ( cacheHits = expvar.NewInt("counter_xedn_cache_hits") cacheErrors = expvar.NewInt("counter_xedn_cache_errors") @@ -365,6 +66,13 @@ var ( func init() { etags = map[string]string{} + + expvar.Publish("gauge_xedn_referers", &referers) + expvar.Publish("gauge_xedn_file_hits", &fileHits) + expvar.Publish("gauge_xedn_file_deaths", &fileDeaths) + expvar.Publish("gauge_xedn_file_mime_type", &fileMimeTypes) + expvar.Publish("gauge_xedn_ois_file_conversions", &OISFileConversions) + expvar.Publish("gauge_xedn_ois_file_hits", &OISFileHits) } func main() { @@ -438,9 +146,9 @@ func main() { go http.ListenAndServe(*metricsAddr, http.HandlerFunc(tsweb.VarzHandler)) - mux := http.NewServeMux() + cdn := http.NewServeMux() - mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + cdn.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { if r.URL.Path != "/" { http.NotFound(w, r) return @@ -451,9 +159,9 @@ func main() { w.Write(indexHTML) }) - mux.Handle("/sticker/", ois) - mux.Handle("/avatar/", sd) - mux.Handle("/static/", 
http.FileServer(http.Dir(*staticDir))) + cdn.Handle("/sticker/", ois) + cdn.Handle("/avatar/", sd) + cdn.Handle("/static/", http.FileServer(http.Dir(*staticDir))) hdlr := func(w http.ResponseWriter, r *http.Request) { etagLock.RLock() @@ -475,16 +183,15 @@ func main() { } } - expvar.Publish("gauge_xedn_referers", &referers) - expvar.Publish("gauge_xedn_file_hits", &fileHits) - expvar.Publish("gauge_xedn_file_deaths", &fileDeaths) - expvar.Publish("gauge_xedn_file_mime_type", &fileMimeTypes) - expvar.Publish("gauge_xedn_ois_file_conversions", &OISFileConversions) - expvar.Publish("gauge_xedn_ois_file_hits", &OISFileHits) + cdn.HandleFunc("/file/christine-static/", hdlr) + cdn.HandleFunc("/file/xeserv-akko/", hdlr) + + topLevel := mux.NewRouter() - mux.HandleFunc("/file/christine-static/", hdlr) - mux.HandleFunc("/file/xeserv-akko/", hdlr) + topLevel.Host("cdn.xeiaso.net").Handler(cdn) + topLevel.Host("xedn.fly.dev").Handler(cdn) + topLevel.Host("pneuma.shark-harmonic.ts.net").Handler(cdn) slog.Info("starting up", "addr", *addr) - http.ListenAndServe(*addr, cors.Default().Handler(xffMW.Handler(mux))) + http.ListenAndServe(*addr, cors.Default().Handler(xffMW.Handler(cdn))) } |
