【RocketMQ】RocketMQ快速入门

news/2024/9/28 23:00:42 标签: RocketMQ, java

🎯 导读:该文档介绍了Apache RocketMQ消息队列的基础应用,包括消息发送与接收的基本流程。首先通过创建生产者实例,并指定名称服务器地址,启动后即可发送消息至指定主题。然后创建消费者实例订阅相应主题,并设置监听器处理接收到的消息。文档中还提供了代码示例,展示了如何实现简单的生产和消费逻辑。此外,文档解释了消息队列在不同场景下的分发策略,如负载均衡与广播模式,并强调了队列数量与消费者数量之间的关系以确保消息的合理分配。

文章目录

    • 消息发送和监听的流程
      • 消息生产者
      • 消息消费者
    • 搭建RocketMQ入门案例
      • 创建项目
      • 加入依赖
      • 编写生产者
      • 编写消费者
    • 说明
      • 一个消费者组消费一个topic
      • 两个消费者组消费一个topic
      • 生产者的消息发送给主题的哪个队列
      • 消费者如何从队列中拉取消息
        • 只有一个消费者,要拉取所有队列的消息
        • 两个消费者,每个消费者要负责两个队列
        • 三个消费者(要求尽量平衡)
        • 四个消费者,一人一个
        • 五个消费者,第五个消费者永远不接收消息

RocketMQ提供了发送多种发送消息的模式,例如同步消息,异步消息,顺序消息,延迟消息,事务消息

消息发送和监听的流程

消息生产者

1、创建消息生产者 producer ,并指定生产者组名

2、指定 Nameserver 地址

3、启动 producer

4、创建消息对象,指定主题 Topic、Tag 和消息体等

5、发送消息

6、关闭 producer

消息消费者

1、创建消费者 consumer ,指定消费者组名

2、指定 Nameserver 地址

3、创建监听订阅主题 Topic和Tag 等

4、处理消息

5、启动消费者 consumer

RocketMQ_32">搭建RocketMQ入门案例

创建项目

在这里插入图片描述

在这里插入图片描述

加入依赖

引入原生API,先不用spring-boot-starter版本

<dependencies>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.9.2</version>
        <!--docker的用下面这个版本-->
        <version>4.4.0</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.22</version>
    </dependency>
</dependencies>

编写生产者

java">/**
 * 测试生产者
 *
 * @throws Exception
 */
@Test
public void testProducer() throws Exception {
    // 创建默认的生产者(指定生产者组名)
    DefaultMQProducer producer = new DefaultMQProducer("test-group");
    // 设置nameServer地址
    producer.setNamesrvAddr("localhost:9876");
    // 启动实例
    producer.start();
    for (int i = 0; i < 1; i++) {
        // 创建消息
        // 第一个参数:主题的名字
        // 第二个参数:消息内容(要转化为字节数组)
        Message msg = new Message("TopicTest", ("Hello RocketMQ " + i).getBytes());
        // 发送结果
        SendResult send = producer.send(msg);
        // 打印发送状态
        System.out.println(send.getSendStatus());
    }
    // 关闭实例
    producer.shutdown();
}

为了连接方便,可以使用一个常量NAME_SRV_ADDR来存储localhost:9876

【运行】

在这里插入图片描述

在控制台中可以看到创建了一个主题 testTopic

在这里插入图片描述

点击状态,一个主题默认4个队列

在这里插入图片描述

点击路由,可以查看 broker 的 ip 地址

在这里插入图片描述

在CONSUMER管理中,可以查看消费者

编写消费者

java">@Test
public void simpleConsumer() throws Exception {
    // 创建一个消费者
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-consumer-group");
    // 连接 namesrv
    consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    // 订阅一个主题  * 表示订阅这个主题中所有的消息,后面会有消息过滤的教程
    consumer.subscribe("testTopic", "*");
    // 设置一个监听器 (一直监听的,异步回调方式,消费者线程和监听线程不是一个线程)
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            // 这个就是消费的方法 (业务处理)
            System.out.println("我是消费者");
            // msgs 虽然是List,但是只有一条消息,所以get(0)就行
            System.out.println(msgs.get(0).toString());
            // 消息内容从字节数组转化为String
            System.out.println("消息内容:" + new String(msgs.get(0).getBody()));
            System.out.println("消费上下文:" + context);
            // 返回值  CONSUME_SUCCESS成功,消息会从mq出队
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            // RECONSUME_LATER(报错/null)失败 消息会重新回到队列 过一会重新投递出来 给当前消费者或者其他消费者消费的
//                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    });
    // 启动
    consumer.start();
    // 挂起当前的jvm,让监听一直存在
    System.in.read();
}

