麻豆小视频在线观看_中文黄色一级片_久久久成人精品_成片免费观看视频大全_午夜精品久久久久久久99热浪潮_成人一区二区三区四区

首頁 > 開發(fā) > Java > 正文

SpringCloud Bus消息總線的實(shí)現(xiàn)

2024-07-14 08:41:00
字體:
供稿:網(wǎng)友

好了現(xiàn)在我們接著上一篇的隨筆,繼續(xù)來講。上一篇我們講到,我們?nèi)绻ジ滤形⒎?wù)的配置,在不重啟的情況下去更新配置,只能依靠spring cloud config了,但是,是我們要一個(gè)服務(wù)一個(gè)服務(wù)的發(fā)送post請(qǐng)求,

我們能受的了嗎?這比之前的沒配置中心好多了,那么我們?nèi)绾卫^續(xù)避免挨個(gè)挨個(gè)的向服務(wù)發(fā)送Post請(qǐng)求來告知服務(wù),你的配置信息改變了,需要及時(shí)修改內(nèi)存中的配置信息。

這時(shí)候我們就不要忘記消息隊(duì)列的發(fā)布訂閱模型。讓所有為服務(wù)來訂閱這個(gè)事件,當(dāng)這個(gè)事件發(fā)生改變了,就可以通知所有微服務(wù)去更新它們的內(nèi)存中的配置信息。這時(shí)Bus消息總線就能解決,你只需要在springcloud Config Server端發(fā)出refresh,就可以觸發(fā)所有微服務(wù)更新了。

如下架構(gòu)圖所示:

SpringCloud,Bus,消息總線

Spring Cloud Bus除了支持RabbitMQ的自動(dòng)化配置之外,還支持現(xiàn)在被廣泛應(yīng)用的Kafka。在本文中,我們將搭建一個(gè)Kafka的本地環(huán)境,并通過它來嘗試使用Spring Cloud Bus對(duì)Kafka的支持,實(shí)現(xiàn)消息總線的功能。

Kafka使用Scala實(shí)現(xiàn),被用作LinkedIn的活動(dòng)流和運(yùn)營(yíng)數(shù)據(jù)處理的管道,現(xiàn)在也被諸多互聯(lián)網(wǎng)企業(yè)廣泛地用作為數(shù)據(jù)流管道和消息系統(tǒng)。

Kafak架構(gòu)圖如下:

SpringCloud,Bus,消息總線

Kafka是基于消息發(fā)布/訂閱模式實(shí)現(xiàn)的消息系統(tǒng),其主要設(shè)計(jì)目標(biāo)如下:

1.消息持久化:以時(shí)間復(fù)雜度為O(1)的方式提供消息持久化能力,即使對(duì)TB級(jí)以上數(shù)據(jù)也能保證常數(shù)時(shí)間復(fù)雜度的訪問性能。

2.高吞吐:在廉價(jià)的商用機(jī)器上也能支持單機(jī)每秒100K條以上的吞吐量

3.分布式:支持消息分區(qū)以及分布式消費(fèi),并保證分區(qū)內(nèi)的消息順序

4.跨平臺(tái):支持不同技術(shù)平臺(tái)的客戶端(如:Java、PHP、Python等)

5.實(shí)時(shí)性:支持實(shí)時(shí)數(shù)據(jù)處理和離線數(shù)據(jù)處理

6.伸縮性:支持水平擴(kuò)展

Kafka中涉及的一些基本概念:

1.Broker:Kafka集群包含一個(gè)或多個(gè)服務(wù)器,這些服務(wù)器被稱為Broker。

2.Topic:邏輯上同Rabbit的Queue隊(duì)列相似,每條發(fā)布到Kafka集群的消息都必須有一個(gè)Topic。(物理上不同Topic的消息分開存儲(chǔ),邏輯上一個(gè)Topic的消息雖然保存于一個(gè)或多個(gè)Broker上,但用戶只需指定消息的Topic即可生產(chǎn)或消費(fèi)數(shù)據(jù)而不必關(guān)心數(shù)據(jù)存于何處)

