一. HashMapHashTableConcurrentHashMap的区别

  • HashMap是线程不安全的
  • HashTable是线程安全的,HashTable的线程安全是采用在每个方法上面都添加上了synchronized关键字来修饰,即HashTable是针对整个table的锁定。所有访问HashTable的线程都必须竞争同一把锁。当一个线程访问HashTable的同步方法的时候,其他线程再访问HashTable的同步方法的时候,可能会进入阻塞或者轮询状态。比如:线程1使用put方法来添加元素,线程2不但不能使用put方法添加元素,也不能使用get方法来获取元素,所以锁竞争越激烈,该类的效率越低。
  • ConcurrentHashMap使用分段锁技术ConcurrentHashMap是由多个Segment组成(Segment下包含了多个Node,即键值对),每个Segment都有把锁来实现线程安全,当一个线程占用锁访问其中一段数据的时候,其它段的数据也能被其他的线程访问。
    为什么HashTable
    HashTable之所以效率低下主要是因为其实现使用了synchronized关键字对put等操作进行加锁,而synchronized关键字加锁是对整个对象进行加锁,也就是说在进行put等修改Hash表的操作时,锁住了整个Hash表,从而使得其表现的效率低下。

二. JDK1.7中的ConcurrentHashMap

数据结构

jdk1.7中采用了segment+HashEntry的方式进行实现,使用了分段锁机制实现了ConcurrentHashMap
在这里插入图片描述
在这里插入图片描述

  • 一个ConcurrentHashMap中包含一个Segment<K,V>[] segments数组
  • 一个segment对象中包含一个HashEntry<K,V>[] table数组

1.7的ConcurrentHashMap是由Segment+HashEntry组成,和HashMap一样,仍然是数组+链表
ConcurrencyLevel:默认是16,也就是说ConcurrentHashMap有16个Segments,所以理论上,这个时候,最多可以同时支持16个线程并发写,只要他们的操作分别分布在不同的Segments上,Segments通过ReentrantLock来进行加锁。这个值在初始化的时候可以设置为其他值,但是一旦初始化后,它是不可以扩容的。

成员变量

ConcurrentHashMap在初始化的时候,计算出Segment数组的大小ssize和每个SegmentHashEntry数组的大小cap,并初始化Segment数组的第一个元素。

  • initialCapacity:初始容量,整个值指的是整个ConcurrentHashMap的初始容量,实际操作的时候需要平均分给每个Segment
  • ssize:大小为2的幂次方,默认为16.
  • cap:大小也为2的幂次方,最小值为2,最终结果根据初始化容量initialCapacity进行计算。

核心成员变量:
在这里插入图片描述
SegmentConcurrentHashMap的一个内部类,主要的组成结构如下:
在这里插入图片描述
首先通过Key根据hash算法定位到元素属于哪个Segment,之后在对应的Segment中进行具体的put操作。
其中HashEntry组成:
在这里插入图片描述
HashMap非常类似,唯一的区别就是其中的核心数据如value,以及链表都是volatile修饰的,保证了获取时的可见性

并发度(Concurrency Level)

并发度可以理解为程序运行时能够同时更新ConcurrentHashMap且不产生锁竞争的最大线程数,实际上就是ConcurrentHashMap中的分段锁的个数,即Segment[]的数组长度。ConcurrentHashMap默认的并发度是16,但用户也可以构造函数中设置并发度,当用户设置并发度的时候,ConcurrentHashMap会使用大于等于该值的最小2幂指数作为实际并发度(如果用户设置并发度是17,实际并发度则为32)。
运行时,通过将Key的高n位(n=32-segmentShift)和并发度-1做位与运算定位到所在的Segment

