Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions packages/d2ts/src/operators/join.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,12 @@ export class JoinOperator<K, V1, V2> extends BinaryOperator<
if (this.outputFrontier.lessThan(inputFrontier)) {
this.outputFrontier = inputFrontier
this.output.sendFrontier(this.outputFrontier)
this.#indexA.compact(this.outputFrontier)
this.#indexB.compact(this.outputFrontier)
// Compact both indexes in the background. The returned promises are
// intentionally not awaited, so compaction does not block the event loop.
Promise.all([
this.#indexA.compactAsync(this.outputFrontier),
this.#indexB.compactAsync(this.outputFrontier)
])
}
}
}
Expand Down
147 changes: 106 additions & 41 deletions packages/d2ts/src/version-index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ export class Index<K, V> implements IndexType<K, V> {
#inner: IndexMap<K, V>
#compactionFrontier: Antichain | null
#modifiedKeys: Set<K>
#compactionPromise: Promise<void>

constructor() {
this.#inner = new DefaultMap<K, VersionMap<[V, number]>>(
Expand All @@ -42,6 +43,7 @@ export class Index<K, V> implements IndexType<K, V> {
// }
this.#compactionFrontier = null
this.#modifiedKeys = new Set()
this.#compactionPromise = Promise.resolve()
}

toString(indent = false): string {
Expand Down Expand Up @@ -144,44 +146,26 @@ export class Index<K, V> implements IndexType<K, V> {
)

// We want to iterate over the smaller of the two indexes to reduce the
// number of operations we need to do.
if (this.#inner.size <= other.#inner.size) {
for (const [key, versions] of this.#inner) {
if (!other.has(key)) continue
const otherVersions = other.get(key)
for (const [rawVersion1, data1] of versions) {
const version1 =
this.#compactionFrontier &&
this.#compactionFrontier.lessEqualVersion(rawVersion1)
? rawVersion1.advanceBy(this.#compactionFrontier)
: rawVersion1
for (const [version2, data2] of otherVersions) {
for (const [val1, mul1] of data1) {
for (const [val2, mul2] of data2) {
const resultVersion = version1.join(version2)
collections.update(resultVersion, (existing) => {
existing.push([key, [val1, val2], mul1 * mul2])
return existing
})
}
}
}
}
}
} else {
for (const [key, otherVersions] of other.entries()) {
if (!this.has(key)) continue
const versions = this.get(key)
// number of operations we need to do.
const thisIsTheSmallerIndex = this.#inner.size <= other.#inner.size
const [smallestIndex, otherIndex] =
thisIsTheSmallerIndex ? [this, other] : [other, this]

for (const [key, versions] of smallestIndex.#inner) {
if (!otherIndex.has(key)) continue
const otherVersions = otherIndex.get(key)
for (const [version1, data1] of versions) {
for (const [version2, data2] of otherVersions) {
for (const [version1, data1] of versions) {
for (const [val1, mul1] of data1) {
for (const [val2, mul2] of data2) {
for (const [val1, mul1] of data1) {
const resultVersion = version1.join(version2)
collections.update(resultVersion, (existing) => {
existing.push([key, [val1, val2], mul1 * mul2])
return existing
})
}
const resultVersion = version1.join(version2)
collections.update(resultVersion, (existing) => {
const values: [V, V2] = thisIsTheSmallerIndex
? [val1 as V, val2 as V2]
: [val2 as V, val1 as V2]
existing.push([key, values, mul1 * mul2])
return existing
})
}
}
}
Expand All @@ -197,7 +181,10 @@ export class Index<K, V> implements IndexType<K, V> {
return result as [Version, MultiSet<[K, [V, V2]]>][]
}

compact(compactionFrontier: Antichain, keys: K[] = []): void {
compact(
compactionFrontier: Antichain,
keys: K[] = Array.from(this.#modifiedKeys),
): void {
if (
this.#compactionFrontier &&
!this.#compactionFrontier.lessEqual(compactionFrontier)
Expand Down Expand Up @@ -226,10 +213,7 @@ export class Index<K, V> implements IndexType<K, V> {
)
}

const keysToProcess =
keys.length > 0 ? keys : Array.from(this.#modifiedKeys)

for (const key of keysToProcess) {
for (const key of keys) {
const versions = this.#inner.get(key)

const toCompact = Array.from(versions.keys()).filter(
Expand Down Expand Up @@ -264,6 +248,87 @@ export class Index<K, V> implements IndexType<K, V> {
this.#compactionFrontier = compactionFrontier
}

/**
 * Asynchronous version of compact that processes one key at a time and
 * yields to the event loop after each key, so compacting a large index does
 * not block the event loop.
 *
 * Calls are queued: each call starts only after the previously queued
 * compaction has settled. A failed compaction rejects the promise returned
 * to its own caller, but does not prevent later queued compactions from
 * running.
 *
 * WARNING: Never interleave calls to compactAsync with calls to compact.
 * Those are not queued together and may interleave, resulting in wrong
 * compaction.
 *
 * @param compactionFrontier - Frontier to compact up to. Versions for which
 *   `compactionFrontier.lessEqualVersion(version)` is false are advanced by
 *   the frontier and their values consolidated.
 * @param keys - Keys to compact. Defaults to a snapshot of the modified keys
 *   taken when compactAsync is called (not when the queued work starts).
 * @returns A promise that settles when this particular compaction finishes.
 * @throws Rejects with `Error('Invalid compaction frontier')` when the
 *   current compaction frontier is not lessEqual to `compactionFrontier`.
 */
async compactAsync(
  compactionFrontier: Antichain,
  keys: K[] = Array.from(this.#modifiedKeys),
): Promise<void> {
  const runCompaction = async (): Promise<void> => {
    if (
      this.#compactionFrontier &&
      !this.#compactionFrontier.lessEqual(compactionFrontier)
    ) {
      throw new Error('Invalid compaction frontier')
    }

    this.#validate(compactionFrontier)

    // Sum the multiplicities of equal values (equality decided by `hash`)
    // and drop entries whose multiplicity cancels out to zero.
    const consolidateValues = (values: [V, number][]): [V, number][] => {
      const consolidated = new Map<string | number, [V, number]>()

      for (const [value, multiplicity] of values) {
        const key = hash(value)
        const existing = consolidated.get(key)
        if (existing) {
          consolidated.set(key, [value, existing[1] + multiplicity])
        } else {
          consolidated.set(key, [value, multiplicity])
        }
      }

      return Array.from(consolidated.values()).filter(
        ([_, multiplicity]) => multiplicity !== 0,
      )
    }

    for (const key of keys) {
      const versions = this.#inner.get(key)

      // Versions not covered by the frontier are the ones to be advanced.
      const toCompact = Array.from(versions.keys()).filter(
        (version) => !compactionFrontier.lessEqualVersion(version),
      )

      const toConsolidate = new Set<Version>()

      for (const version of toCompact) {
        const values = versions.get(version)
        versions.delete(version)

        // Move the values to the advanced version, merging with anything
        // already stored there.
        const newVersion = version.advanceBy(compactionFrontier)
        versions.update(newVersion, (existing) => {
          chunkedArrayPush(existing, values)
          return existing
        })
        toConsolidate.add(newVersion)
      }

      for (const version of toConsolidate) {
        const newValues = consolidateValues(versions.get(version))
        if (newValues.length > 0) {
          versions.set(version, newValues)
        } else {
          // NOTE(review): this removes the whole key from the index, not
          // just the emptied version — confirm this is intended when other
          // versions of the key still hold data.
          this.#inner.delete(key)
        }
      }
      this.#modifiedKeys.delete(key)

      // Yield to the event loop after processing each key
      await new Promise<void>((resolve) => setTimeout(resolve, 0))
    }

    this.#compactionFrontier = compactionFrontier
  }

  // Chain this compaction operation to the previous one.
  const current = this.#compactionPromise.then(runCompaction)

  // Store an always-fulfilled tail for the queue. If the rejected promise
  // itself were stored, one failed compaction would make every later call
  // reject with the same stale error without ever running.
  this.#compactionPromise = current.then(
    () => undefined,
    () => undefined,
  )

  // The caller still observes the outcome of its own compaction.
  return current
}

/**
 * Collects the keys of the inner index map into a newly allocated array.
 *
 * @returns A fresh array of keys; mutating it does not affect the index.
 */
keys(): K[] {
  return [...this.#inner.keys()]
}
Expand Down
Loading