消息发送样例
1、基本例子
先导入maven
依赖
1 2 3 4 5 6
| <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.4.0</version> </dependency>
|
1.1 发送消息
发送消息的步骤:
- 创建消息生产者
producer
,并制定生产者组名
- 指定
Nameserver
地址
- 启动
producer
- 创建消息对象,指定主体
Topic
、Tag
和消息体
- 发送消息
- 关闭生产者
producer
1.1.1 发送同步消息
同步消息,在上一条消息发送完后,才发送下一条
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("group_name"); producer.setNamesrvAddr("111.230.203.104:9876"); producer.start(); for (int i = 0; i < 10; i++) { Message msg = new Message("base", "tagA", ("hello" + i).getBytes()); SendResult sendResult = producer.send(msg, 10000); System.out.println(sendResult); TimeUnit.SECONDS.sleep(1); } producer.shutdown(); }
|
1.1.2 发送异步消息
异步消息,不用判断上一条消息是否发送完毕,就发送下一条消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("group_name"); producer.setNamesrvAddr("111.230.203.104:9876"); producer.start(); for (int i = 0; i < 10; i++) { Message msg = new Message("base", "tagB", ("hello" + i).getBytes()); producer.send(msg, new SendCallback(){ @Override public void onSuccess(SendResult sendResult) { System.out.println(sendResult); } @Override public void onException(Throwable e) { e.printStackTrace(); } }, 10000); TimeUnit.SECONDS.sleep(1); } producer.shutdown(); }
|
1.1.3 发送单向消息
发送单向消息不需要返回结果,如日志发送等
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("group_name"); producer.setNamesrvAddr("111.230.203.104:9876"); producer.start(); for (int i = 0; i < 10; i++) { Message msg = new Message("base", "tagC", ("单向消息-" + i).getBytes()); producer.sendOneway(msg);
TimeUnit.SECONDS.sleep(1); } producer.shutdown(); }
|
发送完消息后,可以在rocketmq-dashboard
看发送的消息
1.2 消费消息
消费消息的步骤:
- 创建消费者
Consumer
,制定消费者组名
- 指定
Nameserver
地址
- 订阅主题
Topic
和Tag
- 设置回调函数,处理消息
- 启动消费者
Consumer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_name"); consumer.setNamesrvAddr("111.230.203.104:9876"); consumer.subscribe("base", "tagA"); consumer.setMessageModel(MessageModel.CLUSTERING); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { System.out.println(list);
list.forEach(l -> System.out.println(new String(l.getBody())));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); }
|
2、顺序消息
3、延时消息