麻豆小视频在线观看_中文黄色一级片_久久久成人精品_成片免费观看视频大全_午夜精品久久久久久久99热浪潮_成人一区二区三区四区

首頁(yè) > 學(xué)院 > 開發(fā)設(shè)計(jì) > 正文

ActiveMQApollo之MQTT

2019-11-14 15:39:57
字體:
來(lái)源:轉(zhuǎn)載
供稿:網(wǎng)友

Apollo是apache旗下的基金項(xiàng)目,它是以Apache ActiveMQ5.x為基礎(chǔ),采用全新的線程和消息調(diào)度架構(gòu)重新實(shí)現(xiàn)的消息中間件,針對(duì)多核處理器進(jìn)行了優(yōu)化處理,它的速度更快、更可靠、更易于維護(hù)。apollo與ActiveQQ一樣支持多協(xié)議:STOMP、AMQP、MQTT、Openwire、 SSL、WebSockets,本文只介紹MQTT協(xié)議的使用。 
關(guān)于ActiveMQ5請(qǐng)參考:http://activemq.apache.org,本文只介紹Apollo在windows下安裝和應(yīng)用,Apollo的詳細(xì)文檔請(qǐng)參考官網(wǎng):http://activemq.apache.org/apollo/documentation/user-manual.html.

 

Apollo的下載和安裝

 

1.下載并安裝

進(jìn)入http://activemq.apache.org/apollo/download.html,下載windows版本的壓縮包,并解壓到自己工作目錄(如:E:/apache-apollo-1.7),并創(chuàng)建環(huán)境變量APOLLO_HOME=E:/apache-apollo-1.7。如果操作是系統(tǒng)是Windows Vista或更高版本,則需要安裝Microsoft Visual C++ 2010 Redistributable (64位JVM:http://www.microsoft.com/en-us/download/details.aspx?id=14632;32位JVM:http://www.microsoft.com/en-us/download/details.aspx?id=5555)。

 

2.創(chuàng)建broker實(shí)例并啟動(dòng)服務(wù)

