查看: 101|回复: 0

详细讲解Java线程池(二)

[复制链接]

6

主题

11

帖子

26

积分

新手上路

Rank: 1

积分
26
发表于 2023-7-18 20:46:59 | 显示全部楼层 |阅读模式
作者:佳瑞Jarrett
著作权归作者所有。商业转载请联系作者进行授权,非商业转载请注明出处。
<hr/>八. 线程的异常捕获

1. 常见范式

1)如果我们使用 execute()提交任务,我们一般要在 Runable 任务的代码加上try-catch 进行异常处理。
2)如果我们使用 submit()提交任务,我们一般要在主线程中,对Future.get()进行 try-catch 进行异常处理。
submit()底层实现依赖 execute(),两者应该统一呀,为什么有差异呢? 下面再扒一扒 submit()的源码。
ThreadPoolExecutor 中没有submit的代码,而是在父类AbstractExecutorService中,有三个submit的重载方法,关键就两行newTaskFor()。
/**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }2.newTaskFor()方法

正是因为这三个重载方法,都调用了 execute,所以 submit 底层依赖execute。 通过查看这里 execute 的实现,不难发现,它就是 ThreadPoolExecutor中的实现。所以,造成 submit 和 execute 的差异化的代码,不在这里。
那么造成差异的一定在 newTaskFor 方法中。 这个方法也就 new 了一个 FutureTask 而已, FutureTask实现 RunnableFuture 接口, RunnableFuture 接口继承 Runnable 接口和 Future 接口。而 Callable 只是FutureTask 的一个成员变量。
3.get()方法

另一个 Java 基础知识点: Callable 和 Future 的关系。 我们一般用 Callable 编写任务代码, Future 是异步返回对象,通过它的 get 方法,阻塞式地获取结果。 FutureTask 的核心代码就是实现了 Future 接口,也就是 get 方法的实现:
/**
     * @throws CancellationException {@inheritDoc}
     */
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            // 核心代码
            s = awaitDone(false, 0L);
        return report(s);
    }其中awaitDone()方法如下:
/**
     * Awaits completion or aborts on interrupt or timeout.
     *
     * @param timed true if use timed waits
     * @param nanos time to wait, if timed
     * @return state upon completion
     */
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        // 死循环
        for (;;) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            // 只有任务状态是已完成,才会跳出死循环
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
    }get 的核心实现是有个 awaitDone 方法,这是一个死循环,只有任务的状态是“已完成”,才会跳出死循环;否则会依赖 UNSAFE 包下的 LockSupport.park 原语进行阻塞,等待LockSupport.unpark 信号量。 而这个信号量只有当运行结束获得结果、或者出现异常的情况下,才会发出来。 分别对应方法 set 和setException。
为什么 submit 之后,通过get 方法可以获取到异常?原因是FutureTask 有一个 Object 类型的 outcome 成员变量,用来记录执行结果。 这个结果可以是传入的泛型,也可以是 Throwable 异常:
FutureTask中的run()方法:
public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }get()方法依赖的report()方法:
/**
     * Returns result or throws exception for completed task.
     *
     * @param s completed state value
     */
    @SuppressWarnings("unchecked")
    private V report(int s) throws ExecutionException {
        // outcome中记录了执行结果
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }FutureTask 的另一个巧妙的地方就是借用 RunnableAdapter 内部类,将submit 的Runnable 封装成 Callable。 所以就算你 submit 的是 Runnable,一样可以用 get 获取到异常。
4.异常处理

ass="nolink">1)不论是用 execute 还是 submit,都可以自己在业务代码上加 try-catch 进行异常处理。 我一般喜欢使用这种方式,因为我喜欢对不同业务场景的异常进行差异化处理,至少打不一样的日志吧。
lass="nolink">2)如果是 execute,还可以自定义线程池,继承ThreadPoolExecutor 并复写其afterExecute(Runnable r, Throwable t)方法。或者实现 Thread.UncaughtExceptionHandler 接口,实现void uncaughtException(Thread t, Throwable e);方法,并将该 handler 传递给线程池的 ThreadFactory。
3)但是注意, afterExecute 和 UncaughtExceptionHandler 都不适用 submit。 因为通过上面的 FutureTask.run()不难发现,它自己对 Throwable 进行了 try-catch,封装到了 outcome 属性,所以底层方法 execute 的 Worker 是拿不到异常信息的。
九. corePoolSize = 0

