diff options
| author | Xe Iaso <me@xeiaso.net> | 2024-02-10 18:48:47 -0500 |
|---|---|---|
| committer | Xe Iaso <me@xeiaso.net> | 2024-02-10 18:48:47 -0500 |
| commit | bb1a53bdeefbf7f85e4c862383abbc29a434e63e (patch) | |
| tree | 7ea2c985158a88ed3bd236bec36a03e505891a48 /cmd | |
| parent | 585b45216eb7612b6729fb644d5fe00b7cde4919 (diff) | |
| download | x-bb1a53bdeefbf7f85e4c862383abbc29a434e63e.tar.xz x-bb1a53bdeefbf7f85e4c862383abbc29a434e63e.zip | |
cmd/xedn: fully make uplodr work
Signed-off-by: Xe Iaso <me@xeiaso.net>
Diffstat (limited to 'cmd')
| -rw-r--r-- | cmd/xedn/imgoptimize.go | 207 | ||||
| -rw-r--r-- | cmd/xedn/main.go | 3 | ||||
| -rw-r--r-- | cmd/xedn/uplodr.go | 246 | ||||
| -rw-r--r-- | cmd/xedn/uplodr/main.go | 58 |
4 files changed, 288 insertions, 226 deletions
diff --git a/cmd/xedn/imgoptimize.go b/cmd/xedn/imgoptimize.go index a0c1072..58969ed 100644 --- a/cmd/xedn/imgoptimize.go +++ b/cmd/xedn/imgoptimize.go @@ -2,31 +2,22 @@ package main import ( "bytes" - "crypto/sha256" "encoding/json" "errors" "flag" "fmt" "image" - "image/jpeg" "image/png" - "io" - "log" "log/slog" "net/http" "os" "path/filepath" - "runtime" "strconv" "strings" "time" - "github.com/aws/aws-sdk-go-v2/aws" - "github.com/aws/aws-sdk-go-v2/credentials" - "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/chai2010/webp" "github.com/disintegration/imaging" - "github.com/google/uuid" "go.etcd.io/bbolt" "golang.org/x/sync/singleflight" "tailscale.com/metrics" @@ -255,201 +246,3 @@ func (ois *OptimizedImageServer) Purge(w http.ResponseWriter, r *http.Request) { return } } - -type ImageUploader struct { - s3 *s3.Client -} - -func (iu *ImageUploader) CreateImage(w http.ResponseWriter, r *http.Request) { - ctx := r.Context() - id := uuid.New().String() - - defer r.Body.Close() - - os.MkdirAll(filepath.Join(*dir, "uploud"), 0700) - - fout, err := os.Create(filepath.Join(*dir, "uploud", id+".png")) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - slog.Error("cannot create temp file", "err", err) - return - } - - h := sha256.New() - - if n, err := io.Copy(io.MultiWriter(h, fout), r.Body); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - slog.Error("cannot copy image to buffer", "err", err) - return - } else { - slog.Info("copied image to buffer", "bytes", n, "content-length", r.ContentLength) - } - - if err := fout.Close(); err != nil { - slog.Error("cannot close temp file", "err", err) - return - } - - fin, err := os.Open(filepath.Join(*dir, "uploud", id+".png")) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - slog.Error("cannot open temp file", "err", err) - return - } - - img, err := png.Decode(fin) - if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - slog.Error("cannot decode image", "err", err) - return - } - - if err := fin.Close(); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - slog.Error("cannot close temp file", "err", err) - return - } - - os.Remove(filepath.Join(*dir, "uploud", id+".png")) - - directory, err := os.MkdirTemp(*dir, "uploud") - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - slog.Error("cannot create temp directory", "err", err) - return - } - defer os.RemoveAll(directory) - - if err := doAVIF(img, filepath.Join(directory, "image.avif")); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - slog.Error("cannot encode AVIF", "err", err) - return - } - - if err := doWEBP(img, filepath.Join(directory, "image.webp")); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - slog.Error("cannot encode WEBP", "err", err) - return - } - - if err := doJPEG(img, filepath.Join(directory, "image.jpg")); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - slog.Error("cannot encode JPEG", "err", err) - return - } - - files, err := os.ReadDir(directory) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - slog.Error("cannot read directory", "err", err) - return - } - - id = fmt.Sprintf("%x", h.Sum(nil)) - - s3c := mkS3Client() - - for _, finfo := range files { - log.Printf("uploading %s", finfo.Name()) - fin, err := os.Open(filepath.Join(directory, finfo.Name())) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - slog.Error("cannot read file", "err", err) - return - } - defer fin.Close() - - _, err = s3c.PutObject(ctx, &s3.PutObjectInput{ - Body: fin, - Bucket: aws.String("christine-static"), - Key: aws.String("xedn/dynamic/" + id + "/" + finfo.Name()), - ContentType: aws.String(mimeTypes[filepath.Ext(finfo.Name())]), - }) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - slog.Error("cannot upload file", "err", err) - return - } - } - - json.NewEncoder(w).Encode(map[string]string{ - "avif": "https://cdn.xeiaso.net/file/christine-static/xedn/dynamic/" + id + "/image.avif", - "webp": "https://cdn.xeiaso.net/file/christine-static/xedn/dynamic/" + id + "/image.webp", - "jpeg": "https://cdn.xeiaso.net/file/christine-static/xedn/dynamic/" + id + "/image.jpg", - }) -} - -var mimeTypes = map[string]string{ - ".avif": "image/avif", - ".webp": "image/webp", - ".jpg": "image/jpeg", - ".png": "image/png", - ".wasm": "application/wasm", - ".css": "text/css", -} - -func mkS3Client() *s3.Client { - s3Config := aws.Config{ - Credentials: credentials.NewStaticCredentialsProvider(*b2KeyID, *b2KeySecret, ""), - BaseEndpoint: aws.String("https://s3.us-west-001.backblazeb2.com"), - Region: "us-west-001", - } - s3Client := s3.NewFromConfig(s3Config, (func(o *s3.Options) { - o.UsePathStyle = true - })) - return s3Client -} - -func doAVIF(src image.Image, dstPath string) error { - dst, err := os.Create(dstPath) - if err != nil { - log.Fatalf("Can't create destination file: %v", err) - } - defer dst.Close() - - err = avif.Encode(dst, src, &avif.Options{ - Threads: runtime.GOMAXPROCS(0), - Speed: *avifEncoderSpeed, - Quality: *avifQuality, - }) - if err != nil { - return err - } - - log.Printf("Encoded AVIF at %s", dstPath) - - return nil -} - -func doWEBP(src image.Image, dstPath string) error { - fout, err := os.Create(dstPath) - if err != nil { - return err - } - defer fout.Close() - - err = webp.Encode(fout, src, &webp.Options{Quality: float32(*webpQuality)}) - if err != nil { - return err - } - - log.Printf("Encoded WEBP at %s", dstPath) - - return nil -} - -func doJPEG(src image.Image, dstPath string) error { - fout, err := os.Create(dstPath) - if err != nil { - return err - } - defer fout.Close() - - if err := jpeg.Encode(fout, src, &jpeg.Options{Quality: *jpegQuality}); err != nil { - return err - } - - log.Printf("Encoded JPEG at %s", dstPath) - - return nil -} diff --git a/cmd/xedn/main.go b/cmd/xedn/main.go index 8eabddb..ffdcd02 100644 --- a/cmd/xedn/main.go +++ b/cmd/xedn/main.go @@ -28,6 +28,7 @@ import ( "tailscale.com/tsweb" "within.website/x/cmd/xedn/internal/xesite" "within.website/x/internal" + "within.website/x/web/fly/flymachines" "within.website/x/web/stablediffusion" ) @@ -127,7 +128,7 @@ func main() { } iu := &ImageUploader{ - s3: mkS3Client(), + fmc: flymachines.New(*flyAPIToken, &http.Client{}), } os.MkdirAll(filepath.Join(*dir, "xesite"), 0700) diff --git a/cmd/xedn/uplodr.go b/cmd/xedn/uplodr.go new file mode 100644 index 0000000..45aea18 --- /dev/null +++ b/cmd/xedn/uplodr.go @@ -0,0 +1,246 @@ +package main + +import ( + "context" + "encoding/json" + "flag" + "fmt" + "io" + "log/slog" + "net" + "net/http" + "time" + + "github.com/cenkalti/backoff/v4" + "github.com/google/uuid" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "within.website/x/cmd/xedn/uplodr/pb" + "within.website/x/web/fly/flymachines" +) + +var ( + flyAPIToken = flag.String("fly-api-token", "", "Fly API token to use") + flyAppName = flag.String("fly-app-name", "xedn", "Fly app name to use") + flyRegion = flag.String("fly-region", "yyz", "Fly region to use") + uplodrMachineImage = flag.String("uplodr-machine-image", "registry.fly.io/xedn", "Docker image to use for uplodr Machines") + uplodrBinary = flag.String("uplodr-binary", "/bin/uplodr", "Binary to run on uplodr Machines") + uplodrPort = flag.String("uplodr-port", "9001", "Port to run uplodr on") +) + +type ImageUploader struct { + fmc *flymachines.Client +} + +func (iu *ImageUploader) CreateImage(w http.ResponseWriter, r *http.Request) { + if r.ContentLength == 0 { + http.Error(w, "empty body", http.StatusBadRequest) + return + } + + ext := r.URL.Query().Get("ext") + if ext == "" { + ext = ".png" + if r.Header.Get("Content-Type") != "" { + mimeType := r.Header.Get("Content-Type") + ext = mimeTypes[mimeType] + } + } + + fname := r.URL.Query().Get("name") + if fname == "" { + fname = uuid.New().String() + } + + folder := r.URL.Query().Get("folder") + if folder == "" { + folder = "xedn/dynamic" + } + + ctx, cancel := context.WithTimeout(r.Context(), 20*time.Minute) + defer cancel() + + data, err := io.ReadAll(r.Body) + if err != nil { + slog.Error("cannot read body", "err", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + chonkiness := len(data) + 4096 + + slog.Debug("creating machine") + m, err := iu.fmc.CreateMachine(ctx, *flyAppName, flymachines.CreateMachine{ + Name: "uplodr-" + uuid.New().String(), + Region: *flyRegion, + Config: flymachines.MachineConfig{ + Guest: flymachines.MachineGuest{ + CPUKind: "performance", + CPUs: 16, + MemoryMB: 8192 * 4, + }, + Image: *uplodrMachineImage, + Processes: []flymachines.MachineProcess{ + { + Cmd: []string{*uplodrBinary, "--grpc-addr=:" + *uplodrPort, "--slog-level=debug", fmt.Sprintf("--msg-size=%d", chonkiness)}, + }, + }, + StopConfig: flymachines.MachineStopConfig{ + Timeout: "30s", + Signal: "SIGKILL", + }, + Restart: flymachines.MachineRestart{ + MaxRetries: 0, + Policy: flymachines.MachineRestartPolicyNo, + }, + }, + }) + if err != nil { + slog.Error("cannot create machine", "err", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + defer func() { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + bo := backoff.NewExponentialBackOff() + + err := backoff.Retry(func() error { + select { + case <-ctx.Done(): + return backoff.Permanent(ctx.Err()) + default: + } + + slog.Debug("deleting machine", "machine", m.ID) + return iu.fmc.DestroyAppMachine(ctx, *flyAppName, m.ID) + }, bo) + if err != nil { + slog.Error("cannot delete machine", "err", err, "machine", m.ID) + } + }() + + running := false + for !running { + time.Sleep(time.Second) + mi, err := iu.fmc.GetAppMachine(ctx, *flyAppName, m.ID) + if err != nil { + slog.Error("can't get machine state", "id", m.ID, "err", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + if mi.State != "started" { + slog.Debug("not started", "want", "started", "got", mi.State) + continue + } + running = true + } + + bo := backoff.NewExponentialBackOff() + addr := net.JoinHostPort(m.PrivateIP, *uplodrPort) + + if err := backoff.Retry(func() error { + conn, err := net.Dial("tcp", addr) + if err != nil { + return err + } + return conn.Close() + }, bo); err != nil { + slog.Error("cannot test dial machine", "err", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + bo.Reset() + conn, err := backoff.RetryWithData[*grpc.ClientConn](func() (*grpc.ClientConn, error) { + slog.Debug("dialing machine", "addr", addr) + + conn, err := grpc.DialContext(ctx, addr, + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithMaxMsgSize(chonkiness), + ) + if err != nil { + slog.Error("cannot dial machine", "err", err, "addr", addr) + } + + return conn, err + }, bo) + if err != nil { + slog.Error("cannot dial machine", "err", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + defer conn.Close() + slog.Debug("conn created") + + client := pb.NewImageClient(conn) + id := uuid.New().String() + bo.Reset() + pong, err := backoff.RetryWithData[*pb.Echo](func() (*pb.Echo, error) { + return client.Ping(ctx, &pb.Echo{Nonce: id}) + }, bo) + if err != nil { + slog.Error("cannot ping machine", "err", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + if pong.GetNonce() != id { + slog.Error("invalid nonce", "got", pong.GetNonce(), "want", id) + http.Error(w, "invalid nonce", http.StatusInternalServerError) + return + } + + slog.Info("pong", "msg", pong.GetNonce()) + + bo.Reset() + imageData, err := backoff.RetryWithData[*pb.UploadResp](func() (*pb.UploadResp, error) { + slog.Debug("uploading image", "fname", fname, "ext", ext) + resp, err := client.Upload(ctx, &pb.UploadReq{ + FileName: fname + "." + ext, + Data: data, + Folder: folder, + }, + grpc.MaxCallRecvMsgSize(chonkiness), + grpc.MaxCallSendMsgSize(chonkiness), + ) + if err != nil { + slog.Error("can't upload image", "err", err) + } + + return resp, err + }, bo) + if err != nil { + slog.Error("cannot upload image", "err", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + result := map[string]string{} + + for _, vari := range imageData.Variants { + result[extForMimeType[vari.MimeType]] = vari.Url + } + + json.NewEncoder(w).Encode(result) +} + +var mimeTypes = map[string]string{ + ".avif": "image/avif", + ".webp": "image/webp", + ".jpg": "image/jpeg", + ".png": "image/png", + ".wasm": "application/wasm", + ".css": "text/css", +} + +var extForMimeType = map[string]string{ + "image/avif": "avif", + "image/webp": "webp", + "image/jpeg": "jpg", + "image/png": "png", + "application/wasm": "wasm", + "text/css": "css", +} diff --git a/cmd/xedn/uplodr/main.go b/cmd/xedn/uplodr/main.go index cb699c0..111cf44 100644 --- a/cmd/xedn/uplodr/main.go +++ b/cmd/xedn/uplodr/main.go @@ -36,6 +36,8 @@ var ( b2KeyID = flag.String("b2-key-id", "", "Backblaze B2 application key ID") b2KeySecret = flag.String("b2-application-key", "", "Backblaze B2 application secret") + msgSize = flag.Int("msg-size", 100*1024*1024, "how big the message should be") + tigrisBucket = flag.String("bucket-name", "xedn", "Tigris bucket to dump things to") avifQuality = flag.Int("avif-quality", 8, "AVIF quality (higher is worse quality)") @@ -51,6 +53,8 @@ func main() { ctx, cancel := context.WithTimeout(context.Background(), 20*time.Minute) defer cancel() + os.MkdirAll("/tmp", 0777) + s, err := New(ctx) if err != nil { log.Fatal(err) @@ -66,7 +70,10 @@ func main() { log.Fatal(err) } - gs := grpc.NewServer() + gs := grpc.NewServer( + grpc.MaxRecvMsgSize(*msgSize), + grpc.MaxSendMsgSize(*msgSize), + ) pb.RegisterImageServer(gs, s) @@ -95,6 +102,7 @@ func New(ctx context.Context) (*Server, error) { } func (s *Server) Ping(ctx context.Context, msg *pb.Echo) (*pb.Echo, error) { + slog.Info("ping", "msg", msg.Nonce) return msg, nil } @@ -115,30 +123,35 @@ func (s *Server) Upload(ctx context.Context, ur *pb.UploadReq) (*pb.UploadResp, return nil, fmt.Errorf("failed to create temp dir: %w", err) } defer os.RemoveAll(dir) + os.MkdirAll(dir, 0777) name := baseName + "-smol.png" fnames = append(fnames, name) if err := resizeSmol(img, filepath.Join(dir, name)); err != nil { return nil, fmt.Errorf("failed to make smol png: %w", err) } + slog.Debug("converted", "name", name) name = baseName + ".webp" - fnames = append(fnames, filepath.Join(dir, name)) - if err := doWEBP(img, name); err != nil { + fnames = append(fnames, name) + if err := doWEBP(img, filepath.Join(dir, name)); err != nil { return nil, fmt.Errorf("failed to make webp: %w", err) } + slog.Debug("converted", "name", name) name = baseName + ".avif" - fnames = append(fnames, filepath.Join(dir, name)) - if err := doAVIF(img, name); err != nil { + fnames = append(fnames, name) + if err := doAVIF(img, filepath.Join(dir, name)); err != nil { return nil, fmt.Errorf("failed to make avif: %w", err) } + slog.Debug("converted", "name", name) name = baseName + ".jpg" - fnames = append(fnames, filepath.Join(dir, name)) - if err := doJPEG(img, name); err != nil { + fnames = append(fnames, name) + if err := doJPEG(img, filepath.Join(dir, name)); err != nil { return nil, fmt.Errorf("failed to make jpeg: %w", err) } + slog.Debug("converted", "name", name) var result []*pb.Variant @@ -155,29 +168,33 @@ func (s *Server) Upload(ctx context.Context, ur *pb.UploadReq) (*pb.UploadResp, } defer fin.Close() - if _, err := s.b2c.PutObject(ctx, &s3.PutObjectInput{ + key := fmt.Sprintf("%s/%s", ur.Folder, fname) + + if _, err := s.tc.PutObject(ctx, &s3.PutObjectInput{ Body: fin, - Bucket: b2Bucket, - Key: aws.String(fmt.Sprintf("%s/%s", ur.Folder, fname)), + Bucket: tigrisBucket, + Key: aws.String(key), ContentType: aws.String(mimeTypes[filepath.Ext(fname)]), }); err != nil { - slog.Error("can't upload", "to", "b2", "err", err) - errs = append(errs, fmt.Errorf("while uploading %s to b2: %w", path, err)) + slog.Error("can't upload", "to", "tigris", "err", err) + errs = append(errs, fmt.Errorf("while uploading %s to tigris: %w", path, err)) continue } + slog.Debug("uploaded", "to", "tigris", "key", key) fin.Seek(0, 0) - if _, err := s.tc.PutObject(ctx, &s3.PutObjectInput{ + if _, err := s.b2c.PutObject(ctx, &s3.PutObjectInput{ Body: fin, Bucket: b2Bucket, - Key: aws.String(fmt.Sprintf("%s/%s", ur.Folder, fname)), + Key: aws.String(key), ContentType: aws.String(mimeTypes[filepath.Ext(fname)]), }); err != nil { slog.Error("can't upload", "to", "b2", "err", err) errs = append(errs, fmt.Errorf("while uploading %s to b2: %w", path, err)) continue } + slog.Debug("uploaded", "to", "tigris", "key", key) result = append(result, &pb.Variant{ Url: fmt.Sprintf("https://cdn.xeiaso.net/file/christine-static/%s/%s", ur.GetFolder(), fname), @@ -189,6 +206,8 @@ func (s *Server) Upload(ctx context.Context, ur *pb.UploadReq) (*pb.UploadResp, return nil, errors.Join(errs...) } + slog.Info("uploaded", "input", ur.FileName, "result", result) + return &pb.UploadResp{ Variants: result, }, nil @@ -197,7 +216,7 @@ func (s *Server) Upload(ctx context.Context, ur *pb.UploadReq) (*pb.UploadResp, func doAVIF(src image.Image, dstPath string) error { dst, err := os.Create(dstPath) if err != nil { - log.Fatalf("Can't create destination file: %v", err) + return fmt.Errorf("can't create destination file %s: %v", dstPath, err) } defer dst.Close() @@ -218,7 +237,7 @@ func doAVIF(src image.Image, dstPath string) error { func doWEBP(src image.Image, dstPath string) error { fout, err := os.Create(dstPath) if err != nil { - return err + return fmt.Errorf("can't create destination file %s: %v", dstPath, err) } defer fout.Close() @@ -239,7 +258,7 @@ func fileNameWithoutExt(fileName string) string { func doJPEG(src image.Image, dstPath string) error { fout, err := os.Create(dstPath) if err != nil { - return err + return fmt.Errorf("can't create destination file %s: %v", dstPath, err) } defer fout.Close() @@ -255,7 +274,7 @@ func doJPEG(src image.Image, dstPath string) error { func resizeSmol(src image.Image, dstPath string) error { fout, err := os.Create(dstPath) if err != nil { - return err + return fmt.Errorf("can't create destination file %s: %v", dstPath, err) } defer fout.Close() @@ -288,9 +307,11 @@ func mkTigrisClient(ctx context.Context) (*s3.Client, error) { if err != nil { return nil, fmt.Errorf("failed to load Tigris config: %w", err) } + cfg.Region = "auto" return s3.NewFromConfig(cfg, func(o *s3.Options) { o.BaseEndpoint = aws.String("https://fly.storage.tigris.dev") + o.Region = "auto" }), nil } @@ -302,6 +323,7 @@ func mkB2Client() *s3.Client { } s3Client := s3.NewFromConfig(s3Config, (func(o *s3.Options) { o.UsePathStyle = true + o.Region = "us-west-001" })) return s3Client } |
