Rocketmq--消息发送和接收演示

2023-09-18 18:43:20

使用Java代码来演示消息的发送和接收

<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-spring-boot-starter</artifactId>
  <version>2.0.2</version>
</dependency>

1 发送消息

        消息发送步骤:

  • 创建消息生产者, 指定生产者所属的组名
  • 指定Nameserver地址
  • 启动生产者
  • 创建消息对象,指定主题、标签和消息体
  • 发送消息
  • 关闭生产者
//发送消息
public class RocketMQSendTest {
  public static void main(String[] args) throws Exception {
    //1. 创建消息生产者, 指定生产者所属的组名
    DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");
    //2. 指定Nameserver地址
    producer.setNamesrvAddr("192.168.109.131:9876");
    //3. 启动生产者
    producer.start();
    //4. 创建消息对象,指定主题、标签和消息体
    Message msg = new Message("myTopic", "myTag",
                ("RocketMQ Message").getBytes());
    //5. 发送消息
    SendResult sendResult = producer.send(msg,10000);
    System.out.println(sendResult);
    //6. 关闭生产者
    producer.shutdown();
 }
}

2 接收消息

        消息接收步骤:

  • 创建消息消费者, 指定消费者所属的组名
  • 指定Nameserver地址
  • 指定消费者订阅的主题和标签
  • 设置回调函数,编写处理消息的方法
  • 启动消息消费者
//接收消息
public class RocketMQReceiveTest {
  public static void main(String[] args) throws MQClientException {
    //1. 创建消息消费者, 指定消费者所属的组名
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myconsumer-
group");
    //2. 指定Nameserver地址
    consumer.setNamesrvAddr("192.168.109.131:9876");
    //3. 指定消费者订阅的主题和标签
    consumer.subscribe("myTopic", "*");
    //4. 设置回调函数,编写处理消息的方法
    consumer.registerMessageListener(new MessageListenerConcurrently() {
      @Override
      public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt>
msgs,
                        ConsumeConcurrentlyContext
context) {
        System.out.println("Receive New Messages: " + msgs);
        //返回消费状态
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
     }
   });
    //5. 启动消息消费者
    consumer.start();
    System.out.println("Consumer Started.");
 }
}

3 案例

接下来我们模拟一种场景: 下单成功之后,向下单用户发送短信。设计图如下:

3.1  订单微服务发送消息

1 在 shop-order 中添加rocketmq的依赖

<!--rocketmq-->
<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-spring-boot-starter</artifactId>
  <version>2.0.2</version>
</dependency>
<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-client</artifactId>
  <version>4.4.0</version>
</dependency>

2 添加配置

rocketmq:
name-server: 192.168.109.131:9876  #rocketMQ服务的地址
producer:
 group: shop-order # 生产者组

3 编写测试代码

@RestController
@Slf4j
public class OrderController2 {
  @Autowired
  private OrderService orderService;
  @Autowired
  private ProductService productService;
  @Autowired
  private RocketMQTemplate rocketMQTemplate;
  //准备买1件商品
  @GetMapping("/order/prod/{pid}")
  public Order order(@PathVariable("pid") Integer pid) {
    log.info(">>客户下单,这时候要调用商品微服务查询商品信息");
    //通过fegin调用商品微服务
    Product product = productService.findByPid(pid);
    if (product == null){
        Order order = new Order();
      order.setPname("下单失败");
      return order;
   }
    log.info(">>商品信息,查询结果:" + JSON.toJSONString(product));
    Order order = new Order();
    order.setUid(1);
    order.setUsername("测试用户");
    order.setPid(product.getPid());
    order.setPname(product.getPname());
    order.setPprice(product.getPprice());
    order.setNumber(1);
    orderService.save(order);
    //下单成功之后,将消息放到mq中
    rocketMQTemplate.convertAndSend("order-topic", order);
    return order;
 }
}

更多推荐

JVM面试题(一)

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档文章目录前言一、内存模型以及分区,需要详细到每个区放什么。二、堆里面的分区:Eden,survival(from+to),老年代,各自的特点。三、对象创建方法,对象的内存分配,对象的访问定位。四、GC的两种判定方法:五、SafePoint是什么六、GC的三

JVM面试题-JVM对象的创建过程、内存分配、内存布局、访问定位等问题详解