初始化

  • initialCapacity:初始容量,这个值指出的是整个CocurrentHashMap的初始容量,实际操作的时候需要平均分给每个segment
  • loadFactor:负载因子,segment数组不可以扩容,所以这个负载因子是给每个segment内部使用的。
    注意点:JDK7中除了第一个segment之外,剩余的segments采用的是延迟初始化的机制:每次put之前都要检查key对应的segment是否为null,如果是则调用ensureSegment()以确保对应的segment被创建。
    在这里插入图片描述
    初始化完成,我们可以得到一个Segment数组:
    初始化过程一共会做如下几件事:
  • Segment数组的长度为16,在初始化的时候可以指定长度,但是在初始化结束后,不可以修改这个长度。
  • Segment[i]的默认大小为2,即初始一个Segment只能装入一个HashEntry,负载因子为0.75,初始阈值为1.5,即在插入第一个元素的时候不会触发扩容,插入第二个元素的时候就会进行第一次扩容。
  • 初始化了Segment[0],其他位置的Segment数组没有初始化
  • 参数的相关说明:
    (1)ssizeSegment数组的长度
    (2)sshift:等于ssize从1左移的次数
    (3)segmentShift:用于定位参与散列运算的位数,移位数
    (4)segmentMask:散列运算的掩码
    (5)capSegment里面HashEntry数组的长度

put过程分析

在这里插入图片描述
根据hash值找到对应的SegmentSegment的数组下标是hash值的最后四位。首先是通过key定位到Segment,之后在对应的Segment中进行具体的put操作。Segment内部是由数组+链表组成的。
在这里插入图片描述
虽然HashEntry中的value是用volatile关键字修饰的,但是并不能保证并发的原子性,所以put操作是需要加锁的操作,进到put方法中首先要尝试加锁,如【第373行】代码所示。
三个关键方法

  • ensureSegment方法
    这个是【第1106行】的方法。ConcurrentHashMap初始化的时候会初始化第一个槽segment[0],对于其他槽来说,在插入第一个值的时候进行初始化,这里需要考虑并发,因为很可能会有多个线程同时初始化同一个槽segment[k],不过只要有一个初始化成功就可以了,对于并发操作,使用CAS进行控制。
    在这里插入图片描述
  • scanAndLockForPut方法
    获取写入锁,首先第一步的时候尝试获取锁,如果获取失败肯定就有其他线程存在竞争,则利用scanAndLockForPut自旋获取锁。

private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
    HashEntry<K,V> first = entryForHash(this, hash);
    HashEntry<K,V> e = first;
    HashEntry<K,V> node = null;
    int retries = -1; // negative while locating node

    // 循环获取锁
    while (!tryLock()) {
        HashEntry<K,V> f; // to recheck first below
        if (retries < 0) {
            if (e == null) {
                if (node == null) // speculatively create node
                    // 进到这里说明数组该位置的链表是空的,没有任何元素
                    // 当然,进到这里的另一个原因是 tryLock() 失败,所以该槽存在并发,不一定是该位置
                    node = new HashEntry<K,V>(hash, key, value, null);
                retries = 0;
            }
            else if (key.equals(e.key))
                retries = 0;
            else
                // 顺着链表往下走
                e = e.next;
        }
        // 重试次数如果超过 MAX_SCAN_RETRIES(单核1多核64),那么不抢了,进入到阻塞队列等待锁
        //    lock() 是阻塞方法,直到获取锁后返回
        else if (++retries > MAX_SCAN_RETRIES) {
            lock();
            break;
        }
        else if ((retries & 1) == 0 &&
                 // 这个时候是有大问题了,那就是有新的元素进到了链表,成为了新的表头
                 //     所以这边的策略是,相当于重新走一遍这个 scanAndLockForPut 方法
                 (f = entryForHash(this, hash)) != first) {
            e = first = f; // re-traverse if entry changed
            retries = -1;
        }
    }
    return node;
}
  

