Skip to content

Instantly share code, notes, and snippets.

@anacrolix
Last active November 15, 2023 07:20
Show Gist options
  • Save anacrolix/d0adb869016959fdc875800c3a9bbdb1 to your computer and use it in GitHub Desktop.
Save anacrolix/d0adb869016959fdc875800c3a9bbdb1 to your computer and use it in GitHub Desktop.
package s3storage
import (
"bytes"
"crypto/sha1"
"encoding/hex"
"errors"
"fmt"
"io"
"log"
"os"
"path/filepath"
"strings"
"sync"
"github.com/anacrolix/torrent"
dataPkg "github.com/anacrolix/torrent/data"
"github.com/anacrolix/torrent/metainfo"
"github.com/mitchellh/goamz/s3"
)
const (
filePerm = 0640
dirPerm = 0750
completedPieceS3Path = "completed-pieces/"
incompletePieceDirPath = "data/incomplete"
)
func New(bucket *s3.Bucket) (ret *pieceStorage, err error) {
ret = &pieceStorage{
bucket: bucket,
}
ret.mu.Lock()
go func() {
defer ret.mu.Unlock()
if err := ret.updateComplete(); err != nil {
log.Printf("error updating completed pieces: %s", err)
}
}()
return
}
func (me *pieceStorage) updateComplete() error {
marker := ""
me.pieces = make(map[string]bool, len(me.pieces))
for {
lr, err := me.bucket.List(completedPieceS3Path, "/", marker, 1000)
if err != nil {
return err
}
for _, key := range lr.Contents {
hash := strings.TrimPrefix(key.Key, completedPieceS3Path)
if len(hash) != 40 {
continue
}
me.pieces[hash] = true
}
if !lr.IsTruncated {
break
}
marker = lr.NextMarker
}
log.Printf("updated complete pieces: %d of them", len(me.pieces))
return nil
}
type pieceStorage struct {
mu sync.Mutex
bucket *s3.Bucket
pieces map[string]bool
}
func (me *pieceStorage) piecePath(hash []byte) string {
return "completed-pieces/" + hex.EncodeToString(hash)
}
func (me *pieceStorage) havePiece(hash []byte) bool {
_, err := me.bucket.GetKey(me.piecePath(hash))
return err == nil
}
func (me *pieceStorage) OpenTorrent(info *metainfo.Info) dataPkg.Data {
return &torrentStorage{info, me}
}
type torrentStorage struct {
info *metainfo.Info
pieces *pieceStorage
}
var _ torrent.SectionOpener = &torrentStorage{}
type section struct {
curPiece io.ReadCloser
sectionRemaining int64
torrentStorage *torrentStorage
curPieceIndex int
pieceRemaining int
}
func (me *section) Close() error {
if me.curPiece != nil {
return me.curPiece.Close()
}
return nil
}
func (me *section) Read(b []byte) (n int, err error) {
if me.sectionRemaining == 0 {
err = io.EOF
return
}
if me.pieceRemaining == 0 {
me.curPieceIndex++
if me.curPiece != nil {
me.curPiece.Close()
}
me.curPiece, err = me.torrentStorage.pieces.pieceReader(me.torrentStorage.info.Piece(me.curPieceIndex), 0)
if err != nil {
return
}
me.pieceRemaining = int(me.torrentStorage.info.Piece(me.curPieceIndex).Length())
}
if len(b) > me.pieceRemaining {
b = b[:me.pieceRemaining]
}
if int64(len(b)) > me.sectionRemaining {
b = b[:me.sectionRemaining]
}
n, err = me.curPiece.Read(b)
me.sectionRemaining -= int64(n)
me.pieceRemaining -= n
if err == io.EOF {
if me.pieceRemaining != 0 {
err = io.ErrUnexpectedEOF
} else if me.sectionRemaining != 0 {
err = nil
}
}
return
}
func (me *torrentStorage) OpenSection(off, n int64) (rc io.ReadCloser, err error) {
pieceIndex := int(off / me.info.PieceLength)
pieceOff := off % me.info.PieceLength
pieceInfo := me.info.Piece(pieceIndex)
pieceReader, err := me.pieces.pieceReader(pieceInfo, pieceOff)
if err != nil {
return
}
rc = &section{pieceReader, n, me, pieceIndex, int(pieceInfo.Length() - pieceOff)}
return
}
func (me *torrentStorage) ReadAt(p []byte, off int64) (n int, err error) {
piece := me.info.Piece(int(off / me.info.PieceLength))
off %= me.info.PieceLength
maxLen := piece.Length() - off
if int64(len(p)) > maxLen {
p = p[:maxLen]
}
r, err := me.pieces.pieceReader(piece, off)
if err != nil {
return
}
defer r.Close()
n, err = r.Read(p)
// Individual piece io.Reader's can eagerly return io.EOF, but it isn't
// the end of the torrent yet. The next Read will be to another piece.
if n != 0 && err == io.EOF {
err = nil
}
return
}
func (me *pieceStorage) incompletePiecePath(piece metainfo.Piece) string {
return filepath.Join(incompletePieceDirPath, hex.EncodeToString(piece.Hash()))
}
func (me *pieceStorage) pieceWriter(piece metainfo.Piece, off int64) (ret io.WriteCloser, err error) {
if me.pieces[hex.EncodeToString(piece.Hash())] {
err = errors.New("can't write to completed piece")
return
}
os.MkdirAll(incompletePieceDirPath, dirPerm)
var f *os.File
f, err = os.OpenFile(me.incompletePiecePath(piece), os.O_WRONLY|os.O_CREATE, filePerm)
if err != nil {
return
}
_, err = f.Seek(off, os.SEEK_SET)
if err != nil {
f.Close()
return
}
ret = f
return
}
func (me *torrentStorage) WriteAt(p []byte, off int64) (n int, err error) {
piece := me.info.Piece(int(off / me.info.PieceLength))
off %= me.info.PieceLength
for len(p) != 0 {
var w io.WriteCloser
w, err = me.pieces.pieceWriter(piece, off)
if err != nil {
return
}
p1 := p
maxN := piece.Length() - off
if int64(len(p1)) > maxN {
p1 = p1[:maxN]
}
var n1 int
n1, err = w.Write(p1)
w.Close()
n += n1
if err != nil {
return
}
p = p[n1:]
off = 0
}
return
}
func (me *pieceStorage) incompletePieceReader(piece metainfo.Piece, off int64) (ret io.ReadCloser, err error) {
var f *os.File
f, err = os.Open(me.incompletePiecePath(piece))
if os.IsNotExist(err) {
err = io.EOF
return
}
if err != nil {
return
}
_, err = f.Seek(off, os.SEEK_SET)
if err != nil {
f.Close()
return
}
ret = f
return
}
func (me *pieceStorage) pieceReader(piece metainfo.Piece, off int64) (ret io.ReadCloser, err error) {
if me.pieceComplete(piece) {
path := me.piecePath(piece.Hash())
range_ := fmt.Sprintf("bytes=%d-%d", off, piece.Length())
log.Printf("opening s3://webtorrent/%s (%s)", path, range_)
ret, err = me.bucket.GetRangeReader(path, range_)
// ret, err = me.bucket.GetReader(me.piecePath(piece.Hash()))
// if err != nil {
// return
// }
// _, err = io.CopyN(ioutil.Discard, ret, off)
// if err != nil {
// ret.Close()
// return
// }
return
}
return me.incompletePieceReader(piece, off)
}
func (me *torrentStorage) WriteSectionTo(w io.Writer, off, n int64) (written int64, err error) {
i := int(off / me.info.PieceLength)
off %= me.info.PieceLength
for n != 0 {
var pr io.ReadCloser
pr, err = me.pieces.pieceReader(me.info.Piece(i), off)
if err != nil {
if err == io.EOF {
err = nil
}
return
}
var n1 int64
n1, err = io.CopyN(w, pr, n)
written += n1
n -= n1
if err != nil {
return
}
off = 0
}
return
}
func (me *pieceStorage) pieceComplete(piece metainfo.Piece) bool {
return me.pieces[hex.EncodeToString(piece.Hash())]
}
func (me *pieceStorage) PieceComplete(piece metainfo.Piece) bool {
me.mu.Lock()
defer me.mu.Unlock()
return me.pieceComplete(piece)
}
func (me *pieceStorage) pieceCompleted(piece metainfo.Piece) (err error) {
if me.pieceComplete(piece) {
return nil
}
incomplete, err := me.incompletePieceReader(piece, 0)
if err != nil {
return
}
defer incomplete.Close()
hash := sha1.New()
r := io.TeeReader(io.LimitReader(incomplete, piece.Length()), hash)
completedS3Path := completedPieceS3Path + hex.EncodeToString(piece.Hash())
err = me.bucket.PutReaderHeader(completedS3Path, r, piece.Length(), map[string][]string{
"x-amz-storage-class": []string{"REDUCED_REDUNDANCY"},
}, s3.Private)
if err != nil {
return
}
if !bytes.Equal(hash.Sum(nil), piece.Hash()) {
err = errors.New("piece actually incomplete")
if err := me.bucket.Del(completedS3Path); err != nil {
log.Print(err)
}
return
}
me.pieces[hex.EncodeToString(piece.Hash())] = true
os.Remove(me.incompletePiecePath(piece))
return
}
func (me *torrentStorage) PieceCompleted(piece int) (err error) {
return me.pieces.pieceCompleted(me.info.Piece(piece))
}
func (me *torrentStorage) PieceComplete(piece int) bool {
return me.pieces.PieceComplete(me.info.Piece(piece))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment