package com.xxx.task.runner;
import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue;
import com.xxx.task.QueueTask;
/** * 任务中心,使用多个队列. * */ public class MultiQueueTaskCenter {
/** * 每个xx一个队列. */ private static final ConcurrentHashMap<String, LinkedBlockingQueue<QueueTask>> queueMap = new ConcurrentHashMap<String, LinkedBlockingQueue<QueueTask>>();
/** * 根据xx取得对应的队列. * * @param key * @return */ public static LinkedBlockingQueue<QueueTask> getQueue(String key) {
// 存在直接返回 if (queueMap.containsKey(key)) { return queueMap.get(key); }
// 避免并发put同名的队列 // putIfAbsent:不存在则放入并返回null,存在则返回对应value。线程安全 LinkedBlockingQueue<QueueTask> queue = new LinkedBlockingQueue<QueueTask>(); LinkedBlockingQueue<QueueTask> rtnQueue = queueMap.putIfAbsent( key, queue); if (rtnQueue != null) { return rtnQueue; } return queue; }
/** * 放入一条消息(队列满的话,等待). * * @param msg * 消息内容 */ public static void putTask(QueueTask task) { if (task == null) { return; }
try { getQueue(task.getQueueName()).put(task); } catch (InterruptedException e) { } }
/** * 取走一条消息(队列无消息,返回null). * * @return 消息内容 */ public static List<QueueTask> pollTask() { List<QueueTask> msgList = new ArrayList<QueueTask>(); Iterator<LinkedBlockingQueue<QueueTask>> ite = queueMap .values().iterator(); while (ite.hasNext()) { QueueTask one = ite.next().poll(); if (one != null) { msgList.add(one); } } return msgList; }
/** * 队列总大小. */ public static int getTotalSize() { int total = 0; Iterator<LinkedBlockingQueue<QueueTask>> ite = queueMap .values().iterator(); while (ite.hasNext()) { total += ite.next().size(); } return total; }
/** * 每个子队列的大小. */ public static Map<String, Integer> getSizeMap() { Map<String, Integer> sizeMap = new HashMap<String, Integer>(); for (String key : queueMap.keySet()) { sizeMap.put(key, queueMap.get(key).size()); } return sizeMap; } }
package com.xxx.task.runner;
import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;
import javax.annotation.PostConstruct;
import org.springframework.stereotype.Component;
import com.xxx.task.QueueTask;
/** * Task执行者. * */ @Component public class TaskExecutor {
/** * 线程池. */ private ExecutorService executor = Executors.newFixedThreadPool(60);
/** * 自动执行 */ @PostConstruct public void execute() { new pollThread().start(); }
/** * 执行线程. * */ class pollThread extends Thread { public void run() { while (true) {
// 从每个子队列取出一个task,放入线程池 List<QueueTask> list = MultiQueueTaskCenter.pollTask(); for (QueueTask task : list) { executor.execute(task); }
// 取Task的间隔时间 try { Thread.sleep(500); // 半秒 } catch (InterruptedException e) { } } } } }
package com.xxx.task;
public interface QueueTask extends Runnable {
public String getQueueName(); }
应用时,只要增加类,实现QueueTask接口即可, public class yyyTask implements QueueTask {..}
然后MultiQueueTaskCenter.putTask(yyyTask实例)即可。
不同的task根据需要使用队列名称即可, 同一个队列中的task,会顺序执行, 不同队列的task,是并行的。
|