Skip to content

Commit 965a0b1

Browse files
authored
feat(pdp): add SP-to-SP piece pull endpoint (#864)
Implement POST /pdp/piece/pull for fetching pieces from other SPs. Background task handles download, CommP verification, and retry with failure tracking. Consolidates PieceCID parsing into shared utilities. Closes: #828
1 parent 5c9e34d commit 965a0b1

File tree

12 files changed

+3066
-87
lines changed

12 files changed

+3066
-87
lines changed

cmd/curio/tasks/tasks.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -305,12 +305,13 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps, shutdownChan chan
305305
pdpNextProvingPeriodTask := pdp.NewNextProvingPeriodTask(db, must.One(dependencies.EthClient.Val()), dependencies.Chain, chainSched, es)
306306
pdpInitProvingPeriodTask := pdp.NewInitProvingPeriodTask(db, must.One(dependencies.EthClient.Val()), dependencies.Chain, chainSched, es)
307307
pdpNotifTask := pdp.NewPDPNotifyTask(ctx, db)
308+
pdpPullPieceTask := pdp.NewPDPPullPieceTask(ctx, db, lstor, 4)
308309
pdpIndexingTask := indexing.NewPDPIndexingTask(db, iStore, dependencies.CachedPieceReader, cfg, idxMax)
309310
pdpIpniTask := indexing.NewPDPIPNITask(db, sc, dependencies.CachedPieceReader, cfg, idxMax)
310311
pdpTerminate := pdp.NewTerminateServiceTask(db, must.One(dependencies.EthClient.Val()), senderEth)
311312
pdpDelete := pdp.NewDeleteDataSetTask(db, must.One(dependencies.EthClient.Val()), senderEth)
312313
payTask := pay.NewSettleTask(db, must.One(dependencies.EthClient.Val()), senderEth)
313-
activeTasks = append(activeTasks, pdpProveTask, pdpNotifTask, pdpNextProvingPeriodTask, pdpInitProvingPeriodTask, pdpIndexingTask, pdpIpniTask, pdpTerminate, pdpDelete, payTask)
314+
activeTasks = append(activeTasks, pdpProveTask, pdpNotifTask, pdpPullPieceTask, pdpNextProvingPeriodTask, pdpInitProvingPeriodTask, pdpIndexingTask, pdpIpniTask, pdpTerminate, pdpDelete, payTask)
314315
}
315316

316317
indexingTask := indexing.NewIndexingTask(db, sc, iStore, pp, cfg, idxMax)
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
-- PDP piece pull tables for SP-to-SP transfer
2+
--
3+
-- Provides idempotency and piece tracking for pull requests.
4+
-- Status is derived dynamically from parked_pieces, not stored here.
5+
CREATE TABLE pdp_piece_pulls (
6+
id BIGSERIAL PRIMARY KEY,
7+
service TEXT NOT NULL REFERENCES pdp_services(service_label) ON DELETE CASCADE,
8+
extra_data_hash BYTEA NOT NULL, -- sha256(extraData) for idempotency
9+
data_set_id BIGINT NOT NULL DEFAULT 0, -- 0 = create new dataset
10+
record_keeper TEXT NOT NULL DEFAULT '', -- required when data_set_id is 0
11+
created_at TIMESTAMPTZ DEFAULT NOW(),
12+
13+
UNIQUE(service, extra_data_hash, data_set_id, record_keeper)
14+
);
15+
16+
-- Tracks individual pieces within a pull request
17+
CREATE TABLE pdp_piece_pull_items (
18+
fetch_id BIGINT NOT NULL REFERENCES pdp_piece_pulls(id) ON DELETE CASCADE,
19+
piece_cid TEXT NOT NULL, -- PieceCIDv1 (for joins with parked_pieces)
20+
piece_raw_size BIGINT NOT NULL, -- raw size to reconstruct PieceCIDv2 for API
21+
source_url TEXT NOT NULL, -- external SP URL to fetch from
22+
task_id BIGINT REFERENCES harmony_task(id) ON DELETE SET NULL, -- pull task
23+
failed BOOLEAN NOT NULL DEFAULT FALSE, -- true if piece permanently failed
24+
fail_reason TEXT, -- error message when failed
25+
26+
PRIMARY KEY (fetch_id, piece_cid)
27+
);
28+
29+
-- Index for cleanup queries
30+
CREATE INDEX idx_pdp_piece_pulls_created_at ON pdp_piece_pulls(created_at);

