aboutsummaryrefslogtreecommitdiff
path: root/cmd/xedn/main.go
diff options
context:
space:
mode:
authorXe <me@christine.website>2022-11-27 11:56:37 -0500
committerXe <me@christine.website>2022-11-27 11:56:37 -0500
commit0058d81d5e8aab017c5b30aa60eb406602f998f7 (patch)
tree99e0dbdd7b20d359f26d9343e59dcbd1952e5169 /cmd/xedn/main.go
parent1413160f07bf178009fd7f076503f80896a1c0b3 (diff)
downloadx-0058d81d5e8aab017c5b30aa60eb406602f998f7.tar.xz
x-0058d81d5e8aab017c5b30aa60eb406602f998f7.zip
xedn: don't crash as much
Signed-off-by: Xe <me@christine.website>
Diffstat (limited to 'cmd/xedn/main.go')
-rw-r--r--cmd/xedn/main.go263
1 files changed, 196 insertions, 67 deletions
diff --git a/cmd/xedn/main.go b/cmd/xedn/main.go
index ca0f7ca..17cf99e 100644
--- a/cmd/xedn/main.go
+++ b/cmd/xedn/main.go
@@ -2,10 +2,11 @@
package main
import (
- "bytes"
"context"
+ "crypto/md5"
_ "embed"
- "encoding/gob"
+ "encoding/json"
+ "errors"
"expvar"
"flag"
"fmt"
@@ -13,13 +14,14 @@ import (
"log"
"net/http"
"os"
+ "path/filepath"
"strconv"
"sync"
"time"
- "github.com/golang/groupcache"
"github.com/rs/cors"
"github.com/sebest/xff"
+ "go.etcd.io/bbolt"
"tailscale.com/tsnet"
"tailscale.com/tsweb"
"within.website/ln"
@@ -30,78 +32,215 @@ import (
)
var (
- b2Backend = flag.String("b2-backend", "https://f001.backblazeb2.com", "Backblaze B2 base URL")
+ b2Backend = flag.String("b2-backend", "f001.backblazeb2.com", "Backblaze B2 base host")
addr = flag.String("addr", ":8080", "server address")
+ dir = flag.String("dir", os.Getenv("XEDN_STATE"), "where XeDN should store cached data")
//go:embed index.html
indexHTML []byte
)
-const cacheSize = 386 * 1024 * 1024 // 386 mebibytes
+type Cache struct {
+ ActualHost string
+ Client *http.Client
+ DB *bbolt.DB
+}
-type CacheData struct {
- Headers http.Header
- Body []byte
+func Hash(data string) string {
+ output := md5.Sum([]byte(data))
+ return fmt.Sprintf("%x", output)
}
-var Group = groupcache.NewGroup("b2-bucket", cacheSize, groupcache.GetterFunc(
- func(ctx groupcache.Context, key string, dest groupcache.Sink) error {
- ln.Log(context.Background(), ln.F{"key": key})
+func (dc *Cache) ListFiles(w http.ResponseWriter, r *http.Request) {
+ var result []string
- fileHits.Add(key, 1)
+ err := dc.DB.View(func(tx *bbolt.Tx) error {
+ return tx.ForEach(func(name []byte, b *bbolt.Bucket) error {
+ result = append(result, string(name))
+ return nil
+ })
+ })
+ if err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
- resp, err := http.Get(*b2Backend + key)
- if err != nil {
- return fmt.Errorf("can't fetch from b2: %v", err)
+ json.NewEncoder(w).Encode(result)
+}
+
+func (dc *Cache) Purge(w http.ResponseWriter, r *http.Request) {
+ var files []string
+
+ defer r.Body.Close()
+ if err := json.NewDecoder(r.Body).Decode(&files); err != nil {
+ http.Error(w, err.Error(), http.StatusBadRequest)
+ return
+ }
+
+ if err := dc.DB.Update(func(tx *bbolt.Tx) error {
+ for _, fname := range files {
+ bkt := tx.Bucket([]byte(fname))
+ if bkt == nil {
+ continue
+ }
+
+ if err := tx.DeleteBucket([]byte(fname)); err != nil {
+ return err
+ }
}
- defer resp.Body.Close()
- if resp.StatusCode != http.StatusOK {
- return web.NewError(http.StatusOK, resp)
+ return nil
+ }); err != nil {
+ http.Error(w, err.Error(), http.StatusBadRequest)
+ return
+ }
+}
+
+func (dc *Cache) Save(dir string, resp *http.Response) error {
+ return dc.DB.Update(func(tx *bbolt.Tx) error {
+ bkt, err := tx.CreateBucketIfNotExists([]byte(dir))
+ if err != nil {
+ return err
}
etag := fmt.Sprintf("%q", resp.Header.Get("x-bz-content-sha1"))
resp.Header.Set("ETag", etag)
etagLock.Lock()
- etags[key] = etag
+ etags[dir] = etag
etagLock.Unlock()
+ data, err := json.Marshal(resp.Header)
+ if err != nil {
+ return err
+ }
+
+ if err := bkt.Put([]byte("header"), data); err != nil {
+ return err
+ }
+
+ data, err = io.ReadAll(resp.Body)
+ if err != nil {
+ return err
+ }
+
+ if err := bkt.Put([]byte("body"), data); err != nil {
+ return err
+ }
+
+ diesAt := time.Now().Add(604800 * time.Second).Format(http.TimeFormat)
+
+ if err := bkt.Put([]byte("diesAt"), []byte(diesAt)); err != nil {
+ return err
+ }
+
// cache control headers
resp.Header.Set("Cache-Control", "max-age:604800")
- resp.Header.Set("Expires", time.Now().Add(604800*time.Second).Format(http.TimeFormat))
+ resp.Header.Set("Expires", diesAt)
- body, err := io.ReadAll(resp.Body)
- if err != nil {
- return fmt.Errorf("can't read from b2: %v", err)
+ return nil
+ })
+}
+
+var ErrNotCached = errors.New("data is not cached")
+
+func (dc *Cache) Load(dir string, w http.ResponseWriter) error {
+ return dc.DB.View(func(tx *bbolt.Tx) error {
+ bkt := tx.Bucket([]byte(dir))
+ if bkt == nil {
+ return ErrNotCached
}
- result := &CacheData{
- Headers: resp.Header,
- Body: body,
+ diesAtBytes := bkt.Get([]byte("diesAt"))
+ if diesAtBytes == nil {
+ return ErrNotCached
}
- var buf bytes.Buffer
- err = gob.NewEncoder(&buf).Encode(result)
+ t, err := time.Parse(http.TimeFormat, string(diesAtBytes))
if err != nil {
return err
}
- dest.SetBytes(buf.Bytes())
+ if t.Before(time.Now()) {
+ tx.DeleteBucket([]byte(dir))
+ return ErrNotCached
+ }
+
+ h := http.Header{}
+
+ data := bkt.Get([]byte("header"))
+ if data == nil {
+ return ErrNotCached
+ }
+ if err := json.Unmarshal(data, &h); err != nil {
+ return err
+ }
+
+ data = bkt.Get([]byte("body"))
+ if data == nil {
+ return ErrNotCached
+ }
+
+ for k, vs := range h {
+ for _, v := range vs {
+ w.Header().Add(k, v)
+ }
+ }
+
+ w.Write(data)
+ cacheHits.Add(1)
+ fileHits.Add(dir, 1)
return nil
- },
-))
+ })
+}
+
+func (dc *Cache) GetFile(w http.ResponseWriter, r *http.Request) error {
+ dir := filepath.Join(r.URL.Path)
+
+ err := dc.Load(dir, w)
+ if err != nil {
+ if err == ErrNotCached {
+ r.URL.Host = dc.ActualHost
+ r.URL.Scheme = "https"
+ resp, err := dc.Client.Get(r.URL.String())
+ if err != nil {
+ cacheErrors.Add(1)
+ return err
+ }
+
+ if resp.StatusCode != http.StatusOK {
+ cacheErrors.Add(1)
+ return web.NewError(http.StatusOK, resp)
+ }
+
+ err = dc.Save(dir, resp)
+ if err != nil {
+ cacheErrors.Add(1)
+ return err
+ }
+
+ cacheLoads.Add(1)
+ } else {
+ cacheErrors.Add(1)
+ return err
+ }
+ }
+
+ return dc.Load(dir, w)
+}
+
+func (dc *Cache) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+}
var (
- cacheGets = expvar.NewInt("cache_gets")
cacheHits = expvar.NewInt("cache_hits")
cacheErrors = expvar.NewInt("cache_errors")
cacheLoads = expvar.NewInt("cache_loads")
etagMatches = expvar.NewInt("etag_matches")
- fileHits = expvar.NewMap("file_hits")
referers = expvar.NewMap("referers")
+ fileHits = expvar.NewMap("filehits")
etags map[string]string
etagLock sync.RWMutex
@@ -111,29 +250,29 @@ func init() {
etags = map[string]string{}
}
-func refreshMetrics() {
- t := time.NewTicker(10 * time.Second)
- defer t.Stop()
-
- for range t.C {
- cacheGets.Set(Group.Stats.Gets.Get())
- cacheHits.Set(Group.Stats.CacheHits.Get())
- cacheErrors.Set(int64(Group.Stats.LocalLoadErrs))
- cacheLoads.Set(int64(Group.Stats.LocalLoads))
- }
-}
-
func main() {
internal.HandleStartup()
ctx := opname.With(context.Background(), "startup")
- go refreshMetrics()
+ os.MkdirAll(filepath.Join(*dir, "tsnet"), 0700)
+
+ db, err := bbolt.Open(filepath.Join(*dir, "data"), 0600, &bbolt.Options{})
+ if err != nil {
+ ln.FatalErr(ctx, err)
+ }
+
+ dc := &Cache{
+ ActualHost: *b2Backend,
+ Client: &http.Client{},
+ DB: db,
+ }
go func() {
srv := &tsnet.Server{
Hostname: "xedn-" + os.Getenv("FLY_REGION"),
Logf: log.New(io.Discard, "", 0).Printf,
AuthKey: os.Getenv("TS_AUTHKEY"),
+ Dir: filepath.Join(*dir, "tsnet"),
}
lis, err := srv.Listen("tcp", ":80")
@@ -142,6 +281,8 @@ func main() {
}
http.DefaultServeMux.HandleFunc("/debug/varz", tsweb.VarzHandler)
+ http.DefaultServeMux.HandleFunc("/xedn/files", dc.ListFiles)
+ http.DefaultServeMux.HandleFunc("/xedn/purge", dc.Purge)
defer srv.Close()
defer lis.Close()
@@ -153,8 +294,11 @@ func main() {
ln.FatalErr(ctx, err)
}
+ os.MkdirAll(*dir, 0700)
+
mux := http.NewServeMux()
mux.HandleFunc("/.within/metrics", tsweb.VarzHandler)
+ mux.Handle("/.within/metrics/json", expvar.Handler())
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/" {
@@ -167,7 +311,7 @@ func main() {
w.Write(indexHTML)
})
- mux.HandleFunc("/file/christine-static/", func(w http.ResponseWriter, r *http.Request) {
+ hdlr := func(w http.ResponseWriter, r *http.Request) {
etagLock.RLock()
etag, ok := etags[r.URL.Path]
etagLock.RUnlock()
@@ -180,29 +324,14 @@ func main() {
referers.Add(r.Header.Get("Referer"), 1)
- var b []byte
- err := Group.Get(nil, r.URL.Path, groupcache.AllocatingByteSliceSink(&b))
- if err != nil {
- http.Error(w, err.Error(), http.StatusNotFound)
- return
- }
-
- var result CacheData
- err = gob.NewDecoder(bytes.NewBuffer(b)).Decode(&result)
- if err != nil {
- ln.Error(r.Context(), err)
- http.Error(w, "internal cache error", http.StatusInternalServerError)
+ if err := dc.GetFile(w, r); err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
+ }
- for k, vs := range result.Headers {
- for _, v := range vs {
- w.Header().Add(k, v)
- }
- }
- w.WriteHeader(http.StatusOK)
- w.Write(result.Body)
- })
+ mux.HandleFunc("/file/christine-static/", hdlr)
+ mux.HandleFunc("/file/xeserv-akko/", hdlr)
ln.Log(context.Background(), ln.F{"addr": *addr})
http.ListenAndServe(*addr, cors.Default().Handler(xffMW.Handler(ex.HTTPLog(mux))))