并发编程之java锁

   日期:2020-09-11     浏览:96    评论:0    
核心提示:一,锁的分类1.线程是否要锁住同步资源锁住 悲观锁不锁住 乐观锁2.锁住同步资源失败 线程是否要阻塞阻塞不阻塞 自旋锁,适应性自旋锁3.多个线程竞争同步资源的流程细节有没有区别不锁住资源,多个线程只有一个能修改资源成功,其它线程会重试 无锁同一个线程执行同步资源时自动获取资源 偏向锁多个线程竞争同步资源时,没有获取资源的线程自旋等待锁释放 轻量级锁多个线程竞争同步资源时,没有获取资源的线程阻塞等待唤醒 重量级锁4.多个线程竞争锁时是否要排队排队

一,锁的分类

1.线程是否要锁住同步资源

锁住 悲观锁
不锁住 乐观锁

2.锁住同步资源失败 线程是否要阻塞

阻塞
不阻塞 自旋锁,适应性自旋锁

3.多个线程竞争同步资源的流程细节有没有区别

不锁住资源,多个线程只有一个能修改资源成功,其它线程会重试 无锁
同一个线程执行同步资源时自动获取资源 偏向锁
多个线程竞争同步资源时,没有获取资源的线程自旋等待锁释放 轻量级锁
多个线程竞争同步资源时,没有获取资源的线程阻塞等待唤醒 重量级锁

4.多个线程竞争锁时是否要排队

排队 公平锁
先尝试插队,插队失败在排队 非公平锁

5.一个线程的多个流程能不能获取同一把锁

能 可重入锁
不能 非可重入锁

6.多个线程能不能共享一把锁

能 共享
不能 排他锁

二,悲观锁与乐观锁

悲观锁与乐观锁时一种广义的概念,体现的是看待线程同步的不同角度。

1.悲观锁

悲观锁认为自己在使用数据的时候一定有别的线程来修改数据,在获取数据的时候会先加锁,确保数据不会被别的线程修改。
锁实现:synchronized 接口Lock的实现类
适用场景:写操作多,先加锁可以保证写操作时数据正确。

2.乐观锁

乐观锁认为自己在使用数据时不会有别的线程修改数据,所以不会添加锁,只是在更新数据的时候去判断之前有没有别的线程更新了这个数据。
锁实现:CAS算法,例如AtomicInteger类的原子自增时通过CAS自旋实现。
适用场景:读操作较多,不加锁的特点能够使其读操作的性能大幅度提升。
乐观锁的执行流程:
线程A获取到数据以后直接操作,操作完数据以后准备更新同步资源,更新之前会先判断内存中同步资源是否被更新:
1.如果没有被更新,更新内存中同步资源的值。
2.如果同步资源被其他线程更新,根据实现方法执行不同的操做(报错or重试)。

3.CAS算法

全名:Compare And Swap(比较并交换)
无锁算法:基于硬件原语实现,在不使用锁(没有线程被阻塞)的情况下实现多线程之间的变量同步。
jdk中的实现:java.util.concurrent包中的原子类就是通过CAS来实现了乐观锁。
算法涉及到的三个操作数:

需要读写的内存值V
进行比较的值A
要写入的新值的B

4.CAS存在的问题

1.ABA问题
线程1准备用CAS将变量的值由A替换为B,在此之前,线程2将变量的值由A替换为C,又由C替换为A,然后线程1执行CAS时发现变量的值仍然为A,所以CAS成功。但实际上这时的现场已经和最初不同了,尽管CAS成功,但可能存在潜藏的问题。
举例:一个小偷,把别人家的钱偷了之后又还了回来,还是原来的钱吗,你老婆出轨之后又回来,还是原来的老婆吗?ABA问题也一样,如果不好好解决就会带来大量的问题。最常见的就是资金问题,也就是别人如果挪用了你的钱,在你发现之前又还了回来。但是别人却已经触犯了法律。
但是jdk已经解决了这个问题。
想追下源码来着,但是一追发现直接到c了。

2.循环时间长开销大
3.只能保证一个共享变量的原子操作

5.Unsafe

AtomicInteger

public final int incrementAndGet() {
        for (;;) {
            int current = get();
            int next = current + 1;
            if (compareAndSet(current, next))
                return next;
        }
    }

public final boolean compareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }

