diff options
| author | Xe Iaso <me@xeiaso.net> | 2023-06-20 07:11:40 -0400 |
|---|---|---|
| committer | Xe Iaso <me@xeiaso.net> | 2023-06-20 07:13:10 -0400 |
| commit | c13e1feed9b73f045f7baba0efc87aba0de3b2c6 (patch) | |
| tree | 9d153deaefb3aaff415fa3276f797c1e6e148ac2 /cmd | |
| parent | 7dca3ad008041edf9049b5554c23a3e7d6163fab (diff) | |
| download | x-c13e1feed9b73f045f7baba0efc87aba0de3b2c6.tar.xz x-c13e1feed9b73f045f7baba0efc87aba0de3b2c6.zip | |
cmd/marabot: use transactions to be compatible with litestream
Signed-off-by: Xe Iaso <me@xeiaso.net>
Diffstat (limited to 'cmd')
| -rw-r--r-- | cmd/marabot/.gitignore | 1 | ||||
| -rw-r--r-- | cmd/marabot/discord.go | 70 | ||||
| -rw-r--r-- | cmd/marabot/litestream.yaml | 8 | ||||
| -rw-r--r-- | cmd/marabot/main.go | 41 | ||||
| -rw-r--r-- | cmd/marabot/revolt.go | 18 | ||||
| -rw-r--r-- | cmd/marabot/schema.sql | 5 |
6 files changed, 118 insertions, 25 deletions
diff --git a/cmd/marabot/.gitignore b/cmd/marabot/.gitignore index 81e8b4d..9cd792a 100644 --- a/cmd/marabot/.gitignore +++ b/cmd/marabot/.gitignore @@ -1,3 +1,4 @@ *.db *.db-shm *.db-wal +.marabot.db-litestream diff --git a/cmd/marabot/discord.go b/cmd/marabot/discord.go index b34eb1e..3d8e4a1 100644 --- a/cmd/marabot/discord.go +++ b/cmd/marabot/discord.go @@ -17,13 +17,20 @@ import ( func (mr *MaraRevolt) importDiscordData(ctx context.Context, db *sql.DB, dg *discordgo.Session) error { ctx = opname.With(ctx, "import-discord-data") + + tx, err := mr.db.Begin() + if err != nil { + return err + } + defer tx.Rollback() + channels, err := dg.GuildChannels(*furryholeDiscord) if err != nil { return err } for _, ch := range channels { - if _, err := db.Exec("INSERT INTO discord_channels (id, guild_id, name, topic, nsfw) VALUES (?, ?, ?, ?, ?) ON CONFLICT(id) DO UPDATE SET name = EXCLUDED.name, topic = EXCLUDED.topic, nsfw = EXCLUDED.nsfw", ch.ID, ch.GuildID, ch.Name, ch.Topic, ch.NSFW); err != nil { + if _, err := tx.ExecContext(ctx, "INSERT INTO discord_channels (id, guild_id, name, topic, nsfw) VALUES (?, ?, ?, ?, ?) ON CONFLICT(id) DO UPDATE SET name = EXCLUDED.name, topic = EXCLUDED.topic, nsfw = EXCLUDED.nsfw", ch.ID, ch.GuildID, ch.Name, ch.Topic, ch.NSFW); err != nil { ln.Error(ctx, err, ln.F{"channel_name": ch.Name, "channel_id": ch.ID}) continue } @@ -35,7 +42,7 @@ func (mr *MaraRevolt) importDiscordData(ctx context.Context, db *sql.DB, dg *dis } for _, role := range roles { - if _, err := db.ExecContext(ctx, "INSERT INTO discord_roles (guild_id, id, name, color, hoist, position) VALUES (?, ?, ?, ?, ?, ?) ON CONFLICT(id) DO UPDATE SET name = EXCLUDED.name, color = EXCLUDED.color, position = EXCLUDED.position", furryholeDiscord, role.ID, role.Name, fmt.Sprintf("#%06x", role.Color), role.Hoist, role.Position); err != nil { + if _, err := tx.ExecContext(ctx, "INSERT INTO discord_roles (guild_id, id, name, color, hoist, position) VALUES (?, ?, ?, ?, ?, ?) ON CONFLICT(id) DO UPDATE SET name = EXCLUDED.name, color = EXCLUDED.color, position = EXCLUDED.position", furryholeDiscord, role.ID, role.Name, fmt.Sprintf("#%06x", role.Color), role.Hoist, role.Position); err != nil { ln.Error(ctx, err, ln.Action("inserting role")) continue } @@ -48,14 +55,14 @@ func (mr *MaraRevolt) importDiscordData(ctx context.Context, db *sql.DB, dg *dis } for _, emoji := range emoji { eURL := fmt.Sprintf("https://cdn.discordapp.com/emojis/%s?size=240&quality=lossless", emoji.ID) - if _, err := db.ExecContext(ctx, "INSERT INTO discord_emoji (id, guild_id, name, url) VALUES (?, ?, ?, ?) ON CONFLICT(id) DO UPDATE SET name = EXCLUDED.name, url = EXCLUDED.url", emoji.ID, furryholeDiscord, emoji.Name, eURL); err != nil { + if _, err := tx.ExecContext(ctx, "INSERT INTO discord_emoji (id, guild_id, name, url) VALUES (?, ?, ?, ?) ON CONFLICT(id) DO UPDATE SET name = EXCLUDED.name, url = EXCLUDED.url", emoji.ID, furryholeDiscord, emoji.Name, eURL); err != nil { ln.Error(ctx, err, ln.Action("inserting emoji")) continue } mr.attachmentPreprocess.Add([3]string{eURL, "emoji", ""}, len(eURL)) } - rows, err := mr.db.QueryContext(ctx, "SELECT url, message_id FROM discord_attachments WHERE url NOT IN ( SELECT url FROM s3_uploads )") + rows, err := tx.QueryContext(ctx, "SELECT url, message_id FROM discord_attachments WHERE url NOT IN ( SELECT url FROM s3_uploads )") if err == nil { defer rows.Close() for rows.Next() { @@ -68,16 +75,28 @@ func (mr *MaraRevolt) importDiscordData(ctx context.Context, db *sql.DB, dg *dis } } + if err := tx.Commit(); err != nil { + return err + } + return nil } func (mr *MaraRevolt) DiscordMessageDelete(s *discordgo.Session, m *discordgo.MessageDelete) { ctx := opname.With(context.Background(), "marabot.discord-message-delete") - if _, err := mr.db.ExecContext(ctx, "DELETE FROM discord_messages WHERE id = ?", m.ID); err != nil { + + tx, err := mr.db.Begin() + if err != nil { + ln.Error(ctx, err) + return + } + defer tx.Rollback() + + if _, err := tx.ExecContext(ctx, "DELETE FROM discord_messages WHERE id = ?", m.ID); err != nil { ln.Error(ctx, err, ln.Action("nuking deleted messages")) } - rows, err := mr.db.QueryContext(ctx, "SELECT id FROM s3_uploads WHERE message_id = ?", m.ID) + rows, err := tx.QueryContext(ctx, "SELECT id FROM s3_uploads WHERE message_id = ?", m.ID) if err != nil { ln.Error(ctx, err) return @@ -91,7 +110,7 @@ func (mr *MaraRevolt) DiscordMessageDelete(s *discordgo.Session, m *discordgo.Me return } - if _, err := mr.db.ExecContext(ctx, "DELETE FROM s3_uploads WHERE id = ?", id); err != nil { + if _, err := tx.ExecContext(ctx, "DELETE FROM s3_uploads WHERE id = ?", id); err != nil { ln.Error(ctx, err) } @@ -101,6 +120,11 @@ func (mr *MaraRevolt) DiscordMessageDelete(s *discordgo.Session, m *discordgo.Me ln.Error(ctx, err) } } + + if err := tx.Commit(); err != nil { + ln.Error(ctx, err) + return + } } func (mr *MaraRevolt) DiscordMessageEdit(s *discordgo.Session, m *discordgo.MessageUpdate) { @@ -110,13 +134,23 @@ func (mr *MaraRevolt) DiscordMessageEdit(s *discordgo.Session, m *discordgo.Mess } func (mr *MaraRevolt) DiscordMessageCreate(s *discordgo.Session, m *discordgo.MessageCreate) { - if err := mr.discordMessageCreate(s, m); err != nil { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + ctx = opname.With(ctx, "marabot.discordMessageCreate") + + if err := mr.discordMessageCreate(ctx, s, m); err != nil { ln.Error(context.Background(), err) } } -func (mr *MaraRevolt) discordMessageCreate(s *discordgo.Session, m *discordgo.MessageCreate) error { - if _, err := mr.db.Exec(`INSERT INTO discord_users (id, username, avatar_url, accent_color) +func (mr *MaraRevolt) discordMessageCreate(ctx context.Context, s *discordgo.Session, m *discordgo.MessageCreate) error { + tx, err := mr.db.Begin() + if err != nil { + return err + } + defer tx.Rollback() + + if _, err := tx.ExecContext(ctx, `INSERT INTO discord_users (id, username, avatar_url, accent_color) VALUES (?, ?, ?, ?) ON CONFLICT(id) DO UPDATE SET username = EXCLUDED.username, avatar_url = EXCLUDED.avatar_url, accent_color = EXCLUDED.accent_color`, m.Author.ID, m.Author.Username, m.Author.AvatarURL(""), m.Author.AccentColor); err != nil { @@ -125,18 +159,18 @@ DO UPDATE SET username = EXCLUDED.username, avatar_url = EXCLUDED.avatar_url, ac mr.attachmentPreprocess.Add([3]string{m.Author.AvatarURL(""), "avatars", ""}, len(m.Author.Avatar)) - if _, err := mr.db.Exec(`INSERT INTO discord_messages (id, guild_id, channel_id, author_id, content, created_at, edited_at, webhook_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?)`, m.ID, m.GuildID, m.ChannelID, m.Author.ID, m.Content, m.Timestamp.Format(time.RFC3339), m.EditedTimestamp, m.WebhookID); err != nil { + if _, err := tx.ExecContext(ctx, `INSERT INTO discord_messages (id, guild_id, channel_id, author_id, content, created_at, edited_at, webhook_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?)`, m.ID, m.GuildID, m.ChannelID, m.Author.ID, m.Content, m.Timestamp.Format(time.RFC3339), m.EditedTimestamp, m.WebhookID); err != nil { return err } if m.WebhookID != "" { - if _, err := mr.db.Exec("INSERT INTO discord_webhook_message_info (id, name, avatar_url) VALUES (?, ?, ?)", m.ID, m.Author.Username, m.Author.AvatarURL("")); err != nil { + if _, err := tx.ExecContext(ctx, "INSERT INTO discord_webhook_message_info (id, name, avatar_url) VALUES (?, ?, ?)", m.ID, m.Author.Username, m.Author.AvatarURL("")); err != nil { return err } } for _, att := range m.Attachments { - if _, err := mr.db.Exec(`INSERT INTO discord_attachments (id, message_id, url, proxy_url, filename, content_type, width, height, size) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`, att.ID, m.ID, att.URL, att.ProxyURL, att.Filename, att.ContentType, att.Width, att.Height, att.Size); err != nil { + if _, err := tx.ExecContext(ctx, `INSERT INTO discord_attachments (id, message_id, url, proxy_url, filename, content_type, width, height, size) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`, att.ID, m.ID, att.URL, att.ProxyURL, att.Filename, att.ContentType, att.Width, att.Height, att.Size); err != nil { return err } @@ -147,7 +181,7 @@ DO UPDATE SET username = EXCLUDED.username, avatar_url = EXCLUDED.avatar_url, ac if emb.Image == nil { continue } - if _, err := mr.db.Exec(`INSERT INTO discord_attachments (id, message_id, url, proxy_url, filename, content_type, width, height, size) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`, uuid.NewString(), m.ID, emb.Image.URL, emb.Image.ProxyURL, filepath.Base(emb.Image.URL), "", emb.Image.Width, emb.Image.Height, 0); err != nil { + if _, err := tx.ExecContext(ctx, `INSERT INTO discord_attachments (id, message_id, url, proxy_url, filename, content_type, width, height, size) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`, uuid.NewString(), m.ID, emb.Image.URL, emb.Image.ProxyURL, filepath.Base(emb.Image.URL), "", emb.Image.Width, emb.Image.Height, 0); err != nil { return err } @@ -159,17 +193,21 @@ DO UPDATE SET username = EXCLUDED.username, avatar_url = EXCLUDED.avatar_url, ac return err } - if _, err := mr.db.Exec("INSERT INTO discord_channels (id, guild_id, name, topic, nsfw) VALUES (?, ?, ?, ?, ?) ON CONFLICT(id) DO UPDATE SET name = EXCLUDED.name, topic = EXCLUDED.topic, nsfw = EXCLUDED.nsfw", ch.ID, ch.GuildID, ch.Name, ch.Topic, ch.NSFW); err != nil { + if _, err := tx.ExecContext(ctx, "INSERT INTO discord_channels (id, guild_id, name, topic, nsfw) VALUES (?, ?, ?, ?, ?) ON CONFLICT(id) DO UPDATE SET name = EXCLUDED.name, topic = EXCLUDED.topic, nsfw = EXCLUDED.nsfw", ch.ID, ch.GuildID, ch.Name, ch.Topic, ch.NSFW); err != nil { return err } for _, emoji := range m.GetCustomEmojis() { eURL := fmt.Sprintf("https://cdn.discordapp.com/emojis/%s?size=240&quality=lossless", emoji.ID) - if _, err := mr.db.Exec("INSERT INTO discord_emoji (id, guild_id, name, url) VALUES (?, ?, ?, ?) ON CONFLICT(id) DO UPDATE SET name = EXCLUDED.name, url = EXCLUDED.url", emoji.ID, furryholeDiscord, emoji.Name, eURL); err != nil { + if _, err := tx.ExecContext(ctx, "INSERT INTO discord_emoji (id, guild_id, name, url) VALUES (?, ?, ?, ?) ON CONFLICT(id) DO UPDATE SET name = EXCLUDED.name, url = EXCLUDED.url", emoji.ID, furryholeDiscord, emoji.Name, eURL); err != nil { return err } mr.attachmentPreprocess.Add([3]string{eURL, "emoji", ""}, len(eURL)) } + if err := tx.Commit(); err != nil { + return err + } + return nil } diff --git a/cmd/marabot/litestream.yaml b/cmd/marabot/litestream.yaml new file mode 100644 index 0000000..16a26b0 --- /dev/null +++ b/cmd/marabot/litestream.yaml @@ -0,0 +1,8 @@ +dbs: + - path: /home/cadey/code/Xe/x/cmd/marabot/marabot.db + replicas: + - path: /mnt/arsene/backup/marabot + - type: s3 + bucket: xeserv-marabot-backups + path: db + region: ca-central-1 diff --git a/cmd/marabot/main.go b/cmd/marabot/main.go index 612a4bc..8c16d74 100644 --- a/cmd/marabot/main.go +++ b/cmd/marabot/main.go @@ -16,12 +16,13 @@ import ( "syscall" "time" + _ "modernc.org/sqlite" + "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3/s3manager" "github.com/bwmarrin/discordgo" - _ "modernc.org/sqlite" "tailscale.com/hostinfo" "within.website/ln" "within.website/ln/opname" @@ -168,13 +169,20 @@ func (mr *MaraRevolt) PreprocessLinks(data [][3]string) { } func (mr *MaraRevolt) preprocessLinks(ctx context.Context, data [][3]string) { + tx, err := mr.db.Begin() + if err != nil { + ln.Error(ctx, err) + return + } + defer tx.Rollback() + for _, linkkind := range data { kind := linkkind[1] link := linkkind[0] msgID := linkkind[2] var count int - if err := mr.db.QueryRowContext(ctx, "SELECT COUNT(*) FROM s3_uploads WHERE url = ?", link).Scan(&count); err != nil { + if err := tx.QueryRowContext(ctx, "SELECT COUNT(*) FROM s3_uploads WHERE url = ?", link).Scan(&count); err != nil { ln.Error(ctx, err) continue } @@ -185,6 +193,18 @@ func (mr *MaraRevolt) preprocessLinks(ctx context.Context, data [][3]string) { att, err := hashURL(link, kind) if err != nil { ln.Error(ctx, err, ln.F{"link": link, "kind": kind}) + + if werr, ok := err.(*web.Error); ok { + if werr.GotStatus == http.StatusNotFound { + tx.ExecContext(ctx, "DELETE FROM discord_users WHERE avatar_url = ?", link) + tx.ExecContext(ctx, "DELETE FROM discord_attachments WHERE url = ?", link) + tx.ExecContext(ctx, "DELETE FROM discord_emoji WHERE url = ?", link) + tx.ExecContext(ctx, "DELETE FROM revolt_attachments WHERE url = ?", link) + tx.ExecContext(ctx, "DELETE FROM revolt_users WHERE avatar_url = ?", link) + tx.ExecContext(ctx, "DELETE FROM revolt_emoji WHERE url = ?", link) + } + } + continue } @@ -192,6 +212,9 @@ func (mr *MaraRevolt) preprocessLinks(ctx context.Context, data [][3]string) { mr.attachmentUpload.Add(att, len(att.Data)) } + if err := tx.Commit(); err != nil { + ln.Error(ctx, err) + } } func hashURL(itemURL, kind string) (*Attachment, error) { @@ -234,13 +257,20 @@ func (mr *MaraRevolt) S3Upload(att []*Attachment) { } func (mr *MaraRevolt) s3Upload(ctx context.Context, att []*Attachment) { + tx, err := mr.db.Begin() + if err != nil { + ln.Error(ctx, err) + return + } + defer tx.Rollback() + for _, att := range att { key := filepath.Join(att.Kind, att.ID) f := ln.F{"kind": att.Kind, "id": att.ID, "url": att.URL, "content_type": att.ContentType} var count int - if err := mr.db.QueryRowContext(ctx, "SELECT COUNT(*) FROM s3_uploads WHERE id = ?", att.ID).Scan(&count); err != nil { + if err := tx.QueryRowContext(ctx, "SELECT COUNT(*) FROM s3_uploads WHERE id = ?", att.ID).Scan(&count); err != nil { ln.Error(ctx, err, f) continue } @@ -265,9 +295,12 @@ func (mr *MaraRevolt) s3Upload(ctx context.Context, att []*Attachment) { continue } - if _, err := mr.db.ExecContext(ctx, "INSERT INTO s3_uploads(id, url, kind, content_type, created_at, message_id) VALUES (?, ?, ?, ?, ?, ?)", att.ID, att.URL, att.Kind, att.ContentType, att.CreatedAt, att.MessageID); err != nil { + if _, err := tx.ExecContext(ctx, "INSERT INTO s3_uploads(id, url, kind, content_type, created_at, message_id) VALUES (?, ?, ?, ?, ?, ?)", att.ID, att.URL, att.Kind, att.ContentType, att.CreatedAt, att.MessageID); err != nil { ln.Error(ctx, err, ln.Action("saving upload information to DB"), f) } } + if err := tx.Commit(); err != nil { + ln.Error(ctx, err) + } } diff --git a/cmd/marabot/revolt.go b/cmd/marabot/revolt.go index 33d1d06..5abea81 100644 --- a/cmd/marabot/revolt.go +++ b/cmd/marabot/revolt.go @@ -28,6 +28,12 @@ type MaraRevolt struct { } func (mr *MaraRevolt) MessageCreate(ctx context.Context, msg *revolt.Message) error { + tx, err := mr.db.Begin() + if err != nil { + return err + } + defer tx.Rollback() + if msg.Content == "!ping" { sendMsg := &revolt.SendMessage{ Masquerade: &revolt.Masquerade{ @@ -46,12 +52,12 @@ func (mr *MaraRevolt) MessageCreate(ctx context.Context, msg *revolt.Message) er return err } - if _, err := mr.db.ExecContext(ctx, "INSERT INTO revolt_messages(id, channel_id, author_id, content, created_at) VALUES (?, ?, ?, ?, ?)", msg.ID, msg.ChannelId, msg.AuthorId, msg.Content, msg.CreatedAt.Format(time.RFC3339)); err != nil { + if _, err := tx.ExecContext(ctx, "INSERT INTO revolt_messages(id, channel_id, author_id, content, created_at) VALUES (?, ?, ?, ?, ?)", msg.ID, msg.ChannelId, msg.AuthorId, msg.Content, msg.CreatedAt.Format(time.RFC3339)); err != nil { ln.Error(ctx, err, ln.Action("saving revolt message to database")) } if msg.Masquerade != nil { - if _, err := mr.db.ExecContext(ctx, "INSERT INTO revolt_message_masquerade(id, username, avatar_url) VALUES(?, ?, ?)", msg.ID, msg.Masquerade.Name, msg.Masquerade.AvatarURL); err != nil { + if _, err := tx.ExecContext(ctx, "INSERT INTO revolt_message_masquerade(id, username, avatar_url) VALUES(?, ?, ?)", msg.ID, msg.Masquerade.Name, msg.Masquerade.AvatarURL); err != nil { ln.Error(ctx, err, ln.Action("saving masquerade info to database")) } @@ -69,14 +75,14 @@ func (mr *MaraRevolt) MessageCreate(ctx context.Context, msg *revolt.Message) er mr.attachmentPreprocess.Add([3]string{avatarURL, "avatars", ""}, len(avatarURL)) - if _, err := mr.db.ExecContext(ctx, "INSERT INTO revolt_users(id, username, avatar_url, created_at) VALUES(?, ?, ?, ?) ON CONFLICT(id) DO UPDATE SET username = EXCLUDED.username, avatar_url = EXCLUDED.avatar_url, created_at = EXCLUDED.created_at", author.Id, author.Username, avatarURL, author.CreatedAt.Format(time.RFC3339)); err != nil { + if _, err := tx.ExecContext(ctx, "INSERT INTO revolt_users(id, username, avatar_url, created_at) VALUES(?, ?, ?, ?) ON CONFLICT(id) DO UPDATE SET username = EXCLUDED.username, avatar_url = EXCLUDED.avatar_url, created_at = EXCLUDED.created_at", author.Id, author.Username, avatarURL, author.CreatedAt.Format(time.RFC3339)); err != nil { ln.Error(ctx, err, ln.Action("writing revolt user record")) } for _, att := range msg.Attachments { url := mr.cli.ResolveAttachment(att) - if _, err := mr.db.ExecContext(ctx, "INSERT INTO revolt_attachments(id, tag, message_id, url, filename, content_type, width, height, size) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", att.ID, att.Tag, msg.ID, url, att.FileName, att.ContentType, att.Metadata.Width, att.Metadata.Height, att.Size); err != nil { + if _, err := tx.ExecContext(ctx, "INSERT INTO revolt_attachments(id, tag, message_id, url, filename, content_type, width, height, size) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", att.ID, att.Tag, msg.ID, url, att.FileName, att.ContentType, att.Metadata.Width, att.Metadata.Height, att.Size); err != nil { ln.Error(ctx, err, ln.Action("writing revolt attachment information")) } @@ -105,5 +111,9 @@ func (mr *MaraRevolt) MessageCreate(ctx context.Context, msg *revolt.Message) er } } + if err := tx.Commit(); err != nil { + return err + } + return nil } diff --git a/cmd/marabot/schema.sql b/cmd/marabot/schema.sql index f38537f..9ddc789 100644 --- a/cmd/marabot/schema.sql +++ b/cmd/marabot/schema.sql @@ -1,4 +1,7 @@ -PRAGMA journal_mode=WAL; +PRAGMA journal_mode = WAL; +PRAGMA synchronous = NORMAL; +PRAGMA wal_autocheckpoint = 0; +PRAGMA busy_timeout = 5000; CREATE TABLE IF NOT EXISTS discord_roles ( id TEXT PRIMARY KEY, |
