Apollo是apache旗下的基金項(xiàng)目,它是以Apache ActiveMQ5.x為基礎(chǔ),采用全新的線程和消息調(diào)度架構(gòu)重新實(shí)現(xiàn)的消息中間件,針對(duì)多核處理器進(jìn)行了優(yōu)化處理,它的速度更快、更可靠、更易于維護(hù)。apollo與ActiveQQ一樣支持多協(xié)議:STOMP、AMQP、MQTT、Openwire、 SSL、WebSockets,本文只介紹MQTT協(xié)議的使用。
關(guān)于ActiveMQ5請(qǐng)參考:http://activemq.apache.org,本文只介紹Apollo在windows下安裝和應(yīng)用,Apollo的詳細(xì)文檔請(qǐng)參考官網(wǎng):http://activemq.apache.org/apollo/documentation/user-manual.html.
進(jìn)入http://activemq.apache.org/apollo/download.html,下載windows版本的壓縮包,并解壓到自己工作目錄(如:E:/apache-apollo-1.7),并創(chuàng)建環(huán)境變量APOLLO_HOME=E:/apache-apollo-1.7。如果操作是系統(tǒng)是Windows Vista或更高版本,則需要安裝Microsoft Visual C++ 2010 Redistributable (64位JVM:http://www.microsoft.com/en-us/download/details.aspx?id=14632;32位JVM:http://www.microsoft.com/en-us/download/details.aspx?id=5555)。
進(jìn)入E:/apache-apollo-1.7之下的bin目錄,打開cmd窗口,執(zhí)行命令:apollo create E:/apollo_broker,命令執(zhí)行成功后,在E盤下會(huì)有apollo_broker目錄,在其下有個(gè)bin目錄,其中有兩個(gè)文件:apollo-broker.cmd和apollo-broker-service.exe,第一個(gè)是通過(guò)cmd命令啟動(dòng)apollo服務(wù)的,第二個(gè)是創(chuàng)建window服務(wù)的。
cmd命令啟動(dòng):apollo-broker run,啟動(dòng)成功可以在瀏覽器中查看運(yùn)行情況(http://127.0.0.1:61680/,默認(rèn)用戶名/密碼:admin/passWord);
windows服務(wù)啟動(dòng):執(zhí)行apollo-broker-service.exe,創(chuàng)建windows服務(wù),就可以以windows服務(wù)的方式啟動(dòng)apollo服務(wù)。
MQTT協(xié)議有眾多客戶端實(shí)現(xiàn),相關(guān)請(qǐng)參考:http://activemq.apache.org/apollo/versions/1.7/website/documentation/mqtt-manual.html。
本文采用eclipse的paho客戶端實(shí)現(xiàn)(https://eclipse.org/paho/)。a.javascript客戶端:https://eclipse.org/paho/clients/js/
將Javascript客戶端項(xiàng)目下載下來(lái),并在其項(xiàng)目根目錄下執(zhí)行mvn命令,進(jìn)行編譯,生成target目錄,其下生成mqttws31.js、mqttws31-min.js兩個(gè)js文件,將其拷貝到自己項(xiàng)目相關(guān)目錄下,并在頁(yè)面中引用,即可實(shí)現(xiàn)javascript客戶端的消息訂閱和發(fā)布,demo代碼如下:
var client = new Paho.MQTT.Client(location.hostname, 61623,"/", "clientId");
// 61623是ws連接的默認(rèn)端口,可以在apollo中間件中進(jìn)行配置(關(guān)于apollo的配置請(qǐng)參考:http://activemq.apache.org/apollo/documentation/user-manual.html)
// set callback handlers
client.onConnectionLost = onConnectionLost;
client.onMessageArrived = onMessageArrived;
// connect the client
client.connect({userName:'admin',password:'password',onSuccess:onConnect});
// called when the client connects
function onConnect() { // 連接成功后的處理
// Once a connection has been made, make a subscription and send a message.
console.log("onConnect");
client.subscribe("/topic/event"); // 訂閱消息的主題
var message = new Paho.MQTT.Message("Hello,this is a test");
message.destinationName = "/topic/event";
client.send(message); // 發(fā)送消息
}
// called when the client loses its connection
function onConnectionLost(responSEObject) { // 連接丟失后的處理
if (responseObject.errorCode !== 0) {
console.log("onConnectionLost:"+responseObject.errorMessage);
}
}
// called when a message arrives
function onMessageArrived(message) { // 消息接收成功后的處理
console.log("onMessageArrived:"+message.payloadString);
}b. java客戶端實(shí)現(xiàn)
paho目前只支持J2SE和安卓,下載地址:https://eclipse.org/paho/clients/java/,我們采用maven方式。
maven庫(kù)地址:
https://repo.eclipse.org/content/repositories/paho-releases/ - Official Releases
https://repo.eclipse.org/content/repositories/paho-snapshots/ - Nightly Snapshots
maven dependency:<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.0.1</version>
</dependency>
說(shuō)明:版本為1.0.0或0.9.0時(shí),其jar包根本加載不進(jìn)來(lái),最后搜到1.0.1版本才可以正常使用。
java端實(shí)現(xiàn):public interface IMessage {
String getHost();
Integer getPort();
Integer getQos();
String getTopic();
String getClientId();
String getContent();
byte[] getContentBytes();
Map<String,Object> getOption();
Object getSender();
Date getSendTime();
}public final class MessagePRocessingCenter {
protected static Logger logger=LoggerFactory.getLogger(MessageProcessingCenter.class);
protected static final String BROKER_PREFIX="tcp://";
protected static final String BROKER_HOST="localhost";
protected static final int PORT=61613;
protected static final int QOS=2;
protected static final String TOPIC="/topic/event";
protected static final String CLIENT_ID="clientId";
protected static final String MQ_USER="admin";
protected static final String MQ_PASSWORD="password";
public static void send(IMessage message){
String topic= StringUtils.isEmpty(message.getTopic())?TOPIC: message.getTopic();
int qos=null == message.getQos()?QOS: message.getQos();
String broker=BROKER_PREFIX+ (StringUtils.isEmpty(message.getHost())?BROKER_HOST:message.getHost());
int port=null == message.getPort()?PORT:message.getPort();
broker+=":"+port;
String clientId = StringUtils.isEmpty(message.getClientId())?CLIENT_ID:message.getClientId();
Map<String,Object> opts=message.getOption();
String user=MQ_USER;
String password=MQ_PASSWORD;
if(null != opts){
if(null != opts.get("userName")){
user=opts.get("userName").toString();
}
if(null != opts.get("password")){
password=opts.get("password").toString();
}
}
MemoryPersistence persistence = new MemoryPersistence();
try {
MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setUserName(user);
connOpts.setPassword(password.toCharArray());
connOpts.setCleansession(true);
sampleClient.connect(connOpts);
MqttMessage mqm = new MqttMessage(message.getContentBytes());
mqm.setQos(qos);
sampleClient.publish(topic, mqm);
sampleClient.disconnect();
} catch(MqttException me) {
logger.info("********************* send message exception :");
logger.info("********************* reason : " + me.getReasonCode());
logger.info("********************* msg : " + me.getMessage());
logger.info("********************* loc : " + me.getLocalizedMessage());
logger.info("********************* cause : " + me.getCause());
logger.info("********************* excep : " + me);
me.printStackTrace();
}
}
public static void send(Set<IMessage> set){
for(IMessage message:set){
send(message);
}
}
}
至此,MQTT協(xié)議已部署完畢,java端可以發(fā)布消息,而javascript端則可以訂閱并接收到j(luò)ava端發(fā)布的信息。
本文只是依照官網(wǎng)手冊(cè)而實(shí)現(xiàn)的簡(jiǎn)單應(yīng)用,講解不一定十分準(zhǔn)確,有什么不對(duì)的地方還請(qǐng)多多指點(diǎn),更詳細(xì)的應(yīng)用請(qǐng)參考官網(wǎng)文檔:
apollo:http://activemq.apache.org/apollo/documentation/user-manual.html
eclipse paho:https://eclipse.org/paho/
|
新聞熱點(diǎn)
疑難解答
圖片精選
網(wǎng)友關(guān)注