scanAndLockForPut自旋获取锁的过程:

  • 尝试自旋获取锁
  • 如果重试的次数达到了MAX_SCAN_RETRIES则改为阻塞锁获取,保证能成功。
    在这里插入图片描述
    JDK 1.7中put方法的整体执行流程
    场景:线程A和线程B同时执行相同Segment对象的put方法。
  • (1)线程A执行tryLock()方法成功获得锁,则把HashEntry对象插入到相应的位置
  • (2)线程B获取锁失败,则执行scanAndLockForPut()方法,在scanAndLockForPut()方法中,会通过重复执行tryLock()方法尝试获取锁,在多处理器环境下,重复次数为64,但处理器重复次数为1,当执行tryLock()方法的次数超过上限时,则执行lock方法挂起线程B
  • (3)将当前Segment中的table通过keyhashcode定位到HashEntry
  • (4)遍历该HashEntry,如果不为空则判断传入的key和当前遍历的key是否相等,相等则覆盖旧的value
  • (5)当线程A执行完插入操作时,会通过unlock方法释放锁,接着唤醒线程B继续执行。

注意:在获取锁之后,segment对链表进行遍历,如果某个HashEntry节点具有相同的key,则更新该HashEntryvalue值,否则新建一个HashEntry节点,将他设置为链表的新的head结点,并将原头结点设为新head的下一个节点。新建过程中如果节点总数(含新建的HashEntry)超过threshold,则调用rehash方法对segment进行扩容,最后将新建HashEntry写入到数组中。

  • rehash方法
    注意:segment数组是不能扩容的,这里的rehash方法扩容是指segment数组某个位置内部的HashEntry数组进行扩容,扩容后,容量为原来的2倍。
    该方法不需要考虑并发,因为到达该方法的时候,是持有该segment的独占锁的。
    在这里插入图片描述
    这里的扩容比之前的HashMap要复杂一些,上面有两个挨着的for循环,第一个for循环有什么作用呢?
    如果没有第一个for循环,也是可以工作的,但是,这个for循环下来,如果lastRun的后面还有比较多的节点,那么这次就是值的的,因为我们只需要克隆lastRun前面的节点,后面的一串节点跟着lastRun走就是了,不需要做任何操作。不过比较坏的情况就是每次lastRun都是链表的最后一个元素或者很靠后的元素,那么这次遍历就有些浪费了,根据统计,如果使用默认的阈值,大约只有1/6的节点需要克隆。
    上面【第443行】的过程就如下图:
    在这里插入图片描述

get过程分析

整体过程:

  • 计算hash值,找到segment数组中的具体位置。
  • 槽中也是一个数组,根据hash找到数组中的具体位置
  • 到这里是链表了,顺着链表进行查找即可

在这里插入图片描述
由于HashEntry中的value属性是用volatile关键字修饰的,保存了内存的可见性,所以每次获取时都是最新值。

size过程分析

因为ConCurrentHashMap是可以并发插入数据的,所以在准确计算元素时存在一定的难度,一般的思路是统计每个segment对象中的元素个数,然后进行累加,但是这种方式计算出来的结果不一定是准确的,因为在计算后面几个segment的时候,已经计算过的segment同时可能有数据的插入和删除。
计算元素个数的方法的流程:
先采用不加锁的方式,连续计算元素的个数

  • 如果前后两次计算结果相同,则说明计算出来的元素个数是准确的。
  • 如果前后两次计算的结果不同,则给每个Segment进行加锁,再计算一次元素的个数。

并发问题分析

