@@ -25,13 +25,17 @@
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.msq.counters.ChannelCounters;
import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
import org.apache.druid.query.DefaultQueryMetrics;
import org.apache.druid.query.DruidProcessingConfigTest;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.http.ClientSqlQuery;
import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.server.metrics.LatchableEmitter;
import org.apache.druid.server.metrics.StorageMonitor;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.http.GetQueryReportResponse;
import org.apache.druid.testing.embedded.EmbeddedBroker;
import org.apache.druid.testing.embedded.EmbeddedCoordinator;
import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
@@ -53,6 +57,8 @@
import java.io.IOException;
import java.nio.file.Files;
import java.util.Collections;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;

/**
@@ -73,7 +79,8 @@
@Override
public EmbeddedDruidCluster createCluster()
{
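// Cap the historical: druid.server.maxSize of 100MiB holds only a fraction of the test segments,
// so virtual storage has to page segments in and out on demand.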
historical.setServerMemory(500_000_000)
.addProperty("druid.segmentCache.virtualStorage", "true")
.addProperty("druid.segmentCache.virtualStorageLoadThreads", String.valueOf(Runtime.getRuntime().availableProcessors()))
.addBeforeStartHook(
(cluster, self) -> self.addProperty(
@@ -87,6 +94,11 @@
)
.addProperty("druid.server.maxSize", String.valueOf(HumanReadableBytes.parse("100MiB")));

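// Dart runs on the broker; retaining query reports lets the test fetch them by SQL query ID afterwards.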
broker.setServerMemory(200_000_000)
.addProperty("druid.msq.dart.controller.maxRetainedReportCount", "10")
.addProperty("druid.query.default.context.maxConcurrentStages", "1")
.addProperty("druid.sql.planner.enableSysQueriesTable", "true");

coordinator.addProperty("druid.manager.segments.useIncrementalCache", "always");

overlord.addProperty("druid.manager.segments.useIncrementalCache", "always")
@@ -102,6 +114,7 @@
.useLatchableEmitter()
.useDefaultTimeoutForLatchableEmitter(20)
.addResource(storageResource)
.addCommonProperty("druid.msq.dart.enabled", "true")
.addCommonProperty("druid.storage.zip", "false")
.addCommonProperty("druid.indexer.task.buildV10", "true")
.addCommonProperty("druid.monitoring.emissionPeriod", "PT1s")
@@ -136,7 +149,7 @@
{
Throwable t = Assertions.assertThrows(
RuntimeException.class,
() -> cluster.runSql("select * from \"%s\"", dataSource)
() -> cluster.runSql("select count(*) from \"%s\"", dataSource)
);
Assertions.assertTrue(t.getMessage().contains("Unable to load segment"));
Assertions.assertTrue(t.getMessage().contains("] on demand, ensure enough disk space has been allocated to load all segments involved in the query"));
@@ -233,6 +246,58 @@
);
}

@Test
void testQueryTooMuchDataButWithDart()
{
// Dart can query all of the segments at once even under virtual storage, because of how it
// chunks up and fetches segments as it does the work.
final String sqlQueryId = UUID.randomUUID().toString();
final String resultString = cluster.callApi().onAnyBroker(
b -> b.submitSqlQuery(
new ClientSqlQuery(
StringUtils.format("select count(*) from \"%s\"", dataSource),
"CSV",
false,
false,
false,
Map.of(
QueryContexts.CTX_SQL_QUERY_ID, sqlQueryId,
QueryContexts.ENGINE, "msq-dart"
),
null
)
)
).trim();

final long result = Long.parseLong(resultString);

Assertions.assertEquals(39244L, result);

// Now fetch the report using the SQL query ID
final GetQueryReportResponse reportResponse = msqApis.getDartQueryReport(sqlQueryId, broker);

// Verify the report response
Assertions.assertNotNull(reportResponse, "Report response should not be null");
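// Pull the "input0" channel counters for stage 0 (first worker) out of the report payload.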
ChannelCounters.Snapshot segmentChannelCounters =
(ChannelCounters.Snapshot) reportResponse.getReportMap()
.findReport("multiStageQuery")
.map(r ->
((MSQTaskReportPayload) r.getPayload()).getCounters()
.snapshotForStage(0)
.get(0)
.getMap()
.get("input0")
).orElse(null);

Assertions.assertNotNull(segmentChannelCounters);
Assertions.assertArrayEquals(new long[]{24L}, segmentChannelCounters.getFiles());
Assertions.assertTrue(segmentChannelCounters.getLoadFiles()[0] > 0 && segmentChannelCounters.getLoadFiles()[0] <= segmentChannelCounters.getFiles()[0]);
// 3776682 is the total size of all segments at the time of writing; we may have to load all
// of them, but possibly fewer depending on test order.
Assertions.assertTrue(segmentChannelCounters.getLoadBytes()[0] > 0 && segmentChannelCounters.getLoadBytes()[0] <= 3776682L);
Assertions.assertTrue(segmentChannelCounters.getLoadTime()[0] > 0);
Assertions.assertTrue(segmentChannelCounters.getLoadWait()[0] > 0);
}


private void assertQueryMetrics(int expectedEventCount, @Nullable Long expectedLoadCount)
{
org/apache/druid/msq/counters/ChannelCounters.java
@@ -28,9 +28,11 @@
import it.unimi.dsi.fastutil.longs.LongList;
import org.apache.druid.frame.Frame;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.segment.loading.AcquireSegmentResult;

import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;

/**
* Counters for inputs and outputs. Created by {@link CounterTracker#channel}.
@@ -54,6 +56,18 @@ public class ChannelCounters implements QueryCounter
@GuardedBy("this")
private final LongList totalFiles = new LongArrayList();

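// Per-partition counters for segments loaded on demand: file and byte counts plus load and
// wait times (in millis), indexed like the lists above.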
@GuardedBy("this")
private final LongList loadFiles = new LongArrayList();

@GuardedBy("this")
private final LongList loadBytes = new LongArrayList();

@GuardedBy("this")
private final LongList loadTime = new LongArrayList();

@GuardedBy("this")
private final LongList loadWait = new LongArrayList();

public void incrementRowCount()
{
incrementRowCount(NO_PARTITION);
@@ -79,6 +93,19 @@ public void addFile(final long nRows, final long nBytes)
add(NO_PARTITION, nRows, nBytes, 0, 1);
}

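// Records a completed segment load. Zero-byte results (segments presumably already available
// locally) are skipped, so these counters reflect only actual fetches; times are converted
// from nanos to millis.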
public void addLoad(AcquireSegmentResult loadResult)
{
if (loadResult.getLoadSizeBytes() > 0) {
addLoad(
NO_PARTITION,
loadResult.getLoadSizeBytes(),
TimeUnit.NANOSECONDS.toMillis(loadResult.getLoadTimeNanos()),
TimeUnit.NANOSECONDS.toMillis(loadResult.getWaitTimeNanos()),
1
);
}
}

/**
* Increment counts by one frame, and if this {@link RowsAndColumns} is a {@link Frame}, also increment
* bytes by {@link Frame#numBytes()}.
@@ -116,6 +143,23 @@ private void add(
}
}

private void addLoad(
final int partitionNumber,
final long nBytes,
final long nTime,
final long nWait,
final long nFiles
)
{
synchronized (this) {
ensureCapacityForPartitionLoad(partitionNumber);
loadBytes.set(partitionNumber, loadBytes.getLong(partitionNumber) + nBytes);
loadTime.set(partitionNumber, loadTime.getLong(partitionNumber) + nTime);
loadWait.set(partitionNumber, loadWait.getLong(partitionNumber) + nWait);
loadFiles.set(partitionNumber, loadFiles.getLong(partitionNumber) + nFiles);
}
}

@GuardedBy("this")
private void ensureCapacityForPartition(final int partitionNumber)
{
@@ -140,6 +184,26 @@ private void ensureCapacityForPartition(final int partitionNumber)
}
}

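// Mirrors ensureCapacityForPartition above: grows each load list with zero-filled slots up
// to the requested partition.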
@GuardedBy("this")
private void ensureCapacityForPartitionLoad(final int partitionNumber)
{
while (partitionNumber >= loadFiles.size()) {
loadFiles.add(0);
}

while (partitionNumber >= loadBytes.size()) {
loadBytes.add(0);
}

while (partitionNumber >= loadTime.size()) {
loadTime.add(0);
}

while (partitionNumber >= loadWait.size()) {
loadWait.add(0);
}
}

@Override
@Nullable
public Snapshot snapshot()
@@ -149,23 +213,46 @@ public Snapshot snapshot()
final long[] framesArray;
final long[] filesArray;
final long[] totalFilesArray;
final long[] loadBytesArray;
final long[] loadTimeArray;
final long[] loadWaitArray;
final long[] loadFilesArray;

synchronized (this) {
rowsArray = listToArray(rows);
bytesArray = listToArray(bytes);
framesArray = listToArray(frames);
filesArray = listToArray(files);
totalFilesArray = listToArray(totalFiles);
loadBytesArray = listToArray(loadBytes);
loadTimeArray = listToArray(loadTime);
loadWaitArray = listToArray(loadWait);
loadFilesArray = listToArray(loadFiles);
}

if (rowsArray == null
&& bytesArray == null
&& framesArray == null
&& filesArray == null
&& totalFilesArray == null) {
&& totalFilesArray == null
&& loadBytesArray == null
&& loadTimeArray == null
&& loadWaitArray == null
&& loadFilesArray == null
) {
return null;
} else {
return new Snapshot(
rowsArray,
bytesArray,
framesArray,
filesArray,
totalFilesArray,
loadBytesArray,
loadTimeArray,
loadWaitArray,
loadFilesArray
);
}
}

@@ -196,21 +283,33 @@ public static class Snapshot implements QueryCounterSnapshot
private final long[] frames;
private final long[] files;
private final long[] totalFiles;
private final long[] loadBytes;
private final long[] loadTime;
private final long[] loadWait;
private final long[] loadFiles;

@JsonCreator
public Snapshot(
@Nullable @JsonProperty("rows") final long[] rows,
@Nullable @JsonProperty("bytes") final long[] bytes,
@Nullable @JsonProperty("frames") final long[] frames,
@Nullable @JsonProperty("files") final long[] files,
@Nullable @JsonProperty("totalFiles") final long[] totalFiles
@Nullable @JsonProperty("totalFiles") final long[] totalFiles,
@Nullable @JsonProperty("loadBytes") final long[] loadBytes,
@Nullable @JsonProperty("loadTime") final long[] loadTime,
@Nullable @JsonProperty("loadWait") final long[] loadWait,
@Nullable @JsonProperty("loadFiles") final long[] loadFiles
)
{
this.rows = rows;
this.bytes = bytes;
this.frames = frames;
this.files = files;
this.totalFiles = totalFiles;
this.loadBytes = loadBytes;
this.loadTime = loadTime;
this.loadWait = loadWait;
this.loadFiles = loadFiles;
}

@JsonProperty
Expand Down Expand Up @@ -248,6 +347,34 @@ public long[] getTotalFiles()
return totalFiles;
}

@JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL)
public long[] getLoadBytes()
{
return loadBytes;
}

@JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL)
public long[] getLoadTime()
{
return loadTime;
}

@JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL)
public long[] getLoadWait()
{
return loadWait;
}

@JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL)
public long[] getLoadFiles()
{
return loadFiles;
}

@Override
public boolean equals(Object o)
{
@@ -262,7 +389,10 @@ public boolean equals(Object o)
&& Arrays.equals(bytes, snapshot.bytes)
&& Arrays.equals(frames, snapshot.frames)
&& Arrays.equals(files, snapshot.files)
&& Arrays.equals(totalFiles, snapshot.totalFiles);
&& Arrays.equals(totalFiles, snapshot.totalFiles)
&& Arrays.equals(loadBytes, snapshot.loadBytes)
&& Arrays.equals(loadTime, snapshot.loadTime)
&& Arrays.equals(loadFiles, snapshot.loadFiles);
}

@Override
@@ -273,6 +403,9 @@ public int hashCode()
result = 31 * result + Arrays.hashCode(frames);
result = 31 * result + Arrays.hashCode(files);
result = 31 * result + Arrays.hashCode(totalFiles);
result = 31 * result + Arrays.hashCode(loadBytes);
result = 31 * result + Arrays.hashCode(loadTime);
result = 31 * result + Arrays.hashCode(loadWait);
result = 31 * result + Arrays.hashCode(loadFiles);
return result;
}

@@ -285,6 +418,9 @@ public String toString()
", frames=" + Arrays.toString(frames) +
", files=" + Arrays.toString(files) +
", totalFiles=" + Arrays.toString(totalFiles) +
", loadBytes=" + Arrays.toString(loadBytes) +
", loadTime=" + Arrays.toString(loadTime) +
", loadFiles=" + Arrays.toString(loadFiles) +
'}';
}
}
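
The new counters above can be exercised in isolation. The following is a minimal sketch, not code from this change: it assumes ChannelCounters has a public no-arg constructor (in production it is created by CounterTracker#channel), that AcquireSegmentResult can be mocked, and that NO_PARTITION maps to slot 0; the getter names are taken from the diff above. The last diff below wires addLoad into the segment-loading path.

    import org.apache.druid.msq.counters.ChannelCounters;
    import org.apache.druid.segment.loading.AcquireSegmentResult;
    import org.mockito.Mockito;

    class ChannelCountersLoadSketch
    {
      void example()
      {
        final ChannelCounters counters = new ChannelCounters();
        counters.addFile(1_000, 2_048); // one input file: 1000 rows, 2048 bytes

        // Hypothetical load result: 2048 bytes fetched in 5 ms after a 1 ms wait.
        final AcquireSegmentResult load = Mockito.mock(AcquireSegmentResult.class);
        Mockito.when(load.getLoadSizeBytes()).thenReturn(2_048L);
        Mockito.when(load.getLoadTimeNanos()).thenReturn(5_000_000L);
        Mockito.when(load.getWaitTimeNanos()).thenReturn(1_000_000L);
        counters.addLoad(load); // recorded, because the load size is > 0

        final ChannelCounters.Snapshot snap = counters.snapshot();
        // Expected, assuming NO_PARTITION indexes slot 0:
        //   snap.getFiles()     -> [1]
        //   snap.getLoadFiles() -> [1]
        //   snap.getLoadBytes() -> [2048]
        //   snap.getLoadTime()  -> [5]  (millis)
        //   snap.getLoadWait()  -> [1]  (millis)
      }
    }
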
@@ -27,6 +27,7 @@
import org.apache.druid.error.DruidException;
import org.apache.druid.frame.channel.ReadableFrameChannel;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.msq.counters.ChannelCounters;
import org.apache.druid.msq.exec.DataServerQueryHandler;
import org.apache.druid.msq.exec.std.StandardPartitionReader;
import org.apache.druid.msq.input.LoadableSegment;
@@ -305,6 +306,10 @@ private ListenableFuture<ReadableInput> loadNextSegment()
// Transfer segment from "loadingSegments" to "loadedSegments" and return a reference to it.
if (loadingSegments.remove(acquireSegmentAction)) {
try {
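// Record how much data this segment load actually pulled; addLoad is a no-op when
// nothing was fetched (zero load bytes).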
final ChannelCounters inputCounters = nextLoadableSegment.inputCounters();
if (inputCounters != null) {
inputCounters.addLoad(segment);
}
final SegmentReferenceHolder referenceHolder = new SegmentReferenceHolder(
new SegmentReference(
nextLoadableSegment.descriptor(),