-
Notifications
You must be signed in to change notification settings - Fork 13
NOISSUE - Use SMQ for fML #117
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
Use pickle for model2wasm Fix TinyGo entrypoint and m2cgen wrapping workflow Add FL task mode and metadata fields Signed-off-by: JeffMboya <jangina.mboya@gmail.com> Include FL mode and spec in start payload Signed-off-by: JeffMboya <jangina.mboya@gmail.com> Add FL update envelope type for results Signed-off-by: JeffMboya <jangina.mboya@gmail.com> Deduplicate FL helpers; update host and wazero Signed-off-by: JeffMboya <jangina.mboya@gmail.com> Handle train results as FL envelope updates Signed-off-by: JeffMboya <jangina.mboya@gmail.com>
Signed-off-by: JeffMboya <jangina.mboya@gmail.com> Coordinate federated rounds; validate and aggregate updates Signed-off-by: JeffMboya <jangina.mboya@gmail.com> Coordinate federated rounds; validate and aggregate updates Signed-off-by: JeffMboya <jangina.mboya@gmail.com> Coordinate federated rounds; validate and aggregate updates Signed-off-by: JeffMboya <jangina.mboya@gmail.com> Coordinate federated rounds; validate and aggregate updates Signed-off-by: JeffMboya <jangina.mboya@gmail.com> Make wazero StopApp idempotent like host Signed-off-by: JeffMboya <jangina.mboya@gmail.com> Include numeric args in wazero WASI argv Signed-off-by: JeffMboya <jangina.mboya@gmail.com> Update runtime and transport logic Signed-off-by: JeffMboya <jangina.mboya@gmail.com>
Signed-off-by: JeffMboya <jangina.mboya@gmail.com> Fix CI Signed-off-by: JeffMboya <jangina.mboya@gmail.com> Fix CI Signed-off-by: JeffMboya <jangina.mboya@gmail.com> Fix CI Signed-off-by: JeffMboya <jangina.mboya@gmail.com> Fix CI Signed-off-by: JeffMboya <jangina.mboya@gmail.com> Fix CI Signed-off-by: JeffMboya <jangina.mboya@gmail.com> Fix CI Signed-off-by: JeffMboya <jangina.mboya@gmail.com> Fix CI Signed-off-by: JeffMboya <jangina.mboya@gmail.com> Fix CI Signed-off-by: JeffMboya <jangina.mboya@gmail.com>
Signed-off-by: JeffMboya <jangina.mboya@gmail.com>
Signed-off-by: JeffMboya <jangina.mboya@gmail.com>
Signed-off-by: JeffMboya <jangina.mboya@gmail.com>
Signed-off-by: JeffMboya <jangina.mboya@gmail.com>
Signed-off-by: JeffMboya <jangina.mboya@gmail.com>
Signed-off-by: JeffMboya <jangina.mboya@gmail.com>
Signed-off-by: JeffMboya <jangina.mboya@gmail.com>
Signed-off-by: JeffMboya <jangina.mboya@gmail.com>
Signed-off-by: JeffMboya <jangina.mboya@gmail.com>
Signed-off-by: JeffMboya <jangina.mboya@gmail.com>
Signed-off-by: JeffMboya <jangina.mboya@gmail.com>
Signed-off-by: JeffMboya <jangina.mboya@gmail.com>
Signed-off-by: JeffMboya <jangina.mboya@gmail.com>
Signed-off-by: JeffMboya <jangina.mboya@gmail.com>
Signed-off-by: JeffMboya <jangina.mboya@gmail.com>
Signed-off-by: JeffMboya <jangina.mboya@gmail.com>
Signed-off-by: JeffMboya <jangina.mboya@gmail.com>
Signed-off-by: JeffMboya <jangina.mboya@gmail.com>
Signed-off-by: JeffMboya <jangina.mboya@gmail.com>
Signed-off-by: JeffMboya <jangina.mboya@gmail.com>
Signed-off-by: JeffMboya <jangina.mboya@gmail.com>
Signed-off-by: JeffMboya <jangina.mboya@gmail.com>
Signed-off-by: JeffMboya <jangina.mboya@gmail.com>
Signed-off-by: JeffMboya <jangina.mboya@gmail.com>
Signed-off-by: JeffMboya <jangina.mboya@gmail.com>
Signed-off-by: JeffMboya <jangina.mboya@gmail.com>
Signed-off-by: JeffMboya <jangina.mboya@gmail.com>
Signed-off-by: JeffMboya <jangina.mboya@gmail.com>
Signed-off-by: JeffMboya <jangina.mboya@gmail.com>
Signed-off-by: JeffMboya <jangina.mboya@gmail.com>
Signed-off-by: JeffMboya <jangina.mboya@gmail.com>
Signed-off-by: JeffMboya <jangina.mboya@gmail.com>
Signed-off-by: JeffMboya <jangina.mboya@gmail.com>
Signed-off-by: JeffMboya <jangina.mboya@gmail.com>
Signed-off-by: JeffMboya <jangina.mboya@gmail.com>
Signed-off-by: JeffMboya <jangina.mboya@gmail.com>
Signed-off-by: JeffMboya <jangina.mboya@gmail.com>
Signed-off-by: JeffMboya <jangina.mboya@gmail.com>
Signed-off-by: JeffMboya <jangina.mboya@gmail.com>
Signed-off-by: JeffMboya <jangina.mboya@gmail.com>
Signed-off-by: JeffMboya <jangina.mboya@gmail.com>
Signed-off-by: JeffMboya <jangina.mboya@gmail.com>
Signed-off-by: JeffMboya <jangina.mboya@gmail.com>
Signed-off-by: JeffMboya <jangina.mboya@gmail.com>
Signed-off-by: JeffMboya <jangina.mboya@gmail.com>
Signed-off-by: JeffMboya <jangina.mboya@gmail.com>
Signed-off-by: JeffMboya <jangina.mboya@gmail.com>
Signed-off-by: JeffMboya <jangina.mboya@gmail.com>
Signed-off-by: JeffMboya <jangina.mboya@gmail.com>
| @@ -0,0 +1,100 @@ | |||
| package cli | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need this?
Since the FML is running as a demo application?
| StartTime time.Time | ||
| Updates []Update | ||
| Completed bool | ||
| mu sync.Mutex |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The RoundState struct has a mutex, but the map rounds that contains these states uses a separate roundsMu.
| bool headers_complete = false; | ||
| size_t content_length = 0; | ||
|
|
||
| while (total_received < buffer_size - 1) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When the HTTP headers span across multiple chunks, the code searches for "\r\n\r\n" only in the current chunk. If the header boundary falls across chunks, strstr() will fail to find it and headers_complete remains false indefinitely. Additionally, Content-Length parsing only happens in the chunk containing the headers.
| } | ||
|
|
||
| if (results_len > 0) { | ||
| int ret = base64_encode((uint8_t *)update_b64, sizeof(update_b64), &encoded_len, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When base64 encoding fails, the code sets encoded_len = 0 and update_b64[0] = '\0' (via the else branch logic), but then continues to construct the JSON payload with empty update_b64 field. The coordinator will receive an update with no actual model update data, which should be treated as an error but is published as a successful update
| return &FedAvgAggregator{} | ||
| } | ||
|
|
||
| func (f *FedAvgAggregator) Aggregate(updates []Update) (Model, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
totalSamples is declared as int, but update.NumSamples could be large. On 32-bit systems, int is typically 32 bits and can overflow if aggregating updates from many clients with large sample counts.
| cJSON *fl_obj = cJSON_GetObjectItemCaseSensitive(json, "fl"); | ||
| cJSON *env = cJSON_GetObjectItemCaseSensitive(json, "env"); | ||
|
|
||
| if (!cJSON_IsString(id) || !cJSON_IsString(name)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The code checks cJSON_IsString() which validates the item exists and is a string type, but doesn't verify that valuestring is non-NULL. In cJSON, even if an item is a string type, valuestring could theoretically be NULL in edge cases (though rare).
| return; | ||
| } | ||
|
|
||
| if (t.is_fl_task && !t.is_fml_task && strlen(t.fl.job_id) == 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The code has two separate FL detection mechanisms:
- t.is_fl_task (set from JSON fl object)
- t.is_fml_task (set from env ROUND_ID)
Issue: The two flags are checked inconsistently throughout the code. For example:
- Line 665: if (t.is_fml_task && strlen(t.model_uri) > 0)
- Line 693: else if (t.is_fl_task) // legacy
- Line 754: if (t.is_fml_task && strlen(t.round_id) == 0) - this will reject FML tasks without ROUND_ID
- Line 759: if (t.is_fl_task && !t.is_fml_task && strlen(t.fl.job_id) == 0) - rejects old FL tasks
This creates confusion about which FL protocol is being used and can cause valid tasks to be rejected.
| ps.mu.RLock() | ||
| defer ps.mu.RUnlock() | ||
|
|
||
| roundFile := filepath.Join(ps.roundsDir, fmt.Sprintf("round_%s.json", roundID)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The roundID comes from user input (via HTTP requests or MQTT messages) and is directly used in file path construction. If roundID contains .. or absolute paths, it could write/read files outside the intended directory
|
|
||
| func (svc *service) handleRoundStart(ctx context.Context) func(topic string, msg map[string]any) error { | ||
| return func(topic string, msg map[string]any) error { | ||
| go func() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The goroutine spawned at line 498 doesn't check ctx.Done() and has no timeout mechanism. If the manager service shuts down, these goroutines will continue running indefinitely, attempting to access potentially closed resources (database connections, MQTT clients)
|
|
||
| let coordinator_url = env.get("COORDINATOR_URL") | ||
| .cloned() | ||
| .unwrap_or_else(|| "http://coordinator-http:8080".to_string()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Default service URLs are Docker Compose service names, which only work inside the demo Docker network. If services are deployed differently (Kubernetes, bare metal, different compose file), these hardcoded defaults fail.
Also let's use existing env parsing mechanism


What type of PR is this?
What does this do?
Which issue(s) does this PR fix/relate to?
Have you included tests for your changes?
Did you document any new/modified features?
Notes