diff options
| author | Xe Iaso <me@xeiaso.net> | 2023-07-07 14:10:21 -0400 |
|---|---|---|
| committer | Xe Iaso <me@xeiaso.net> | 2023-07-07 14:10:21 -0400 |
| commit | 0ce7fd3fb7d45b7c4c2de52099f78dbd7a00d1cc (patch) | |
| tree | 915a3e7fb9faceeb7486716899f37ad01beebaf9 /cmd | |
| parent | f51b99d8b3b88a2aba3132d1ec1445e02dc48963 (diff) | |
| download | x-0ce7fd3fb7d45b7c4c2de52099f78dbd7a00d1cc.tar.xz x-0ce7fd3fb7d45b7c4c2de52099f78dbd7a00d1cc.zip | |
cmd/marabot: fix problems I hope
Signed-off-by: Xe Iaso <me@xeiaso.net>
Diffstat (limited to 'cmd')
| -rw-r--r-- | cmd/marabot/discord.go | 86 | ||||
| -rw-r--r-- | cmd/marabot/main.go | 33 | ||||
| -rw-r--r-- | cmd/marabot/revolt.go | 2 |
3 files changed, 57 insertions, 64 deletions
diff --git a/cmd/marabot/discord.go b/cmd/marabot/discord.go index 001ef96..74db134 100644 --- a/cmd/marabot/discord.go +++ b/cmd/marabot/discord.go @@ -2,7 +2,6 @@ package main import ( "context" - "database/sql" "fmt" "path/filepath" "time" @@ -12,11 +11,12 @@ import ( "github.com/bwmarrin/discordgo" "github.com/google/uuid" "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" "within.website/ln" "within.website/ln/opname" ) -func (mr *MaraRevolt) importDiscordData(ctx context.Context, db *sql.DB, dg *discordgo.Session) error { +func (mr *MaraRevolt) importDiscordData(ctx context.Context, db *pgxpool.Pool, dg *discordgo.Session) error { ctx = opname.With(ctx, "import-discord-data") tx, err := mr.pg.Begin(ctx) @@ -89,9 +89,6 @@ func (mr *MaraRevolt) importDiscordData(ctx context.Context, db *sql.DB, dg *dis } func (mr *MaraRevolt) DiscordMessageDelete(s *discordgo.Session, m *discordgo.MessageDelete) { - mr.lock.Lock() - defer mr.lock.Unlock() - ctx := opname.With(context.Background(), "marabot.discord-message-delete") tx, err := mr.pg.Begin(ctx) @@ -138,18 +135,12 @@ func (mr *MaraRevolt) DiscordMessageDelete(s *discordgo.Session, m *discordgo.Me } func (mr *MaraRevolt) DiscordMessageEdit(s *discordgo.Session, m *discordgo.MessageUpdate) { - mr.lock.Lock() - defer mr.lock.Unlock() - - if _, err := mr.pg.Exec(context.Background(), "UPDATE discord_messages SET content = ?, edited_at = ? WHERE id = ?", m.Content, time.Now().Format(time.RFC3339), m.ID); err != nil { + if _, err := mr.pg.Exec(context.Background(), "UPDATE discord_messages SET content = $1, edited_at = $2 WHERE id = $3", m.Content, time.Now().Format(time.RFC3339), m.ID); err != nil { ln.Error(context.Background(), err) } } func (mr *MaraRevolt) DiscordMessageCreate(s *discordgo.Session, m *discordgo.MessageCreate) { - mr.lock.Lock() - defer mr.lock.Unlock() - ctx, cancel := context.WithCancel(context.Background()) defer cancel() ctx = opname.With(ctx, "marabot.discordMessageCreate") @@ -219,7 +210,7 @@ func (mr *MaraRevolt) DiscordReactionAdd(s *discordgo.Session, mra *discordgo.Me func (mr *MaraRevolt) doesDiscordMessageExist(ctx context.Context, tx pgx.Tx, messageID string) (bool, error) { var count int - if err := tx.QueryRow(ctx, "SELECT COUNT(*) FROM discord_messages WHERE id = ?", messageID).Scan(&count); err != nil { + if err := tx.QueryRow(ctx, "SELECT COUNT(*) FROM discord_messages WHERE id = $1", messageID).Scan(&count); err != nil { return false, err } @@ -236,19 +227,19 @@ func (mr *MaraRevolt) backfillDiscordChannel(s *discordgo.Session, channelID, me ln.Log(ctx, ln.Action("archiving channel from message"), ln.F{"channel_id": channelID, "message_id": messageID}) - tx, err := mr.pg.Begin(ctx) - if err != nil { - ln.Error(ctx, err) - return - } - defer tx.Rollback(ctx) - - t := time.NewTicker(30 * time.Second) + t := time.NewTicker(5 * time.Second) defer t.Stop() done := false +outer: for range t.C { + tx, err := mr.pg.Begin(ctx) + if err != nil { + ln.Error(ctx, err) + return + } + ln.Log(ctx, ln.Action("fetching batch of messages"), ln.F{"curr": curr}) msgs, err := s.ChannelMessages(channelID, 100, "", curr, "") if err != nil { @@ -256,38 +247,55 @@ func (mr *MaraRevolt) backfillDiscordChannel(s *discordgo.Session, channelID, me s.ChannelMessageSend(channelID, fmt.Sprintf("error getting messages past %s: %v", curr, err)) break } + ln.Log(ctx, ln.Action("fetching batch of messages"), ln.F{"num": len(msgs)}) + if len(msgs) == 0 { + break + } for _, msg := range msgs { - found, err := mr.doesDiscordMessageExist(ctx, tx, msg.ID) - if err != nil { - ln.Error(ctx, err, ln.F{"message_id": msg.ID}) - continue - } - - if found { - ln.Log(ctx, ln.Action("stopping archival")) - done = true - } - + // found, err := mr.doesDiscordMessageExist(ctx, tx, msg.ID) + // if err != nil { + // ln.Error(ctx, err, ln.F{"message_id": msg.ID}) + // continue + // } + + // if found { + // if !done { + // ln.Log(ctx, ln.Action("stopping archival")) + // } + // done = true + // } + + before := time.Now() if err := mr.discordMessageCreate(ctx, tx, s, msg); err != nil { ln.Error(ctx, err, ln.F{"message_id": msg.ID}) - continue + tx.Rollback(ctx) + continue outer } + dur := time.Since(before) + ln.Log(ctx, ln.F{"message_id": msg.ID, "created_at": msg.Timestamp, "archival_time": dur}) + // found, err := mr.doesDiscordMessageExist(ctx, tx, msg.ID) + // if err != nil { + // ln.Error(ctx, err, ln.F{"message_id": msg.ID}) + // continue + // } curr = msg.ID } + if err := tx.Commit(ctx); err != nil { + ln.Error(ctx, err, ln.F{ + "channel_id": channelID, + "message_id": messageID, + }) + } + if done { break } - } - if err := tx.Commit(ctx); err != nil { - ln.Error(ctx, err, ln.F{ - "channel_id": channelID, - "message_id": messageID, - }) } + } func (mr *MaraRevolt) discordMessageCreate(ctx context.Context, tx pgx.Tx, s *discordgo.Session, m *discordgo.Message) error { diff --git a/cmd/marabot/main.go b/cmd/marabot/main.go index 95c4c6b..0092955 100644 --- a/cmd/marabot/main.go +++ b/cmd/marabot/main.go @@ -4,8 +4,6 @@ import ( "bytes" "context" "crypto/sha512" - "database/sql" - "database/sql/driver" _ "embed" "flag" "fmt" @@ -24,7 +22,6 @@ import ( "github.com/bwmarrin/discordgo" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" - "github.com/tailscale/sqlite" "tailscale.com/hostinfo" "within.website/ln" "within.website/ln/opname" @@ -35,8 +32,6 @@ import ( ) var ( - dbFile = flag.String("db-file", "marabot.db", "Path to the database file") - pgURL = flag.String("database-url", "", "URL for the database (postgres)") discordToken = flag.String("discord-token", "", "Discord bot token") @@ -59,19 +54,6 @@ var ( dbSchema string ) -func openDB(fname string) (*sql.DB, error) { - db := sql.OpenDB(sqlite.Connector("file:"+fname, func(ctx context.Context, conn driver.ConnPrepareContext) error { - return sqlite.ExecScript(conn.(sqlite.SQLConn), dbSchema) - }, nil)) - - err := db.Ping() - if err != nil { - return nil, err - } - - return db, nil -} - func main() { internal.HandleStartup() @@ -82,11 +64,13 @@ func main() { ln.Log(ctx, ln.Action("starting up")) - db, err := openDB(*dbFile) + pgcfg, err := pgxpool.ParseConfig(*pgURL) if err != nil { - ln.FatalErr(ctx, err, ln.Action("opening sqlite database")) + ln.FatalErr(ctx, err, ln.Action("parsing postgres config")) } - defer db.Close() + + pgcfg.MinConns = 5 + pgcfg.MaxConns = 20 pg, err := pgxpool.New(ctx, *pgURL) if err != nil { @@ -114,7 +98,6 @@ func main() { mr := &MaraRevolt{ cli: client, - db: db, pg: pg, ircmsgs: ircmsgs, uploader: uploader, @@ -148,7 +131,7 @@ func main() { } defer dg.Close() - if err := mr.importDiscordData(ctx, db, dg); err != nil { + if err := mr.importDiscordData(ctx, pg, dg); err != nil { ln.Error(ctx, err) } @@ -263,6 +246,10 @@ func (mr *MaraRevolt) archiveAttachment(ctx context.Context, tx pgx.Tx, link, ki } } + if att == nil { + return nil + } + att.MessageID = aws.String(messageID) key := filepath.Join(att.Kind, att.ID) diff --git a/cmd/marabot/revolt.go b/cmd/marabot/revolt.go index a28f693..4fdfa88 100644 --- a/cmd/marabot/revolt.go +++ b/cmd/marabot/revolt.go @@ -2,7 +2,6 @@ package main import ( "context" - "database/sql" "fmt" "strings" "sync" @@ -19,7 +18,6 @@ import ( type MaraRevolt struct { cli *revolt.Client - db *sql.DB pg *pgxpool.Pool ircmsgs chan string attachmentPreprocess *bundler.Bundler[[3]string] |
