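Installing RocketMQ on Linux
This article is based on the official documentation: https://rocketmq.apache.org/zh/docs/
1. Download and install
Download RocketMQ from https://rocketmq.apache.org/ (the site itself is available in Chinese).
Both source and binary packages are officially supported. Installing from source requires a Maven environment to compile the code.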
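2. Startup
1). Start the NameServer
```shell
# Start the NameServer in the background
$ nohup sh bin/mqnamesrv &

# Verify that the NameServer started successfully
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
```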
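2). Start the Broker and Proxy
```shell
# Start the Broker with the Proxy enabled, pointing it at the NameServer
$ nohup sh bin/mqbroker -n localhost:9876 --enable-proxy &

# Verify that the Broker started successfully
$ tail -f ~/logs/rocketmqlogs/broker_default.log
The broker[broker-a,192.169.1.2:10911] boot success...
```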
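Before moving on to the client code, it can help to confirm that both processes are accepting connections. The following is a minimal sketch that uses only the JDK. The class name PortCheck and the helper isListening are made up for illustration, and the ports are the ones that appear in this article: 9876 for the NameServer and 8081 for the proxy endpoint used by the Java client. Adjust them if your configuration differs.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unreachable
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed ports: 9876 (NameServer, passed to mqbroker via -n)
        // and 8081 (proxy endpoint used by the client examples)
        int[] ports = {9876, 8081};
        for (int port : ports) {
            System.out.printf("localhost:%d listening: %b%n",
                    port, isListening("localhost", port, 1000));
        }
    }
}
```

A successful connection only shows that something is listening on the port, not that it is RocketMQ, so the log check above remains the authoritative signal.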
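3. Connecting from Java code
1). Add the dependency
The latest version at the time of writing is 5.0.4. The most recent release of the dependency can be looked up at https://search.maven.org/search?q=g:org.apache.rocketmq%20AND%20a:rocketmq-client-java
```xml
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client-java</artifactId>
    <!-- Replace with a concrete version, or define this name as a Maven property -->
    <version>${rocketmq-client-java-version}</version>
</dependency>
```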
2). Create the Topic with mqadmin.
```shell
$ sh bin/mqadmin updatetopic -n localhost:9876 -t TestTopic -c DefaultCluster
```
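3). Create the sending program in the Java project
```java
import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProducerExample {
    private static final Logger logger = LoggerFactory.getLogger(ProducerExample.class);

    public static void main(String[] args) throws ClientException {
        // Address of the proxy, in host:port form
        String endpoint = "localhost:8081";
        // Topic to send to; it must already exist (see the mqadmin step)
        String topic = "TestTopic";
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);
        ClientConfiguration configuration = builder.build();
        // Declaring the topic up front lets the producer check it at startup
        Producer producer = provider.newProducerBuilder()
            .setTopics(topic)
            .setClientConfiguration(configuration)
            .build();
        Message message = provider.newMessageBuilder()
            .setTopic(topic)
            // Message key, useful for locating a specific message later
            .setKeys("messageKey")
            // Message tag, which consumers can filter on
            .setTag("messageTag")
            // Encode explicitly instead of relying on the platform default charset
            .setBody("messageBody".getBytes(StandardCharsets.UTF_8))
            .build();
        try {
            // Send synchronously and log the receipt
            SendReceipt sendReceipt = producer.send(message);
            logger.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
        } catch (ClientException e) {
            logger.error("Failed to send message", e);
        }
        // Close the producer when it is no longer needed:
        // producer.close();
    }
}
```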
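4). In the same Java project, create a program that subscribes to normal messages, then run it.
```java
import java.io.IOException;
import java.util.Collections;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PushConsumerExample {
    private static final Logger logger = LoggerFactory.getLogger(PushConsumerExample.class);

    private PushConsumerExample() {
    }

    public static void main(String[] args) throws ClientException, IOException, InterruptedException {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        // Address of the proxy, in host:port form
        String endpoints = "localhost:8081";
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
            .setEndpoints(endpoints)
            .build();
        // Subscribe to every tag of the topic
        String tag = "*";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        // Consumer group this consumer belongs to
        String consumerGroup = "YourConsumerGroup";
        // Topic to subscribe to
        String topic = "TestTopic";
        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
            .setClientConfiguration(clientConfiguration)
            .setConsumerGroup(consumerGroup)
            // Bind the topic to its filter expression
            .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
            // The listener runs for each delivered message and reports the result
            .setMessageListener(messageView -> {
                logger.info("Consume message successfully, messageId={}", messageView.getMessageId());
                return ConsumeResult.SUCCESS;
            })
            .build();
        // Keep the main thread alive so the consumer keeps receiving messages
        Thread.sleep(Long.MAX_VALUE);
        // Close the push consumer when it is no longer needed:
        // pushConsumer.close();
    }
}
```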
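The listener above logs only the message ID. To read the payload as well, the body has to be decoded from bytes. The following is a small sketch using only the JDK; BodyDecoder and decodeUtf8 are hypothetical names introduced here, and it assumes the body holds UTF-8 text and that messageView.getBody() returns a java.nio.ByteBuffer, which is worth confirming against the client version you use.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class BodyDecoder {

    /** Decodes the remaining bytes of the buffer as UTF-8 text. */
    static String decodeUtf8(ByteBuffer body) {
        // duplicate() shares the bytes but has its own position,
        // so the caller's buffer is left untouched
        return StandardCharsets.UTF_8.decode(body.duplicate()).toString();
    }

    public static void main(String[] args) {
        // Stand-in for a received message body
        ByteBuffer body = ByteBuffer.wrap("messageBody".getBytes(StandardCharsets.UTF_8));
        System.out.println(decodeUtf8(body)); // prints "messageBody"
    }
}
```

Inside the listener, this would be called as BodyDecoder.decodeUtf8(messageView.getBody()), under the assumption stated above.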
5). Shut down the servers
```shell
$ sh bin/mqshutdown broker
The mqbroker(36695) is running...
Send shutdown request to mqbroker with proxy enable OK(36695)

$ sh bin/mqshutdown namesrv
The mqnamesrv(36664) is running...
Send shutdown request to mqnamesrv(36664) OK
```