麻豆小视频在线观看_中文黄色一级片_久久久成人精品_成片免费观看视频大全_午夜精品久久久久久久99热浪潮_成人一区二区三区四区

首頁 > 學院 > 開發設計 > 正文

RabbitMQ的幾種典型使用場景(很有用的一篇文章)

2019-11-08 02:59:51
字體:
來源:轉載
供稿:網友

非常感謝http://www.cnblogs.com/luxiaoxun/p/3918054.html

RabbitMQ主頁:https://www.rabbitmq.com/

AMQP

AMQP協議是一個高級抽象層消息通信協議,RabbitMQ是AMQP協議的實現。它主要包括以下組件

1.Server(broker): 接受客戶端連接,實現AMQP消息隊列和路由功能的進程。

2.Virtual Host:其實是一個虛擬概念,類似于權限控制組,一個Virtual Host里面可以有若干個Exchange和Queue,但是權限控制的最小粒度是Virtual Host

3.Exchange:接受生產者發送的消息,并根據Binding規則將消息路由給服務器中的隊列。ExchangeType決定了Exchange路由消息的行為,例如,在RabbitMQ中,ExchangeType有direct、Fanout和Topic三種,不同類型的Exchange路由的行為是不一樣的。

4.Message Queue:消息隊列,用于存儲還未被消費者消費的消息。

5.Message: 由Header和Body組成,Header是由生產者添加的各種屬性的集合,包括Message是否被持久化、由哪個Message Queue接受、優先級是多少等。而Body是真正需要傳輸的APP數據。

6.Binding:Binding聯系了Exchange與Message Queue。Exchange在與多個Message Queue發生Binding后會生成一張路由表,路由表中存儲著Message Queue所需消息的限制條件即Binding Key。當Exchange收到Message時會解析其Header得到Routing Key,Exchange根據Routing Key與Exchange Type將Message路由到Message Queue。Binding Key由Consumer在Binding Exchange與Message Queue時指定,而Routing Key由PRoducer發送Message時指定,兩者的匹配方式由Exchange Type決定。 

7.Connection:連接,對于RabbitMQ而言,其實就是一個位于客戶端和Broker之間的TCP連接。

8.Channel:信道,僅僅創建了客戶端到Broker之間的連接后,客戶端還是不能發送消息的。需要為每一個Connection創建Channel,AMQP協議規定只有通過Channel才能執行AMQP的命令。一個Connection可以包含多個Channel。之所以需要Channel,是因為TCP連接的建立和釋放都是十分昂貴的,如果一個客戶端每一個線程都需要與Broker交互,如果每一個線程都建立一個TCP連接,暫且不考慮TCP連接是否浪費,就算操作系統也無法承受每秒建立如此多的TCP連接。RabbitMQ建議客戶端線程之間不要共用Channel,至少要保證共用Channel的線程發送消息必須是串行的,但是建議盡量共用Connection。

9.Command:AMQP的命令,客戶端通過Command完成與AMQP服務器的交互來實現自身的邏輯。例如在RabbitMQ中,客戶端可以通過publish命令發送消息,txSelect開啟一個事務,txCommit提交一個事務。

在了解了AMQP模型以后,需要簡單介紹一下AMQP的協議棧,AMQP協議本身包括三層:

1.Module Layer,位于協議最高層,主要定義了一些供客戶端調用的命令,客戶端可以利用這些命令實現自己的業務邏輯,例如,客戶端可以通過queue.declare聲明一個隊列,利用consume命令獲取一個隊列中的消息。

2.session Layer,主要負責將客戶端的命令發送給服務器,在將服務器端的應答返回給客戶端,主要為客戶端與服務器之間通信提供可靠性、同步機制和錯誤處理。

3.Transport Layer,主要傳輸二進制數據流,提供幀的處理、信道復用、錯誤檢測和數據表示。

RabbitMQ使用場景

學習RabbitMQ的使用場景,來自官方教程:https://www.rabbitmq.com/getstarted.html

場景1:單發送單接收

