diff options
| author | Xe Iaso <me@xeiaso.net> | 2024-11-08 15:51:27 -0500 |
|---|---|---|
| committer | Xe Iaso <me@xeiaso.net> | 2024-11-08 15:51:27 -0500 |
| commit | 3756e97b6aeee2e9f8c9e4c1fd6f65ebba6f54aa (patch) | |
| tree | 7d54f3e533b271deaa4e2c1685a787ac0c710590 /cmd/orodyagzou | |
| parent | a731950f5c80c6b19c8b92f85600d38f6dc9ae63 (diff) | |
| download | x-3756e97b6aeee2e9f8c9e4c1fd6f65ebba6f54aa.tar.xz x-3756e97b6aeee2e9f8c9e4c1fd6f65ebba6f54aa.zip | |
cmd/orodyagzou: initial implementation
Signed-off-by: Xe Iaso <me@xeiaso.net>
Diffstat (limited to 'cmd/orodyagzou')
| -rw-r--r-- | cmd/orodyagzou/.gitignore | 2 | ||||
| -rw-r--r-- | cmd/orodyagzou/main.go | 277 |
2 files changed, 279 insertions, 0 deletions
diff --git a/cmd/orodyagzou/.gitignore b/cmd/orodyagzou/.gitignore new file mode 100644 index 0000000..c588cf3 --- /dev/null +++ b/cmd/orodyagzou/.gitignore @@ -0,0 +1,2 @@ +*.env +gpu_names_cache.json
\ No newline at end of file diff --git a/cmd/orodyagzou/main.go b/cmd/orodyagzou/main.go new file mode 100644 index 0000000..5267103 --- /dev/null +++ b/cmd/orodyagzou/main.go @@ -0,0 +1,277 @@ +package main + +import ( + "context" + "encoding/json" + "flag" + "fmt" + "log" + "log/slog" + "net/http" + "net/http/httputil" + "net/url" + "os" + "sync" + "time" + + "github.com/google/uuid" + "github.com/joho/godotenv" + "within.website/x/internal" + "within.website/x/web/vastai/vastaicli" +) + +var ( + bind = flag.String("bind", ":3238", "HTTP port to bind to") + diskSizeGB = flag.Int("vastai-disk-size-gb", 32, "amount of disk we need from vast.ai") + dockerImage = flag.String("docker-image", "reg.xeiaso.net/runner/sdxl-tigris:latest", "docker image to start") + onstartCmd = flag.String("onstart-cmd", "python -m cog.server.http", "onstart command to run in vast.ai") + vastaiPort = flag.Int("vastai-port", 5000, "port that the guest will use in vast.ai") + vastaiFilters = flag.String("vastai-filters", "verified=False cuda_max_good>=12.1 gpu_ram>=12 num_gpus=1 inet_down>=850", "vast.ai search filters") +) + +func main() { + internal.HandleStartup() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + if flag.NArg() != 1 { + fmt.Println("usage: orodyagzou [flags] <whatever.env>") + os.Exit(2) + } + + fname := flag.Arg(0) + slog.Debug("using env file", "fname", fname) + + env, err := godotenv.Read(fname) + if err != nil { + slog.Error("can't read env file", "fname", fname, "err", err) + os.Exit(1) + } + + var cfg vastaicli.InstanceConfig + + cfg.DiskSizeGB = *diskSizeGB + cfg.Environment = env + cfg.DockerImage = *dockerImage + cfg.OnStartCMD = *onstartCmd + cfg.Ports = append(cfg.Ports, *vastaiPort) + + images := &ScaleToZeroProxy{ + cfg: cfg, + } + + go images.slayLoop(ctx) + + mux := http.NewServeMux() + mux.Handle("/v1/images", images) + + fmt.Printf("http://localhost%s\n", *bind) + log.Fatal(http.ListenAndServe(*bind, mux)) +} + +type ScaleToZeroProxy struct { + cfg vastaicli.InstanceConfig + + // locked fields + lock sync.RWMutex + endpointURL string + instanceID int + ready bool + lastUsed time.Time +} + +func (s *ScaleToZeroProxy) slayLoop(ctx context.Context) { + t := time.NewTicker(time.Minute) + defer t.Stop() + + for { + select { + case <-ctx.Done(): + slog.Error("context canceled", "err", ctx.Err()) + return + case <-t.C: + s.lock.RLock() + ready := s.ready + lastUsed := s.lastUsed + s.lock.RUnlock() + + if !ready { + continue + } + + if lastUsed.Add(5 * time.Minute).Before(time.Now()) { + if err := s.slay(ctx); err != nil { + slog.Error("can't slay instance", "err", err) + } + } + } + } +} + +func (s *ScaleToZeroProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { + s.lock.RLock() + ready := s.ready + s.lock.RUnlock() + + if !ready { + if err := s.mint(r.Context()); err != nil { + slog.Error("can't mint", "err", err) + http.Error(w, "can't mint", http.StatusInternalServerError) + return + } + } + + s.lock.RLock() + endpointURL := s.endpointURL + s.lock.RUnlock() + + u, err := url.Parse(endpointURL) + if err != nil { + slog.Error("can't url parse", "err", err, "url", s.endpointURL) + http.Error(w, "can't url parse", http.StatusInternalServerError) + return + } + + next := httputil.NewSingleHostReverseProxy(u) + od := next.Director + next.Director = func(r *http.Request) { + od(r) + r.URL.Path = "/predictions/" + uuid.NewString() + } + next.ServeHTTP(w, r) + + s.lock.Lock() + s.lastUsed = time.Now() + s.lock.Unlock() +} + +func (s *ScaleToZeroProxy) mint(ctx context.Context) error { + s.lock.Lock() + defer s.lock.Unlock() + + candidates, err := vastaicli.Search(ctx, *vastaiFilters, "dph+") + if err != nil { + return err + } + + candidate := candidates[0] + slog.Info("found instance", "costDPH", candidate.DphTotal, "gpuName", candidate.GpuName) + + instanceData, err := vastaicli.Mint(ctx, candidate.AskContractID, s.cfg) + if err != nil { + return err + } + + slog.Info("created instance, waiting for things to settle", "id", instanceData.NewContract) + + instance, err := s.delayUntilRunning(ctx, instanceData.NewContract) + if err != nil { + return err + } + + addr, ok := instance.AddrFor(s.cfg.Ports[0]) + if !ok { + return fmt.Errorf("somehow can't get port %d for instance %d, is god dead?", s.cfg.Ports[0], instance.ID) + } + + s.endpointURL = "http://" + addr + "/" + s.ready = true + s.instanceID = instance.ID + s.lastUsed = time.Now().Add(5 * time.Minute) + + if err := s.delayUntilReady(ctx, s.endpointURL); err != nil { + return fmt.Errorf("can't do healthcheck: %w", err) + } + + slog.Info("ready", "endpointURL", s.endpointURL, "instanceID", s.instanceID) + + return nil +} + +func (s *ScaleToZeroProxy) slay(ctx context.Context) error { + s.lock.Lock() + defer s.lock.Unlock() + + if err := vastaicli.Slay(ctx, s.instanceID); err != nil { + return err + } + + s.endpointURL = "" + s.ready = false + s.lastUsed = time.Now() + s.instanceID = 0 + + slog.Info("instance slayed", "docker_image", s.cfg.DockerImage) + + return nil +} + +func (s *ScaleToZeroProxy) delayUntilReady(ctx context.Context, endpointURL string) error { + type cogHealthCheck struct { + Status string `json:"status"` + } + + u, err := url.Parse(endpointURL) + if err != nil { + return fmt.Errorf("[unexpected] can't parse endpoint url %q: %w", endpointURL, err) + } + + u.Path = "/health-check" + + t := time.NewTicker(time.Second) + defer t.Stop() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-t.C: + resp, err := http.Get(u.String()) + if err != nil { + return fmt.Errorf("can't fetch health check: %w", err) + } + + var status cogHealthCheck + if err := json.NewDecoder(resp.Body).Decode(&status); err != nil { + return fmt.Errorf("can't parse health check response: %w", err) + } + + if status.Status == "READY" { + slog.Info("health check passed") + return nil + } + } + } +} + +func (s *ScaleToZeroProxy) delayUntilRunning(ctx context.Context, instanceID int) (*vastaicli.Instance, error) { + t := time.NewTicker(10 * time.Second) + defer t.Stop() + + var instance *vastaicli.Instance + var err error + + for { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-t.C: + instance, err = vastaicli.GetInstance(ctx, instanceID) + if err != nil { + return nil, err + } + + slog.Debug("instance is cooking", "curr", instance.ActualStatus, "next", instance.NextState, "status", instance.StatusMsg) + + if instance.ActualStatus == "running" { + _, ok := instance.AddrFor(s.cfg.Ports[0]) + if !ok { + slog.Info("no addr", "ports", s.cfg.Ports) + continue + } + + return instance, nil + } + } + } +} |