1)线程池提交任务后,首先判断当前池中线程数是否小于 corePoolSize。
2)如果小于则尝试创建新的线程执行该任务;否则尝试添加到等待队列。
3)如果添加队列成功,判断当前池内线程数是否为0,如果是则创建一个 firstTask 为null 的 worker,这个worker 会从等待队列中获取任务并执行。
4)如果添加到等待队列失败,一般是队列已满,才会再尝试创建新的线程。
5)但在创建之前需要与 maximumPoolSize 比较,如果小于则创建成功。
6)否则执行拒绝策略。
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                // 添加到等待队列成功后, 判断当前池内线程数是否为 0, 如果
                // 是则创建一个 firstTask 为 null 的 worker, 这个 worker 会从等待队列中获取任务并执行。
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }十. 自行实现一个线程池代码

参考代码:https://www.jianshu.com/p/9b7dfb407f72
1.线程池的变量

/**存放线程的集合*/
private ArrayList<MyThead> threads;
/**任务队列*/
private ArrayBlockingQueue<Runnable> taskQueue;

/**线程池初始限定大小*/
private int threadNum;
/**已经工作的线程数目*/
private int workThreadNum;2.线程池的核心方法

public void execute(Runnable runnable) {
        try {
            mainLock.lock();
            //线程池未满,每加入一个任务则开启一个线程
            if(workThreadNum < threadNum) {
                MyThead myThead = new MyThead(runnable);
                myThead.start();
                threads.add(myThead);
                workThreadNum++;
            }
            //线程池已满,放入任务队列,等待有空闲线程时执行
            else {
                //队列已满,无法添加时,拒绝任务
                if(!taskQueue.offer(runnable)) {
                    rejectTask();
                }
            }
        } finally {
            mainLock.unlock();
        }
    }到这里,一个线程池已经实现的差不多了,我们还有最后一个难点要解决:从任务队列中取出任务,分配给线程池中“空闲”的线程完成。
分配任务给线程的第一种思路
很容易想到一种解决思路:额外开启一个线程,时刻监控线程池的线程空余情况,一旦有线程空余,则马上从任务队列取出任务,交付给空余线程完成。
这种思路理解起来很容易,但仔细思考,实现起来很麻烦(1. 如何检测到线程池中的空闲线程 2. 如何将任务交付给一个.start()运行状态中的空闲线程)。而且使线程池的架构变的更复杂和不优雅。
分配任务给线程的第二种思路
现在我们来讲第二种解决思路:线程池中的所有线程一直都是运行状态的,线程的空闲只是代表此刻它没有在执行任务而已;我们可以让运行中的线程,一旦没有执行任务时,就自己从队列中取任务来执行。
为了达到这种效果,我们要重写run方法,所以要写一个自定义Thread类,然后让线程池都放这个自定义线程类
class MyThead extends Thread{
        private Runnable task;
        
        public MyThead(Runnable runnable) {
            this.task = runnable;
        }
        @Override
        public void run() {
            //该线程一直启动着,不断从任务队列取出任务执行
            while (true) {
                //如果初始化任务不为空,则执行初始化任务
                if(task != null) {
                    task.run();
                    task = null;
                }
                //否则去任务队列取任务并执行
                else {
                    Runnable queueTask = taskQueue.poll();
                    if(queueTask != null)
                        queueTask.run();   
                }
            }
        }
    }3.简单线程池

/**
* 自定义简单线程池
*/
public class MyThreadPool{
    /**存放线程的集合*/
    private ArrayList<MyThead> threads;
    /**任务队列*/
    private ArrayBlockingQueue<Runnable> taskQueue;
    /**线程池初始限定大小*/
    private int threadNum;
    /**已经工作的线程数目*/
    private int workThreadNum;
   
    private final ReentrantLock mainLock = new ReentrantLock();
   
    public MyThreadPool(int initPoolNum) {
        threadNum = initPoolNum;
        threads = new ArrayList<>(initPoolNum);
        //任务队列初始化为线程池线程数的四倍
        taskQueue = new ArrayBlockingQueue<>(initPoolNum*4);
        
        threadNum = initPoolNum;
        workThreadNum = 0;
    }
   
    public void execute(Runnable runnable) {
        try {
            mainLock.lock();
            //线程池未满,每加入一个任务则开启一个线程
            if(workThreadNum < threadNum) {
                MyThead myThead = new MyThead(runnable);
                myThead.start();
                threads.add(myThead);
                workThreadNum++;
            }
            //线程池已满,放入任务队列,等待有空闲线程时执行
            else {
                //队列已满,无法添加时,拒绝任务
                if(!taskQueue.offer(runnable)) {
                    rejectTask();
                }
            }
        } finally {
            mainLock.unlock();
        }
    }
   
