0%

ConcurrentHashMap源码学习(java 7)

ConcurrentHashMap 的实现比较复杂,里面涉及到的内容很多,而且有很多的性能优化的策略,想全部搞清楚比较困难,所以主要对里面感兴趣的并发的处理方面,相关的几个函数的实现看下,难免有理解不到位的地方,以后随时修改补充吧。

这里看的是 java 7 中的实现。

另外,推荐一篇文章,写的很详细: The Concurrency Of ConcurrentHashMap

涉及到的并发相关内容

内存屏障

内存屏障用于:

  • 内存屏障两侧的指令禁止重排序
  • load屏障令高速缓存失效,屏障之后的指令强制从主存加载数据;store令屏障前的高速缓存中的数据强制刷新到主存

一个出现了无数次的表格,应该来自《Java并发编程的艺术》:

屏障类型 屏障类型 说明
LoadLoad Barriers Load1;LoadLoad;Load2 确保 Load1 数据的装载先于 Load2 及所有后续装载指令的装载
StoreStore Barriers Store1;StoreStore;Store2 确保 Store1 数据对其他处理器可见(刷新到内存)先于 Store2 及所有后续存储指令的存储
LoadStore Barriers Load1;LoadStore;Store2 确保 Load1 数据装载先于 Store2 及所有后续的存储指令刷新到内存
StoreLoad Barriers Store1;StoreLoad;Load2 确保 Store1 数据对其他处理器变得可见(指刷新到内存)先于 Load2 及所有后续装载指令的装载。StoreLoad Barriers 会使该屏障之前的所有内存访问指令(存储和装载指令)完成之后,才执行该屏障之后的内存访问指令

举一个例子,对于 StoreLoad Barriers,有:Without StoreLoad barrier, a processor can write data only to its buffer without synchronizing with other processors (e.g. by writing to main memory), so another processor can then load the data either from its buffer or from main memory but will not get the most recent value.

volatile

通过内存屏障实现 volatile 的内存语义,具体来说:

  • 在每个 volatile 写操作的前面插入一个 StoreStore 屏障
  • 在每个 volatile 写操作的后面插入一个 StoreLoad 屏障
  • 在每个 volatile 读操作的后面插入一个 LoadLoad 屏障
  • 在每个 volatile 读操作的后面插入一个 LoadStore 屏障

表现为:volatile 保证它修饰的变量的可见性,“当线程 A 首先写入一个 volatile 变量并且线程 B 随后读取该变量时,在写入 volatile 变量之前对 A 可见的所有变量,在 B 读取了 volatile 变量后,对 B 也是可见的”,“从内存可见性角度来看,写入 volatile 变量相当于退出同步代码块,而读取 volatile 变量就相当于进入同步代码块”。

但是要注意,volatile 修饰数组时,保证的是数组的引用的可见性,但数组元素的可见性是没有保证的。

sun.misc.Unsafe 类提供的一些操作

  1. CAS
  • 即 compare and swap,CAS(V,A,B),V 表示要更新的变量;A 是期望值,可以理解为,对于 变量V ,在 V = A 的情况下,修改其值不会产生问题,那么 V 就是可以修改的(隐藏了ABA的问题,但没什么影响,实在需要解决可以用 AtomicStampedReference);B 是要修改成的目标值。CAS(V,A,B)将返回是否修改成功,修改失败的情况下,该线程可以继续尝试修改(乐观锁);
  • 整个 CAS 过程对应的一条 CPU 指令,是原子操作;
  1. getObjectVolatile,volatile 读;

  2. putObjectVolatile,volatile 写;

  3. putOrderedObject,可看作是 putObjectVolatile 的延时版本,写入不一定对其他线程立即可见,但是相比 volatile 写操作,可以减小性能代价。因为,实际上,它对应的是 StoreStore 内存屏障而不是 StoreLoad。

数据结构

