Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion build/deps/oci.MODULE.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,12 @@ oci.pull(
"linux/arm64/v8",
],
)
use_repo(oci, "node_25_slim", "node_25_slim_linux_amd64", "node_25_slim_linux_arm64_v8")
oci.pull(
name = "proxy_everything",
image = "docker.io/cloudflare/proxy-everything:main",
platforms = [
"linux/amd64",
"linux/arm64/v8",
],
)
use_repo(oci, "node_25_slim", "node_25_slim_linux_amd64", "node_25_slim_linux_arm64_v8", "proxy_everything", "proxy_everything_linux_amd64", "proxy_everything_linux_arm64_v8")
1 change: 1 addition & 0 deletions images/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ load("@rules_multirun//:defs.bzl", "command", "multirun")

IMAGES = {
"container-client-test": "//images/container-client-test:load",
"proxy-everything": "//images/container-client-test:load-proxy-everything",
}

[
Expand Down
7 changes: 7 additions & 0 deletions images/container-client-test/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,10 @@ oci_load(
repo_tags = ["cloudflare/workerd/container-client-test:latest"],
visibility = ["//visibility:public"],
)

oci_load(
name = "load-proxy-everything",
image = "@proxy_everything",
repo_tags = ["cloudflare/proxy-everything:main"],
visibility = ["//visibility:public"],
)
58 changes: 37 additions & 21 deletions images/container-client-test/app.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,27 @@
const { createServer } = require('http');

const webSocketEnabled = process.env.WS_ENABLED === 'true';
const wsProxyTarget = process.env.WS_PROXY_TARGET || null;

// Create HTTP server
const server = createServer(function (req, res) {
if (req.url === '/ws') {
// WebSocket upgrade will be handled by the WebSocket server
return;
}

if (req.url === '/intercept') {
const targetHost = req.headers['x-host'] || '11.0.0.1';
fetch(`http://${targetHost}`)
.then((result) => result.text())
.then((body) => {
res.writeHead(200);
res.write(body);
res.end();
})
.catch((err) => {
res.writeHead(500);
res.write(`${targetHost} ${err.message}`);
res.end();
});
return;
}

Expand All @@ -14,30 +30,30 @@ const server = createServer(function (req, res) {
res.end();
});

// Check if WebSocket functionality is enabled
if (webSocketEnabled) {
const WebSocket = require('ws');
const wss = new WebSocket.Server({ server, path: '/ws' });

// Create WebSocket server
const wss = new WebSocket.Server({
server: server,
path: '/ws',
});

wss.on('connection', function connection(ws) {
console.log('WebSocket connection established');

ws.on('message', function message(data) {
console.log('Received:', data.toString());
// Echo the message back with prefix
ws.send('Echo: ' + data.toString());
});
wss.on('connection', function (clientWs) {
if (wsProxyTarget) {
const targetWs = new WebSocket(`ws://${wsProxyTarget}/ws`);
const ready = new Promise(function (resolve) {
targetWs.on('open', resolve);
});

ws.on('close', function close() {
console.log('WebSocket connection closed');
});
targetWs.on('message', (data) => clientWs.send(data));
clientWs.on('message', async function (data) {
await ready;
targetWs.send(data);
});

ws.on('error', console.error);
clientWs.on('close', targetWs.close);
targetWs.on('close', clientWs.close);
} else {
clientWs.on('message', function (data) {
clientWs.send('Echo: ' + data.toString());
});
}
});
}

Expand Down
33 changes: 33 additions & 0 deletions src/workerd/api/container.c++
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,39 @@ jsg::Promise<void> Container::setInactivityTimeout(jsg::Lock& js, int64_t durati
return IoContext::current().awaitIo(js, req.sendIgnoringResult());
}

jsg::Promise<void> Container::interceptOutboundHttp(
jsg::Lock& js, kj::String addr, jsg::Ref<Fetcher> binding) {
auto& ioctx = IoContext::current();
auto channel = binding->getSubrequestChannel(ioctx);

// Get a channel token for RPC usage, the container runtime can use this
// token later to redeem a Fetcher.
auto token = channel->getToken(IoChannelFactory::ChannelTokenUsage::RPC);

auto req = rpcClient->setEgressHttpRequest();
req.setHostPort(addr);
req.setChannelToken(token);
return ioctx.awaitIo(js, req.sendIgnoringResult());
}

jsg::Promise<void> Container::interceptAllOutboundHttp(jsg::Lock& js, jsg::Ref<Fetcher> binding) {
auto& ioctx = IoContext::current();
auto channel = binding->getSubrequestChannel(ioctx);
auto token = channel->getToken(IoChannelFactory::ChannelTokenUsage::RPC);

// Register for all IPv4 and IPv6 addresses (on port 80)
auto reqV4 = rpcClient->setEgressHttpRequest();
reqV4.setHostPort("0.0.0.0/0"_kj);
reqV4.setChannelToken(token);

auto reqV6 = rpcClient->setEgressHttpRequest();
reqV6.setHostPort("::/0"_kj);
reqV6.setChannelToken(token);

return ioctx.awaitIo(
js, kj::joinPromises(kj::arr(reqV4.sendIgnoringResult(), reqV6.sendIgnoringResult())));
}

jsg::Promise<void> Container::monitor(jsg::Lock& js) {
JSG_REQUIRE(running, Error, "monitor() cannot be called on a container that is not running.");

Expand Down
8 changes: 8 additions & 0 deletions src/workerd/api/container.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ class Container: public jsg::Object {
void signal(jsg::Lock& js, int signo);
jsg::Ref<Fetcher> getTcpPort(jsg::Lock& js, int port);
jsg::Promise<void> setInactivityTimeout(jsg::Lock& js, int64_t durationMs);
jsg::Promise<void> interceptOutboundHttp(
jsg::Lock& js, kj::String addr, jsg::Ref<Fetcher> binding);
jsg::Promise<void> interceptAllOutboundHttp(jsg::Lock& js, jsg::Ref<Fetcher> binding);

// TODO(containers): listenTcp()

Expand All @@ -73,6 +76,11 @@ class Container: public jsg::Object {
JSG_METHOD(signal);
JSG_METHOD(getTcpPort);
JSG_METHOD(setInactivityTimeout);

if (flags.getWorkerdExperimental()) {
JSG_METHOD(interceptOutboundHttp);
JSG_METHOD(interceptAllOutboundHttp);
}
}

void visitForMemoryInfo(jsg::MemoryTracker& tracker) const {
Expand Down
12 changes: 12 additions & 0 deletions src/workerd/io/container.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -110,4 +110,16 @@ interface Container @0x9aaceefc06523bca {
# Note that if there is an open connection to the container, the runtime must not shutdown the container.
# If there is no activity timeout duration configured and no container connection, it's up to the runtime
# to decide when to signal the container to exit.

setEgressTcp @8 (addr :Text, channelToken :Data);
# TODO: This method is unimplemented.
#
# Configures egress TCP routing for the container. When the container attempts to connect to the
# specified address, the connection should be routed back to the Workers runtime using the channel token.

setEgressHttp @9 (hostPort :Text, channelToken :Data);
# Configures egress HTTP routing for the container. When the container attempts to connect to the
# specified host:port, the connection should be routed back to the Workers runtime using the channel token.
# The format of hostPort can be '<ip|cidr>[':'<port>]'. If port is omitted, it's assumed to only cover port 80.
# This method does not support HTTPs yet.
}
16 changes: 14 additions & 2 deletions src/workerd/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -204,19 +204,29 @@ wd_cc_library(
}),
)

wd_cc_library(
name = "channel-token",
srcs = ["channel-token.c++"],
hdrs = ["channel-token.h"],
deps = [
":channel-token_capnp",
"//src/workerd/io",
"//src/workerd/util:entropy",
],
)

wd_cc_library(
name = "server",
srcs = [
"channel-token.c++",
"server.c++",
],
hdrs = [
"channel-token.h",
"server.h",
],
deps = [
":actor-id-impl",
":alarm-scheduler",
":channel-token",
":channel-token_capnp",
":container-client",
":facet-tree-index",
Expand Down Expand Up @@ -268,7 +278,9 @@ wd_cc_library(
hdrs = ["container-client.h"],
visibility = ["//visibility:public"],
deps = [
":channel-token",
":docker-api_capnp",
"//src/workerd/io",
"//src/workerd/io:container_capnp",
"//src/workerd/jsg",
"@capnp-cpp//src/capnp/compat:http-over-capnp",
Expand Down
Loading
Loading