Skip to content

Commit 5c99c5b

Browse files
jcleezerJason Leezer
andauthored
Add methods to clear our EvictingCircularBuffer (#1133)
* Add methods to clear our EvictingBuffer * Fix test * Bump patch version * Changelog and version * Fix test * Make test less flaky * Fix another flaky test --------- Co-authored-by: Jason Leezer <[email protected]>
1 parent 2bb26e7 commit 5c99c5b

File tree

10 files changed

+151
-26
lines changed

10 files changed

+151
-26
lines changed

CHANGELOG.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@ and what APIs have changed, if applicable.
1414

1515
## [Unreleased]
1616

17+
## [29.82.0] - 2026-01-13
18+
Add clear method to EvictingCircularBuffer that doesnt attempt to drain
19+
1720
## [29.81.2] - 2026-01-09
1821
- Clear DarkCluster Buffer on Refresh
1922

@@ -5936,7 +5939,8 @@ patch operations can re-use these classes for generating patch messages.
59365939

59375940
## [0.14.1]
59385941

5939-
[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.81.2...master
5942+
[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.82.0...master
5943+
[29.82.0]: https://github.com/linkedin/rest.li/compare/v29.81.2...v29.82.0
59405944
[29.81.2]: https://github.com/linkedin/rest.li/compare/v29.81.1...v29.81.2
59415945
[29.81.1]: https://github.com/linkedin/rest.li/compare/v29.81.0...v29.81.1
59425946
[29.81.0]: https://github.com/linkedin/rest.li/compare/v29.80.3...v29.81.0

d2/src/test/java/com/linkedin/d2/balancer/servers/ZookeeperConnectionManagerTest.java

Lines changed: 39 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -833,22 +833,54 @@ private static class IgnoreCancelledCallback implements Callback<None>
833833
@Override
834834
public void onError(Throwable e)
835835
{
836-
if (e instanceof CancellationException || e.getCause() instanceof CancellationException || (e.getCause().getCause() != null && e.getCause()
837-
.getCause() instanceof CancellationException))
838-
{
836+
if (isIgnorable(e)) {
839837
_callback.onSuccess(None.none());
838+
return;
840839
}
841-
else
842-
{
843-
_callback.onError(e);
844-
}
840+
_callback.onError(e);
845841
}
846842

847843
@Override
848844
public void onSuccess(None result)
849845
{
850846
_callback.onSuccess(result);
851847
}
848+
849+
private boolean isIgnorable(Throwable t)
850+
{
851+
if (t == null)
852+
{
853+
return false;
854+
}
855+
// Directly ignorable types
856+
if (t instanceof CancellationException)
857+
{
858+
return true;
859+
}
860+
if (t instanceof org.apache.zookeeper.KeeperException.NoNodeException)
861+
{
862+
return true;
863+
}
864+
// MultiException: all causes must be ignorable
865+
if (t instanceof com.linkedin.common.callback.MultiException)
866+
{
867+
for (Throwable cause : ((com.linkedin.common.callback.MultiException) t).getCauses())
868+
{
869+
if (!isIgnorable(cause))
870+
{
871+
return false;
872+
}
873+
}
874+
return true;
875+
}
876+
// Otherwise, walk the cause chain defensively
877+
Throwable cause = t.getCause();
878+
if (cause != null && cause != t)
879+
{
880+
return isIgnorable(cause);
881+
}
882+
return false;
883+
}
852884
}
853885

854886
private static void dataValidation(String uri, String cluster, double weight)

d2/src/test/java/com/linkedin/d2/discovery/event/PublisherTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,9 @@ public void publishInitialize(String prop, String value)
191191
{
192192
_currentValues.put(prop, value);
193193
_initCondition.signalAll();
194+
// Also signal add waiters: some publishers may emit initialize with a value
195+
// (e.g., if a change occurs during initial fetch). Allow awaitAdd to wake and re-check.
196+
_addCondition.signalAll();
194197
}
195198
finally
196199
{

darkcluster/src/main/java/com/linkedin/darkcluster/impl/ConstantQpsDarkClusterStrategy.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ public boolean handleRequest(RestRequest originalRequest, RestRequest darkReques
8484
@Override
8585
public void shutdown()
8686
{
87-
_rateLimiter.cancelAll(new RuntimeException("Shutting down ConstantQpsDarkClusterStrategy"));
87+
_rateLimiter.clear();
8888
}
8989

9090
/**

darkcluster/src/test/java/com/linkedin/darkcluster/TestConstantQpsDarkClusterStrategy.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -204,11 +204,6 @@ public void testShutdown()
204204
// Call shutdown
205205
strategy.shutdown();
206206

207-
// Verify that cancelAll was called with a RuntimeException containing the expected message.
208-
ArgumentCaptor<Throwable> throwableCaptor = ArgumentCaptor.forClass(Throwable.class);
209-
verify(mockRateLimiter).cancelAll(throwableCaptor.capture());
210-
Throwable throwable = throwableCaptor.getValue();
211-
Assert.assertTrue(throwable instanceof RuntimeException);
212-
Assert.assertEquals(throwable.getMessage(), "Shutting down ConstantQpsDarkClusterStrategy");
207+
verify(mockRateLimiter).clear();
213208
}
214209
}

darkcluster/src/test/java/com/linkedin/darkcluster/TestDarkClusterStrategyFactory.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -323,16 +323,16 @@ public void testRefreshShutsDownExistingStrategiesPerPartition()
323323
// strategies were shut down.
324324
//
325325
// Note: RelativeTrafficMultiplierDarkClusterStrategy has no observable shutdown behavior. ConstantQpsDarkClusterStrategy
326-
// does (it calls ConstantQpsRateLimiter.cancelAll), so we use CONSTANT_QPS here with a tracking rate limiter.
326+
// does (it calls ConstantQpsRateLimiter.clear()), so we use CONSTANT_QPS here with a tracking rate limiter.
327327
MockClusterInfoProvider clusterInfoProvider = new MockClusterInfoProvider();
328328
Facilities facilities = new MockFacilities(clusterInfoProvider);
329329
DarkClusterDispatcher darkClusterDispatcher = new DefaultDarkClusterDispatcher(new MockClient(false));
330330
ClockedExecutor executor = new ClockedExecutor();
331331

332-
AtomicInteger cancelAllInvocations = new AtomicInteger(0);
332+
AtomicInteger clearInvocations = new AtomicInteger(0);
333333
Supplier<ConstantQpsRateLimiter> rateLimiterSupplier = () ->
334334
new TrackingConstantQpsRateLimiter(executor, executor, executor,
335-
TestConstantQpsDarkClusterStrategy.getBuffer(executor), cancelAllInvocations);
335+
TestConstantQpsDarkClusterStrategy.getBuffer(executor), clearInvocations);
336336

337337
DarkClusterStrategyFactory strategyFactory = new DarkClusterStrategyFactoryImpl(facilities,
338338
SOURCE_CLUSTER_NAME,
@@ -361,31 +361,32 @@ public void testRefreshShutsDownExistingStrategiesPerPartition()
361361

362362
// Refresh again: should rebuild both partitions and shut down the old strategies.
363363
clusterInfoProvider.notifyListenersClusterAdded(SOURCE_CLUSTER_NAME);
364-
Assert.assertEquals(cancelAllInvocations.get(), 2,
365-
"Expected existing strategies to be shut down once per partition on refresh");
364+
Assert.assertEquals(clearInvocations.get(), 2,
365+
"Expected existing strategies to be cleared once per partition on refresh");
366366

367367
Assert.assertTrue(strategyFactory.get(DARK_CLUSTER_NAME, 0) instanceof ConstantQpsDarkClusterStrategy);
368368
Assert.assertTrue(strategyFactory.get(DARK_CLUSTER_NAME, 1) instanceof ConstantQpsDarkClusterStrategy);
369369
}
370370

371371
private static final class TrackingConstantQpsRateLimiter extends ConstantQpsRateLimiter
372372
{
373-
private final AtomicInteger _cancelAllInvocations;
373+
private final AtomicInteger _clearInvocations;
374374

375375
TrackingConstantQpsRateLimiter(ScheduledExecutorService scheduler,
376376
java.util.concurrent.Executor executor,
377377
com.linkedin.util.clock.Clock clock,
378378
com.linkedin.r2.transport.http.client.EvictingCircularBuffer callbackBuffer,
379-
AtomicInteger cancelAllInvocations)
379+
AtomicInteger clearInvocations)
380380
{
381381
super(scheduler, executor, clock, callbackBuffer);
382-
_cancelAllInvocations = cancelAllInvocations;
382+
_clearInvocations = clearInvocations;
383383
}
384384

385385
@Override
386-
public void cancelAll(Throwable throwable)
386+
public void clear()
387387
{
388-
_cancelAllInvocations.incrementAndGet();
388+
_clearInvocations.incrementAndGet();
389+
super.clear();
389390
}
390391
}
391392

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
version=29.81.2
1+
version=29.82.0
22
group=com.linkedin.pegasus
33
org.gradle.configureondemand=true
44
org.gradle.parallel=true

r2-core/src/main/java/com/linkedin/r2/transport/http/client/ConstantQpsRateLimiter.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,14 @@ public void setBufferTtl(int ttl, ChronoUnit ttlUnit)
6565
_evictingCircularBuffer.setTtl(ttl, ttlUnit);
6666
}
6767

68+
/**
69+
* Clears all callbacks currently stored in the underlying {@link EvictingCircularBuffer}.
70+
*/
71+
public void clear()
72+
{
73+
_evictingCircularBuffer.clear();
74+
}
75+
6876

6977
private static class UnboundedRateLimiterExecutionTracker implements RateLimiterExecutionTracker
7078
{

r2-core/src/main/java/com/linkedin/r2/transport/http/client/EvictingCircularBuffer.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,32 @@ int getCapacity()
156156
return _callbacks.size();
157157
}
158158

159+
/**
160+
* Clears all callbacks and TTLs from the buffer immediately.
161+
* Acquires write locks for all elements to avoid partially cleared reads/writes.
162+
* Resets reader and writer positions to 0.
163+
*/
164+
public void clear()
165+
{
166+
ArrayList<ReentrantReadWriteLock> tempLocks = new ArrayList<>();
167+
_elementLocks.forEach(x ->
168+
{
169+
x.writeLock().lock();
170+
tempLocks.add(x);
171+
});
172+
try
173+
{
174+
Collections.fill(_callbacks, null);
175+
Collections.fill(_ttlBuffer, null);
176+
_readerPosition.set(0);
177+
_writerPosition.set(0);
178+
}
179+
finally
180+
{
181+
tempLocks.forEach(x -> x.writeLock().unlock());
182+
}
183+
}
184+
159185
/**
160186
* Resizes the circular buffer, deleting the contents in the process.
161187
* This method should not be called frequently, ideally only as part of a startup lifecycle, as it does heavy locking

r2-core/src/test/java/com/linkedin/r2/transport/http/client/ratelimiter/TestConstantQpsRateLimiter.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.linkedin.common.callback.Callback;
2020
import com.linkedin.common.util.None;
2121
import com.linkedin.r2.transport.http.client.ConstantQpsRateLimiter;
22+
import com.linkedin.r2.transport.http.client.EvictingCircularBuffer;
2223
import com.linkedin.r2.transport.http.client.TestEvictingCircularBuffer;
2324
import com.linkedin.test.util.ClockedExecutor;
2425
import com.linkedin.test.util.retry.ThreeRetries;
@@ -28,9 +29,11 @@
2829
import java.util.HashSet;
2930
import java.util.List;
3031
import java.util.Set;
32+
import java.util.NoSuchElementException;
3133
import java.util.concurrent.atomic.AtomicInteger;
3234
import org.junit.Assert;
3335
import org.testng.annotations.Test;
36+
import static org.junit.Assert.fail;
3437

3538

3639
public class TestConstantQpsRateLimiter
@@ -211,6 +214,59 @@ public void testLowRateHighlyParallelConsistentRandomness()
211214
assert(zeroFreqFailCount <= acceptableFailCount);
212215
}
213216

217+
@Test(timeOut = TEST_TIMEOUT)
218+
/**
219+
* This simulates the shutdown behavior of ConstantQpsDarkClusterStrategy,
220+
* to verify that the buffer is cleared immediately upon shutdown.
221+
*/
222+
public void shutdownClearsBufferImmediately()
223+
{
224+
ClockedExecutor executor = new ClockedExecutor();
225+
// Use a separate clock for the circular buffer so TTL doesn’t auto-expire unless we advance it.
226+
ClockedExecutor circularBufferExecutor = new ClockedExecutor();
227+
EvictingCircularBuffer buffer = TestEvictingCircularBuffer.getBuffer(circularBufferExecutor);
228+
ConstantQpsRateLimiter rateLimiter =
229+
new ConstantQpsRateLimiter(executor, executor, executor, buffer);
230+
231+
// Ensure TTL is long so callbacks would normally remain unless explicitly cleared.
232+
rateLimiter.setBufferTtl(Integer.MAX_VALUE, ChronoUnit.DAYS);
233+
rateLimiter.setBufferCapacity(10);
234+
235+
// Submit several no-op callbacks
236+
for (int i = 0; i < 10; i++)
237+
{
238+
rateLimiter.submit(new Callback<None>()
239+
{
240+
@Override
241+
public void onError(Throwable e) { }
242+
243+
@Override
244+
public void onSuccess(None result) { }
245+
});
246+
}
247+
248+
// Sanity: buffer should have something retrievable before cancelAll
249+
Assert.assertNotNull(buffer.get());
250+
251+
// Invoke shutdown-like behavior: clear buffer immediately
252+
rateLimiter.clear();
253+
254+
// Run the executor briefly to allow the event loop to process
255+
executor.runFor(ONE_SECOND);
256+
257+
// Verify the buffer is cleared immediately (i.e., not waiting for TTL to expire)
258+
boolean threw = false;
259+
try
260+
{
261+
buffer.get();
262+
}
263+
catch (NoSuchElementException ex)
264+
{
265+
threw = true;
266+
}
267+
Assert.assertTrue("Expected the buffer to be empty immediately after clear()", threw);
268+
}
269+
214270
private static class TattlingCallback<T> implements Callback<T>
215271
{
216272
private AtomicInteger _interactCount = new AtomicInteger();

0 commit comments

Comments
 (0)