diff options
| author | jet2tlf <jet2tlf@gmail.com> | 2024-06-03 18:31:42 +0000 |
|---|---|---|
| committer | jet2tlf <jet2tlf@gmail.com> | 2024-06-03 18:31:42 +0000 |
| commit | 210fb1e02453413d1ce070b70c850807286a1a7a (patch) | |
| tree | dbc49ba086460dfbf62ef1d2d602cd8da46e6df2 /cmd/mybittorrent/client.go | |
| parent | 853be358804a6e30e857035ffda81a06df3f6b74 (diff) | |
| download | bittorrent-go-master.tar.gz bittorrent-go-master.zip | |
Diffstat (limited to 'cmd/mybittorrent/client.go')
| -rw-r--r-- | cmd/mybittorrent/client.go | 148 |
1 files changed, 141 insertions, 7 deletions
diff --git a/cmd/mybittorrent/client.go b/cmd/mybittorrent/client.go index 8b3aac8..a450a81 100644 --- a/cmd/mybittorrent/client.go +++ b/cmd/mybittorrent/client.go @@ -3,6 +3,7 @@ package main import ( "bytes" "encoding/binary" + "errors" "fmt" "io" "log/slog" @@ -11,6 +12,7 @@ import ( "net/url" "os" "strconv" + "sync" bencode "github.com/jackpal/bencode-go" ) @@ -22,10 +24,12 @@ type Client struct { } type ClientTorrent struct { - Meta Meta - Uploaded int - Downloaded int - Left int + Meta Meta + Uploaded int + Downloaded int + Left int + PeerResponse PeerResponse + Peers []*Peer } type PeerResponse struct { @@ -33,6 +37,50 @@ type PeerResponse struct { Peers []string } +type FileResult struct { + Data []byte + Piece Piece +} + +type FileWriter struct { + ch chan FileResult + piece Piece +} + +func (c *Client) ConnectPeers(filename string) (*ClientTorrent, error) { + ct, ok := c.Torrents[filename] + if !ok { + if err := c.AddTorrentFile(filename); err != nil { + return nil, err + } + } + + if len(ct.PeerResponse.Peers) == 0 { + if _, err := c.GetPeers(filename); err != nil { + return nil, fmt.Errorf("failed to get peers: %v+", err) + } + } + + for _, peerAddr := range ct.PeerResponse.Peers { + peer, err := c.Handshake(filename, peerAddr) + if err != nil { + continue + } + + ct.Peers = append(ct.Peers, peer) + } + + return ct, nil +} + +func (c *Client) Close() (err error) { + for _, ct := range c.Torrents { + err = errors.Join(err, ct.Close()) + } + + return +} + func NewClient(peerId string, port int) *Client { return &Client{ PeerId: peerId, @@ -70,7 +118,7 @@ func (c *Client) AddTorrentFile(filename string) error { return nil } -func (ct ClientTorrent) getUrl(c Client) (string, error) { +func (ct *ClientTorrent) getUrl(c Client) (string, error) { u, err := url.Parse(ct.Meta.Announce) if err != nil { return "", err @@ -130,10 +178,12 @@ func (c *Client) GetPeers(filename string) (PeerResponse, error) { return PeerResponse{}, err } - return PeerResponse{ + ct.PeerResponse = PeerResponse{ Interval: resp.Interval, Peers: DecodePeers([]byte(resp.Peers)), - }, err + } + + return ct.PeerResponse, nil } func (c *Client) Handshake(filename, peerAddr string) (*Peer, error) { @@ -187,3 +237,87 @@ func (c *Client) Handshake(filename, peerAddr string) (*Peer, error) { return peer, nil } + +func (f *FileResult) WriteTo(w io.Writer) (int64, error) { + n, err := w.Write(f.Data) + return int64(n), err +} + +func (f *FileWriter) Write(b []byte) (int, error) { + + f.ch <- FileResult{ + + Data: b, + + Piece: f.piece, + } + + return len(b), nil + +} + +func (ct *ClientTorrent) Download(out string) error { + pieces := ct.Meta.Pieces() + slog.Debug("Starting download", "pieceCnt", len(pieces), "peers", ct.PeerResponse) + pieceCh := make(chan Piece, len(pieces)) + fileCh := make(chan FileResult) + wg := sync.WaitGroup{} + wg.Add(len(pieces)) + + for _, piece := range pieces { + pieceCh <- piece + } + + for _, peer := range ct.Peers { + go peer.Download(pieceCh, fileCh) + } + + go func() { + wg.Wait() + close(fileCh) + close(pieceCh) + }() + + return ct.WriteFile(pieceCh, fileCh, out, &wg) +} + +func (ct *ClientTorrent) Close() (err error) { + for _, peer := range ct.Peers { + err = errors.Join(err, peer.Close()) + } + + return +} + +func (ct *ClientTorrent) WriteFile(pieceCh chan Piece, fileCh chan FileResult, out string, wg *sync.WaitGroup) error { + f, err := os.OpenFile(out, os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return err + } + + defer f.Close() + + for fr := range fileCh { + if _, err = f.Seek(int64(fr.Piece.Index*ct.Meta.Info.PieceLength), io.SeekStart); err != nil { + slog.Error("failed to seek", "err", err) + pieceCh <- fr.Piece + continue + } + + if _, err = fr.WriteTo(f); err != nil { + slog.Error("failed to write piece to file", "err", err) + pieceCh <- fr.Piece + continue + } + + if err = f.Sync(); err != nil { + slog.Error("failed to sync file to disk", "err", err) + continue + } + + slog.Debug("writing to file", "piece index", fr.Piece.Index) + wg.Done() + } + + return nil +} |