Java程序员都曾被问到的一个问题是:
- 为什么HashMap是线程不安全的?
- 为什么ConcurrentHashMap是线程安全的?
为什么HashMap是线程不安全的?
Fail-Fast 机制
- 如果在使用迭代器的过程中有其他线程修改了map,那么将抛出ConcurrentModificationException,这就是所谓fail-fast策略。
- 看源码的时候我们会看到,在ArrayList,LinkedList,HashMap的内部,有一个int类型的modCount对象,对上述集合内容的修改都将增加这个值。modCount会在迭代器Iterator中使用,在迭代器的构造函数中,有这么一行代码,expectedModCount = modCount。在nextEntity和remove方法的调用过程中,如果modCount != expectedModCount,抛出ConcurrentModificationException异常。
final Entry<K,V> nextEntry() {
if (modCount != expectedModCount)
throw new ConcurrentModificationException();
}
public void remove() {
if (modCount != expectedModCount)
throw new ConcurrentModificationException();
}
死循环
- HashMap中有两个重要的属性,一个是容量,一个是加载因子,容量必须是2的n次方,这是为了保证key hash之后的值可以均匀的分散在数组里。当前结点个数等于容量*加载因子之后,需要进行扩容,扩展为原来的二倍,这个过程称为rehash。
- 因为hashmap是头插入法,新的结点插入在数组的头结点。当线程1执行rehash到newTable[i] = e时,线程1被挂起,此时线程2开始执行rehash并完成,这样会导致结点的循环引用问题。
void transfer(Entry[] newTable, boolean rehash) {
int newCapacity = newTable.length;
for (Entry<K,V> e : table) { // table是老数组
while(null != e) {
Entry<K,V> next = e.next;
if (rehash) {
e.hash = null == e.key ? 0 : hash(e.key);
}
int i = indexFor(e.hash, newCapacity);
e.next = newTable[i];
newTable[i] = e;
e = next;
}
}
}
为什么ConcurrentHashMap是线程安全的?
java 7中的结构
- ConcurrentHashMap的数据结构是由一个Segment数组和多个HashEntry组成,Segment数组的意义就是将一个大的table分割成多个小的table来进行加锁,而每一个Segment元素存储的是HashEntry数组+链表。
- 在进行数据的定位时,会首先找到 segment, 然后在 segment 中定位 bucket。
- 如果多线程操作同一个 segment, 就会触发 segment 的锁 ReentrantLock, 这就是分段锁的基本实现原理。
- Segment 继承 ReentrantLock 锁,用于存放数组 HashEntry[]。segment在初始化后不可以扩容,但是HashEntry可以扩容。
static final class Segment<K,V> extends ReentrantLock implements Serializable {
}
- 在插入数据的时候,首先需要找到segment,调用该segment的put方法,将新节点插入segment中。在segment中获取锁,如果获取失败,则走scanAndLockForPut方法,如果获取成功,返回,如果失败,则在循环中一直获取,直到成功。
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
// 根据key的hash再次进行hash运算
int hash = hash(key.hashCode());
// 基于hash定位segment数组的索引。
// hash值是int值,32bits。segmentShift=28,无符号右移28位,剩下高4位,其余补0。
// segmentMask=15,二进制低4位全部是1,所以j相当于hash右移后的低4位。
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
// 找到对应segment
s = ensureSegment(j);
// 调用该segment的put方法,将新节点插入segment中
return s.put(key, hash, value, false);
}
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
// 是否获取锁,失败自旋获取锁(直到成功)
// 拿到结点之后,对结点进行插入操作。
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
}
java 8中的结构
- Java8中不再使用segment,不再使用分段锁,而是使用大的数组,为了解决hash碰撞,在链表长度超过一定值(默认8)的情况下,将链表转为红黑树实现,寻址的复杂度从O(n)转换为Olog(n)。
- 并发控制使用Synchronized和CAS来操作。
- ConcurrentHashMap的构造函数是一个空函数,初始化是在put中实现的。
- Node中value和next都用volatile修饰,保证并发的可见性。value和next都用volatile修饰,保证并发的可见性。
悲观锁:可以完全保证数据的独占性和正确性,因为每次请求都会先对数据进行加锁, 然后进行数据操作,最后再解锁,而加锁释放锁的过程会造成消耗,所以性能不高;
乐观锁:假设没有冲突发生,如果检测到冲突,就失败重试,直到成功。数据库的乐观锁是通过版本控制来实现的。
- 在put的过程中,不断的尝试,因为在table的初始化和casTabAt用到了compareAndSwapInt、compareAndSwapObject,这是一种乐观锁的实现方式。
- 对要加入的特定数组的节点tab[i]加锁,遍历链表或红黑树,对数据插入
// 不参与序列化
transient volatile Node<K,V>[] table; // volatile保证线程间可见
// 记录容器的容量大小,通过CAS更新
private transient volatile long baseCount;
private transient volatile int sizeCtl;
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
// while(true)循环,不断的尝试,因为在table的初始化和casTabAt用到了compareAndSwapInt、compareAndSwapObject
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 如果数组(HashMap)还未初始化,进行初始化操作(CAS操作)
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// 计算新插入数据在数组(HashMap)中的位置,如果该位置上的节点为null,直接放入数据(CAS操作)
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// 如果该节点的hash值为MOVED,说明正在进行扩容操作或者已经扩容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
//
else {
V oldVal = null;
synchronized (f) { // 对特定数组节点tab[i]加锁
if (tabAt(tab, i) == f) { // 判断tab[i]是否有变化
if (fh >= 0) { // 插入操作
binCount = 1;
for (Node<K,V> e = f;; ++binCount) { // 遍历链表
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) { // 如果新插入值和tab[i]处的hash值和key值一样,进行替换
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) { // 如果此节点为尾部节点,把此节点的next引用指向新数据节点
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) { // 如果是一颗红黑树
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD) //如果数组节点的链表长度超过限定长度,转换为一颗红黑树
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}