使用場景:簡單的發送與接收,沒有特別的處理。

Producer:

復制代碼
import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.Connection;import com.rabbitmq.client.Channel;public class Send {      private final static String QUEUE_NAME = "hello";  public static void main(String[] argv) throws Exception {                    ConnectionFactory factory = new ConnectionFactory();    factory.setHost("localhost");    Connection connection = factory.newConnection();    Channel channel = connection.createChannel();    channel.queueDeclare(QUEUE_NAME, false, false, false, null);    String message = "Hello World!";    channel.basicPublish("", QUEUE_NAME, null, message.getBytes());    System.out.println(" [x] Sent '" + message + "'");        channel.close();    connection.close();  }}復制代碼

Consumer:

復制代碼
import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.Connection;import com.rabbitmq.client.Channel;import com.rabbitmq.client.QueueingConsumer;public class Recv {        private final static String QUEUE_NAME = "hello";    public static void main(String[] argv) throws Exception {    ConnectionFactory factory = new ConnectionFactory();    factory.setHost("localhost");    Connection connection = factory.newConnection();    Channel channel = connection.createChannel();    channel.queueDeclare(QUEUE_NAME, false, false, false, null);    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");        QueueingConsumer consumer = new QueueingConsumer(channel);    channel.basicConsume(QUEUE_NAME, true, consumer);        while (true) {      QueueingConsumer.Delivery delivery = consumer.nextDelivery();      String message = new String(delivery.getBody());      System.out.println(" [x] Received '" + message + "'");    }  }}復制代碼

場景2:單發送多接收

使用場景:一個發送端,多個接收端,如分布式的任務派發。為了保證消息發送的可靠性,不丟失消息,使消息持久化了。同時為了防止接收端在處理消息時down掉,只有在消息處理完成后才發送ack消息。

Producer:

復制代碼
import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.Connection;import com.rabbitmq.client.Channel;import com.rabbitmq.client.MessageProperties;public class NewTask {    private static final String TASK_QUEUE_NAME = "task_queue";  public static void main(String[] argv) throws Exception {    ConnectionFactory factory = new ConnectionFactory();    factory.setHost("localhost");    Connection connection = factory.newConnection();    Channel channel = connection.createChannel();        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);        String message = getMessage(argv);        channel.basicPublish( "", TASK_QUEUE_NAME,                 MessageProperties.PERSISTENT_TEXT_PLAIN,                message.getBytes());    System.out.println(" [x] Sent '" + message + "'");        channel.close();    connection.close();  }      private static String getMessage(String[] strings){    if (strings.length < 1)      return "Hello World!";    return joinStrings(strings, " ");  }      private static String joinStrings(String[] strings, String delimiter) {    int length = strings.length;    if (length == 0) return "";    StringBuilder Words = new StringBuilder(strings[0]);    for (int i = 1; i < length; i++) {      words.append(delimiter).append(strings[i]);    }    return words.toString();  }}復制代碼

發送端和場景1不同點:

1、使用“task_queue”聲明了另一個Queue,因為RabbitMQ不容許聲明2個相同名稱、配置不同的Queue

2、使"task_queue"的Queue的durable的屬性為true,即使消息隊列durable

3、使用MessageProperties.PERSISTENT_TEXT_PLAIN使消息durable

When RabbitMQ quits or crashes it will forget the queues and messages unless you tell it not to. Two things are required to make sure that messages aren't lost: we need to mark both the queue and messages as durable.

Consumer:

復制代碼
import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.Connection;import com.rabbitmq.client.Channel;import com.rabbitmq.client.QueueingConsumer;  public class Worker {  private static final String TASK_QUEUE_NAME = "task_queue";  public static void main(String[] argv) throws Exception {    ConnectionFactory factory = new ConnectionFactory();    factory.setHost("localhost");    Connection connection = factory.newConnection();    Channel channel = connection.createChannel();        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");        channel.basicQos(1);        QueueingConsumer consumer = new QueueingConsumer(channel);    channel.basicConsume(TASK_QUEUE_NAME, false, consumer);        while (true) {      QueueingConsumer.Delivery delivery = consumer.nextDelivery();      String message = new String(delivery.getBody());            System.out.println(" [x] Received '" + message + "'");      doWork(message);      System.out.println(" [x] Done");      channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);    }           }    private static void doWork(String task) throws InterruptedException {    for (char ch: task.toCharArray()) {      if (ch == '.') Thread.sleep(1000);    }  }}復制代碼

接收端和場景1不同點:

1、使用“task_queue”聲明消息隊列,并使消息隊列durable

2、在使用channel.basicConsume接收消息時使autoAck為false,即不自動會發ack,由channel.basicAck()在消息處理完成后發送消息。

3、使用了channel.basicQos(1)保證在接收端一個消息沒有處理完時不會接收另一個消息,即接收端發送了ack后才會接收下一個消息。在這種情況下發送端會嘗試把消息發送給下一個not busy的接收端。

注意點:

1)It's a common mistake to miss the basicAck. It's an easy error, but the consequences are serious. Messages will be redelivered when your client quits (which may look like random redelivery), but RabbitMQ will eat more and more memory as it won't be able to release any unacked messages.

2)Note on message persistence

Marking messages as persistent doesn't fully guarantee that a message won't be lost. Although it tells RabbitMQ to save the message to disk, there is still a short time window when RabbitMQ has accepted a message and hasn't saved it yet. Also, RabbitMQ doesn't do fsync(2) for every message -- it may be just saved to cache and not really written to the disk. The persistence guarantees aren't strong, but it's more than enough for our simple task queue. If you need a stronger guarantee you can wrap the publishing code in atransaction.

3)Note about queue size

