多线程与高并发(四)

多线程与高并发(四)

线程池

Executor

执行者, 有一个方法 execute().

ExecutorService

继承自Executor, 也是一个接口.

除了execute()方法之外, 还完善了整个任务执行器的生命周期.

shutdown() 结束,

shutdownNow() 立刻结束,

isShutdown()是否已经结束

isTerminated()是否已经执行完了

awaitTermination(long timeout, TimeUnit unit)等待xx时间

Callable&Future

除此之外, ExecutorService中还有一个Future submit(Callable task)方法.

在java1.5的时候添加了一个叫Callable的接口, 为了解决Runnable没有返回值的情况.

同时还添加了一个Future接口, 在Callable执行完之后, 结果会封装到Future中.

public static void main(String[] args) throws ExecutionException, InterruptedException {
    Callable<String> call = new Callable(){

        @Override
        public String call() throws Exception {
            return "Hello Callable";
        }
    };

    ExecutorService service = Executors.newCachedThreadPool();
    Future<String> future = service.submit(call);

    // future.get() 方法是阻塞的,阻塞直到获取到线程的返回值
    System.out.println(future.get());

    service.shutdown();
}

还有一个类叫做FutureTask, 他可以既当Future又当Task.

public static void main(String[] args) throws ExecutionException, InterruptedException {
    FutureTask<Integer> task = new FutureTask<>(() -> {
        TimeUnit.MILLISECONDS.sleep(200);
        return 1000;
    });

    new Thread(task).start();

    System.out.println(task.get());
}

在FutureTask类中, 实现了RunnableFuture接口, 而Runnable接口又继承了Runnable和Future.

CompletableFuture

CompletableFuture可以用来管理多个Future.

public class Test03 {

   public static void main(String[] args) {

      CompletableFuture<Double> futureTB = CompletableFuture.supplyAsync(() -> priceOfTB());
      CompletableFuture<Double> futureJD = CompletableFuture.supplyAsync(() -> priceOfJD());
      CompletableFuture<Double> futurePDD = CompletableFuture.supplyAsync(() -> priceOfPDD());

      CompletableFuture.allOf(futureTB, futureJD, futurePDD).join();

   }

