aboutsummaryrefslogtreecommitdiff
path: root/cmd
diff options
context:
space:
mode:
authorXe Iaso <me@xeiaso.net>2023-07-07 14:10:21 -0400
committerXe Iaso <me@xeiaso.net>2023-07-07 14:10:21 -0400
commit0ce7fd3fb7d45b7c4c2de52099f78dbd7a00d1cc (patch)
tree915a3e7fb9faceeb7486716899f37ad01beebaf9 /cmd
parentf51b99d8b3b88a2aba3132d1ec1445e02dc48963 (diff)
downloadx-0ce7fd3fb7d45b7c4c2de52099f78dbd7a00d1cc.tar.xz
x-0ce7fd3fb7d45b7c4c2de52099f78dbd7a00d1cc.zip
cmd/marabot: fix problems I hope
Signed-off-by: Xe Iaso <me@xeiaso.net>
Diffstat (limited to 'cmd')
-rw-r--r--cmd/marabot/discord.go86
-rw-r--r--cmd/marabot/main.go33
-rw-r--r--cmd/marabot/revolt.go2
3 files changed, 57 insertions, 64 deletions
diff --git a/cmd/marabot/discord.go b/cmd/marabot/discord.go
index 001ef96..74db134 100644
--- a/cmd/marabot/discord.go
+++ b/cmd/marabot/discord.go
@@ -2,7 +2,6 @@ package main
import (
"context"
- "database/sql"
"fmt"
"path/filepath"
"time"
@@ -12,11 +11,12 @@ import (
"github.com/bwmarrin/discordgo"
"github.com/google/uuid"
"github.com/jackc/pgx/v5"
+ "github.com/jackc/pgx/v5/pgxpool"
"within.website/ln"
"within.website/ln/opname"
)
-func (mr *MaraRevolt) importDiscordData(ctx context.Context, db *sql.DB, dg *discordgo.Session) error {
+func (mr *MaraRevolt) importDiscordData(ctx context.Context, db *pgxpool.Pool, dg *discordgo.Session) error {
ctx = opname.With(ctx, "import-discord-data")
tx, err := mr.pg.Begin(ctx)
@@ -89,9 +89,6 @@ func (mr *MaraRevolt) importDiscordData(ctx context.Context, db *sql.DB, dg *dis
}
func (mr *MaraRevolt) DiscordMessageDelete(s *discordgo.Session, m *discordgo.MessageDelete) {
- mr.lock.Lock()
- defer mr.lock.Unlock()
-
ctx := opname.With(context.Background(), "marabot.discord-message-delete")
tx, err := mr.pg.Begin(ctx)
@@ -138,18 +135,12 @@ func (mr *MaraRevolt) DiscordMessageDelete(s *discordgo.Session, m *discordgo.Me
}
func (mr *MaraRevolt) DiscordMessageEdit(s *discordgo.Session, m *discordgo.MessageUpdate) {
- mr.lock.Lock()
- defer mr.lock.Unlock()
-
- if _, err := mr.pg.Exec(context.Background(), "UPDATE discord_messages SET content = ?, edited_at = ? WHERE id = ?", m.Content, time.Now().Format(time.RFC3339), m.ID); err != nil {
+ if _, err := mr.pg.Exec(context.Background(), "UPDATE discord_messages SET content = $1, edited_at = $2 WHERE id = $3", m.Content, time.Now().Format(time.RFC3339), m.ID); err != nil {
ln.Error(context.Background(), err)
}
}
func (mr *MaraRevolt) DiscordMessageCreate(s *discordgo.Session, m *discordgo.MessageCreate) {
- mr.lock.Lock()
- defer mr.lock.Unlock()
-
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ctx = opname.With(ctx, "marabot.discordMessageCreate")
@@ -219,7 +210,7 @@ func (mr *MaraRevolt) DiscordReactionAdd(s *discordgo.Session, mra *discordgo.Me
func (mr *MaraRevolt) doesDiscordMessageExist(ctx context.Context, tx pgx.Tx, messageID string) (bool, error) {
var count int
- if err := tx.QueryRow(ctx, "SELECT COUNT(*) FROM discord_messages WHERE id = ?", messageID).Scan(&count); err != nil {
+ if err := tx.QueryRow(ctx, "SELECT COUNT(*) FROM discord_messages WHERE id = $1", messageID).Scan(&count); err != nil {
return false, err
}
@@ -236,19 +227,19 @@ func (mr *MaraRevolt) backfillDiscordChannel(s *discordgo.Session, channelID, me
ln.Log(ctx, ln.Action("archiving channel from message"), ln.F{"channel_id": channelID, "message_id": messageID})
- tx, err := mr.pg.Begin(ctx)
- if err != nil {
- ln.Error(ctx, err)
- return
- }
- defer tx.Rollback(ctx)
-
- t := time.NewTicker(30 * time.Second)
+ t := time.NewTicker(5 * time.Second)
defer t.Stop()
done := false
+outer:
for range t.C {
+ tx, err := mr.pg.Begin(ctx)
+ if err != nil {
+ ln.Error(ctx, err)
+ return
+ }
+
ln.Log(ctx, ln.Action("fetching batch of messages"), ln.F{"curr": curr})
msgs, err := s.ChannelMessages(channelID, 100, "", curr, "")
if err != nil {
@@ -256,38 +247,55 @@ func (mr *MaraRevolt) backfillDiscordChannel(s *discordgo.Session, channelID, me
s.ChannelMessageSend(channelID, fmt.Sprintf("error getting messages past %s: %v", curr, err))
break
}
+ ln.Log(ctx, ln.Action("fetching batch of messages"), ln.F{"num": len(msgs)})
+ if len(msgs) == 0 {
+ break
+ }
for _, msg := range msgs {
- found, err := mr.doesDiscordMessageExist(ctx, tx, msg.ID)
- if err != nil {
- ln.Error(ctx, err, ln.F{"message_id": msg.ID})
- continue
- }
-
- if found {
- ln.Log(ctx, ln.Action("stopping archival"))
- done = true
- }
-
+ // found, err := mr.doesDiscordMessageExist(ctx, tx, msg.ID)
+ // if err != nil {
+ // ln.Error(ctx, err, ln.F{"message_id": msg.ID})
+ // continue
+ // }
+
+ // if found {
+ // if !done {
+ // ln.Log(ctx, ln.Action("stopping archival"))
+ // }
+ // done = true
+ // }
+
+ before := time.Now()
if err := mr.discordMessageCreate(ctx, tx, s, msg); err != nil {
ln.Error(ctx, err, ln.F{"message_id": msg.ID})
- continue
+ tx.Rollback(ctx)
+ continue outer
}
+ dur := time.Since(before)
+ ln.Log(ctx, ln.F{"message_id": msg.ID, "created_at": msg.Timestamp, "archival_time": dur})
+ // found, err := mr.doesDiscordMessageExist(ctx, tx, msg.ID)
+ // if err != nil {
+ // ln.Error(ctx, err, ln.F{"message_id": msg.ID})
+ // continue
+ // }
curr = msg.ID
}
+ if err := tx.Commit(ctx); err != nil {
+ ln.Error(ctx, err, ln.F{
+ "channel_id": channelID,
+ "message_id": messageID,
+ })
+ }
+
if done {
break
}
- }
- if err := tx.Commit(ctx); err != nil {
- ln.Error(ctx, err, ln.F{
- "channel_id": channelID,
- "message_id": messageID,
- })
}
+
}
func (mr *MaraRevolt) discordMessageCreate(ctx context.Context, tx pgx.Tx, s *discordgo.Session, m *discordgo.Message) error {
diff --git a/cmd/marabot/main.go b/cmd/marabot/main.go
index 95c4c6b..0092955 100644
--- a/cmd/marabot/main.go
+++ b/cmd/marabot/main.go
@@ -4,8 +4,6 @@ import (
"bytes"
"context"
"crypto/sha512"
- "database/sql"
- "database/sql/driver"
_ "embed"
"flag"
"fmt"
@@ -24,7 +22,6 @@ import (
"github.com/bwmarrin/discordgo"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
- "github.com/tailscale/sqlite"
"tailscale.com/hostinfo"
"within.website/ln"
"within.website/ln/opname"
@@ -35,8 +32,6 @@ import (
)
var (
- dbFile = flag.String("db-file", "marabot.db", "Path to the database file")
-
pgURL = flag.String("database-url", "", "URL for the database (postgres)")
discordToken = flag.String("discord-token", "", "Discord bot token")
@@ -59,19 +54,6 @@ var (
dbSchema string
)
-func openDB(fname string) (*sql.DB, error) {
- db := sql.OpenDB(sqlite.Connector("file:"+fname, func(ctx context.Context, conn driver.ConnPrepareContext) error {
- return sqlite.ExecScript(conn.(sqlite.SQLConn), dbSchema)
- }, nil))
-
- err := db.Ping()
- if err != nil {
- return nil, err
- }
-
- return db, nil
-}
-
func main() {
internal.HandleStartup()
@@ -82,11 +64,13 @@ func main() {
ln.Log(ctx, ln.Action("starting up"))
- db, err := openDB(*dbFile)
+ pgcfg, err := pgxpool.ParseConfig(*pgURL)
if err != nil {
- ln.FatalErr(ctx, err, ln.Action("opening sqlite database"))
+ ln.FatalErr(ctx, err, ln.Action("parsing postgres config"))
}
- defer db.Close()
+
+ pgcfg.MinConns = 5
+ pgcfg.MaxConns = 20
pg, err := pgxpool.New(ctx, *pgURL)
if err != nil {
@@ -114,7 +98,6 @@ func main() {
mr := &MaraRevolt{
cli: client,
- db: db,
pg: pg,
ircmsgs: ircmsgs,
uploader: uploader,
@@ -148,7 +131,7 @@ func main() {
}
defer dg.Close()
- if err := mr.importDiscordData(ctx, db, dg); err != nil {
+ if err := mr.importDiscordData(ctx, pg, dg); err != nil {
ln.Error(ctx, err)
}
@@ -263,6 +246,10 @@ func (mr *MaraRevolt) archiveAttachment(ctx context.Context, tx pgx.Tx, link, ki
}
}
+ if att == nil {
+ return nil
+ }
+
att.MessageID = aws.String(messageID)
key := filepath.Join(att.Kind, att.ID)
diff --git a/cmd/marabot/revolt.go b/cmd/marabot/revolt.go
index a28f693..4fdfa88 100644
--- a/cmd/marabot/revolt.go
+++ b/cmd/marabot/revolt.go
@@ -2,7 +2,6 @@ package main
import (
"context"
- "database/sql"
"fmt"
"strings"
"sync"
@@ -19,7 +18,6 @@ import (
type MaraRevolt struct {
cli *revolt.Client
- db *sql.DB
pg *pgxpool.Pool
ircmsgs chan string
attachmentPreprocess *bundler.Bundler[[3]string]