3.Partition:Partition是物理概念上的分區(qū),為了提供系統(tǒng)吞吐率,在物理上每個(gè)Topic會(huì)分成一個(gè)或多個(gè)Partition,每個(gè)Partition對(duì)應(yīng)一個(gè)文件夾(存儲(chǔ)對(duì)應(yīng)分區(qū)的消息內(nèi)容和索引文件)。

4.Producer:消息生產(chǎn)者,負(fù)責(zé)生產(chǎn)消息并發(fā)送到Kafka Broker。

5.Consumer:消息消費(fèi)者,向Kafka Broker讀取消息并處理的客戶端。

6.Consumer Group:每個(gè)Consumer屬于一個(gè)特定的組(可為每個(gè)Consumer指定屬于一個(gè)組,若不指定則屬于默認(rèn)組),組可以用來實(shí)現(xiàn)一條消息被組內(nèi)多個(gè)成員消費(fèi)等功能。

可以從kafka的架構(gòu)圖看到Kafka是需要Zookeeper支持的,你需要在你的Kafka配置里面指定Zookeeper在哪里,它是通過Zookeeper做一些可靠性的保證,做broker的主從,我們還要知道Kafka的消息是以topic形式作為組織的,Producers發(fā)送topic形式的消息,Consumer是按照組來分的,所以,一組Consumers都會(huì)都要同樣的topic形式的消息。在服務(wù)端,它還做了一些分片,那么一個(gè)Topic可能分布在不同的分片上面,方便我們拓展部署多個(gè)機(jī)器,Kafka是天生分布式的。這里為了演示,我們只需要用它的默認(rèn)配置,在windows上做個(gè)小Demo即可。

我們這里主要針對(duì)Spring Cloud Bus對(duì)Kafka的支持,實(shí)現(xiàn)消息總線的功能,具體的Kafka,RabbitMQ消息隊(duì)列希望自己去找資料來學(xué)習(xí)一下。有了一些概念的支持后,我們進(jìn)行一些Demo。

如下:首先新建一個(gè)springCloud-config-client1模塊,方便我們進(jìn)行測(cè)試所引入的依賴如下:

<dependency>      <groupId>org.springframework.boot</groupId>      <artifactId>spring-boot-starter-actuator</artifactId>    </dependency>    <dependency>      <groupId>org.springframework.boot</groupId>      <artifactId>spring-boot-starter-web</artifactId>    </dependency>    <dependency>      <groupId>org.springframework.cloud</groupId>      <artifactId>spring-cloud-starter-config</artifactId>      <version>1.4.0.RELEASE</version>    </dependency>    <dependency>      <groupId>org.springframework.cloud</groupId>      <artifactId>spring-cloud-starter-eureka</artifactId>      <version>1.3.5.RELEASE</version>    </dependency>    <dependency>      <groupId>org.springframework.cloud</groupId>      <artifactId>spring-cloud-starter-bus-kafka</artifactId>      <version>1.3.2.RELEASE</version>    </dependency>

接著要注意一下,client1的配置文件要改為bootstrap.yml,因?yàn)檫@種配置格式,是優(yōu)先加載的,上一篇隨筆有講過,client1的配置如下:

server: port: 7006spring: application:  name: cloud-config cloud:  config:#啟動(dòng)什么環(huán)境下的配置,dev 表示開發(fā)環(huán)境,這跟你倉(cāng)庫的文件的后綴有關(guān),比如,倉(cāng)庫配置文件命名格式是cloud-config-dev.properties,所以profile 就要寫dev   profile: dev   discovery:    enabled: true#這個(gè)名字是Config Server端的服務(wù)名字,不能瞎寫。    service-id: config-server#注冊(cè)中心eureka: client:  service-url:   defaultZone: http://localhost:8888/eureka/,http://localhost:8889/eureka/#是否需要權(quán)限拉去,默認(rèn)是true,如果不false就不允許你去拉取配置中心Server更新的內(nèi)容management: security:  enabled: false

