Skip to content

Commit bcd18fb

Browse files
jssmith, claude, and cdavisafc
authored
Update claim check recipe to use SimplePlugin (#37)
* Update claim check recipe to use SimplePlugin

  - Use SimplePlugin with data_converter callback (correct API)
  - Remove unused aws_profile parameter from ClaimCheckCodec
  - Require temporalio>=1.19.0 (for SimplePlugin support)
  - Require python>=3.10 (required by temporalio 1.19+)
  - Update README code example to match implementation

  Co-Authored-By: Claude Opus 4.5 <[email protected]>

* Simplify ClaimCheckPlugin to pass DataConverter directly

  SimplePlugin accepts either a DataConverter instance or a callable. The direct approach is simpler and sufficient for this use case.

  Co-Authored-By: Claude Opus 4.5 <[email protected]>

* Sync README code snippets with actual source files

  Co-Authored-By: Claude Opus 4.5 <[email protected]>

* remove comment

* update code snippets in README

  Signed-off-by: Cornelia Davis <[email protected]>

---------

Signed-off-by: Cornelia Davis <[email protected]>
Co-authored-by: Claude Opus 4.5 <[email protected]>
Co-authored-by: Cornelia Davis <[email protected]>
1 parent d8b59db commit bcd18fb

File tree

5 files changed

+86
-379
lines changed

5 files changed

+86
-379
lines changed

foundations/claim_check_pattern_python/README.md

Lines changed: 45 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,16 @@ The `ClaimCheckCodec` implements `PayloadCodec` and adds an inline threshold to
3434

3535
```python
3636
import uuid
37+
import logging
3738
from typing import Iterable, List
3839
import aioboto3
3940
from botocore.exceptions import ClientError
4041

4142
from temporalio.api.common.v1 import Payload
4243
from temporalio.converter import PayloadCodec
4344

45+
logger = logging.getLogger(__name__)
46+
4447

4548
class ClaimCheckCodec(PayloadCodec):
4649
"""PayloadCodec that implements the Claim Check pattern using S3 storage.
@@ -50,13 +53,15 @@ class ClaimCheckCodec(PayloadCodec):
5053
of large payload data.
5154
"""
5255

53-
def __init__(self,
54-
bucket_name: str = "temporal-claim-check",
55-
endpoint_url: str = None,
56-
region_name: str = "us-east-1",
57-
max_inline_bytes: int = 20 * 1024):
56+
def __init__(
57+
self,
58+
bucket_name: str = "temporal-claim-check",
59+
endpoint_url: str = None,
60+
region_name: str = "us-east-1",
61+
max_inline_bytes: int = 20 * 1024,
62+
):
5863
"""Initialize the claim check codec with S3 connection details.
59-
64+
6065
Args:
6166
bucket_name: S3 bucket name for storing claim check data
6267
endpoint_url: S3 endpoint URL (for MinIO or other S3-compatible services)
@@ -67,11 +72,8 @@ class ClaimCheckCodec(PayloadCodec):
6772
self.endpoint_url = endpoint_url
6873
self.region_name = region_name
6974
self.max_inline_bytes = max_inline_bytes
70-
71-
# Initialize aioboto3 session
7275
self.session = aioboto3.Session()
73-
74-
# Ensure bucket exists
76+
7577
self._bucket_created = False
7678

7779
async def _ensure_bucket_exists(self):
@@ -222,95 +224,47 @@ class ClaimCheckCodec(PayloadCodec):
222224

223225
## Claim Check Plugin
224226

225-
The `ClaimCheckPlugin` integrates the codec with the Temporal client configuration and supports plugin chaining.
227+
The `ClaimCheckPlugin` integrates the codec with the Temporal client configuration.
226228

227229
*File: codec/plugin.py*
228230

229231
```python
230232
import os
231-
from temporalio.client import Plugin, ClientConfig
233+
from temporalio.plugin import SimplePlugin
232234
from temporalio.converter import DataConverter
233-
from temporalio.service import ConnectConfig, ServiceClient
234235

235236
from .claim_check import ClaimCheckCodec
236237

237238

238-
class ClaimCheckPlugin(Plugin):
239+
class ClaimCheckPlugin(SimplePlugin):
239240
"""Temporal plugin that integrates the Claim Check codec with client configuration."""
240241

241242
def __init__(self):
242243
"""Initialize the plugin with S3 connection configuration."""
243-
self.bucket_name = os.getenv("S3_BUCKET_NAME", "temporal-claim-check")
244-
self.endpoint_url = os.getenv("S3_ENDPOINT_URL")
245-
self.region_name = os.getenv("AWS_REGION", "us-east-1")
246-
self._next_plugin = None
247-
248-
def init_client_plugin(self, next_plugin: Plugin) -> None:
249-
"""Initialize this plugin in the client plugin chain."""
250-
self._next_plugin = next_plugin
251-
252-
def configure_client(self, config: ClientConfig) -> ClientConfig:
253-
"""Apply the claim check configuration to the client.
254-
255-
Args:
256-
config: Temporal client configuration
257-
258-
Returns:
259-
Updated client configuration with claim check data converter
260-
"""
261-
# Configure the data converter with claim check codec
262-
default_converter_class = config["data_converter"].payload_converter_class
263-
claim_check_codec = ClaimCheckCodec(
264-
bucket_name=self.bucket_name,
265-
endpoint_url=self.endpoint_url,
266-
region_name=self.region_name
267-
)
268-
269-
config["data_converter"] = DataConverter(
270-
payload_converter_class=default_converter_class,
271-
payload_codec=claim_check_codec
244+
super().__init__(
245+
name="claim-check",
246+
data_converter=DataConverter(
247+
payload_codec=ClaimCheckCodec(
248+
bucket_name=os.getenv("S3_BUCKET_NAME", "temporal-claim-check"),
249+
endpoint_url=os.getenv("S3_ENDPOINT_URL"),
250+
region_name=os.getenv("AWS_REGION", "us-east-1"),
251+
),
252+
),
272253
)
273-
274-
# Delegate to next plugin if it exists
275-
if self._next_plugin:
276-
return self._next_plugin.configure_client(config)
277-
return config
278-
279-
async def connect_service_client(self, config: ConnectConfig) -> ServiceClient:
280-
"""Connect to the Temporal service.
281-
282-
Args:
283-
config: Service connection configuration
284-
285-
Returns:
286-
Connected service client
287-
"""
288-
# Delegate to next plugin if it exists
289-
if self._next_plugin:
290-
return await self._next_plugin.connect_service_client(config)
291-
292-
# If no next plugin, use default connection
293-
from temporalio.service import ServiceClient
294-
return await ServiceClient.connect(config)
295254
```
296255

297256
## Example: AI / RAG Workflow using Claim Check
298257

299258
This example ingests a large text, performs lightweight lexical retrieval, and answers a question with an LLM. Large intermediates (chunks, scores) are kept out of Temporal payloads via the Claim Check codec. Only the small final answer is returned inline.
300259

301-
### Activities
260+
### Shared Models
302261

303-
*File: activities/ai_claim_check.py*
262+
*File: shared/models.py*
304263

305264
```python
306265
from dataclasses import dataclass
307266
from typing import List, Dict, Any
308267

309-
from temporalio import activity
310-
311-
from openai import AsyncOpenAI
312-
from rank_bm25 import BM25Okapi
313-
314268

315269
@dataclass
316270
class IngestRequest:
@@ -339,6 +293,18 @@ class RagRequest:
339293
class RagAnswer:
340294
answer: str
341295
sources: List[Dict[str, Any]]
296+
```
297+
298+
### Activities
299+
300+
*File: activities/ai_claim_check.py*
301+
302+
```python
303+
from typing import List
304+
305+
from temporalio import activity
306+
307+
from shared.models import IngestRequest, IngestResult, RagRequest, RagAnswer
342308

343309

344310
def _split_text(text: str, chunk_size: int, overlap: int) -> List[str]:
@@ -374,6 +340,11 @@ async def ingest_document(req: IngestRequest) -> IngestResult:
374340

375341
@activity.defn
376342
async def rag_answer(req: RagRequest, ingest_result: IngestResult) -> RagAnswer:
343+
# Import heavy dependencies inside the function, not at module level
344+
# This prevents NumPy from being loaded into the workflow sandbox
345+
from openai import AsyncOpenAI
346+
from rank_bm25 import BM25Okapi
347+
377348
client = AsyncOpenAI(max_retries=0)
378349

379350
# Lexical retrieval using BM25 over chunk texts
@@ -412,14 +383,8 @@ async def rag_answer(req: RagRequest, ingest_result: IngestResult) -> RagAnswer:
412383
from temporalio import workflow
413384
from datetime import timedelta
414385

415-
from activities.ai_claim_check import (
416-
IngestRequest,
417-
IngestResult,
418-
RagRequest,
419-
RagAnswer,
420-
ingest_document,
421-
rag_answer,
422-
)
386+
from shared.models import IngestRequest, IngestResult, RagRequest, RagAnswer
387+
from activities.ai_claim_check import ingest_document, rag_answer
423388

424389

425390
@workflow.defn

foundations/claim_check_pattern_python/codec/claim_check.py

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,31 +18,27 @@ class ClaimCheckCodec(PayloadCodec):
1818
of large payload data.
1919
"""
2020

21-
def __init__(self,
22-
bucket_name: str = "temporal-claim-check",
23-
endpoint_url: str = None,
24-
region_name: str = "us-east-1",
25-
max_inline_bytes: int = 20 * 1024,
26-
aws_profile: str = None):
21+
def __init__(
22+
self,
23+
bucket_name: str = "temporal-claim-check",
24+
endpoint_url: str = None,
25+
region_name: str = "us-east-1",
26+
max_inline_bytes: int = 20 * 1024,
27+
):
2728
"""Initialize the claim check codec with S3 connection details.
28-
29+
2930
Args:
3031
bucket_name: S3 bucket name for storing claim check data
3132
endpoint_url: S3 endpoint URL (for MinIO or other S3-compatible services)
3233
region_name: AWS region name
3334
max_inline_bytes: Payloads up to this size will be left inline
34-
aws_profile: AWS profile name to use (only if access keys not set)
3535
"""
3636
self.bucket_name = bucket_name
3737
self.endpoint_url = endpoint_url
3838
self.region_name = region_name
3939
self.max_inline_bytes = max_inline_bytes
40-
41-
# Initialize aioboto3 session (uses AWS_ACCESS_KEY_ID/AWS_SECRET_ACCESS_KEY if set,
42-
# otherwise uses profile if provided, otherwise default credential chain)
43-
self.session = aioboto3.Session(profile_name=aws_profile) if aws_profile else aioboto3.Session()
44-
45-
# Ensure bucket exists
40+
self.session = aioboto3.Session()
41+
4642
self._bucket_created = False
4743

4844
async def _ensure_bucket_exists(self):

foundations/claim_check_pattern_python/codec/plugin.py

Lines changed: 11 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1,68 +1,22 @@
11
import os
2-
from temporalio.client import Plugin, ClientConfig
2+
from temporalio.plugin import SimplePlugin
33
from temporalio.converter import DataConverter
4-
from temporalio.service import ConnectConfig, ServiceClient
54

65
from .claim_check import ClaimCheckCodec
76

87

9-
class ClaimCheckPlugin(Plugin):
8+
class ClaimCheckPlugin(SimplePlugin):
109
"""Temporal plugin that integrates the Claim Check codec with client configuration."""
1110

1211
def __init__(self):
1312
"""Initialize the plugin with S3 connection configuration."""
14-
self.bucket_name = os.getenv("S3_BUCKET_NAME", "temporal-claim-check")
15-
self.endpoint_url = os.getenv("S3_ENDPOINT_URL")
16-
self.region_name = os.getenv("AWS_REGION", "us-east-1")
17-
# Use AWS_PROFILE only if access keys are not set
18-
self.aws_profile = os.getenv("AWS_PROFILE") if not os.getenv("AWS_ACCESS_KEY_ID") else None
19-
self._next_plugin = None
20-
21-
def init_client_plugin(self, next_plugin: Plugin) -> None:
22-
"""Initialize this plugin in the client plugin chain."""
23-
self._next_plugin = next_plugin
24-
25-
def configure_client(self, config: ClientConfig) -> ClientConfig:
26-
"""Apply the claim check configuration to the client.
27-
28-
Args:
29-
config: Temporal client configuration
30-
31-
Returns:
32-
Updated client configuration with claim check data converter
33-
"""
34-
# Configure the data converter with claim check codec
35-
default_converter_class = config["data_converter"].payload_converter_class
36-
claim_check_codec = ClaimCheckCodec(
37-
bucket_name=self.bucket_name,
38-
endpoint_url=self.endpoint_url,
39-
region_name=self.region_name,
40-
aws_profile=self.aws_profile
13+
super().__init__(
14+
name="claim-check",
15+
data_converter=DataConverter(
16+
payload_codec=ClaimCheckCodec(
17+
bucket_name=os.getenv("S3_BUCKET_NAME", "temporal-claim-check"),
18+
endpoint_url=os.getenv("S3_ENDPOINT_URL"),
19+
region_name=os.getenv("AWS_REGION", "us-east-1"),
20+
),
21+
),
4122
)
42-
43-
config["data_converter"] = DataConverter(
44-
payload_converter_class=default_converter_class,
45-
payload_codec=claim_check_codec
46-
)
47-
48-
# Delegate to next plugin if it exists
49-
if self._next_plugin:
50-
return self._next_plugin.configure_client(config)
51-
return config
52-
53-
async def connect_service_client(self, config: ConnectConfig) -> ServiceClient:
54-
"""Connect to the Temporal service.
55-
56-
Args:
57-
config: Service connection configuration
58-
59-
Returns:
60-
Connected service client
61-
"""
62-
# Delegate to next plugin if it exists
63-
if self._next_plugin:
64-
return await self._next_plugin.connect_service_client(config)
65-
66-
# If no next plugin, use default connection
67-
from temporalio.service import ServiceClient
68-
return await ServiceClient.connect(config)

foundations/claim_check_pattern_python/pyproject.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,11 @@ name = "claim-check-pattern-python"
33
version = "0.1"
44
description = "Claim Check pattern implementation for handling large payloads in Temporal"
55
authors = [{ name = "Temporal Technologies Inc", email = "[email protected]" }]
6-
requires-python = ">=3.9"
6+
requires-python = ">=3.10"
77
readme = "README.md"
88
license = "MIT"
99
dependencies = [
10-
"temporalio>=1.15.0,<2",
10+
"temporalio>=1.19.0,<2",
1111
"boto3>=1.34.0",
1212
"aioboto3>=12.0.0",
1313
"requests>=2.25.0",

0 commit comments

Comments
 (0)