对象内存分配的两种方式指针碰撞适用场合:堆内存规整(即没有内存碎片)的情况下。原理:用过的内存全部整合到一边,没有用过的内存放在另一边,中间有一个分界指针,只需要向着没用过的内存方向将该指针移动对象内存大小位置即可。使用该分配方式的GC收集器:Serial,ParNew空闲列表适用场合:堆内存不规整的情况下。原理:虚拟

解决npm install遇到的问题:Error while executing:

目录一、遇到问题二、解决办法方法一方法二方法三方法四一、遇到问题npmERR!Errorwhileexecuting:npmERR!D:\IT_base\git\Git\cmd\git.EXEls-remote-h-tssh://git@github.com/sohee-lee7/Squire.gitnpmERR!np

Python爬虫如何使用代理IP进行抓取

前言Python爬虫是一种非常强大的工具,可以用于抓取各种网站的数据。但是,在一些情况下,我们需要使用代理IP来完成数据抓取,如绕过IP限制或保护隐私信息等。本文将介绍如何使用Python爬虫抓取数据时使用代理IP,并提供示例代码和注意事项。一、什么是代理IP代理IP是一种充当客户端和服务器之间中间人的IP地址。客户端

python爬虫:同步模式和异步模式的区别

简单介绍区别Python爬虫可以使用同步模式和异步模式来执行任务,这两种模式有不同的工作方式和优缺点。下面是它们之间的主要区别:同步模式:同步模式是传统的编程方式,代码按照顺序执行,每个操作都会阻塞当前线程直到完成。当一个请求或操作需要时间较长时,程序将被阻塞,等待结果返回,这可能导致程序性能较低,尤其在大量IO密集型

浅谈C++|类的成员

一.类对象作为类成员类可以作为另一个类的成员代码:#include<iostream>usingnamespacestd;classphone{public:stringshouji;phone(stringshouji1):shouji(shouji1){cout<<"phone的构造函数调用"<<endl;}~ph

Java基于SpingBoot的地方废物回收机构管理系统,可作为毕业设计

博主介绍:✌程序员徐师兄、7年大厂程序员经历。全网粉丝30W+,Csdn博客专家、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌文章目录1.简介2.技术栈3.可行性分析四系统设计第五章系统功能实现5.1管理员功能模块六、源码获取1.简介地方废物回收机构的需求和管理上的不断提升,地

第23章_瑞萨MCU零基础入门系列教程之ADC与DSP

本教程基于韦东山百问网出的DShanMCU-RA6M5开发板进行编写,需要的同学可以在这里获取:https://item.taobao.com/item.htm?id=728461040949配套资料获取:https://renesas-docs.100ask.net瑞萨MCU零基础入门系列教程汇总:https://b

Java环境搭建&安装IDE

Java环境搭建、安装IDE文章目录Java环境搭建、安装IDE1.下载JavaJDK,配置环境变量,在命令行环境下完成helloworld程序;简介安装Step0安装包准备工作Step1下载JavaJDKStep2配置环境变量配置JAVA_HOME配置Path配置CLASSPATHStep4检验运行程序2.选择一款自

C++之浅拷贝、深拷贝、拷贝构造函数、拷贝赋值运算符、自定义的深拷贝函数应用总结(二百二十九)

简介:CSDN博客专家,专注Android/Linux系统,分享多mic语音方案、音视频、编解码等技术,与大家一起成长!优质专栏:Audio工程师进阶系列【原创干货持续更新中……】🚀人生格言:人生从来没有捷径,只有行动才是治疗恐惧和懒惰的唯一良药.更多原创,欢迎关注:Android系统攻城狮1.前言本篇目的:理解C+

数据库锁及批量更新死锁处理

数据库锁锁间隙锁锁定的是一个间隙范围,而不会锁住某条记录。共享锁就是读锁,独占锁就是写锁,可以理解为读写锁,读读不互斥,读写互斥,写写互斥,共享锁(S锁)、独占锁(X锁)指的就是InnoDB上的行锁(记录锁)。意向锁是InnoDB引擎的一种特殊的表锁,在获取共享锁和独占锁之前必须拿到对应类型的意向锁。乐观锁和悲观锁更多

热文推荐