If all the workers are busy, your queue can fill up. You will want to keep an eye on that, and maybe add more workers, or have some other strategy.

場景3:Publish/Subscribe

使用場景:發布、訂閱模式,發送端發送廣播消息,多個接收端接收。

Producer:

復制代碼
import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.Connection;import com.rabbitmq.client.Channel;public class EmitLog {  private static final String EXCHANGE_NAME = "logs";  public static void main(String[] argv) throws Exception {    ConnectionFactory factory = new ConnectionFactory();    factory.setHost("localhost");    Connection connection = factory.newConnection();    Channel channel = connection.createChannel();    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");    String message = getMessage(argv);    channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());    System.out.println(" [x] Sent '" + message + "'");    channel.close();    connection.close();  }    private static String getMessage(String[] strings){    if (strings.length < 1)            return "info: Hello World!";    return joinStrings(strings, " ");  }    private static String joinStrings(String[] strings, String delimiter) {    int length = strings.length;    if (length == 0) return "";    StringBuilder words = new StringBuilder(strings[0]);    for (int i = 1; i < length; i++) {        words.append(delimiter).append(strings[i]);    }    return words.toString();  }}復制代碼

發送端:

發送消息到一個名為“logs”的exchange上,使用“fanout”方式發送,即廣播消息,不需要使用queue,發送端不需要關心誰接收。

Consumer:

復制代碼
import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.Connection;import com.rabbitmq.client.Channel;import com.rabbitmq.client.QueueingConsumer;public class ReceiveLogs {  private static final String EXCHANGE_NAME = "logs";  public static void main(String[] argv) throws Exception {    ConnectionFactory factory = new ConnectionFactory();    factory.setHost("localhost");    Connection connection = factory.newConnection();    Channel channel = connection.createChannel();    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");    String queueName = channel.queueDeclare().getQueue();    channel.queueBind(queueName, EXCHANGE_NAME, "");        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");    QueueingConsumer consumer = new QueueingConsumer(channel);    channel.basicConsume(queueName, true, consumer);    while (true) {      QueueingConsumer.Delivery delivery = consumer.nextDelivery();      String message = new String(delivery.getBody());      System.out.println(" [x] Received '" + message + "'");       }  }}復制代碼

接收端:

1、聲明名為“logs”的exchange的,方式為"fanout",和發送端一樣。

2、channel.queueDeclare().getQueue();該語句得到一個隨機名稱的Queue,該queue的類型為non-durable、exclusive、auto-delete的,將該queue綁定到上面的exchange上接收消息。

3、注意binding queue的時候,channel.queueBind()的第三個參數Routing key為空,即所有的消息都接收。如果這個值不為空,在exchange type為“fanout”方式下該值被忽略!

場景4:Routing (按路線發送接收)

使用場景:發送端按routing key發送消息,不同的接收端按不同的routing key接收消息。

Producer:

復制代碼
import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.Connection;import com.rabbitmq.client.Channel;public class EmitLogDirect {  private static final String EXCHANGE_NAME = "direct_logs";  public static void main(String[] argv) throws Exception {    ConnectionFactory factory = new ConnectionFactory();    factory.setHost("localhost");    Connection connection = factory.newConnection();    Channel channel = connection.createChannel();    channel.exchangeDeclare(EXCHANGE_NAME, "direct");    String severity = getSeverity(argv);    String message = getMessage(argv);    channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());    System.out.println(" [x] Sent '" + severity + "':'" + message + "'");    channel.close();    connection.close();  }    private static String getSeverity(String[] strings){    if (strings.length < 1)            return "info";    return strings[0];  }  private static String getMessage(String[] strings){     if (strings.length < 2)            return "Hello World!";    return joinStrings(strings, " ", 1);  }    private static String joinStrings(String[] strings, String delimiter, int startIndex) {    int length = strings.length;    if (length == 0 ) return "";    if (length < startIndex ) return "";    StringBuilder words = new StringBuilder(strings[startIndex]);    for (int i = startIndex + 1; i < length; i++) {        words.append(delimiter).append(strings[i]);    }    return words.toString();  }}復制代碼

