[心缘地方]同学录
首页 | 功能说明 | 站长通知 | 最近更新 | 编码查看转换 | 代码下载 | 常见问题及讨论 | 《深入解析ASP核心技术》 | 王小鸭自动发工资条VBA版
登录系统:用户名: 密码: 如果要讨论问题,请先注册。

[整理]ActiveMQ的简单使用方法

上一篇:[整理]如何实现ActiveMq的Topic的持久订阅
下一篇:[备忘]Java,单点登录,CAS是支持跨域的~~

添加日期:2013/9/5 15:28:05 快速返回   返回列表 阅读8328次
官方网址:http://activemq.apache.org/

(1)下载
我下的是apache-activemq-5.8.0-bin.zip

(2)解压即可

(3)运行bin/activemq.bat,即可运行

(4)配置文件是conf/activemq.xml,默认就运行的很好。

(5)默认监听的是0.0.0.0:61616,应该是本机的多个IP都可以吧。

(6)然后就可以发消息给它,其它应用收消息了。

(7)可以访问http://localhost:8161/admin/index.jsp
查看当前的队列、Topic和持久订户的信息、发送消息等等,很方便。
----------------------------------
根目录下有个activemq-all-5.8.0.jar,复制,放到你的工程的lib中就行了。
然后就可以写发送或接收的代码了

下面的范例,使用的是持久订阅,请注意。

发送代码:

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Producer {

    // ActiveMq的地址、用户名、密码,这里都是默认的。
    private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;
    private static String user = ActiveMQConnection.DEFAULT_USER;
    private static String password = ActiveMQConnection.DEFAULT_PASSWORD;

    // Topic的名称
    private static String topicName = "userSyncTopic";

    public static void main(String[] args) {

        Connection connection = null;

        try {
            // 创建一个连接
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                    user, password, url);
            connection = connectionFactory.createConnection();
            connection.start();

            // 创建一个session
            Session session = connection.createSession(false,
                    Session.AUTO_ACKNOWLEDGE);

            // 取得Topic,没有的话,会自动创建
            Topic topic = session.createTopic(topicName);

            // 创建一个消息生产者对象
            MessageProducer producer = session.createProducer(topic);

            // 设置消息持久化存储
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);

            // 创建一个消息对象,这是用的是普通的TextMessage
            TextMessage message = session.createTextMessage("你好,这是个简单的消息文本");

            // 消息发送
            producer.send(message);

            // 关一切
            connection.stop();
            connection.close();

        } catch (JMSException e) {
            e.printStackTrace();
        }

    }
}



接收代码,使用receive方法

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Consumer {

    // ActiveMq的地址、用户名、密码,这里都是默认的。
    private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;
    private static String user = ActiveMQConnection.DEFAULT_USER;
    private static String password = ActiveMQConnection.DEFAULT_PASSWORD;

    // Topic的名称
    private static String topicName = "userSyncTopic";

    public static void main(String[] args) {

        Connection connection = null;

        try {
            // 创建一个连接
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                    user, password, url);
            connection = connectionFactory.createConnection();
            connection.setClientID("aaa"); // 设置客户端ID,持久订阅必须的。
            connection.start();

            // 创建一个session
            Session session = connection.createSession(false,
                    Session.AUTO_ACKNOWLEDGE);

            // 取得Topic,没有的话,会自动创建
            Topic topic = session.createTopic(topicName);

            // 创建持久订阅,aaa是订户名称,和clientID可以不同
            MessageConsumer consumer = session.createDurableSubscriber(topic,
                    "aaa");

            // 这样,是普通订阅,消费者没启动的话,期间的消息都收不到。
            // MessageConsumer consumer = session.createConsumer(destination);

            // 循环接收消息
            long a = System.currentTimeMillis();
            long b = System.currentTimeMillis();
            while (b - a < 30000) {

                // 接收消息,不等待
                Message message = consumer.receiveNoWait();
                if (message != null) {

                    // 实际应用时,根据需要转换类型,这里是普通文本,所以转为TextMessage
                    System.out.println("收到的message是:"
                            + ((TextMessage) message).getText());
                } else {
                    System.out.println("no messsage.");
                }

                // 休息1秒
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(".." + (b - a) / 1000);
                b = System.currentTimeMillis();
            }

            // 关一切
            connection.close();

        } catch (JMSException e) {
            e.printStackTrace();
        }

    }
}



接收代码,使用MessageListener

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Consumer2 {

    // ActiveMq的地址、用户名、密码,这里都是默认的。
    private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;
    private static String user = ActiveMQConnection.DEFAULT_USER;
    private static String password = ActiveMQConnection.DEFAULT_PASSWORD;

    // Topic的名称
    private static String topicName = "userSyncTopic";

    public static void main(String[] args) {

        Connection connection = null;

        try {
            // 创建一个连接
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                    user, password, url);
            connection = connectionFactory.createConnection();
            connection.setClientID("bbb"); // 设置客户端ID,持久订阅必须的。
            connection.start();

            // 创建一个session
            Session session = connection.createSession(false,
                    Session.AUTO_ACKNOWLEDGE);

            // 取得Topic,没有的话,会自动创建
            Topic topic = session.createTopic(topicName);

            // 创建持久订阅,bbb是订户名称,和clientID可以不同
            MessageConsumer consumer = session.createDurableSubscriber(topic,
                    "bbb");

            // 这样,是普通订阅,消费者没启动的话,期间的消息都收不到。
            // MessageConsumer consumer = session.createConsumer(destination);

            consumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {

                    if (message instanceof TextMessage) {
                        try {
                            System.out.println("收到的message是:"
                                    + ((TextMessage) message).getText());
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });

            // 等待30秒结束
            long a = System.currentTimeMillis();
            long b = System.currentTimeMillis();
            while (b - a < 30000) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(".." + (b - a) / 1000);
                b = System.currentTimeMillis();
            }
            // 关一切
            connection.close();

        } catch (JMSException e) {
            e.printStackTrace();
        }

    }
}



仅供参考。
 

评论 COMMENTS
guest352772375
2014/5/8 14:58:33
貌似没有 服务端到 指定jsp端的demo?

添加评论 Add new comment.
昵称 Name:
评论内容 Comment:
验证码(不区分大小写)
Validation Code:
(not case sensitive)
看不清?点这里换一张!(Change it here!)
 
评论由管理员查看后才能显示。the comment will be showed after it is checked by admin.
CopyRight © 心缘地方 2005-2999. All Rights Reserved