aboutsummaryrefslogtreecommitdiff
path: root/cmd/future-sight/main.go
diff options
context:
space:
mode:
authorXe Iaso <me@xeiaso.net>2024-06-08 13:39:52 -0700
committerXe Iaso <me@xeiaso.net>2024-06-08 13:41:05 -0700
commitbb8a5a886c993fc943277e56b4c9065c1f3a82b2 (patch)
tree14de5be85b83b17553092f1f1c78b2b0d0b4cba3 /cmd/future-sight/main.go
parent2f9125d0cbe232dbb0dcd64eab33af356d5d615c (diff)
downloadx-bb8a5a886c993fc943277e56b4c9065c1f3a82b2.tar.xz
x-bb8a5a886c993fc943277e56b4c9065c1f3a82b2.zip
cmd: add command future-sight for preview deployments
Signed-off-by: Xe Iaso <me@xeiaso.net>
Diffstat (limited to 'cmd/future-sight/main.go')
-rw-r--r--cmd/future-sight/main.go325
1 files changed, 325 insertions, 0 deletions
diff --git a/cmd/future-sight/main.go b/cmd/future-sight/main.go
new file mode 100644
index 0000000..4948324
--- /dev/null
+++ b/cmd/future-sight/main.go
@@ -0,0 +1,325 @@
+package main
+
+import (
+ "context"
+ "crypto/sha256"
+ "encoding/base64"
+ "flag"
+ "fmt"
+ "io"
+ "log"
+ "log/slog"
+ "net/http"
+ "os"
+ "path/filepath"
+ "runtime"
+
+ "github.com/aws/aws-sdk-go-v2/aws"
+ "github.com/aws/aws-sdk-go-v2/credentials"
+ "github.com/aws/aws-sdk-go-v2/service/s3"
+ "github.com/nats-io/nats.go"
+ "github.com/redis/go-redis/v9"
+ "golang.org/x/sync/errgroup"
+ "google.golang.org/protobuf/proto"
+ "within.website/x/internal"
+ "within.website/x/internal/xesite"
+ pb "within.website/x/proto/future-sight"
+ "within.website/x/web/useragent"
+)
+
+var (
+ apiBind = flag.String("api-bind", ":8080", "address to bind API to")
+ bind = flag.String("bind", ":8081", "address to bind zipserver to")
+
+ awsAccessKeyID = flag.String("aws-access-key-id", "", "AWS access key ID")
+ awsSecretKey = flag.String("aws-secret-access-key", "", "AWS secret access key")
+ awsEndpointS3 = flag.String("aws-endpoint-url-s3", "http://localhost:9000", "AWS S3 endpoint")
+ awsRegion = flag.String("aws-region", "auto", "AWS region")
+ bucketName = flag.String("bucket-name", "xesite-preview-versions", "bucket to fetch previews from")
+ dataDir = flag.String("data-dir", "./var", "directory to store data in (not permanent)")
+ natsURL = flag.String("nats-url", "nats://localhost:4222", "nats url")
+ usePathStyle = flag.Bool("use-path-style", false, "use path style for S3")
+ valkeyHost = flag.String("valkey-host", "localhost:6379", "host:port for valkey")
+ valkeyPassword = flag.String("valkey-password", "", "password for valkey")
+)
+
+func main() {
+ internal.HandleStartup()
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ creds := credentials.NewStaticCredentialsProvider(*awsAccessKeyID, *awsSecretKey, "")
+
+ s3c := s3.New(s3.Options{
+ AppID: useragent.GenUserAgent("future-sight-push", "https://xeiaso.net"),
+ BaseEndpoint: awsEndpointS3,
+ ClientLogMode: aws.LogRetries | aws.LogRequest | aws.LogResponse,
+ Credentials: creds,
+ EndpointResolver: s3.EndpointResolverFromURL(*awsEndpointS3),
+ //Logger: logging.NewStandardLogger(os.Stderr),
+ UsePathStyle: *usePathStyle,
+ Region: *awsRegion,
+ })
+
+ slog.Debug("details",
+ "awsAccessKeyID", *awsAccessKeyID,
+ "awsSecretKey", *awsSecretKey,
+ "awsEndpointS3", *awsEndpointS3,
+ "awsRegion", *awsRegion,
+ "bucketName", *bucketName,
+ "natsURL", *natsURL,
+ "usePathStyle", *usePathStyle,
+ "valkeyHost", *valkeyHost,
+ "valkeyPassword", *valkeyPassword,
+ )
+
+ vk := redis.NewClient(&redis.Options{
+ Addr: *valkeyHost,
+ Password: *valkeyPassword,
+ DB: 0,
+ })
+ defer vk.Close()
+
+ if _, err := vk.Ping(context.Background()).Result(); err != nil {
+ log.Fatal(err)
+ }
+
+ nc, err := nats.Connect(*natsURL)
+ if err != nil {
+ log.Fatal(err)
+ }
+ defer nc.Close()
+
+ zs, err := xesite.NewZipServer(filepath.Join(*dataDir, "current.zip"), *dataDir)
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ s := &Server{
+ s3c: s3c,
+ vk: vk,
+ nc: nc,
+ zs: zs,
+ dir: *dataDir,
+ }
+
+ currentVersion, err := vk.Get(ctx, "future-sight:current").Result()
+ if err != nil && err != redis.Nil {
+ log.Fatal(err)
+ }
+
+ if currentVersion != "" {
+ nv := pb.NewVersion{
+ Slug: currentVersion,
+ }
+
+ if err := s.fetchVersion(ctx, &nv); err != nil {
+ slog.Error("can't fetch current version", "err", err)
+ }
+ }
+
+ if _, err := nc.Subscribe("future-sight-push", s.HandleFutureSightPushMsg); err != nil {
+ log.Fatal(err)
+ }
+
+ apiMux := http.NewServeMux()
+ apiMux.HandleFunc("/upload", s.UploadVersion)
+ apiMux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
+ fmt.Fprintln(w, "OK")
+ })
+
+ g, _ := errgroup.WithContext(context.Background())
+
+ g.Go(func() error {
+ slog.Info("listening", "for", "api", "addr", *apiBind)
+ return http.ListenAndServe(*apiBind, apiMux)
+ })
+
+ g.Go(func() error {
+ slog.Info("listening", "for", "zipserver", "addr", *bind)
+ return http.ListenAndServe(*bind, zs)
+ })
+
+ if err := g.Wait(); err != nil {
+ slog.Error("error doing work", "err", err)
+ os.Exit(1)
+ }
+}
+
+type Server struct {
+ s3c *s3.Client
+ vk *redis.Client
+ nc *nats.Conn
+ zs *xesite.ZipServer
+ dir string
+}
+
+func (s *Server) UploadVersion(w http.ResponseWriter, r *http.Request) {
+ ctx, cancel := context.WithCancel(r.Context())
+ defer cancel()
+
+ slog.Info("uploading version")
+
+ if err := r.ParseMultipartForm(10 << 24); err != nil {
+ slog.Error("failed to parse form", "err", err)
+ http.Error(w, "failed to parse form", http.StatusBadRequest)
+ return
+ }
+
+ f, header, err := r.FormFile("file")
+ if err != nil {
+ slog.Error("failed to get file", "err", err)
+ http.Error(w, "failed to get file", http.StatusBadRequest)
+ return
+ }
+ defer f.Close()
+
+ slog.Info("got file", "filename", header.Filename)
+
+ fout, err := os.CreateTemp(s.dir, "future-sight-upload-*")
+ if err != nil {
+ slog.Error("failed to create temp file", "err", err)
+ http.Error(w, "failed to create temp file", http.StatusInternalServerError)
+ return
+ }
+ defer fout.Close()
+ defer os.Remove(fout.Name())
+
+ if _, err := io.Copy(fout, f); err != nil {
+ slog.Error("failed to copy file", "err", err)
+ http.Error(w, "failed to copy file", http.StatusInternalServerError)
+ return
+ }
+
+ hash, err := hashFileSha256(fout)
+ if err != nil {
+ slog.Error("failed to hash file", "err", err)
+ http.Error(w, "failed to hash file", http.StatusInternalServerError)
+ return
+ }
+
+ st, err := fout.Stat()
+ if err != nil {
+ slog.Error("failed to stat file", "err", err)
+ http.Error(w, "failed to stat file", http.StatusInternalServerError)
+ return
+ }
+
+ slog.Info("hashed file", "hash", hash)
+
+ if _, err := s.s3c.PutObject(ctx, &s3.PutObjectInput{
+ Bucket: bucketName,
+ Key: aws.String(hash),
+ Body: fout,
+ ContentType: aws.String("application/zip"),
+ ContentLength: aws.Int64(st.Size()),
+ Metadata: map[string]string{
+ "host_os": runtime.GOOS,
+ },
+ }); err != nil {
+ slog.Error("failed to push file", "bucketName", *bucketName, "hash", hash, "err", err)
+ http.Error(w, "failed to push file", http.StatusInternalServerError)
+ return
+ }
+
+ nv := &pb.NewVersion{
+ Slug: hash,
+ }
+
+ if err := s.PushVersion(ctx, nv); err != nil {
+ slog.Error("failed to push version", "slug", nv.Slug, "err", err)
+ http.Error(w, "failed to push version", http.StatusInternalServerError)
+ return
+ }
+
+ w.WriteHeader(http.StatusOK)
+}
+
+func (s *Server) PushVersion(ctx context.Context, nv *pb.NewVersion) error {
+ slog.Info("got new version", "version", nv)
+
+ msg, err := proto.Marshal(nv)
+ if err != nil {
+ slog.Error("failed to marshal message", "slug", nv.Slug, "err", err)
+ return err
+ }
+
+ if err := s.nc.Publish("future-sight-push", msg); err != nil {
+ slog.Error("failed to publish message", "slug", nv.Slug, "err", err)
+ return err
+ }
+
+ if _, err := s.vk.Set(ctx, "future-sight:current", nv.Slug, 0).Result(); err != nil {
+ slog.Error("failed to set current version", "slug", nv.Slug, "err", err)
+ return err
+ }
+
+ return nil
+}
+
+func (s *Server) HandleFutureSightPushMsg(msg *nats.Msg) {
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ nv := new(pb.NewVersion)
+ if err := proto.Unmarshal(msg.Data, nv); err != nil {
+ slog.Error("failed to unmarshal message", "err", err)
+ return
+ }
+
+ if err := s.fetchVersion(ctx, nv); err != nil {
+ slog.Error("failed to handle message", "slug", nv.Slug, "err", err)
+ return
+ }
+
+ slog.Info("handled message", "slug", nv.Slug)
+}
+
+func (s *Server) fetchVersion(ctx context.Context, nv *pb.NewVersion) error {
+ os.Remove(filepath.Join(s.dir, "current.zip"))
+
+ fout, err := os.Create(filepath.Join(s.dir, "current.zip"))
+ if err != nil {
+ return err
+ }
+ defer fout.Close()
+
+ obj, err := s.s3c.GetObject(ctx, &s3.GetObjectInput{
+ Bucket: bucketName,
+ Key: aws.String(nv.Slug),
+ })
+ if err != nil {
+ os.Remove(filepath.Join(s.dir, "current.zip"))
+ slog.Error("failed to get object", "slug", nv.Slug, "err", err)
+ return err
+ }
+ defer obj.Body.Close()
+
+ if _, err := io.Copy(fout, obj.Body); err != nil {
+ os.Remove(filepath.Join(s.dir, "current.zip"))
+ slog.Error("failed to copy object", "slug", nv.Slug, "err", err)
+ return err
+ }
+
+ if err := s.zs.Update(filepath.Join(s.dir, "current.zip")); err != nil {
+ slog.Error("failed to update zipserver", "slug", nv.Slug, "err", err)
+ return err
+ }
+
+ return nil
+}
+
+// hashFileSha256 hashes a file with Sha256 and returns the hash as a base64 encoded string.
+func hashFileSha256(fin *os.File) (string, error) {
+ h := sha256.New()
+ if _, err := io.Copy(h, fin); err != nil {
+ return "", err
+ }
+
+ // rewind the file
+ if _, err := fin.Seek(0, io.SeekStart); err != nil {
+ return "", err
+ }
+
+ return base64.StdEncoding.EncodeToString(h.Sum(nil)), nil
+}