aboutsummaryrefslogtreecommitdiff
path: root/cmd
diff options
context:
space:
mode:
authorXe Iaso <me@xeiaso.net>2024-10-18 11:38:51 -0400
committerXe Iaso <me@xeiaso.net>2024-10-18 11:39:31 -0400
commitb946025fdc97e77d03c1f54f54cbdf1eb33bae4b (patch)
treecd32b9812832513a67a2d9b15cb53aa8114b003d /cmd
parent695237e6b216b7d36a0377a6accba8610eda6321 (diff)
downloadx-b946025fdc97e77d03c1f54f54cbdf1eb33bae4b.tar.xz
x-b946025fdc97e77d03c1f54f54cbdf1eb33bae4b.zip
cmd/mi: basic twitch events client
Signed-off-by: Xe Iaso <me@xeiaso.net>
Diffstat (limited to 'cmd')
-rw-r--r--cmd/mi/main.go13
-rw-r--r--cmd/mi/services/twitchevents/twitchevents.go305
2 files changed, 318 insertions, 0 deletions
diff --git a/cmd/mi/main.go b/cmd/mi/main.go
index 88800ea..e5fa101 100644
--- a/cmd/mi/main.go
+++ b/cmd/mi/main.go
@@ -19,6 +19,7 @@ import (
"within.website/x/cmd/mi/services/importer"
"within.website/x/cmd/mi/services/posse"
"within.website/x/cmd/mi/services/switchtracker"
+ "within.website/x/cmd/mi/services/twitchevents"
"within.website/x/internal"
pb "within.website/x/proto/mi"
"within.website/x/proto/mimi/announce"
@@ -84,6 +85,17 @@ func main() {
os.Exit(1)
}
+ te, err := twitchevents.New(ctx, dao, twitchevents.Config{
+ BlueskyAuthkey: *blueskyAuthkey,
+ BlueskyHandle: *blueskyHandle,
+ BlueskyPDS: *blueskyPDS,
+ MimiAnnounceURL: *mimiAnnounceURL,
+ })
+ if err != nil {
+ slog.Error("failed to create twitch events", "err", err)
+ os.Exit(1)
+ }
+
st := switchtracker.New(dao)
es := events.New(dao, *flyghtTrackerURL)
@@ -97,6 +109,7 @@ func main() {
mux.Handle(pb.SwitchTrackerPathPrefix, pb.NewSwitchTrackerServer(st))
mux.Handle(pb.EventsPathPrefix, pb.NewEventsServer(es))
mux.Handle("/front", homefrontshim.New(dao))
+ mux.Handle("/twitch", te)
i := importer.New(dao)
i.Mount(http.DefaultServeMux)
diff --git a/cmd/mi/services/twitchevents/twitchevents.go b/cmd/mi/services/twitchevents/twitchevents.go
new file mode 100644
index 0000000..1b87504
--- /dev/null
+++ b/cmd/mi/services/twitchevents/twitchevents.go
@@ -0,0 +1,305 @@
+package twitchevents
+
+import (
+ "bytes"
+ "context"
+ "encoding/json"
+ "flag"
+ "fmt"
+ "io"
+ "log/slog"
+ "net/http"
+ "net/url"
+ "strconv"
+ "strings"
+ "time"
+
+ bsky "github.com/danrusei/gobot-bsky"
+ "github.com/nicklaw5/helix/v2"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/promauto"
+ "within.website/x/cmd/mi/models"
+ "within.website/x/proto/mimi/announce"
+)
+
+var (
+ twitchEventsCount = promauto.NewGaugeVec(prometheus.GaugeOpts{
+ Name: "mi_twitch_events_count",
+ Help: "Number of twitch events ever processed",
+ }, []string{"messageType"})
+
+ twitchEventsErrors = promauto.NewGaugeVec(prometheus.GaugeOpts{
+ Name: "mi_twitch_events_errors",
+ Help: "Number of twitch events that caused errors",
+ }, []string{"messageType"})
+
+ twitchClientID = flag.String("twitch-client-id", "", "twitch.tv client ID")
+ twitchClientSecret = flag.String("twitch-client-secret", "", "twitch.tv client secret")
+ twitchUserID = flag.Int("twitch-user-id", 105794391, "twitch.tv user ID")
+ twitchWebhookSecret = flag.String("twitch-webhook-secret", "", "twitch.tv webhook secret")
+ twitchWebhookURL = flag.String("twitch-events-webhook-url", "https://shiroko-wsl.shark-harmonic.ts.net/twitch", "URL for Twitch events to be pushed to")
+)
+
+type Config struct {
+ BlueskyAuthkey string
+ BlueskyHandle string
+ BlueskyPDS string
+ MimiAnnounceURL string
+}
+
+func (c Config) BlueskyAgent(ctx context.Context) (*bsky.BskyAgent, error) {
+ bluesky := bsky.NewAgent(ctx, c.BlueskyPDS, c.BlueskyHandle, c.BlueskyAuthkey)
+ if err := bluesky.Connect(ctx); err != nil {
+ slog.Error("failed to connect to bluesky", "err", err)
+ return nil, err
+ }
+
+ if err := bluesky.Connect(ctx); err != nil {
+ slog.Error("failed to connect to bluesky", "err", err)
+ return nil, err
+ }
+
+ return &bluesky, nil
+}
+
+type Server struct {
+ dao *models.DAO
+ mimi announce.Announce
+ cfg Config
+ twitch *helix.Client
+}
+
+func New(ctx context.Context, dao *models.DAO, cfg Config) (*Server, error) {
+ twitch, err := helix.NewClient(&helix.Options{
+ ClientID: *twitchClientID,
+ ClientSecret: *twitchClientSecret,
+ })
+
+ if err != nil {
+ slog.Error("can't create twitch client", "err", err)
+ return nil, err
+ }
+
+ resp, err := twitch.RequestAppAccessToken([]string{"user:read:email", "user:read:broadcast"})
+ if err != nil {
+ slog.Error("can't request app access token", "err", err)
+ return nil, err
+ }
+
+ twitch.SetAppAccessToken(resp.Data.AccessToken)
+
+ s := &Server{
+ dao: dao,
+ mimi: announce.NewAnnounceProtobufClient(cfg.MimiAnnounceURL, &http.Client{}),
+ cfg: cfg,
+ twitch: twitch,
+ }
+
+ if err := s.maybeCreateWebhookSubscription(); err != nil {
+ slog.Error("cant' create subscription", "err", err)
+ }
+
+ return s, nil
+}
+
+const sixteenMegs = 16777216
+
+func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ var body = http.MaxBytesReader(w, r.Body, sixteenMegs)
+ defer body.Close()
+
+ data, err := io.ReadAll(body)
+ if err != nil {
+ slog.Error("can't read from body", "err", err)
+ http.Error(w, "can't read", http.StatusBadRequest)
+ return
+ }
+
+ if !helix.VerifyEventSubNotification(*twitchWebhookSecret, r.Header, string(data)) {
+ slog.Error("can't verify event", "err", "invalid secret")
+ http.Error(w, "no auth", http.StatusUnauthorized)
+ return
+ }
+
+ messageType := r.Header.Get("Twitch-Eventsub-Message-Type")
+ twitchEventsCount.WithLabelValues(messageType).Inc()
+
+ lg := slog.With("message_type", messageType)
+ lg.Debug("got message")
+
+ body = io.NopCloser(bytes.NewBuffer(data))
+
+ switch messageType {
+ case "webhook_callback_verification":
+ err = s.handleWebhookVerification(w, body)
+ case "revocation":
+ go func() {
+ time.Sleep(5 * time.Minute)
+ if err := s.maybeCreateWebhookSubscription(); err != nil {
+ slog.Error("can't create new webhook subscription after the current one was revoked", "err", err)
+ }
+ }()
+ case "notification":
+ err = s.handleNotification(r.Context(), lg, w, data)
+ default:
+ lg.Error("unknown event", "type", messageType, "body", json.RawMessage(data))
+ http.Error(w, "unknown event", http.StatusOK)
+ }
+
+ if err != nil {
+ twitchEventsCount.WithLabelValues(messageType).Inc()
+ lg.Error("can't handle message", "err", err)
+ http.Error(w, "can't deal with this input", http.StatusInternalServerError)
+ return
+ }
+}
+
+func (s *Server) maybeCreateWebhookSubscription() error {
+ subs, err := s.twitch.GetEventSubSubscriptions(&helix.EventSubSubscriptionsParams{
+ Status: "enabled",
+ })
+ if err != nil {
+ return fmt.Errorf("can't get eventsub subscriptions: %w", err)
+ }
+
+ found := false
+ for _, sub := range subs.Data.EventSubSubscriptions {
+ if sub.Transport.Callback == *twitchWebhookURL {
+ slog.Info("no need to resubscribe, webhook URL was found")
+ found = true
+ break
+ }
+ }
+
+ if found {
+ return nil
+ }
+
+ if _, err := s.twitch.CreateEventSubSubscription(&helix.EventSubSubscription{
+ Type: "stream.online",
+ Version: "1",
+ Condition: helix.EventSubCondition{
+ BroadcasterUserID: strconv.Itoa(*twitchUserID),
+ },
+ Transport: helix.EventSubTransport{
+ Method: "webhook",
+ Callback: *twitchWebhookURL,
+ Secret: *twitchWebhookSecret,
+ },
+ }); err != nil {
+ return fmt.Errorf("can't create subscription: %w", err)
+ }
+ slog.Info("created webhook subscription")
+
+ return nil
+}
+
+func (s *Server) handleWebhookVerification(w http.ResponseWriter, body io.Reader) error {
+ var data struct {
+ Challenge string `json:"challenge"`
+ }
+
+ if err := json.NewDecoder(body).Decode(&data); err != nil {
+ return fmt.Errorf("can't decode challenge: %w", err)
+ }
+
+ w.WriteHeader(http.StatusOK)
+ fmt.Fprint(w, data.Challenge)
+
+ return nil
+}
+
+func (s *Server) handleNotification(ctx context.Context, lg *slog.Logger, w http.ResponseWriter, bodyData []byte) error {
+ var data Event
+
+ if err := json.NewDecoder(bytes.NewBuffer(bodyData)).Decode(&data); err != nil {
+ return fmt.Errorf("can't decode notification: %w", err)
+ }
+
+ var err error
+ switch data.Subscription.Type {
+ case "stream.online":
+ var ev helix.EventSubStreamOnlineEvent
+ if err := json.Unmarshal(data.Event, &ev); err != nil {
+ return fmt.Errorf("can't unmarshal event: %w", err)
+ }
+ err = s.handleStreamUp(ctx, lg, w, &ev)
+ default:
+ lg.Error("unknown event", "event", data.Subscription.Type, "data", data.Event)
+ }
+
+ if err != nil {
+ lg.Error("can't handle message", "err", err)
+ http.Error(w, "can't deal with this event", http.StatusInternalServerError)
+ return nil
+ }
+
+ return nil
+}
+
+func (s *Server) handleStreamUp(ctx context.Context, lg *slog.Logger, w http.ResponseWriter, ev *helix.EventSubStreamOnlineEvent) error {
+ lg.Info("broadcaster went online!", "username", ev.BroadcasterUserLogin)
+
+ bs, err := s.cfg.BlueskyAgent(ctx)
+ if err != nil {
+ return fmt.Errorf("can't create bluesky agent: %w", err)
+ }
+
+ u, err := url.Parse("https://twitch.tv/" + ev.BroadcasterUserLogin)
+ if err != nil {
+ return fmt.Errorf("[unexpected] can't create twitch URL: %w", err)
+ }
+
+ q := u.Query()
+ q.Set("utm_campaign", "mi_irl")
+ q.Set("utm_medium", "social")
+ q.Set("utm_source", "bluesky")
+ u.RawQuery = q.Encode()
+
+ var sb strings.Builder
+ fmt.Fprintln(&sb, "Xe is live on stream!")
+ fmt.Fprintln(&sb)
+ fmt.Fprint(&sb, u.String())
+
+ post, err := bsky.NewPostBuilder(sb.String()).
+ WithExternalLink("twitch.tv - princessxen", *u, "Xe on Twitch!").
+ WithFacet(bsky.Facet_Link, u.String(), u.String()).
+ Build()
+ if err != nil {
+ return fmt.Errorf("can't build post: %w", err)
+ }
+
+ cid, uri, err := bs.PostToFeed(ctx, post)
+ if err != nil {
+ return fmt.Errorf("can't post to feed: %w", err)
+ }
+
+ lg.Info("posted to bluesky", "bluesky_cid", cid, "bluesky_uri", uri, "body", sb.String())
+
+ return nil
+}
+
+type Event struct {
+ Subscription Subscription `json:"subscription"`
+ Event json.RawMessage `json:"event"`
+}
+
+type Subscription struct {
+ ID string `json:"id"`
+ Status string `json:"status"`
+ Type string `json:"type"`
+ Version string `json:"version"`
+ Condition Condition `json:"condition"`
+ Transport Transport `json:"transport"`
+ CreatedAt time.Time `json:"created_at"`
+ Cost int `json:"cost"`
+}
+
+type Condition struct {
+ BroadcasterUserID string `json:"broadcaster_user_id"`
+}
+
+type Transport struct {
+ Method string `json:"method"`
+ Callback string `json:"callback"`
+}