我们可以看到get过程是没有加锁的,那我们就需要去考虑并发问题了。
添加节点的操作put和删除节点的操作remove都是要加segment上的独占锁的,所以他们之间自然不会有问题,我们需要考虑的问题就是get的时候在同一个segment中发生了putremove操作。

  • put操作的线程安全性
    (1)初始化槽,使用了CAS来初始化Segment中的数组。
    (2)添加节点到链表的操作是插入到表头的,所以,如果这个时候get操作在链表遍历的过程已经到了中间,是不会影响的。当然,另一个并发问题就是getput之后,需要保证刚刚插入表头的节点被读取,这个依赖于setEntryAt方法中使用的UNSAFE.putOrderedObject
    (3)扩容:扩容是新创建了数组,然后进行迁移数据,最后面将newTable设置给属性table。所以,如果get操作此时也在进行,那么也没关系,如果get先行,那么就是在旧的table上面做查询操作;而put先行,那么put操作的可见性保证就是table使用了volatile关键字。

  • remove操作的线性安全性
    (1)get操作需要遍历链表,但是remove操作会破坏链表。
    (2)如果remove破坏的节点get操作已经过去了,那么这里就不存在任何问题了。
    (3)如果remove先破坏了一个节点,分两种情况考虑:

  1. 如果此节点是头节点,那么需要将头结点的next设置为数组该位置的元素,table虽然使用了volatile修饰,但是volatile并不能提供数组内部操作的可见性保证,所以源码中使用了UNSAFE来操作数组。
  2. 如果要删除的节点不是头结点,他会将要删除节点的后继结点接到前驱节点中,这里的并发保证就是next属性是
    volatile的。

三. JDK1.8 ConcurrentHashMap

1.7已经解决了并发的问题,ConcurrentHashMap是通过分段锁机制来实现的,所以其最大并发度受Segment的个数限制,但依然存在HashMap在1.7版本中的问题:
查询遍历链表效率太低
JDK1.8中抛弃了原有的Segment分段锁,而采用了CAS+Synchronized来保证并发安全性。
根据JDK1.7HashMap的介绍,我们知道,查找的时候,根据hash值我们能够快速定位到数组的具体下标,但是之后的话,需要顺着链表一个个比较下去才能找到我们需要的值,时间复杂度取决于链表的长度,即O(n)

数据结构

数据结构采用数组+链表+红黑树在这里插入图片描述

结构上和Java8的HashMap基本上一样,但是他要保证线程的安全性,所以在源码上会更加复杂一些。

ConcurrentHashMap相关属性介绍

private transient volatile int sizeCtl;

sizeCtl是控制标识符,不同的值表示不同的意义:

  • 负数代表正在进行初始化或者扩容操作,其中-1表示正在初始化,-N表示有N-1个线程正在进行扩容操作。
  • 整数或者0表示hash还没有被初始化,这个数值表示初始化或者下一次扩容的大小,类似于扩容阈值。它的值始终是当前ConcurrentHashMap容量的0.75倍,这与loadfactor是对应的,实际容量>=sizeCtl,则扩容
static final int TREEIFY_THRESHOLD = 8;
  • 链表转树的阈值,如果table[i]下面的链表长度大于8时就转换为树。
static final int UNTREEIFY_THRESHOLD = 6;
  • 树转为链表的阈值,小于等于6时转换为链表,仅在扩容tranfer时才可能树转换为链表。

类中相关的节点类:NodeTreeNodeTreeBinForwardingNode与初始化

  • Node:
    保存keyvaluekeyhash值的数据结构。即链表中的数据结构。Node是最核心的内部类,它包装了Key-value键值对,所有插入ConcurrentHashMap的数据都包装在这里面。它与HashMap中的定义非常相似,但有一些差别,他对valuenext属性设置了volatile同步锁,他不允许调用setValue方法直接改变Nodevalue域,它增加了find方法辅助map.get方法。

  • TreeNode:红黑树中的数据结构
    树节点类,另外一个核心的数据结构。当链表长度过长的时候,即链表的节点数大于8的时候,会转换为TreeNode但是与HashMap不同的是,它并不是直接转换为红黑树,而是把这些节点包装成TreeNode放在TreeBin对象中,由TreeBin完成对红黑树的包装

  • TreeBin
    这个类并不负责包装用户的key、value值,而是包装很多TreeNode节点,他代替了TreeNode根节点,也就是说在实际的ConcurrentHashMap数组中,存放的是 TreeBin对象,而不是TreeNode对象。 TreeBin从字面含义中可以理解为存储树形结构的容器,而树形结构就是指TreeNode,所以 TreeBin就是封装TreeNode的容器,他提供转换红黑树的一些条件和锁的控制

  • ForwardingNode
    ForwardingNode是一个特殊的Node节点,hash值为-1,其中存储nextTable的引用。只有Table发生扩容的时候,ForwardingNode才会发挥作用,作为一个占位符放在table中表示当前节点为null或者已经被移动。

