[心缘地方]同学录
首页 | 功能说明 | 站长通知 | 最近更新 | 编码查看转换 | 代码下载 | 常见问题及讨论 | 《深入解析ASP核心技术》 | 王小鸭自动发工资条VBA版
登录系统:用户名: 密码: 如果要讨论问题,请先注册。

[整理]维护多个队列的任务中心

上一篇:[备忘]JAXB注解,Field,List的写法
下一篇:[备忘]spring Scheduled tasks 定时任务只支持6位表达式

添加日期:2016/6/23 16:13:56 快速返回   返回列表 阅读1986次


package com.xxx.task.runner;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

import com.xxx.task.QueueTask;


/**
 * 任务中心,使用多个队列.
 * 
 */
public class MultiQueueTaskCenter {

    /**
     * 每个xx一个队列.
     */
    private static final ConcurrentHashMap<String, LinkedBlockingQueue<QueueTask>> queueMap = new ConcurrentHashMap<String, LinkedBlockingQueue<QueueTask>>();

    /**
     * 根据xx取得对应的队列.
     * 
     * @param key
     * @return
     */
    public static LinkedBlockingQueue<QueueTask> getQueue(String key) {

        // 存在直接返回
        if (queueMap.containsKey(key)) {
            return queueMap.get(key);
        }

        // 避免并发put同名的队列
        // putIfAbsent:不存在则放入并返回null,存在则返回对应value。线程安全
        LinkedBlockingQueue<QueueTask> queue = new LinkedBlockingQueue<QueueTask>();
        LinkedBlockingQueue<QueueTask> rtnQueue = queueMap.putIfAbsent(
                key, queue);
        if (rtnQueue != null) {
            return rtnQueue;
        }
        return queue;
    }

    /**
     * 放入一条消息(队列满的话,等待).
     * 
     * @param msg
     *            消息内容
     */
    public static void putTask(QueueTask task) {
        if (task == null) {
            return;
        }

        try {
            getQueue(task.getQueueName()).put(task);
        } catch (InterruptedException e) {
        }
    }

    /**
     * 取走一条消息(队列无消息,返回null).
     * 
     * @return 消息内容
     */
    public static List<QueueTask> pollTask() {
        List<QueueTask> msgList = new ArrayList<QueueTask>();
        Iterator<LinkedBlockingQueue<QueueTask>> ite = queueMap
                .values().iterator();
        while (ite.hasNext()) {
            QueueTask one = ite.next().poll();
            if (one != null) {
                msgList.add(one);
            }
        }
        return msgList;
    }

    /**
     * 队列总大小.
     */
    public static int getTotalSize() {
        int total = 0;
        Iterator<LinkedBlockingQueue<QueueTask>> ite = queueMap
                .values().iterator();
        while (ite.hasNext()) {
            total += ite.next().size();
        }
        return total;
    }

    /**
     * 每个子队列的大小.
     */
    public static Map<String, Integer> getSizeMap() {
        Map<String, Integer> sizeMap = new HashMap<String, Integer>();
        for (String key : queueMap.keySet()) {
            sizeMap.put(key, queueMap.get(key).size());
        }
        return sizeMap;
    }
}




package com.xxx.task.runner;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import javax.annotation.PostConstruct;

import org.springframework.stereotype.Component;

import com.xxx.task.QueueTask;


/**
 * Task执行者.
 * 
 */
@Component
public class TaskExecutor {

    /**
     * 线程池.
     */
    private ExecutorService executor = Executors.newFixedThreadPool(60);

    /**
     * 自动执行
     */
    @PostConstruct
    public void execute() {
        new pollThread().start();
    }

    /**
     * 执行线程.
     * 
     */
    class pollThread extends Thread {
        public void run() {
            while (true) {

                // 从每个子队列取出一个task,放入线程池
                List<QueueTask> list = MultiQueueTaskCenter.pollTask();
                for (QueueTask task : list) {
                    executor.execute(task);
                }

                // 取Task的间隔时间
                try {
                    Thread.sleep(500); // 半秒
                } catch (InterruptedException e) {
                }
            }
        }
    }
}




package com.xxx.task;

public interface QueueTask extends Runnable {

    public String getQueueName();
}



应用时,只要增加类,实现QueueTask接口即可,
public class yyyTask implements QueueTask {..}

然后MultiQueueTaskCenter.putTask(yyyTask实例)即可。

不同的task根据需要使用队列名称即可,
同一个队列中的task,会顺序执行,
不同队列的task,是并行的。
 

评论 COMMENTS
没有评论 No Comments.

添加评论 Add new comment.
昵称 Name:
评论内容 Comment:
验证码(不区分大小写)
Validation Code:
(not case sensitive)
看不清?点这里换一张!(Change it here!)
 
评论由管理员查看后才能显示。the comment will be showed after it is checked by admin.
CopyRight © 心缘地方 2005-2999. All Rights Reserved