aboutsummaryrefslogtreecommitdiff
path: root/cmd/_old/sanguisuga/internal/dcc/dcc.go
blob: 82d2ff3a4a73acbdaae438cb39b318741993d7ca (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
package dcc

import (
	"context"
	"encoding/binary"
	"io"
	"log/slog"
	"net"
	"time"
)

// Progress contains the progression of the
// download handled by the DCC client socket
type Progress struct {
	Speed           float64
	Percentage      float64
	CurrentFileSize float64
	FileSize        float64
}

func (p Progress) LogValue() slog.Value {
	return slog.GroupValue(
		slog.Float64("speed", p.Speed),
		slog.Float64("percentage", p.Percentage),
		slog.Float64("curr", p.CurrentFileSize),
		slog.Float64("total", p.FileSize),
	)
}

// DCC creates a new socket client instance where
// it'll download the DCC transaction into the
// specified io.Writer destination
type DCC struct {
	// important properties
	address string
	size    int

	// output channels used for the Run and the receiver methods()
	// to avoid parameter passing
	progressc chan Progress
	done      chan error

	// internal DCC socket connection
	conn net.Conn

	// assigned context passed from the Run() method
	ctx context.Context

	// destination writer
	writer io.Writer

	// dial function
	dialFunc func(ctx context.Context, network, address string) (net.Conn, error)
}

// NewDCC creates a new DCC instance.
// the host, port are needed for the socket client connection
// the size is required so the download progress is calculated
// the writer is required to store the transaction fragments into
// the specified io.Writer
func NewDCC(
	address string,
	size int,
	writer io.Writer,
	dialFunc func(ctx context.Context, network, address string) (net.Conn, error),
) *DCC {
	return &DCC{
		address:   address,
		size:      size,
		progressc: make(chan Progress, 1),
		done:      make(chan error, 1),
		writer:    writer,
		dialFunc:  dialFunc,
	}
}

func (d *DCC) progress(written float64, speed *float64) time.Time {
	d.progressc <- Progress{
		Speed:           written - *speed,
		Percentage:      (written / float64(d.size)) * 100,
		CurrentFileSize: written,
		FileSize:        float64(d.size),
	}

	*speed = float64(written)

	return time.Now()
}

func (d *DCC) receive() {
	defer func() { // close channels
		close(d.done)

		// close the connection afterwards..
		d.conn.Close()
	}()

	var (
		written int
		speed   float64
		buf     = make([]byte, 30720)
		reader  = io.LimitReader(d.conn, int64(d.size))
		ticker  = time.NewTicker(time.Second)
	)

	defer ticker.Stop()

D:
	for {
		select {
		case <-d.ctx.Done():
			d.done <- nil // send empty to notify the watchers that we're done
			return        // terminated..
		case <-ticker.C:
			d.progress(float64(written), &speed)
			// notify the other side about the state of the connection
			writtenNetworkOrder := uint32(written)
			if err := binary.Write(d.conn, binary.BigEndian, writtenNetworkOrder); err != nil {
				if err == io.EOF {
					err = nil
				}

				d.progress(float64(written), &speed)
				d.done <- err

				return
			}
		default:
			n, err := reader.Read(buf)

			if err != nil {
				if err == io.EOF { // empty out the error
					err = nil
				}

				d.progress(float64(written), &speed)
				d.done <- err

				return
			}

			if n > 0 {
				_, err = d.writer.Write(buf[0:n])

				if err != nil {
					d.done <- err
					return
				} else if written >= d.size { // finished
					break D
				}

				written += n
			}
		}
	}
}

// Run established the connection with the DCC TCP socket
// and returns two channels, where one is used for the download progress
// and the other is used to return exceptions during our transaction.
// A context is required, where you have the ability to cancel and timeout
// a download.
// One should check the second value for the progress/error channels when
// receiving data as if the channels are closed, it means that the transaction
// is finished or got interrupted.
func (d *DCC) Run(ctx context.Context) (
	progressc <-chan Progress,
	done <-chan error,
) {
	// assign the output to the struct properties
	progressc = d.progressc
	done = d.done

	// assign the passed context
	d.ctx = ctx

	conn, err := d.dialFunc(d.ctx, "tcp", d.address)

	if err != nil {
		d.done <- err
		return
	}

	// setup the connection for the receiver
	d.conn = conn

	go d.receive()

	return
}