aboutsummaryrefslogtreecommitdiff
path: root/cmd
diff options
context:
space:
mode:
authorXe Iaso <me@xeiaso.net>2023-08-13 02:09:17 -0400
committerXe Iaso <me@xeiaso.net>2023-08-13 02:09:17 -0400
commit378ae0feaa2bb9d6b4406e1a2281fe0c8400fdd8 (patch)
tree6c43514a2fd77d1857c883d6933ef5424b881c77 /cmd
parent1a27d6b01baa36c3f0a1b2eb5c46f3f5780906e7 (diff)
downloadx-378ae0feaa2bb9d6b4406e1a2281fe0c8400fdd8.tar.xz
x-378ae0feaa2bb9d6b4406e1a2281fe0c8400fdd8.zip
cmd/sanguisuga: anime monitoring
Signed-off-by: Xe Iaso <me@xeiaso.net>
Diffstat (limited to 'cmd')
-rw-r--r--cmd/sanguisuga/config.default.ts2
-rw-r--r--cmd/sanguisuga/config.go2
-rw-r--r--cmd/sanguisuga/dcc.go288
-rw-r--r--cmd/sanguisuga/dcc/dcc.go176
-rw-r--r--cmd/sanguisuga/main.go28
5 files changed, 494 insertions, 2 deletions
diff --git a/cmd/sanguisuga/config.default.ts b/cmd/sanguisuga/config.default.ts
index f05e224..107f6b7 100644
--- a/cmd/sanguisuga/config.default.ts
+++ b/cmd/sanguisuga/config.default.ts
@@ -29,10 +29,12 @@ export type Tailscale = {
export type Config = {
irc: IRC;
+ xdcc: IRC;
transmission: Transmission;
shows: Show[];
rssKey: string;
tailscale: Tailscale;
+ baseDiskPath: string;
};
export default {
diff --git a/cmd/sanguisuga/config.go b/cmd/sanguisuga/config.go
index 3c2effd..8d91f6d 100644
--- a/cmd/sanguisuga/config.go
+++ b/cmd/sanguisuga/config.go
@@ -30,8 +30,10 @@ type Tailscale struct {
type Config struct {
IRC IRC `json:"irc"`
+ XDCC IRC `json:"xdcc"`
Transmission Transmission `json:"transmission"`
Shows []Show `json:"shows"`
RSSKey string `json:"rssKey"`
Tailscale Tailscale `json:"tailscale"`
+ BaseDiskPath string `json:"baseDiskPath"`
}
diff --git a/cmd/sanguisuga/dcc.go b/cmd/sanguisuga/dcc.go
new file mode 100644
index 0000000..7a014c8
--- /dev/null
+++ b/cmd/sanguisuga/dcc.go
@@ -0,0 +1,288 @@
+package main
+
+import (
+ "context"
+ "encoding/binary"
+ "encoding/json"
+ "errors"
+ "expvar"
+ "fmt"
+ "io"
+ "log"
+ "net"
+ "net/http"
+ "os"
+ "path/filepath"
+ "regexp"
+ "strconv"
+ "strings"
+ "time"
+
+ irc "github.com/thoj/go-ircevent"
+ "golang.org/x/exp/slog"
+ "tailscale.com/metrics"
+ "within.website/x/cmd/sanguisuga/dcc"
+)
+
+var (
+ subspleaseAnnounceRegex = regexp.MustCompile(`^.*\* (\[SubsPlease\] (.*) - ([0-9]+) \(([0-9]+p)\).*\.mkv) \* /MSG ([A-Za-z-_|]+) XDCC SEND ([0-9]+)$`)
+ dccCommand = regexp.MustCompile(`^DCC SEND "(.*)" ([0-9]+) ([0-9]+) ([0-9]+)$`)
+
+ bytesDownloaded = &metrics.LabelMap{Label: "filename"}
+)
+
+func init() {
+ expvar.Publish("gauge_sanguisuga_bytes_downloaded", bytesDownloaded)
+}
+
+type SubspleaseAnnouncement struct {
+ Filename, ShowName string
+ Episode, Quality string
+ BotName, PackID string
+}
+
+func (sa SubspleaseAnnouncement) Key() string {
+ return fmt.Sprintf("%s %s %s", sa.BotName, sa.ShowName, sa.Episode)
+}
+
+func (sa SubspleaseAnnouncement) LogValue() slog.Value {
+ return slog.GroupValue(
+ slog.String("showname", sa.ShowName),
+ slog.String("episode", sa.Episode),
+ slog.String("quality", sa.Quality),
+ slog.String("botName", sa.BotName),
+ )
+}
+
+func ParseSubspleaseAnnouncement(input string) (*SubspleaseAnnouncement, error) {
+ match := subspleaseAnnounceRegex.FindStringSubmatch(input)
+
+ if match == nil {
+ return nil, errors.New("invalid annoucement format")
+ }
+
+ return &SubspleaseAnnouncement{
+ Filename: match[1],
+ ShowName: match[2],
+ Episode: match[3],
+ Quality: match[4],
+ BotName: match[5],
+ PackID: match[6],
+ }, nil
+}
+
+func int2ip(nn uint32) string {
+ ip := make(net.IP, 4)
+ binary.BigEndian.PutUint32(ip, nn)
+ return ip.String()
+}
+
+func (s *Sanguisuga) XDCC() {
+ ircCli := irc.IRC(s.Config.XDCC.Nick, s.Config.XDCC.User)
+ ircCli.Password = s.Config.XDCC.Password
+ ircCli.RealName = s.Config.XDCC.Real
+ ircCli.AddCallback("001", func(ev *irc.Event) {
+ ircCli.Join(s.Config.XDCC.Channel)
+ })
+ ircCli.AddCallback("PRIVMSG", s.ScrapeSubsplease)
+ ircCli.AddCallback("CTCP", s.SubspleaseDCC)
+
+ ircCli.Log = slog.NewLogLogger(slog.Default().Handler().WithAttrs([]slog.Attr{slog.String("from", "ircevent"), slog.String("for", "anime")}), slog.LevelInfo)
+ ircCli.Timeout = 5 * time.Second
+
+ if err := ircCli.Connect(s.Config.XDCC.Server); err != nil {
+ log.Fatalf("can't connect to XDCC server %s: %v", s.Config.XDCC.Server, err)
+ }
+
+ ircCli.Loop()
+}
+
+func (s *Sanguisuga) TrackAnime(w http.ResponseWriter, r *http.Request) {
+ defer r.Body.Close()
+ data, err := io.ReadAll(http.MaxBytesReader(w, r.Body, 256))
+ if err != nil {
+ slog.Error("can't read request body", "err", err)
+ http.Error(w, err.Error(), http.StatusBadRequest)
+ return
+ }
+
+ name := strings.TrimSpace(string(data))
+
+ s.dbLock.Lock()
+ defer s.dbLock.Unlock()
+
+ s.db.Data.AnimeToTrack = append(s.db.Data.AnimeToTrack, name)
+ if err := s.db.Save(); err != nil {
+ slog.Error("can't save database", "err", err)
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+
+ json.NewEncoder(w).Encode(s.db.Data.AnimeToTrack)
+}
+
+func (s *Sanguisuga) ListAnime(w http.ResponseWriter, r *http.Request) {
+ s.dbLock.Lock()
+ defer s.dbLock.Unlock()
+
+ json.NewEncoder(w).Encode(s.db.Data.AnimeToTrack)
+}
+
+func (s *Sanguisuga) ScrapeSubsplease(ev *irc.Event) {
+ if ev.Code != "PRIVMSG" {
+ return
+ }
+
+ if ev.Arguments[0] != s.Config.XDCC.Channel {
+ return
+ }
+
+ switch ev.Nick {
+ case "CR-ARUTHA|NEW", "CR-HOLLAND|NEW", "Belath":
+ default:
+ return
+ }
+
+ ann, err := ParseSubspleaseAnnouncement(ev.MessageWithoutFormat())
+ if err != nil {
+ slog.Debug("can't parse announcement", "input", ev.MessageWithoutFormat(), "err", err)
+ return
+ }
+
+ if ann.Quality != "1080p" {
+ return
+ }
+
+ lg := slog.Default().With("announcement", ann)
+ lg.Info("found announcement")
+
+ s.dbLock.Lock()
+ defer s.dbLock.Unlock()
+
+ s.aifLock.Lock()
+ defer s.aifLock.Unlock()
+
+ if _, ok := s.db.Data.AnimeSnatches[ann.Filename]; ok {
+ return
+ }
+
+ found := false
+ for _, name := range s.db.Data.AnimeToTrack {
+ if ann.ShowName == name {
+ found = true
+ }
+ }
+
+ if !found {
+ return
+ }
+
+ // if already being fetched, don't fetch again
+ if _, ok := s.animeInFlight[ann.Filename]; ok {
+ return
+ }
+
+ ev.Connection.Privmsgf(ann.BotName, "XDCC SEND %s", ann.PackID)
+
+ s.animeInFlight[ann.Filename] = ann
+
+ s.db.Data.AnimeSnatches[ann.Filename] = *ann
+ if err := s.db.Save(); err != nil {
+ lg.Error("can't save database", "err", err)
+ return
+ }
+}
+
+func (s *Sanguisuga) SubspleaseDCC(ev *irc.Event) {
+ matches := dccCommand.FindStringSubmatch(ev.MessageWithoutFormat())
+ if matches == nil {
+ return
+ }
+
+ s.aifLock.Lock()
+ defer s.aifLock.Unlock()
+
+ if len(matches) != 5 {
+ slog.Error("wrong message from DCC bot", "botName", ev.Nick, "message", ev.Message())
+ return
+ }
+
+ fname := matches[1]
+ ipString := matches[2]
+ port := matches[3]
+ sizeString := matches[4]
+
+ if strings.HasSuffix(fname, "\"") {
+ fname = fname[:len(fname)-2]
+ }
+
+ ann, _ := s.animeInFlight[fname]
+
+ if ann == nil {
+ slog.Debug("ann == nil?", "fname", fname, "ann", s.animeInFlight[fname])
+ return
+ }
+
+ ipUint, err := strconv.ParseUint(ipString, 10, 32)
+ if err != nil {
+ slog.Error("can't parse IP address", "addr", ipString, "err", err)
+ return
+ }
+
+ ip := int2ip(uint32(ipUint))
+ addr := net.JoinHostPort(ip, port)
+
+ size, err := strconv.Atoi(sizeString)
+ if err != nil {
+ slog.Error("can't parse size", "size", sizeString, "err", err)
+ return
+ }
+
+ lg := slog.Default().With("fname", fname, "botName", ev.Nick, "addr", addr)
+ lg.Info("fetching episode")
+
+ outDir := filepath.Join(s.Config.BaseDiskPath, ann.ShowName)
+ outFname := filepath.Join(outDir, fname)
+
+ os.MkdirAll(outDir, 0777)
+
+ fout, err := os.Create(outFname)
+ if err != nil {
+ lg.Error("can't create output file", "outFname", outFname, "err", err)
+ return
+ }
+ defer fout.Close()
+
+ d := dcc.NewDCC(addr, size, fout)
+
+ ctx, cancel := context.WithTimeout(ev.Ctx, 120*time.Minute)
+ defer cancel()
+
+ progc, errc := d.Run(ctx)
+
+ defer lg.Info("done")
+
+ for {
+ select {
+ case p := <-progc:
+ curr := bytesDownloaded.GetFloat(fname)
+ curr.Set(p.CurrentFileSize)
+
+ if p.CurrentFileSize == p.FileSize {
+ delete(s.animeInFlight, fname)
+ return
+ }
+
+ if p.Percentage >= 100 {
+ delete(s.animeInFlight, fname)
+ return
+ }
+
+ lg.Info("download progress", "progress", p)
+ case err := <-errc:
+ lg.Error("error in DCC thread, giving up", "err", err)
+ delete(s.animeInFlight, fname)
+ return
+ }
+ }
+}
diff --git a/cmd/sanguisuga/dcc/dcc.go b/cmd/sanguisuga/dcc/dcc.go
new file mode 100644
index 0000000..c26b141
--- /dev/null
+++ b/cmd/sanguisuga/dcc/dcc.go
@@ -0,0 +1,176 @@
+package dcc
+
+import (
+ "context"
+ "io"
+ "net"
+ "time"
+
+ "golang.org/x/exp/slog"
+)
+
+// Progress contains the progression of the
+// download handled by the DCC client socket
+type Progress struct {
+ Speed float64
+ Percentage float64
+ CurrentFileSize float64
+ FileSize float64
+}
+
+func (p Progress) LogValue() slog.Value {
+ return slog.GroupValue(
+ slog.Float64("speed", p.Speed),
+ slog.Float64("percentage", p.Percentage),
+ slog.Float64("curr", p.CurrentFileSize),
+ slog.Float64("total", p.FileSize),
+ )
+}
+
+// DCC creates a new socket client instance where
+// it'll download the DCC transaction into the
+// specified io.Writer destination
+type DCC struct {
+ // important properties
+ address string
+ size int
+
+ // output channels used for the Run and the receiver methods()
+ // to avoid parameter passing
+ progressc chan Progress
+ done chan error
+
+ // internal DCC socket connection
+ conn net.Conn
+
+ // assigned context passed from the Run() method
+ ctx context.Context
+
+ // destination writer
+ writer io.Writer
+}
+
+// NewDCC creates a new DCC instance.
+// the host, port are needed for the socket client connection
+// the size is required so the download progress is calculated
+// the writer is required to store the transaction fragments into
+// the specified io.Writer
+func NewDCC(
+ address string,
+ size int,
+ writer io.Writer,
+) *DCC {
+ return &DCC{
+ address: address,
+ size: size,
+ progressc: make(chan Progress, 1),
+ done: make(chan error, 1),
+ writer: writer,
+ }
+}
+
+func (d *DCC) progress(written float64, speed *float64) time.Time {
+ d.progressc <- Progress{
+ Speed: written - *speed,
+ Percentage: (written / float64(d.size)) * 100,
+ CurrentFileSize: written,
+ FileSize: float64(d.size),
+ }
+
+ *speed = float64(written)
+
+ return time.Now()
+}
+
+func (d *DCC) receive() {
+ defer func() { // close channels
+ close(d.done)
+
+ // close the connection afterwards..
+ d.conn.Close()
+ }()
+
+ var (
+ written int
+ speed float64
+ buf = make([]byte, 30720)
+ reader = io.LimitReader(d.conn, int64(d.size))
+ ticker = time.NewTicker(time.Second)
+ )
+
+ defer ticker.Stop()
+
+D:
+ for {
+ select {
+ case <-d.ctx.Done():
+ d.done <- nil // send empty to notify the watchers that we're done
+ return // terminated..
+ case <-ticker.C:
+ d.progress(float64(written), &speed)
+ default:
+ n, err := reader.Read(buf)
+
+ if err != nil {
+ if err == io.EOF { // empty out the error
+ err = nil
+ }
+
+ d.progress(float64(written), &speed)
+ d.done <- err
+
+ return
+ }
+
+ if n > 0 {
+ _, err = d.writer.Write(buf[0:n])
+
+ if err != nil {
+ d.done <- err
+ return
+ } else if written >= d.size { // finished
+ break D
+ }
+
+ written += n
+ }
+ }
+ }
+}
+
+// Run established the connection with the DCC TCP socket
+// and returns two channels, where one is used for the download progress
+// and the other is used to return exceptions during our transaction.
+// A context is required, where you have the ability to cancel and timeout
+// a download.
+// One should check the second value for the progress/error channels when
+// receiving data as if the channels are closed, it means that the transaction
+// is finished or got interrupted.
+func (d *DCC) Run(ctx context.Context) (
+ progressc <-chan Progress,
+ done <-chan error,
+) {
+ // assign the output to the struct properties
+ progressc = d.progressc
+ done = d.done
+
+ // assign the passed context
+ d.ctx = ctx
+
+ dialer := &net.Dialer{Resolver: net.DefaultResolver}
+ conn, err := dialer.DialContext(
+ d.ctx, "tcp", d.address,
+ )
+
+ if err != nil {
+ d.done <- err
+ return
+ }
+
+ // setup the connection for the receiver
+ d.conn = conn
+
+ go d.receive()
+
+ return
+}
diff --git a/cmd/sanguisuga/main.go b/cmd/sanguisuga/main.go
index e6a57ee..4300bcd 100644
--- a/cmd/sanguisuga/main.go
+++ b/cmd/sanguisuga/main.go
@@ -3,6 +3,7 @@ package main
import (
"bytes"
"encoding/base64"
+ "errors"
"expvar"
"flag"
"fmt"
@@ -12,6 +13,7 @@ import (
"path/filepath"
"regexp"
"strings"
+ "sync"
"time"
irc "github.com/thoj/go-ircevent"
@@ -50,7 +52,7 @@ func ParseTorrentAnnouncement(input string) (*TorrentAnnouncement, error) {
match := annRegex.FindStringSubmatch(input)
if match == nil {
- return nil, fmt.Errorf("invalid torrent announcement format")
+ return nil, errors.New("invalid torrent announcement format")
}
torrent := &TorrentAnnouncement{
@@ -81,6 +83,11 @@ func main() {
Seen: map[string]TorrentAnnouncement{},
}
}
+
+ if db.Data.AnimeSnatches == nil {
+ db.Data.AnimeSnatches = map[string]SubspleaseAnnouncement{}
+ }
+
if err := db.Save(); err != nil {
log.Fatalf("can't ping database: %v", err)
}
@@ -94,7 +101,8 @@ func main() {
Dir: dataDir,
AuthKey: c.Tailscale.Authkey,
Hostname: c.Tailscale.Hostname,
- Logf: slog.NewLogLogger(slog.Default().Handler().WithAttrs([]slog.Attr{slog.String("from", "tsnet")}), slog.LevelInfo).Printf,
+ Logf: func(string, ...any) {},
+ //Logf: slog.NewLogLogger(slog.Default().Handler().WithAttrs([]slog.Attr{slog.String("from", "tsnet")}), slog.LevelInfo).Printf,
}
if err := srv.Start(); err != nil {
@@ -126,8 +134,15 @@ func main() {
Config: c,
cl: cl,
db: db,
+
+ animeInFlight: map[string]*SubspleaseAnnouncement{},
}
+ http.HandleFunc("/api/anime/list", s.ListAnime)
+ http.HandleFunc("/api/anime/track", s.TrackAnime)
+
+ s.XDCC()
+
ircCli := irc.IRC(c.IRC.Nick, c.IRC.User)
ircCli.Password = c.IRC.Password
ircCli.RealName = c.IRC.Real
@@ -149,11 +164,18 @@ type Sanguisuga struct {
Config Config
cl *transmission.Client
db *jsondb.DB[State]
+ dbLock sync.Mutex
+
+ animeInFlight map[string]*SubspleaseAnnouncement
+ aifLock sync.Mutex
}
type State struct {
// Name + " " + SeasonEpisode -> TorrentAnnouncement
Seen map[string]TorrentAnnouncement
+
+ AnimeSnatches map[string]SubspleaseAnnouncement
+ AnimeToTrack []string
}
func (s *Sanguisuga) HandleIRCMessage(ev *irc.Event) {
@@ -189,6 +211,8 @@ func (s *Sanguisuga) HandleIRCMessage(ev *irc.Event) {
stateKey := fmt.Sprintf("%s %s", ti.Title, id)
for _, show := range s.Config.Shows {
+ s.dbLock.Lock()
+ defer s.dbLock.Unlock()
if s.db.Data == nil {
s.db.Data = &State{
Seen: map[string]TorrentAnnouncement{},