diff --git a/internal-api/src/jmh/java/datadog/trace/util/ConcurrentHashtableD2Benchmark.java b/internal-api/src/jmh/java/datadog/trace/util/ConcurrentHashtableD2Benchmark.java new file mode 100644 index 00000000000..7219cdfff69 --- /dev/null +++ b/internal-api/src/jmh/java/datadog/trace/util/ConcurrentHashtableD2Benchmark.java @@ -0,0 +1,179 @@ +package datadog.trace.util; + +import static java.util.concurrent.TimeUnit.MICROSECONDS; + +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +/** + * Compares {@link ConcurrentHashtable.D2} against {@link ConcurrentHashMap} and {@link + * ConcurrentSkipListMap} for shared, concurrent composite-key lookups. + * + *

The table is shared across all threads ({@link Scope#Benchmark}) and pre-populated before the + * measurement iteration — modelling the steady-state read-mostly pattern that the tracer uses (a + * per-class or per-method instrumentation cache consulted on every invocation). + * + *

+ * + *

ConcurrentSkipListMap is included as a second baseline: it is entirely lock-free for reads + * (CAS-based) but pays for tree traversal and Comparable overhead on every operation. + */ +@Fork(2) +@Warmup(iterations = 2) +@Measurement(iterations = 3) +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(MICROSECONDS) +@Threads(8) +public class ConcurrentHashtableD2Benchmark { + + static final int N_KEYS = 64; + static final int CAPACITY = 128; + + static final String[] SOURCE_K1 = new String[N_KEYS]; + static final Integer[] SOURCE_K2 = new Integer[N_KEYS]; + + static { + for (int i = 0; i < N_KEYS; ++i) { + SOURCE_K1[i] = "key-" + i; + SOURCE_K2[i] = i * 31 + 17; + } + } + + static final class D2Entry extends Hashtable.D2.Entry { + final long value; + + D2Entry(String k1, Integer k2) { + super(k1, k2); + this.value = 1L; + } + } + + /** Composite key for ConcurrentHashMap and ConcurrentSkipListMap baselines. */ + static final class Key2 implements Comparable { + final String k1; + final Integer k2; + final int hash; + + Key2(String k1, Integer k2) { + this.k1 = k1; + this.k2 = k2; + this.hash = Objects.hash(k1, k2); + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof Key2)) { + return false; + } + Key2 other = (Key2) o; + return Objects.equals(k1, other.k1) && Objects.equals(k2, other.k2); + } + + @Override + public int hashCode() { + return hash; + } + + @Override + public int compareTo(Key2 other) { + int c = k1.compareTo(other.k1); + return c != 0 ? c : k2.compareTo(other.k2); + } + } + + /** + * Shared state ({@link Scope#Benchmark}): one table instance across all threads, modelling a + * shared instrumentation cache. + */ + @State(Scope.Benchmark) + public static class SharedState { + ConcurrentHashtable.D2 table; + ConcurrentHashMap concurrentHashMap; + ConcurrentSkipListMap skipListMap; + + @Setup(Level.Iteration) + public void setUp() { + table = new ConcurrentHashtable.D2<>(CAPACITY); + concurrentHashMap = new ConcurrentHashMap<>(CAPACITY); + skipListMap = new ConcurrentSkipListMap<>(); + for (int i = 0; i < N_KEYS; ++i) { + table.getOrCreate(SOURCE_K1[i], SOURCE_K2[i], D2Entry::new); + Key2 key = new Key2(SOURCE_K1[i], SOURCE_K2[i]); + concurrentHashMap.put(key, (long) i); + skipListMap.put(key, (long) i); + } + } + } + + /** Per-thread cursor so each thread cycles through keys independently. */ + @State(Scope.Thread) + public static class ThreadState { + int cursor; + + int next() { + int i = cursor; + cursor = (i + 1) & (N_KEYS - 1); + return i; + } + } + + @Benchmark + public D2Entry get_concurrentHashtable(SharedState s, ThreadState t) { + int i = t.next(); + return s.table.get(SOURCE_K1[i], SOURCE_K2[i]); + } + + @Benchmark + public Long get_concurrentHashMap(SharedState s, ThreadState t) { + int i = t.next(); + return s.concurrentHashMap.get(new Key2(SOURCE_K1[i], SOURCE_K2[i])); + } + + @Benchmark + public Long get_concurrentSkipListMap(SharedState s, ThreadState t) { + int i = t.next(); + return s.skipListMap.get(new Key2(SOURCE_K1[i], SOURCE_K2[i])); + } + + @Benchmark + public D2Entry getOrCreate_concurrentHashtable(SharedState s, ThreadState t) { + int i = t.next(); + return s.table.getOrCreate(SOURCE_K1[i], SOURCE_K2[i], D2Entry::new); + } + + /** + * get-first pattern for CHM to avoid capturing-lambda allocation on hits — the idiomatic + * equivalent of D2.getOrCreate on a mostly-populated table. + */ + @Benchmark + public Long getOrCreate_concurrentHashMap(SharedState s, ThreadState t) { + int i = t.next(); + Key2 key = new Key2(SOURCE_K1[i], SOURCE_K2[i]); + Long existing = s.concurrentHashMap.get(key); + if (existing != null) { + return existing; + } + return s.concurrentHashMap.computeIfAbsent(key, k -> 0L); + } +} diff --git a/internal-api/src/main/java/datadog/trace/util/ConcurrentHashtable.java b/internal-api/src/main/java/datadog/trace/util/ConcurrentHashtable.java new file mode 100644 index 00000000000..b7b13a27d07 --- /dev/null +++ b/internal-api/src/main/java/datadog/trace/util/ConcurrentHashtable.java @@ -0,0 +1,219 @@ +package datadog.trace.util; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReferenceArray; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; + +/** + * Concurrent counterpart to {@link Hashtable}. Provides lock-free reads and locked writes for + * {@link D1} (single-key) and {@link D2} (composite-key) tables. + * + *

Like {@link Hashtable}, capacity is fixed at construction and the table does not resize. + * Unlike {@link Hashtable}, all operations are safe for concurrent access without external + * synchronization. + * + *

The primary advantage over {@link java.util.concurrent.ConcurrentHashMap} for composite-key + * use cases is that {@link D2#get(Object, Object)} and {@link D2#getOrCreate(Object, Object, + * BiFunction)} accept key parts directly — no composite key object is allocated for the lookup. + * {@code ConcurrentHashMap} requires a wrapper object whose ownership may transfer to the map on + * insert; escape analysis must conservatively assume the key escapes even on hit paths, preventing + * scalar replacement. + * + *

Memory model. Bucket slots are held in an {@link AtomicReferenceArray}, so each {@link + * #get} begins with a volatile read of the slot. Entries are inserted at the bucket head: the new + * entry's {@code next} pointer is set before the volatile slot write, so any subsequent volatile + * read of that slot carries happens-before over the full chain — chain {@code next} fields do not + * need to be volatile. + */ +public final class ConcurrentHashtable { + private ConcurrentHashtable() {} + + /** + * Single-key concurrent hash table. Lock-free on hit; locked on miss. + * + * @param the key type + * @param the user's {@link Hashtable.D1.Entry D1.Entry<K>} subclass + */ + public static final class D1> { + + private final AtomicReferenceArray buckets; + private final AtomicInteger size = new AtomicInteger(); + + public D1(int capacity) { + this.buckets = new AtomicReferenceArray<>(Hashtable.Support.sizeFor(capacity)); + } + + public int size() { + return size.get(); + } + + @SuppressWarnings("unchecked") + public TEntry get(K key) { + long keyHash = Hashtable.D1.Entry.hash(key); + for (TEntry te = (TEntry) buckets.get(Support.bucketIndex(buckets, keyHash)); + te != null; + te = te.next()) { + if (te.keyHash == keyHash && te.matches(key)) { + return te; + } + } + return null; + } + + /** + * Returns the entry for {@code key}, creating one via {@code creator} if absent. Lock-free on + * hit; acquires a table-level lock on miss. Re-checks under the lock to avoid duplicate entries + * under concurrent misses. + */ + @SuppressWarnings("unchecked") + public TEntry getOrCreate(K key, Function creator) { + long keyHash = Hashtable.D1.Entry.hash(key); + int index = Support.bucketIndex(buckets, keyHash); + for (TEntry te = (TEntry) buckets.get(index); te != null; te = te.next()) { + if (te.keyHash == keyHash && te.matches(key)) { + return te; + } + } + synchronized (this) { + for (TEntry te = (TEntry) buckets.get(index); te != null; te = te.next()) { + if (te.keyHash == keyHash && te.matches(key)) { + return te; + } + } + TEntry newEntry = creator.apply(key); + newEntry.setNext((TEntry) buckets.get(index)); + buckets.set(index, newEntry); + size.incrementAndGet(); + return newEntry; + } + } + + public void forEach(Consumer consumer) { + Support.forEach(buckets, consumer); + } + + /** + * Context-passing forEach. Avoids a capturing-lambda allocation — pass a non-capturing {@link + * BiConsumer} (typically a {@code static final}) plus whatever side-band state it needs. + */ + public void forEach(T context, BiConsumer consumer) { + Support.forEach(buckets, context, consumer); + } + } + + /** + * Two-key (composite-key) concurrent hash table. Lock-free on hit; locked on miss. + * + *

Key parts are passed directly to {@link #get} and {@link #getOrCreate}, eliminating the + * per-lookup composite key object allocation that {@code ConcurrentHashMap, V>} + * requires. + * + * @param first key type + * @param second key type + * @param the user's {@link Hashtable.D2.Entry D2.Entry<K1, K2>} subclass + */ + public static final class D2> { + + private final AtomicReferenceArray buckets; + private final AtomicInteger size = new AtomicInteger(); + + public D2(int capacity) { + this.buckets = new AtomicReferenceArray<>(Hashtable.Support.sizeFor(capacity)); + } + + public int size() { + return size.get(); + } + + @SuppressWarnings("unchecked") + public TEntry get(K1 key1, K2 key2) { + long keyHash = Hashtable.D2.Entry.hash(key1, key2); + for (TEntry te = (TEntry) buckets.get(Support.bucketIndex(buckets, keyHash)); + te != null; + te = te.next()) { + if (te.keyHash == keyHash && te.matches(key1, key2)) { + return te; + } + } + return null; + } + + /** + * Returns the entry for {@code (key1, key2)}, creating one via {@code creator} if absent. + * Lock-free on hit; acquires a table-level lock on miss. Re-checks under the lock to avoid + * duplicate entries under concurrent misses. + * + *

The {@code creator} should build an entry whose {@code keyHash} equals {@link + * Hashtable.D2.Entry#hash(Object, Object) D2.Entry.hash(key1, key2)}. + */ + @SuppressWarnings("unchecked") + public TEntry getOrCreate( + K1 key1, K2 key2, BiFunction creator) { + long keyHash = Hashtable.D2.Entry.hash(key1, key2); + int index = Support.bucketIndex(buckets, keyHash); + for (TEntry te = (TEntry) buckets.get(index); te != null; te = te.next()) { + if (te.keyHash == keyHash && te.matches(key1, key2)) { + return te; + } + } + synchronized (this) { + for (TEntry te = (TEntry) buckets.get(index); te != null; te = te.next()) { + if (te.keyHash == keyHash && te.matches(key1, key2)) { + return te; + } + } + TEntry newEntry = creator.apply(key1, key2); + newEntry.setNext((TEntry) buckets.get(index)); + buckets.set(index, newEntry); + size.incrementAndGet(); + return newEntry; + } + } + + public void forEach(Consumer consumer) { + Support.forEach(buckets, consumer); + } + + /** + * Context-passing forEach. Avoids a capturing-lambda allocation — pass a non-capturing {@link + * BiConsumer} (typically a {@code static final}) plus whatever side-band state it needs. + */ + public void forEach(T context, BiConsumer consumer) { + Support.forEach(buckets, context, consumer); + } + } + + /** Building blocks for concurrent hash-table operations, mirroring {@link Hashtable.Support}. */ + public static final class Support { + private Support() {} + + public static int bucketIndex(AtomicReferenceArray buckets, long keyHash) { + return (int) (keyHash & (buckets.length() - 1)); + } + + @SuppressWarnings("unchecked") + public static void forEach( + AtomicReferenceArray buckets, Consumer consumer) { + for (int i = 0; i < buckets.length(); i++) { + for (TEntry te = (TEntry) buckets.get(i); te != null; te = te.next()) { + consumer.accept(te); + } + } + } + + @SuppressWarnings("unchecked") + public static void forEach( + AtomicReferenceArray buckets, + T context, + BiConsumer consumer) { + for (int i = 0; i < buckets.length(); i++) { + for (TEntry te = (TEntry) buckets.get(i); te != null; te = te.next()) { + consumer.accept(context, te); + } + } + } + } +} diff --git a/internal-api/src/test/java/datadog/trace/util/ConcurrentHashtableD1Test.java b/internal-api/src/test/java/datadog/trace/util/ConcurrentHashtableD1Test.java new file mode 100644 index 00000000000..aff0c47537a --- /dev/null +++ b/internal-api/src/test/java/datadog/trace/util/ConcurrentHashtableD1Test.java @@ -0,0 +1,230 @@ +package datadog.trace.util; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.jupiter.api.Test; + +class ConcurrentHashtableD1Test { + + @Test + void getReturnsMappedEntry() { + ConcurrentHashtable.D1 table = new ConcurrentHashtable.D1<>(8); + StringEntry e = table.getOrCreate("hello", k -> new StringEntry(k, 42)); + assertSame(e, table.get("hello")); + assertNull(table.get("world")); + } + + @Test + void getOrCreateOnMissBuildsEntry() { + ConcurrentHashtable.D1 table = new ConcurrentHashtable.D1<>(8); + int[] createCount = {0}; + StringEntry created = + table.getOrCreate( + "a", + k -> { + createCount[0]++; + return new StringEntry(k, 1); + }); + assertNotNull(created); + assertEquals(1, table.size()); + assertEquals(1, createCount[0]); + assertSame(created, table.get("a")); + } + + @Test + void getOrCreateOnHitSkipsCreator() { + ConcurrentHashtable.D1 table = new ConcurrentHashtable.D1<>(8); + StringEntry seeded = table.getOrCreate("a", k -> new StringEntry(k, 100)); + int[] createCount = {0}; + StringEntry got = + table.getOrCreate( + "a", + k -> { + createCount[0]++; + return new StringEntry(k, 999); + }); + assertSame(seeded, got); + assertEquals(1, table.size()); + assertEquals(0, createCount[0]); + } + + @Test + void nullKeyIsSupported() { + ConcurrentHashtable.D1 table = new ConcurrentHashtable.D1<>(8); + StringEntry e = table.getOrCreate(null, k -> new StringEntry(k, 0)); + assertNotNull(e); + assertSame(e, table.get(null)); + } + + @Test + void forEachVisitsAllEntries() { + ConcurrentHashtable.D1 table = new ConcurrentHashtable.D1<>(8); + table.getOrCreate("a", k -> new StringEntry(k, 1)); + table.getOrCreate("b", k -> new StringEntry(k, 2)); + table.getOrCreate("c", k -> new StringEntry(k, 3)); + Set seen = new HashSet<>(); + table.forEach(e -> seen.add(e.key)); + assertEquals(3, seen.size()); + assertTrue(seen.contains("a")); + assertTrue(seen.contains("b")); + assertTrue(seen.contains("c")); + } + + @Test + void forEachWithContextPassesContext() { + ConcurrentHashtable.D1 table = new ConcurrentHashtable.D1<>(8); + table.getOrCreate("x", k -> new StringEntry(k, 10)); + table.getOrCreate("y", k -> new StringEntry(k, 20)); + Set seen = new HashSet<>(); + table.forEach(seen, (ctx, e) -> ctx.add(e.key)); + assertEquals(2, seen.size()); + assertTrue(seen.contains("x")); + assertTrue(seen.contains("y")); + } + + @Test + void concurrentGetOrCreateProducesExactlyOneEntry() throws InterruptedException { + ConcurrentHashtable.D1 table = new ConcurrentHashtable.D1<>(8); + int threads = 16; + CountDownLatch ready = new CountDownLatch(threads); + CountDownLatch go = new CountDownLatch(1); + AtomicInteger createCount = new AtomicInteger(); + + Thread[] workers = new Thread[threads]; + for (int i = 0; i < threads; i++) { + workers[i] = + new Thread( + () -> { + ready.countDown(); + try { + go.await(); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + return; + } + table.getOrCreate( + "shared", + k -> { + createCount.incrementAndGet(); + return new StringEntry(k, 1); + }); + }); + workers[i].start(); + } + ready.await(); + go.countDown(); + for (Thread w : workers) { + w.join(); + } + + assertEquals(1, table.size()); + assertEquals(1, createCount.get()); + } + + @Test + void chainedEntriesInSameBucketAreAllReachable() { + // 2 buckets: keyHash & 1 determines the slot. Hashes 0 and 2 both land in bucket 0. + ConcurrentHashtable.D1 table = new ConcurrentHashtable.D1<>(2); + CollidingKey a = new CollidingKey("a", 0); + CollidingKey b = new CollidingKey("b", 0); // same bucket as a + CollidingKey c = new CollidingKey("c", 2); // 2 & 1 == 0, same bucket + CollidingEntry ea = table.getOrCreate(a, CollidingEntry::new); + CollidingEntry eb = table.getOrCreate(b, CollidingEntry::new); + CollidingEntry ec = table.getOrCreate(c, CollidingEntry::new); + assertEquals(3, table.size()); + assertSame(ea, table.get(a)); + assertSame(eb, table.get(b)); + assertSame(ec, table.get(c)); + assertNull(table.get(new CollidingKey("d", 0))); // same bucket, different label → miss + } + + @Test + void concurrentDistinctKeyInsertionsAreAllRetained() throws InterruptedException { + int threads = 16; + String[] keys = new String[threads]; + for (int i = 0; i < threads; i++) { + keys[i] = "key-" + i; + } + ConcurrentHashtable.D1 table = + new ConcurrentHashtable.D1<>(threads * 2); + CountDownLatch ready = new CountDownLatch(threads); + CountDownLatch go = new CountDownLatch(1); + + Thread[] workers = new Thread[threads]; + for (int i = 0; i < threads; i++) { + final String key = keys[i]; + workers[i] = + new Thread( + () -> { + ready.countDown(); + try { + go.await(); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + return; + } + table.getOrCreate(key, k -> new StringEntry(k, 1)); + }); + workers[i].start(); + } + ready.await(); + go.countDown(); + for (Thread w : workers) { + w.join(); + } + + assertEquals(threads, table.size()); + for (String key : keys) { + assertNotNull(table.get(key)); + } + } + + // Reuses Hashtable.D1.Entry — ConcurrentHashtable.D1 accepts any D1.Entry subclass. + private static final class StringEntry extends Hashtable.D1.Entry { + final int value; + + StringEntry(String key, int value) { + super(key); + this.value = value; + } + } + + /** Key with a fixed hashCode to force deterministic bucket placement. */ + private static final class CollidingKey { + final String label; + final int fixedHash; + + CollidingKey(String label, int fixedHash) { + this.label = label; + this.fixedHash = fixedHash; + } + + @Override + public int hashCode() { + return fixedHash; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof CollidingKey)) { + return false; + } + CollidingKey that = (CollidingKey) o; + return fixedHash == that.fixedHash && label.equals(that.label); + } + } + + private static final class CollidingEntry extends Hashtable.D1.Entry { + CollidingEntry(CollidingKey key) { + super(key); + } + } +} diff --git a/internal-api/src/test/java/datadog/trace/util/ConcurrentHashtableD2Test.java b/internal-api/src/test/java/datadog/trace/util/ConcurrentHashtableD2Test.java new file mode 100644 index 00000000000..46089bf6563 --- /dev/null +++ b/internal-api/src/test/java/datadog/trace/util/ConcurrentHashtableD2Test.java @@ -0,0 +1,197 @@ +package datadog.trace.util; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.jupiter.api.Test; + +class ConcurrentHashtableD2Test { + + @Test + void pairKeysParticipateInIdentity() { + ConcurrentHashtable.D2 table = new ConcurrentHashtable.D2<>(8); + PairEntry ab = table.getOrCreate("a", 1, PairEntry::new); + PairEntry ac = table.getOrCreate("a", 2, PairEntry::new); + PairEntry bb = table.getOrCreate("b", 1, PairEntry::new); + assertEquals(3, table.size()); + assertSame(ab, table.get("a", 1)); + assertSame(ac, table.get("a", 2)); + assertSame(bb, table.get("b", 1)); + assertNull(table.get("a", 3)); + } + + @Test + void getOrCreateOnMissBuildsEntryViaCreator() { + ConcurrentHashtable.D2 table = new ConcurrentHashtable.D2<>(8); + int[] createCount = {0}; + PairEntry created = + table.getOrCreate( + "a", + 1, + (k1, k2) -> { + createCount[0]++; + return new PairEntry(k1, k2); + }); + assertNotNull(created); + assertEquals("a", created.key1); + assertEquals(Integer.valueOf(1), created.key2); + assertEquals(1, table.size()); + assertEquals(1, createCount[0]); + assertSame(created, table.get("a", 1)); + } + + @Test + void getOrCreateOnHitSkipsCreator() { + ConcurrentHashtable.D2 table = new ConcurrentHashtable.D2<>(8); + PairEntry seeded = table.getOrCreate("a", 1, PairEntry::new); + int[] createCount = {0}; + PairEntry got = + table.getOrCreate( + "a", + 1, + (k1, k2) -> { + createCount[0]++; + return new PairEntry(k1, k2); + }); + assertSame(seeded, got); + assertEquals(1, table.size()); + assertEquals(0, createCount[0]); + } + + @Test + void forEachVisitsBothPairs() { + ConcurrentHashtable.D2 table = new ConcurrentHashtable.D2<>(8); + table.getOrCreate("a", 1, PairEntry::new); + table.getOrCreate("b", 2, PairEntry::new); + Set seen = new HashSet<>(); + table.forEach(e -> seen.add(e.key1 + ":" + e.key2)); + assertEquals(2, seen.size()); + assertTrue(seen.contains("a:1")); + assertTrue(seen.contains("b:2")); + } + + @Test + void forEachWithContextPassesContextToConsumer() { + ConcurrentHashtable.D2 table = new ConcurrentHashtable.D2<>(8); + table.getOrCreate("a", 1, PairEntry::new); + table.getOrCreate("b", 2, PairEntry::new); + Set seen = new HashSet<>(); + table.forEach(seen, (ctx, e) -> ctx.add(e.key1 + ":" + e.key2)); + assertEquals(2, seen.size()); + assertTrue(seen.contains("a:1")); + assertTrue(seen.contains("b:2")); + } + + @Test + void concurrentGetOrCreateProducesExactlyOneEntry() throws InterruptedException { + ConcurrentHashtable.D2 table = new ConcurrentHashtable.D2<>(8); + int threads = 16; + CountDownLatch ready = new CountDownLatch(threads); + CountDownLatch go = new CountDownLatch(1); + AtomicInteger createCount = new AtomicInteger(); + + Thread[] workers = new Thread[threads]; + for (int i = 0; i < threads; i++) { + workers[i] = + new Thread( + () -> { + ready.countDown(); + try { + go.await(); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + return; + } + table.getOrCreate( + "shared", + 42, + (k1, k2) -> { + createCount.incrementAndGet(); + return new PairEntry(k1, k2); + }); + }); + workers[i].start(); + } + ready.await(); + go.countDown(); + for (Thread w : workers) { + w.join(); + } + + assertEquals(1, table.size()); + assertEquals(1, createCount.get()); + } + + @Test + void chainedEntriesInSameBucketAreAllReachable() { + // 2 buckets: 4 entries guarantees at least 2 share a bucket by pigeonhole. + ConcurrentHashtable.D2 table = new ConcurrentHashtable.D2<>(2); + PairEntry e1 = table.getOrCreate("a", 1, PairEntry::new); + PairEntry e2 = table.getOrCreate("a", 2, PairEntry::new); + PairEntry e3 = table.getOrCreate("b", 1, PairEntry::new); + PairEntry e4 = table.getOrCreate("b", 2, PairEntry::new); + assertEquals(4, table.size()); + assertSame(e1, table.get("a", 1)); + assertSame(e2, table.get("a", 2)); + assertSame(e3, table.get("b", 1)); + assertSame(e4, table.get("b", 2)); + assertNull(table.get("a", 3)); + } + + @Test + void concurrentDistinctKeyInsertionsAreAllRetained() throws InterruptedException { + int threads = 16; + String[] k1s = new String[threads]; + Integer[] k2s = new Integer[threads]; + for (int i = 0; i < threads; i++) { + k1s[i] = "key-" + i; + k2s[i] = i; + } + ConcurrentHashtable.D2 table = + new ConcurrentHashtable.D2<>(threads * 2); + CountDownLatch ready = new CountDownLatch(threads); + CountDownLatch go = new CountDownLatch(1); + + Thread[] workers = new Thread[threads]; + for (int i = 0; i < threads; i++) { + final String k1 = k1s[i]; + final Integer k2 = k2s[i]; + workers[i] = + new Thread( + () -> { + ready.countDown(); + try { + go.await(); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + return; + } + table.getOrCreate(k1, k2, PairEntry::new); + }); + workers[i].start(); + } + ready.await(); + go.countDown(); + for (Thread w : workers) { + w.join(); + } + + assertEquals(threads, table.size()); + for (int i = 0; i < threads; i++) { + assertNotNull(table.get(k1s[i], k2s[i])); + } + } + + private static final class PairEntry extends Hashtable.D2.Entry { + PairEntry(String key1, Integer key2) { + super(key1, key2); + } + } +}