日本樱花网站怎么做/近期新闻热点大事件
RabbitMQ Tutorials — RabbitMQhttps://rabbitmq.com/getstarted.html
1. 订阅模式
订阅:微信中的公众号,所有订阅该公众号的人都会接受到该消息。
注意:
- 消息是发送到交换机而不是队列
- 消息可以发送到队列,也可以发送到交换机
- 消费者的消息来源只能是队列
如果将消息发送到没有绑定队列的交换机上,消息会去哪?
消息存放在交换机- 消息丢失
说明:消息只能存放于队列,不能存放到交换机,交换机只是用于消息的传递。
2. 队列绑定到交换机
3. 生产者
public class Send {private final static String EXCHANGE_NAME = "test_exchange_fanout";public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明exchangechannel.exchangeDeclare(EXCHANGE_NAME, "fanout");// 消息内容String message = "Hello World!";channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");// 关闭连接channel.close();connection.close();}
}
4. 消费者1
public class Recv {private final static String QUEUE_NAME = "test_queue_work";private final static String EXCHANGE_NAME = "test_exchange_fanout";public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");// 同一时刻服务器只会发一条消息给消费者channel.basicQos(1);// 定义队列的消费者QueueingConsumer consumer = new QueueingConsumer(channel);// 监听队列,手动返回完成channel.basicConsume(QUEUE_NAME, false, consumer);// 获取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" [x] Received '" + message + "'");Thread.sleep(10);// 返回确认状态channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}}
}
5. 消费者2
public class Recv2 {private final static String QUEUE_NAME = "test_queue_work2";private final static String EXCHANGE_NAME = "test_exchange_fanout";public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");// 同一时刻服务器只会发一条消息给消费者channel.basicQos(1);// 定义队列的消费者QueueingConsumer consumer = new QueueingConsumer(channel);// 监听队列,手动返回完成channel.basicConsume(QUEUE_NAME, false, consumer);// 获取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" [x] Received '" + message + "'");Thread.sleep(10);// 返回确认状态channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}}
}
5. 消费者3
public class Recv3 {private final static String QUEUE_NAME = "test_queue_work";private final static String EXCHANGE_NAME = "test_exchange_fanout";public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");// 同一时刻服务器只会发一条消息给消费者channel.basicQos(1);// 定义队列的消费者QueueingConsumer consumer = new QueueingConsumer(channel);// 监听队列,手动返回完成channel.basicConsume(QUEUE_NAME, false, consumer);// 获取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" [x] Received '" + message + "'");Thread.sleep(10);// 返回确认状态channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}}
}
6. 测试
结果:消费者1和消费者2同时都能获取到消息。
注意:订阅模式和work模式的区别。
work模式将消息发送到队列
订阅模式将消息发送到交换机
work模式是1个队列2个消费者,订阅模式是2个队列2个消费者