官方网址:http://activemq.apache.org/
(1)下载 我下的是apache-activemq-5.8.0-bin.zip
(2)解压即可
(3)运行bin/activemq.bat,即可运行
(4)配置文件是conf/activemq.xml,默认就运行的很好。
(5)默认监听的是0.0.0.0:61616,应该是本机的多个IP都可以吧。
(6)然后就可以发消息给它,其它应用收消息了。
(7)可以访问http://localhost:8161/admin/index.jsp 查看当前的队列、Topic和持久订户的信息、发送消息等等,很方便。 ---------------------------------- 根目录下有个activemq-all-5.8.0.jar,复制,放到你的工程的lib中就行了。 然后就可以写发送或接收的代码了
下面的范例,使用的是持久订阅,请注意。
发送代码:
import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory;
public class Producer {
// ActiveMq的地址、用户名、密码,这里都是默认的。 private static String url = ActiveMQConnection.DEFAULT_BROKER_URL; private static String user = ActiveMQConnection.DEFAULT_USER; private static String password = ActiveMQConnection.DEFAULT_PASSWORD;
// Topic的名称 private static String topicName = "userSyncTopic";
public static void main(String[] args) {
Connection connection = null;
try { // 创建一个连接 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( user, password, url); connection = connectionFactory.createConnection(); connection.start();
// 创建一个session Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 取得Topic,没有的话,会自动创建 Topic topic = session.createTopic(topicName);
// 创建一个消息生产者对象 MessageProducer producer = session.createProducer(topic);
// 设置消息持久化存储 producer.setDeliveryMode(DeliveryMode.PERSISTENT);
// 创建一个消息对象,这是用的是普通的TextMessage TextMessage message = session.createTextMessage("你好,这是个简单的消息文本");
// 消息发送 producer.send(message);
// 关一切 connection.stop(); connection.close();
} catch (JMSException e) { e.printStackTrace(); }
} }
接收代码,使用receive方法
import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory;
public class Consumer {
// ActiveMq的地址、用户名、密码,这里都是默认的。 private static String url = ActiveMQConnection.DEFAULT_BROKER_URL; private static String user = ActiveMQConnection.DEFAULT_USER; private static String password = ActiveMQConnection.DEFAULT_PASSWORD;
// Topic的名称 private static String topicName = "userSyncTopic";
public static void main(String[] args) {
Connection connection = null;
try { // 创建一个连接 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( user, password, url); connection = connectionFactory.createConnection(); connection.setClientID("aaa"); // 设置客户端ID,持久订阅必须的。 connection.start();
// 创建一个session Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 取得Topic,没有的话,会自动创建 Topic topic = session.createTopic(topicName);
// 创建持久订阅,aaa是订户名称,和clientID可以不同 MessageConsumer consumer = session.createDurableSubscriber(topic, "aaa");
// 这样,是普通订阅,消费者没启动的话,期间的消息都收不到。 // MessageConsumer consumer = session.createConsumer(destination);
// 循环接收消息 long a = System.currentTimeMillis(); long b = System.currentTimeMillis(); while (b - a < 30000) {
// 接收消息,不等待 Message message = consumer.receiveNoWait(); if (message != null) {
// 实际应用时,根据需要转换类型,这里是普通文本,所以转为TextMessage System.out.println("收到的message是:" + ((TextMessage) message).getText()); } else { System.out.println("no messsage."); }
// 休息1秒 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(".." + (b - a) / 1000); b = System.currentTimeMillis(); }
// 关一切 connection.close();
} catch (JMSException e) { e.printStackTrace(); }
} }
接收代码,使用MessageListener
import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory;
public class Consumer2 {
// ActiveMq的地址、用户名、密码,这里都是默认的。 private static String url = ActiveMQConnection.DEFAULT_BROKER_URL; private static String user = ActiveMQConnection.DEFAULT_USER; private static String password = ActiveMQConnection.DEFAULT_PASSWORD;
// Topic的名称 private static String topicName = "userSyncTopic";
public static void main(String[] args) {
Connection connection = null;
try { // 创建一个连接 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( user, password, url); connection = connectionFactory.createConnection(); connection.setClientID("bbb"); // 设置客户端ID,持久订阅必须的。 connection.start();
// 创建一个session Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 取得Topic,没有的话,会自动创建 Topic topic = session.createTopic(topicName);
// 创建持久订阅,bbb是订户名称,和clientID可以不同 MessageConsumer consumer = session.createDurableSubscriber(topic, "bbb");
// 这样,是普通订阅,消费者没启动的话,期间的消息都收不到。 // MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) {
if (message instanceof TextMessage) { try { System.out.println("收到的message是:" + ((TextMessage) message).getText()); } catch (JMSException e) { e.printStackTrace(); } } } });
// 等待30秒结束 long a = System.currentTimeMillis(); long b = System.currentTimeMillis(); while (b - a < 30000) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(".." + (b - a) / 1000); b = System.currentTimeMillis(); } // 关一切 connection.close();
} catch (JMSException e) { e.printStackTrace(); }
} }
仅供参考。
|