ConcurrentHashMap 维护一个 Segment 的数组 segments,每个 Segment 维护一个 table 数组,数组元素是 HashEntry,HashEntry 则是链表的一个节点。可以将 Segment 看作一个 HashMap。

处理并发的基本思路

使用分段锁来减小锁的粒度,即,需要对某个 key 所在的节点加锁时,对它所在的 Segment 加锁,这样其他 Segment 仍然可由其他线程安全的并发写入。这与之前的简单的全局互斥锁封装的 Synchronized*** 相比,类似于数据库中的行锁和表锁,增加了并发量。

只对写操作(put、remove、clear、rehash 等)和必要时的 size() 计数加锁,读操作不加锁,依靠 volatile 和 UNSAFE 类提供的 getObjectVolatile() 等操作实现。

很多的使用了 putOrderedObject() 代替 volatile 写来优化性能,,虽然会牺牲一些一致性。

初始化 Segment

默认创建 16 个 Segment,并且数组 segments 是 final 变量,不会进行扩容。在初始化 map 时只会初始化 segments[0],其他 Segment 在需要时以 segments[0] 作为模板创建。一旦创建后,segments 和 segments[i] 的引用将不会再修改,作为一个只读量来保证线程安全性。

初始化 segments 和 segments[0] 时,代码如下:

1
2
3
4
5
6
7
// create segments and segments[0]
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;

要注意的是,这里使用的是 putOrderedObject 来写入 segments[0],因此对其他线程可能不会立即可见。

put、ensureSegment 与 rehash

1
2
3
4
5
6
7
8
9
10
11
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject // #1 // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
return s.put(key, hash, value, false);
}

如上述,一旦创建后,segments 和 segments[i] 的引用将不会再修改,作为一个只读量来保证线程安全性,所以 #1 处的判断,如果读到的 s 不是 null,那么 s 必然是已经初始化的。这里先进行一个这样的判断而不是直接执行 volatile 读,是减小 volatile 读在性能上的影响。

但是读到的 s 是 null 时,如注释所说,这是“nonvolatile”的,因为这里读取 s 是使用的 UNSAFE.getObject()。因此,在 ensureSegment() 中使用 UNSAFE.getObjectVolatile() 再次检查了 segments 中的值,在写入前仍然要进行检查。而且,和双重校验锁的单例模式的实现类似,双重校验锁最终在同步代码块中检查了要修改的元素的值,在这里,写入 segments 时,依靠 CAS 来保证正确的写入,并且将 CAS 放在 while 循环中,保证一定写入成功,更新的 segments 的元素对其他线程是可见的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private Segment<K,V> ensureSegment(int k) {
final Segment<K,V>[] ss = this.segments;
long u = (k << SSHIFT) + SBASE; // raw offset
Segment<K,V> seg;
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
Segment<K,V> proto = ss[0]; // use segment 0 as prototype
int cap = proto.table.length;
float lf = proto.loadFactor;
int threshold = (int)(cap * lf);
HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) { // recheck
Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) {
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}
return seg;
}

找到/创建了 Segment 后,将进行 put 操作。

首先尝试获取锁:

1
2
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);

如果直接获取到锁,直接向下执行;不然,会进入 scanAndLockForPut(),等待获取锁,同时在其中会定位应当放入的新节点在 table 中的位置,找到应插入的链表的头节点,遍历这一链表尝试寻找相同 key 的节点,如果找不到则预先创建一个 node,这个 node 将在后续获取到锁之后直接使用,执行这一遍历有两个优势:

  • 遍历链表,可以将链表加载到缓存,后续操作得到性能上的提升
  • 如果要插入的 key 不存在,预先创建一个 node,后续插入时直接使用

scanAndLockForPut() 将在获取到锁后返回,使用头插法进行插入。获取链表的头节点时,使用的是 entryAt() 函数,通过 UNSAFE.getObjectVolatile() 从 table 中读取链表的头节点。虽然 table 的引用是 volatile 的,但是 table 中的元素并不是,为了保证可见性,需要使用 UNSAFE.getObjectVolatile() 执行 volatile 读。

