文章目录
- 一. `HashMap`、`HashTable`与`ConcurrentHashMap`的区别
- 二. JDK1.7中的`ConcurrentHashMap`
- 三. JDK1.8 `ConcurrentHashMap`
一. HashMap
、HashTable
与ConcurrentHashMap
的区别
HashMap
是线程不安全的HashTable
是线程安全的,HashTable
的线程安全是采用在每个方法上面都添加上了synchronized
关键字来修饰,即HashTable
是针对整个table
的锁定。所有访问HashTable
的线程都必须竞争同一把锁。当一个线程访问HashTable
的同步方法的时候,其他线程再访问HashTable
的同步方法的时候,可能会进入阻塞或者轮询状态。比如:线程1使用put
方法来添加元素,线程2不但不能使用put
方法添加元素,也不能使用get
方法来获取元素,所以锁竞争越激烈,该类的效率越低。ConcurrentHashMap
使用分段锁技术,ConcurrentHashMap
是由多个Segment
组成(Segment
下包含了多个Node
,即键值对),每个Segment
都有把锁来实现线程安全,当一个线程占用锁访问其中一段数据的时候,其它段的数据也能被其他的线程访问。
为什么HashTable
慢?
HashTable
之所以效率低下主要是因为其实现使用了synchronized
关键字对put
等操作进行加锁,而synchronized
关键字加锁是对整个对象进行加锁,也就是说在进行put
等修改Hash
表的操作时,锁住了整个Hash
表,从而使得其表现的效率低下。
二. JDK1.7中的ConcurrentHashMap
数据结构
jdk1.7中采用了segment
+HashEntry
的方式进行实现,使用了分段锁机制实现了ConcurrentHashMap
- 一个
ConcurrentHashMap
中包含一个Segment<K,V>[] segments
数组 - 一个
segment
对象中包含一个HashEntry<K,V>[] table
数组
1.7的ConcurrentHashMap
是由Segment+HashEntry
组成,和HashMap
一样,仍然是数组+链表。
ConcurrencyLevel
:默认是16,也就是说ConcurrentHashMap
有16个Segments
,所以理论上,这个时候,最多可以同时支持16个线程并发写,只要他们的操作分别分布在不同的Segments
上,Segments
通过ReentrantLock
来进行加锁。这个值在初始化的时候可以设置为其他值,但是一旦初始化后,它是不可以扩容的。
成员变量
ConcurrentHashMap
在初始化的时候,计算出Segment
数组的大小ssize
和每个Segment
中HashEntry
数组的大小cap
,并初始化Segment
数组的第一个元素。
initialCapacity
:初始容量,整个值指的是整个ConcurrentHashMap
的初始容量,实际操作的时候需要平均分给每个Segment
。ssize
:大小为2的幂次方,默认为16.cap
:大小也为2的幂次方,最小值为2,最终结果根据初始化容量initialCapacity
进行计算。
核心成员变量:
Segment
是ConcurrentHashMap
的一个内部类,主要的组成结构如下:
首先通过Key
根据hash算法定位到元素属于哪个Segment
,之后在对应的Segment
中进行具体的put
操作。
其中HashEntry
组成:
和HashMap
非常类似,唯一的区别就是其中的核心数据如value
,以及链表都是volatile
修饰的,保证了获取时的可见性。
并发度(Concurrency Level)
并发度可以理解为程序运行时能够同时更新ConcurrentHashMap
且不产生锁竞争的最大线程数,实际上就是ConcurrentHashMap
中的分段锁的个数,即Segment[]
的数组长度。ConcurrentHashMap
默认的并发度是16,但用户也可以构造函数中设置并发度,当用户设置并发度的时候,ConcurrentHashMap
会使用大于等于该值的最小2幂指数作为实际并发度(如果用户设置并发度是17,实际并发度则为32)。
运行时,通过将Key的高n位(n=32-segmentShift
)和并发度-1做位与运算定位到所在的Segment
。
初始化
initialCapacity
:初始容量,这个值指出的是整个CocurrentHashMap
的初始容量,实际操作的时候需要平均分给每个segment
。loadFactor
:负载因子,segment
数组不可以扩容,所以这个负载因子是给每个segment
内部使用的。
注意点:JDK7中除了第一个segment
之外,剩余的segments
采用的是延迟初始化的机制:每次put
之前都要检查key
对应的segment
是否为null
,如果是则调用ensureSegment()
以确保对应的segment
被创建。
初始化完成,我们可以得到一个Segment
数组:
初始化过程一共会做如下几件事:Segment
数组的长度为16,在初始化的时候可以指定长度,但是在初始化结束后,不可以修改这个长度。Segment[i]
的默认大小为2,即初始一个Segment
只能装入一个HashEntry
,负载因子为0.75,初始阈值为1.5,即在插入第一个元素的时候不会触发扩容,插入第二个元素的时候就会进行第一次扩容。- 初始化了
Segment[0]
,其他位置的Segment
数组没有初始化 - 参数的相关说明:
(1)ssize
:Segment
数组的长度
(2)sshift
:等于ssize
从1左移的次数
(3)segmentShift
:用于定位参与散列运算的位数,移位数
(4)segmentMask
:散列运算的掩码
(5)cap
:Segment
里面HashEntry
数组的长度
put
过程分析
根据hash
值找到对应的Segment
,Segment
的数组下标是hash
值的最后四位。首先是通过key
定位到Segment
,之后在对应的Segment
中进行具体的put
操作。Segment
内部是由数组+链表组成的。
虽然HashEntry
中的value
是用volatile
关键字修饰的,但是并不能保证并发的原子性,所以put
操作是需要加锁的操作,进到put
方法中首先要尝试加锁,如【第373行】代码所示。
三个关键方法:
ensureSegment
方法
这个是【第1106行】的方法。ConcurrentHashMap
初始化的时候会初始化第一个槽segment[0]
,对于其他槽来说,在插入第一个值的时候进行初始化,这里需要考虑并发,因为很可能会有多个线程同时初始化同一个槽segment[k]
,不过只要有一个初始化成功就可以了,对于并发操作,使用CAS进行控制。
scanAndLockForPut
方法
获取写入锁,首先第一步的时候尝试获取锁,如果获取失败肯定就有其他线程存在竞争,则利用scanAndLockForPut
自旋获取锁。
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
int retries = -1; // negative while locating node
// 循环获取锁
while (!tryLock()) {
HashEntry<K,V> f; // to recheck first below
if (retries < 0) {
if (e == null) {
if (node == null) // speculatively create node
// 进到这里说明数组该位置的链表是空的,没有任何元素
// 当然,进到这里的另一个原因是 tryLock() 失败,所以该槽存在并发,不一定是该位置
node = new HashEntry<K,V>(hash, key, value, null);
retries = 0;
}
else if (key.equals(e.key))
retries = 0;
else
// 顺着链表往下走
e = e.next;
}
// 重试次数如果超过 MAX_SCAN_RETRIES(单核1多核64),那么不抢了,进入到阻塞队列等待锁
// lock() 是阻塞方法,直到获取锁后返回
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}
else if ((retries & 1) == 0 &&
// 这个时候是有大问题了,那就是有新的元素进到了链表,成为了新的表头
// 所以这边的策略是,相当于重新走一遍这个 scanAndLockForPut 方法
(f = entryForHash(this, hash)) != first) {
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
return node;
}
scanAndLockForPut
自旋获取锁的过程:
- 尝试自旋获取锁
- 如果重试的次数达到了
MAX_SCAN_RETRIES
则改为阻塞锁获取,保证能成功。
JDK 1.7中put方法的整体执行流程:
场景:线程A和线程B同时执行相同Segment
对象的put
方法。 - (1)线程A执行
tryLock()
方法成功获得锁,则把HashEntry
对象插入到相应的位置 - (2)线程B获取锁失败,则执行
scanAndLockForPut()
方法,在scanAndLockForPut()
方法中,会通过重复执行tryLock()
方法尝试获取锁,在多处理器环境下,重复次数为64,但处理器重复次数为1,当执行tryLock()
方法的次数超过上限时,则执行lock
方法挂起线程B - (3)将当前
Segment
中的table
通过key
的hashcode
定位到HashEntry
- (4)遍历该
HashEntry
,如果不为空则判断传入的key
和当前遍历的key
是否相等,相等则覆盖旧的value
- (5)当线程A执行完插入操作时,会通过
unlock
方法释放锁,接着唤醒线程B继续执行。
注意:在获取锁之后,segment
对链表进行遍历,如果某个HashEntry
节点具有相同的key
,则更新该HashEntry
的value
值,否则新建一个HashEntry
节点,将他设置为链表的新的head
结点,并将原头结点设为新head
的下一个节点。新建过程中如果节点总数(含新建的HashEntry
)超过threshold
,则调用rehash
方法对segment
进行扩容,最后将新建HashEntry
写入到数组中。
rehash
方法
注意:segment
数组是不能扩容的,这里的rehash
方法扩容是指segment
数组某个位置内部的HashEntry
数组进行扩容,扩容后,容量为原来的2倍。
该方法不需要考虑并发,因为到达该方法的时候,是持有该segment
的独占锁的。
这里的扩容比之前的HashMap
要复杂一些,上面有两个挨着的for
循环,第一个for
循环有什么作用呢?
如果没有第一个for
循环,也是可以工作的,但是,这个for
循环下来,如果lastRun
的后面还有比较多的节点,那么这次就是值的的,因为我们只需要克隆lastRun
前面的节点,后面的一串节点跟着lastRun
走就是了,不需要做任何操作。不过比较坏的情况就是每次lastRun
都是链表的最后一个元素或者很靠后的元素,那么这次遍历就有些浪费了,根据统计,如果使用默认的阈值,大约只有1/6的节点需要克隆。
上面【第443行】的过程就如下图:
get
过程分析
整体过程:
- 计算
hash
值,找到segment
数组中的具体位置。 - 槽中也是一个数组,根据
hash
找到数组中的具体位置 - 到这里是链表了,顺着链表进行查找即可
由于HashEntry
中的value
属性是用volatile
关键字修饰的,保存了内存的可见性,所以每次获取时都是最新值。
size
过程分析
因为ConCurrentHashMap
是可以并发插入数据的,所以在准确计算元素时存在一定的难度,一般的思路是统计每个segment
对象中的元素个数,然后进行累加,但是这种方式计算出来的结果不一定是准确的,因为在计算后面几个segment
的时候,已经计算过的segment
同时可能有数据的插入和删除。
计算元素个数的方法的流程:
先采用不加锁的方式,连续计算元素的个数
- 如果前后两次计算结果相同,则说明计算出来的元素个数是准确的。
- 如果前后两次计算的结果不同,则给每个
Segment
进行加锁,再计算一次元素的个数。
并发问题分析
我们可以看到get
过程是没有加锁的,那我们就需要去考虑并发问题了。
添加节点的操作put
和删除节点的操作remove
都是要加segment
上的独占锁的,所以他们之间自然不会有问题,我们需要考虑的问题就是get
的时候在同一个segment
中发生了put
或remove
操作。
-
put
操作的线程安全性
(1)初始化槽,使用了CAS来初始化Segment
中的数组。
(2)添加节点到链表的操作是插入到表头的,所以,如果这个时候get
操作在链表遍历的过程已经到了中间,是不会影响的。当然,另一个并发问题就是get
在put
之后,需要保证刚刚插入表头的节点被读取,这个依赖于setEntryAt
方法中使用的UNSAFE.putOrderedObject
。
(3)扩容:扩容是新创建了数组,然后进行迁移数据,最后面将newTable
设置给属性table
。所以,如果get
操作此时也在进行,那么也没关系,如果get
先行,那么就是在旧的table
上面做查询操作;而put
先行,那么put
操作的可见性保证就是table
使用了volatile
关键字。 -
remove
操作的线性安全性
(1)get
操作需要遍历链表,但是remove
操作会破坏链表。
(2)如果remove
破坏的节点get
操作已经过去了,那么这里就不存在任何问题了。
(3)如果remove
先破坏了一个节点,分两种情况考虑:
- 如果此节点是头节点,那么需要将头结点的
next
设置为数组该位置的元素,table
虽然使用了volatile
修饰,但是volatile
并不能提供数组内部操作的可见性保证,所以源码中使用了UNSAFE
来操作数组。 - 如果要删除的节点不是头结点,他会将要删除节点的后继结点接到前驱节点中,这里的并发保证就是
next
属性是
volatile
的。
三. JDK1.8 ConcurrentHashMap
1.7已经解决了并发的问题,ConcurrentHashMap
是通过分段锁机制来实现的,所以其最大并发度受Segment
的个数限制,但依然存在HashMap
在1.7版本中的问题:
查询遍历链表效率太低
JDK1.8中抛弃了原有的Segment
分段锁,而采用了CAS+Synchronized
来保证并发安全性。
根据JDK1.7HashMap
的介绍,我们知道,查找的时候,根据hash
值我们能够快速定位到数组的具体下标,但是之后的话,需要顺着链表一个个比较下去才能找到我们需要的值,时间复杂度取决于链表的长度,即O(n)
。
数据结构
数据结构采用数组+链表+红黑树
结构上和Java8的HashMap
基本上一样,但是他要保证线程的安全性,所以在源码上会更加复杂一些。
ConcurrentHashMap
相关属性介绍
private transient volatile int sizeCtl;
sizeCtl
是控制标识符,不同的值表示不同的意义:
- 负数代表正在进行初始化或者扩容操作,其中-1表示正在初始化,-N表示有N-1个线程正在进行扩容操作。
- 整数或者0表示
hash
还没有被初始化,这个数值表示初始化或者下一次扩容的大小,类似于扩容阈值。它的值始终是当前ConcurrentHashMap
容量的0.75倍,这与loadfactor
是对应的,实际容量>=sizeCtl
,则扩容。
static final int TREEIFY_THRESHOLD = 8;
- 链表转树的阈值,如果
table[i]
下面的链表长度大于8时就转换为树。
static final int UNTREEIFY_THRESHOLD = 6;
- 树转为链表的阈值,小于等于6时转换为链表,仅在扩容
tranfer
时才可能树转换为链表。
类中相关的节点类:Node
、TreeNode
、TreeBin
、ForwardingNode
与初始化
-
Node
:
保存key
、value
及key
的hash
值的数据结构。即链表中的数据结构。Node
是最核心的内部类,它包装了Key-value
键值对,所有插入ConcurrentHashMap
的数据都包装在这里面。它与HashMap
中的定义非常相似,但有一些差别,他对value
和next
属性设置了volatile
同步锁,他不允许调用setValue
方法直接改变Node
的value
域,它增加了find
方法辅助map.get
方法。 -
TreeNode
:红黑树中的数据结构
树节点类,另外一个核心的数据结构。当链表长度过长的时候,即链表的节点数大于8的时候,会转换为TreeNode
。但是与HashMap不同的是,它并不是直接转换为红黑树,而是把这些节点包装成TreeNode
放在TreeBin
对象中,由TreeBin
完成对红黑树的包装。 -
TreeBin
这个类并不负责包装用户的key、value
值,而是包装很多TreeNode
节点,他代替了TreeNode
的根节点,也就是说在实际的ConcurrentHashMap
数组中,存放的是TreeBin
对象,而不是TreeNode
对象。TreeBin
从字面含义中可以理解为存储树形结构的容器,而树形结构就是指TreeNode
,所以TreeBin
就是封装TreeNode
的容器,他提供转换红黑树的一些条件和锁的控制。 -
ForwardingNode
ForwardingNode
是一个特殊的Node
节点,hash
值为-1,其中存储nextTable
的引用。只有Table
发生扩容的时候,ForwardingNode
才会发挥作用,作为一个占位符放在table
中表示当前节点为null
或者已经被移动。
其中value
和next
都用volatile
修饰,保证并发的可见性。
构造函数初始化
由上可以发现,ConcurrentHashMap
的初始化实际上是一个空实现,并没有做任何的事情,这也是与其他集合类有明显区别的地方,初始化操作并不是在构造函数中实现的,而是在put
操作中实现的,当然ConCurrentHashMap
还提供了其他的构造函数,有指定容量大小或者指定负载因子的构造函数。
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}
这个初始化方法,通过提供初始容量,计算了sizeCtl
。
Unsafe
与CAS
在ConCurrentHashMap
中,随处可以看到U.compareAndSwapXXX
方法,这个方法是利用一个CAS算法实现无锁化的修改值的操作,它可以大大降低锁带来的性能消耗,这个算法的基本思想就是不断的去比较当前内存中的变量值与你指定的变量值是否相等,如果相等,则接受你指定的修改的值,否则拒绝你的操作。
Unsafe
代码块控制一些属性的修改工作,比如最常见的sizeCtl
,在JDK1.8的ConcurrentHashMap
中,大量应用。
ConcurrentHashMap
中的几个Unsafe静态核心方法
ConcurrentHashMAp
中定义了三个原子操作,用于对指定位置的节点进行操作。
ConcurrentHashMap
类中的initTable
方法
在该类的构造方法中,只是设置了它的一些重要的参数,我们知道在JDK1.7中,Segment
的初始化是只初始化Segment[0]
,其他的槽初始化时延迟到插入数据的时候。在JDK1.8的时候,在调用一些方法的时候,如:put、computeIfAbsent
等方法的时候,会检查table
是否等于null
初始化方法主要应用了关键属性sizeCtl
,如果这个值小于0,表示其他线程正在进行初始化,那么当前线程就被挂起,放弃这个操作。在这也可以看出来ConcurrentHashMap
的初始化只能由一个线程完成,如果获得了初始化权限,就用CAS方法将sizeCtl
设置为-1,防止其他线程进入。初始化数组之后,将sizeCtl
的值改为0.75*n。
ConcurrentHashMap
类的transfer
扩容方法
数据迁移:transfer
,将原来的tab
数组的元素迁移到新的数组nextTab
中去。
该类的transfer
方法支持多线程进行扩容操作,而并没有加锁,调用此方法的时候,会保证第一个发起数据迁移的线程,nextTab
参数为null
,之后再调用此方法的时候,nextTab
不会为null
。
整个扩容操作分为两部分:
- 第一部分是构建一个
nextTable
,它的容量是原来的两倍,这个操作是单线程完成的,这个单线程的保证是通过RESIZE_STAMP_SHIFT
这个常量经过一次运算来保证的。 - 第二部分是将原来
table
中的元素复制到nextTable
中,这个操作是多线程的。
transfer
方法实现了在并发情况下,高效的从原始数组向新数组中移动元素,首先了解一下并发操作机制:
原数组长度为n
,所有有n
个迁移任务,让每个线程每次负责一个小任务是最简单的,每做完一个任务再检测是否有其他每做完的任务,帮助迁移就可以了,而Doug Lea
使用了一个stride
,简单理解就是步长,每个线程每次负责迁移其中的一部分,如每次迁移16个小任务。所以,我们就需要一个全局的调度者来安排哪个线程执行哪几个任务,这个就是属性transferIndex
的作用。
第一个发起数据迁移的线程会将transferIndex
指向原数组最后的位置,然后从后向前的stride
个任务属于第一个线程,然后将transferIndex
指向新的位置,再往前的stride
个任务属于第二个线程,以此类推。当然,这里说的第二个线程不是真的一定指代了第二个线程,也可以是同一个线程,其实就是将一个大的迁移任务分成了一个个任务包。
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
// stride 在单核下直接等于 n,多核模式下为 (n>>>3)/NCPU,最小值是 16
// stride 可以理解为”步长“,有 n 个位置是需要进行迁移的,
// 将这 n 个任务分为多个任务包,每个任务包有 stride 个任务
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
// 如果 nextTab 为 null,先进行一次初始化
// 前面我们说了,外围会保证第一个发起迁移的线程调用此方法时,参数 nextTab 为 null
// 之后参与迁移的线程调用此方法时,nextTab 不会为 null
if (nextTab == null) {
try {
// 容量翻倍
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
// nextTable 是 ConcurrentHashMap 中的属性
nextTable = nextTab;
// transferIndex 也是 ConcurrentHashMap 的属性,用于控制迁移的位置
transferIndex = n;
}
int nextn = nextTab.length;
// ForwardingNode 翻译过来就是正在被迁移的 Node
// 这个构造方法会生成一个Node,key、value 和 next 都为 null,关键是 hash 为 MOVED
// 后面我们会看到,原数组中位置 i 处的节点完成迁移工作后,
// 就会将位置 i 处设置为这个 ForwardingNode,用来告诉其他线程该位置已经处理过了
// 所以它其实相当于是一个标志。
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
// advance 指的是做完了一个位置的迁移工作,可以准备做下一个位置的了
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
/*
* 下面这个 for 循环,最难理解的在前面,而要看懂它们,应该先看懂后面的,然后再倒回来看
*
*/
// i 是位置索引,bound 是边界,注意是从后往前
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// 下面这个 while 真的是不好理解
// advance 为 true 表示可以进行下一个位置的迁移了
// 简单理解结局: i 指向了 transferIndex,bound 指向了 transferIndex-stride
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
// 将 transferIndex 值赋给 nextIndex
// 这里 transferIndex 一旦小于等于 0,说明原数组的所有位置都有相应的线程去处理了
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
// 看括号中的代码,nextBound 是这次迁移任务的边界,注意,是从后往前
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
// 所有的迁移操作已经完成
nextTable = null;
// 将新的 nextTab 赋值给 table 属性,完成迁移
table = nextTab;
// 重新计算 sizeCtl: n 是原数组长度,所以 sizeCtl 得出的值将是新数组长度的 0.75 倍
sizeCtl = (n << 1) - (n >>> 1);
return;
}
// 之前我们说过,sizeCtl 在迁移前会设置为 (rs << RESIZE_STAMP_SHIFT) + 2
// 然后,每有一个线程参与迁移就会将 sizeCtl 加 1,
// 这里使用 CAS 操作对 sizeCtl 进行减 1,代表做完了属于自己的任务
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// 任务结束,方法退出
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
// 到这里,说明 (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT,
// 也就是说,所有的迁移任务都做完了,也就会进入到上面的 if(finishing){} 分支了
finishing = advance = true;
i = n; // recheck before commit
}
}
// 如果位置 i 处是空的,没有任何节点,那么放入刚刚初始化的 ForwardingNode ”空节点“
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
// 该位置处是一个 ForwardingNode,代表该位置已经迁移过了
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
// 对数组该位置处的结点加锁,开始处理数组该位置处的迁移工作
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
// 头结点的 hash 大于 0,说明是链表的 Node 节点
if (fh >= 0) {
// 下面这一块和 Java7 中的 ConcurrentHashMap 迁移是差不多的,
// 需要将链表一分为二,
// 找到原链表中的 lastRun,然后 lastRun 及其之后的节点是一起进行迁移的
// lastRun 之前的节点需要进行克隆,然后分到两个链表中
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
// 其中的一个链表放在新数组的位置 i
setTabAt(nextTab, i, ln);
// 另一个链表放在新数组的位置 i+n
setTabAt(nextTab, i + n, hn);
// 将原数组该位置处设置为 fwd,代表该位置已经处理完毕,
// 其他线程一旦看到该位置的 hash 值为 MOVED,就不会进行迁移了
setTabAt(tab, i, fwd);
// advance 设置为 true,代表该位置已经迁移完毕
advance = true;
}
else if (f instanceof TreeBin) {
// 红黑树的迁移
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
// 如果一分为二后,节点数少于 8,那么将红黑树转换回链表
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
// 将 ln 放置在新数组的位置 i
setTabAt(nextTab, i, ln);
// 将 hn 放置在新数组的位置 i+n
setTabAt(nextTab, i + n, hn);
// 将原数组该位置处设置为 fwd,代表该位置已经处理完毕,
// 其他线程一旦看到该位置的 hash 值为 MOVED,就不会进行迁移了
setTabAt(tab, i, fwd);
// advance 设置为 true,代表该位置已经迁移完毕
advance = true;
}
}
}
}
}
}
在上图中,第14个槽位插入新的节点之后,链表元素个数已经达到了8,且数组长度为16,所以优先通过扩容来缓解链表过长的问题。
put
过程分析
假设table
已经初始化完成了,put
操作采用CAS+synchronized
实现并发插入或者更新操作;
-
如果没有初始化就先调用
initTable
方法进行初始化过程 -
如果没有
hash
冲突就直接使用CAS
进行插入 -
如果还在进行扩容操作就先进行扩容
-
如果存在
hash
冲突,就加锁来保证线程安全,这里有两种情况:
(1)一种是链表形式,如果是链表形式就直接遍历到链表尾部直接进行插入
(2)另一种是红黑树形式,如果是红黑树形式就按照红黑树结构插入 -
最后一个如果该链表的数量大于阈值8,就要先转换成红黑树的结构
-
如果添加成功,就要调用
addcount
方法统计size
,并且检查是否需要扩容。
注意:对节点使用synchronized
进行锁定,我们可以发现JDK8中的实现也是锁分离的思想,只是锁住的是一个Node
,而不是JDK7中的Segment
,而锁住Node
之前的操作是无锁的,但是也是线程安全的。
最后扩容的时候调用了treeifyBin
方法,这个方法用于将过长的链表转换为TreeBin
对象,但是他并不是直接转换,而是进行一次容量的判断,如果容量没有达到转换的要求,直接进行扩容操作并返回;如果满足条件才将链表的结构转换为TreeBin
,这与HashMap
不同的是,他并没有把TreeNode
直接放入红黑树,而是直接利用了TreeBin
这个小容器来封装所有的TreeNode
。
put方法中的hash方法
put方法中的initTable方法
实例化ConcurrentHashMap
时倘若声明了table
变量,在初始化时会根据参数调整table
的大小,确保table
的大小总是2的幂次方。默认的table
的值是16。
table
的初始化操作会延迟到第一个put
操作再进行,并且初始化只会执行一次,为什么要延迟初始化那,是为了防止初始化后的首次操作就需要扩容,从而影响效率。
采用Unsafe.getObjectVolatile()
来获取,而不是使用table[index]
的原因跟ConcurrentHashMap
的弱一致性有关。在java内存模型中,我们已经知道每个线程都有一个工作内存,里面存储着table
的副本,虽然table
是volatile
修饰的,但是不能保证每次都拿到table
中的最新元素,Unsafe.getObjectVolatile()
可以直接获取指定内存的数据,保证了每次拿到数据都是最新的。
put
方法的总结
这个put
方法依然沿用HashMap
的put
方法的思想,根据hash
计算这个新插入的点在table
中的位置i
,如果i
位置为空的,直接放进去,否则进行判断,如果i
位置是树节点,按照树的方式插入新的节点,否则把i
插入到链表的末尾,ConcurrentHashMap
不允许key
或value
的值为null
。
同时在put
的时候会涉及多线程,所以会存在以下两种情况:
- 如果一个或者多个线程正在对
ConcurrentHashMap
进行扩容操作,当前线程也要进行扩容操作。这个扩容的操作之所以能被检测到,是因为transfer
方法中在空节点上插入forward
节点,如果检测到需要插入的位置被forward
节点占有,就帮助进行扩容; - 如果检测到要插入的节点是非空且不是
forward
节点,就对这个节点进行加锁,这样就保证了线程安全。
整个流程就是首先定义不允许key或者value为null的情况放入,对于每一个放入的值,首先利用spread
方法对key
进行一次hash
计算,由此来确定这个值在table
中的位置。 如果这个位置为空,那么直接放入。 如果这个位置存在节点,说明发生了hash碰撞,首先判断这个节点的类型,如果是链表节点,则得到的节点就是hash
值相同的节点组成的链表的头结点。需要一次向后遍历确定这个新加入的值所在的位置。如果遇到hash
值与key
值都与新加入节点一致的情况,则只需更新value
的值即可。否则依次向后遍历,直到链表尾插入这个节点。如果加入这个节点以后链表长度大于8,就把这个链表切换为红黑树。如果这个节点的类型已经是红黑树,直接调用树节点的插入方法进行插入新的值。
get
过程分析
get
方法比较简单,给定一个key
来确定value
的时候,必须满足两个条件,key
相同且hash
相同,对于节点可能在链表上或者树上的情况需要分别查找:
get
方法的执行过程:
- 计算
hash
值 - 根据
hash
值找到数组对应的位置:(n-1)& h
- 根据该位置处节点性质进行相应查找
(1)如果该位置为null
,那么直接返回null
就可以了
(2)如果该位置处的节点刚好就是我们需要的,返回该节点的值即可
(3)如果该位置节点的hash
值小于0,说明正在扩容,或者是红黑树
(4)如果以上三个条件都不满足,那就是链表,进行遍历比对即可
JDK1.7和JDK1.8差异
-
put
的时候
(1)1.7插入到链表的头部
(2)1.8插入到链表的尾部 -
扩容操作
1.8当调用其他方法的时候,如果发现有线程对链表或者树进行扩容,那么当前线程也需要帮助进行扩容。 -
数据结构方面
(1)JDK1.7版本:ReentrantLock+Segment+HashEntry
(2)JDK1.8版本:Synchronized+CAS+HashEntry+红黑树
(3)JDK1.8版本的数据结构变得更加简单,使得操作也更加清晰流畅,因为使用Synchronized
来进行同步,所以不需要分段锁的概念,也就不需要Segment
这种数据结构了,由于粒度的降低,实现的复杂度也增加了。
感谢并参考:
https://blog.csdn.net/xiaojie_570/article/details/84931251?spm=1001.2014.3001.5501
https://pdai.tech/md/java/thread/java-thread-x-juc-collection-ConcurrentHashMap.html