多线程与高并发(二)
LongAdder
先看一个例子
public class Test01 {
static long count1 = 0;
static AtomicLong count2 = new AtomicLong(0);
static LongAdder count3 = new LongAdder();
public static void main(String[] args) {
Thread[] threads = new Thread[1000];
Object lock = new Object();
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < 100000; j++) {
synchronized (lock) {
count1++;
}
}
});
}
long startTime = System.currentTimeMillis();
for (Thread thread : threads) {
thread.start();
}
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
long endTime = System.currentTimeMillis();
System.out.println("sync : " + (endTime - startTime));
// -------------
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < 100000; j++) {
count2.incrementAndGet();
}
});
}
startTime = System.currentTimeMillis();
for (Thread thread : threads) {
thread.start();
}
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
endTime = System.currentTimeMillis();
System.out.println("Atomic : " + (endTime - startTime));
// --------------
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < 100000; j++) {
count3.increment();
}
});
}
startTime = System.currentTimeMillis();
for (Thread thread : threads) {
thread.start();
}
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
endTime = System.currentTimeMillis();
System.out.println("LongAdder : " + (endTime - startTime));
}
}
例子描述的很简单, 都是起1000个线程做累加操作, 分别看看synchronized,Atomic类, LongAdder三种方式那种效率更高.
sync : 1365
Atomic : 2319
LongAdder : 473
执行时间可以看出, LongAdder远比其他两种方式更快.
那么为什么呢?
java doc
/**
* One or more variables that together maintain an initially zero
* {@code long} sum. When updates (method {@link #add}) are contended
* across threads, the set of variables may grow dynamically to reduce
* contention. Method {@link #sum} (or, equivalently, {@link
* #longValue}) returns the current total combined across the
* variables maintaining the sum.
*
* <p>This class is usually preferable to {@link AtomicLong} when
* multiple threads update a common sum that is used for purposes such
* as collecting statistics, not for fine-grained synchronization
* control. Under low update contention, the two classes have similar
* characteristics. But under high contention, expected throughput of
* this class is significantly higher, at the expense of higher space
* consumption.
*
* <p>LongAdders can be used with a {@link
* java.util.concurrent.ConcurrentHashMap} to maintain a scalable
* frequency map (a form of histogram or multiset). For example, to
* add a count to a {@code ConcurrentHashMap<String,LongAdder> freqs},
* initializing if not already present, you can use {@code
* freqs.computeIfAbsent(k -> new LongAdder()).increment();}
*
* <p>This class extends {@link Number}, but does <em>not</em> define
* methods such as {@code equals}, {@code hashCode} and {@code
* compareTo} because instances are expected to be mutated, and so are
* not useful as collection keys.
*
* @since 1.8
* @author Doug Lea
*/
LongAdder中会维护一个或多个变量,这些变量共同组成一个long型的“和”。当多个线程同时更新(特指“add”)值时,为了减少竞争,可能会动态地增加这组变量的数量。“sum”方法(等效于longValue方法)返回这组变量的“和”值。
当我们的场景是为了统计技术,而不是为了更细粒度的同步控制时,并且是在多线程更新的场景时,LongAdder类比AtomicLong更好用。 在小并发的环境下,论更新的效率,两者都差不多。但是高并发的场景下,LongAdder有着明显更高的吞吐量,但是有着更高的空间复杂度。
LongAddr维护一个Cell数组
LongAddr在高并发下性能更好的原因是使用了分段锁.
就像ConcurrentHashMap的分段锁一样, LongAddr中维护了一个Cell数组, 每个线程尽量都写在自己那个分段中, 当求和的时候再将每个分段中的值加起来.
ReentrantLock
ReentrantLock是可以替代Synchronized的.
public class Test02 {
synchronized void m(){
try {
for (int i = 0; i < 10; i++) {
Thread.sleep(1000);
System.out.println("m");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
// 可以替换成
public class Test02 {
Lock lock = new ReentrantLock();
void m(){
lock.lock();
try {
for (int i = 0; i < 10; i++) {
Thread.sleep(1000);
System.out.println("m");
}
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
}
注意, 上面的lock.lock()是不能放在try块中的.
ReentrantLock是要手动解锁的, 而且要确保必须要解锁.
synchronized在同步代码块或者同步方法执行完之后自动释放锁, 使用ReentrantLock要在finally中unlock.
为什么要用ReentrantLock
既然synchronized会自动释放锁, ReentrantLock要手动释放, 为什么还要用ReentrantLock呢?
tryLock
public class Test03 {
ReentrantLock lock = new ReentrantLock();
void m1(){
lock.lock();
try {
for (int i = 0; i < 10; i++) {
Thread.sleep(1000);
System.out.println(i);
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
void m2(){
try {
boolean b = lock.tryLock(5, TimeUnit.SECONDS);
System.out.println("m2 " + b);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
public static void main(String[] args) {
Test03 t = new Test03();
new Thread(t::m1).start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(t::m2).start();
}
}
ReentrantLock提供了tryLock方法, 在给定时间内尝试获取锁.
当锁获取成功, 或者超市之后返回. 会返回一个boolean类型的值, 标志是否获得锁成功,
后续的处理可以根据这个布尔值来判断.
当然,上述代码是有问题的, 如果tryLock没有获取到锁, 就不应该在finally中unlock().
lockInterruptly
lock.lockInterruptibly()的作用是:如果当前线程未被中断,则获取锁定(需要等待别的线程释放锁才行),如果已被中断则出现异常。但是使用lock.lock()时,当前线程被中断,不会报错。
公平锁
公平锁的意思是, 按照线程访问的顺序获取锁, 而非公平锁会有一个抢占锁的过程.
非公平锁的性能要更好一些, ReentrantLock和synchronized都是默认非公平锁.
ReentrantLock lock = new ReentrantLock(true);
这就代表是公平锁.
当已经加锁时, 其他线程过来访问是会被加入到一个等待队列中. 此时如果有一个线程过来访问, 该线程是否会去检查等待队列中是否有等待线程是判断该锁是否公平的条件.
非公平锁不会判断等待队列中是否有线程正在等待,而直接去试图获取锁.
此时是有可能直接获取到锁而先于其他先到并且等待的线程获得到锁.
CountDownLatch
countdown 倒数 latch 门闩
差不多相当于赛跑前 3,2,1 砰 的作用.
private static void usingCountDownLatch(){
Thread[] threads = new Thread[100];
CountDownLatch latch = new CountDownLatch(threads.length);
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
int result = 0;
for (int j = 0; j < 10000; j++) {
result += j;
}
latch.countDown();
});
}
for (Thread thread : threads) {
thread.start();
}
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("end latch");
}
开启100个线程, 每个线程在做完从1加到10000之后, 使用latch.countDown();
在countDownLatch上-1, 当countDownLatch为中值为0 的时候, 可以继续从latch.await();
向下走.
join()也可以做到这个, 为什么要用CountDownLatch呢?
join必须是线程结束之后才结束, 而且所有注册到这个线程上的线程全部都要结束.
而CountDownLatch的大小可以自己设置, 一个线程里可以多次调用countDown().
比如有100个线程, 要求只要80个做完就可以, 那就可以new CountDownLatch(80);
CyclicBarrier
栅栏
public static void main(String[] args) {
CyclicBarrier barrier = new CyclicBarrier(20, new Runnable() {
@Override
public void run() {
System.out.println("满人 发车");
}
});
for (int i = 0; i < 100; i++) {
new Thread(() -> {
try {
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}
这段代码的意思是, 每来一个线程, 运行到barrier.await();
时, 当barrier中堆了20个,就把栅栏放下来.
输出:
满人 发车
满人 发车
满人 发车
满人 发车
满人 发车
ReadWriteLock
读写锁的概念就是共享锁和排他锁
public class Test07 {
static Lock lock = new ReentrantLock();
private static int value;
static ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
static Lock readLock = readWriteLock.readLock();
static Lock writeLock = readWriteLock.writeLock();
public static void read(Lock lock){
lock.lock();
try {
Thread.sleep(1000);
System.out.println("read over!");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
public static void write(Lock lock, int v){
lock.lock();
try {
Thread.sleep(1000);
value = v;
System.out.println("write over!");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
public static void main(String[] args) {
Runnable readR = () -> read(lock);
Runnable writeR = () -> write(lock, new Random().nextInt());
for (int i = 0; i < 18; i++) {
new Thread(readR).start();
}
for (int i = 0; i < 2; i++) {
new Thread(writeR).start();
}
}
}
如果是上面这种互斥锁, 那么20个线程无论如何都是要运行20秒的.
但是读的时候并没有发生数据改变, 因此读和读之间并不需要加锁.
简单修改一下:
public class Test07 {
static Lock lock = new ReentrantLock();
private static int value;
static ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
static Lock readLock = readWriteLock.readLock();
static Lock writeLock = readWriteLock.writeLock();
public static void read(Lock lock){
lock.lock();
try {
Thread.sleep(1000);
System.out.println("read over!");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
public static void write(Lock lock, int v){
lock.lock();
try {
Thread.sleep(1000);
value = v;
System.out.println("write over!");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
public static void main(String[] args) {
Runnable readR = () -> read(readLock);
Runnable writeR = () -> write(writeLock, new Random().nextInt());
for (int i = 0; i < 18; i++) {
new Thread(readR).start();
}
for (int i = 0; i < 2; i++) {
new Thread(writeR).start();
}
}
}
将传入的锁换成读写锁, 理论上来说读的18个线程应该可以互不影响的读, 1秒内就可以完成18个读操作.
Semaphore
信号量
public static void main(String[] args) {
Semaphore s = new Semaphore(1);
new Thread(() -> {
try {
s.acquire();
System.out.println("T1 running..");
Thread.sleep(200);
System.out.println("T1 running..");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
s.release();
}
}).start();
new Thread(() -> {
try {
s.acquire();
System.out.println("T2 running..");
Thread.sleep(200);
System.out.println("T2 running..");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
s.release();
}
}).start();
}
Semaphore s = new Semaphore(1);
的意思是, 开一个信号量, 当一个线程获取锁的时候该信号量-1, 如果为0则获取失败,必须等待.
主要是用于限流, 比如卖票, 开5个窗口, 最多同时允许5个线程同时运行.
Semaphore 也可以选择公平和非公平.
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
Exchanger
交换
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<>();
new Thread(() -> {
String s = "T1";
try {
s = exchanger.exchange(s);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " " + s);
}, "t1").start();
new Thread(() -> {
String s = "T2";
try {
s = exchanger.exchange(s);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " " + s);
}, "t2").start();
}
输出:
t1 T2
t2 T1
Exchanger.exchange()方法是阻塞的.
t1执行到exchange()方法的时候将要交换的值放入Exchanger中, 然后阻塞等待.
t2执行到exchange()方法的时候在Exchanger中进行交换. 然后两个线程回复运行.
LockSupport
LockSupport可以实现对某个线程的阻塞和唤醒.
如果用nofity(), 会唤醒阻塞的所有线程, 而想要针对某一个特定的线程进行唤醒是较为困难的.
public static void main(String[] args) {
Thread t = new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(i);
if (i == 5){
LockSupport.park();
}
}
});
t.start();
try {
Thread.sleep(8000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("after 8s");
LockSupport.unpark(t);
}
当子线程打印出5之后, 使子线程阻塞.
在主线程中等待8秒之后, 唤醒特定的子线程.
AQS同步器中就是使用LockSupport将线程阻塞的.
另一个有意思的事情是, unpark() 可以先于park()调用.
public static void main(String[] args) {
Thread t = new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(i);
if (i == 5){
LockSupport.park();
}
}
});
t.start();
LockSupport.unpark(t);
}
如果是notify的话, nofity在wait之前是没有作用的.
但是在LockSupport中, 如果先执行了unpark(), 在后续阻塞的时候就会直接放行.
线程可见性
一个小例子:
直接使用volatile
public class Test11 {
static List list = new ArrayList<>();
public static void main(String[] args) {
new Thread(() -> {
for (int i = 0; i < 10; i++) {
list.add(i);
System.out.println(i);
}
}, "t1").start();
new Thread(() -> {
while (true){
if (list.size() == 5){
break;
}
}
System.out.println("t2 结束");
}, "t2").start();
}
}
t1一直在向list中添加数据, t2在检视list的大小, 当list.size()大于5的时候跳出循环打印信息.
结果可能会出现t1一直在执行, 已经执行完了t2还是没有监视到list.size()大于5.
这种情况我们之前碰到过, 没有加volatile导致线程间不可见.
那将list前加上volatile重新运行测试
发现还是有t2一直不结束的情况出现.
volatile不是已经保证线程可见性了吗?
但是List类型是个引用类型, volatile保证的是这个引用的地址值.
因为list向里面添加并没有改变地址值, 所以t2中感知不到list的变化.
使用synchronized
public class Test12 {
static volatile List list = new ArrayList<>();
public static void main(String[] args) {
Object lock = new Object();
new Thread(() -> {
synchronized (lock){
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("t2 结束");
}
}, "t2").start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
synchronized (lock){
for (int i = 0; i < 10; i++) {
list.add(i);
System.out.println(i);
if (list.size() == 5){
lock.notify();
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}, "t1").start();
}
}
使用synchronized的方式加锁, 在t1线程向list中添加了5个元素之后, nofity唤醒t2.
最后结果依然是 t1添加完十次之后, t2线程才会打印.
这里涉及到notify方法, notify不会释放锁
.
因此t2在wait等待被t1nofity之后, 必须要获取到lock这把锁.
那么做一个小小的修改:
public class Test13 {
static volatile List list = new ArrayList<>();
public static void main(String[] args) {
Object lock = new Object();
new Thread(() -> {
synchronized (lock){
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("t2 结束");
lock.notify();
}
}, "t2").start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
synchronized (lock){
for (int i = 0; i < 10; i++) {
list.add(new Object());
System.out.println(i);
if (list.size() == 5){
lock.notify();
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}, "t1").start();
}
}
t1线程在notify唤醒t2之后, 自己wait释放这把锁, t2获取到这把锁之后打印结束的文字, 并且再次唤醒t1, 使t1继续未完成的向list中添加元素操作.
使用CountDownLatch
public class Test14 {
static volatile List list = new ArrayList<>();
public static void main(String[] args) {
Object lock = new Object();
CountDownLatch latch = new CountDownLatch(5);
new Thread(() -> {
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("t2 结束");
}, "t2").start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
synchronized (lock){
for (int i = 0; i < 10; i++) {
list.add(new Object());
System.out.println(i);
latch.countDown();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}, "t1").start();
}
}
如果将t1中的sleep去掉
public class Test14 {
static volatile List list = new ArrayList<>();
public static void main(String[] args) {
CountDownLatch latch = new CountDownLatch(5);
new Thread(() -> {
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("t2 结束");
}, "t2").start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
for (int i = 0; i < 10; i++) {
list.add(new Object());
System.out.println(i);
latch.countDown();
}
}, "t1").start();
}
}
此时的输出是有问题的, 在t1中size为5的时候, 此时t2的latch已经打开.
但是t1并没有让渡CPU使用权, 所以t2仍然只能在t1执行完之后再执行.
解决方法就是再添加一个CountDownLatch, 当size为5的时候将t1拦住, t2执行完后再将t1放开.
使用LockSupport
使用LockSupport同理, 也是需要两个, 在t1的size为5时将t2unpark, 将自己park.
在t2完成之后将t1unpark.
两个线程交替打印
两个线程交替打印, 一个线程打印AZ, 一个线程打印025
public static void main(String[] args) {
String s = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
ReentrantLock lock = new ReentrantLock();
Condition condition1 = lock.newCondition();
Condition condition2 = lock.newCondition();
Thread t1 = new Thread(() -> {
for (int i = 0; i < s.length(); i++) {
lock.lock();
try {
System.out.println(s.charAt(i));
condition2.signal();
condition1.await();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}, "t1");
Thread t2 = new Thread(() -> {
for (int i = 0; i < 26; i++) {
lock.lock();
try {
System.out.println(i);
condition1.signal();
condition2.await();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}, "t2");
t1.start();
t2.start();
}
实现一个生产者消费者容器
public class Test16<T> {
final private LinkedList<T> lists = new LinkedList<T>();
final private int MAX = 10;
public synchronized void put(T t){
while (lists.size() == MAX){
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
lists.add(t);
this.notifyAll();
}
public synchronized T consume(){
while (lists.size() == 0){
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = lists.removeFirst();
this.notifyAll();
return t;
}
public static void main(String[] args) {
Test16<String> t = new Test16<>();
for (int i = 0; i < 10; i++) {
new Thread(() -> {
for (int j = 0; j < 5; j++) {
System.out.println(t.consume());
}
}, "c"+i).start();
}
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int i = 0; i < 2; i++) {
new Thread(() -> {
for (int j = 0; j < 25; j++) {
t.put(Thread.currentThread().getName() + " " + j);
}
}, "p"+i).start();
}
}
}
ReentrantLock版本, 使用Condition指定唤醒生产者或消费者
public class Test17<T> {
final private LinkedList<T> lists = new LinkedList<T>();
final private int MAX = 10;
private ReentrantLock lock = new ReentrantLock();
private Condition producer = lock.newCondition();
private Condition consumer = lock.newCondition();
public void put(T t){
lock.lock();
try {
while (lists.size() == MAX){
producer.await();
}
lists.add(t);
consumer.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public synchronized T consume(){
lock.lock();
try {
while (lists.size() == 0){
consumer.await();
}
T t = lists.removeFirst();
producer.signalAll();
return t;
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
return null;
}
public static void main(String[] args) {
Test17<String> t = new Test17<>();
for (int i = 0; i < 10; i++) {
new Thread(() -> {
for (int j = 0; j < 5; j++) {
System.out.println(t.consume());
}
}, "c"+i).start();
}
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int i = 0; i < 2; i++) {
new Thread(() -> {
for (int j = 0; j < 25; j++) {
t.put(Thread.currentThread().getName() + " " + j);
}
}, "p"+i).start();
}
}
}
AQS
从lock()开始看AQS源码
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
lock.lock();
}
进入lock()
// ReentrantLock.java
public void lock() {
// 调用Sync的lock方法
// 此时的sync是Sync的子类NonfairSync
sync.lock();
}
// NonfairSync
final void lock() {
// cas交换state状态
// 因此Sync是继承AQS的, 并且没有重写这个方法
if (compareAndSetState(0, 1))
// 如果cas操作成功, 将当前线程保存在AQS的持有线程中
// AQS中维护了一个exclusiveOwnerThread, 标志持有锁的线程
setExclusiveOwnerThread(Thread.currentThread());
else
// 如果cas失败, 调用AQS中的acquire方法
acquire(1);
}
// AbstractQueuedSynchronizer.java
protected final boolean compareAndSetState(int expect, int update) {
// stateOffset是16
// AQS内部维护了一个int 类型 使用volatile修饰的state
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
public final void acquire(int arg) {
// AQS中的tryAcquire方法直接抛出了一个异常
// 会调回到Sync中的tryAcquire方法
// 如果tryAcquire返回true, 尝试加锁成功, !true = false, 后面判断条件短路, 方法结束
if (!tryAcquire(arg) &&
// 如果尝试获取锁失败, 将线程放入队列
// AQS中维护了一个Node组成的双向链表
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 没有获取到锁, 而且成功放到等待队列中, 将自己线程阻塞节约CPU资源
selfInterrupt();
}
// NonfairSync
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
// 获取当前线程
final Thread current = Thread.currentThread();
// 获取当前锁状态
int c = getState();
// 如果c==0, 意味着没加锁, 可以进行获取锁操作
if (c == 0) {
// 尝试获取锁, 如果获取成功, 如上面的lock方法一样
// 如果失败, 返回false
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 如果c!=0, 意味着已经加锁了.
// 判断一下当前线程是不是持锁线程
else if (current == getExclusiveOwnerThread()) {
// 如果当前线程是吃锁线程, 将state加上传入的参数
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 到这里就意味着, 要么已经上锁, 要么没有和其他线程抢锁失败
// 方法名叫tryAcquire, 尝试获取 尝试获取失败, 返回false
return false;
}
// AbstractQueuedSynchronizer.java
private Node addWaiter(Node mode) {
// 用当前线程和给定的模式创建一个node
Node node = new Node(Thread.currentThread(), mode);
// 尝试是否可以直接添加到等待队列队尾
Node pred = tail;
// 如果tail是空,在enq()方法中初始化
// 如果tail不是空, 用cas来尝试入队
if (pred != null) {
node.prev = pred;
// 如果cas成功, 直接返回
// 如果失败, 进入enq方法, 以自旋的方式尝试直到成功
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
// AbstractQueuedSynchronizer.java
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 如果等待队列没有初始化, 将新建一个node节点
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 尝试添加到队尾
// 如果没有添加成功,就不停的重试
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
// AbstractQueuedSynchronizer.java
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取到node的前置节点
final Node p = node.predecessor();
// 如果前置节点就是head, 就去尝试获取一下锁
if (p == head && tryAcquire(arg)) {
// 如果获取成功了,说明前面的节点已经释放锁了, 那么当前节点就是head
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 看看当前节点是否应该被park
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// AbstractQueuedSynchronizer.java
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 如果现在的状态已经是SIGNAL, 就将这个线程park
if (ws == Node.SIGNAL)
return true;
// 如果现在的状态是CANCELLED, 就把这个线程移出等待队列
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 如果现在状态是CONDITION或PROPAGATE, 就把线程状态改成SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
- CANCELLED:1,在同步队列中等待的线程等待超时或者中断,需要从同步队列中取消等待。
- SIGNAL:-1,后继节点的线程处于等待,而当前节点的线程如果释放同步状态或者取消,将会通知后继节点,表示后继结点在等待当前结点唤醒。后继结点入队时,会将前继结点的状态更新为SIGNAL。
- CONDITION:-2,节点在等待队列中,节点线程在等待condition上,当其他线程对Condition调用signal()后,会把该节点从等待队列转移到同步队列中,加入到对同步状态的获取中
- PROPAGATE:-3,表示下一次共享式同步状态获取将会无条件地被传播下去
- INITIAL:0,初始化状态
ThreadLocal
public class Test19 {
static ThreadLocal<Person> tl = new ThreadLocal<>();
public static void main(String[] args) {
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(tl.get());
}).start();
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
tl.set(new Person());
}).start();
}
static class Person{
String name = "zhangsan";
}
}
上面这段代码中使用了ThreadLocal, 在t1等待两秒, t2等待一秒后设置一个new Person(), t1获取到的结果是null
因为ThreadLocal是给每个线程保存自己线程的值, 线程间不通用.
// ThreadLocal.java
public void set(T value) {
// 获取当前线程
Thread t = Thread.currentThread();
// 根据当前线程, 获取ThreadLocalMap
ThreadLocalMap map = getMap(t);
if (map != null)
// 将当前threadLocal对象作为key放进map
map.set(this, value);
else
createMap(t, value);
}
// ThreadLocal.java
ThreadLocalMap getMap(Thread t) {
// 获取ThreadLocalMap
// t的threadLocals是存放在Thread类中的.
// 因此此处返回的是当前线程的threadLocals
return t.threadLocals;
}
// Thread.java
// ThreadLocalMap是保存在Thread类中的
ThreadLocal.ThreadLocalMap threadLocals = null;
Java的四种引用
- 强引用
- 软引用
- 弱引用
- 虚引用
强引用
普通我们用到的引用就是强引用.
Object o = new Object();
public class Test20 {
public static void main(String[] args) throws IOException {
M m = new M();
m = null;
System.gc();
System.in.read();
}
}
class M {
@Override
protected void finalize() throws Throwable {
System.out.println("finalize");
}
}
在Object类中有一个方法finalize(), 这个方法是对象被垃圾回收之前会调用的方法
软引用
当一个对象被软引用指向的时候, 当内存不够用的时候就会回收.
public class Test21 {
public static void main(String[] args) {
// 用软引用 分配一个10m的byte数组
SoftReference<byte[]> m = new SoftReference<>(new byte[1024 * 1024 * 10]);
System.out.println(m.get());
// 此时20m的内存只占用10m, 尝试垃圾回收
System.gc();
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 垃圾回收后再尝试获取
System.out.println(m.get());
// 20m的内存已经使用了10m, 再次分配15m内存, 此时内存不够用, 软引用应该要被回收
byte[] bytes = new byte[1024 * 1024 * 15];
System.out.println(m.get());
}
}
在运行这个代码之前, 先将JVM的堆内存设置成20m. 加上参数: -Xms20M -Xmx20M
java8运行结果会是java.lang.OutOfMemoryError: Java heap space
, java11的运行会自动将m引用删掉.
可以用作缓存
弱引用
只要GC就会被回收.
public class Test22 {
public static void main(String[] args) {
WeakReference<M> m = new WeakReference<>(new M());
// 获取弱引用
System.out.println(m.get());
// 调用GC
System.gc();
// 再次尝试获取弱引用
System.out.println(m.get());
ThreadLocal<M> threadLocal = new ThreadLocal<>();
threadLocal.set(new M());
//
threadLocal.remove();
}
}
com.echi.thread.M@76ed5528
null
finalize
弱引用的作用在于, 如果有一个强引用指向他的时候, 只要这个强引用消失掉, 这个弱引用也会在下一次GC回收.
弱引用的一个应用就是ThreadLocal
// ThreadLocalMap中实际是Entry数组,
// 而Entry是继承自WeakReference的
// 也就是说Entry也是一个弱引用.
// 所以这个key是通过弱引用只想的ThreadLocal对象
static class Entry extends WeakReference<ThreadLocal<?>> {}
// 而threadLocal在Thread中是一个强引用,
// 因此, 当线程结束的时候, ThreadLocal上的强引用消失.
// 如果Entry是一个强引用, threadLocal就不能被回收, 有可能发生内存泄漏
那么还有一个问题, 当key指向的弱引用被回收了之后,key变为null, value再也无法被访问到了.
但是value是被强引用指向的, 仍然不会回收.
当这个threadLocals中的key越来越多的时候, 仍可能会出现内存泄漏的情况.
所以使用threadLocal, 当用完了之后一定要remove();
虚引用
虚引用主要是用来管理堆外内存.
// 先将堆内存调小
public class Test23 {
private static final List<Object> LIST = new LinkedList<>();
// 引用queue 装引用的队列
private static final ReferenceQueue<M> QUEUE = new ReferenceQueue<>();
public static void main(String[] args) {
// 先用虚引用装一个M
PhantomReference<M> phantomReference = new PhantomReference<>(new M(), QUEUE);
new Thread(() -> {
while (true){
// 循环的申请1m的空间
LIST.add(new byte[1024*1024]);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 尝试获取虚引用
System.out.println(phantomReference.get());
}
}).start();
new Thread(() -> {
while (true){
// 看看队列中有没有值
Reference<? extends M> poll = QUEUE.poll();
if (poll != null){
System.out.println(" 虚引用被回收了 " + poll);
}
}
}).start();
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
虚引用调用get()方法只会返回null,
null
null
null
null
null
null
null
null
null
null
finalize
null
null
null
null
null
虚引用被回收了 java.lang.ref.PhantomReference@2f791ac9
null
null
在NIO中有一个直接内存, DirectByteBuffer. 这个Buffer使用的是直接内存, 堆外内存.
当DirectByteBuffer回收的时候可以通过QUEUE检测到, 从而清理堆外内存.