消息发送样例

1、基本例子

先导入maven依赖

1
2
3
4
5
6
<!--rocketmq客户端依赖-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>

1.1 发送消息

发送消息的步骤:

  1. 创建消息生产者producer,并制定生产者组名
  2. 指定Nameserver地址
  3. 启动producer
  4. 创建消息对象,指定主体TopicTag和消息体
  5. 发送消息
  6. 关闭生产者producer
1.1.1 发送同步消息

同步消息,在上一条消息发送完后,才发送下一条

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public static void main(String[] args) throws Exception {
// 实例化生产者producer,指定组名
DefaultMQProducer producer = new DefaultMQProducer("group_name");
// 设置NameServer地址
producer.setNamesrvAddr("111.230.203.104:9876");
// 启动producer实例
producer.start();
for (int i = 0; i < 10; i++) {
// 创建消息,指定Topic,Tag和消息体
Message msg = new Message("base", "tagA", ("hello" + i).getBytes());
// 发送消息到broker,设置超时时间
SendResult sendResult = producer.send(msg, 10000);
System.out.println(sendResult);
TimeUnit.SECONDS.sleep(1);
}
// 发送消息后,关闭producer实例
producer.shutdown();
}
1.1.2 发送异步消息

异步消息,不用判断上一条消息是否发送完毕,就发送下一条消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public static void main(String[] args) throws Exception {
// 实例化生产者producer,指定组名
DefaultMQProducer producer = new DefaultMQProducer("group_name");
// 设置NameServer地址
producer.setNamesrvAddr("111.230.203.104:9876");
// 启动producer实例
producer.start();
for (int i = 0; i < 10; i++) {
// 创建消息,指定Topic,Tag和消息体
Message msg = new Message("base", "tagB", ("hello" + i).getBytes());
// 发送消息到broker,设置超时时间
producer.send(msg, new SendCallback(){
// 成功回调函数
@Override
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult);
}
// 失败回调函数
@Override
public void onException(Throwable e) {
e.printStackTrace();
}
}, 10000);
TimeUnit.SECONDS.sleep(1);
}
// 发送消息后,关闭producer实例
producer.shutdown();
}
1.1.3 发送单向消息

发送单向消息不需要返回结果,如日志发送等

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public static void main(String[] args) throws Exception {
// 实例化生产者producer,指定组名
DefaultMQProducer producer = new DefaultMQProducer("group_name");
// 设置NameServer地址
producer.setNamesrvAddr("111.230.203.104:9876");
// 启动producer实例
producer.start();
for (int i = 0; i < 10; i++) {
// 创建消息,指定Topic,Tag和消息体
Message msg = new Message("base", "tagC", ("单向消息-" + i).getBytes());
// 发送消息到broker,设置超时时间
producer.sendOneway(msg);

TimeUnit.SECONDS.sleep(1);
}
// 发送消息后,关闭producer实例
producer.shutdown();
}

发送完消息后,可以在rocketmq-dashboard看发送的消息

image-20240216155642712

1.2 消费消息

消费消息的步骤:

  1. 创建消费者Consumer,制定消费者组名
  2. 指定Nameserver地址
  3. 订阅主题TopicTag
  4. 设置回调函数,处理消息
  5. 启动消费者Consumer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public static void main(String[] args) throws Exception {
// 实例化消费者,指定组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_name");
// 设置NameServer地址
consumer.setNamesrvAddr("111.230.203.104:9876");
// 订阅Topic和Tag
consumer.subscribe("base", "tagA");
// 设置消费者使用负载均衡模式,不设置的话默认就是负载均衡
consumer.setMessageModel(MessageModel.CLUSTERING);
// 设置回调函数来处理信息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
System.out.println(list);

list.forEach(l -> System.out.println(new String(l.getBody())));

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
}

2、顺序消息

3、延时消息