package com.trip.platform.task;
import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import org.springframework.stereotype.Component;
/** * Task执行者. * */ @Component public class TaskExecutor {
/** * 线程池(最大100,队列满了重新执行). */ private ExecutorService executor = new ThreadPoolExecutor(2, 100, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),new ThreadPoolExecutor.CallerRunsPolicy());
/** * 自动执行 */ @PostConstruct public void execute() {
Thread t = new pollThread(); t.setDaemon(true); t.setName("TaskExecutor"); t.start(); }
/** * 执行线程. * */ class pollThread extends Thread { public void run() { while (true) {
// 从每个IPCC的子队列取出一个task,放入线程池 List<QueueTask> list = MultiQueueTaskCenter.pollTask(); for (QueueTask task : list) { executor.execute(task); }
// 休眠指定的时间 try { Thread.sleep(10); //1秒取100次任务 } catch (InterruptedException e) { } } } } }
很简单,一个线程不停的从队列拉任务,扔到线程池执行。 但是,发现任务多时,任务堆积,无人处理。 发现拉任务的线程不见了……
看日志,发现异常信息:
Exception in thread "TaskExecutor" java.lang.NullPointerException at com.trip.xxx.xxx.controller.xxxController$1.run(xxxController.java:543) at java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy.rejectedExecution(ThreadPoolExecutor.java:2038) at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379) at com.trip.platform.task.TaskExecutor$pollThread.run(TaskExecutor.java:51)
这个CallerRunsPolicy的代码如下:
public static class CallerRunsPolicy implements RejectedExecutionHandler { /** * Creates a {@code CallerRunsPolicy}. */ public CallerRunsPolicy() { }
/** * Executes task r in the caller's thread, unless the executor * has been shut down, in which case the task is discarded. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } }
线程池满了,新Task放不进的时候,会执行reject方法,调用RejectedExecutionHandler。 由于用的CallerRunsPolicy,它是直接执行的Task的run方法。
也就是,拉取任务这个线程,直接调用的该Task的run方法。 碰巧,这个Task里面空指针了,于是pollThread 直接抛异常退出了。 --------------------------- 解决办法: (1)加try catch吧 try { //线程池满,可能被拒绝,扔异常出来 executor.execute(task); } catch (Exception e) {
} (2)用的SynchronousQueue队列,它是一对一的,一个任务对应一个线程,100个线程都满了,就会拒绝任务。
我换成了new LinkedBlockingQueue<Runnable>(5000),然后把线程min和max改成了200和1000.
最小200个线程,再来放队列,队列5000个都放满了,会创建新线程,如此这般,直到1000个线程,才会拒绝任务。
|