与 HashMap 类似,某个 Segment 内的元素数量超过了 threshold 时,会进行扩容,将 table 数组容量扩容为原来的两倍,之后定位索引的方式与 HashMap 是一致的,可参看令人惊叹的HashMap—数据结构与hash函数。因为使用了可重入锁,rehash() 也只有从 put 函数进入这一个入口,也是加锁的。需要留意的是 rehash 操作在遍历链表时,首先执行了这样一段代码:

1
2
3
4
5
6
7
8
9
10
11
12
HashEntry<K,V> lastRun = e;
int lastIdx = idx;
for (HashEntry<K,V> last = next;
last != null;
last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
newTable[lastIdx] = lastRun;

这一段代码是用来“Reuse consecutive sequence at same slot”:在遍历中尝试找到一个节点 lastRun,从这一个节点开始的之后的所有节点,都不需要移动位置,这一个操作看似多余,但其实“Statistically, at the default threshold, only about one-sixth of them need cloning when a table doubles.”

remove

首先调用 segmentForHash() 函数,segmentForHash() 中使用 UNSAFE.getObjectVolatile() 执行 volatile 读保证可见性,检查 key 应在的 Segment 是否为 null,如果是 null 那么一定没有存储这个 key。

如果找到了 key 所在的 Segment,将会尝试获得这一 Segment 的锁执行删除操作,尝试获得锁的过程与 put 操作类似。remove、put、rehash、clear 都是加锁的,所以不必担心它们之间的线程安全性。

之后定位要查找的节点在数组 table 中的位置,table 是 volatile 的,而且对 table 的引用在创建后只有 rehash 时才修改,所以不必担心 table 的引用的线程安全性。定位头节点使用的是 entryAt() 方法,内部是使用 UNSAFE.getObjectVolatile() 执行了 volatile 读操作,这是因为 table 引用本身是 volatile 的,但是数组元素的可见性并没有保证。

最终使用 setEntryAt() 和 setNext() 从链表中移除节点,都是使用了 UNSAFE.putOrderedObject()。因为释放锁时会写入更新的值,所以使用 UNSAFE.putOrderedObject() 相比 UNSAFE.putObjectVolatile() 只是在可见性上做出了一点牺牲,但是性能优化很多。

get

get 函数的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}

首先要明确的是对 get 等方法的行为的许诺:“Retrievals reflect the results of the most recently completed update operations holding upon their onset.”,也就是说,get 函数得到的结果,是最近完成的更新的结果,但是因为 get 是不加锁的,可能同时存在 put、remove 等操作,get 得到的不一定是最新的结果。

get 中将 tab 和 e 放在了局部变量中来访问,这导致失去一些一致性,可以看作是一种防御性的思路,首先保证正确性,这也能够满足做出的上述许诺。

比较关心的是 get 与 put、remove 的可见性问题。

get 与 put 并发时,插入节点是头插法,如果在插入新的头节点完成之后(因为 put 使用的是 UNSAFE.putOrderedObject(),所以理论上在 put 完成释放锁之后才保证对其他线程的可见性)进行查询,依赖于 UNSAFE.getObjectVolatile() 保证可见性;如果在插入头节点之前已经在进行 get,那么 get 将沿着链表查询,而新插入的头节点是不会考虑的;在需要 rehash 时,rehash 将更新 table 数组的引用,而 get 中使用局部变量 tab 保存了 table 的原引用,将在原数组上进行遍历。这也是符合上述许诺的。

get 与 remove 并发时,如果移除的是头节点,get 的情况与以上 put 时类似;如果移除的是其他节点,而此时 get 还未到达该节点,仍然通过 volatile 语义保证可见性,如果 get 已经经过了这一节点,那么自然也不会对查询结果有影响。