diff options
| author | Xe Iaso <me@xeiaso.net> | 2023-08-13 02:09:17 -0400 |
|---|---|---|
| committer | Xe Iaso <me@xeiaso.net> | 2023-08-13 02:09:17 -0400 |
| commit | 378ae0feaa2bb9d6b4406e1a2281fe0c8400fdd8 (patch) | |
| tree | 6c43514a2fd77d1857c883d6933ef5424b881c77 /cmd | |
| parent | 1a27d6b01baa36c3f0a1b2eb5c46f3f5780906e7 (diff) | |
| download | x-378ae0feaa2bb9d6b4406e1a2281fe0c8400fdd8.tar.xz x-378ae0feaa2bb9d6b4406e1a2281fe0c8400fdd8.zip | |
cmd/sanguisuga: anime monitoring
Signed-off-by: Xe Iaso <me@xeiaso.net>
Diffstat (limited to 'cmd')
| -rw-r--r-- | cmd/sanguisuga/config.default.ts | 2 | ||||
| -rw-r--r-- | cmd/sanguisuga/config.go | 2 | ||||
| -rw-r--r-- | cmd/sanguisuga/dcc.go | 288 | ||||
| -rw-r--r-- | cmd/sanguisuga/dcc/dcc.go | 176 | ||||
| -rw-r--r-- | cmd/sanguisuga/main.go | 28 |
5 files changed, 494 insertions, 2 deletions
diff --git a/cmd/sanguisuga/config.default.ts b/cmd/sanguisuga/config.default.ts index f05e224..107f6b7 100644 --- a/cmd/sanguisuga/config.default.ts +++ b/cmd/sanguisuga/config.default.ts @@ -29,10 +29,12 @@ export type Tailscale = { export type Config = { irc: IRC; + xdcc: IRC; transmission: Transmission; shows: Show[]; rssKey: string; tailscale: Tailscale; + baseDiskPath: string; }; export default { diff --git a/cmd/sanguisuga/config.go b/cmd/sanguisuga/config.go index 3c2effd..8d91f6d 100644 --- a/cmd/sanguisuga/config.go +++ b/cmd/sanguisuga/config.go @@ -30,8 +30,10 @@ type Tailscale struct { type Config struct { IRC IRC `json:"irc"` + XDCC IRC `json:"xdcc"` Transmission Transmission `json:"transmission"` Shows []Show `json:"shows"` RSSKey string `json:"rssKey"` Tailscale Tailscale `json:"tailscale"` + BaseDiskPath string `json:"baseDiskPath"` } diff --git a/cmd/sanguisuga/dcc.go b/cmd/sanguisuga/dcc.go new file mode 100644 index 0000000..7a014c8 --- /dev/null +++ b/cmd/sanguisuga/dcc.go @@ -0,0 +1,288 @@ +package main + +import ( + "context" + "encoding/binary" + "encoding/json" + "errors" + "expvar" + "fmt" + "io" + "log" + "net" + "net/http" + "os" + "path/filepath" + "regexp" + "strconv" + "strings" + "time" + + irc "github.com/thoj/go-ircevent" + "golang.org/x/exp/slog" + "tailscale.com/metrics" + "within.website/x/cmd/sanguisuga/dcc" +) + +var ( + subspleaseAnnounceRegex = regexp.MustCompile(`^.*\* (\[SubsPlease\] (.*) - ([0-9]+) \(([0-9]+p)\).*\.mkv) \* /MSG ([A-Za-z-_|]+) XDCC SEND ([0-9]+)$`) + dccCommand = regexp.MustCompile(`^DCC SEND "(.*)" ([0-9]+) ([0-9]+) ([0-9]+)$`) + + bytesDownloaded = &metrics.LabelMap{Label: "filename"} +) + +func init() { + expvar.Publish("gauge_sanguisuga_bytes_downloaded", bytesDownloaded) +} + +type SubspleaseAnnouncement struct { + Filename, ShowName string + Episode, Quality string + BotName, PackID string +} + +func (sa SubspleaseAnnouncement) Key() string { + return fmt.Sprintf("%s %s %s", sa.BotName, sa.ShowName, sa.Episode) +} + +func (sa SubspleaseAnnouncement) LogValue() slog.Value { + return slog.GroupValue( + slog.String("showname", sa.ShowName), + slog.String("episode", sa.Episode), + slog.String("quality", sa.Quality), + slog.String("botName", sa.BotName), + ) +} + +func ParseSubspleaseAnnouncement(input string) (*SubspleaseAnnouncement, error) { + match := subspleaseAnnounceRegex.FindStringSubmatch(input) + + if match == nil { + return nil, errors.New("invalid annoucement format") + } + + return &SubspleaseAnnouncement{ + Filename: match[1], + ShowName: match[2], + Episode: match[3], + Quality: match[4], + BotName: match[5], + PackID: match[6], + }, nil +} + +func int2ip(nn uint32) string { + ip := make(net.IP, 4) + binary.BigEndian.PutUint32(ip, nn) + return ip.String() +} + +func (s *Sanguisuga) XDCC() { + ircCli := irc.IRC(s.Config.XDCC.Nick, s.Config.XDCC.User) + ircCli.Password = s.Config.XDCC.Password + ircCli.RealName = s.Config.XDCC.Real + ircCli.AddCallback("001", func(ev *irc.Event) { + ircCli.Join(s.Config.XDCC.Channel) + }) + ircCli.AddCallback("PRIVMSG", s.ScrapeSubsplease) + ircCli.AddCallback("CTCP", s.SubspleaseDCC) + + ircCli.Log = slog.NewLogLogger(slog.Default().Handler().WithAttrs([]slog.Attr{slog.String("from", "ircevent"), slog.String("for", "anime")}), slog.LevelInfo) + ircCli.Timeout = 5 * time.Second + + if err := ircCli.Connect(s.Config.XDCC.Server); err != nil { + log.Fatalf("can't connect to XDCC server %s: %v", s.Config.XDCC.Server, err) + } + + ircCli.Loop() +} + +func (s *Sanguisuga) TrackAnime(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + data, err := io.ReadAll(http.MaxBytesReader(w, r.Body, 256)) + if err != nil { + slog.Error("can't read request body", "err", err) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + name := strings.TrimSpace(string(data)) + + s.dbLock.Lock() + defer s.dbLock.Unlock() + + s.db.Data.AnimeToTrack = append(s.db.Data.AnimeToTrack, name) + if err := s.db.Save(); err != nil { + slog.Error("can't save database", "err", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + json.NewEncoder(w).Encode(s.db.Data.AnimeToTrack) +} + +func (s *Sanguisuga) ListAnime(w http.ResponseWriter, r *http.Request) { + s.dbLock.Lock() + defer s.dbLock.Unlock() + + json.NewEncoder(w).Encode(s.db.Data.AnimeToTrack) +} + +func (s *Sanguisuga) ScrapeSubsplease(ev *irc.Event) { + if ev.Code != "PRIVMSG" { + return + } + + if ev.Arguments[0] != s.Config.XDCC.Channel { + return + } + + switch ev.Nick { + case "CR-ARUTHA|NEW", "CR-HOLLAND|NEW", "Belath": + default: + return + } + + ann, err := ParseSubspleaseAnnouncement(ev.MessageWithoutFormat()) + if err != nil { + slog.Debug("can't parse announcement", "input", ev.MessageWithoutFormat(), "err", err) + return + } + + if ann.Quality != "1080p" { + return + } + + lg := slog.Default().With("announcement", ann) + lg.Info("found announcement") + + s.dbLock.Lock() + defer s.dbLock.Unlock() + + s.aifLock.Lock() + defer s.aifLock.Unlock() + + if _, ok := s.db.Data.AnimeSnatches[ann.Filename]; ok { + return + } + + found := false + for _, name := range s.db.Data.AnimeToTrack { + if ann.ShowName == name { + found = true + } + } + + if !found { + return + } + + // if already being fetched, don't fetch again + if _, ok := s.animeInFlight[ann.Filename]; ok { + return + } + + ev.Connection.Privmsgf(ann.BotName, "XDCC SEND %s", ann.PackID) + + s.animeInFlight[ann.Filename] = ann + + s.db.Data.AnimeSnatches[ann.Filename] = *ann + if err := s.db.Save(); err != nil { + lg.Error("can't save database", "err", err) + return + } +} + +func (s *Sanguisuga) SubspleaseDCC(ev *irc.Event) { + matches := dccCommand.FindStringSubmatch(ev.MessageWithoutFormat()) + if matches == nil { + return + } + + s.aifLock.Lock() + defer s.aifLock.Unlock() + + if len(matches) != 5 { + slog.Error("wrong message from DCC bot", "botName", ev.Nick, "message", ev.Message()) + return + } + + fname := matches[1] + ipString := matches[2] + port := matches[3] + sizeString := matches[4] + + if strings.HasSuffix(fname, "\"") { + fname = fname[:len(fname)-2] + } + + ann, _ := s.animeInFlight[fname] + + if ann == nil { + slog.Debug("ann == nil?", "fname", fname, "ann", s.animeInFlight[fname]) + return + } + + ipUint, err := strconv.ParseUint(ipString, 10, 32) + if err != nil { + slog.Error("can't parse IP address", "addr", ipString, "err", err) + return + } + + ip := int2ip(uint32(ipUint)) + addr := net.JoinHostPort(ip, port) + + size, err := strconv.Atoi(sizeString) + if err != nil { + slog.Error("can't parse size", "size", sizeString, "err", err) + return + } + + lg := slog.Default().With("fname", fname, "botName", ev.Nick, "addr", addr) + lg.Info("fetching episode") + + outDir := filepath.Join(s.Config.BaseDiskPath, ann.ShowName) + outFname := filepath.Join(outDir, fname) + + os.MkdirAll(outDir, 0777) + + fout, err := os.Create(outFname) + if err != nil { + lg.Error("can't create output file", "outFname", outFname, "err", err) + return + } + defer fout.Close() + + d := dcc.NewDCC(addr, size, fout) + + ctx, cancel := context.WithTimeout(ev.Ctx, 120*time.Minute) + defer cancel() + + progc, errc := d.Run(ctx) + + defer lg.Info("done") + + for { + select { + case p := <-progc: + curr := bytesDownloaded.GetFloat(fname) + curr.Set(p.CurrentFileSize) + + if p.CurrentFileSize == p.FileSize { + delete(s.animeInFlight, fname) + return + } + + if p.Percentage >= 100 { + delete(s.animeInFlight, fname) + return + } + + lg.Info("download progress", "progress", p) + case err := <-errc: + lg.Error("error in DCC thread, giving up", "err", err) + delete(s.animeInFlight, fname) + return + } + } +} diff --git a/cmd/sanguisuga/dcc/dcc.go b/cmd/sanguisuga/dcc/dcc.go new file mode 100644 index 0000000..c26b141 --- /dev/null +++ b/cmd/sanguisuga/dcc/dcc.go @@ -0,0 +1,176 @@ +package dcc + +import ( + "context" + "io" + "net" + "time" + + "golang.org/x/exp/slog" +) + +// Progress contains the progression of the +// download handled by the DCC client socket +type Progress struct { + Speed float64 + Percentage float64 + CurrentFileSize float64 + FileSize float64 +} + +func (p Progress) LogValue() slog.Value { + return slog.GroupValue( + slog.Float64("speed", p.Speed), + slog.Float64("percentage", p.Percentage), + slog.Float64("curr", p.CurrentFileSize), + slog.Float64("total", p.FileSize), + ) +} + +// DCC creates a new socket client instance where +// it'll download the DCC transaction into the +// specified io.Writer destination +type DCC struct { + // important properties + address string + size int + + // output channels used for the Run and the receiver methods() + // to avoid parameter passing + progressc chan Progress + done chan error + + // internal DCC socket connection + conn net.Conn + + // assigned context passed from the Run() method + ctx context.Context + + // destination writer + writer io.Writer +} + +// NewDCC creates a new DCC instance. +// the host, port are needed for the socket client connection +// the size is required so the download progress is calculated +// the writer is required to store the transaction fragments into +// the specified io.Writer +func NewDCC( + address string, + size int, + writer io.Writer, +) *DCC { + return &DCC{ + address: address, + size: size, + progressc: make(chan Progress, 1), + done: make(chan error, 1), + writer: writer, + } +} + +func (d *DCC) progress(written float64, speed *float64) time.Time { + d.progressc <- Progress{ + Speed: written - *speed, + Percentage: (written / float64(d.size)) * 100, + CurrentFileSize: written, + FileSize: float64(d.size), + } + + *speed = float64(written) + + return time.Now() +} + +func (d *DCC) receive() { + defer func() { // close channels + close(d.done) + + // close the connection afterwards.. + d.conn.Close() + }() + + var ( + written int + speed float64 + buf = make([]byte, 30720) + reader = io.LimitReader(d.conn, int64(d.size)) + ticker = time.NewTicker(time.Second) + ) + + defer ticker.Stop() + +D: + for { + select { + case <-d.ctx.Done(): + d.done <- nil // send empty to notify the watchers that we're done + return // terminated.. + case <-ticker.C: + d.progress(float64(written), &speed) + default: + n, err := reader.Read(buf) + + if err != nil { + if err == io.EOF { // empty out the error + err = nil + } + + d.progress(float64(written), &speed) + d.done <- err + + return + } + + if n > 0 { + _, err = d.writer.Write(buf[0:n]) + + if err != nil { + d.done <- err + return + } else if written >= d.size { // finished + break D + } + + written += n + } + } + } +} + +// Run established the connection with the DCC TCP socket +// and returns two channels, where one is used for the download progress +// and the other is used to return exceptions during our transaction. +// A context is required, where you have the ability to cancel and timeout +// a download. +// One should check the second value for the progress/error channels when +// receiving data as if the channels are closed, it means that the transaction +// is finished or got interrupted. +func (d *DCC) Run(ctx context.Context) ( + progressc <-chan Progress, + done <-chan error, +) { + // assign the output to the struct properties + progressc = d.progressc + done = d.done + + // assign the passed context + d.ctx = ctx + + dialer := &net.Dialer{Resolver: net.DefaultResolver} + conn, err := dialer.DialContext( + d.ctx, "tcp", d.address, + ) + + if err != nil { + d.done <- err + return + } + + // setup the connection for the receiver + d.conn = conn + + go d.receive() + + return +} diff --git a/cmd/sanguisuga/main.go b/cmd/sanguisuga/main.go index e6a57ee..4300bcd 100644 --- a/cmd/sanguisuga/main.go +++ b/cmd/sanguisuga/main.go @@ -3,6 +3,7 @@ package main import ( "bytes" "encoding/base64" + "errors" "expvar" "flag" "fmt" @@ -12,6 +13,7 @@ import ( "path/filepath" "regexp" "strings" + "sync" "time" irc "github.com/thoj/go-ircevent" @@ -50,7 +52,7 @@ func ParseTorrentAnnouncement(input string) (*TorrentAnnouncement, error) { match := annRegex.FindStringSubmatch(input) if match == nil { - return nil, fmt.Errorf("invalid torrent announcement format") + return nil, errors.New("invalid torrent announcement format") } torrent := &TorrentAnnouncement{ @@ -81,6 +83,11 @@ func main() { Seen: map[string]TorrentAnnouncement{}, } } + + if db.Data.AnimeSnatches == nil { + db.Data.AnimeSnatches = map[string]SubspleaseAnnouncement{} + } + if err := db.Save(); err != nil { log.Fatalf("can't ping database: %v", err) } @@ -94,7 +101,8 @@ func main() { Dir: dataDir, AuthKey: c.Tailscale.Authkey, Hostname: c.Tailscale.Hostname, - Logf: slog.NewLogLogger(slog.Default().Handler().WithAttrs([]slog.Attr{slog.String("from", "tsnet")}), slog.LevelInfo).Printf, + Logf: func(string, ...any) {}, + //Logf: slog.NewLogLogger(slog.Default().Handler().WithAttrs([]slog.Attr{slog.String("from", "tsnet")}), slog.LevelInfo).Printf, } if err := srv.Start(); err != nil { @@ -126,8 +134,15 @@ func main() { Config: c, cl: cl, db: db, + + animeInFlight: map[string]*SubspleaseAnnouncement{}, } + http.HandleFunc("/api/anime/list", s.ListAnime) + http.HandleFunc("/api/anime/track", s.TrackAnime) + + s.XDCC() + ircCli := irc.IRC(c.IRC.Nick, c.IRC.User) ircCli.Password = c.IRC.Password ircCli.RealName = c.IRC.Real @@ -149,11 +164,18 @@ type Sanguisuga struct { Config Config cl *transmission.Client db *jsondb.DB[State] + dbLock sync.Mutex + + animeInFlight map[string]*SubspleaseAnnouncement + aifLock sync.Mutex } type State struct { // Name + " " + SeasonEpisode -> TorrentAnnouncement Seen map[string]TorrentAnnouncement + + AnimeSnatches map[string]SubspleaseAnnouncement + AnimeToTrack []string } func (s *Sanguisuga) HandleIRCMessage(ev *irc.Event) { @@ -189,6 +211,8 @@ func (s *Sanguisuga) HandleIRCMessage(ev *irc.Event) { stateKey := fmt.Sprintf("%s %s", ti.Title, id) for _, show := range s.Config.Shows { + s.dbLock.Lock() + defer s.dbLock.Unlock() if s.db.Data == nil { s.db.Data = &State{ Seen: map[string]TorrentAnnouncement{}, |
