麻豆小视频在线观看_中文黄色一级片_久久久成人精品_成片免费观看视频大全_午夜精品久久久久久久99热浪潮_成人一区二区三区四区

首頁 > 學(xué)院 > 開發(fā)設(shè)計(jì) > 正文

AMQP-cpp 開發(fā)

2019-11-08 03:22:50
字體:
供稿:網(wǎng)友

linux開發(fā):從官網(wǎng)下載amqp-cpp(https://github.com/CopernicaMarketingSoftware/AMQP-CPP)這個(gè)庫,這個(gè)庫是linux 下的。里面包含了linux下的tcp通信(不兼容widows),適合linux開發(fā)。

windows開發(fā):由于官網(wǎng)沒有提供可以直接在windows下使用的amqp-cpp,所以需要將官網(wǎng)的linux下的庫經(jīng)過修改,剝離了tcp通信部分,只留下了AMQP協(xié)議框架部分(https://git.oschina.net/Ailsc/amqp-cpp.git)。所以開發(fā)的話需要自己加入tcp(iocp:https://git.oschina.net/Ailsc/jeflib.git)通信部分配合AMQP-cpp協(xié)議框架,來共同開發(fā)AMQP。amqp-cpp.git內(nèi)部有完整的Demo。

AMQP開發(fā)之前需要了解AMQP協(xié)議,可以參考前面幾章對vhost,exchange,que的參數(shù)做一個(gè)基本了解。然后配合AMQP-cpp進(jìn)行軟件開發(fā)。

開發(fā)流程: 1.實(shí)現(xiàn)自己的AMQP::ConnectionHandler,ConnectionHandler類似于一個(gè)中間組件,將AMQP請求指令轉(zhuǎn)化為數(shù)據(jù)流,然后通過Tcp將數(shù)據(jù)流發(fā)送出去。

class CTcpConnectionHandle : public AMQP::ConnectionHandler{public: CTcpConnectionHandle(CTcpLink *plink); virtual ~CTcpConnectionHandle();public: ///> 當(dāng)AMQP發(fā)送數(shù)據(jù)的時(shí)候觸發(fā)onData virtual void onData(AMQP::Connection *connection, const char *buffer, size_t size); ///> 當(dāng)AMQP協(xié)議準(zhǔn)備好(已經(jīng)登錄成功)觸發(fā) virtual void onConnected(AMQP::Connection *connection); ///> 這個(gè)需要在tcp 斷開的時(shí)候手動調(diào)用一下 virtual void onClosed(AMQP::Connection *connection); ///> 當(dāng)發(fā)生錯誤的時(shí)候,該連接不可再用需要重新建立連接 virtual void onError(AMQP::Connection *connection, const char *message);PRivate: CTcpLink *m_plink;//和當(dāng)前連接關(guān)聯(lián)};

2.實(shí)現(xiàn)TcpLink進(jìn)行數(shù)據(jù)流的收發(fā)操作,以及數(shù)據(jù)解析。

class CTcpLink :public jeflib::iocp::ILinkContext{ friend class CTcpConnectionHandle;public: CTcpLink(); ~CTcpLink();public: ///> set usr psw vhost void setAMQP(std::string strusr, std::string strpsw, std::string strvhost); ///> block bool IsReady(); virtual void on_close();//on connection close,主動斷開不會收到通知 ///> for tcp svr call back virtual void on_accepted(jeflib::iocp::NETHANDLE nethandle, const char *szip, const unsigned short sport){}; ///> tcp recv data virtual void on_recv(const char *pdata, int ndatasize); ///> client connect ok virtual void on_connect_ok(jeflib::iocp::NETHANDLE nethandle); ///> client connect error virtual void on_connect_err(jeflib::iocp::NETHANDLE nethandle, bool bactive/*是否為主本端動關(guān)閉*/); ///> for iocp send data call back to resend virtual void on_send_ok(const char *pdata, int ndatasize){} virtual void on_send_err(const char *pdata, int ndatasize){} virtual CTcpLink* new_context(){ return new CTcpLink; } bool send(const char *data, int size); void parse(); Operator AMQP::Connection&(){ return *m_pConnect;}protected: std::promise<bool> m_bready;//是否準(zhǔn)備好 bool m_blink; mutable std::mutex m_lock; std::vector<char> m_recv_buff;//數(shù)據(jù)接收緩沖區(qū) AMQP::Connection *m_pConnect;//AMQP connect CTcpConnectionHandle *m_phandler; bool m_bpares;private://AMQP 登錄信息 std::string m_strusr; std::string m_strpsw; std::string m_strvhost;};

3.AMQP::Connection 這個(gè)是將網(wǎng)絡(luò)來的數(shù)據(jù)解析為AMQP協(xié)議,并且將數(shù)據(jù)行為反應(yīng)到對應(yīng)的回調(diào)函數(shù)之中。

void CTcpLink::parse(){ if (m_pConnect == NULL) return; uint64_t use = 0; ///> 不阻塞線程,parse一次只能一個(gè)線程調(diào)用,不能多個(gè)線程 std::unique_lock<std::mutex> guard(m_lock, std::try_to_lock); if (!guard.owns_lock()) return; if (m_bpares) return; m_bpares = true; size_t size = m_recv_buff.size(); while (size - use >= m_pConnect->expected()) { std::vector<char> buff(m_recv_buff.begin() + use, m_recv_buff.begin() + use + m_pConnect->expected()); ///> 解析期間打開鎖,允許接收數(shù)據(jù) guard.unlock(); use += m_pConnect->parse(buff.data(), buff.size()); guard.lock(); } m_recv_buff.erase(m_recv_buff.begin(), m_recv_buff.begin() + use); m_bpares = false;}

4.連接Rabbit服務(wù)器,然后進(jìn)行AMQP的通信。通信流程

Y-操作成功 連接服務(wù)器-》(Y)創(chuàng)建通道=》(Y)啟用ACK=》(Y)聲明exchange=》(Y)聲明隊(duì)列=》(Y)bind 隊(duì)列=》(Y)開始消費(fèi)隊(duì)列,publish隊(duì)列

異常處理: tcp連接異常:可以采取重連或者關(guān)閉連接。 rabbit登錄異常:斷開Tcp連接。 通道異常:關(guān)閉當(dāng)前通道,然后重啟一個(gè)通道,復(fù)制該異常通道的業(yè)務(wù)。

if(channel ready){ if(declare que) { channel->bind current que; channel->declare next que; } else//聲明隊(duì)列失敗 { 從需要聲明的隊(duì)列中移除當(dāng)前錯誤的隊(duì)列,因此即使再次嘗試聲明該隊(duì)列還是會異常,所以拋棄該隊(duì)列。 channel->erase(que); 獲取當(dāng)前所有需要聲明的隊(duì)列 aryque = channel->getqueary(); 重啟通道,并且將需要聲明的隊(duì)列讓新通道去做 new channel(aryque ); }}

具體AMQP實(shí)現(xiàn)請參照上述Demo。

注意事項(xiàng): 1.隊(duì)列的Auto delete 屬性,只有已經(jīng)發(fā)生過consumer操作的時(shí)候才會生效。 2.只有觸發(fā)AMQP::ConnectionHandler::onConnected才能表明AMQP協(xié)議準(zhǔn)備完成,包括連接和登錄準(zhǔn)備完畢,才能進(jìn)行AMQP操作。 3.AMQP的通道不是線程安全的,在進(jìn)行通道操作的時(shí)候需要加鎖。或者一個(gè)線程操作一個(gè)通道。 4.AMQP同一個(gè)通道的指令處理順序需要保證序列性,因?yàn)橐粋€(gè)請求或者指令可能存在幾個(gè)通信包。例如:publis存在三次send才完成一次publish指令。否則可能導(dǎo)致指令交匯,導(dǎo)致異常。


發(fā)表評論 共有條評論
用戶名: 密碼:
驗(yàn)證碼: 匿名發(fā)表
主站蜘蛛池模板: 国产精品成人免费一区久久羞羞 | 亚洲成人在线视频网 | 亚洲成人在线免费观看 | 永久免费毛片 | 嗯~啊~用力~高h | 欧美一级三级在线观看 | 日本看片一区二区三区高清 | 成年片在线观看 | 国产精品午夜在线 | 香蕉成人在线观看 | 黄色网址电影 | 九草在线 | 久久国产精品99国产 | 神马顶级推理片免费看 | 一级在线视频 | 中文字幕在线观看视频一区 | 国产精品jk白丝蜜臀av软件 | bt 自拍 另类 综合 欧美 | 午夜视频在线观看免费视频 | 欧美日韩在线中文字幕 | 免费久久久 | 久久久久久久久浪潮精品 | 嗯~啊~弄嗯~啊h高潮视频 | 国产精品成人一区 | www.精品在线| 香蕉视频1024 | 污片视频在线观看 | 污黄视频在线观看 | 爱性久久久久久久 | 亚洲欧美在线视频免费 | 色羞羞 | 欧美爱爱视频 | 亚洲第一色片 | 狠狠干天天操 | 圆产精品久久久久久久久久久 | 久久久久久久一区二区三区 | 黄色毛片一级 | 激情毛片 | 久久草草影视免费网 | 成熟女人特级毛片www免费 | 中文字幕亚洲欧美 |