从这里可见原子类的方法调用了unsafe类的方法
unsafe

public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

再往底层的话,实际加上就到了C。
最终的实现就是 cmpxchg = cas修改变量值

lock cmpxchg 指令

硬件:
lock指令在执行后面指令的时候锁定一个北桥信号(不采用锁总线的方式)

三,自旋锁

是指当一个线程在获取锁的时候,如果锁已经被其他线程获取,那么该线程将循环等待,然后不断的判断锁是否能够被成功获取,自旋直到获取到锁才会退出循环。

自旋存在的意义与使用场景

阻塞与唤醒线程需要操作系统切换CPU状态,需要消耗一定时间。
同步代码块逻辑简单,执行时间很短。
自适应自旋锁假定不同线程持有同一个锁对象的时间基本相当,竞争程度趋于稳定,因此,可以根据上一次自旋的时间与结果调整下一次自旋的时间。
JDK>=1.7自旋锁的参数被取消,虚拟机不再支持由用户配置自旋锁,自旋锁总是会执行,自旋锁次数也是由虚拟机自动调整。

四,锁升级

1.锁升级流程

jdk6的时候对锁进行了很多优化,其中就有了锁的升级过程。

1.偏向锁:只有一个线程进入临界区,适用于只有一个线程访问同步块的场景
2.轻量级锁:多线程为竞争或者竞争不激烈,适用于追求响应时间,同步块执行速度非常快。
3.重量级锁:多线程竞争,适用于追求吞吐量,同步块执行速度较长。

2.JVM对象加锁的原理

对象的内存结构?
对象头:比如hash码,对象所属的年代,对象锁,锁状态标志,偏向锁(线程)ID,偏向时间,数组长度(数组对象)等。
实例数据:创建对象时,对象中的成员变量,方法等。
对齐填充:就为了凑够8的倍数。


利用一个插件,对比对象上锁前后的差异:

    <dependencies>
        <dependency>
            <groupId>org.openjdk.jol</groupId>
            <artifactId>jol-core</artifactId>
            <version>0.9</version>
        </dependency>
    </dependencies>
    public static void main(String[] args) {
        Object o = new Object();
        System.out.println(ClassLayout.parseInstance(o).toPrintable());
        System.out.println("===============================================");
        synchronized (o){
            System.out.println(ClassLayout.parseInstance(o).toPrintable());
        }
    }

3.实例对象是怎样存储的?

对象实例存储在堆空间,对象的元数据存在元空间,对象引用存在栈空间。

4.锁消除


public class Demo2 {
    public static void main(String[] args) {
        StringBuffer buffer = new StringBuffer();
        buffer.append("a").append("b");
        System.out.println(ClassLayout.parseInstance(buffer).toPrintable());
    }
}


我们都知道StringBuffer是线程安全的,因为他的关键方法都加了synchronized,但是,从打印结果可以看出,锁被消除了。因为buffer这个引用只会在main方法中使用,不可能被其他线程引用(因为是局部变量,栈私有),所以buffer是不可能共享的资源,JVM会自动消除StringBuffer对象内部的锁。

5.锁粗化


public class Demo3 {
    public static void main(String[] args) {
        int i=0;
        StringBuffer buffer = new StringBuffer();
        while (i<100){
            buffer.append(i);
            i++;
        }
        System.out.println(buffer.toString());
        System.out.println(ClassLayout.parseInstance(buffer).toPrintable());
    }
}

JVM会检测到这样一连串的操作都对同一个对象加锁(while 循环内 100 次执行 append,没有锁粗化的就要进行 100 次加锁/解锁),此时 JVM 就会将加锁的范围粗化到这一连串的操作的外部(比如 while 虚幻体外),使得这一连串操作只需要加一次锁即可。

6.synchronized和volatile

关于synchronized和volatile的具体讲解请看我的另一篇博客

五,AQS

研究了AQS一天,终于找到了他的入口,接下来看我的想法:

1.多线程操做共享数据问题


public class Demo1 {
    public static int m=0;

    public static void main(String[] args)throws Exception {
        Thread []threads=new Thread[100];
        for (int i = 0; i < threads.length; i++) {
            threads[i]=new Thread(()->{
                for (int j = 0; j < 100; j++) {
                    m++;
                }
            });
        }
        for (Thread t :threads) t.start();
        for (Thread t :threads) t.join();//线程顺序结束
        System.out.println(m);
    }
}

