编程

Java ConcurrentMap 指南

327 2024-06-05 04:12:00

1. 概述

Map 天然是 Java 集合中最广泛的样式之一。

而且,重要的是,HashMap 不是线程安全的实现,而 Hashtable 确实通过同步操作提供了线程安全。

尽管 Hashtable 是线程安全的,但它的效率不是很高。另一个完全同步的 Map(Collections.synchronizedMap)也没有表现出很高的效率。如果我们想要在高并发下具有高吞吐量的线程安全性,那么这些实现是不可取的。

为了解决这个问题,Java Collections Framework 在 Java 1.5 中引入了 ConcurrentMap

以下的讨论是基于 Java 1.8。

2. ConcurrentMap

ConcurrentMap 是 Map 接口的扩展。它旨在为解决吞吐量与线程安全性之间的协调问题提供一种结构和指导。

通过重写几个接口默认方法,ConcurrentMap 为有效的实现提供了指导,以提供线程安全和内存一致的原子操作。

有几个默认实现被重写了,禁用了对 null 键/值的支持:

  • getOrDefault
  • forEach
  • replaceAll
  • computeIfAbsent
  • computeIfPresent
  • compute
  • merge

以下没有默认的接口实现 API 也被重写以支持原子性:

  • putIfAbsent
  • remove
  • replace(key, oldValue, newValue)
  • replace(key, value)

其余 action 直接继承,与 Map 基本一致。

3. ConcurrentHashMap

ConcurrentHashMap 是开箱即用的 ConcurrentMap 的实现。

为了获得更好的性能,它由一个节点数组组成,作为后台的 bucket(在 Java 8 之前是表格 segment),并在更新期间主要使用 CAS 操作。

表格 bucket 在第一次插入时被延迟初始化。每个 bucket 都可以通过锁定 bucket 中的第一个节点来独立锁定。读取操作不会被阻止,并且更新争用被最小化。

所需的 segment 数与访问表的线程数有关,因此每个 segment 的更新在大多数情况下不会超过一个。

在 Java 8 之前,所需的 “segments” 数量与访问表的线程数量有关,因此每个 segment 的更新在大多数情况下不会超过一个。

这就是为什么与 HashMap 相比,构造函数提供了额外的 concurrencyLevel 参数来控制要使用的估计线程数:

public ConcurrentHashMap(
public ConcurrentHashMap(
 int initialCapacity, float loadFactor, int concurrencyLevel)

其他两个参数:initialCapacityloadFactor 的工作原理与 HashMap 完全相同。

然而,由于Java 8,构造函数仅用于向后兼容性:参数只能影响映射的初始大小。

3.1. 线程安全

ConcurrentMap 保证了多线程环境中键/值操作的内存一致性。

在将对象作为键或值放入 ConcurrentMap 之前,线程中的操作发生在另一个线程中访问或删除该对象后续的操作之前。

为了证实这一点,我们来看看内存不一致的情况:

@Test
public void givenHashMap_whenSumParallel_thenError() throws Exception {
    Map<String, Integer> map = new HashMap<>();
    List<Integer> sumList = parallelSum100(map, 100);

    assertNotEquals(1, sumList
      .stream()
      .distinct()
      .count());
    long wrongResultCount = sumList
      .stream()
      .filter(num -> num != 100)
      .count();
    
    assertTrue(wrongResultCount > 0);
}

private List<Integer> parallelSum100(Map<String, Integer> map, 
  int executionTimes) throws InterruptedException {
    List<Integer> sumList = new ArrayList<>(1000);
    for (int i = 0; i < executionTimes; i++) {
        map.put("test", 0);
        ExecutorService executorService = 
          Executors.newFixedThreadPool(4);
        for (int j = 0; j < 10; j++) {
            executorService.execute(() -> {
                for (int k = 0; k < 10; k++)
                    map.computeIfPresent(
                      "test", 
                      (key, value) -> value + 1
                    );
            });
        }
        executorService.shutdown();
        executorService.awaitTermination(5, TimeUnit.SECONDS);
        sumList.add(map.get("test"));
    }
    return sumList;
}

对于每个并发的 map.computeIfPresent 操作,HashMap 不能提供当前整数值的一致视图,从而导致不一致和不希望的结果。

对于 ConcurrentHashMap,我们可以得到一致且正确的结果:

@Test
public void givenConcurrentMap_whenSumParallel_thenCorrect() 
  throws Exception {
    Map<String, Integer> map = new ConcurrentHashMap<>();
    List<Integer> sumList = parallelSum100(map, 1000);

    assertEquals(1, sumList
      .stream()
      .distinct()
      .count());
    long wrongResultCount = sumList
      .stream()
      .filter(num -> num != 100)
      .count();
    
    assertEquals(0, wrongResultCount);
}

3.2. Null Key/Value

ConcurrentMap 提供的大部分 API 不允许键或者值为 null,比如:

@Test(expected = NullPointerException.class)
public void givenConcurrentHashMap_whenPutWithNullKey_thenThrowsNPE() {
    concurrentMap.put(null, new Object());
}

@Test(expected = NullPointerException.class)
public void givenConcurrentHashMap_whenPutNullValue_thenThrowsNPE() {
    concurrentMap.put("test", null);
}

不过,对于 compute*merge 操作,计算值是 null,其指示键值映射如果存在则被移除或者如果先前不存在则保持不存在。

@Test
public void givenKeyPresent_whenComputeRemappingNull_thenMappingRemoved() {
    Object oldValue = new Object();
    concurrentMap.put("test", oldValue);
    concurrentMap.compute("test", (s, o) -> null);

    assertNull(concurrentMap.get("test"));
}

3.3. 流(Stream)支持

Java 8 也在 ConcurrentHashMap 提供了 Stream 支持。

与大部分 stream 方法不同,批量(顺序和并发)操作允许安全地进行并发修改。

ConcurrentModificationException 异常不会抛出,这也适用于它的迭代器。与 流相关,还添加了几个 forEach*searchreduce* 方法,以支持更丰富的遍历和 map reduce 操作。

3.4. 性能

在底层,ConcurrentHashMap 与 HashMap 有些相似,数据访问和更新都基于哈希表(尽管更复杂)。

当然,对于数据检索和更新,ConcurrentHashMap 在大多数并发情况下应该会生成更好的性能。

让我们为 getput 性能编写一个快速的微基准测试,并将其与 HashtableCollections.synchronizedMap 进行比较,在 4 个线程中运行这两个操作 500,000 次。

@Test
public void givenMaps_whenGetPut500KTimes_thenConcurrentMapFaster() 
  throws Exception {
    Map<String, Object> hashtable = new Hashtable<>();
    Map<String, Object> synchronizedHashMap = 
      Collections.synchronizedMap(new HashMap<>());
    Map<String, Object> concurrentHashMap = new ConcurrentHashMap<>();

    long hashtableAvgRuntime = timeElapseForGetPut(hashtable);
    long syncHashMapAvgRuntime = 
      timeElapseForGetPut(synchronizedHashMap);
    long concurrentHashMapAvgRuntime = 
      timeElapseForGetPut(concurrentHashMap);

    assertTrue(hashtableAvgRuntime > concurrentHashMapAvgRuntime);
    assertTrue(syncHashMapAvgRuntime > concurrentHashMapAvgRuntime);
}

private long timeElapseForGetPut(Map<String, Object> map) 
  throws InterruptedException {
    ExecutorService executorService = 
      Executors.newFixedThreadPool(4);
    long startTime = System.nanoTime();
    for (int i = 0; i < 4; i++) {
        executorService.execute(() -> {
            for (int j = 0; j < 500_000; j++) {
                int value = ThreadLocalRandom
                  .current()
                  .nextInt(10000);
                String key = String.valueOf(value);
                map.put(key, value);
                map.get(key);
            }
        });
    }
    executorService.shutdown();
    executorService.awaitTermination(1, TimeUnit.MINUTES);
    return (System.nanoTime() - startTime) / 500_000;
}

请记住,微观基准测试只关注单个场景,并不总是真实世界性能的良好反映。

也就是说,在具有平均开发系统的 OS X 系统上,我们看到了 100 次连续运行的平均采样结果(以纳秒为单位):

Hashtable: 1142.45
SynchronizedHashMap: 1273.89
ConcurrentHashMap: 230.2

在多线程环境中,期望多个线程访问一个公共 Map,ConcurrentHashMap 显然更可取。

然而,当 Map 只能由单个线程访问时,HashMap 由于其简单性和可靠的性能而成为更好的选择。

3.5. 陷阱

检索操作通常不会阻塞 ConcurrentHashMap,并且可能与更新操作重叠。因此,为了获得更好的性能,它们只反映最近完成的更新操作的结果,如官方 Javadoc 中所述。

还有其他几个事实需要牢记:

  • 聚合状态方法(包括 sizeisEmptycontainsValue)的结果通常只有在 Map 未在其他线程中进行并发更新时才有用:
@Test
public void givenConcurrentMap_whenUpdatingAndGetSize_thenError() 
  throws InterruptedException {
    Runnable collectMapSizes = () -> {
        for (int i = 0; i < MAX_SIZE; i++) {
            mapSizes.add(concurrentMap.size());
        }
    };
    Runnable updateMapData = () -> {
        for (int i = 0; i < MAX_SIZE; i++) {
            concurrentMap.put(String.valueOf(i), i);
        }
    };
    executorService.execute(updateMapData);
    executorService.execute(collectMapSizes);
    executorService.shutdown();
    executorService.awaitTermination(1, TimeUnit.MINUTES);

    assertNotEquals(MAX_SIZE, mapSizes.get(MAX_SIZE - 1).intValue());
    assertEquals(MAX_SIZE, concurrentMap.size());
}

如果并发更新受到严格控制,聚合状态仍然是可靠的。

尽管这些聚合状态方法不能保证实时准确性,但它们可能足以用于监测或估计目的。

请注意,ConcurrentHashMapsize() 的使用应该被 mappingCount() 所取代,因为后一种方法返回一个 long 计数,尽管本质上它们是基于相同的预估。

  • hashCode 很重要:请注意,使用许多具有完全相同 hashCode() 的键肯定会降低任何哈希表的性能。

当 key 是可比较的时,为了改善影响,ConcurrentHashMap 可以使用 key 之间的比较顺序来帮助打破关系。不过,我们应该尽可能避免使用相同的 hashCode()

  • 迭代器只设计用于单个线程,因为它们提供了弱一致性,而不是快速的失败遍历,而且它们永远不会抛出 ConcurrentModificationException
  • 默认的初始表容量为 16,并根据指定的并发级别进行调整:
public ConcurrentHashMap(
  int initialCapacity, float loadFactor, int concurrencyLevel) {
 
    //...
    if (initialCapacity < concurrencyLevel) {
        initialCapacity = concurrencyLevel;
    }
    //...
}
  • 关于重映射函数的注意事项:尽管我们可以使用提供的 computemerge* 方法进行重映射操作,但我们应该保持它们的快速、简短和简单,并专注于当前的映射,以避免意外的阻塞。
  • ConcurrentHashMap 中的键不按排序顺序排列,因此对于需要排序的情况,ConcurrentSkipListMap 是一个合适的选择。

4. ConcurrentNavigableMap

对于需要排序 key 的情况,我们可以使用 ConcurrentSkipListMap,这是并发版本的 TreeMap

作为 ConcurrentMap 的补充,ConcurrentNavigableMap 支持其键的总排序(默认情况下按升序),并且可以并发导航。为实现并发兼容性,将重写返回映射视图的方法:

  • subMap
  • headMap
  • tailMap
  • subMap
  • headMap
  • tailMap
  • descendingMap

keySet() 视图的迭代器和拆分器增强了弱内存一致性:

  • navigableKeySet
  • keySet
  • descendingKeySet

5. ConcurrentSkipListMap

之前,我们已经介绍了 NavigableMap 接口及其实现 TreeMapConcurrentSkipListMap 可以看作是 TreeMap 的可扩展并发版本。

实践中,Java 中没有红黑树的并发实现。SkipLists 的并发变体在 ConcurrentSkipListMap 中实现,为 containsKeygetputremove 操作及其变体提供预期的平均 log(n) 时间成本。除了 TreeMap 的特性外,key 插入、删除、更新和访问操作都有线程安全保障。以下是并发导航时与 TreeMap 的比较:

@Test
public void givenSkipListMap_whenNavConcurrently_thenCountCorrect() 
  throws InterruptedException {
    NavigableMap<Integer, Integer> skipListMap
      = new ConcurrentSkipListMap<>();
    int count = countMapElementByPollingFirstEntry(skipListMap, 10000, 4);
 
    assertEquals(10000 * 4, count);
}

@Test
public void givenTreeMap_whenNavConcurrently_thenCountError() 
  throws InterruptedException {
    NavigableMap<Integer, Integer> treeMap = new TreeMap<>();
    int count = countMapElementByPollingFirstEntry(treeMap, 10000, 4);
 
    assertNotEquals(10000 * 4, count);
}

private int countMapElementByPollingFirstEntry(
  NavigableMap<Integer, Integer> navigableMap, 
  int elementCount, 
  int concurrencyLevel) throws InterruptedException {
 
    for (int i = 0; i < elementCount * concurrencyLevel; i++) {
        navigableMap.put(i, i);
    }
    
    AtomicInteger counter = new AtomicInteger(0);
    ExecutorService executorService
      = Executors.newFixedThreadPool(concurrencyLevel);
    for (int j = 0; j < concurrencyLevel; j++) {
        executorService.execute(() -> {
            for (int i = 0; i < elementCount; i++) {
                if (navigableMap.pollFirstEntry() != null) {
                    counter.incrementAndGet();
                }
            }
        });
    }
    executorService.shutdown();
    executorService.awaitTermination(1, TimeUnit.MINUTES);
    return counter.get();
}

对幕后性能问题的全面解释超出了本文的范围。详细信息可以在 ConcurrentSkipListMap 的 Javadoc 中找到,该 Javadoc 位于 src.zip 文件的 java/util/concurrent 下。

6. 小结

本文中,我们主要介绍了 ConcurrentMap 接口和 ConcurrentHashMap 的功能,并介绍了所需的 key 排序 ConcurrentNavigableMap