Skip to content

Commit e2287ba

Browse files
committed
fixup! feat(pdp): add SP-to-SP piece fetch endpoint
1 parent 546549f commit e2287ba

File tree

8 files changed

+617
-347
lines changed

8 files changed

+617
-347
lines changed

cmd/curio/tasks/tasks.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -305,11 +305,12 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps, shutdownChan chan
305305
pdpNextProvingPeriodTask := pdp.NewNextProvingPeriodTask(db, must.One(dependencies.EthClient.Val()), dependencies.Chain, chainSched, es)
306306
pdpInitProvingPeriodTask := pdp.NewInitProvingPeriodTask(db, must.One(dependencies.EthClient.Val()), dependencies.Chain, chainSched, es)
307307
pdpNotifTask := pdp.NewPDPNotifyTask(db)
308+
pdpFetchPieceTask := pdp.NewPDPFetchPieceTask(ctx, db, lstor, 4)
308309
pdpIndexingTask := indexing.NewPDPIndexingTask(db, iStore, dependencies.CachedPieceReader, cfg, idxMax)
309310
pdpIpniTask := indexing.NewPDPIPNITask(db, sc, dependencies.CachedPieceReader, cfg, idxMax)
310311
pdpTerminate := pdp.NewTerminateServiceTask(db, must.One(dependencies.EthClient.Val()), senderEth)
311312
pdpDelete := pdp.NewDeleteDataSetTask(db, must.One(dependencies.EthClient.Val()), senderEth)
312-
activeTasks = append(activeTasks, pdpNotifTask, pdpProveTask, pdpNextProvingPeriodTask, pdpInitProvingPeriodTask, pdpIndexingTask, pdpIpniTask, pdpTerminate, pdpDelete)
313+
activeTasks = append(activeTasks, pdpNotifTask, pdpFetchPieceTask, pdpProveTask, pdpNextProvingPeriodTask, pdpInitProvingPeriodTask, pdpIndexingTask, pdpIpniTask, pdpTerminate, pdpDelete)
313314
}
314315

315316
indexingTask := indexing.NewIndexingTask(db, sc, iStore, pp, cfg, idxMax)

harmony/harmonydb/sql/20260109-pdpv0-fetch.sql

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ CREATE TABLE pdp_piece_fetch_items (
1818
fetch_id BIGINT NOT NULL REFERENCES pdp_piece_fetches(id) ON DELETE CASCADE,
1919
piece_cid TEXT NOT NULL, -- PieceCIDv1 (for joins with parked_pieces)
2020
piece_raw_size BIGINT NOT NULL, -- raw size to reconstruct PieceCIDv2 for API
21+
source_url TEXT NOT NULL, -- external SP URL to fetch from
22+
task_id BIGINT REFERENCES harmony_task(id) ON DELETE SET NULL, -- fetch task
2123
failed BOOLEAN NOT NULL DEFAULT FALSE, -- true if piece permanently failed
2224
fail_reason TEXT, -- error message when failed
2325

lib/ffi/piece_funcs.go

Lines changed: 0 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,6 @@ import (
88

99
"golang.org/x/xerrors"
1010

11-
commcid "github.com/filecoin-project/go-fil-commcid"
12-
commp "github.com/filecoin-project/go-fil-commp-hashhash"
13-
"github.com/filecoin-project/go-state-types/abi"
14-
1511
"github.com/filecoin-project/curio/harmony/harmonytask"
1612
storiface "github.com/filecoin-project/curio/lib/storiface"
1713
)
@@ -87,82 +83,3 @@ func (sb *SealCalls) PieceReader(ctx context.Context, id storiface.PieceNumber)
8783
func (sb *SealCalls) RemovePiece(ctx context.Context, id storiface.PieceNumber) error {
8884
return sb.Sectors.storage.Remove(ctx, id.Ref().ID, storiface.FTPiece, true, nil)
8985
}
90-
91-
func (sb *SealCalls) WriteUploadPiece(ctx context.Context, pieceID storiface.PieceNumber, size int64, data io.Reader, storageType storiface.PathType, verifySize bool) (abi.PieceInfo, uint64, error) {
92-
// Use storageType in AcquireSector
93-
paths, pathIDs, done, err := sb.Sectors.AcquireSector(ctx, nil, pieceID.Ref(), storiface.FTNone, storiface.FTPiece, storageType)
94-
if err != nil {
95-
return abi.PieceInfo{}, 0, err
96-
}
97-
skipDeclare := storiface.FTPiece
98-
99-
defer func() {
100-
done(skipDeclare)
101-
}()
102-
103-
dest := paths.Piece
104-
tempDest := dest + storiface.TempSuffix
105-
106-
destFile, err := os.OpenFile(tempDest, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
107-
if err != nil {
108-
return abi.PieceInfo{}, 0, xerrors.Errorf("creating temp piece file '%s': %w", tempDest, err)
109-
}
110-
111-
removeTemp := true
112-
defer func() {
113-
if removeTemp {
114-
rerr := os.Remove(tempDest)
115-
if rerr != nil {
116-
log.Errorf("removing temp file: %+v", rerr)
117-
}
118-
}
119-
}()
120-
121-
copyStart := time.Now()
122-
123-
wr := new(commp.Calc)
124-
writers := io.MultiWriter(wr, destFile)
125-
126-
n, err := io.CopyBuffer(writers, io.LimitReader(data, size), make([]byte, 8<<20))
127-
if err != nil {
128-
_ = destFile.Close()
129-
return abi.PieceInfo{}, 0, xerrors.Errorf("copying piece data: %w", err)
130-
}
131-
132-
if err := destFile.Close(); err != nil {
133-
return abi.PieceInfo{}, 0, xerrors.Errorf("closing temp piece file: %w", err)
134-
}
135-
136-
if verifySize && n != size {
137-
return abi.PieceInfo{}, 0, xerrors.Errorf("short write: %d", n)
138-
}
139-
140-
digest, pieceSize, err := wr.Digest()
141-
if err != nil {
142-
return abi.PieceInfo{}, 0, xerrors.Errorf("computing piece digest: %w", err)
143-
}
144-
145-
pcid, err := commcid.DataCommitmentV1ToCID(digest)
146-
if err != nil {
147-
return abi.PieceInfo{}, 0, xerrors.Errorf("computing piece CID: %w", err)
148-
}
149-
psize := abi.PaddedPieceSize(pieceSize)
150-
151-
copyEnd := time.Now()
152-
153-
log.Infow("wrote piece", "piece", pieceID, "size", n, "duration", copyEnd.Sub(copyStart), "dest", dest, "MiB/s", float64(size)/(1<<20)/copyEnd.Sub(copyStart).Seconds())
154-
155-
if err := os.Rename(tempDest, dest); err != nil {
156-
return abi.PieceInfo{}, 0, xerrors.Errorf("rename temp piece to dest %s -> %s: %w", tempDest, dest, err)
157-
}
158-
159-
skipDeclare = storiface.FTNone
160-
161-
removeTemp = false
162-
163-
if err := sb.ensureOneCopy(ctx, pieceID.Ref().ID, pathIDs, storiface.FTPiece); err != nil {
164-
return abi.PieceInfo{}, 0, xerrors.Errorf("ensure one copy: %w", err)
165-
}
166-
167-
return abi.PieceInfo{PieceCID: pcid, Size: psize}, uint64(n), nil
168-
}

0 commit comments

Comments
 (0)