HashMap的线程不安全是在说什么

   日期:2020-09-26     浏览:83    评论:0    
核心提示:Java程序员都曾被问到的一个问题是:为什么HashMap是线程不安全的?为什么ConcurrentHashMap是线程安全的?为什么HashMap是线程不安全的?Fail-Fast 机制如果在使用迭代器的过程中有其他线程修改了map,那么将抛出ConcurrentModificationException,这就是所谓fail-fast策略。看源码的时候我们会看到,在ArrayList,LinkedList,HashMap的内部,有一个int类型的modCount对象,对上述集合内容的修改

Java程序员都曾被问到的一个问题是:

  • 为什么HashMap是线程不安全的?
  • 为什么ConcurrentHashMap是线程安全的?

为什么HashMap是线程不安全的?

Fail-Fast 机制

  • 如果在使用迭代器的过程中有其他线程修改了map,那么将抛出ConcurrentModificationException,这就是所谓fail-fast策略。
  • 看源码的时候我们会看到,在ArrayList,LinkedList,HashMap的内部,有一个int类型的modCount对象,对上述集合内容的修改都将增加这个值。modCount会在迭代器Iterator中使用,在迭代器的构造函数中,有这么一行代码,expectedModCount = modCount。在nextEntity和remove方法的调用过程中,如果modCount != expectedModCount,抛出ConcurrentModificationException异常。
final Entry<K,V> nextEntry() { 
    if (modCount != expectedModCount)
        throw new ConcurrentModificationException();
}
public void remove() { 
    if (modCount != expectedModCount)
        throw new ConcurrentModificationException();
}

死循环

  • HashMap中有两个重要的属性,一个是容量,一个是加载因子,容量必须是2的n次方,这是为了保证key hash之后的值可以均匀的分散在数组里。当前结点个数等于容量*加载因子之后,需要进行扩容,扩展为原来的二倍,这个过程称为rehash。
  • 因为hashmap是头插入法,新的结点插入在数组的头结点。当线程1执行rehash到newTable[i] = e时,线程1被挂起,此时线程2开始执行rehash并完成,这样会导致结点的循环引用问题。
void transfer(Entry[] newTable, boolean rehash) { 
   int newCapacity = newTable.length;
    for (Entry<K,V> e : table) { // table是老数组
        while(null != e) { 
            Entry<K,V> next = e.next;
            if (rehash) { 
                e.hash = null == e.key ? 0 : hash(e.key);
            }
            int i = indexFor(e.hash, newCapacity);
            e.next = newTable[i];
            newTable[i] = e;
            e = next;
        }
    }
}

为什么ConcurrentHashMap是线程安全的?

java 7中的结构

  • ConcurrentHashMap的数据结构是由一个Segment数组和多个HashEntry组成,Segment数组的意义就是将一个大的table分割成多个小的table来进行加锁,而每一个Segment元素存储的是HashEntry数组+链表。
  • 在进行数据的定位时,会首先找到 segment, 然后在 segment 中定位 bucket。
  • 如果多线程操作同一个 segment, 就会触发 segment 的锁 ReentrantLock, 这就是分段锁的基本实现原理。
  • Segment 继承 ReentrantLock 锁,用于存放数组 HashEntry[]。segment在初始化后不可以扩容,但是HashEntry可以扩容。
static final class Segment<K,V> extends ReentrantLock implements Serializable { 
}
  • 在插入数据的时候,首先需要找到segment,调用该segment的put方法,将新节点插入segment中。在segment中获取锁,如果获取失败,则走scanAndLockForPut方法,如果获取成功,返回,如果失败,则在循环中一直获取,直到成功。
