ActiveMQ (二)

in 编程
关注公众号【好便宜】( ID:haopianyi222 ),领红包啦~
阿里云,国内最大的云服务商,注册就送数千元优惠券:https://t.cn/AiQe5A0g
腾讯云,良心云,价格优惠: https://t.cn/AieHwwKl
搬瓦工,CN2 GIA 优质线路,搭梯子、海外建站推荐: https://t.cn/AieHwfX9


 今天继续给大家分享的是ActiveMQ,如有不足,敬请指教。

 上次我们说到,我们发现消费者每次只能消费一条消息。当队列中有多条消息的时候,我们需要多次运行消费者,才能消费完这些消息。很麻烦!!!!如何解决这个问题呢?

 这就需要使用ActiveMQ监听器来监听队列,持续消费消息。

一、ActiveMQ监听器

1.1 配置步骤说明

1.2 配置步骤

1.2.1 创建监听器MyListener类

package com.xkt.listener;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * @author lzx
 *
 */
public class MyListener implements MessageListener {

	@Override
	public void onMessage(Message message) {

		if (null != message) {
			if (message instanceof TextMessage) {
				try {
					TextMessage tMsg = (TextMessage) message;
					String content = tMsg.getText();
					System.out.println("监听到的消息是 " + content);
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		}
	}

}

1.2.2 修改MyConsumer代码,加载监听器

package com.xkt.consumer;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

import com.xkt.listener.MyListener;

/**
 * @author lzx
 *
 */
public class Myconsumer {

	private ConnectionFactory factory;

	private Connection connection;

	private Session session;

	private Destination destination;

	private MessageConsumer consumer;

	public void receiveFromMq() {

		try {
			factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.109.3:61616");

			connection = factory.createConnection();
			connection.start();

			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

			// 创建目的地, 目的地命名即队列命名, 消息消费者需要通过此命名访问对应的队列
			destination = session.createQueue("queue");

			// 5.创建消息消费者, 创建的消息消费者与某目的地对应, 即方法参数目的地
			consumer = session.createConsumer(destination);

			// 7.加载监听器
			consumer.setMessageListener(new MyListener());
			// 监听器需要持续加载,这里我们使用输入流阻塞当前线程结束。监听指定队列,只要有消息进来,就消费这条消息
			System.in.read();

			// 在java项目中,可以通过IO阻塞程序,持续加载监听器
			// 在web项目中,可以通过配置文件,直接加载监听器。

		} catch (Exception e) {
			e.printStackTrace();
			System.out.println("读取失败");
		} finally {
			if (null != consumer) {
				try {
					consumer.close();
				} catch (JMSException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}

			if (null != session) {
				try {
					session.close();
				} catch (JMSException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}

			if (null != connection) {
				try {
					connection.close();
				} catch (JMSException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
		}

	}

}

1.2.3 测试

图示
图示
图示


 在以上示例中,只能向一个消费者发送消息。但是有一些场景,需求有多个消费者都能接收到消息,比如:美团APP每天的消息推送。该如何实现呢?

二、Topic模式实现

2.1 配置步骤说明

  1. 搭建ActiveMQ消息服务器。(略)
  2. 创建主题订阅者。
  3. 创建主题发布者。

2.2 配置步骤

2.2.1 创建主题订阅者MySubscriber

package com.xkt.subscriber;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * @author lzx
 *
 */
public class MySubscriber implements Runnable {

	/**
	 * 多线程的线程安全问题 解决方案:
	 * 
	 * (1)加锁 --极不推荐 (2)不使用全局变量 ---> SpringMVC是线程安全的吗? 答:默认不是 解决办法:(1)使用原型模式--不推荐
	 * (2)不使用全局变量 (3)ThreadLocal (3)其它框架来代替,比如redis
	 */
	private TopicConnectionFactory factory;

	private TopicConnection connection;

	private TopicSession session;

	private Topic topic;

	private TopicSubscriber subscriber;

	private Message message;

	@Override
	public void run() {

		try {
			// 1、创建连接工厂
			factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.109.3:61616");

			// 2、创建连接
			connection = factory.createTopicConnection();
			connection.start();

			// 3、创建会话
			session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);

			// 4、创建topic主题
			topic = session.createTopic("topic-gzsxt");

			// 5、创建订阅者
			subscriber = session.createSubscriber(topic);

			// 6、订阅
			while (true) {

				message = subscriber.receive();

				if (null != message) {
					if (message instanceof TextMessage) {
						TextMessage tMsg = (TextMessage) message;

						String content = tMsg.getText();

						System.out.println("订阅者: " + Thread.currentThread().getName() + " 接收的消息是:" + content);
					}
				}
			}

		} catch (JMSException e) {

			e.printStackTrace();
		}

	}

}

2.2.2 修改测试类

package com.xkt.test;

import com.xkt.subscriber.MySubscriber;

/**
 * @author lzx
 *
 */
public class TestMQ {

	public static void main(String[] args) {

		MySubscriber sub = new MySubscriber();

		Thread t1 = new Thread(sub);

		Thread t2 = new Thread(sub);

		t1.start();

		t2.start();
	}

}

2.2.3 查看测试结果

2.2.4 创建主题发布者MyPublisher

package com.xkt.publish;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * @author lzx
 *
 */
public class MyPublisher {

	private TopicConnectionFactory factory;

	private TopicConnection connection;

	private TopicSession session;

	private Topic topic;

	private TopicPublisher publisher;

	private Message message;

	public void publish(String msg) {

		try {

			// 1、创建连接工厂
			factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.109.3:61616");

			// 2、创建连接
			connection = factory.createTopicConnection();
			connection.start();

			// 3、创建会话
			session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);

			// 4、创建topic主题
			topic = session.createTopic("topic-gzsxt");

			// 5、创建发布者
			publisher = session.createPublisher(topic);

			// 6、创建消息对象
			message = session.createTextMessage(msg);

			// 7、发布消息
			publisher.publish(message);

		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			if (null != publisher) {
				try {
					publisher.close();
				} catch (JMSException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}

			if (null != session) {
				try {
					session.close();
				} catch (JMSException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}

			if (null != connection) {
				try {
					connection.stop();

					connection.close();
				} catch (JMSException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}

			if (null != session) {
				try {
					session.close();
				} catch (JMSException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
		}

	}
}

2.2.5 修改测试类

package com.xkt.test;

import org.junit.Test;

import com.xkt.publish.MyPublisher;
import com.xkt.subscriber.MySubscriber;

/**
 * @author lzx
 *
 */
public class TestMQ {

	public static void main(String[] args) {

		MySubscriber sub = new MySubscriber();

		Thread t1 = new Thread(sub);

		Thread t2 = new Thread(sub);

		t1.start();

		t2.start();
	}

	@Test
	public void publish() {
		MyPublisher publisher = new MyPublisher();

		publisher.publish("hello,欢迎收听FM 89.9频道-交通频道");
	}
}

2.2.6 查看测试结果

2.3 Topic小结

  1. Topic模式能够实现多个订阅者同时消费消息。
  2. Topic主题模式下,消息不会保存,只有在线的订阅者才会接收到消息。
  3. 通常可以用来解决公共消息推送的相关业务。


版权说明:欢迎以任何方式进行转载,但请在转载后注明出处!

关注公众号【好便宜】( ID:haopianyi222 ),领红包啦~
阿里云,国内最大的云服务商,注册就送数千元优惠券:https://t.cn/AiQe5A0g
腾讯云,良心云,价格优惠: https://t.cn/AieHwwKl
搬瓦工,CN2 GIA 优质线路,搭梯子、海外建站推荐: https://t.cn/AieHwfX9
扫一扫关注公众号添加购物返利助手,领红包
Comments are closed.

推荐使用阿里云服务器

超多优惠券

服务器最低一折,一年不到100!

朕已阅去看看