   public static double priceOfTB() {
      try {
         Thread.sleep(100);
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return 1.0;
   }

   public static double priceOfJD() {
      try {
         Thread.sleep(200);
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return 2.0;
   }

   public static double priceOfPDD() {
      try {
         Thread.sleep(300);
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return 3.0;
   }
}

比如以上例子就是使用CompletableFuture来使多个Future都完成才继续.

也可以使用他的其他方法对返回值进行操作.

ThreadPoolExecutor

ThreadPoolExecutor是继承自AbstractExecutorService, 这是一个抽象类, 实现了ExecutorService接口.

在阿里代码规约中是禁止手动new Thread来开启多线程的, 也禁止使用Executors来创建线程池.

推荐的方法是使用ThreadPoolExecutor来手动创建一个线程池.

ThreadPoolExecutor的构造方法有七大参数

ThreadPoolExecutor tpe = new ThreadPoolExecutor(
    // 核心线程数
    2, 
    // 最大线程数
    4,
    // 生存时间
    60, 
    //生存时间单位
    TimeUnit.SECONDS,
    // 任务队列
    new ArrayBlockingQueue<>(4),
    // 线程工厂
    Executors.defaultThreadFactory(),
    // 拒绝策略
    new ThreadPoolExecutor.DiscardOldestPolicy());
  • 核心线程数
    • 线程池一创建出来就会有一些核心线程
  • 最大线程数
    • 当任务处理不过来, 线程池能扩展到的最大线程数
  • 生存时间
    • 线程如果长时间不工作, 就会将线程关闭
  • 生存时间单位
  • 任务队列
    • 阻塞队列 可以放入各种各样的BlockingQueue
  • 线程工厂
    • 传入一个ThreadFactory 可以设置ThreadFactory生产线程的名字
    • 线程的名字非常重要, 如果线程没有自定义名字, 当发现线程出问题的之后, 面对 pool-1-thread-1这种名字无从寻找问题.
  • 拒绝策略
    • 当线程池中的线程都在忙, 阻塞队列满了, 而且线程池已经到达最大线程数的时候, 就会启动拒绝策略.
    • JDK提供了默认四种拒绝策略. 也可以自定义.
    • AbortPolicy 抛异常
    • DiscardPolicy 扔掉, 不抛异常
    • DiscardOldestPolicy 扔掉排队时间最久的
    • CallerRunsPolicy 调用者处理

线程池工厂 Executors

Executors可以说是线程池的工厂, 是用来产生线程池的.

  1. newSingleThreadExecutor()

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    
     public LinkedBlockingQueue() {
         this(Integer.MAX_VALUE);
     }

    创建一个只有一个线程的线程池, 为什么要有这种一个线程的线程池? 只想要一个线程直接new Thread不就行了吗 ?

    线程池中不光有线程, 还维护了任务队列, 如果自己new Thread就要自己维护这个任务队列. 而且线程池还可以维护生命周期.

    但是, 这个线程池是有问题的, 他的线程任务队列是LinkedBlockingQueue, 默认给的最大长度是Integer.MAX_VALUE. 这是一个非常危险的行为, 如果线程处理不过来, 导致任务越积越多, 可能会导致任务队列真的存放这么多任务最终导致OOM.

  2. newCachedThreadPool()

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

    从参数中可以看出来, cachedThreadPool核心线程数是0, 最大线程数是Integer.MAX_VALUE, 而且任务队列是0.

    如果线程池中没有空闲线程, 就会起一个新的线程来执行这个任务.

    这显然也是有问题的, 如果同时大量的任务打过来很有可能会导致开启过多线程.而过多的线程可能会导致CPU的资源都耗费在线程切换上了.

  3. newFixedThreadPool(int nThreads)

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

    创建一个固定线程数的线程池, 这个线程池和SingleThreadExecutor的问题一样, 都是任务队列太长, 可能会导致OOM.

  4. newScheduledThreadPool(int corePoolSize)

    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

    定时器任务, 这个线程池和cachedThreadPool的问题是一样的, 最大线程数量太大.

并行和并发

并发: concurrent

并行: parallel

并发指任务提交, 并行指任务执行.

并行是并发的子集.

ThreadPoolExecutor源码解析

1、常用变量的解释

// 1. `ctl`,可以看做一个int类型的数字,高3位表示线程池状态,低29位表示worker数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 2. `COUNT_BITS`,`Integer.SIZE`为32,所以`COUNT_BITS`为29
private static final int COUNT_BITS = Integer.SIZE - 3;
// 3. `CAPACITY`,线程池允许的最大线程数。1左移29位,然后减1,即为 2^29 - 1
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
// 4. 线程池有5种状态,按大小排序如下:RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

// Packing and unpacking ctl
// 5. `runStateOf()`,获取线程池状态,通过按位与操作,低29位将全部变成0
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// 6. `workerCountOf()`,获取线程池worker数量,通过按位与操作,高3位将全部变成0
private static int workerCountOf(int c)  { return c & CAPACITY; }
// 7. `ctlOf()`,根据线程池状态和线程池worker数量,生成ctl值
private static int ctlOf(int rs, int wc) { return rs | wc; }

/*
 * Bit field accessors that don't require unpacking ctl.
 * These depend on the bit layout and on workerCount being never negative.
 */
// 8. `runStateLessThan()`,线程池状态小于xx
private static boolean runStateLessThan(int c, int s) {
    return c < s;
}
// 9. `runStateAtLeast()`,线程池状态大于等于xx
private static boolean runStateAtLeast(int c, int s) {
    return c >= s;
}

2、构造方法

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    // 基本类型参数校验
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    // 空指针校验
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    // 根据传入参数`unit`和`keepAliveTime`,将存活时间转换为纳秒存到变量`keepAliveTime `中
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

3、提交执行task的过程

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    // worker数量比核心线程数小,直接创建worker执行任务
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // worker数量超过核心线程数,任务直接进入队列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 线程池状态不是RUNNING状态,说明执行过shutdown命令,需要对新加入的任务执行reject()操作。
        // 这儿为什么需要recheck,是因为任务入队列前后,线程池的状态可能会发生变化。
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 这儿为什么需要判断0值,主要是在线程池构造方法中,核心线程数允许为0
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 如果线程池不是运行状态,或者任务进入队列失败,则尝试创建worker执行任务。
    // 这儿有3点需要注意:
    // 1. 线程池不是运行状态时,addWorker内部会判断线程池状态
    // 2. addWorker第2个参数表示是否创建核心线程
    // 3. addWorker返回false,则说明任务执行失败,需要执行reject操作
    else if (!addWorker(command, false))
        reject(command);
}

4、addworker源码解析

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    // 外层自旋
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 这个条件写得比较难懂,我对其进行了调整,和下面的条件等价
        // (rs > SHUTDOWN) || 
        // (rs == SHUTDOWN && firstTask != null) || 
        // (rs == SHUTDOWN && workQueue.isEmpty())
        // 1. 线程池状态大于SHUTDOWN时,直接返回false
        // 2. 线程池状态等于SHUTDOWN,且firstTask不为null,直接返回false
        // 3. 线程池状态等于SHUTDOWN,且队列为空,直接返回false
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        // 内层自旋
        for (;;) {
            int wc = workerCountOf(c);
            // worker数量超过容量,直接返回false
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 使用CAS的方式增加worker数量。
            // 若增加成功,则直接跳出外层循环进入到第二部分
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            // 线程池状态发生变化,对外层循环进行自旋
            if (runStateOf(c) != rs)
                continue retry;
            // 其他情况,直接内层循环进行自旋即可
            // else CAS failed due to workerCount change; retry inner loop
        } 
    }
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            // worker的添加必须是串行的,因此需要加锁
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                // 这儿需要重新检查线程池状态
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    // worker已经调用过了start()方法,则不再创建worker
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // worker创建并添加到workers成功
                    workers.add(w);
                    // 更新`largestPoolSize`变量
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // 启动worker线程
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // worker线程启动失败,说明线程池状态发生了变化(关闭操作被执行),需要进行shutdown相关操作
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

5、线程池worker任务单元

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;

    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        // 这儿是Worker的关键所在,使用了线程工厂创建了一个线程。传入的参数为当前worker
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker  */
    public void run() {
        runWorker(this);
    }

