// Package dcc implements a DCC client that downloads a transfer from a
// TCP peer into an io.Writer while reporting its progress.
package dcc
import (
"context"
"encoding/binary"
"io"
"log/slog"
"net"
"time"
)
// Progress is a snapshot of the download handled by the DCC client socket.
type Progress struct {
	// Speed is the number of bytes received since the previous snapshot.
	Speed float64
	// Percentage is the share of FileSize received so far, in percent.
	Percentage float64
	// CurrentFileSize is the number of bytes received so far.
	CurrentFileSize float64
	// FileSize is the expected total size of the transfer in bytes.
	FileSize float64
}

// LogValue implements slog.LogValuer so that a Progress is logged as a
// group holding the "speed", "percentage", "curr" and "total" attributes.
func (p Progress) LogValue() slog.Value {
	attrs := []slog.Attr{
		slog.Float64("speed", p.Speed),
		slog.Float64("percentage", p.Percentage),
		slog.Float64("curr", p.CurrentFileSize),
		slog.Float64("total", p.FileSize),
	}
	return slog.GroupValue(attrs...)
}
// DCC is a socket client that downloads a single DCC
// transfer into the specified io.Writer destination
type DCC struct {
// address is handed to dialFunc, together with the "tcp" network,
// to reach the remote peer
address string
// size is the expected length of the transfer in bytes; it bounds how
// much is read from the connection and is the basis for the percentage
size int
// progressc carries the Progress snapshots and done carries the final
// error; both live on the struct so that Run and receive can share
// them without passing them around as parameters
progressc chan Progress
done chan error
// conn is the connection opened by Run and consumed by receive
conn net.Conn
// ctx is the context passed to Run; receive stops once it is done
ctx context.Context
// writer is the destination the received bytes are written to
writer io.Writer
// dialFunc opens the connection to address
dialFunc func(ctx context.Context, network, address string) (net.Conn, error)
}
// NewDCC creates a new DCC instance.
//
// address is the endpoint of the remote peer; it is handed as-is to
// dialFunc together with the "tcp" network.
// size is the expected length of the transfer in bytes; it is required so
// the download progress can be calculated and the read can be bounded.
// writer is the destination the received fragments are stored into.
// dialFunc opens the connection. When it is nil, the standard library
// dialer (net.Dialer.DialContext) is used.
func NewDCC(
	address string,
	size int,
	writer io.Writer,
	dialFunc func(ctx context.Context, network, address string) (net.Conn, error),
) *DCC {
	if dialFunc == nil {
		// A nil dialer would only surface later as a nil-function panic in
		// Run, so fall back to the standard library dialer instead.
		dialFunc = (&net.Dialer{}).DialContext
	}

	return &DCC{
		address:   address,
		size:      size,
		progressc: make(chan Progress, 1),
		done:      make(chan error, 1),
		writer:    writer,
		dialFunc:  dialFunc,
	}
}
// progress publishes a Progress snapshot on progressc and returns the time
// at which it finished doing so.
//
// written is the total number of bytes received so far. speed points at the
// caller's bookkeeping value holding the total of the previous snapshot: it
// is used to derive Progress.Speed and is then overwritten with written.
//
// The send waits for the consumer to make room on progressc, but gives up
// once the context assigned to the DCC is done, so a cancelled download
// whose consumer stopped reading does not block forever.
func (d *DCC) progress(written float64, speed *float64) time.Time {
	total := float64(d.size)

	// Guard the division: a zero size would otherwise produce NaN or +Inf,
	// which structured log encoders such as JSON cannot represent.
	var percentage float64
	if total > 0 {
		percentage = (written / total) * 100
	}

	snapshot := Progress{
		Speed:           written - *speed,
		Percentage:      percentage,
		CurrentFileSize: written,
		FileSize:        total,
	}

	// A nil channel is never ready, so without a context the send simply
	// blocks until the consumer reads.
	var cancelled <-chan struct{}
	if d.ctx != nil {
		cancelled = d.ctx.Done()
	}

	select {
	case d.progressc <- snapshot:
		// delivered right away; prefer this over the cancellation branch
	default:
		select {
		case d.progressc <- snapshot:
		case <-cancelled:
		}
	}

	*speed = written
	return time.Now()
}
// receive copies at most d.size bytes from the connection into the writer.
//
// Once per second it publishes a Progress snapshot and writes the number of
// bytes received so far to the peer as a big-endian uint32.
//
// The outcome is reported on done before both channels are closed:
//   - nil when d.size bytes were received or the context ended,
//   - io.ErrUnexpectedEOF when the peer closed the connection early,
//   - the underlying error when reading, acknowledging or writing failed.
//
// The connection is always closed on return.
func (d *DCC) receive() {
	// A Read blocked on a stalled peer never gets back to the select below,
	// so close the connection as soon as the context ends to force it out.
	stop := context.AfterFunc(d.ctx, func() { _ = d.conn.Close() })

	defer func() {
		stop()
		// closing both channels tells the consumers the transfer is over
		close(d.progressc)
		close(d.done)
		// close the connection afterwards..
		_ = d.conn.Close()
	}()

	var (
		written int
		speed   float64
		buf     = make([]byte, 30720)
		reader  = io.LimitReader(d.conn, int64(d.size))
		ticker  = time.NewTicker(time.Second)
	)
	defer ticker.Stop()

	// finish publishes the last snapshot and the outcome of the transfer.
	finish := func(err error) {
		switch {
		case d.ctx.Err() != nil:
			// cancelled: the error is only a side effect of the connection
			// being closed under us, so report a plain termination
			d.done <- nil
			return
		case err == io.EOF && written < d.size:
			// the peer hung up before sending everything it announced
			err = io.ErrUnexpectedEOF
		case err == io.EOF:
			err = nil
		}
		d.progress(float64(written), &speed)
		d.done <- err
	}

	for {
		select {
		case <-d.ctx.Done():
			d.done <- nil // send empty to notify the watchers that we're done
			return        // terminated..
		case <-ticker.C:
			d.progress(float64(written), &speed)
			// notify the other side about the state of the connection
			if err := binary.Write(d.conn, binary.BigEndian, uint32(written)); err != nil {
				finish(err)
				return
			}
		default:
			n, err := reader.Read(buf)
			if n > 0 {
				// io.Reader may return data together with an error, so the
				// bytes are stored before the error is looked at
				if _, werr := d.writer.Write(buf[:n]); werr != nil {
					d.done <- werr
					return
				}
				written += n
			}
			if err != nil {
				// the limit reader yields io.EOF once d.size bytes were read
				finish(err)
				return
			}
		}
	}
}
// Run establishes the connection with the DCC TCP socket
// and returns two channels, where one is used for the download progress
// and the other is used to return exceptions during our transaction.
// A context is required, where you have the ability to cancel and timeout
// a download.
// One should check the second value for the progress/error channels when
// receiving data as if the channels are closed, it means that the transaction
// is finished or got interrupted.
// When the connection cannot be established, the dial error is delivered on
// the error channel and both channels are closed right away.
// A DCC is meant to be run once.
func (d *DCC) Run(ctx context.Context) (
	progressc <-chan Progress,
	done <-chan error,
) {
	// keep the context around for the receiver
	d.ctx = ctx

	conn, err := d.dialFunc(ctx, "tcp", d.address)
	if err != nil {
		// No receiver will run, so deliver the error and close both
		// channels here; otherwise consumers waiting for the documented
		// close would block forever. The buffered error stays readable
		// after the close.
		d.done <- err
		close(d.done)
		close(d.progressc)
		return d.progressc, d.done
	}

	// setup the connection for the receiver
	d.conn = conn
	go d.receive()

	return d.progressc, d.done
}
|