AQS同步器原理

总所周知,java是支持多线程的.
在多线程情况下,可能会出现多个线程同时访问同一个共享,可变资源的情况;这种资源可能是:对象,变量,文件等.
共享:资源可以由多个线程同时访问
可变:资源可以在其生命周期内被修改

为什么需要同步器

先来个栗子

    public void decStock(){
        Integer stock;
        stock = getStock();

        if (stock == null || stock <= 0){
            System.out.println("下单失败,库存不足");
            return;
        }

        stock--;
        writeStock(stock);
        System.out.println("下单成功,当前剩余库存 : " + stock);

    }

这是一个下订单的场景,为了防止超卖的问题,所以做了
先读取库存 -> 判断库存是否充足 -> 减少库存 -> 将修改后的库存写回数据库
这种操作.

那就来测试一下,库存初始设置为5,同时发送30个请求.

下单成功,当前剩余库存 : 4
下单成功,当前剩余库存 : 4
下单成功,当前剩余库存 : 4
下单成功,当前剩余库存 : 4
下单成功,当前剩余库存 : 4
下单成功,当前剩余库存 : 4
下单成功,当前剩余库存 : 4
下单成功,当前剩余库存 : 3
下单成功,当前剩余库存 : 2
下单成功,当前剩余库存 : 1
下单成功,当前剩余库存 : 0
下单失败,库存不足
下单失败,库存不足
下单失败,库存不足
下单失败,库存不足
下单失败,库存不足
下单失败,库存不足
下单失败,库存不足

可以看出,明显是发生了多次超卖的情况,订单超过了库存数量.

那么,为什么呢?
因为 读取库存,写回库存这是两个操作,并不是原子性的.
也就满足了上面所说的共享,可变的情况.
在多线程情况下,可能会发生,线程1在读取库存之后停下,将cpu让给线程2的情况.导致线程1和线程2获取到的库存数量是相同的,导致超卖.

如何解决呢?

在方法上加一个synchronized关键词即可.

    synchronized public void decStock(){
        Integer stock;
        stock = getStock();

        if (stock == null || stock <= 0){
            System.out.println("下单失败,库存不足");
            return;
        }

        stock--;
        writeStock(stock);
        System.out.println("下单成功,当前剩余库存 : " + stock);

    }

结果:

下单成功,当前剩余库存 : 4
下单成功,当前剩余库存 : 3
下单成功,当前剩余库存 : 2
下单成功,当前剩余库存 : 1
下单成功,当前剩余库存 : 0
下单失败,库存不足
下单失败,库存不足
下单失败,库存不足
下单失败,库存不足
下单失败,库存不足
下单失败,库存不足
下单失败,库存不足

这么简单?
此处讨论单机情况下,在服务器只部署一个实例时,就是这么简单.他将这个方法变成了一个不可分割,不可中断的原子操作,每一次线程读取的数据都会是前一个线程确保已经写完了的数据.
但是单纯会使用synchronized关键词并不是目的,我想要知道synchronized到底做了什么可以保证不超卖.

AQS同步器

synchronized是基于底层C++语言实现的同步机制,而AQS同步器是纯java实现的.
因此这个东西放在java.util.concurrent包下,是一个抽象类 AbstractQueuedSynchronizer.
AQS核心有三种东西: CAS,自旋,LockSupport

自己写一个AQS同步器

public class MyLock {

    // 用于记录当前锁的状态
    @Getter
    @Setter
    private volatile int state = 0;

    // 当前持有锁的线程
    @Getter
    @Setter
    private Thread lockHolder;

    private static final Unsafe UNSAFE = UnsafeInstance.reflectGetUnsafe();

    private static final long stateOffset;

    // 用于保存对排队状态的线程的引用
    // 必须保证该队列的线程安全
    private ConcurrentLinkedQueue<Thread> waiters = new ConcurrentLinkedQueue<>();

    static {
        try {
            stateOffset = UNSAFE.objectFieldOffset(MyLock.class.getDeclaredField("state"));
        }catch (Exception e){
            throw new Error();
        }
    }

    public final boolean compareAndSwapState(int expect, int update){
        // cas  compare and swap 原子操作
        return UNSAFE.compareAndSwapInt(this, stateOffset, expect, update);
    }