發送端和場景3的區別:

1、exchange的type為direct

2、發送消息的時候加入了routing key

Consumer:

復制代碼
import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.Connection;import com.rabbitmq.client.Channel;import com.rabbitmq.client.QueueingConsumer;public class ReceiveLogsDirect {  private static final String EXCHANGE_NAME = "direct_logs";  public static void main(String[] argv) throws Exception {    ConnectionFactory factory = new ConnectionFactory();    factory.setHost("localhost");    Connection connection = factory.newConnection();    Channel channel = connection.createChannel();    channel.exchangeDeclare(EXCHANGE_NAME, "direct");    String queueName = channel.queueDeclare().getQueue();        if (argv.length < 1){      System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");      System.exit(1);    }        for(String severity : argv){          channel.queueBind(queueName, EXCHANGE_NAME, severity);    }        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");    QueueingConsumer consumer = new QueueingConsumer(channel);    channel.basicConsume(queueName, true, consumer);    while (true) {      QueueingConsumer.Delivery delivery = consumer.nextDelivery();      String message = new String(delivery.getBody());      String routingKey = delivery.getEnvelope().getRoutingKey();      System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");       }  }}復制代碼

接收端和場景3的區別:

在綁定queue和exchange的時候使用了routing key,即從該exchange上只接收routing key指定的消息。

場景5:Topics (按topic發送接收)

使用場景:發送端不只按固定的routing key發送消息,而是按字符串“匹配”發送,接收端同樣如此。

Producer:

復制代碼
import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.Connection;import com.rabbitmq.client.Channel;public class EmitLogTopic {  private static final String EXCHANGE_NAME = "topic_logs";  public static void main(String[] argv) {    Connection connection = null;    Channel channel = null;    try {      ConnectionFactory factory = new ConnectionFactory();      factory.setHost("localhost");        connection = factory.newConnection();      channel = connection.createChannel();      channel.exchangeDeclare(EXCHANGE_NAME, "topic");      String routingKey = getRouting(argv);      String message = getMessage(argv);      channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());      System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");    }    catch  (Exception e) {      e.printStackTrace();    }    finally {      if (connection != null) {        try {          connection.close();        }        catch (Exception ignore) {}      }    }  }    private static String getRouting(String[] strings){    if (strings.length < 1)            return "anonymous.info";    return strings[0];  }  private static String getMessage(String[] strings){     if (strings.length < 2)            return "Hello World!";    return joinStrings(strings, " ", 1);  }    private static String joinStrings(String[] strings, String delimiter, int startIndex) {    int length = strings.length;    if (length == 0 ) return "";    if (length < startIndex ) return "";    StringBuilder words = new StringBuilder(strings[startIndex]);    for (int i = startIndex + 1; i < length; i++) {        words.append(delimiter).append(strings[i]);    }    return words.toString();  }}復制代碼

發送端和場景4的區別:

1、exchange的type為topic

2、發送消息的routing key不是固定的單詞,而是匹配字符串,如"*.lu.#",*匹配一個單詞,#匹配0個或多個單詞。

Consumer:

復制代碼
import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.Connection;import com.rabbitmq.client.Channel;import com.rabbitmq.client.QueueingConsumer;public class ReceiveLogsTopic {  private static final String EXCHANGE_NAME = "topic_logs";  public static void main(String[] argv) {    Connection connection = null;    Channel channel = null;    try {      ConnectionFactory factory = new ConnectionFactory();      factory.setHost("localhost");        connection = factory.newConnection();      channel = connection.createChannel();      channel.exchangeDeclare(EXCHANGE_NAME, "topic");      String queueName = channel.queueDeclare().getQueue();       if (argv.length < 1){        System.err.println("Usage: ReceiveLogsTopic [binding_key]...");        System.exit(1);      }          for(String bindingKey : argv){            channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);      }          System.out.println(" [*] Waiting for messages. To exit press CTRL+C");      QueueingConsumer consumer = new QueueingConsumer(channel);      channel.basicConsume(queueName, true, consumer);      while (true) {        QueueingConsumer.Delivery delivery = consumer.nextDelivery();        String message = new String(delivery.getBody());        String routingKey = delivery.getEnvelope().getRoutingKey();        System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");         }    }    catch  (Exception e) {      e.printStackTrace();    }    finally {      if (connection != null) {        try {          connection.close();        }        catch (Exception ignore) {}      }    }  }}復制代碼

接收端和場景4的區別:

1、exchange的type為topic

2、接收消息的routing key不是固定的單詞,而是匹配字符串。

注意點:

Topic exchange

Topic exchange is powerful and can behave like other exchanges. When a queue is bound with "#" (hash) binding key - it will receive all the messages, regardless of the routing key - like in fanout exchange. When special characters "*" (star) and "#" (hash) aren't used in bindings, the topic exchange will behave just like a direct one.

 

參考:

https://www.rabbitmq.com/getstarted.html

http://backend.blog.163.com/blog/static/202294126201322215551999/


發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 黄色av一区二区三区 | 99麻豆久久久国产精品免费 | 羞羞网站入口 | 成人在线观看免费 | 久久久精品精品 | 免费a视频 | 99视频网 | 国产免费一级淫片 | lutube成人福利在线观看污 | 免费中文视频 | 黄色高清av | 男男羞羞视频网站国产 | 91精品国产99久久久久久红楼 | 美女视频网站黄色 | 午夜视频导航 | 韩日黄色片 | 色综av| 91精品国产乱码久久久久久久久 | 午夜色片 | av一二三四区 | 羞羞视频免费网站含羞草 | 视频一区二区三区视频 | 狠狠操天天射 | 96视频在线免费观看 | 久久亚洲成人 | 国产宾馆3p国语对白 | 欧美成人免费一区二区三区 | 国产精品夜色视频一级区 | 91香蕉国产亚洲一区二区三区 | 蜜桃网站在线观看 | 国产一级一区二区 | 国产成人在线视频 | 欧美一级淫片免费播放口 | 久久看免费视频 | 中国精品久久 | 日日噜噜夜夜爽 | 成人mm视频在线观看 | 久久里面有精品 | 亚洲一区二区不卡视频 | 福利在线国产 | 成人在线视频黄色 |