毫无疑问,这段代码是存在线程安全问题的,只要了解一点并发编程,都是可以看出来的。那么我们可以怎么来解决呢?

使用synchronized来解决


public class Demo2 {

    public static int m=0;

    public static void main(String[] args)throws Exception {
        Thread []threads=new Thread[100];
        for (int i = 0; i < threads.length; i++) {
            threads[i]=new Thread(()->{
                synchronized (Demo2.class) {
                    for (int j = 0; j < 100; j++) {
                        m++;
                    }
                }
            });
        }
        for (Thread t :threads) t.start();
        for (Thread t :threads) t.join();//线程顺序结束
        System.out.println(m);
    }

这样解决了线程安全的问题,但是同时也存在一个问题,synchronized是操作系统层面的方法,也就是需要jvm和操做系统之间进行一个切换(用户态和内核态的切换),这样实际上是影响效率的。另一种解决办法:

使用ReentrantLock来解决


public class Demo3 {

    public static int m=0;
    public static Lock lock=new ReentrantLock();
    public static void main(String[] args)throws Exception {
        Thread []threads=new Thread[100];
        for (int i = 0; i < threads.length; i++) {
            threads[i]=new Thread(()->{

                try {
                    lock.lock();
                    for (int j = 0; j < 100; j++) {
                            m++;
                    }
                } finally {
                    lock.unlock();
                }
            });
        }
        for (Thread t :threads) t.start();
        for (Thread t :threads) t.join();//线程顺序结束
        System.out.println(m);
    }
}

那么这种方式的底层是怎么做的呢?接下来追源码。

    public ReentrantLock() {
        sync = new NonfairSync();
    }

这个sync又是啥呢?接着追

    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

它实际上是ReentrantLock的一个内部类继承了Sync,而他里面的方法实际上也是调用了Sync的方法。这样目标就明确了,我们可以看一下Sync
这个Sync的源码:

abstract static class Sync extends AbstractQueuedSynchronizer

由此可知,实际上ReentrantLock是利用AbstractQueuedSynchronizer也就是AQS来实现的。

2.AbstractQueuedSynchronizer概述

此处参考的文章链接
这个类的内部有一个内部类Node

static final class Node {
	static final Node SHARED = new Node();
	volatile Node prev;//前驱指针
	volatile Node next;//后继指针
	volatile Thread thread;//当前线程
	private transient volatile Node head;//头节点
	private transient volatile Node tail;//尾节点
	private volatile int state;//锁状态,加锁成功则为1,重入+1 解锁则为0
	.....
}

看到这里就想到了LinkedHashMap,实际上也就是类似于维持了一个双向的链表,每个节点都是一个线程。

它维护了一个volatile int state(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)。这里volatile是核心关键词,具体volatile的语义,在此不述。state的访问方式有三种:

    getState()
    setState()
    compareAndSetState()

AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。
  不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:

`isHeldExclusively()`:该线程是否正在独占资源。只有用到condition才需要去实现它。
`tryAcquire(int)`:独占方式。尝试获取资源,成功则返回true,失败则返回false。
`tryRelease(int)`:独占方式。尝试释放资源,成功则返回true,失败则返回false。
`tryAcquireShared(int)`:共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
`tryReleaseShared(int)`:共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。

ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。

再以CountDownLatch以例,任务分为N个子线程去执行,state也初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后countDown()一次,state会CAS减1。等到所有子线程都执行完后(即state=0),会unpark()主调用线程,然后主调用线程就会从await()函数返回,继续后余动作。

一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryReleasetryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock

3.源码解读与执行流程分析

3.1,独占锁

acquire(int)==
此方法是独占模式下线程获取共享资源的顶层入口。如果获取到资源,线程直接返回,否则进入等待队列,直到获取到资源为止,且整个过程忽略中断的影响。这也正是lock()的语义,当然不仅仅只限于lock()。获取到资源后,线程就可以去执行其临界区代码了。

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

函数流程如下:

tryAcquire()尝试直接去获取资源,如果成功则直接返回;
addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;
acquireQueued()使线程在等待队列中获取资源,一直获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。
tryAcquire(int)
此方法尝试去获取独占资源。如果获取成功,则直接返回true,否则直接返回false。这也正是tryLock()的语义,还是那句话,当然不仅仅只限于tryLock()。

    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

一开始还傻乎乎的想为啥直接抛异常了,后来才反应过来,这不是给自定义的方法么?AQS这里只定义了一个接口,具体资源的获取交由自定义同步器去实现了。
addWaiter(Node)
此方法用于将当前线程加入到等待队列的队尾,并返回当前线程所在的结点。

