总所周知,java是支持多线程的.
在多线程情况下,可能会出现多个线程同时访问同一个共享
,可变
资源的情况;这种资源可能是:对象,变量,文件等.
共享:资源可以由多个线程同时访问
可变:资源可以在其生命周期内被修改
为什么需要同步器
先来个栗子
public void decStock(){
Integer stock;
stock = getStock();
if (stock == null || stock <= 0){
System.out.println("下单失败,库存不足");
return;
}
stock--;
writeStock(stock);
System.out.println("下单成功,当前剩余库存 : " + stock);
}
这是一个下订单的场景,为了防止超卖的问题,所以做了
先读取库存 -> 判断库存是否充足 -> 减少库存 -> 将修改后的库存写回数据库
这种操作.
那就来测试一下,库存初始设置为5,同时发送30个请求.
下单成功,当前剩余库存 : 4
下单成功,当前剩余库存 : 4
下单成功,当前剩余库存 : 4
下单成功,当前剩余库存 : 4
下单成功,当前剩余库存 : 4
下单成功,当前剩余库存 : 4
下单成功,当前剩余库存 : 4
下单成功,当前剩余库存 : 3
下单成功,当前剩余库存 : 2
下单成功,当前剩余库存 : 1
下单成功,当前剩余库存 : 0
下单失败,库存不足
下单失败,库存不足
下单失败,库存不足
下单失败,库存不足
下单失败,库存不足
下单失败,库存不足
下单失败,库存不足
可以看出,明显是发生了多次超卖的情况,订单超过了库存数量.
那么,为什么呢?
因为 读取库存,写回库存这是两个操作,并不是原子性的.
也就满足了上面所说的共享
,可变
的情况.
在多线程情况下,可能会发生,线程1在读取库存之后停下,将cpu让给线程2的情况.导致线程1和线程2获取到的库存数量是相同的,导致超卖.
如何解决呢?
在方法上加一个synchronized关键词即可.
synchronized public void decStock(){
Integer stock;
stock = getStock();
if (stock == null || stock <= 0){
System.out.println("下单失败,库存不足");
return;
}
stock--;
writeStock(stock);
System.out.println("下单成功,当前剩余库存 : " + stock);
}
结果:
下单成功,当前剩余库存 : 4
下单成功,当前剩余库存 : 3
下单成功,当前剩余库存 : 2
下单成功,当前剩余库存 : 1
下单成功,当前剩余库存 : 0
下单失败,库存不足
下单失败,库存不足
下单失败,库存不足
下单失败,库存不足
下单失败,库存不足
下单失败,库存不足
下单失败,库存不足
这么简单?
此处讨论单机情况下,在服务器只部署一个实例时,就是这么简单.他将这个方法变成了一个不可分割,不可中断的原子操作,每一次线程读取的数据都会是前一个线程确保已经写完了的数据.
但是单纯会使用synchronized关键词并不是目的,我想要知道synchronized到底做了什么可以保证不超卖.
AQS同步器
synchronized是基于底层C++语言实现的同步机制,而AQS同步器是纯java实现的.
因此这个东西放在java.util.concurrent
包下,是一个抽象类 AbstractQueuedSynchronizer
.
AQS核心有三种东西: CAS,自旋,LockSupport
自己写一个AQS同步器
public class MyLock {
// 用于记录当前锁的状态
@Getter
@Setter
private volatile int state = 0;
// 当前持有锁的线程
@Getter
@Setter
private Thread lockHolder;
private static final Unsafe UNSAFE = UnsafeInstance.reflectGetUnsafe();
private static final long stateOffset;
// 用于保存对排队状态的线程的引用
// 必须保证该队列的线程安全
private ConcurrentLinkedQueue<Thread> waiters = new ConcurrentLinkedQueue<>();
static {
try {
stateOffset = UNSAFE.objectFieldOffset(MyLock.class.getDeclaredField("state"));
}catch (Exception e){
throw new Error();
}
}
public final boolean compareAndSwapState(int expect, int update){
// cas compare and swap 原子操作
return UNSAFE.compareAndSwapInt(this, stateOffset, expect, update);
}
private boolean aquire(){
Thread current = Thread.currentThread();
int currentState = getState();
// currentState等于0,锁还没被持有,这时可以加锁
if (currentState == 0){
// 线程存放队列如果是空,或者当前线程是队列的队首的话,允许该线程进行加锁.否则加锁失败
if ((waiters.size() == 0 || current == waiters.peek()) && compareAndSwapState(currentState, 1)){
// 在加锁成功后,要将锁持有者改为当前线程
setLockHolder(current);
return true;
}
}
// 如果尝试加锁失败,返回false
return false;
}
public void lock(){
// 刚开始开始尝试加锁
if (aquire()){
// 如果首次加锁成功,直接返回
return;
}
// 首次加锁失败,将该线程存放到队列中进行排队
Thread current = Thread.currentThread();
waiters.add(current);
// 进行自旋
for (;;){
// 在死循环中尝试进行加锁.
// 只有队首线程可以尝试加锁,其他线程使用park让出cpu资源.
if (current == waiters.peek() && aquire()){
// 如果队首线程加锁成功,将该线程出队列.
waiters.poll();
return;
}
// 将该线程阻塞以让出cpu使用权.等待后续唤醒.
LockSupport.park(current);
}
}
public void unlock(){
// 只有锁持有者才可以进行解锁
if (Thread.currentThread() != lockHolder){
throw new RuntimeException("lockHolder is not current thread");
}
int state = getState();
// 进行解锁,将state设置为0以便下个线程加锁
if (compareAndSwapState(state, 0)){
// 已经进行解锁,将斥锁人改为null
setLockHolder(null);
// 唤醒队首的线程
Thread first = waiters.peek();
if (first != null){
LockSupport.unpark(first);
}
}
}
}
public class UnsafeInstance {
public static Unsafe reflectGetUnsafe(){
try {
// 使用反射去获取Unsafe类,因为Unsafe这个魔法类可以绕过虚拟机直接操作内存. 视为危险操作.
Field field = Unsafe.class.getDeclaredField("theUnsafe");
field.setAccessible(true);
return (Unsafe) field.get(null);
} catch (NoSuchFieldException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
}
return null;
}
}
使用我们完成的Lock类对方法进行加锁
private MyLock sync = new MyLock();
public void decStock(){
sync.lock();
Integer stock;
stock = getStock();
if (stock == null || stock <= 0){
System.out.println("下单失败,库存不足");
sync.unlock();
return;
}
stock--;
writeStock(stock);
System.out.println("下单成功,当前剩余库存 : " + stock);
sync.unlock();
}
结果显示:
下单成功,当前剩余库存 : 4
下单成功,当前剩余库存 : 3
下单成功,当前剩余库存 : 2
下单成功,当前剩余库存 : 1
下单成功,当前剩余库存 : 0
下单失败,库存不足
下单失败,库存不足
下单失败,库存不足
下单失败,库存不足
下单失败,库存不足
下单失败,库存不足
下单失败,库存不足
下单失败,库存不足
下单失败,库存不足
当然,AQS并没有仅仅是上面这么简单,他还加入了一些其他的特性如:
- 阻塞等待队列
- 共享/独占
- 公平/非公平
- 可重入
- 允许中断