aboutsummaryrefslogtreecommitdiff
path: root/web/mastodon/websocket.go
diff options
context:
space:
mode:
authorXe <me@christine.website>2022-11-21 20:57:30 -0500
committerXe <me@christine.website>2022-11-21 20:57:30 -0500
commitbec049cf7752382b2894c9526612686bbc15ed47 (patch)
tree684161f44ef70bb4a88924a8d0d40ec24d91d9d3 /web/mastodon/websocket.go
parenta1aa37164038beb230dea8ad250be9539b7647f8 (diff)
downloadx-bec049cf7752382b2894c9526612686bbc15ed47.tar.xz
x-bec049cf7752382b2894c9526612686bbc15ed47.zip
basic mastodon client implemented, will do more later
Signed-off-by: Xe <me@christine.website>
Diffstat (limited to 'web/mastodon/websocket.go')
-rw-r--r--web/mastodon/websocket.go104
1 files changed, 104 insertions, 0 deletions
diff --git a/web/mastodon/websocket.go b/web/mastodon/websocket.go
new file mode 100644
index 0000000..bf6b846
--- /dev/null
+++ b/web/mastodon/websocket.go
@@ -0,0 +1,104 @@
+package mastodon
+
+import (
+ "context"
+ "encoding/json"
+ "net/url"
+ "time"
+
+ "nhooyr.io/websocket"
+ "within.website/ln"
+ "within.website/ln/opname"
+)
+
+// WSSubscribeRequest is a websocket instruction to subscribe to a streaming feed.
+type WSSubscribeRequest struct {
+ Type string `json:"type"` // should be "subscribe" or "unsubscribe"
+ Stream string `json:"stream"`
+ Hashtag string `json:"hashtag,omitempty"`
+}
+
+// WSMessage is a websocket message. Whenever you get something from the streaming service, it will fit into this box.
+type WSMessage struct {
+ Stream []string `json:"stream"`
+ Event string `json:"event"`
+ Payload string `json:"payload"` // json string
+}
+
+// StreamMessages is a low-level message streaming facility.
+func (c *Client) StreamMessages(ctx context.Context, subreq ...WSSubscribeRequest) (chan WSMessage, error) {
+ result := make(chan WSMessage, 10)
+ ctx = opname.With(ctx, "websocket-streaming")
+
+ u, err := c.server.Parse("/api/vi/streaming/")
+ if err != nil {
+ return nil, err
+ }
+
+ q := u.Query()
+ q.Set("access_token", c.token)
+ u.RawQuery = q.Encode()
+
+ go func(ctx context.Context) {
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ }
+
+ if err := doWebsocket(ctx, u, result, subreq); err != nil {
+ ln.Error(ctx, err, ln.Info("websocket error, retrying"))
+ }
+ time.Sleep(time.Minute)
+ }
+ }(ctx)
+
+ return result, nil
+}
+
+func doWebsocket(ctx context.Context, u *url.URL, result chan WSMessage, subreq []WSSubscribeRequest) error {
+ conn, _, err := websocket.Dial(ctx, u.String(), &websocket.DialOptions{})
+ if err != nil {
+ return err
+ }
+ defer conn.Close(websocket.StatusNormalClosure, "doWebsocket function returned")
+
+ for _, sub := range subreq {
+ data, err := json.Marshal(sub)
+ if err != nil {
+ return err
+ }
+ err = conn.Write(ctx, websocket.MessageText, data)
+ if err != nil {
+ return err
+ }
+ }
+
+ for {
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+
+ default:
+ }
+
+ msgType, data, err := conn.Read(ctx)
+ if err != nil {
+ return err
+ }
+
+ if msgType != websocket.MessageText {
+ ln.Log(ctx, ln.Info("got non-text message from mastodon"))
+ continue
+ }
+
+ var msg WSMessage
+ err = json.Unmarshal(data, &msg)
+ if err != nil {
+ return err
+ }
+
+ result <- msg
+ }
+}