    private Node addWaiter(Node mode) {
     //以给定模式构造结点。mode有两种:EXCLUSIVE(独占)和SHARED(共享)
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

Node结点是对每一个访问同步代码的线程的封装,其包含了需要同步的线程本身以及线程的状态,如是否被阻塞,是否等待唤醒,是否已经被取消等。变量waitStatus则表示当前被封装成Node结点的等待状态,共有4种取值CANCELLED、SIGNAL、CONDITION、PROPAGATE。

CANCELLED:值为1,在同步队列中等待的线程等待超时或被中断,需要从同步队列中取消该Node的结点,其结点的waitStatus为CANCELLED,即结束状态,进入该状态后的结点将不会再变化。

SIGNAL:值为-1,被标识为该等待唤醒状态的后继结点,当其前继结点的线程释放了同步锁或被取消,将会通知该后继结点的线程执行。说白了,就是处于唤醒状态,只要前继结点释放锁,就会通知标识为SIGNAL状态的后继结点的线程执行。

CONDITION:值为-2,与Condition相关,该标识的结点处于等待队列中,结点的线程等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。

PROPAGATE:值为-3,与共享模式相关,在共享模式中,该状态标识结点的线程处于可运行状态。

0状态:值为0,代表初始化状态。

AQS在判断状态时,通过用waitStatus>0表示取消状态,而waitStatus<0表示有效状态。

== enq(Node)==
此方法用于将node加入队尾。

    private Node enq(final Node node) {
        for (;;) {//自旋
            Node t = tail;
            if (t == null) { // Must initialize
            // 队列为空,创建一个空的标志结点作为head结点,并将tail也指向它
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {//正常放入队列尾部
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

cas自旋volatile变量
acquireQueued(Node, int)
通过tryAcquire()和addWaiter(),该线程获取资源失败,已经被放入等待队列尾部了。聪明的你立刻应该能想到该线程下一步该干什么了吧:进入等待状态休息,直到其他线程彻底释放资源后唤醒自己,自己再拿到资源,然后就可以去干自己想干的事了。没错,就是这样!是不是跟医院排队拿号有点相似~~acquireQueued()就是干这件事:在等待队列中排队拿号(中间没其它事干可以休息),直到拿到号后再返回。这个函数非常关键,还是上源码吧:

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;//标记是否已经拿到锁
        try {
            boolean interrupted = false;//标记等待过程中是否被中断过
            for (;;) {
                final Node p = node.predecessor();//拿到前驱节点
                //如果前驱是head,即该结点已成老二,那么便有资格去尝试获取资源(可能是老大释放完资源唤醒自己的,当然也可能被interrupt了)。
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    //拿到资源后,将head指向该结点。所以head所指的标杆结点,就是当前获取到资源的那个结点或null。
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //如果自己可以休息了,就进入waiting状态,直到被unpark()
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;//如果等待过程中被中断过,哪怕只有那么一次,就将interrupted标记为true
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

shouldParkAfterFailedAcquire(Node, Node)
此方法主要用于检查状态,看看自己是否真的可以去休息了。

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;//拿到前驱的状态
        if (ws == Node.SIGNAL)
            //如果已经告诉前驱拿完号后通知自己一下,那就可以安心休息了
            return true;
        if (ws > 0) {
           
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            //如果前驱正常,那就把前驱的状态设置成SIGNAL,告诉它拿完号后通知自己一下。有可能失败,人家说不定刚刚释放完呢!
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

整个流程中,如果前驱结点的状态不是SIGNAL,那么自己就不能安心去休息,需要去找个安心的休息点,同时可以再尝试下看有没有机会轮到自己拿号。
parkAndCheckInterrupt()
如果线程找好安全休息点后,那就可以安心去休息了。此方法就是让线程去休息,真正进入等待状态。

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

Thread.interrupted()会清除当前线程的中断标记位。
整个获取锁的流程:
1.如果尝试获取锁成功,直接返回。
2.没成功,先加入到等待队列尾部,标记为独占模式。
3.尝试这获取一次锁后,如果还是获取不到就去休息,有机会时(轮到自己,会被unpark())会去尝试获取资源。获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
4.如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。
这也就是ReentrantLock.lock()的流程

release(int)
此方法是独占模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
                //唤醒等待队列里的下一个线程
            return true;
        }
        return false;
    }

它是根据tryRelease()的返回值来判断该线程是否已经完成释放掉资源了!所以自定义同步器在设计tryRelease()的时候要明确这一点!!
tryRelease(int)
此方法尝试去释放指定量的资源。

    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

还是需要AQS的实现类自己去写。
unparkSuccessor(Node)
此方法用于唤醒等待队列中下一个线程。

    private void unparkSuccessor(Node node) {
        //这里,node一般为当前线程所在的结点。
        int ws = node.waitStatus;
        if (ws < 0)
        	//置零当前线程所在的结点状态,允许失败。
            compareAndSetWaitStatus(node, ws, 0);

        //找到下一个需要唤醒的结点s
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {//如果为空或已取消
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)//从这里可以看出,<=0的结点,都是还有效的结点。
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);//唤醒
    }

用unpark()唤醒等待队列中最前边的那个未放弃线程。
释放锁的流程
1.释放指定锁的资源并返回结果。
2.如果成功释放,就唤醒等待队列中最前边的那个未放弃线程。
3.如果没成功,返回false。

3.2,共享锁

    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

此方法用于将当前线程加入等待队列尾部休息,直到其他线程释放资源唤醒自己,自己成功拿到相应量的资源后才返回。

    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {//成功
                        setHeadAndPropagate(node, r);//将head指向自己,还有剩余资源可以再唤醒之后的线程
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

跟独占模式比,这里只有线程是head.next时(“老二”),才会去尝试获取资源,有剩余的话还会唤醒之后的队友。那么问题就来了,假如老大用完后释放了5个资源,而老二需要6个,老三需要1个,老四需要2个。老大先唤醒老二,老二一看资源不够,他是把资源让给老三呢,还是不让?答案是否定的!老二会继续park()等待其他线程释放资源,也更不会去唤醒老三和老四了。独占模式,同一时刻只有一个线程去执行,这样做未尝不可;但共享模式下,多个线程是可以同时执行的,现在因为老二的资源需求量大,而把后面量小的老三和老四也都卡住了。当然,这并不是问题,只是AQS保证严格按照入队顺序唤醒罢了(保证公平,但降低了并发)。
== setHeadAndPropagate(Node, int)==

    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

== doReleaseShared()==

    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

4.自定义锁

不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:

 	isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
    tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回falsetryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回falsetryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
    tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false

自定义一个简单的锁


public class MLock implements Lock {
    private AbstractQueuedSynchronizer sync=new Sync();
    @Override
    public void lock() {
        sync.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {

    }

    @Override
    public boolean tryLock() {
        return false;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }

    @Override
    public void unlock() {
        sync.release(1);
    }

    @Override
    public Condition newCondition() {
        return null;
    }

    //自定义一个独占锁
    private class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            assert arg == 1;
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }
    }

}

Demo测试:


public class Demo6 {
    public static int m = 0;
    public static Lock lock = new MLock();

    public static void main(String[] args) throws Exception {
        Thread[] threads = new Thread[100];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {

                try {
                    lock.lock();
                    for (int j = 0; j < 100; j++) {
                        m++;
                    }
                } finally {
                    lock.unlock();
                }
            });
        }
        for (Thread t : threads) t.start();
        for (Thread t : threads) t.join();//线程顺序结束
        System.out.println(m);
    }

}
 
打赏
 本文转载自:网络 
所有权利归属于原作者,如文章来源标示错误或侵犯了您的权利请联系微信13520258486
更多>最近资讯中心
更多>最新资讯中心
0相关评论

推荐图文
推荐资讯中心
点击排行
最新信息
新手指南
采购商服务
供应商服务
交易安全
关注我们
手机网站:
新浪微博:
微信关注:

13520258486

周一至周五 9:00-18:00
(其他时间联系在线客服)

24小时在线客服