接著啟動(dòng)類如下:

@SpringBootApplication@EnableDiscoveryClientpublic class Client1Application {  public static void main(String[] args) {    SpringApplication.run(Client1Application.class, args);  }}

接著將client中的TestController賦值一份到client1中,代碼如下:

@RestController//這里面的屬性有可能會(huì)更新的,git中的配置中心變化的話就要刷新,沒有這個(gè)注解內(nèi),配置就不能及時(shí)更新@RefreshScopepublic class TestController {  @Value("${name}")  private String name;  @Value("${age}")  private Integer age;  @RequestMapping("/test")  public String test(){    return this.name+this.age;  }}

接著還要在先前的隨筆中的模塊中的Config Server加入如下配置:

#是否需要權(quán)限拉去,默認(rèn)是true,如果不false就不允許你去拉取配置中心Server更新的內(nèi)容management: security:  enabled: false

接著還要做一點(diǎn)就是,在config-client,config-client1,和config-Server都要引入kafka的依賴,如下:

 <dependency>      <groupId>org.springframework.cloud</groupId>      <artifactId>spring-cloud-starter-bus-kafka</artifactId>      <version>1.3.2.RELEASE</version>    </dependency>

我們工程準(zhǔn)備好了,暫時(shí)先放在這里,下面進(jìn)行Kafka的安裝下載,首先我們?nèi)afka官網(wǎng)kafka.apache.org/downloads 下來官網(wǎng)推薦的版本,

SpringCloud,Bus,消息總線

首先我們進(jìn)到下載好的Kafka目錄中kafka_2.11-1.1.0/bin/windows 下編輯kafka-run-class.bat如下:

找到這條配置 如下:

 

復(fù)制代碼 代碼如下:
set COMMAND=%JAVA% %KAFKA_HEAP_OPTS% %KAFKA_JVM_PERFORMANCE_OPTS% %KAFKA_JMX_OPTS% %KAFKA_LOG4J_OPTS% -cp %CLASSPATH% %KAFKA_OPTS% %*

 

可以看到%CLASSPATH%沒有雙引號(hào),

因此用雙引號(hào)括起來,不然啟動(dòng)不起來的,報(bào)你JDK沒安裝好,修改后如下:

 

復(fù)制代碼 代碼如下:
set COMMAND=%JAVA% %KAFKA_HEAP_OPTS% %KAFKA_JVM_PERFORMANCE_OPTS% %KAFKA_JMX_OPTS% %KAFKA_LOG4J_OPTS% -cp "%CLASSPATH%" %KAFKA_OPTS% %*

 

接著,打開config文件夾中的server.properties配置如下:

SpringCloud,Bus,消息總線

可以看到是連接到本地的zookeeper就行了。

接著我們進(jìn)行先啟動(dòng)zookeeper,再啟動(dòng)Kafka,如下:

SpringCloud,Bus,消息總線

SpringCloud,Bus,消息總線

SpringCloud,Bus,消息總線

當(dāng)看到上面的信息證明啟動(dòng)Zookeeper啟動(dòng)成功。、

接下來再開一個(gè)CMD啟動(dòng)Kafka,如下:

SpringCloud,Bus,消息總線

SpringCloud,Bus,消息總線

看到這些信息說明Kafka啟動(dòng)成功了

好了,接下來把前面的工程,兩個(gè)注冊(cè)中心,一個(gè)springcloud-config-server,兩個(gè)springcloud-config-client,springcloud-config-client1啟動(dòng)起來,

SpringCloud,Bus,消息總線

SpringCloud,Bus,消息總線

可以看到springcloudBus是在0分片上,如果兩個(gè)config-client啟動(dòng)都出現(xiàn)上面信息,證明啟動(dòng)成功了。

好了現(xiàn)在我們進(jìn)行訪問一下config-server端,如下:

SpringCloud,Bus,消息總線

再訪問兩個(gè)client,如下:

SpringCloud,Bus,消息總線

SpringCloud,Bus,消息總線