進(jìn)入E:/apache-apollo-1.7之下的bin目錄,打開cmd窗口,執(zhí)行命令:apollo create E:/apollo_broker,命令執(zhí)行成功后,在E盤下會(huì)有apollo_broker目錄,在其下有個(gè)bin目錄,其中有兩個(gè)文件:apollo-broker.cmd和apollo-broker-service.exe,第一個(gè)是通過(guò)cmd命令啟動(dòng)apollo服務(wù)的,第二個(gè)是創(chuàng)建window服務(wù)的。 
cmd命令啟動(dòng):apollo-broker run,啟動(dòng)成功可以在瀏覽器中查看運(yùn)行情況(http://127.0.0.1:61680/,默認(rèn)用戶名/密碼:admin/passWord); 
windows服務(wù)啟動(dòng):執(zhí)行apollo-broker-service.exe,創(chuàng)建windows服務(wù),就可以以windows服務(wù)的方式啟動(dòng)apollo服務(wù)。

 

3.MQTT協(xié)議的應(yīng)用

MQTT協(xié)議有眾多客戶端實(shí)現(xiàn),相關(guān)請(qǐng)參考:http://activemq.apache.org/apollo/versions/1.7/website/documentation/mqtt-manual.html。 
本文采用eclipse的paho客戶端實(shí)現(xiàn)(https://eclipse.org/paho/)。

a.javascript客戶端:https://eclipse.org/paho/clients/js/

Javascript客戶端項(xiàng)目下載下來(lái),并在其項(xiàng)目根目錄下執(zhí)行mvn命令,進(jìn)行編譯,生成target目錄,其下生成mqttws31.js、mqttws31-min.js兩個(gè)js文件,將其拷貝到自己項(xiàng)目相關(guān)目錄下,并在頁(yè)面中引用,即可實(shí)現(xiàn)javascript客戶端的消息訂閱和發(fā)布,demo代碼如下: 
var client = new Paho.MQTT.Client(location.hostname, 61623,"/", "clientId"); 
// 61623是ws連接的默認(rèn)端口,可以在apollo中間件中進(jìn)行配置(關(guān)于apollo的配置請(qǐng)參考:http://activemq.apache.org/apollo/documentation/user-manual.html) 
// set callback handlers 
client.onConnectionLost = onConnectionLost; 
client.onMessageArrived = onMessageArrived; 
// connect the client 
client.connect({userName:'admin',password:'password',onSuccess:onConnect}); 
// called when the client connects 
function onConnect() { // 連接成功后的處理 
// Once a connection has been made, make a subscription and send a message. 
console.log("onConnect"); 
client.subscribe("/topic/event"); // 訂閱消息的主題 
var message = new Paho.MQTT.Message("Hello,this is a test"); 
message.destinationName = "/topic/event"; 
client.send(message); // 發(fā)送消息 

// called when the client loses its connection 
function onConnectionLost(responSEObject) { // 連接丟失后的處理 
if (responseObject.errorCode !== 0) { 
console.log("onConnectionLost:"+responseObject.errorMessage); 


// called when a message arrives 
function onMessageArrived(message) { // 消息接收成功后的處理 
console.log("onMessageArrived:"+message.payloadString); 
}

b. java客戶端實(shí)現(xiàn)

paho目前只支持J2SE和安卓,下載地址:https://eclipse.org/paho/clients/java/,我們采用maven方式。 
maven庫(kù)地址: 
https://repo.eclipse.org/content/repositories/paho-releases/ - Official Releases 
https://repo.eclipse.org/content/repositories/paho-snapshots/ - Nightly Snapshots 
maven dependency: 
<dependency> 
<groupId>org.eclipse.paho</groupId> 
<artifactId>org.eclipse.paho.client.mqttv3</artifactId> 
<version>1.0.1</version> 
</dependency>
 
說(shuō)明:版本為1.0.0或0.9.0時(shí),其jar包根本加載不進(jìn)來(lái),最后搜到1.0.1版本才可以正常使用。 
java端實(shí)現(xiàn): 
public interface IMessage { 
String getHost(); 
Integer getPort(); 
Integer getQos(); 
String getTopic(); 
String getClientId(); 
String getContent(); 
byte[] getContentBytes(); 

 


Map<String,Object> getOption(); 
Object getSender(); 
Date getSendTime(); 
}
 
public final class MessagePRocessingCenter { 
protected static Logger logger=LoggerFactory.getLogger(MessageProcessingCenter.class); 
protected static final String BROKER_PREFIX="tcp://"; 
protected static final String BROKER_HOST="localhost"; 
protected static final int PORT=61613; 
protected static final int QOS=2; 
protected static final String TOPIC="/topic/event"; 
protected static final String CLIENT_ID="clientId"; 
protected static final String MQ_USER="admin"; 
protected static final String MQ_PASSWORD="password"; 
public static void send(IMessage message){ 
String topic= StringUtils.isEmpty(message.getTopic())?TOPIC: message.getTopic(); 
int qos=null == message.getQos()?QOS: message.getQos(); 
String broker=BROKER_PREFIX+ (StringUtils.isEmpty(message.getHost())?BROKER_HOST:message.getHost()); 
int port=null == message.getPort()?PORT:message.getPort(); 
broker+=":"+port; 
String clientId = StringUtils.isEmpty(message.getClientId())?CLIENT_ID:message.getClientId(); 
Map<String,Object> opts=message.getOption(); 
String user=MQ_USER; 
String password=MQ_PASSWORD; 
if(null != opts){ 
if(null != opts.get("userName")){ 
user=opts.get("userName").toString(); 

if(null != opts.get("password")){ 
password=opts.get("password").toString(); 


MemoryPersistence persistence = new MemoryPersistence(); 
try { 
MqttClient sampleClient = new MqttClient(broker, clientId, persistence); 
MqttConnectOptions connOpts = new MqttConnectOptions(); 
connOpts.setUserName(user); 
connOpts.setPassword(password.toCharArray()); 
connOpts.setCleansession(true); 
sampleClient.connect(connOpts); 
MqttMessage mqm = new MqttMessage(message.getContentBytes()); 
mqm.setQos(qos); 
sampleClient.publish(topic, mqm); 
sampleClient.disconnect(); 
} catch(MqttException me) { 
logger.info("********************* send message exception :"); 
logger.info("********************* reason : " + me.getReasonCode()); 
logger.info("********************* msg : " + me.getMessage()); 
logger.info("********************* loc : " + me.getLocalizedMessage()); 
logger.info("********************* cause : " + me.getCause()); 
logger.info("********************* excep : " + me); 
me.printStackTrace(); 


public static void send(Set<IMessage> set){ 
for(IMessage message:set){ 
send(message); 


}

 

小結(jié)

至此,MQTT協(xié)議已部署完畢,java端可以發(fā)布消息,而javascript端則可以訂閱并接收到j(luò)ava端發(fā)布的信息。 
本文只是依照官網(wǎng)手冊(cè)而實(shí)現(xiàn)的簡(jiǎn)單應(yīng)用,講解不一定十分準(zhǔn)確,有什么不對(duì)的地方還請(qǐng)多多指點(diǎn),更詳細(xì)的應(yīng)用請(qǐng)參考官網(wǎng)文檔: 
apollo:http://activemq.apache.org/apollo/documentation/user-manual.html 
eclipse paho:https://eclipse.org/paho/

 


發(fā)表評(píng)論 共有條評(píng)論
用戶名: 密碼:
驗(yàn)證碼: 匿名發(fā)表
主站蜘蛛池模板: 国产在线精品一区二区不卡 | 久久久久久久久久性 | 午夜偷拍视频 | 国产精品麻豆一区二区三区 | 色就色 综合偷拍区91网 | 精品久久久久久久久久久aⅴ | 国产一级二级在线播放 | 日本精品久久久久 | 久久久久久久久久美女 | 干一夜综合| 污黄视频在线播放 | 黄色网战入口 | 精品国产一区二区三区免费 | 91久久久久久亚洲精品禁果 | 91网视频在线观看 | 一级在线免费观看视频 | 国产一级毛片高清视频完整版 | 国产精品99久久久久久久女警 | 国产精品美女久久久久久网站 | 久久精品国产精品亚洲 | 国产精品爆操 | 国产精选电影免费在线观看网站 | 欧美精品久久久久久久久久 | 欧美成人激情在线 | 黄色av电影在线播放 | 91天堂国产在线 | 369看片你懂的小视频在线观看 | 一级电影在线免费观看 | 国产精品av久久久久久久久久 | 在线小视频国产 | 免费看污视频在线观看 | 国产精品区一区二区三区 | 国产成人精品免高潮在线观看 | 在线看免电影网站 | 日本一级黄色大片 | 91久久另类重口变态 | 国产精品久久久久久久久久东京 | 欧美日韩综合视频 | 精品一区二区三区欧美 | 日本羞羞的午夜电视剧 | 在线观看免费毛片视频 |