自助分红网站建设/友情链接收录
HashMap内部的数据结构是什么?底层是怎么实现的?
HashMap内部结构
jdk8以前:数组+链表
jdk8以后:数组+链表 (当链表长度到8时,转化为红黑树)
在并发的情况,发生扩容时,可能会产生循环链表,在执行get的时候,会触发死循环,引起 CPU的
100%问题,所以一定要避免在并发环境下使用HashMap。延伸考察ConcurrentHashMap与HashMap、
HashTable等,考察 对技术细节的深入了解程度;
老生常谈,HashMap的死循环
问题
最近的几次面试中,我都问了是否了解HashMap在并发使用时可能发生死循环,导致cpu100%,结果
让我很意外,都表示不知道有这样的问题,让我意外的是面试者的工作年限都不短。
由于HashMap并非是线程安全的,所以在高并发的情况下必然会出现问题,这是一个普遍的问题,
如果是在单线程下使用HashMap,自然是没有问题的,如果后期由于代码优化,这段逻辑引入了多线程
并发执行,在一个未知的时间点,会发现CPU占用100%,居高不下,通过查看堆栈,你会惊讶的发现,
线程都Hang在hashMap的get()方法上,服务重启之后,问题消失,过段时间可能又复现了。
这是为什么?
原因分析
在了解来龙去脉之前,我们先看看HashMap的数据结构。
在内部,HashMap使用一个Entry数组保存key、value数据,当一对key、value被加入时,会通过一个
hash算法得到数组的下标index,算法很简单,根据key的hash值,对数组的大小取模 hash & (length-
1),并把结果插入数组该位置,如果该位置上已经有元素了,就说明存在hash冲突,这样会在index位
置生成链表。
如果存在hash冲突,最惨的情况,就是所有元素都定位到同一个位置,形成一个长长的链表,这样get
一个值时,最坏情况需要遍历所有节点,性能变成了O(n),所以元素的hash值算法和HashMap的初始
化大小很重要。当插入一个新的节点时,如果不存在相同的key,则会判断当前内部元素是否已经达到阈值(默认是数组
大小的0.75),如果已经达到阈值,会对数组进行扩容,也会对链表中的元素进行rehash。
实现
HashMap的put方法实现:
1、判断key是否已经存在
2、检查容量是否达到阈值threshold
如果元素个数已经达到阈值,则扩容,并把原来的元素移动过去。
3、扩容实现
public V put(K key, V value) {
if (key == null)
return putForNullKey(value);
int hash = hash(key);
int i = indexFor(hash, table.length);
// 如果key已经存在,则替换value,并返回旧值
for (Entry<K,V> e = table[i]; e != null; e = e.next) {
Object k;
if (e.hash == hash && ((k = e.key) == key || key.equals(k))) {
V oldValue = e.value;
e.value = value;
e.recordAccess(this);
return oldValue;
}
}
modCount++;
// key不存在,则插入新的元素
addEntry(hash, key, value, i);
return null;
}
void addEntry(int hash, K key, V value, int bucketIndex) {
if ((size >= threshold) && (null != table[bucketIndex])) {
resize(2 * table.length);
hash = (null != key) ? hash(key) : 0;
bucketIndex = indexFor(hash, table.length);
}
createEntry(hash, key, value, bucketIndex);
}这里会新建一个更大的数组,并通过transfer方法,移动元素。
移动的逻辑也很清晰,遍历原来table中每个位置的链表,并对每个元素进行重新hash,在新的
newTable找到归宿,并插入。
案例分析
假设HashMap初始化大小为4,插入个3节点,不巧的是,这3个节点都hash到同一个位置,如果按照默
认的负载因子的话,插入第3个节点就会扩容,为了验证效果,假设负载因子是1.
void resize(int newCapacity) {
Entry[] oldTable = table;
int oldCapacity = oldTable.length;
...
Entry[] newTable = new Entry[newCapacity];
...
transfer(newTable, rehash);
table = newTable;
threshold = (int)Math.min(newCapacity * loadFactor, MAXIMUM_CAPACITY + 1);
}
void transfer(Entry[] newTable, boolean rehash) {
int newCapacity = newTable.length;
for (Entry<K,V> e : table) {
while(null != e) {
Entry<K,V> next = e.next;
if (rehash) {
e.hash = null == e.key ? 0 : hash(e.key);
}
int i = indexFor(e.hash, newCapacity);
e.next = newTable[i];
newTable[i] = e;
e = next;
}
}
}
void transfer(Entry[] newTable, boolean rehash) {
int newCapacity = newTable.length;
for (Entry<K,V> e : table) {
while(null != e) {
Entry<K,V> next = e.next;
if (rehash) {
e.hash = null == e.key ? 0 : hash(e.key);
}
int i = indexFor(e.hash, newCapacity);
e.next = newTable[i];
newTable[i] = e;
e = next;
}
}}
以上是节点移动的相关逻辑。
插入第4个节点时,发生rehash,假设现在有两个线程同时进行,线程1和线程2,两个线程都会新建新
的数组。
假设 线程2 在执行到 Entry<K,V> next = e.next; 之后,cpu时间片用完了,这时变量e指向节点a,
变量next指向节点b。
线程1继续执行,很不巧,a、b、c节点rehash之后又是在同一个位置7,开始移动节点
第一步,移动节点a第二步,移动节点b
注意,这里的顺序是反过来的,继续移动节点c这个时候 线程1 的时间片用完,内部的table还没有设置成新的newTable, 线程2 开始执行,这时内部
的引用关系如下:这时,在 线程2 中,变量e指向节点a,变量next指向节点b,开始执行循环体的剩余逻辑。
Entry<K,V> next = e.next;
int i = indexFor(e.hash, newCapacity);
e.next = newTable[i];
newTable[i] = e;
e = next;
执行之后的引用关系如下图
执行后,变量e指向节点b,因为e不是null,则继续执行循环体,执行后的引用关系变量e又重新指回节点a,只能继续执行循环体,这里仔细分析下:
1、执行完 Entry<K,V> next = e.next; ,目前节点a没有next,所以变量next指向null;
2、 e.next = newTable[i]; 其中 newTable[i] 指向节点b,那就是把a的next指向了节点b,这样a和b
就相互引用了,形成了一个环;
3、 newTable[i] = e 把节点a放到了数组i位置;
4、 e = next; 把变量e赋值为null,因为第一步中变量next就是指向null;
所以最终的引用关系是这样的:节点a和b互相引用,形成了一个环,当在数组该位置get寻找对应的key时,就发生了死循环。
另外,如果线程2把newTable设置成到内部的table,节点c的数据就丢了,看来还有数据遗失的问题。
总结
所以在并发的情况,发生扩容时,可能会产生循环链表,在执行get的时候,会触发死循环,引起CPU的
100%问题,所以一定要避免在并发环境下使用HashMap。
曾经有人把这个问题报给了Sun,不过Sun不认为这是一个bug,因为在HashMap本来就不支持多线程
使用,要并发就用ConcurrentHashmap。
ConcurrentHashMap在jdk1.8中的改进
一、简单回顾ConcurrentHashMap在jdk1.7中的设计
先简单看下ConcurrentHashMap类在jdk1.7中的设计,其基本结构如图所示:每一个segment都是一个HashEntry<K,V>[] table, table中的每一个元素本质上都是一个HashEntry的
单向队列。比如table[3]为首节点,table[3]->next为节点1,之后为节点2,依次类推。
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
implements ConcurrentMap<K, V>, Serializable {
// 将整个hashmap分成几个小的map,每个segment都是一个锁;与hashtable相比,这么设计的目
的是对于put, remove等操作,可以减少并发冲突,对
// 不属于同一个片段的节点可以并发操作,大大提高了性能
final Segment<K,V>[] segments;
// 本质上Segment类就是一个小的hashmap,里面table数组存储了各个节点的数据,继承了
ReentrantLock, 可以作为互拆锁使用
static final class Segment<K,V> extends ReentrantLock implements Serializable
{
transient volatile HashEntry<K,V>[] table;
transient int count;
}
// 基本节点,存储Key, Value值
static final class HashEntry<K,V> {
final int hash;
final K key;
volatile V value;
volatile HashEntry<K,V> next;
}
}
二、在jdk1.8中主要做了2方面的改进
改进一:取消segments字段,直接采用transient volatile HashEntry<K,V>[] table保存数据,采用
table数组元素作为锁,从而实现了对每一行数据进行加锁,进一步减少并发冲突的概率。改进二:将原先table数组+单向链表的数据结构,变更为table数组+单向链表+红黑树的结构。对于
hash表来说,最核心的能力在于将key hash之后能均匀的分布在数组中。如果hash之后散列的很均
匀,那么table数组中的每个队列长度主要为0或者1。但实际情况并非总是如此理想,虽然
ConcurrentHashMap类默认的加载因子为0.75,但是在数据量过大或者运气不佳的情况下,还是会存
在一些队列长度过长的情况,如果还是采用单向列表方式,那么查询某个节点的时间复杂度为O(n);因
此,对于个数超过8(默认值)的列表,jdk1.8中采用了红黑树的结构,那么查询的时间复杂度可以降低到
O(logN),可以改进性能。
为了说明以上2个改动,看一下put操作是如何实现的。
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 如果table为空,初始化;否则,根据hash值计算得到数组索引i,如果tab[i]为空,直接新
建节点Node即可。注:tab[i]实质为链表或者红黑树的首节点。
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// 如果tab[i]不为空并且hash值为MOVED,说明该链表正在进行transfer操作,返回扩容完成
后的table。
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
// 针对首个节点进行加锁操作,而不是segment,进一步减少线程冲突
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 如果在链表中找到值为key的节点e,直接设置e.val = value即
可。
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
// 如果没有找到值为key的节点,直接新建Node并加入链表即可。
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}}
}
// 如果首节点为TreeBin类型,说明为红黑树结构,执行putTreeVal操作。
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
// 如果节点数>=8,那么转换链表结构为红黑树结构。
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
// 计数增加1,有可能触发transfer操作(扩容)。
addCount(1L, binCount);
return null;
}
另外,在其他方面也有一些小的改进,比如新增字段 transient volatile CounterCell[] counterCells; 可
方便的计算hashmap中所有元素的个数,性能大大优于jdk1.7中的size()方法。
三、ConcurrentHashMap jdk1.7、jdk1.8性能比较
测试程序如下:
public class CompareConcurrentHashMap {
private static ConcurrentHashMap<String, Integer> map = new
ConcurrentHashMap<String, Integer>(40000);
public static void putPerformance(int index, int num) {
for (int i = index; i < (num + index) ; i++)
map.put(String.valueOf(i), i);
}
public static void getPerformance2() {
long start = System.currentTimeMillis();
for (int i = 0; i < 400000; i++)
map.get(String.valueOf(i));
long end = System.currentTimeMillis();
System.out.println("get: it costs " + (end - start) + " ms");
}public static void main(String[] args) throws InterruptedException {
long start = System.currentTimeMillis();
final CountDownLatch cdLatch = new CountDownLatch(4);
for (int i = 0; i < 4; i++) {
final int finalI = i;
new Thread(new Runnable() {
public void run() {
CompareConcurrentHashMap.putPerformance(100000 * finalI,
100000);
cdLatch.countDown();
}
}).start();
}
cdLatch.await();
long end = System.currentTimeMillis();
System.out.println("put: it costs " + (end - start) + " ms");
CompareConcurrentHashMap.getPerformance2();
}
}
程序运行多次后取平均值,结果如下:
四、Collections.synchronizedList和CopyOnWriteArrayList性能分析
CopyOnWriteArrayList在线程对其进行变更操作的时候,会拷贝一个新的数组以存放新的字段,因此写
操作性能很差;而Collections.synchronizedList读操作采用了synchronized,因此读性能较差。以下为
测试程序:
public class App {
private static List<String> arrayList = Collections.synchronizedList(new
ArrayList<String>());
private static List<String> copyOnWriteArrayList = new
CopyOnWriteArrayList<String>();
private static CountDownLatch cdl1 = new CountDownLatch(2);
private static CountDownLatch cdl2 = new CountDownLatch(2);
private static CountDownLatch cdl3 = new CountDownLatch(2);
private static CountDownLatch cdl4 = new CountDownLatch(2);
static class Thread1 extends Thread {
@Override
public void run() {
for (int i = 0; i < 10000; i++)arrayList.add(String.valueOf(i));
cdl1.countDown();
}
}
static class Thread2 extends Thread {
@Override
public void run() {
for (int i = 0; i < 10000; i++)
copyOnWriteArrayList.add(String.valueOf(i));
cdl2.countDown();
}
}
static class Thread3 extends Thread1 {
@Override
public void run() {
int size = arrayList.size();
for (int i = 0; i < size; i++)
arrayList.get(i);
cdl3.countDown();
}
}
static class Thread4 extends Thread1 {
@Override
public void run() {
int size = copyOnWriteArrayList.size();
for (int i = 0; i < size; i++)
copyOnWriteArrayList.get(i);
cdl4.countDown();
}
}
public static void main(String[] args) throws InterruptedException {
long start1 = System.currentTimeMillis();
new Thread1().start();
new Thread1().start();
cdl1.await();
System.out.println("arrayList add: " + (System.currentTimeMillis() -
start1));
long start2 = System.currentTimeMillis();
new Thread2().start();
new Thread2().start();
cdl2.await();
System.out.println("copyOnWriteArrayList add: " +
(System.currentTimeMillis() - start2));
long start3 = System.currentTimeMillis();
new Thread3().start();
new Thread3().start();
cdl3.await();
System.out.println("arrayList get: " + (System.currentTimeMillis() -
start3));
long start4 = System.currentTimeMillis();
new Thread4().start();结果如下:
谈谈ConcurrentHashMap1.7和1.8的不同实现
ConcurrentHashMap
在多线程环境下,使用 HashMap 进行 put 操作时存在丢失数据的情况,为了避免这种bug的隐患,强烈
建议使用 ConcurrentHashMap 代替 HashMap ,为了对更深入的了解,本文将对JDK1.7和1.8的不同实现
进行分析。
JDK1.7
数据结构
jdk1.7中采用 Segment + HashEntry 的方式进行实现,结构如下:
ConcurrentHashMap 初始化时,计算出 Segment 数组的大小 ssize 和每个 Segment 中 HashEntry 数
组的大小 cap ,并初始化 Segment 数组的第一个元素;其中 ssize 大小为2的幂次方,默认为16, cap
大小也是2的幂次方,最小值为2,最终结果根据根据初始化容量 initialCapacity 进行计算,计算过
程如下:
new Thread4().start();
cdl4.await();
System.out.println("copyOnWriteArrayList get: " +
(System.currentTimeMillis() - start4));
}
}if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
其中 Segment 在实现上继承了 ReentrantLock ,这样就自带了锁的功能。
put实现
当执行 put 方法插入数据时,根据key的hash值,在 Segment 数组中找到相应的位置,如果相应位置的
Segment 还未初始化,则通过CAS进行赋值,接着执行 Segment 对象的 put 方法通过加锁机制插入数
据,实现如下:
场景:线程A和线程B同时执行相同 Segment 对象的 put 方法
1、线程A执行 tryLock() 方法成功获取锁,则把 HashEntry 对象插入到相应的位置;
2、线程B获取锁失败,则执行 scanAndLockForPut() 方法,在 scanAndLockForPut 方法中,会通过
重复执行 tryLock() 方法尝试获取锁,在多处理器环境下,重复次数为64,单处理器重复次数为1,当
执行 tryLock() 方法的次数超过上限时,则执行 lock() 方法挂起线程B;
3、当线程A执行完插入操作时,会通过 unlock() 方法释放锁,接着唤醒线程B继续执行;
size实现
因为 ConcurrentHashMap 是可以并发插入数据的,所以在准确计算元素时存在一定的难度,一般的思
路是统计每个 Segment 对象中的元素个数,然后进行累加,但是这种方式计算出来的结果并不一样的准
确的,因为在计算后面几个 Segment 的元素个数时,已经计算过的 Segment 同时可能有数据的插入或则
删除,在1.7的实现中,采用了如下方式:
try {
for (;;) {
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
if (sum == last)
break;
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {先采用不加锁的方式,连续计算元素的个数,最多计算3次:
1、如果前后两次计算结果相同,则说明计算出来的元素个数是准确的;
2、如果前后两次计算结果都不同,则给每个 Segment 进行加锁,再计算一次元素的个数;
JDK1.8
数据结构
1.8中放弃了 Segment 臃肿的设计,取而代之的是采用 Node + CAS + Synchronized 来保证并发安全进
行实现,结构如下:
只有在执行第一次 put 方法时才会调用 initTable() 初始化 Node 数组,实现如下:
put实现
当执行 put 方法插入数据时,根据key的hash值,在 Node 数组中找到相应的位置,实现如下:
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}1、如果相应位置的 Node 还未初始化,则通过CAS插入相应的数据;
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
2、如果相应位置的 Node 不为空,且当前该节点不处于移动状态,则对该节点加 synchronized 锁,如
果该节点的 hash 不小于0,则遍历链表更新节点或插入新节点;
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key, value, null);
break;
}
}
}
3、如果该节点是 TreeBin 类型的节点,说明是红黑树结构,则通过 putTreeVal 方法往红黑树中插入
节点;
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
4、如果 binCount 不为0,说明 put 操作对数据产生了影响,如果当前链表的个数达到8个,则通过
treeifyBin 方法转化为红黑树,如果 oldVal 不为空,说明是一次更新操作,没有对元素个数产生影
响,则直接返回旧值;if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
5、如果插入的是一个新节点,则执行 addCount() 方法尝试更新元素个数 baseCount ;
size实现
1.8中使用一个 volatile 类型的变量 baseCount 记录元素的个数,当插入新数据或则删除数据时,会通
过 addCount() 方法更新 baseCount ,实现如下:
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
1、初始化时 counterCells 为空,在并发量很高时,如果存在两个线程同时执行 CAS 修改 baseCount
值,则失败的线程会继续执行方法体中的逻辑,使用 CounterCell 记录元素个数的变化;
2、如果 CounterCell 数组 counterCells 为空,调用 fullAddCount() 方法进行初始化,并插入对应
的记录数,通过 CAS 设置cellsBusy字段,只有设置成功的线程才能初始化 CounterCell 数组,实现如
下:
else if (cellsBusy == 0 && counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean init = false;
try { // Initialize table
if (counterCells == as) {
CounterCell[] rs = new CounterCell[2];
rs[h & 1] = new CounterCell(x);
counterCells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;3、如果通过 CAS 设置cellsBusy字段失败的话,则继续尝试通过 CAS 修改 baseCount 字段,如果修改
baseCount 字段成功的话,就退出循环,否则继续循环插入 CounterCell 对象;
所以在1.8中的 size 实现比1.7简单多,因为元素个数保存 baseCount 中,部分元素的变化个数保存在
CounterCell 数组中,实现如下:
通过累加 baseCount 和 CounterCell 数组中的数量,即可得到元素的总个数;
深入分析ConcurrentHashMap1.8的扩容实现
ConcurrentHashMap相关的文章写了不少,有个遗留问题一直没有分析,也被好多人请教过,被搁置
在一旁,即如何在并发的情况下实现数组的扩容。
什么情况会触发扩容
当往hashMap中成功插入一个key/value节点时,有可能触发扩容动作:
1、如果新增节点之后,所在链表的元素个数达到了阈值 8,则会调用 treeifyBin 方法把链表转换成红
黑树,不过在结构转换之前,会对数组长度进行判断,实现如下:
}
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
break;
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}如果数组长度n小于阈值 MIN_TREEIFY_CAPACITY ,默认是64,则会调用 tryPresize 方法把数组长度
扩大到原来的两倍,并触发 transfer 方法,重新调整节点的位置。
2、新增节点之后,会调用 addCount 方法记录元素个数,并检查是否需要进行扩容,当数组元素个数达
到阈值时,会触发 transfer 方法,重新调整节点的位置。transfer实现
transfer 方法实现了在并发的情况下,高效的从原始组数往新数组中移动元素,假设扩容之前节点的
分布如下,这里区分蓝色节点和红色节点,是为了后续更好的分析:
在上图中,第14个槽位插入新节点之后,链表元素个数已经达到了8,且数组长度为16,优先通过扩容
来缓解链表过长的问题,实现如下:
1、根据当前数组长度n,新建一个两倍长度的数组 nextTable ;2、初始化 ForwardingNode 节点,其中保存了新数组 nextTable 的引用,在处理完每个槽位的节点之
后当做占位节点,表示该槽位已经处理过了;
3、通过 for 自循环处理每个槽位中的链表元素,默认 advace 为真,通过CAS设置 transferIndex 属
性值,并初始化 i 和 bound 值, i 指当前处理的槽位序号, bound 指需要处理的槽位边界,先处理槽位
15的节点;
4、在当前假设条件下,槽位15中没有节点,则通过CAS插入在第二步中初始化的 ForwardingNode 节
点,用于告诉其它线程该槽位已经处理过了;
5、如果槽位15已经被线程A处理了,那么线程B处理到这个节点时,取到该节点的hash值应该为
MOVED ,值为 -1 ,则直接跳过,继续处理下一个槽位14的节点;6、处理槽位14的节点,是一个链表结构,先定义两个变量节点 ln 和 hn ,按我的理解应该是 lowNode
和 highNode ,分别保存hash值的第X位为0和1的节点,具体实现如下:
使用 fn&n 可以快速把链表中的元素区分成两类,A类是hash值的第X位为0,B类是hash值的第X位为
1,并通过 lastRun 记录最后需要处理的节点,A类和B类节点可以分散到新数组的槽位14和30中,在原
数组的槽位14中,蓝色节点第X为0,红色节点第X为1,把链表拉平显示如下:
1、通过遍历链表,记录 runBit 和 lastRun ,分别为1和节点6,所以设置 hn 为节点6, ln 为null;
2、重新遍历链表,以 lastRun 节点为终止条件,根据第X位的值分别构造ln链表和hn链表:
ln链:和原来链表相比,顺序已经不一样了hn链:
通过CAS把ln链表设置到新数组的i位置,hn链表设置到i+n的位置;
7、如果该槽位是红黑树结构,则构造树节点 lo 和 hi ,遍历红黑树中的节点,同样根据 hash&n 算法,
把节点分为两类,分别插入到 lo 和 hi 为头的链表中,根据 lo 和 hi 链表中的元素个数分别生成 ln 和
hn 节点,其中 ln 节点的生成逻辑如下:
(
1)如果 lo 链表的元素个数小于等于 UNTREEIFY_THRESHOLD ,默认为6,则通过 untreeify 方法把
树节点链表转化成普通节点链表;
(
2)否则判断 hi 链表中的元素个数是否等于0:如果等于0,表示 lo 链表中包含了所有原始节点,则
设置原始红黑树给 ln ,否则根据 lo 链表重新构造红黑树。
最后,同样的通过CAS把 ln 设置到新数组的 i 位置, hn 设置到 i+n 位置。