Skip to content

Commit ef35405

Browse files
committed
Update ipfs uploading
1 parent 8e25a20 commit ef35405

File tree

14 files changed

+211
-28
lines changed

14 files changed

+211
-28
lines changed

engine/src/handlers/car/mod.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,9 +106,20 @@ impl CarHandler {
106106
if let Some(ipfs_cid) = payload.cid {
107107
Deployment::update_ipfs_cid(&state.database, &payload.deployment_id, &ipfs_cid)
108108
.await
109-
.ok();
109+
.ok();
110110
}
111111

112+
// if let Some(file_path) = payload.file_path {
113+
// // todo pin file using ipfs-cluster
114+
115+
// const ipfs_cluster_url = "http://0.0.0.0:44685";
116+
117+
// // download from ipfs
118+
119+
// // POST /add?local=true&format=car
120+
// // form-data, file: deploy.car
121+
// }
122+
112123
delivery.ack(BasicAckOptions::default()).await.unwrap();
113124
}
114125
tracing::error!("Consumer stream ended");

engine/src/ipfs/mod.rs

Lines changed: 6 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,11 @@
1-
2-
pub struct IPFSCluster {
1+
#[derive(Debug, Clone)]
2+
pub struct IPFSModule {
33
pub cluster_url: String,
4+
pub public_cluster_url: String,
45
}
56

6-
impl IPFSCluster {
7-
pub fn new(cluster_url: String) -> Self {
8-
Self { cluster_url }
7+
impl IPFSModule {
8+
pub fn new(cluster_url: String, public_cluster_url: String) -> Self {
9+
Self { cluster_url, public_cluster_url }
910
}
10-
11-
// pub async fn add_car(&self, car_path: &str) -> Result<String, Box<dyn std::error::Error>> {
12-
// let client = reqwest::Client::new();
13-
// let response = client.post(&format!("{}/add", self.cluster_url))
14-
// .header("Content-Type", "multipart/form-data")
15-
// .body(format!("file=@{}", car_path))
16-
// .send()
17-
// .await?;
18-
19-
// let body = response.text().await?;
20-
// let json: serde_json::Value = serde_json::from_str(&body)?;
21-
// let cid = json["Hash"].as_str().ok_or("CID not found in response")?;
22-
// Ok(cid.to_string())
23-
// }
2411
}

engine/src/routes/mod.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,10 @@ pub mod invite;
2424
pub mod site;
2525
pub mod team;
2626
pub mod user;
27+
pub mod system;
2728

2829
fn get_api() -> impl OpenApi {
29-
(site::api_routes(), UserApi, AuthApi, team::api_routes(), invite::api_routes())
30+
(site::api_routes(), UserApi, AuthApi, team::api_routes(), invite::api_routes(), system::SystemApi)
3031
}
3132

3233
#[derive(Tags)]
@@ -41,6 +42,8 @@ pub enum ApiTags {
4142
Team,
4243
/// User-related endpoints
4344
User,
45+
/// System-related endpoints
46+
System,
4447
/// Authentication-related endpoints
4548
#[oai(rename = "Authentication")]
4649
Auth,
@@ -157,6 +160,7 @@ pub async fn serve(state: State) {
157160
"User",
158161
"Team",
159162
"Invite",
163+
"System",
160164
];
161165

162166
// Reorder tags according to the specified order

engine/src/routes/system.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
use crate::state::State;
2+
use poem::web::Data;
3+
use poem_openapi::{payload::Json, ApiResponse, Object, OpenApi};
4+
5+
pub struct SystemApi;
6+
7+
#[derive(Debug, Clone, Object)]
8+
pub struct IPFSStatus {
9+
pub public_cluster_url: String,
10+
}
11+
12+
#[derive(ApiResponse)]
13+
pub enum IPFSStatusResponse {
14+
#[oai(status = 200)]
15+
Ok(Json<IPFSStatus>),
16+
#[oai(status = 100)]
17+
FeatureDisabled(Json<String>),
18+
}
19+
20+
#[OpenApi]
21+
impl SystemApi {
22+
#[oai(path = "/system/ipfs", method = "get")]
23+
async fn status(&self, state: Data<&State>) -> IPFSStatusResponse {
24+
match &state.ipfs {
25+
Some(ipfs) => IPFSStatusResponse::Ok(Json(IPFSStatus {
26+
public_cluster_url: ipfs.public_cluster_url.clone(),
27+
})),
28+
None => IPFSStatusResponse::FeatureDisabled(Json("IPFS is not enabled".to_string())),
29+
}
30+
}
31+
}

engine/src/state.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use color_eyre::eyre::Result;
44
use figment::{Figment, providers::Env};
55
use serde::Deserialize;
66

7-
use crate::{cache::Cache, database::Database, handlers::TaskRabbit, storage::Storage};
7+
use crate::{cache::Cache, database::Database, handlers::TaskRabbit, ipfs::IPFSModule, storage::Storage};
88

99
pub type State = Arc<AppState>;
1010

@@ -15,6 +15,7 @@ pub struct AppState {
1515
pub storage: Storage,
1616
pub cache: Cache,
1717
pub rabbit: Option<TaskRabbit>,
18+
pub ipfs: Option<IPFSModule>,
1819
}
1920

2021
#[derive(Deserialize, Debug)]
@@ -26,6 +27,7 @@ pub struct AppConfig {
2627
pub s3_car: Option<S3CarConfig>,
2728
pub github_app: Option<GithubAppConfig>,
2829
pub amqp: Option<AMQPConfig>,
30+
pub ipfs: Option<IPFSConfig>,
2931
}
3032

3133
#[derive(Deserialize, Debug)]
@@ -79,6 +81,12 @@ pub struct GithubAppConfig {
7981
pub client_secret: String,
8082
}
8183

84+
#[derive(Deserialize, Debug)]
85+
pub struct IPFSConfig {
86+
pub cluster_url: String,
87+
pub public_cluster_url: String,
88+
}
89+
8290
impl AppState {
8391
pub async fn new() -> Result<Self> {
8492
// let config = Config::builder()
@@ -100,6 +108,8 @@ impl AppState {
100108
.map(|key| format!("github_app.{}", key.as_str().to_lowercase()).into()))
101109
.merge(Env::prefixed("AMQP_")
102110
.map(|key| format!("amqp.{}", key.as_str().to_lowercase()).into()))
111+
.merge(Env::prefixed("IPFS_")
112+
.map(|key| format!("ipfs.{}", key.as_str().to_lowercase()).into()))
103113
.extract::<AppConfig>()
104114
.expect("Failed to load AppConfig configuration");
105115

@@ -115,12 +125,19 @@ impl AppState {
115125
None
116126
};
117127

128+
let ipfs = if let Some(ipfs) = &config.ipfs {
129+
Some(IPFSModule::new(ipfs.cluster_url.clone(), ipfs.public_cluster_url.clone()))
130+
} else {
131+
None
132+
};
133+
118134
Ok(Self {
119135
config,
120136
database,
121137
storage,
122138
cache,
123139
rabbit,
140+
ipfs,
124141
})
125142
}
126143
}

services/ipfs-cart/.env.example

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,6 @@ S3_REGION=us-east-1
88
S3_BUCKET_NAME=car
99
S3_ACCESS_KEY=minioadmin
1010
S3_SECRET_KEY=minioadmin
11+
12+
# IPFS Cluster Configuration
13+
IPFS_CLUSTER_URL=http://0.0.0.0:9094

services/ipfs-cart/src/config.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ const defaultConfig: Config = {
1919
bucket: process.env.S3_BUCKET_NAME || "",
2020
endpoint: process.env.S3_ENDPOINT_URL || undefined,
2121
},
22+
ipfsCluster: {
23+
url: process.env.IPFS_CLUSTER_URL || "http://0.0.0.0:9094",
24+
},
2225
};
2326

2427
export default defaultConfig;

services/ipfs-cart/src/rabbitmq.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import AdmZip from "adm-zip";
1010
import { readdir } from "fs/promises";
1111
import { createCarFile } from "./car";
1212
import { mkdir } from "fs/promises";
13+
import { uploadCar } from "./uploadCar";
1314

1415
// Maximum number of retries before marking a message as failed
1516
const MAX_RETRIES = 4;
@@ -222,6 +223,13 @@ async function processCarRequest(
222223
console.log("CAR file uploaded to S3:", s3Key);
223224
console.log("Root CID:", rootCID);
224225

226+
// upload to ipfs cluster
227+
console.log("Uploading CAR file to IPFS cluster...");
228+
229+
const ipfsResponse = await uploadCar(carFilePath);
230+
231+
console.log("IPFS response:", ipfsResponse);
232+
225233
outputFilePath = s3Key;
226234
outputCID = rootCID;
227235
} catch (error) {

services/ipfs-cart/src/types.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@ export interface Config {
1313
bucket: string;
1414
endpoint?: string;
1515
};
16+
ipfsCluster: {
17+
url: string;
18+
};
1619
}
1720

1821
export interface CarRequest {
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
import config from "./config";
2+
3+
type IpfsClusterResponse = {
4+
name: string; // ""
5+
cid: string; // "bafy..."
6+
bytes: number; // 108925
7+
allocations: string[]; // ["12D3..."]
8+
}
9+
10+
export const uploadCar = async (carFilePath: string) => {
11+
if (!config.ipfsCluster.url) {
12+
throw new Error("IPFS cluster URL is not set, cannot upload CAR file ::uploadCar.ts");
13+
}
14+
15+
const url = `${config.ipfsCluster.url}/add?local=true&format=car`;
16+
17+
const file = await Bun.file(carFilePath).arrayBuffer();
18+
const formData = new FormData();
19+
formData.append("file", new Blob([file]));
20+
21+
const response = await fetch(url, {
22+
method: "POST",
23+
body: formData,
24+
});
25+
26+
const data = await response.json() as IpfsClusterResponse;
27+
28+
console.log("IPFS response:", response);
29+
30+
return data;
31+
};

0 commit comments

Comments
 (0)