其中valuenext都用volatile修饰,保证并发的可见性。
在这里插入图片描述

构造函数初始化

在这里插入图片描述
由上可以发现,ConcurrentHashMap的初始化实际上是一个空实现,并没有做任何的事情,这也是与其他集合类有明显区别的地方,初始化操作并不是在构造函数中实现的,而是在put操作中实现的,当然ConCurrentHashMap还提供了其他的构造函数,有指定容量大小或者指定负载因子的构造函数。


public ConcurrentHashMap(int initialCapacity) {
    if (initialCapacity < 0)
        throw new IllegalArgumentException();
    int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
               MAXIMUM_CAPACITY :
               tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
    this.sizeCtl = cap;
}

这个初始化方法,通过提供初始容量,计算了sizeCtl

UnsafeCAS

ConCurrentHashMap中,随处可以看到U.compareAndSwapXXX方法,这个方法是利用一个CAS算法实现无锁化的修改值的操作,它可以大大降低锁带来的性能消耗,这个算法的基本思想就是不断的去比较当前内存中的变量值与你指定的变量值是否相等,如果相等,则接受你指定的修改的值,否则拒绝你的操作。
Unsafe代码块控制一些属性的修改工作,比如最常见的sizeCtl,在JDK1.8的ConcurrentHashMap中,大量应用。

ConcurrentHashMap中的几个Unsafe静态核心方法

ConcurrentHashMAp中定义了三个原子操作,用于对指定位置的节点进行操作。
在这里插入图片描述

ConcurrentHashMap类中的initTable方法

在该类的构造方法中,只是设置了它的一些重要的参数,我们知道在JDK1.7中,Segment的初始化是只初始化Segment[0],其他的槽初始化时延迟到插入数据的时候。在JDK1.8的时候,在调用一些方法的时候,如:put、computeIfAbsent等方法的时候,会检查table是否等于null
在这里插入图片描述
初始化方法主要应用了关键属性sizeCtl,如果这个值小于0,表示其他线程正在进行初始化,那么当前线程就被挂起,放弃这个操作。在这也可以看出来ConcurrentHashMap的初始化只能由一个线程完成,如果获得了初始化权限,就用CAS方法将sizeCtl设置为-1,防止其他线程进入。初始化数组之后,将sizeCtl的值改为0.75*n。

ConcurrentHashMap类的transfer扩容方法