    // 省略代码...
}

6、核心线程执行逻辑-runworker

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    // 调用unlock()是为了让外部可以中断
    w.unlock(); // allow interrupts
    // 这个变量用于判断是否进入过自旋(while循环)
    boolean completedAbruptly = true;
    try {
        // 这儿是自旋
        // 1. 如果firstTask不为null,则执行firstTask;
        // 2. 如果firstTask为null,则调用getTask()从队列获取任务。
        // 3. 阻塞队列的特性就是:当队列为空时,当前线程会被阻塞等待
        while (task != null || (task = getTask()) != null) {
            // 这儿对worker进行加锁,是为了达到下面的目的
            // 1. 降低锁范围,提升性能
            // 2. 保证每个worker执行的任务是串行的
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            // 如果线程池正在停止,则对当前线程进行中断操作
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            // 执行任务,且在执行前后通过`beforeExecute()`和`afterExecute()`来扩展其功能。
            // 这两个方法在当前类里面为空实现。
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                // 帮助gc
                task = null;
                // 已完成任务数加一 
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // 自旋操作被退出,说明线程池正在结束
        processWorkerExit(w, completedAbruptly);
    }
}

WorkStealingPool

public static ExecutorService newWorkStealingPool() {
    return new ForkJoinPool
        (Runtime.getRuntime().availableProcessors(),
         ForkJoinPool.defaultForkJoinWorkerThreadFactory,
         null, true);
}