【运行】

在这里插入图片描述

说明

  • 一个生产者组可以投递到多个主题
  • 一个消费者组只能订阅一个主题

在这里插入图片描述

一个消费者组消费一个topic

在这里插入图片描述

【负载均衡模式】消息1给 C1 消费,消息2给 C2 消费,以此类推

【广播模式】同一条消息既给 C1 消费,又给 C2 消费

两个消费者组消费一个topic

同一消息,两个消费者组都获取到,但是组内要分配给哪个消费者,就看是负载【均衡模式】还是【广播模式】了

在这里插入图片描述

生产者的消息发送给主题的哪个队列

生产者会将消息轮询发送到主题的4个队列

在这里插入图片描述

消费者如何从队列中拉取消息

只有一个消费者,要拉取所有队列的消息

在这里插入图片描述

  • 代理者:MQ
  • 消费者:我们的程序

测试,生产者生产12个消息

在这里插入图片描述

在这里插入图片描述

差值:代理者位点-消费者位点。如果差值太大,说明消息堆积

两个消费者,每个消费者要负责两个队列

在这里插入图片描述

三个消费者(要求尽量平衡)

在这里插入图片描述

四个消费者,一人一个

在这里插入图片描述

五个消费者,第五个消费者永远不接收消息

队列数量最好大于等于消费者组内的消费者数量!!!

在这里插入图片描述


http://www.niftyadmin.cn/n/5682018.html

相关文章

MATLAB数据文件读写:2.矩阵数据读取

矩阵数据读取 写入文件–save函数 保存变量到文件中&#xff0c;用于以后使用。 save(fielname) 将当前工作区中所有变量保存到matlab格式的二进制文件filename中。: .mat save(filename, ‘var’,fmt) 将当前工作区中var指定的结构体数组的变量或字段保存到matlab格式的…

线上报名小程序怎么做

在这个数字化、智能化的时代&#xff0c;信息技术的发展正以前所未有的速度改变着我们的生活。无论是学习、工作还是娱乐&#xff0c;互联网都成为了我们不可或缺的一部分。而在线上报名这一领域&#xff0c;小程序的出现更是为广大用户带来了前所未有的便捷与高效。今天&#…

用大模型 vs 垂直大模型

人工智能&#xff08;AI&#xff09;大模型的发展已经进入了一个新的战场&#xff0c;主要分为通用大模型和垂直大模型两个方向。通用大模型因其广泛的应用场景和普适性备受关注&#xff0c;而垂直大模型则因其在特定领域内的高效性和专业性逐渐崭露头角。随着技术的不断演进&a…

基于Node.js+Express+MySQL+VUE实现的在线电影视频点播网站管理系统的设计与实现部署安装

目录 1. 引言 1.1开发背景 1.2开发意义 1.3国内外研究 2. 需求分析 3. 系统架构设计 4. 关键技术选型 5. 功能模块设计 5.1功能图 5.2界面介绍 6. 总结 1. 引言 随着互联网技术的快速发展和普及&#xff0c;人们获取信息的方式发生了巨大变化&#xff0c;其中在…

AI周报(9.22-9.28)

AI应用-Siipet宠物沟通师 Siipet是一款由SiiPet公司推出的创新宠物行为分析相机&#xff0c;旨在通过尖端技术加深宠物与主人之间的情感联系。这款相机利用先进的AI算法&#xff0c;能够自动识别和分析家中宠物的行为&#xff0c;并提供定制化的护理建议。 SiiPet相机的核心功…

iOS 小组件

基本知识 时间轴 小组件通过AppIntentTimelineProvider进行 UI 刷新 struct Provider: AppIntentTimelineProvider {func placeholder(in context: Context) -> SimpleEntry {// 添加占位的&#xff08;选择添加的时候使用&#xff09;// todo}func snapshot(for configu…

Goland的使用

一、安装Goland 一、Goland简介 Goland是由JetBrains公司旨在为go开发者提供的一个符合人体工程学的新的商业IDE。这个IDE整合了IntelliJ平台的有关go语言的编码辅助功能和工具集成特点 二、下载相应的安装包 1、官网下载地址 GoLand by JetBrains: More than just a Go IDE 三…

Android个性名片界面的设计——约束布局的应用

节选自《Android应用开发项目式教程》&#xff0c;机械工业出版社&#xff0c;2024年7月出版 做最简单的安卓入门教程&#xff0c;手把手视频、代码、答疑全配齐 【任务目标】 使用约束布局、TextView控件实现一个个性名片界面的设计&#xff0c;界面如图1所示。 图1 个性名片…