数据迁移:transfer,将原来的tab数组的元素迁移到新的数组nextTab中去。
该类的transfer方法支持多线程进行扩容操作,而并没有加锁,调用此方法的时候,会保证第一个发起数据迁移的线程,nextTab参数为null,之后再调用此方法的时候,nextTab不会为null
整个扩容操作分为两部分:

  • 第一部分是构建一个nextTable,它的容量是原来的两倍,这个操作是单线程完成的,这个单线程的保证是通过RESIZE_STAMP_SHIFT这个常量经过一次运算来保证的。
  • 第二部分是将原来table中的元素复制到nextTable中,这个操作是多线程的。
    transfer 方法实现了在并发情况下,高效的从原始数组向新数组中移动元素,首先了解一下并发操作机制:
    原数组长度为n,所有有n个迁移任务,让每个线程每次负责一个小任务是最简单的,每做完一个任务再检测是否有其他每做完的任务,帮助迁移就可以了,而Doug Lea使用了一个stride,简单理解就是步长,每个线程每次负责迁移其中的一部分,如每次迁移16个小任务。所以,我们就需要一个全局的调度者来安排哪个线程执行哪几个任务,这个就是属性transferIndex的作用。
    第一个发起数据迁移的线程会将transferIndex指向原数组最后的位置,然后从后向前的stride个任务属于第一个线程,然后将transferIndex指向新的位置,再往前的stride个任务属于第二个线程,以此类推。当然,这里说的第二个线程不是真的一定指代了第二个线程,也可以是同一个线程,其实就是将一个大的迁移任务分成了一个个任务包。
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;

    // stride 在单核下直接等于 n,多核模式下为 (n>>>3)/NCPU,最小值是 16
    // stride 可以理解为”步长“,有 n 个位置是需要进行迁移的,
    //   将这 n 个任务分为多个任务包,每个任务包有 stride 个任务
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range

    // 如果 nextTab 为 null,先进行一次初始化
    //    前面我们说了,外围会保证第一个发起迁移的线程调用此方法时,参数 nextTab 为 null
    //       之后参与迁移的线程调用此方法时,nextTab 不会为 null
    if (nextTab == null) {
        try {
            // 容量翻倍
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        // nextTable 是 ConcurrentHashMap 中的属性
        nextTable = nextTab;
        // transferIndex 也是 ConcurrentHashMap 的属性,用于控制迁移的位置
        transferIndex = n;
    }

    int nextn = nextTab.length;

    // ForwardingNode 翻译过来就是正在被迁移的 Node
    // 这个构造方法会生成一个Node,key、value 和 next 都为 null,关键是 hash 为 MOVED
    // 后面我们会看到,原数组中位置 i 处的节点完成迁移工作后,
    //    就会将位置 i 处设置为这个 ForwardingNode,用来告诉其他线程该位置已经处理过了
    //    所以它其实相当于是一个标志。
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);


    // advance 指的是做完了一个位置的迁移工作,可以准备做下一个位置的了
    boolean advance = true;
    boolean finishing = false; // to ensure sweep before committing nextTab

    /*
     * 下面这个 for 循环,最难理解的在前面,而要看懂它们,应该先看懂后面的,然后再倒回来看
     * 
     */

    // i 是位置索引,bound 是边界,注意是从后往前
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;

        // 下面这个 while 真的是不好理解
        // advance 为 true 表示可以进行下一个位置的迁移了
        //   简单理解结局: i 指向了 transferIndex,bound 指向了 transferIndex-stride
        while (advance) {
            int nextIndex, nextBound;
            if (--i >= bound || finishing)
                advance = false;

            // 将 transferIndex 值赋给 nextIndex
            // 这里 transferIndex 一旦小于等于 0,说明原数组的所有位置都有相应的线程去处理了
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                // 看括号中的代码,nextBound 是这次迁移任务的边界,注意,是从后往前
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            if (finishing) {
                // 所有的迁移操作已经完成
                nextTable = null;
                // 将新的 nextTab 赋值给 table 属性,完成迁移
                table = nextTab;
                // 重新计算 sizeCtl: n 是原数组长度,所以 sizeCtl 得出的值将是新数组长度的 0.75 倍
                sizeCtl = (n << 1) - (n >>> 1);
                return;
            }

            // 之前我们说过,sizeCtl 在迁移前会设置为 (rs << RESIZE_STAMP_SHIFT) + 2
            // 然后,每有一个线程参与迁移就会将 sizeCtl 加 1,
            // 这里使用 CAS 操作对 sizeCtl 进行减 1,代表做完了属于自己的任务
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                // 任务结束,方法退出
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;

                // 到这里,说明 (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT,
                // 也就是说,所有的迁移任务都做完了,也就会进入到上面的 if(finishing){} 分支了
                finishing = advance = true;
                i = n; // recheck before commit
            }
        }
        // 如果位置 i 处是空的,没有任何节点,那么放入刚刚初始化的 ForwardingNode ”空节点“
        else if ((f = tabAt(tab, i)) == null)
            advance = casTabAt(tab, i, null, fwd);
        // 该位置处是一个 ForwardingNode,代表该位置已经迁移过了
        else if ((fh = f.hash) == MOVED)
            advance = true; // already processed
        else {
            // 对数组该位置处的结点加锁,开始处理数组该位置处的迁移工作
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    Node<K,V> ln, hn;
                    // 头结点的 hash 大于 0,说明是链表的 Node 节点
                    if (fh >= 0) {
                        // 下面这一块和 Java7 中的 ConcurrentHashMap 迁移是差不多的,
                        // 需要将链表一分为二,
                        //   找到原链表中的 lastRun,然后 lastRun 及其之后的节点是一起进行迁移的
                        //   lastRun 之前的节点需要进行克隆,然后分到两个链表中
                        int runBit = fh & n;
                        Node<K,V> lastRun = f;
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        // 其中的一个链表放在新数组的位置 i
                        setTabAt(nextTab, i, ln);
                        // 另一个链表放在新数组的位置 i+n
                        setTabAt(nextTab, i + n, hn);
                        // 将原数组该位置处设置为 fwd,代表该位置已经处理完毕,
                        //    其他线程一旦看到该位置的 hash 值为 MOVED,就不会进行迁移了
                        setTabAt(tab, i, fwd);
                        // advance 设置为 true,代表该位置已经迁移完毕
                        advance = true;
                    }
                    else if (f instanceof TreeBin) {
                        // 红黑树的迁移
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null;
                        TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        // 如果一分为二后,节点数少于 8,那么将红黑树转换回链表
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                            (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                            (lc != 0) ? new TreeBin<K,V>(hi) : t;

                        // 将 ln 放置在新数组的位置 i
                        setTabAt(nextTab, i, ln);
                        // 将 hn 放置在新数组的位置 i+n
                        setTabAt(nextTab, i + n, hn);
                        // 将原数组该位置处设置为 fwd,代表该位置已经处理完毕,
                        //    其他线程一旦看到该位置的 hash 值为 MOVED,就不会进行迁移了
                        setTabAt(tab, i, fwd);
                        // advance 设置为 true,代表该位置已经迁移完毕
                        advance = true;
                    }
                }
            }
        }
    }
}

