集团培训网站建设/seo个人博客
一.消费模式
MQ的消费模式可以大致分为两种,一种是 推Push,一种是 拉Pull。
- Push 是 服务端 (MQ) 主动推送消息给客户端,优点是及时性较好,但如果客户端没有做好流控,一旦服务端推送大量消息到客户端时,就会导致客户端消息堆积甚至崩溃。
- Pull 是 客户端 需要主动到 服务端 (MQ) 取数据,优点是客户端可以依据自己的消费能力进行消费,但拉取的频率也需要用户自己控制,拉取频繁容易造成服务端和客户端的压力,拉取间隔长又容易造成消费不及时。
Push模式也是基于Pull模式的,所以不管是Push模式还是Pull模式,都是Pull模式。一般情况下,优先选择Pull模式
二.同步消息(***)
同步消息 发送过后会有一个返回值,也就是mq服务器接收到消息后返回的一个确认,这种方式非常安全,但是性能上并没有这么高,而且在mq集群中,也是要等到所有的从机都复制了消息以后才会返回,所以针对重要的消息可以选择这种方式。
可靠的同步传输被广泛应用于各种场景,如重要的通知消息、短消息通知等。
原生依赖引入:
<!-- 原生api,不是starter --><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.9.0</version></dependency>
同步消息生产者:
public class Producer {public static void main(String[] args) throws Exception {/*1. 谁来发?2. 发给谁?3. 怎么发?4. 发什么?5. 发的结果是什么?6. 关闭连接**///1.创建一个发送消息的对象Producer,并指定生产者组名DefaultMQProducer producer = new DefaultMQProducer("sync-producer-group");//2.设定发送的命名服务器地址producer.setNamesrvAddr("ip:9876");producer.setSendMsgTimeout(1000000);//3.1启动发送的服务producer.start();//4.创建要发送的消息对象,指定topic,指定内容bodyMessage msg = new Message("sync-topic", "hello-rocketmq".getBytes(StandardCharsets.UTF_8));//3.2发送消息SendResult result = producer.send(msg);System.out.println("返回结果:" + result);//5.关闭连接producer.shutdown();}
}
同步消息消费者:
public class Consumer {public static void main(String[] args) throws Exception {//1.创建一个接收消息的对象Consumer,并指定消费者组名//两种模式:①消费者定时拉取模式 ②建立长连接让Broker推送消息(选择第二种)DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("sync-producer-group");//2.设定接收的命名服务器地址consumer.setNamesrvAddr("ip:9876");//3.订阅一个主题,* 表示订阅这个主题的所有消息,后期会有消息过滤consumer.subscribe("sync-topic","*");//设置当前消费者的消费模式(默认模式:负载均衡)consumer.setMessageModel(MessageModel.CLUSTERING);//3.设置监听器,用于接收消息(一直监听,异步回调,异步线程)consumer.registerMessageListener(new MessageListenerConcurrently() {@Override//消费消息//消费上下文:consumeConcurrentlyContextpublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {// 这个就是消费的方法 (业务处理)System.out.println("我是消费者");System.out.println(msgs.get(0).toString());System.out.println("消息内容:" + new String(msgs.get(0).getBody()));System.out.println("消费上下文:" + context);//签收消息,消息会从mq出队//如果返回 RECONSUME_LATER 或 null 或 产生异常 那么消息会重新 回到队列 过一会重新投递出来 ,给当前消费者或者其他消费者消费的return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//4.启动接收消息的服务consumer.start();System.out.println("接受消息服务已经开启!");//5 不要关闭消费者!因为需要监听!//挂起System.in.read();}
}
三.异步消息(***)
异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。发送完以后会有一个异步消息通知。
例如,视频上传后通知启动转码服务,转码完成后通知推送转码结果等。
异步消息生产者:
public class AsyncProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("async-producer-group");producer.setNamesrvAddr("ip:9876");producer.start();Message message = new Message("async-topic", "我是一个异步消息".getBytes());//没有返回值的producer.send(message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("发送成功");}@Overridepublic void onException(Throwable e) {System.err.println("发送失败:" + e.getMessage());}});System.out.println("我先执行");//需要接收异步回调,这里需要挂起System.in.read();}
}
消费者无特殊变化:
public class SimpleConsumer {public static void main(String[] args) throws Exception{// 创建一个消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("async-producer-group");// 连接namesrvconsumer.setNamesrvAddr("ip:9876");// 订阅一个主题 * 标识订阅这个主题中所有的消息 后期会有消息过滤consumer.subscribe("async-topic", "*");// 设置一个监听器 (一直监听的, 异步回调方式)consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {// 这个就是消费的方法 (业务处理)System.out.println("我是消费者");System.out.println(msgs.get(0).toString());System.out.println("消息内容:" + new String(msgs.get(0).getBody()));System.out.println("消费上下文:" + context);// 返回值 CONSUME_SUCCESS成功,消息会从mq出队// RECONSUME_LATER(报错/null) 失败 消息会重新回到队列 过一会重新投递出来 给当前消费者或者其他消费者消费的return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动consumer.start();// 挂起当前的jvmSystem.in.read();}
}
四.单向消息(*)
这种方式主要用在不关心发送结果的场景,这种方式吞吐量很大,但是存在消息丢失的风险,一般用于结果不重要的场景,例如日志信息的发送
单向消息生产者:
public class SingleWayProducer {public static void main(String[] args) throws Exception{// 创建默认的生产者DefaultMQProducer producer = new DefaultMQProducer("single-way-producer-group");// 设置nameServer地址producer.setNamesrvAddr("ip:9876");// 启动实例producer.start();Message msg = new Message("single-way-topic", ("单向消息").getBytes());// 发送单向消息producer.sendOneway(msg);// 关闭实例producer.shutdown();}}
日志服务的编写思路
产生日志的服务利用MQ发送单向消息,不用等回复,大大减少了发送日志的时间,由log-service统一写入日志表中。并且由于日志过于庞大,可以对日志进行冷热分离,近一个月的为热数据,近一年的为冷数据(实际情况据业务而定),存储的位置不同,时间过于久远的日志可以删掉
五.延迟消息(***)
消息放入MQ后,过一段时间,才会被监听到,然后消费
比如下订单业务,提交了一个订单就可以发送一个延时消息,15min后去检查这个订单的状态,如果还是未付款就取消订单释放库存(订单超时)。
在分布式定时调度触发、任务超时处理等场景,使用 RocketMQ 的延时消息可以简化定时调度任务的开发逻辑,实现高性能、可扩展、高可靠的定时触发能力。
延迟等级
延迟消息生产者:
public class DelayProducer {public static void main(String[] args) throws Exception{// 创建默认的生产者DefaultMQProducer producer = new DefaultMQProducer("delay-producer-group");// 设置nameServer地址producer.setNamesrvAddr("ip:9876");// 启动实例producer.start();Message msg = new Message("delay-topic", ("延迟消息").getBytes());// 给这个消息设定一个延迟等级// messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2hmsg.setDelayTimeLevel(3);// 发送单向消息producer.send(msg);// 打印时间System.out.println(new Date());// 关闭实例producer.shutdown();}
}
延迟消息消费者(无特殊变化):
public class MSConsumer {public static void main(String[] args) throws Exception{// 创建一个消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delay-producer-group");// 连接namesrvconsumer.setNamesrvAddr("ip:9876");// 订阅一个主题 * 标识订阅这个主题中所有的消息 后期会有消息过滤consumer.subscribe("delay-topic", "*");// 设置一个监听器 (一直监听的, 异步回调方式)consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.println(msgs.get(0).toString());System.out.println("消息内容:" + new String(msgs.get(0).getBody()));System.out.println("收到时间:"+new Date());// 返回值 CONSUME_SUCCESS成功,消息会从mq出队// RECONSUME_LATER(报错/null) 失败 消息会重新回到队列 过一会重新投递出来 给当前消费者或者其他消费者消费的return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动consumer.start();// 挂起当前的jvmSystem.in.read();}
}
可以通过打印一下时间差来检测一下(第一次有误差很正常)
六.批量消息
Rocketmq可以一次性发送一组消息,那么这一组消息会被当做一个消息消费。
在对吞吐率有一定要求的情况下,可以将一些消息聚成一批以后进行发送,可以增加吞吐率,并减少API和网络调用次数。
将消息打包成 Collection<Message> msgs 传入方法中即可,需要注意的是批量消息的大小不能超过 1MiB(否则需要自行分割),其次同一批 batch 中 topic 必须相同。