好了,好戲開始了,現(xiàn)在我們?nèi)it倉(cāng)庫上修改配置中心的文件,將年齡改為24,如下:

SpringCloud,Bus,消息總線

接下來,我們我們用refresh刷新配置服務(wù)端配置,通知兩個(gè)client去更新內(nèi)存中的配置信息。用postman發(fā)送localhost:7000/bus/refresh,如下:

SpringCloud,Bus,消息總線

可以看到?jīng)]有返回什么信息,但是不要擔(dān)心,這是成功的通知所有client去更新了內(nèi)存中的信息了。

接著我們分別重新請(qǐng)求config-server,兩個(gè)client,刷新頁面,結(jié)果如下:

SpringCloud,Bus,消息總線

兩個(gè)client如下:

SpringCloud,Bus,消息總線

SpringCloud,Bus,消息總線

可以看到所有client自動(dòng)更新內(nèi)存中的配置信息了。

到目前為止,上面都是刷新說有的配置的信息的,如果我們想刷新某個(gè)特定服務(wù)的配置信息也是可以的。我們可以指定刷新范圍,如下:

指定刷新范圍

上面的例子中,我們通過向服務(wù)實(shí)例請(qǐng)求Spring Cloud Bus的/bus/refresh接口,從而觸發(fā)總線上其他服務(wù)實(shí)例的/refresh。但是有些特殊場(chǎng)景下(比如:灰度發(fā)布),我們希望可以刷新微服務(wù)中某個(gè)具體實(shí)例的配置。

Spring Cloud Bus對(duì)這種場(chǎng)景也有很好的支持:/bus/refresh接口還提供了destination參數(shù),用來定位具體要刷新的應(yīng)用程序。比如,我們可以請(qǐng)求/bus/refresh?destination=服務(wù)名字:9000,此時(shí)總線上的各應(yīng)用實(shí)例會(huì)根據(jù)destination屬性的值來判斷是否為自己的實(shí)例名,

若符合才進(jìn)行配置刷新,若不符合就忽略該消息。

destination參數(shù)除了可以定位具體的實(shí)例之外,還可以用來定位具體的服務(wù)。定位服務(wù)的原理是通過使用Spring的PathMatecher(路徑匹配)來實(shí)現(xiàn),比如:/bus/refresh?destination=customers:**,該請(qǐng)求會(huì)觸發(fā)customers服務(wù)的所有實(shí)例進(jìn)行刷新。

以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持VeVb武林網(wǎng)。


注:相關(guān)教程知識(shí)閱讀請(qǐng)移步到JAVA教程頻道。
發(fā)表評(píng)論 共有條評(píng)論
用戶名: 密碼:
驗(yàn)證碼: 匿名發(fā)表
主站蜘蛛池模板: 欧美一级一区二区三区 | 久久久电影电视剧免费看 | 在线成人一区二区 | 一区二区三区国产视频 | 免费视频一区 | 欧美精品毛片 | 久久亚洲视频网 | 久久久国产精品成人免费 | 欧美在线黄色 | 香蕉黄色网 | 91精品国产综合久久久欧美 | 欧美成人小视频 | 免费黄色小网站 | 毛片成人网 | 国产真实孩交 | 国产一级aaa全黄毛片 | 久久99精品视频在线观看 | 久在线播放 | 国产一区二区三区四区精 | 怦然心动50免费完整版 | 热99在线视频 | 国产无限资源在线观看 | 亚洲尻逼视频 | 久久亚洲国产午夜精品理论片 | 91av在线免费观看 | 国产精品爱久久久久久久 | 成人在线视频精品 | 一二区电影 | 91久久国产综合久久91猫猫 | av黄色片网站 | 国产午夜电影 | 久久成人精品视频 | 男人的天堂视频网站 | 国产精品视频yy9299一区 | 91精品国产91久久久久久蜜臀 | 大逼逼影院 | 久久经典| 免费a级黄色片 | 一级一级一级一级毛片 | av在线中文| 精品一区二区久久久久久久网精 |