Java发送/接收消息示例
可以用自己熟悉的开发工具创建一个Java项目,加入RocketMQ Client包的依赖,示例程序1的内容发送消息,这个示例代码是以Sync方式发送消息的。
示例程序1:
public class SyncProducer {
public static void main(String[] args) throws Exception {
//Instantiate with a Producer group name.
DefaultMQProducer Producer = new
DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("192.168.100.131:9876");
//Launch the instance.
Producer.start();
for (int i = 0; i < 100; i++) {
//Create a Message instance, specifying Topic, tag and Message body.
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " +
i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//Call send Message to deliver Message to one of brokers.
SendResult sendResult = Producer.send(msg);
System.out.printf("%s%n", sendResult);
}
//Shut down once the Producer instance is not longer in use.
Producer.shutdown();
}
}
主要流程是:创建一个DefaultMQProducer对象,设置好GroupName和NameServer地址后启动,然后把待发送的消息拼装成Message对象,使用Producer来发送。接下来看看如何接收消息,也就是使用DefaultMQPush-Consumer类实现的消费者程序,如示例程序2所示。
示例程序2:
/*
* Instantiate with specified Consumer group name.
*/
DefaultMQPushConsumer Consumer = new DefaultMQPushConsumer("please rename to unique group name");
/*
* Specify name server addresses.
Consumer.setNamesrvAddr("192.168.249.47:9876");
/*
* Specify where to start in case the specified Consumer group is a brand new one.
*/
Consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//Consumer.setMessageModel(MessageModel.BROADCASTING);
/*
* Subscribe one more more Topics to consume.
*/
Consumer.subscribe("TopicTest”, "*");
/*
* Register callback to execute on arrival of Messages fetched from brokers.
*/
Consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
/*
* Launch the Consumer instance.
*/
Consumer.start();
Consumer或Producer都必须设置GroupName、NameServer地址以及端口号。然后指明要操作的Topic名称,最后进入发送和接收逻辑。
Java发送/接收消息示例
可以用自己熟悉的开发工具创建一个Java项目,加入RocketMQ Client包的依赖,示例程序1的内容发送消息,这个示例代码是以Sync方式发送消息的。
示例程序1:
public class SyncProducer {
public static void main(String[] args) throws Exception {
//Instantiate with a Producer group name.
DefaultMQProducer Producer = new
DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("192.168.100.131:9876");
//Launch the instance.
Producer.start();
for (int i = 0; i < 100; i++) {
//Create a Message instance, specifying Topic, tag and Message body.
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " +
i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//Call send Message to deliver Message to one of brokers.
SendResult sendResult = Producer.send(msg);
System.out.printf("%s%n", sendResult);
}
//Shut down once the Producer instance is not longer in use.
Producer.shutdown();
}
}
主要流程是:创建一个DefaultMQProducer对象,设置好GroupName和NameServer地址后启动,然后把待发送的消息拼装成Message对象,使用Producer来发送。接下来看看如何接收消息,也就是使用DefaultMQPush-Consumer类实现的消费者程序,如示例程序2所示。
示例程序2:
/*
* Instantiate with specified Consumer group name.
*/
DefaultMQPushConsumer Consumer = new DefaultMQPushConsumer("please rename to unique group name");
/*
* Specify name server addresses.
Consumer.setNamesrvAddr("192.168.249.47:9876");
/*
* Specify where to start in case the specified Consumer group is a brand new one.
*/
Consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//Consumer.setMessageModel(MessageModel.BROADCASTING);
/*
* Subscribe one more more Topics to consume.
*/
Consumer.subscribe("TopicTest”, "*");
/*
* Register callback to execute on arrival of Messages fetched from brokers.
*/
Consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
/*
* Launch the Consumer instance.
*/
Consumer.start();
Consumer或Producer都必须设置GroupName、NameServer地址以及端口号。然后指明要操作的Topic名称,最后进入发送和接收逻辑。