plugin/decision: upload events as soon as a chunk is ready #8110
Conversation
Signed-off-by: Sebastian Spaink <sebastianspaink@gmail.com>
johanfylling left a comment:
Thanks!
Some thoughts/questions.
```go
	retry++
} else {
	retry = 0
	timer := time.NewTimer(delay)
```
We're not resetting the timer when a flush has been triggered in immediate mode? I.e., if we have a fraction of the timer delay left and a new log event has triggered an upload, that fraction will be added on top of the next timer delay?
Correct, that is the case. I struggled to implement a solution that would reset this timer consistently on immediate upload. I tried a new channel that would reset the timer, but the case where the timer fires before the channel can send the upload required a mutex that made things even more complicated. So I decided the added time in this scenario is acceptable, given that events should usually be uploaded immediately rather than relying on the timer.
johanfylling left a comment:
This solution should work, I think 👍.
Is the reconfiguration concern warranted, and if so, could something be done about it?
johanfylling left a comment:
Thanks!
v1/plugins/logs/eventBuffer.go (outdated)
```go
case item := <-b.buffer:
	b.immediateRead(ctx, item)
case done := <-b.stop:
	b.flush(ctx)
```
Is this flush necessary, considering we expect the outer plugin to immediately call Flush() on the buffer anyway?
And when reconfiguring the same buffer, Reconfigure() will move events between buffers anyway, right?
Moving events between the buffers made it possible to remove this; deleted 👍
v1/plugins/logs/sizeBuffer.go (outdated)
```diff
 }

-func (b *sizeBuffer) Reconfigure(bufferSizeLimitBytes int64, uploadSizeLimitBytes int64, maxDecisionsPerSecond *float64) {
+func (b *sizeBuffer) Reconfigure(
```
Complexity-wise, I wonder if we really need this buffer-specific reconfigure 🤔. If the plugin always tore down the old buffer, flushed, and set up a new buffer on config change, we'd have one less edge case to worry about.
That might come with some hit to performance, but config changes are rare enough that I'm not sure that's a big concern.
On the other hand, if we keep redesigning this, we'll never get it done 😄. Fine to leave it as-is.
Removing that edge case is much better! Getting rid of any complexity in this code is a huge win. I can't imagine people reconfigure often while OPA is running.
I removed the individual reconfigure methods; the plugin now always creates a new instance of the buffer.
…ng the plugin
…osed and restart main loop
Why are the changes in this PR needed?
Resolves #7455
What are the changes in this PR?
This introduces a new trigger mode for the decision log plugin: `decision_logs.reporting.trigger=immediate`.

The `immediate` trigger mode uploads events as soon as enough events are received to hit the configured upload limit. If not enough events are received within the configured min/max delay, the events received so far are flushed and uploaded.

For the event buffer type, this means that if enough events are received they can be uploaded sooner than the configured min/max delay, allowing the buffer to empty more quickly and preventing dropped events. For the size buffer, uploads can also happen sooner, but dropped events are less likely regardless of the trigger mode, given the default unlimited size and the fact that events are stored as chunks. In `immediate` mode, both buffer types upload chunks of events as a stream, as opposed to multiple chunks uploaded in bursts.

Notes to assist PR review:
A contrived example to help demonstrate the benefit, using a small buffer size limit and a long delay time:
Set up the following config (opa-conf.yaml):
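The original config file wasn't captured in this thread. Based on the description (a small event buffer and a long min/max delay, pointing at a local log sink), it might have looked roughly like the following — the service URL and the exact limits are assumptions, not the PR's actual values:

```yaml
services:
  logeater:
    url: http://localhost:4195   # hypothetical local service that receives the logs

decision_logs:
  service: logeater
  reporting:
    trigger: immediate           # the new mode added by this PR (default: periodic)
    buffer_type: event
    buffer_size_limit_events: 100
    min_delay_seconds: 10
    max_delay_seconds: 15
```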
Have this simple Rego file (example.rego):
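The policy itself wasn't captured either; since the load test below queries `/v1/data/example/allow`, a minimal placeholder policy would be something like:

```rego
package example

default allow := false

allow if input.method == "GET"
```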
Run OPA:

```shell
./opa_darwin_arm64 run -c opa-conf.yaml --server ./example.rego
```

Start the logeater service (just a service to receive the logs):

```shell
go run main.go
```

Attack OPA with vegeta:

```shell
echo 'POST http://localhost:8181/v1/data/example/allow' | vegeta attack --duration=10s -rate=500 | tee results.bin | vegeta report
```

Now if you check `http://localhost:8181/v1/status` you will see a shocking metric: `counter_decision_logs_dropped_buffer_size_limit_exceeded 4800`. This is because vegeta sends 500 requests per second for 10 seconds, and the buffer managed to send only 100 events. Another 100 are still in the buffer.

Now if you update the config to use the new trigger mode `trigger: immediate` and check `/v1/status` again, you will see no events were dropped! You do see some other fun metrics of the encoder adjusting its guessed uncompressed limit:

```json
{ "counter_enc_uncompressed_limit_scale_down": 7, "counter_enc_uncompressed_limit_scale_up": 10 }
```

These metrics didn't show up for the `periodic` mode because they are reported by the encoder, which didn't run enough times to scale the uncompressed limit.

Data
Attacking OPA configured with different buffer types and triggers for 30 seconds also illustrates what I described above: periodic uploads arrive in bursts, while immediate uploads arrive as a stream. The encoder also appears to stabilize its guess of the uncompressed limit sooner in immediate mode with the event buffer. Using the default size limits, no events are dropped.
Used an updated logeater service that spits out a graph (code here).
| Buffer, trigger | Average duration between uploads | Max duration between uploads |
|---|---|---|
| Event, Immediate | 970.387093ms | 983.76925ms |
| Event, Periodic | 593.645934ms | 16.326999458s |
| Size, Immediate | 974.381334ms | 3.96508625s |
| Size, Periodic (dropped chunks of events, and gaps between uploads) | 439.323843ms | 12.64677575s |