接上一篇HashMap的分析。并发场景使用ConcurrentHashMap,谢谢合作。
使用背景
换句话说为什么要使用ConcurrentHashMap,源码的注释开头就描述了,ConcurrentHashMap是一个完全支持并发的散列表。所有的写操作都是有锁,但是读取则是无锁读。也就是强写弱读。
初始化分析
对于初始化来说,主要是由于我们使用无参数构造方法来构造进行数据插入的时候需要进行延迟初始化操作。
Map<String,String> map=new ConcurrentHashMap<>();map.put(key,value);
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
- 执行逻辑
初始化的代码如上所示,当在并发场景中,A线程拿到CPU时间片执行初始化,
sizeCtl
字段被volatile修饰,所以直接读取主存中的结果值初始化为0,所以不满足执行Thread.yield();
紧接着进行一个CAS操作,把sizeCtl
的值设置为-1写入主存,之后如果有B线程也拿到时间片同时进行初始化操作,发现sizeCtl
的值已经满足(sc = sizeCtl) < 0
,所以就需要去执行Thread.yield();
让当前线程的让出CPU执行的时间片。等待再次得到CPU时间片继续执行。初始化完毕之后sizeCtl
的值被设定为散列数组下次需要扩容的容量。
- 针对延迟初始化的场景的优化点。
延迟初始化的使用场景是在当你只是创建并未使用的时候不会初始化Map内部的散列数组。在并发写入较多的场景这一点则需要避免,因为如果有多个线程进行初始化操作,则会势必会有部分线程会空跑,低效。所以在这个场景则需要根据使用的频率,预先指定好Map的容量。
读取数据
读取ConcurrentHashMap中的数据,和HashMap的读取没有太大的区别。只不过在HashMap中,只有俩中节点 要么是树(TreeNode)要么是链表(Node)。在ConcurrentHashMap中则增多增加了二种节点,所有当判断节点的Hash值为负数是支持find方法。针对不同的Node执行其子类的find方法。如果是普通链表则进行单列表遍历即可。
- forwarding Node
用于标记在扩容时期插入的节点
- reservations Node
用于使用 computeIfAbsent 和 compute时的占位节点
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
- computeIfAbsent
computeIfAbsent
方法主要是咋插入某个Key的时候对value进行一些计算。可以看到入参第二个是一个函数式对象,看代码和put方法并没有什么区别,区别点在于散列函数定位到数组下标没有值的时候插入的是ReservationNode
,如果定位到的数组下标有值发生散列冲突,则会在进行遍历操作。最终在插入之前执行一下函数式。后置处理Value。
public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {
if (key == null || mappingFunction == null)
throw new NullPointerException();
int h = spread(key.hashCode());
V val = null;
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & h)) == null) {
Node<K,V> r = new ReservationNode<K,V>();
synchronized (r) {
if (casTabAt(tab, i, null, r)) {
binCount = 1;
Node<K,V> node = null;
try {
if ((val = mappingFunction.apply(key)) != null)
node = new Node<K,V>(h, key, val, null);
} finally {
setTabAt(tab, i, node);
}
}
}
if (binCount != 0)
break;
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
boolean added = false;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek; V ev;
if (e.hash == h &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
val = e.val;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
if ((val = mappingFunction.apply(key)) != null) {
added = true;
pred.next = new Node<K,V>(h, key, val, null);
}
break;
}
}
}
else if (f instanceof TreeBin) {
binCount = 2;
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> r, p;
if ((r = t.root) != null &&
(p = r.findTreeNode(h, key, null)) != null)
val = p.val;
else if ((val = mappingFunction.apply(key)) != null) {
added = true;
t.putTreeVal(h, key, val);
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (!added)
return val;
break;
}
}
}
if (val != null)
addCount(1L, binCount);
return val;
}
插入数据
数据写入就需要加锁了 观察上述的computeIfAbsent
方法,其实和Put方法就几乎一摸一样了。所以顺着代码我们来分析
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
- 第一步进来自旋遍历散列数组判断散列数组没有进行初始化则进行初始化
- 进过初始化之后,用准备插入的Key的散列值,定位散列数组的下标,匹配下标位置不存在数据之后继续CAS插入,插入完毕返回推出自旋
2.1 执行casTabAt
方法使用Unsafe(U).compareAndSwapObject
往volatile修饰的散列数组中插入数据。 - 判断当前节点是否正在扩容。则当前线程会来帮助扩容执行
helpTransfer
方法 - 当前节点发生散列冲突,需要加锁锁住当前下标插入完毕之后释放锁
4.1 判断锁住节点的散列值小于零 则需要去判断是否是树节点,或者是占位节点。
4.2 判断是树节点,执行红黑树的插入(p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value))
;如果满足只插入一次,那么直接返回旧Value,不满足条件则返回新Value。
4.3 判断是占位节点,则会抛错。
4.4 如果是链表,则进行遍历。从头结点开始,同时进行节点计数,直到尾部进行插入5 - 当binCount的大于等于树化的阈值,则会调用
treeifyBin(tab, i);
进行扩容或者节点类型转换,(同HashMap的treeifyBin没有太大的区别) - 判断已经插入完毕,执行
addCount(1L, binCount);
增加Map的容量.滞后计算,如果散列数组很小或者未进行扩容,则会立即返回,反之则会等待扩容完毕,再把占用情况处理完,最后更新计数,也就是说读取ConcurrentMap的瞬时容量是不准确的。
链表转树
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
tryPresize(n << 1);
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
synchronized (b) {
if (tabAt(tab, index) == b) {
TreeNode<K,V> hd = null, tl = null;
for (Node<K,V> e = b; e != null; e = e.next) {
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}
- 当散列数组的容量小于树化的阈值(64)则会进行一次扩容操作。调用
tryPresize(n << 1)
扩容为当前容量的二倍。 - 如果超过树化的阈值,而且节点类型是单链表。则转换节点类型为
TreeNode
- 把转换后的TreeNode节点创建辅助类
TreeBin
,构造红黑树,然后进行CAS操作把红黑树更新到当前index位置。
删除数据
final V replaceNode(Object key, V value, Object cv) {
int hash = spread(key.hashCode());
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0 ||
(f = tabAt(tab, i = (n - 1) & hash)) == null)
break;
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
boolean validated = false;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
validated = true;
for (Node<K,V> e = f, pred = null;;) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
V ev = e.val;
if (cv == null || cv == ev ||
(ev != null && cv.equals(ev))) {
oldVal = ev;
if (value != null)
e.val = value;
else if (pred != null)
pred.next = e.next;
else
setTabAt(tab, i, e.next);
}
break;
}
pred = e;
if ((e = e.next) == null)
break;
}
}
else if (f instanceof TreeBin) {
validated = true;
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> r, p;
if ((r = t.root) != null &&
(p = r.findTreeNode(hash, key, null)) != null) {
V pv = p.val;
if (cv == null || cv == pv ||
(pv != null && cv.equals(pv))) {
oldVal = pv;
if (value != null)
p.val = value;
else if (t.removeTreeNode(p))
setTabAt(tab, i, untreeify(t.first));
}
}
}
}
}
if (validated) {
if (oldVal != null) {
if (value == null)
addCount(-1L, -1);
return oldVal;
}
break;
}
}
}
return null;
}
看完了插入看删除,学会了Map拿Offer,可以看到方法入口还是一如既往的自旋。
- 当前散列数组无数据,跳出循环,返会null
- 当前数组正在扩容,当前执行的线程会调用helpTransfer来帮助扩容。
- 加锁hash定位的数组元素遍历单链表或者红黑树。
3.1.1 遍历单链表找到要删除的节点,找到之后设置要删除的节点的value为null
3.1.2 遍历红黑树找到要删除的节点,找到之后设置要删除的节点的value为null - 调用addCount方法更改size
容量统计
由于addCount方法执行是滞后执行的,所以对应的容量统计也是滞后的。
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
通过遍历每个辅助计数器数组得到总的容量。我理解baseCount就是被volatile修饰的,为何不直接读取baseCount就完事了,看baseCount的javaDoc解释Base counter value, used mainly when there is no contention, but also as a fallback during table initialization races. Updated via CAS.,观察addCount
之后发现当CAS正价数量失败时,会调用fullAddCount
的方法来填充counterCells
,每个CounterCell
的值都是1。所以这就是在计算size的时候就需要遍历所有的辅助计数器数组。
扩容分析
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
扩容操作主要氛围七大块。第一部分负责创建一个原有容量*2的新的散列数组,第二部分创建一个ForwardingNode
节点。第三部分开启自旋负责初始化并初始化i
和bound
值,i
指当前处理的槽位序号,bound
指需要处理的槽位边界.先从处理下Old.length()-1
处开始处理。第四部分当前节点没处理则设置当前节点为ForwardingNode
节点类型。表示已经处理,第五部分当前节点不为空并且节点hash值为MOVE类型,代表已处理,跳过进入下一次循环。第六部分加锁当前节点处理链表。第七部分判断当前节点类型是TreeNode
处理红黑树。
在处理链表时候,使用&
把链表分为高低位,同1.8的HashMap扩容一样好,低位的不需要处理还在原来的位置,高位的则放在i+Old.length()
。
当其他线程执行插入或者删除操作时:判断当前节点是否正在扩容。则当前线程会来帮助扩容执行helpTransfer
方法
*/
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
在进行调用transfer
方法执行时。会先cas更新一下扩容的阈值。
1.8和1.7区别分析
1.7中的实现方式采用Segment
+ HashEntry
的方式进行实现
- 其中Segment在实现上继承了ReentrantLock,这样就自带了锁的功能
- 在put方法操作是先获取当前Segment的锁,然后在cas设置到Segment数组中。
- 在size方法操作是便利3次Segment数组,得到的,先采用不加锁的方式,连续计算元素的个数,最多计算3次:
1、如果前后两次计算结果相同,则说明计算出来的元素个数是准确的;
2、如果前后两次计算结果都不同,则给每个Segment进行加锁,再计算一次元素的个数;
1.8中实现方式采用Synchronized
+CAS
+NODE数组
+红黑树
来实现. - 使用baseCount和CounterCell,使用CounterCell来记录cas设置失败的记录,最后求和就是当前的size。
总结
并发场景使用ConcurrentHashMap,并且ConcurrentHashMap在设计之初就是为了强写弱读的存在,所以在使用中要关注场景。