pdp/handlers.go

Lines changed: 27 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,8 @@ import (
1818
"github.com/ethereum/go-ethereum/core/types"
1919
"github.com/ethereum/go-ethereum/ethclient"
2020
"github.com/go-chi/chi/v5"
21-
"github.com/ipfs/go-cid"
22-
"github.com/multiformats/go-multicodec"
2321
"github.com/yugabyte/pgx/v5"
2422

25-
commcid "github.com/filecoin-project/go-fil-commcid"
26-
2723
"github.com/filecoin-project/curio/harmony/harmonydb"
2824
"github.com/filecoin-project/curio/lib/paths"
2925
"github.com/filecoin-project/curio/pdp/contract"
@@ -56,6 +52,8 @@ type PDPService struct {
5652
sender *message.SenderETH
5753
ethClient *ethclient.Client
5854
filClient PDPServiceNodeApi
55+
56+
pullHandler *PullHandler
5957
}
6058

6159
type PDPServiceNodeApi interface {
@@ -64,14 +62,20 @@ type PDPServiceNodeApi interface {
6462

6563
// NewPDPService creates a new instance of PDPService with the provided stores
6664
func NewPDPService(ctx context.Context, db *harmonydb.DB, stor paths.StashStore, ec *ethclient.Client, fc PDPServiceNodeApi, sn *message.SenderETH) *PDPService {
65+
auth := &NullAuth{}
66+
pullStore := NewDBPullStore(db)
67+
pullValidator := NewEthCallValidator(ec, db)
68+
6769
p := &PDPService{
68-
Auth: &NullAuth{},
70+
Auth: auth,
6971
db: db,
7072
storage: stor,
7173

7274
sender: sn,
7375
ethClient: ec,
7476
filClient: fc,
77+
78+
pullHandler: NewPullHandler(auth, pullStore, pullValidator),
7579
}
7680

7781
go p.cleanup(ctx)
@@ -146,6 +150,9 @@ func Routes(r *chi.Mux, p *PDPService) {
146150

147151
// POST /pdp/piece/uploads/{uploadUUID}
148152
r.Post(path.Join(PDPRoutePath, "/piece/uploads/{uploadUUID}"), p.handleFinalizeStreamingUpload)
153+
154+
// POST /pdp/piece/pull - Pull pieces from other SPs
155+
r.Post(path.Join(PDPRoutePath, "/piece/pull"), p.pullHandler.HandlePull)
149156
}
150157

151158
// Handler functions
@@ -181,12 +188,12 @@ func (p *PDPService) handleGetPieceStatus(w http.ResponseWriter, r *http.Request
181188
}
182189

183190
// Convert to v1 format (database stores v1)
184-
pieceCidV1, err := asPieceCIDv1(pieceCidStr)
191+
info, err := ParsePieceCid(pieceCidStr)
185192
if err != nil {
186193
http.Error(w, "Invalid pieceCid format: "+err.Error(), http.StatusBadRequest)
187194
return
188195
}
189-
pieceCidV1Str := pieceCidV1.String()
196+
pieceCidV1Str := info.CidV1.String()
190197

191198
// Query status from database
192199
var result struct {
@@ -256,7 +263,7 @@ func (p *PDPService) handleGetPieceStatus(w http.ResponseWriter, r *http.Request
256263
}
257264

258265
// Convert authoritative PieceCID back from v1 to v2 for external API
259-
pieceCidV2, _, err := asPieceCIDv2(result.PieceCID, result.PieceRawSize)
266+
pieceInfo, err := PieceCidV2FromV1Str(result.PieceCID, result.PieceRawSize)
260267
if err != nil {
261268
http.Error(w, "Failed to convert PieceCID to v2: "+err.Error(), http.StatusInternalServerError)
262269
return
@@ -271,7 +278,7 @@ func (p *PDPService) handleGetPieceStatus(w http.ResponseWriter, r *http.Request
271278
Retrieved bool `json:"retrieved"`
272279
RetrievedAt *time.Time `json:"retrievedAt,omitempty"`
273280
}{
274-
PieceCID: pieceCidV2.String(),
281+
PieceCID: pieceInfo.CidV2.String(),
275282
Status: result.Status,
276283
Indexed: result.Indexed,
277284
Advertised: result.Advertised,
@@ -573,25 +580,25 @@ func (p *PDPService) handleGetDataSet(w http.ResponseWriter, r *http.Request) {
573580
pcv2Str, exists := aggregatePieceCIDs[piece.PieceID]
574581
if !exists {
575582
aggregateRawSize := pieceRawSizes[piece.PieceID]
576-
pcv2, _, err := asPieceCIDv2(piece.PieceCid, aggregateRawSize)
583+
pcInfo, err := PieceCidV2FromV1Str(piece.PieceCid, aggregateRawSize)
577584
if err != nil {
578585
http.Error(w, "Invalid PieceCID: "+err.Error(), http.StatusBadRequest)
579586
return
580587
}
581-
pcv2Str = pcv2.String()
588+
pcv2Str = pcInfo.CidV2.String()
582589
aggregatePieceCIDs[piece.PieceID] = pcv2Str
583590
}
584591

585592
// Use the raw size for the sub piece
586-
spcv2, _, err := asPieceCIDv2(piece.SubPieceCID, piece.SubPieceRawSize)
593+
spcInfo, err := PieceCidV2FromV1Str(piece.SubPieceCID, piece.SubPieceRawSize)
587594
if err != nil {
588595
http.Error(w, "Invalid SubPieceCID: "+err.Error(), http.StatusBadRequest)
589596
return
590597
}
591598
response.Pieces = append(response.Pieces, PieceEntry{
592599
PieceID: piece.PieceID,
593600
PieceCID: pcv2Str,
594-
SubPieceCID: spcv2.String(),
601+
SubPieceCID: spcInfo.CidV2.String(),
595602
SubPieceOffset: piece.SubPieceOffset,
596603
})
597604
}
@@ -1095,48 +1102,6 @@ func (p *PDPService) handleGetDataSetPiece(w http.ResponseWriter, r *http.Reques
10951102
}
10961103
}
10971104

1098-
func asPieceCIDv1(cidStr string) (cid.Cid, error) {
1099-
pieceCid, err := cid.Decode(cidStr)
1100-
if err != nil {
1101-
return cid.Undef, fmt.Errorf("failed to decode PieceCID: %w", err)
1102-
}
1103-
if pieceCid.Prefix().MhType == uint64(multicodec.Fr32Sha256Trunc254Padbintree) {
1104-
c1, _, err := commcid.PieceCidV1FromV2(pieceCid)
1105-
return c1, err
1106-
}
1107-
return pieceCid, nil
1108-
}
1109-
1110-
// asPieceCIDv2 converts a string to a PieceCIDv2. Where the input is expected to be a PieceCIDv1,
1111-
// a size argument is required. Where it's expected to be a v2, the size argument is ignored. The
1112-
// size either derived from the v2 or from the size argument in the case of a v1 is returned.
1113-
func asPieceCIDv2(cidStr string, size uint64) (cid.Cid, uint64, error) {
1114-
pieceCid, err := cid.Decode(cidStr)
1115-
if err != nil {
1116-
return cid.Undef, 0, fmt.Errorf("failed to decode PieceCid: %w", err)
1117-
}
1118-
switch pieceCid.Prefix().MhType {
1119-
case uint64(multicodec.Sha2_256Trunc254Padded):
1120-
if size == 0 {
1121-
return cid.Undef, 0, fmt.Errorf("size must be provided for PieceCIDv1")
1122-
}
1123-
c, err := commcid.PieceCidV2FromV1(pieceCid, size)
1124-
if err != nil {
1125-
return cid.Undef, 0, err
1126-
}
1127-
return c, size, nil
1128-
case uint64(multicodec.Fr32Sha256Trunc254Padbintree):
1129-
// get the size from the CID, not the argument
1130-
_, size, err := commcid.PieceCidV2ToDataCommitment(pieceCid)
1131-
if err != nil {
1132-
return cid.Undef, 0, fmt.Errorf("failed to get size from Sha2_256Trunc254Padded PieceCid: %w", err)
1133-
}
1134-
return pieceCid, size, nil
1135-
default:
1136-
return cid.Undef, 0, fmt.Errorf("unsupported piece CID type: %d", pieceCid.Prefix().MhType)
1137-
}
1138-
}
1139-
11401105
func (p *PDPService) cleanup(ctx context.Context) {
11411106
rm := func(ctx context.Context, db *harmonydb.DB) {
11421107
var RefIDs []int64
@@ -1155,6 +1120,13 @@ func (p *PDPService) cleanup(ctx context.Context) {
11551120
log.Errorw("failed to delete non-finalized uploads", "error", err)
11561121
}
11571122
}
1123+
1124+
// Clean up old piece fetch records (older than 5 days)
1125+
// CASCADE deletes pdp_piece_fetch_items automatically
1126+
_, err = db.Exec(ctx, `DELETE FROM pdp_piece_fetches WHERE created_at < NOW() - INTERVAL '5 days'`)
1127+
if err != nil {
1128+
log.Errorw("failed to delete old piece fetch records", "error", err)
1129+
}
11581130
}
11591131

11601132
ticker := time.NewTicker(time.Minute * 5)

pdp/handlers_add.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -69,11 +69,11 @@ func (p *PDPService) transformAddPiecesRequest(ctx context.Context, serviceLabel
6969
if subPieceEntry.SubPieceCID == "" {
7070
return nil, nil, nil, errors.New("subPieceCid is required for each subPiece")
7171
}
72-
pieceCid, err := asPieceCIDv1(subPieceEntry.SubPieceCID)
72+
info, err := ParsePieceCid(subPieceEntry.SubPieceCID)
7373
if err != nil {
7474
return nil, nil, nil, fmt.Errorf("invalid SubPiece: %w", err)
7575
}
76-
pieceCidString := pieceCid.String()
76+
pieceCidString := info.CidV1.String()
7777

7878
addPieceReq.SubPieces[i].subPieceCIDv1 = pieceCidString // save it for to query subPieceInfoMap later
7979

@@ -177,14 +177,14 @@ func (p *PDPService) transformAddPiecesRequest(ctx context.Context, serviceLabel
177177
}
178178

179179
// Compare generated PieceCid with provided PieceCid
180-
providedPieceCidv1, err := asPieceCIDv1(addPieceReq.PieceCID)
180+
providedInfo, err := ParsePieceCid(addPieceReq.PieceCID)
181181
if err != nil {
182182
return false, fmt.Errorf("invalid provided PieceCid: %v", err)
183183
}
184-
pieces[i].pieceCIDv1 = providedPieceCidv1.String()
184+
pieces[i].pieceCIDv1 = providedInfo.CidV1.String()
185185

186-
if !providedPieceCidv1.Equals(generatedPieceCid) {
187-
return false, fmt.Errorf("provided PieceCid does not match generated PieceCid: %s != %s", providedPieceCidv1, generatedPieceCid)
186+
if !providedInfo.CidV1.Equals(generatedPieceCid) {
187+
return false, fmt.Errorf("provided PieceCid does not match generated PieceCid: %s != %s", providedInfo.CidV1, generatedPieceCid)
188188
}
189189
}
190190

0 commit comments

Comments
 (0)