diff options
| author | Xe Iaso <me@xeiaso.net> | 2024-10-25 14:06:42 -0400 |
|---|---|---|
| committer | Xe Iaso <me@xeiaso.net> | 2024-10-25 14:06:42 -0400 |
| commit | afa4bc6c01297af78885bf0562e2dae7ff83605b (patch) | |
| tree | 97a1149d5646cf9b1c7aa2892d6e849c589219cc /cmd/amano | |
| parent | 797eec6d94e193ae684db977179ea4a422b2499f (diff) | |
| download | x-afa4bc6c01297af78885bf0562e2dae7ff83605b.tar.xz x-afa4bc6c01297af78885bf0562e2dae7ff83605b.zip | |
cmd: add amano and stealthmountain
Signed-off-by: Xe Iaso <me@xeiaso.net>
Diffstat (limited to 'cmd/amano')
| -rw-r--r-- | cmd/amano/main.go | 112 |
1 files changed, 112 insertions, 0 deletions
diff --git a/cmd/amano/main.go b/cmd/amano/main.go new file mode 100644 index 0000000..98ec545 --- /dev/null +++ b/cmd/amano/main.go @@ -0,0 +1,112 @@ +package main + +import ( + "context" + "flag" + "fmt" + "log/slog" + "os" + "sync" + "time" + + jetstreamClient "github.com/bluesky-social/jetstream/pkg/client" + "github.com/bluesky-social/jetstream/pkg/client/schedulers/parallel" + jsModels "github.com/bluesky-social/jetstream/pkg/models" + "github.com/goccy/go-json" + "github.com/nats-io/nats.go" + "within.website/x/internal" +) + +var ( + jetStreamURL = flag.String("jetstream-url", "wss://jetstream2.us-east.bsky.network/subscribe", "Jetstream server to subscribe to") + natsURL = flag.String("nats-url", "nats://localhost:4222", "nats url") +) + +func main() { + internal.HandleStartup() + + slog.Info("starting up", + "jetstream-url", jetStreamURL, + "nats-url", natsURL, + ) + + nc, err := nats.Connect(*natsURL) + if err != nil { + slog.Error("can't connect to NATS", "err", err) + os.Exit(1) + } + defer nc.Close() + slog.Info("connected to NATS") + + jsCfg := jetstreamClient.DefaultClientConfig() + jsCfg.WebsocketURL = *jetStreamURL + + jscli, err := jetstreamClient.NewClient( + jsCfg, + slog.With("aspect", "jetstream"), + parallel.NewScheduler( + 1, + "amano", + slog.With("aspect", "fan-out"), + handleEvent(nc), + ), + ) + if err != nil { + slog.Error("can't set up jetstream client", "err", err) + os.Exit(1) + } + defer jscli.Scheduler.Shutdown() + + now := time.Now().UnixNano() + if err := jscli.ConnectAndRead(context.Background(), &now); err != nil { + slog.Error("can't connect to jetstream", "err", err) + os.Exit(1) + } + + slog.Info("connected to jetstream") + + mu := sync.Mutex{} + mu.Lock() + mu.Lock() +} + +func handleEvent(nc *nats.Conn) func(ctx context.Context, ev *jsModels.Event) error { + return func(ctx context.Context, ev *jsModels.Event) error { + switch ev.Kind { + case "account": + data, err := json.Marshal(ev.Account) + if err != nil { + return fmt.Errorf("can't marshal account event: %w", err) + } + + if err := nc.Publish("amano:account", data); err != nil { + return fmt.Errorf("can't publish account event: %w", err) + } + case "identity": + data, err := json.Marshal(ev.Identity) + if err != nil { + return fmt.Errorf("can't marshal identity event: %w", err) + } + + if err := nc.Publish("amano:identity", data); err != nil { + return fmt.Errorf("can't publish identity event: %w", err) + } + case "commit": + subject := fmt.Sprintf("amano:commit:%s", ev.Commit.Collection) + data, err := json.Marshal(ev.Commit) + if err != nil { + return fmt.Errorf("can't marshal commit event: %w", err) + } + + m := nats.NewMsg(subject) + m.Data = data + m.Header.Set("bsky-actor-did", ev.Did) + + if err := nc.PublishMsg(m); err != nil { + return fmt.Errorf("can't publish %q event: %w", subject, err) + } + } + + return nil + } +} |
