前言
RabbitMQ 是使用 Erlang 語言開發(fā)的消息中間件, 其遵循了高級消息隊列協(xié)議(Advanced Message Queuing Protocol, AMQP)。
與 Kafka 等消息隊列相比,RabbitMQ 最大的優(yōu)勢在于其較高的可靠性:
因為具有較高可靠性和一致性, RabbitMQ 可以勝任訂單處理、秒殺等一致性要求較高的業(yè)務場景。
RabbitMQ 概念與機制
RabbitMQ 中的概念模型:
交換機(Exchange)
生產(chǎn)者發(fā)送的消息會首先送到交換機(Exchange), 交換機根據(jù)自身類型和消息的 routing-key 等信息將消息投遞到綁定的消息隊列中。
RabbitMQ中的四種標準交換機:
direct: 如果消息的 routing-key 與隊列的 binding-key 完全相同,direct類型的交換機則會將消息投遞到該隊列中。
topic: 允許隊列的 binding-key 中包含通配符*和#, topic 交換機會將消息投遞到 binding-key 與 routing-key 匹配的隊列中。
fanout: fanout 交換機不進行任何匹配, 將消息投遞到所有綁定的隊列
header: header 交換機根據(jù)消息頭進行投遞,現(xiàn)在已較少使用
我們可以使用 RabbitMQ 的插件機制使用第三方交換機或自行開發(fā)交換機。如實現(xiàn)延時投遞的delayed-message-exchange。
消息頭中的delivery-mode可以設置為 persistent(持久化) 或者 transient(易失)。 Exchange 和 Queue 在處理持久化的消息時都會先將消息寫入磁盤中再進行下一步處理, 即使 RabbitMQ 崩潰也不會丟失。
消費者客戶端通常使用的channel.basicConsume使用推(push)模式投遞消息, 即當有新消息時 Broker 通過 channel 主動向客戶端發(fā)送消息。客戶端也可以使用channel.basicGet從 Broker 拉取消息。
ACK機制
RabbitMQ 提供了確認送達(acknowledge)機制保證消息被正確處理不會丟失。
確認送達的回執(zhí)有三種:
RabbitMQ 的 Queue 可以設置 no_ack=true, 則消息被投遞后即刪除不等待回執(zhí)。
channel.basicConsume 可以指定auto_ack模式,若auto_ack=true當客戶端收到完整消息后即會自動發(fā)出ACK回執(zhí),否則必須顯式的發(fā)出回執(zhí)。
Java 代碼示例
首先安裝并啟動RabbitMQ實例, Mac用戶可以使用 Homebrew 進行安裝:
brew install rabbitmq
啟動服務:
brew services start rabbitmq
或者使用官方docker鏡像:
docker run -d --hostname my-rabbit --name some-rabbit rabbitmq:3-management
RabbitMQ官網(wǎng)提供了Ubuntu、RPM以及Windows等多種平臺安裝方式。
RabbitMQ默認TCP端口為5672, Web控制臺默認端口15672。
在Maven中添加依賴:
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.5.1</version></dependency>
編寫生產(chǎn)者:
package rabbit;import java.io.IOException;import java.util.concurrent.TimeoutException;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;/** * @author finley */public class RabbitProducer { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); factory.setHost("localhost"); try (Connection conn = factory.newConnection(); Channel channel = conn.createChannel()) { String exchangeName = "test-exchange"; channel.exchangeDeclare(exchangeName, "direct", true); String routingKey = "hello"; byte[] msg = "hello world".getBytes(); AMQP.BasicProperties.Builder propsBuilder = new AMQP.BasicProperties.Builder(); propsBuilder.deliveryMode(2); // persistent propsBuilder.priority(0); // normal propsBuilder.contentType("text/plain"); channel.basicPublish(exchangeName, routingKey, propsBuilder.build(), msg); } }}
編寫消費者:
package rabbit;import java.io.IOException;import java.util.concurrent.TimeoutException;import com.rabbitmq.client.*;/** * @author finley */public class RabbitConsumer { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); factory.setHost("localhost"); try (Connection conn = factory.newConnection(); Channel channel = conn.createChannel()) { String exchangeName = "test-exchange"; channel.exchangeDeclare(exchangeName, "direct", true); String queueName = channel.queueDeclare().getQueue(); String bindingKey = "hello"; channel.queueBind(queueName, exchangeName, bindingKey); while(true) { channel.basicConsume(queueName, false, "", new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String routingKey = envelope.getRoutingKey(); String contentType = properties.getContentType(); String bodyStr = new String(body, "UTF-8"); System.out.println("routingKey: " + routingKey + ", contentType: " + contentType + ", body: " + bodyStr); long deliveryTag = envelope.getDeliveryTag(); channel.basicAck(deliveryTag, false); } }); } } }}
RabbitMQ 的消息為字節(jié), 可以將 Java 對象序列化后作為消息體發(fā)送。
總結
以上就是這篇文章的全部內(nèi)容了,希望本文的內(nèi)容對大家的學習或者工作具有一定的參考學習價值,如果有疑問大家可以留言交流,謝謝大家對VeVb武林網(wǎng)的支持。
新聞熱點
疑難解答
圖片精選