aboutsummaryrefslogtreecommitdiff
path: root/cmd/orodyagzou
diff options
context:
space:
mode:
authorXe Iaso <me@xeiaso.net>2024-11-08 15:51:27 -0500
committerXe Iaso <me@xeiaso.net>2024-11-08 15:51:27 -0500
commit3756e97b6aeee2e9f8c9e4c1fd6f65ebba6f54aa (patch)
tree7d54f3e533b271deaa4e2c1685a787ac0c710590 /cmd/orodyagzou
parenta731950f5c80c6b19c8b92f85600d38f6dc9ae63 (diff)
downloadx-3756e97b6aeee2e9f8c9e4c1fd6f65ebba6f54aa.tar.xz
x-3756e97b6aeee2e9f8c9e4c1fd6f65ebba6f54aa.zip
cmd/orodyagzou: initial implementation
Signed-off-by: Xe Iaso <me@xeiaso.net>
Diffstat (limited to 'cmd/orodyagzou')
-rw-r--r--cmd/orodyagzou/.gitignore2
-rw-r--r--cmd/orodyagzou/main.go277
2 files changed, 279 insertions, 0 deletions
diff --git a/cmd/orodyagzou/.gitignore b/cmd/orodyagzou/.gitignore
new file mode 100644
index 0000000..c588cf3
--- /dev/null
+++ b/cmd/orodyagzou/.gitignore
@@ -0,0 +1,2 @@
+*.env
+gpu_names_cache.json \ No newline at end of file
diff --git a/cmd/orodyagzou/main.go b/cmd/orodyagzou/main.go
new file mode 100644
index 0000000..5267103
--- /dev/null
+++ b/cmd/orodyagzou/main.go
@@ -0,0 +1,277 @@
+package main
+
+import (
+ "context"
+ "encoding/json"
+ "flag"
+ "fmt"
+ "log"
+ "log/slog"
+ "net/http"
+ "net/http/httputil"
+ "net/url"
+ "os"
+ "sync"
+ "time"
+
+ "github.com/google/uuid"
+ "github.com/joho/godotenv"
+ "within.website/x/internal"
+ "within.website/x/web/vastai/vastaicli"
+)
+
+var (
+ bind = flag.String("bind", ":3238", "HTTP port to bind to")
+ diskSizeGB = flag.Int("vastai-disk-size-gb", 32, "amount of disk we need from vast.ai")
+ dockerImage = flag.String("docker-image", "reg.xeiaso.net/runner/sdxl-tigris:latest", "docker image to start")
+ onstartCmd = flag.String("onstart-cmd", "python -m cog.server.http", "onstart command to run in vast.ai")
+ vastaiPort = flag.Int("vastai-port", 5000, "port that the guest will use in vast.ai")
+ vastaiFilters = flag.String("vastai-filters", "verified=False cuda_max_good>=12.1 gpu_ram>=12 num_gpus=1 inet_down>=850", "vast.ai search filters")
+)
+
+func main() {
+ internal.HandleStartup()
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ if flag.NArg() != 1 {
+ fmt.Println("usage: orodyagzou [flags] <whatever.env>")
+ os.Exit(2)
+ }
+
+ fname := flag.Arg(0)
+ slog.Debug("using env file", "fname", fname)
+
+ env, err := godotenv.Read(fname)
+ if err != nil {
+ slog.Error("can't read env file", "fname", fname, "err", err)
+ os.Exit(1)
+ }
+
+ var cfg vastaicli.InstanceConfig
+
+ cfg.DiskSizeGB = *diskSizeGB
+ cfg.Environment = env
+ cfg.DockerImage = *dockerImage
+ cfg.OnStartCMD = *onstartCmd
+ cfg.Ports = append(cfg.Ports, *vastaiPort)
+
+ images := &ScaleToZeroProxy{
+ cfg: cfg,
+ }
+
+ go images.slayLoop(ctx)
+
+ mux := http.NewServeMux()
+ mux.Handle("/v1/images", images)
+
+ fmt.Printf("http://localhost%s\n", *bind)
+ log.Fatal(http.ListenAndServe(*bind, mux))
+}
+
+type ScaleToZeroProxy struct {
+ cfg vastaicli.InstanceConfig
+
+ // locked fields
+ lock sync.RWMutex
+ endpointURL string
+ instanceID int
+ ready bool
+ lastUsed time.Time
+}
+
+func (s *ScaleToZeroProxy) slayLoop(ctx context.Context) {
+ t := time.NewTicker(time.Minute)
+ defer t.Stop()
+
+ for {
+ select {
+ case <-ctx.Done():
+ slog.Error("context canceled", "err", ctx.Err())
+ return
+ case <-t.C:
+ s.lock.RLock()
+ ready := s.ready
+ lastUsed := s.lastUsed
+ s.lock.RUnlock()
+
+ if !ready {
+ continue
+ }
+
+ if lastUsed.Add(5 * time.Minute).Before(time.Now()) {
+ if err := s.slay(ctx); err != nil {
+ slog.Error("can't slay instance", "err", err)
+ }
+ }
+ }
+ }
+}
+
+func (s *ScaleToZeroProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ s.lock.RLock()
+ ready := s.ready
+ s.lock.RUnlock()
+
+ if !ready {
+ if err := s.mint(r.Context()); err != nil {
+ slog.Error("can't mint", "err", err)
+ http.Error(w, "can't mint", http.StatusInternalServerError)
+ return
+ }
+ }
+
+ s.lock.RLock()
+ endpointURL := s.endpointURL
+ s.lock.RUnlock()
+
+ u, err := url.Parse(endpointURL)
+ if err != nil {
+ slog.Error("can't url parse", "err", err, "url", s.endpointURL)
+ http.Error(w, "can't url parse", http.StatusInternalServerError)
+ return
+ }
+
+ next := httputil.NewSingleHostReverseProxy(u)
+ od := next.Director
+ next.Director = func(r *http.Request) {
+ od(r)
+ r.URL.Path = "/predictions/" + uuid.NewString()
+ }
+ next.ServeHTTP(w, r)
+
+ s.lock.Lock()
+ s.lastUsed = time.Now()
+ s.lock.Unlock()
+}
+
+func (s *ScaleToZeroProxy) mint(ctx context.Context) error {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ candidates, err := vastaicli.Search(ctx, *vastaiFilters, "dph+")
+ if err != nil {
+ return err
+ }
+
+ candidate := candidates[0]
+ slog.Info("found instance", "costDPH", candidate.DphTotal, "gpuName", candidate.GpuName)
+
+ instanceData, err := vastaicli.Mint(ctx, candidate.AskContractID, s.cfg)
+ if err != nil {
+ return err
+ }
+
+ slog.Info("created instance, waiting for things to settle", "id", instanceData.NewContract)
+
+ instance, err := s.delayUntilRunning(ctx, instanceData.NewContract)
+ if err != nil {
+ return err
+ }
+
+ addr, ok := instance.AddrFor(s.cfg.Ports[0])
+ if !ok {
+ return fmt.Errorf("somehow can't get port %d for instance %d, is god dead?", s.cfg.Ports[0], instance.ID)
+ }
+
+ s.endpointURL = "http://" + addr + "/"
+ s.ready = true
+ s.instanceID = instance.ID
+ s.lastUsed = time.Now().Add(5 * time.Minute)
+
+ if err := s.delayUntilReady(ctx, s.endpointURL); err != nil {
+ return fmt.Errorf("can't do healthcheck: %w", err)
+ }
+
+ slog.Info("ready", "endpointURL", s.endpointURL, "instanceID", s.instanceID)
+
+ return nil
+}
+
+func (s *ScaleToZeroProxy) slay(ctx context.Context) error {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ if err := vastaicli.Slay(ctx, s.instanceID); err != nil {
+ return err
+ }
+
+ s.endpointURL = ""
+ s.ready = false
+ s.lastUsed = time.Now()
+ s.instanceID = 0
+
+ slog.Info("instance slayed", "docker_image", s.cfg.DockerImage)
+
+ return nil
+}
+
+func (s *ScaleToZeroProxy) delayUntilReady(ctx context.Context, endpointURL string) error {
+ type cogHealthCheck struct {
+ Status string `json:"status"`
+ }
+
+ u, err := url.Parse(endpointURL)
+ if err != nil {
+ return fmt.Errorf("[unexpected] can't parse endpoint url %q: %w", endpointURL, err)
+ }
+
+ u.Path = "/health-check"
+
+ t := time.NewTicker(time.Second)
+ defer t.Stop()
+
+ for {
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-t.C:
+ resp, err := http.Get(u.String())
+ if err != nil {
+ return fmt.Errorf("can't fetch health check: %w", err)
+ }
+
+ var status cogHealthCheck
+ if err := json.NewDecoder(resp.Body).Decode(&status); err != nil {
+ return fmt.Errorf("can't parse health check response: %w", err)
+ }
+
+ if status.Status == "READY" {
+ slog.Info("health check passed")
+ return nil
+ }
+ }
+ }
+}
+
+func (s *ScaleToZeroProxy) delayUntilRunning(ctx context.Context, instanceID int) (*vastaicli.Instance, error) {
+ t := time.NewTicker(10 * time.Second)
+ defer t.Stop()
+
+ var instance *vastaicli.Instance
+ var err error
+
+ for {
+ select {
+ case <-ctx.Done():
+ return nil, ctx.Err()
+ case <-t.C:
+ instance, err = vastaicli.GetInstance(ctx, instanceID)
+ if err != nil {
+ return nil, err
+ }
+
+ slog.Debug("instance is cooking", "curr", instance.ActualStatus, "next", instance.NextState, "status", instance.StatusMsg)
+
+ if instance.ActualStatus == "running" {
+ _, ok := instance.AddrFor(s.cfg.Ports[0])
+ if !ok {
+ slog.Info("no addr", "ports", s.cfg.Ports)
+ continue
+ }
+
+ return instance, nil
+ }
+ }
+ }
+}