public V put(K key, V value) { 
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException();
    // 根据key的hash再次进行hash运算
    int hash = hash(key.hashCode());
    // 基于hash定位segment数组的索引。
    // hash值是int值,32bits。segmentShift=28,无符号右移28位,剩下高4位,其余补0。
    // segmentMask=15,二进制低4位全部是1,所以j相当于hash右移后的低4位。
    int j = (hash >>> segmentShift) & segmentMask;
    if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
         (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
    // 找到对应segment
        s = ensureSegment(j);
    // 调用该segment的put方法,将新节点插入segment中
    return s.put(key, hash, value, false);
}
final V put(K key, int hash, V value, boolean onlyIfAbsent) { 
    // 是否获取锁,失败自旋获取锁(直到成功)
    // 拿到结点之后,对结点进行插入操作。
    HashEntry<K,V> node = tryLock() ? null :
        scanAndLockForPut(key, hash, value); 
}

java 8中的结构

  • Java8中不再使用segment,不再使用分段锁,而是使用大的数组,为了解决hash碰撞,在链表长度超过一定值(默认8)的情况下,将链表转为红黑树实现,寻址的复杂度从O(n)转换为Olog(n)。
  • 并发控制使用Synchronized和CAS来操作。
  • ConcurrentHashMap的构造函数是一个空函数,初始化是在put中实现的。
  • Node中value和next都用volatile修饰,保证并发的可见性。value和next都用volatile修饰,保证并发的可见性。

悲观锁:可以完全保证数据的独占性和正确性,因为每次请求都会先对数据进行加锁, 然后进行数据操作,最后再解锁,而加锁释放锁的过程会造成消耗,所以性能不高;
乐观锁:假设没有冲突发生,如果检测到冲突,就失败重试,直到成功。数据库的乐观锁是通过版本控制来实现的。

  • 在put的过程中,不断的尝试,因为在table的初始化和casTabAt用到了compareAndSwapInt、compareAndSwapObject,这是一种乐观锁的实现方式。
  • 对要加入的特定数组的节点tab[i]加锁,遍历链表或红黑树,对数据插入
// 不参与序列化
transient volatile Node<K,V>[] table; // volatile保证线程间可见
 // 记录容器的容量大小,通过CAS更新
private transient volatile long baseCount;


private transient volatile int sizeCtl;

final V putVal(K key, V value, boolean onlyIfAbsent) { 
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
     // while(true)循环,不断的尝试,因为在table的初始化和casTabAt用到了compareAndSwapInt、compareAndSwapObject
        for (Node<K,V>[] tab = table;;) { 
            Node<K,V> f; int n, i, fh;
       // 如果数组(HashMap)还未初始化,进行初始化操作(CAS操作)
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
       // 计算新插入数据在数组(HashMap)中的位置,如果该位置上的节点为null,直接放入数据(CAS操作)
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { 
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
       // 如果该节点的hash值为MOVED,说明正在进行扩容操作或者已经扩容
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
       // 
            else { 
                V oldVal = null;
                synchronized (f) {   // 对特定数组节点tab[i]加锁
                    if (tabAt(tab, i) == f) {  // 判断tab[i]是否有变化
                        if (fh >= 0) {  // 插入操作
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {  // 遍历链表
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {  // 如果新插入值和tab[i]处的hash值和key值一样,进行替换
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {  // 如果此节点为尾部节点,把此节点的next引用指向新数据节点
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {  // 如果是一颗红黑树
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) { 
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) { 
                    if (binCount >= TREEIFY_THRESHOLD) //如果数组节点的链表长度超过限定长度,转换为一颗红黑树
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount); 
        return null;
    }
 
打赏
 本文转载自:网络 
所有权利归属于原作者,如文章来源标示错误或侵犯了您的权利请联系微信13520258486
更多>最近资讯中心
更多>最新资讯中心
0相关评论

推荐图文
推荐资讯中心
点击排行
最新信息
新手指南
采购商服务
供应商服务
交易安全
关注我们
手机网站:
新浪微博:
微信关注:

13520258486

周一至周五 9:00-18:00
(其他时间联系在线客服)

24小时在线客服