fix: reduce MDT compaction heap pressure in BitCaskDiskMap and ExternalSpillableMap #19085

Open
nsivabalan wants to merge 1 commit into
apache:master from
nsivabalan:ENG-43078-mdt-compaction-heap-fixes-oss

@nsivabalan

Contributor

Change Logs

Fixes #19084.

MDT record_index partition compaction OOMs executors configured with 9 GB heap, four concurrent tasks, and spark.memory.fraction=0.3. Heap analysis identified three structural consumers that aren't bounded by the configured compaction memory cap (hoodie.memory.compaction.fraction):

| Consumer | What | Total (4 tasks) |
| --- | --- | --- |
| B | BitCaskDiskMap.valueMetadataMap: 18.3M ValueMetadata entries with Integer/Long boxing overhead dominating (57.2M java.lang.Long instances = ~1.28 GB pure boxing) | ~3.0–3.2 GB |
| F | LazyFileIterator materialised a full sorted ArrayList (4.58M entries × 4 tasks) at iterator-open time because ConcurrentHashMap doesn't preserve insertion order | ~513 MB |
| E | ExternalSpillableMap.keySet() allocated a transient HashSet copying all 6.29M keys solely to seed the PriorityQueue in HoodieSortedMergeHandle's constructor | ~256 MB peak |

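4-task demand: ~6.25 GB on a 9 GB executor, which OOMs under GC pressure. Lowering hoodie.memory.compaction.fraction makes B larger and the OOM worse (a smaller in-memory budget presumably spills more records to disk, and each spilled record adds a ValueMetadata entry), so the fix must be code, not config.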

Changes

  1. Unbox ValueMetadata primitives (BitCaskDiskMap.java): Integer/Long → int/long for sizeOfValue, offsetOfValue, timestamp. Saves ~880 MB across 4 tasks. The Lombok @AllArgsConstructor + @Getter annotations are preserved; getter return types switch from boxed to primitive (autoboxing handles any external caller).

  2. LinkedHashMap + ReentrantReadWriteLock for valueMetadataMap (BitCaskDiskMap.java, LazyFileIterable.java) — filePosition is strictly monotonic under the write lock (append-only file + monotonic SizeAwareDataOutputStream.getSize()), so insertion order = disk offset order. LazyFileIterator no longer sorts; it consumes a pre-built snapshot list taken under the read lock. put() does remove-before-put under the write lock so re-inserted keys land at the LinkedHashMap tail. All read paths (get, containsKey, size, iterator, valueStream, entrySet, keySet, iterator(Predicate)) take the read lock and snapshot before releasing. close() and clear() take the write lock so a concurrent reader under SpillableMapBasedFileSystemView's shared readLock can't observe a half-cleared map. Saves ~513 MB across 4 tasks.

  3. ExternalSpillableMap.keyStream() + HoodieSortedMergeHandle uses it (ExternalSpillableMap.java, HoodieSortedMergeHandle.java) — keyStream() returns Stream.concat(inMemoryKeys, diskKeys) without allocating a HashSet copy. HoodieSortedMergeHandle now collects the stream into an ArrayList and passes it to new PriorityQueue<>(list) to get the O(N) heapify (vs. O(N log N) per-add). Saves ~256 MB peak spike per task.
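
The remove-before-put step in change 2 matters because LinkedHashMap keeps a key's original position when it is re-inserted. The following is a minimal sketch of that idea, not the PR's actual code: `OffsetOrderedIndex`, its `String` keys, and the byte sizes are hypothetical stand-ins for valueMetadataMap and its ValueMetadata entries.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical stand-in for the valueMetadataMap bookkeeping; not Hudi's class.
final class OffsetOrderedIndex {
  private final Map<String, Long> offsets = new LinkedHashMap<>();
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private long filePosition = 0L; // only ever grows, mimicking an append-only file

  void put(String key, int sizeOfValue) {
    lock.writeLock().lock();
    try {
      // A plain put() on an existing key keeps its old slot in iteration order,
      // so remove it first to move the key to the tail.
      offsets.remove(key);
      offsets.put(key, filePosition);
      filePosition += sizeOfValue;
    } finally {
      lock.writeLock().unlock();
    }
  }

  // Copies the entries under the read lock; callers iterate the copy lock-free.
  List<Map.Entry<String, Long>> snapshot() {
    lock.readLock().lock();
    try {
      List<Map.Entry<String, Long>> copy = new ArrayList<>(offsets.size());
      for (Map.Entry<String, Long> e : offsets.entrySet()) {
        copy.add(new AbstractMap.SimpleImmutableEntry<>(e));
      }
      return copy;
    } finally {
      lock.readLock().unlock();
    }
  }
}
```

With puts of a, b, c and then a again, the snapshot iterates b, c, a, and the offsets are already ascending, so no sort is needed at iterator-open time.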

Total: ~1.65 GB (880 MB + 513 MB + 256 MB). 4-task demand drops from ~6.25 GB to ~4.57 GB.

Impact

Eliminates executor OOM during MDT record_index compaction on commonly-sized executors. No on-disk format change. No public-API surface change (ValueMetadata.getOffsetOfValue() return type changes from Long to long, but ValueMetadata is an internal class and autoboxing handles any external callers).
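
To illustrate the getter change, here is a small sketch under stated assumptions: the two classes below are hand-written stand-ins that reuse the field names from the description, whereas the real ValueMetadata gets its constructor and getters from Lombok. `UnboxingSketch` is a hypothetical caller.

```java
// Hypothetical stand-ins for illustration; not the actual Hudi source.
final class BoxedValueMetadata {
  private final Integer sizeOfValue; // reference to a java.lang.Integer wrapper
  private final Long offsetOfValue;  // reference to a java.lang.Long wrapper
  private final Long timestamp;      // reference to a java.lang.Long wrapper

  BoxedValueMetadata(Integer sizeOfValue, Long offsetOfValue, Long timestamp) {
    this.sizeOfValue = sizeOfValue;
    this.offsetOfValue = offsetOfValue;
    this.timestamp = timestamp;
  }

  Long getOffsetOfValue() {
    return offsetOfValue;
  }
}

final class PrimitiveValueMetadata {
  private final int sizeOfValue;    // stored inline in the object, no wrapper
  private final long offsetOfValue; // stored inline in the object, no wrapper
  private final long timestamp;     // stored inline in the object, no wrapper

  PrimitiveValueMetadata(int sizeOfValue, long offsetOfValue, long timestamp) {
    this.sizeOfValue = sizeOfValue;
    this.offsetOfValue = offsetOfValue;
    this.timestamp = timestamp;
  }

  long getOffsetOfValue() {
    return offsetOfValue;
  }
}

final class UnboxingSketch {
  // A caller written against the boxed getter still compiles:
  // the primitive long is autoboxed when it is returned as a Long.
  static Long callerExpectingBoxed(PrimitiveValueMetadata meta) {
    return meta.getOffsetOfValue();
  }
}
```

One caveat: autoboxing covers assignments, arguments, and returns. A caller that invokes a method on the result, such as getOffsetOfValue().equals(x), would need a source change, since a primitive cannot be dereferenced.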

Risk Level

Low. The locking model is conservative (RW-lock with read-shared concurrency preserved), the insertion-order = offset-order invariant is proved from append-only file + monotonic filePosition + write-lock, and the regression test suite locks down both the invariant and the concurrent-read contract.

Documentation Update

None required.

Tests

  • TestBitCaskDiskMap#testReInsertionPreservesOffsetOrder (parameterized × compression on/off): puts a key twice, asserts re-inserted key lands at iteration tail. Locks down the LinkedHashMap-insertion-order == disk-offset-order invariant against future refactors of put().
  • TestBitCaskDiskMap#testConcurrentReadersAndWriter: writer + reader threads; asserts no ConcurrentModificationException and that the RW-lock contract holds.
  • Full TestBitCaskDiskMap + TestExternalSpillableMap suite: 45 run / 0 failed / 1 skipped on Java 11 with -Dscala-2.12 -Dspark3.5.
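
The shape of the concurrent check can be sketched as follows. This is not the PR's test: `ConcurrentSnapshotSketch` is a hypothetical miniature of the map, with `Integer` keys and `long` offsets. It shows one writer and one reader sharing a LinkedHashMap behind a ReentrantReadWriteLock, with the reader asserting offset order on every snapshot.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical miniature of the locking model; not the PR's test code.
final class ConcurrentSnapshotSketch {
  private final Map<Integer, Long> map = new LinkedHashMap<>();
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

  void put(int key, long offset) {
    lock.writeLock().lock();
    try {
      map.remove(key); // re-inserted keys move to the tail
      map.put(key, offset);
    } finally {
      lock.writeLock().unlock();
    }
  }

  List<Long> snapshotOffsets() {
    lock.readLock().lock();
    try {
      return new ArrayList<>(map.values()); // copy while the lock is held
    } finally {
      lock.readLock().unlock();
    }
  }

  // Runs one writer and one reader; returns the first failure seen, or null.
  static Throwable run(int writes) {
    ConcurrentSnapshotSketch sketch = new ConcurrentSnapshotSketch();
    AtomicBoolean writerDone = new AtomicBoolean(false);
    AtomicReference<Throwable> failure = new AtomicReference<>();

    Thread writer = new Thread(() -> {
      try {
        for (int i = 0; i < writes; i++) {
          sketch.put(i % 100, i); // offsets strictly increase, keys repeat
        }
      } finally {
        writerDone.set(true);
      }
    });
    Thread reader = new Thread(() -> {
      try {
        while (!writerDone.get()) {
          long previous = -1L;
          for (long offset : sketch.snapshotOffsets()) {
            if (offset <= previous) {
              throw new IllegalStateException("offsets out of order");
            }
            previous = offset;
          }
        }
      } catch (Throwable t) {
        failure.set(t); // includes ConcurrentModificationException, if any
      }
    });

    writer.start();
    reader.start();
    try {
      writer.join();
      reader.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return e;
    }
    return failure.get();
  }
}
```

A passing run proves little on its own, since races are timing-dependent; the value of such a test is mainly as a regression tripwire if the locking is later removed.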

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

…and ExternalSpillableMap

Fixes apache#19084.

## Problem

MDT `record_index` compaction tasks were OOMing on executors with 9 GB
heap, four concurrent tasks, and `spark.memory.fraction=0.3`. Heap
analysis identified three structural consumers that aren't bounded by
the configured compaction memory cap:

  B  ~3.0–3.2 GB    BitCaskDiskMap.valueMetadataMap — 18.3M ValueMetadata
                    entries with Integer/Long boxing overhead dominating
                    (57.2M java.lang.Long instances = ~1.28 GB pure boxing)
  F  ~513 MB        LazyFileIterator sorted ArrayList (4.58M entries × 4
                    tasks) materialised at iterator-open time because
                    ConcurrentHashMap doesn't preserve insertion order
  E  ~256 MB peak   ExternalSpillableMap.keySet() HashSet spike (6.29M
                    keys copied to seed a PriorityQueue)

Total ~1.6 GB per task across 4 concurrent tasks, on top of the bounded
in-memory record map. Lowering hoodie.memory.compaction.fraction makes B
larger and the OOM worse — fix has to be code, not config.

## Changes

1. Unbox ValueMetadata primitives (BitCaskDiskMap.java)
   Integer/Long → int/long for sizeOfValue, offsetOfValue, timestamp.
   Saves ~880 MB across 4 tasks.

2. LinkedHashMap + ReentrantReadWriteLock for valueMetadataMap
   (BitCaskDiskMap.java, LazyFileIterable.java)

   - filePosition is strictly monotonic under the write lock (append-only
     file + monotonic SizeAwareDataOutputStream.getSize()), so insertion
     order = disk offset order. LazyFileIterator no longer sorts; it
     consumes a pre-built snapshot list taken under the read lock.
   - put() does remove-before-put under the write lock so re-inserted
     keys land at the LinkedHashMap tail (preserves offset-order).
   - All read paths (get, containsKey, size, iterator, valueStream,
     entrySet, keySet, iterator(Predicate)) take the read lock and
     snapshot before releasing.
   - close() and clear() take the write lock so a concurrent reader
     under SpillableMapBasedFileSystemView's shared readLock can't
     observe a half-cleared map.

   Saves ~513 MB across 4 tasks.

3. ExternalSpillableMap.keyStream() + HoodieSortedMergeHandle uses it
   (ExternalSpillableMap.java, HoodieSortedMergeHandle.java)

   keyStream() returns Stream.concat(inMemoryKeys, diskKeys) without
   allocating a HashSet copy. HoodieSortedMergeHandle now collects the
   stream into an ArrayList and passes it to new PriorityQueue<>(list)
   to get the O(N) heapify (vs. O(N log N) per-add).

   Saves ~256 MB peak spike per task.
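
Change 3 can be sketched as below, assuming `String` keys. `KeyHeapSketch.buildQueue` is a hypothetical helper, not the HoodieSortedMergeHandle code. In OpenJDK, the PriorityQueue(Collection) constructor copies the elements and heapifies them in one bottom-up pass, which is where the O(N) figure comes from; the Javadoc itself does not state a complexity bound.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class KeyHeapSketch {
  // Hypothetical helper: builds the merge queue from two key streams.
  static PriorityQueue<String> buildQueue(Stream<String> inMemoryKeys,
                                          Stream<String> diskKeys) {
    // An ArrayList holds references only; a HashSet would add a node per key.
    List<String> keys = Stream.concat(inMemoryKeys, diskKeys)
        .collect(Collectors.toCollection(ArrayList::new));
    // Bulk construction instead of N individual add() calls.
    return new PriorityQueue<>(keys);
  }
}
```

Polling a queue built from in-memory keys k3, k1 and disk key k2 yields k1, k2, k3.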

## Total expected savings

~1.65 GB across the three fixes (880 MB + 513 MB + 256 MB). 4-task demand
drops from ~6.25 GB to ~4.57 GB on a 9 GB executor heap.

## Tests

- testReInsertionPreservesOffsetOrder (parameterized × compression
  on/off): puts a key twice, asserts re-inserted key lands at iteration
  tail. Locks down the LinkedHashMap-insertion-order == disk-offset-order
  invariant.
- testConcurrentReadersAndWriter: writer + reader threads; asserts no
  CME and that the RW-lock contract holds.
- Full TestBitCaskDiskMap + TestExternalSpillableMap suite: 45/0/1 on
  Java 11 with -Dscala-2.12 -Dspark3.5.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@nsivabalan changed the title from "[HUDI-19084] fix: reduce MDT compaction heap pressure in BitCaskDiskMap and ExternalSpillableMap" to "fix: reduce MDT compaction heap pressure in BitCaskDiskMap and ExternalSpillableMap" on Jun 28, 2026
@github-actions (bot) added the "size:L" label (PR with lines of changes in (300, 1000]) on Jun 28, 2026
@hudi-bot

Collaborator

CI report:

Bot commands: @hudi-bot supports the following commands:
  • @hudi-bot run azure: re-run the last Azure build

@hudi-agent hudi-agent left a comment

Contributor

⚠️ 🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Thanks for working on this! This PR reduces MDT compaction heap pressure by unboxing ValueMetadata primitives, switching BitCaskDiskMap from ConcurrentHashMap to a LinkedHashMap guarded by a ReentrantReadWriteLock (so iteration relies on insertion-order == disk-offset-order instead of an O(N log N) sort), and adding a lazy keyStream() to avoid a transient key-set copy in HoodieSortedMergeHandle.

I traced the locking model, the insertion-order/offset-order invariant, the serialization/flush moves outside the write lock, the primitive getter change, and the keyStream dedup; these all look carefully handled and correct. There is one inline question on whether the keyStream() path fully realizes the keySet-copy savings once the map has spilled to BitCaskDiskMap, plus a few small readability nits in the new test and one redundant stream call in the production code. The overall structure and commenting of the changes is solid. Please take a look at the inline comments; after that, this should be ready for a Hudi committer or PMC member to take it from here.

if (diskBasedMap == null) {
  return inMemoryMap.keySet().stream();
}
return Stream.concat(inMemoryMap.keySet().stream(), diskBasedMap.keySet().stream());

🤖 When the map has spilled, keyStream() calls diskBasedMap.keySet(), which this PR changes to eagerly return new HashSet<>(valueMetadataMap.keySet()) of all disk keys. So for a heavily-spilled map (the MDT case) the transient HashSet is still materialized here — does this fully realize the consumer-E savings, or would a truly streaming keySet on BitCaskDiskMap be needed? The javadoc's claim that keyStream "does NOT allocate a new HashSet copying all keys" seems only partially true once disk keys dominate.
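
The concern can be reproduced in miniature. The sketch below is an illustration, not the PR's code: `EagerKeySetSketch` and its `diskKeyCopies` counter are hypothetical, and `diskKeySet()` only mimics the defensive HashSet copy described above. It shows that Stream.concat is only as lazy as its inputs, because the copy is made when keyStream() is called rather than when the stream is consumed.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Stream;

// Hypothetical miniature of the spilled-map case; not Hudi's classes.
final class EagerKeySetSketch {
  private final Set<String> inMemoryKeys = new HashSet<>();
  private final Map<String, Long> diskIndex = new LinkedHashMap<>();
  int diskKeyCopies = 0; // counts how often the disk key set was copied

  void putInMemory(String key) {
    inMemoryKeys.add(key);
  }

  void putOnDisk(String key, long offset) {
    diskIndex.put(key, offset);
  }

  // Mimics a keySet() that returns a defensive copy of all disk keys.
  Set<String> diskKeySet() {
    diskKeyCopies++;
    return new HashSet<>(diskIndex.keySet());
  }

  Stream<String> keyStream() {
    // diskKeySet() is evaluated here as an argument expression,
    // before any terminal operation runs on the returned stream.
    return Stream.concat(inMemoryKeys.stream(), diskKeySet().stream());
  }
}
```

After a single keyStream() call with no terminal operation, diskKeyCopies is already 1, so the transient set exists for as long as the stream is reachable.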

⚠️ AI-generated; verify before applying. React 👍/👎 to flag quality.

records.put(firstKey, updatedValue);

// size must remain 5 (re-insertion replaces the existing entry).
List<HoodieRecord> iteratedFromEntrySet = new ArrayList<>();

🤖 nit: iteratedFromEntrySet is populated by calling records.iterator(), not records.entrySet() — could you rename it to something like iteratedRecords or iteratedValues so a future reader isn't sent looking for an entrySet() call?

  allValues.add(value);
}

java.util.concurrent.atomic.AtomicBoolean writerDone = new java.util.concurrent.atomic.AtomicBoolean(false);

🤖 nit: AtomicBoolean and AtomicReference are used with fully-qualified names here — could you add imports at the top of the file instead? Fully-qualified names mid-method add visual noise and make the lines quite hard to scan.

} finally {
  mapReadLock.unlock();
}
return snapshot.stream().sequential().map(valueMetaData -> (R) get(valueMetaData, file, valueSerializer, isCompressionEnabled));

🤖 nit: .sequential() is a no-op here — ArrayList.stream() already returns a sequential stream. Could you drop it? It may leave a future reader wondering whether parallelism was a concern and whether the stream might become parallel.
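
A quick way to confirm this, sketched with a hypothetical `SequentialNoOpSketch` class: BaseStream.isParallel() reports the mode without consuming the stream, and Collection.stream() is documented to return a sequential stream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class SequentialNoOpSketch {
  // Returns {isParallel without sequential(), isParallel with sequential()}.
  static boolean[] parallelFlags() {
    List<Integer> snapshot = new ArrayList<>(Arrays.asList(1, 2, 3));
    boolean plain = snapshot.stream().isParallel();
    boolean explicit = snapshot.stream().sequential().isParallel();
    return new boolean[] {plain, explicit};
  }
}
```

Both flags are false, so dropping the call leaves the stream's execution mode unchanged.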