在这里插入图片描述
在上图中,第14个槽位插入新的节点之后,链表元素个数已经达到了8,且数组长度为16,所以优先通过扩容来缓解链表过长的问题。

put过程分析

在这里插入图片描述
假设table已经初始化完成了,put操作采用CAS+synchronized实现并发插入或者更新操作;

  • 如果没有初始化就先调用initTable方法进行初始化过程

  • 如果没有hash冲突就直接使用CAS进行插入

  • 如果还在进行扩容操作就先进行扩容

  • 如果存在hash冲突,就加锁来保证线程安全,这里有两种情况:
    (1)一种是链表形式,如果是链表形式就直接遍历到链表尾部直接进行插入
    (2)另一种是红黑树形式,如果是红黑树形式就按照红黑树结构插入

  • 最后一个如果该链表的数量大于阈值8,就要先转换成红黑树的结构

  • 如果添加成功,就要调用addcount方法统计size,并且检查是否需要扩容。
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

注意:对节点使用synchronized进行锁定,我们可以发现JDK8中的实现也是锁分离的思想,只是锁住的是一个Node,而不是JDK7中的Segment,而锁住Node之前的操作是无锁的,但是也是线程安全的。
最后扩容的时候调用了treeifyBin方法,这个方法用于将过长的链表转换为TreeBin对象,但是他并不是直接转换,而是进行一次容量的判断,如果容量没有达到转换的要求,直接进行扩容操作并返回;如果满足条件才将链表的结构转换为TreeBin这与HashMap不同的是,他并没有把TreeNode直接放入红黑树,而是直接利用了TreeBin这个小容器来封装所有的TreeNode

put方法中的hash方法

在这里插入图片描述
在这里插入图片描述

put方法中的initTable方法