WorkStealingPool和原来的区别就是, 每个线程都有自己单独的队列. 当某个线程执行完自己的任务时, 会去其他线程的队列中偷一个任务.

而原本的线程池都是ThreadPoolExecutor, WorkStealingPool是ForkJoinPool.

ForkJoinPool

ForkJoinPool适合做把一个大任务切分成一个个小任务, 小任务执行完的结果再汇总.

因为任务需要可以进行切分, 所以要求任务继承ForkJoinTask. 我们可以使用ForkJoinTask的子类RecursiveTask和RecursiveAction.

public class Test05 {

    static int[] nums = new int[1000000];
    static final int MAX_NUM = 50000;
    static Random random = new Random();

    static {
        for (int i = 0; i < nums.length; i++) {
            nums[i] = random.nextInt();
        }
        System.out.println("single calculate sum : " + Arrays.stream(nums).sum());
    }

    static class AddTask extends RecursiveTask<Long> {

        int start;
        int end;

        public AddTask(int start, int end){
            this.start = start;
            this.end = end;
        }

        @Override
        protected Long compute() {
            if (end - start <= MAX_NUM){
                long sum = 0;
                for (int i = start; i < end; i++) {
                    sum += nums[i];
                }
                return sum;
            }else {
                int middle = start + (end - start) / 2;

                AddTask subTask1 = new AddTask(start, middle);
                AddTask subTask2 = new AddTask(middle, end);
                subTask1.fork();
                subTask2.fork();
                return subTask1.join() + subTask2.join();
            }
        }
    }
//    static class AddTask extends RecursiveAction {
//
//        int start;
//        int end;
//
//        public AddTask(int start, int end){
//            this.start = start;
//            this.end = end;
//        }
//
//        @Override
//        protected void compute() {
//            if (end - start <= MAX_NUM){
//                long sum = 0;
//                for (int i = start; i <= end; i++) {
//                    sum += i;
//                }
//                System.out.println("from : " + start + " to : " + end + " sum = " + sum);
//            }else {
//                int middle = start + (end - start) / 2;
//
//                AddTask subTask1 = new AddTask(start, middle);
//                AddTask subTask2 = new AddTask(middle + 1, end);
//                subTask1.fork();
//                subTask2.fork();
//            }
//        }
//    }

    public static void main(String[] args) throws IOException {

        ForkJoinPool fjp = new ForkJoinPool();
        AddTask task = new AddTask(0, nums.length);
        fjp.execute(task);

        System.out.println(task.join());

        System.in.read();
    }
}

ParallelStream

ParallelStream也是使用ForkJoinPool的一个Stream.

Disruptor

单机性能最好的MQ

Disruptor的特点

无锁, 高并发, 使用环形Buffer, 直接覆盖(不用清除)旧的数据, 降低GC频率

实现了基于时间的生产者消费者模式(观察者模式)


   转载规则


《多线程与高并发(四)》 echi1995 采用 知识共享署名 4.0 国际许可协议 进行许可。
 上一篇
深入理解JVM(一) 深入理解JVM(一)
深入理解JVM(一)Java从编码到执行 从一个 .java文件到执行, 首先需要经过javac编译成.class文件, 然后使用java执行这个class文件. 在java命令开始后, .class文件会被classLoader加载到内存
下一篇 
多线程与高并发(三) 多线程与高并发(三)
多线程与高并发(三)多线程场景下的容器在Java日常开发中经常使用到容器. 下面就看看多线程场景下容器的选择 容器的选择Map容器一个场景: 100个线程, 每个线程向容器中添加10w条数据, 比较Hashtable,Collections
  目录