Skip to content

Commit b5da59c

Browse files
committed
feat(pdp): add SP-to-SP piece pull endpoint
1 parent 1bad255 commit b5da59c

File tree

8 files changed

+445
-445
lines changed

8 files changed

+445
-445
lines changed

cmd/curio/tasks/tasks.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -305,13 +305,13 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps, shutdownChan chan
305305
pdpNextProvingPeriodTask := pdp.NewNextProvingPeriodTask(db, must.One(dependencies.EthClient.Val()), dependencies.Chain, chainSched, es)
306306
pdpInitProvingPeriodTask := pdp.NewInitProvingPeriodTask(db, must.One(dependencies.EthClient.Val()), dependencies.Chain, chainSched, es)
307307
pdpNotifTask := pdp.NewPDPNotifyTask(ctx, db)
308-
pdpFetchPieceTask := pdp.NewPDPFetchPieceTask(ctx, db, lstor, 4)
308+
pdpPullPieceTask := pdp.NewPDPPullPieceTask(ctx, db, lstor, 4)
309309
pdpIndexingTask := indexing.NewPDPIndexingTask(db, iStore, dependencies.CachedPieceReader, cfg, idxMax)
310310
pdpIpniTask := indexing.NewPDPIPNITask(db, sc, dependencies.CachedPieceReader, cfg, idxMax)
311311
pdpTerminate := pdp.NewTerminateServiceTask(db, must.One(dependencies.EthClient.Val()), senderEth)
312312
pdpDelete := pdp.NewDeleteDataSetTask(db, must.One(dependencies.EthClient.Val()), senderEth)
313313
payTask := pay.NewSettleTask(db, must.One(dependencies.EthClient.Val()), senderEth)
314-
activeTasks = append(activeTasks, pdpProveTask, pdpNotifTask, pdpFetchPieceTask, pdpNextProvingPeriodTask, pdpInitProvingPeriodTask, pdpIndexingTask, pdpIpniTask, pdpTerminate, pdpDelete, payTask)
314+
activeTasks = append(activeTasks, pdpProveTask, pdpNotifTask, pdpPullPieceTask, pdpNextProvingPeriodTask, pdpInitProvingPeriodTask, pdpIndexingTask, pdpIpniTask, pdpTerminate, pdpDelete, payTask)
315315
}
316316

317317
indexingTask := indexing.NewIndexingTask(db, sc, iStore, pp, cfg, idxMax)
Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
-- PDP piece fetch tables for SP-to-SP transfer
1+
-- PDP piece pull tables for SP-to-SP transfer
22
--
3-
-- Provides idempotency and piece tracking for fetch requests.
3+
-- Provides idempotency and piece tracking for pull requests.
44
-- Status is derived dynamically from parked_pieces, not stored here.
5-
CREATE TABLE pdp_piece_fetches (
5+
CREATE TABLE pdp_piece_pulls (
66
id BIGSERIAL PRIMARY KEY,
77
service TEXT NOT NULL REFERENCES pdp_services(service_label) ON DELETE CASCADE,
88
extra_data_hash BYTEA NOT NULL, -- sha256(extraData) for idempotency
@@ -13,18 +13,18 @@ CREATE TABLE pdp_piece_fetches (
1313
UNIQUE(service, extra_data_hash, data_set_id, record_keeper)
1414
);
1515

16-
-- Tracks individual pieces within a fetch request
17-
CREATE TABLE pdp_piece_fetch_items (
18-
fetch_id BIGINT NOT NULL REFERENCES pdp_piece_fetches(id) ON DELETE CASCADE,
16+
-- Tracks individual pieces within a pull request
17+
CREATE TABLE pdp_piece_pull_items (
18+
fetch_id BIGINT NOT NULL REFERENCES pdp_piece_pulls(id) ON DELETE CASCADE,
1919
piece_cid TEXT NOT NULL, -- PieceCIDv1 (for joins with parked_pieces)
2020
piece_raw_size BIGINT NOT NULL, -- raw size to reconstruct PieceCIDv2 for API
2121
source_url TEXT NOT NULL, -- external SP URL to fetch from
22-
task_id BIGINT REFERENCES harmony_task(id) ON DELETE SET NULL, -- fetch task
22+
task_id BIGINT REFERENCES harmony_task(id) ON DELETE SET NULL, -- pull task
2323
failed BOOLEAN NOT NULL DEFAULT FALSE, -- true if piece permanently failed
2424
fail_reason TEXT, -- error message when failed
2525

2626
PRIMARY KEY (fetch_id, piece_cid)
2727
);
2828

2929
-- Index for cleanup queries
30-
CREATE INDEX idx_pdp_piece_fetches_created_at ON pdp_piece_fetches(created_at);
30+
CREATE INDEX idx_pdp_piece_pulls_created_at ON pdp_piece_pulls(created_at);

pdp/handlers.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ type PDPService struct {
5353
ethClient *ethclient.Client
5454
filClient PDPServiceNodeApi
5555

56-
fetchHandler *FetchHandler
56+
pullHandler *PullHandler
5757
}
5858

5959
type PDPServiceNodeApi interface {
@@ -63,8 +63,8 @@ type PDPServiceNodeApi interface {
6363
// NewPDPService creates a new instance of PDPService with the provided stores
6464
func NewPDPService(ctx context.Context, db *harmonydb.DB, stor paths.StashStore, ec *ethclient.Client, fc PDPServiceNodeApi, sn *message.SenderETH) *PDPService {
6565
auth := &NullAuth{}
66-
fetchStore := NewDBFetchStore(db)
67-
fetchValidator := NewEthCallValidator(ec, db)
66+
pullStore := NewDBPullStore(db)
67+
pullValidator := NewEthCallValidator(ec, db)
6868

6969
p := &PDPService{
7070
Auth: auth,
@@ -75,7 +75,7 @@ func NewPDPService(ctx context.Context, db *harmonydb.DB, stor paths.StashStore,
7575
ethClient: ec,
7676
filClient: fc,
7777

78-
fetchHandler: NewFetchHandler(auth, fetchStore, fetchValidator),
78+
pullHandler: NewPullHandler(auth, pullStore, pullValidator),
7979
}
8080

8181
go p.cleanup(ctx)
@@ -151,8 +151,8 @@ func Routes(r *chi.Mux, p *PDPService) {
151151
// POST /pdp/piece/uploads/{uploadUUID}
152152
r.Post(path.Join(PDPRoutePath, "/piece/uploads/{uploadUUID}"), p.handleFinalizeStreamingUpload)
153153

154-
// POST /pdp/piece/fetch - Fetch pieces from other SPs
155-
r.Post(path.Join(PDPRoutePath, "/piece/fetch"), p.fetchHandler.HandleFetch)
154+
// POST /pdp/piece/pull - Pull pieces from other SPs
155+
r.Post(path.Join(PDPRoutePath, "/piece/pull"), p.pullHandler.HandlePull)
156156
}
157157

158158
// Handler functions

0 commit comments

Comments
 (0)