    private void rejectTask() {
        System.out.println("任务队列已满,无法继续添加,请扩大您的初始化线程池!");
    }
    public static void main(String[] args) {
        MyThreadPool myThreadPool = new MyThreadPool(5);
        Runnable task = new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName()+"执行中");
            }
        };
        
        for (int i = 0; i < 20; i++) {
            myThreadPool.execute(task);
        }
    }
   
    class MyThead extends Thread{
        private Runnable task;
        
        public MyThead(Runnable runnable) {
            this.task = runnable;
        }
        @Override
        public void run() {
            //该线程一直启动着,不断从任务队列取出任务执行
            while (true) {
                //如果初始化任务不为空,则执行初始化任务
                if(task != null) {
                    task.run();
                    task = null;
                }
                //否则去任务队列取任务并执行
                else {
                    Runnable queueTask = taskQueue.poll();
                    if(queueTask != null)
                        queueTask.run();   
                }
            }
        }
    }
}4.总结

自定义线程池的整个工作过程:
1.初始化线程池,指定线程池的大小。
2.向线程池中放入任务执行。
3.如果线程池中创建的线程数目未到指定大小,则创建我们自定义的线程类放入线程池集合,并执行任务。执行完了后该线程会一直监听队列
4.如果线程池中创建的线程数目已满,则将任务放入缓冲任务队列
5.线程池中所有创建的线程,都会一直从缓存任务队列中取任务,取到任务马上执行
十一. 线程池的监控与评估

对于资源紧张的应用,如果担心线程池资源使用不当,可以利用 ThreadPoolExecutor 的 API 实现简单的监控,然后进行分析和优化。


ThreadPoolExecutor类的getActiveCount()方法:该方法可以获取线程池中当前正在执行任务的线程数量。
ThreadPoolExecutor类的getPoolSize()方法:该方法可以获取线程池中当前的线程数量。
ThreadPoolExecutor类的getQueue()方法:该方法可以获取线程池中的任务队列。
ThreadPoolExecutor类的getCompletedTaskCount()方法:该方法可以获取线程池中已经完成的任务数量。
多线程的情况下数据如何线程间同步? 使用了hashMap会报错吗?

  • JConsole工具:JConsole是Java自带的监控工具,可以用于监控Java应用程序的运行情况,包括线程池的状态和行为。
  • VisualVM工具:VisualVM是一个免费的Java性能分析工具,可以用于监控Java应用程序的运行情况,包括线程池的状态和行为。
十二. 使用线程池一些建议

【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险
说明:Executors 返回的线程池对象的弊端如下:
1) Excutors.fixedThreadPool(fixedPoolSize)和 SingleThreadPool:允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。
2) Excutors.cachedThreadPool()允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM
【建议】创建线程池的时候应该尽量给线程给个具体的业务名字前缀,方便定位问题
// 1. 使用guava的ThreadFactoryBuilder
ThreadFactory threadFactory = new ThreadFactoryBuilder()
                        .setNameFormat(threadNamePrefix + "-%d")
                        .setDaemon(true).build();


// 2. 自己实现 ThreadFactory接口

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 线程工厂,它设置线程名称,有利于我们定位问题。
*/
public final class NamingThreadFactory implements ThreadFactory {

    private final AtomicInteger threadNum = new AtomicInteger();
    private final ThreadFactory delegate;
    private final String name;

    /**
     * 创建一个带名字的线程池生产工厂
     */
    public NamingThreadFactory(ThreadFactory delegate, String name) {
        this.delegate = delegate;
        this.name = name;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = delegate.newThread(r);
        t.setName(name + threadNum.incrementAndGet());
        return t;
    }
}【建议】不同类型的业务任务尽量使用不同的线程池
十三. 其他问题

1.线程池如何复用已经创建的线程?

线程池中的一个Work对象可以看做是一个线程。如果线程池中的线程数已经到达最大值,则可以复用Woker中的线程,即不断循环从队列中获取任务然后然后执行任务,如果从阻塞队列中获取到的任务不未null,
这样则能够复用线程执行任务,
2.核心工作线程是否会被回收?

线程池中有个allowCoreThreadTimeOut字段能够描述是否回收核心工作线程,线程池默认是false表示不回收核心线程,我们可以使用allowCoreThreadTimeOut(true)方法来设置线程池回收核心线程
3.线程池需不需要关闭

一般来讲,线程池的生命周期跟随服务的生命周期。 如果一个服务(Service)停止服务了,那么需要调用 shutdown 方法进行关闭。 所以 ExecutorService.shutdown 在Java 以及一些中间件的源码中,是封装在 Service 的 shutdown 方法内的。
4.线程池关闭的shutdown 和 shutdownNow的区别是什么?


  • shutdown => 平缓关闭,等待所有已添加到线程池中的任务执行完再关闭。
  • shutdownNow => 立刻关闭,停止正在执行的任务,并返回队列中未执行的任务。
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

快速回复 返回顶部 返回列表