    private boolean aquire(){
        Thread current = Thread.currentThread();
        int currentState = getState();
        // currentState等于0,锁还没被持有,这时可以加锁
        if (currentState == 0){
            // 线程存放队列如果是空,或者当前线程是队列的队首的话,允许该线程进行加锁.否则加锁失败
            if ((waiters.size() == 0 || current == waiters.peek()) && compareAndSwapState(currentState, 1)){
                // 在加锁成功后,要将锁持有者改为当前线程
                setLockHolder(current);
                return true;
            }
        }
        // 如果尝试加锁失败,返回false
        return false;
    }

    public void lock(){
        // 刚开始开始尝试加锁
        if (aquire()){
            // 如果首次加锁成功,直接返回
            return;
        }

        // 首次加锁失败,将该线程存放到队列中进行排队
        Thread current = Thread.currentThread();
        waiters.add(current);
        // 进行自旋
        for (;;){
            // 在死循环中尝试进行加锁.
            // 只有队首线程可以尝试加锁,其他线程使用park让出cpu资源.
            if (current == waiters.peek() && aquire()){
                // 如果队首线程加锁成功,将该线程出队列.
                waiters.poll();
                return;
            }
            // 将该线程阻塞以让出cpu使用权.等待后续唤醒.
            LockSupport.park(current);
        }
    }

    public void unlock(){
        // 只有锁持有者才可以进行解锁
        if (Thread.currentThread() != lockHolder){
            throw new RuntimeException("lockHolder is not current thread");
        }

        int state = getState();
        // 进行解锁,将state设置为0以便下个线程加锁
        if (compareAndSwapState(state, 0)){
            // 已经进行解锁,将斥锁人改为null
            setLockHolder(null);
            // 唤醒队首的线程
            Thread first = waiters.peek();
            if (first != null){
                LockSupport.unpark(first);
            }
        }
    }

}

public class UnsafeInstance {

    public static Unsafe reflectGetUnsafe(){
        try {
            // 使用反射去获取Unsafe类,因为Unsafe这个魔法类可以绕过虚拟机直接操作内存. 视为危险操作.
            Field field = Unsafe.class.getDeclaredField("theUnsafe");
            field.setAccessible(true);
            return (Unsafe) field.get(null);
        } catch (NoSuchFieldException e) {
            e.printStackTrace();
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        }
        return null;
    }
}

使用我们完成的Lock类对方法进行加锁


    private MyLock sync = new MyLock();

    public void decStock(){
        sync.lock();
        Integer stock;
        stock = getStock();
        if (stock == null || stock <= 0){
            System.out.println("下单失败,库存不足");
            sync.unlock();
            return;
        }

        stock--;
        writeStock(stock);
        System.out.println("下单成功,当前剩余库存 : " + stock);
        sync.unlock();

    }

结果显示:

下单成功,当前剩余库存 : 4
下单成功,当前剩余库存 : 3
下单成功,当前剩余库存 : 2
下单成功,当前剩余库存 : 1
下单成功,当前剩余库存 : 0
下单失败,库存不足
下单失败,库存不足
下单失败,库存不足
下单失败,库存不足
下单失败,库存不足
下单失败,库存不足
下单失败,库存不足
下单失败,库存不足
下单失败,库存不足

当然,AQS并没有仅仅是上面这么简单,他还加入了一些其他的特性如:

  • 阻塞等待队列
  • 共享/独占
  • 公平/非公平
  • 可重入
  • 允许中断

   转载规则


《AQS同步器原理》 echi1995 采用 知识共享署名 4.0 国际许可协议 进行许可。
 上一篇
HashMap的最大容量是多少. HashMap的最大容量是多少.
HashMap的最大容量是多少.首先, HashMap底层是数组+链表, 所以HashMap的容量约等于 数组长度 * 链表长度.因为链表长度不固定,甚至可能链表会是树结构, 所以我们主要讨论数组长度. 那么, 数组的最大长度是多长呢? 仔
下一篇 
spring如何getBean spring如何getBean
spring总是有一种神奇的魔力,让人想要去探究他到底是做了什么. 提前搭建好工程 不要选择Spring Initializr, 那是使用SpringBoot的,不利于学习使用. 一路下一步, 填写项目名称, finish. 可以看到,
  目录