aboutsummaryrefslogtreecommitdiff
path: root/cmd/mybittorrent/client.go
diff options
context:
space:
mode:
Diffstat (limited to 'cmd/mybittorrent/client.go')
-rw-r--r--cmd/mybittorrent/client.go148
1 files changed, 141 insertions, 7 deletions
diff --git a/cmd/mybittorrent/client.go b/cmd/mybittorrent/client.go
index 8b3aac8..a450a81 100644
--- a/cmd/mybittorrent/client.go
+++ b/cmd/mybittorrent/client.go
@@ -3,6 +3,7 @@ package main
import (
"bytes"
"encoding/binary"
+ "errors"
"fmt"
"io"
"log/slog"
@@ -11,6 +12,7 @@ import (
"net/url"
"os"
"strconv"
+ "sync"
bencode "github.com/jackpal/bencode-go"
)
@@ -22,10 +24,12 @@ type Client struct {
}
type ClientTorrent struct {
- Meta Meta
- Uploaded int
- Downloaded int
- Left int
+ Meta Meta
+ Uploaded int
+ Downloaded int
+ Left int
+ PeerResponse PeerResponse
+ Peers []*Peer
}
type PeerResponse struct {
@@ -33,6 +37,50 @@ type PeerResponse struct {
Peers []string
}
+type FileResult struct {
+ Data []byte
+ Piece Piece
+}
+
+type FileWriter struct {
+ ch chan FileResult
+ piece Piece
+}
+
+func (c *Client) ConnectPeers(filename string) (*ClientTorrent, error) {
+ ct, ok := c.Torrents[filename]
+ if !ok {
+ if err := c.AddTorrentFile(filename); err != nil {
+ return nil, err
+ }
+ }
+
+ if len(ct.PeerResponse.Peers) == 0 {
+ if _, err := c.GetPeers(filename); err != nil {
+ return nil, fmt.Errorf("failed to get peers: %v+", err)
+ }
+ }
+
+ for _, peerAddr := range ct.PeerResponse.Peers {
+ peer, err := c.Handshake(filename, peerAddr)
+ if err != nil {
+ continue
+ }
+
+ ct.Peers = append(ct.Peers, peer)
+ }
+
+ return ct, nil
+}
+
+func (c *Client) Close() (err error) {
+ for _, ct := range c.Torrents {
+ err = errors.Join(err, ct.Close())
+ }
+
+ return
+}
+
func NewClient(peerId string, port int) *Client {
return &Client{
PeerId: peerId,
@@ -70,7 +118,7 @@ func (c *Client) AddTorrentFile(filename string) error {
return nil
}
-func (ct ClientTorrent) getUrl(c Client) (string, error) {
+func (ct *ClientTorrent) getUrl(c Client) (string, error) {
u, err := url.Parse(ct.Meta.Announce)
if err != nil {
return "", err
@@ -130,10 +178,12 @@ func (c *Client) GetPeers(filename string) (PeerResponse, error) {
return PeerResponse{}, err
}
- return PeerResponse{
+ ct.PeerResponse = PeerResponse{
Interval: resp.Interval,
Peers: DecodePeers([]byte(resp.Peers)),
- }, err
+ }
+
+ return ct.PeerResponse, nil
}
func (c *Client) Handshake(filename, peerAddr string) (*Peer, error) {
@@ -187,3 +237,87 @@ func (c *Client) Handshake(filename, peerAddr string) (*Peer, error) {
return peer, nil
}
+
+func (f *FileResult) WriteTo(w io.Writer) (int64, error) {
+ n, err := w.Write(f.Data)
+ return int64(n), err
+}
+
+func (f *FileWriter) Write(b []byte) (int, error) {
+
+ f.ch <- FileResult{
+
+ Data: b,
+
+ Piece: f.piece,
+ }
+
+ return len(b), nil
+
+}
+
+func (ct *ClientTorrent) Download(out string) error {
+ pieces := ct.Meta.Pieces()
+ slog.Debug("Starting download", "pieceCnt", len(pieces), "peers", ct.PeerResponse)
+ pieceCh := make(chan Piece, len(pieces))
+ fileCh := make(chan FileResult)
+ wg := sync.WaitGroup{}
+ wg.Add(len(pieces))
+
+ for _, piece := range pieces {
+ pieceCh <- piece
+ }
+
+ for _, peer := range ct.Peers {
+ go peer.Download(pieceCh, fileCh)
+ }
+
+ go func() {
+ wg.Wait()
+ close(fileCh)
+ close(pieceCh)
+ }()
+
+ return ct.WriteFile(pieceCh, fileCh, out, &wg)
+}
+
+func (ct *ClientTorrent) Close() (err error) {
+ for _, peer := range ct.Peers {
+ err = errors.Join(err, peer.Close())
+ }
+
+ return
+}
+
+func (ct *ClientTorrent) WriteFile(pieceCh chan Piece, fileCh chan FileResult, out string, wg *sync.WaitGroup) error {
+ f, err := os.OpenFile(out, os.O_CREATE|os.O_WRONLY, 0644)
+ if err != nil {
+ return err
+ }
+
+ defer f.Close()
+
+ for fr := range fileCh {
+ if _, err = f.Seek(int64(fr.Piece.Index*ct.Meta.Info.PieceLength), io.SeekStart); err != nil {
+ slog.Error("failed to seek", "err", err)
+ pieceCh <- fr.Piece
+ continue
+ }
+
+ if _, err = fr.WriteTo(f); err != nil {
+ slog.Error("failed to write piece to file", "err", err)
+ pieceCh <- fr.Piece
+ continue
+ }
+
+ if err = f.Sync(); err != nil {
+ slog.Error("failed to sync file to disk", "err", err)
+ continue
+ }
+
+ slog.Debug("writing to file", "piece index", fr.Piece.Index)
+ wg.Done()
+ }
+
+ return nil
+}