实例化ConcurrentHashMap时倘若声明了table变量,在初始化时会根据参数调整table的大小,确保table的大小总是2的幂次方。默认的table的值是16。
table的初始化操作会延迟到第一个put操作再进行,并且初始化只会执行一次为什么要延迟初始化那,是为了防止初始化后的首次操作就需要扩容,从而影响效率。
在这里插入图片描述
在这里插入图片描述
采用Unsafe.getObjectVolatile()来获取,而不是使用table[index]的原因跟ConcurrentHashMap的弱一致性有关。在java内存模型中,我们已经知道每个线程都有一个工作内存,里面存储着table的副本,虽然tablevolatile修饰的,但是不能保证每次都拿到table中的最新元素,Unsafe.getObjectVolatile()可以直接获取指定内存的数据,保证了每次拿到数据都是最新的。

put方法的总结

这个put方法依然沿用HashMapput方法的思想,根据hash计算这个新插入的点在table中的位置i,如果i位置为空的,直接放进去,否则进行判断,如果i位置是树节点,按照树的方式插入新的节点,否则把i插入到链表的末尾,ConcurrentHashMap不允许keyvalue的值为null
同时在put的时候会涉及多线程,所以会存在以下两种情况:

  • 如果一个或者多个线程正在对ConcurrentHashMap进行扩容操作,当前线程也要进行扩容操作。这个扩容的操作之所以能被检测到,是因为transfer方法中在空节点上插入forward节点,如果检测到需要插入的位置被forward节点占有,就帮助进行扩容;
  • 如果检测到要插入的节点是非空且不是forward节点,就对这个节点进行加锁,这样就保证了线程安全。

整个流程就是首先定义不允许key或者value为null的情况放入,对于每一个放入的值,首先利用spread方法对key进行一次hash计算,由此来确定这个值在table中的位置。 如果这个位置为空,那么直接放入。 如果这个位置存在节点,说明发生了hash碰撞,首先判断这个节点的类型,如果是链表节点,则得到的节点就是hash值相同的节点组成的链表的头结点。需要一次向后遍历确定这个新加入的值所在的位置。如果遇到hash值与key值都与新加入节点一致的情况,则只需更新value的值即可。否则依次向后遍历,直到链表尾插入这个节点。如果加入这个节点以后链表长度大于8,就把这个链表切换为红黑树。如果这个节点的类型已经是红黑树,直接调用树节点的插入方法进行插入新的值。

get过程分析

get方法比较简单,给定一个key来确定value的时候,必须满足两个条件,key相同且hash相同,对于节点可能在链表上或者树上的情况需要分别查找:
在这里插入图片描述
get方法的执行过程:

  • 计算hash
  • 根据hash值找到数组对应的位置:(n-1)& h
  • 根据该位置处节点性质进行相应查找
    (1)如果该位置为null,那么直接返回null就可以了
    (2)如果该位置处的节点刚好就是我们需要的,返回该节点的值即可
    (3)如果该位置节点的hash值小于0,说明正在扩容,或者是红黑树
    (4)如果以上三个条件都不满足,那就是链表,进行遍历比对即可

JDK1.7和JDK1.8差异

  • put的时候
    (1)1.7插入到链表的头部
    (2)1.8插入到链表的尾部

  • 扩容操作
    1.8当调用其他方法的时候,如果发现有线程对链表或者树进行扩容,那么当前线程也需要帮助进行扩容。

  • 数据结构方面
    (1)JDK1.7版本:ReentrantLock+Segment+HashEntry
    (2)JDK1.8版本:Synchronized+CAS+HashEntry+红黑树
    (3)JDK1.8版本的数据结构变得更加简单,使得操作也更加清晰流畅,因为使用Synchronized来进行同步,所以不需要分段锁的概念,也就不需要Segment这种数据结构了,由于粒度的降低,实现的复杂度也增加了。

感谢并参考

https://blog.csdn.net/xiaojie_570/article/details/84931251?spm=1001.2014.3001.5501
https://pdai.tech/md/java/thread/java-thread-x-juc-collection-ConcurrentHashMap.html


版权声明:本文为m0_52675592原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/m0_52675592/article/details/116070659