如何高效實現(xiàn)自定義的應用層協(xié)議
關注、星標公眾號,直達精彩內(nèi)容
簡述
互聯(lián)網(wǎng)上充斥著各種各樣的網(wǎng)絡服務,在對外提供網(wǎng)絡服務時,服務端和客戶端需要遵循同一套數(shù)據(jù)通訊協(xié)議,才能正常的進行通訊;就好像你跟臺灣人溝通用閩南語,跟廣東人溝通就用粵語一樣。
實現(xiàn)自己的應用功能時,已知的知名協(xié)議(http,smtp,ftp等)在安全性、可擴展性等方面不能滿足需求,從而需要設計并實現(xiàn)自己的應用層協(xié)議。
2.協(xié)議分類
2.1按編碼方式
二進制協(xié)議 比如網(wǎng)絡通信運輸層中的tcp協(xié)議。
明文的文本協(xié)議 比如應用層的http、redis協(xié)議。
混合協(xié)議(二進制+明文) 比如蘋果公司早期的APNs推送協(xié)議。
2.2按協(xié)議邊界
固定邊界協(xié)議 能夠明確得知一個協(xié)議報文的長度,這樣的協(xié)議易于解析,比如tcp協(xié)議。
模糊邊界協(xié)議 無法明確得知一個協(xié)議報文的長度,這樣的協(xié)議解析較為復雜,通常需要通過某些特定的字節(jié)來界定報文是否結(jié)束,比如http協(xié)議。
3.協(xié)議優(yōu)劣的基本評判標準
高效的 快速的打包解包減少對cpu的占用,高數(shù)據(jù)壓縮率降低對網(wǎng)絡帶寬的占用。
簡單的 易于人的理解、程序的解析。
易于擴展的 對可預知的變更,有足夠的彈性用于擴展。
容易兼容的
向前兼容,對于舊協(xié)議發(fā)出的報文,能使用新協(xié)議進行解析,只是新協(xié)議支持的新功能不能使用。
向后兼容,對于新協(xié)議發(fā)出的報文,能使用舊協(xié)議進行解析,只是新協(xié)議支持的新功能不能使用。
4.自定義應用層協(xié)議的優(yōu)缺點
4.1優(yōu)點
非知名協(xié)議,數(shù)據(jù)通信更安全,黑客如果要分析協(xié)議的漏洞就必須先破譯你的通訊協(xié)議。
擴展性更好,可以根據(jù)業(yè)務需求和發(fā)展擴展自己的協(xié)議,而已知的知名協(xié)議不好擴展。
4.2缺點
設計難度高,協(xié)議需要易擴展,最好能向后向前兼容。
實現(xiàn)繁瑣,需要自己實現(xiàn)序列化和反序列化。
5.動手前的預備知識
5.1大小端
計算機系統(tǒng)在存儲數(shù)據(jù)時起始地址是高地址還是低地址。
大端 從高地址開始存儲。
小端 從低地址開始存儲。
圖解

判斷 這里以c/c++語言代碼為例,使用了c語言中聯(lián)合體的特性。
#include <stdint.h>
#include <iostream>
using namespace std;
bool bigCheck()
{
union Check
{
char a;
uint32_t data;
};
Check c;
c.data = 1;
if (1 == c.a)
{
return false;
}
return true;
}
int main()
{
if (bigCheck())
{
cout << "big" << endl;
}
else
{
cout << "small" << endl;
}
return 0;
}
5.2網(wǎng)絡字節(jié)序
顧名思義就是數(shù)據(jù)在網(wǎng)絡傳送的字節(jié)流中的起始地址的高低,為了避免在網(wǎng)絡通信中引入其他復雜性,網(wǎng)絡字節(jié)序統(tǒng)一是大端的。
5.3本地字節(jié)序
本地操作系統(tǒng)的大小端,不同操作系統(tǒng)可能采用不同的字節(jié)序。
5.4內(nèi)存對象與布局
任何變量,不管是堆變量還是棧變量都對應著操作系統(tǒng)中的一塊內(nèi)存,由于內(nèi)存對齊的要求程序中的變量并不是緊湊存儲的,例如一個c語言的結(jié)構(gòu)體Test在內(nèi)存中的布局可能如下圖所示。

struct Test
{
char a;
char b;
int32_t c;
};
5.5序列化與反序列化
將計算機語言中的內(nèi)存對象轉(zhuǎn)換為網(wǎng)絡字節(jié)流,例如把c語言中的結(jié)構(gòu)體Test轉(zhuǎn)化成uint8_t data[6]字節(jié)流。
將網(wǎng)絡字節(jié)流轉(zhuǎn)換為計算機語言中的內(nèi)存對象,例如把uint8_t data[6]字節(jié)流轉(zhuǎn)化成c語言中的結(jié)構(gòu)體Test。

6.一個例子
6.1 協(xié)議設計
本協(xié)議采用固定邊界+混合編碼策略。
協(xié)議頭 8字節(jié)的定長協(xié)議頭。支持版本號,基于魔數(shù)的快速校驗,不同服務的復用。定長協(xié)議頭使協(xié)議易于解析且高效。
協(xié)議體 變長json作為協(xié)議體。json使用明文文本編碼,可讀性強、易于擴展、前后兼容、通用的編解碼算法。json協(xié)議體為協(xié)議提供了良好的擴展性和兼容性。
協(xié)議可視化圖

6.2 協(xié)議實現(xiàn)
talk is easy,just code it,使用c/c++語言來實現(xiàn)。
6.2.1c/c++語言實現(xiàn)
使用結(jié)構(gòu)體MyProtoHead來存儲協(xié)議頭
/*
協(xié)議頭
*/
struct MyProtoHead
{
uint8_t version; //協(xié)議版本號
uint8_t magic; //協(xié)議魔數(shù)
uint16_t server; //協(xié)議復用的服務號,標識協(xié)議之上的不同服務
uint32_t len; //協(xié)議長度(協(xié)議頭長度+變長json協(xié)議體長度)
};
使用開源的Jsoncpp類來存儲協(xié)議體https://sourceforge.net/proje...
協(xié)議消息體
/*
協(xié)議消息體
*/
struct MyProtoMsg
{
MyProtoHead head; //協(xié)議頭
Json::Value body; //協(xié)議體
};
打包類
/*
MyProto打包類
*/
class MyProtoEnCode
{
public:
//協(xié)議消息體打包函數(shù)
uint8_t * encode(MyProtoMsg * pMsg, uint32_t & len);
private:
//協(xié)議頭打包函數(shù)
void headEncode(uint8_t * pData, MyProtoMsg * pMsg);
};
解包類
typedef enum MyProtoParserStatus
{
ON_PARSER_INIT = 0,
ON_PARSER_HAED = 1,
ON_PARSER_BODY = 2,
}MyProtoParserStatus;
/*
MyProto解包類
*/
class MyProtoDeCode
{
public:
void init();
void clear();
bool parser(void * data, size_t len);
bool empty();
MyProtoMsg * front();
void pop();
private:
bool parserHead(uint8_t ** curData, uint32_t & curLen,
uint32_t & parserLen, bool & parserBreak);
bool parserBody(uint8_t ** curData, uint32_t & curLen,
uint32_t & parserLen, bool & parserBreak);
private:
MyProtoMsg mCurMsg; //當前解析中的協(xié)議消息體
queue<MyProtoMsg *> mMsgQ; //解析好的協(xié)議消息隊列
vector<uint8_t> mCurReserved; //未解析的網(wǎng)絡字節(jié)流
MyProtoParserStatus mCurParserStatus; //當前解析狀態(tài)
};
6.2.2打包(序列化)
void MyProtoEnCode::headEncode(uint8_t * pData, MyProtoMsg * pMsg)
{
//設置協(xié)議頭版本號為1
*pData = 1;
++pData;
//設置協(xié)議頭魔數(shù)
*pData = MY_PROTO_MAGIC;
++pData;
//設置協(xié)議服務號,把head.server本地字節(jié)序轉(zhuǎn)換為網(wǎng)絡字節(jié)序
*(uint16_t *)pData = htons(pMsg->head.server);
pData += 2;
//設置協(xié)議總長度,把head.len本地字節(jié)序轉(zhuǎn)換為網(wǎng)絡字節(jié)序
*(uint32_t *)pData = htonl(pMsg->head.len);
}
uint8_t * MyProtoEnCode::encode(MyProtoMsg * pMsg, uint32_t & len)
{
uint8_t * pData = NULL;
Json::FastWriter fWriter;
//協(xié)議json體序列化
string bodyStr = fWriter.write(pMsg->body);
//計算協(xié)議消息序列化后的總長度
len = MY_PROTO_HEAD_SIZE + (uint32_t)bodyStr.size();
pMsg->head.len = len;
//申請協(xié)議消息序列化需要的空間
pData = new uint8_t[len];
//打包協(xié)議頭
headEncode(pData, pMsg);
//打包協(xié)議體
memcpy(pData + MY_PROTO_HEAD_SIZE, bodyStr.data(), bodyStr.size());
return pData;
}
6.2.3解包(反序列化)
bool MyProtoDeCode::parserHead(uint8_t ** curData, uint32_t & curLen,
uint32_t & parserLen, bool & parserBreak)
{
parserBreak = false;
if (curLen < MY_PROTO_HEAD_SIZE)
{
parserBreak = true; //終止解析
return true;
}
uint8_t * pData = *curData;
//解析版本號
mCurMsg.head.version = *pData;
pData++;
//解析魔數(shù)
mCurMsg.head.magic = *pData;
pData++;
//魔數(shù)不一致,則返回解析失敗
if (MY_PROTO_MAGIC != mCurMsg.head.magic)
{
return false;
}
//解析服務號
mCurMsg.head.server = ntohs(*(uint16_t*)pData);
pData+=2;
//解析協(xié)議消息體的長度
mCurMsg.head.len = ntohl(*(uint32_t*)pData);
//異常大包,則返回解析失敗
if (mCurMsg.head.len > MY_PROTO_MAX_SIZE)
{
return false;
}
//解析指針向前移動MY_PROTO_HEAD_SIZE字節(jié)
(*curData) += MY_PROTO_HEAD_SIZE;
curLen -= MY_PROTO_HEAD_SIZE;
parserLen += MY_PROTO_HEAD_SIZE;
mCurParserStatus = ON_PARSER_HAED;
return true;
}
bool MyProtoDeCode::parserBody(uint8_t ** curData, uint32_t & curLen,
uint32_t & parserLen, bool & parserBreak)
{
parserBreak = false;
uint32_t jsonSize = mCurMsg.head.len - MY_PROTO_HEAD_SIZE;
if (curLen < jsonSize)
{
parserBreak = true; //終止解析
return true;
}
Json::Reader reader; //json解析類
if (!reader.parse((char *)(*curData),
(char *)((*curData) + jsonSize), mCurMsg.body, false))
{
return false;
}
//解析指針向前移動jsonSize字節(jié)
(*curData) += jsonSize;
curLen -= jsonSize;
parserLen += jsonSize;
mCurParserStatus = ON_PARSER_BODY;
return true;
}
bool MyProtoDeCode::parser(void * data, size_t len)
{
if (len <= 0)
{
return false;
}
uint32_t curLen = 0;
uint32_t parserLen = 0;
uint8_t * curData = NULL;
curData = (uint8_t *)data;
//把當前要解析的網(wǎng)絡字節(jié)流寫入未解析完字節(jié)流之后
while (len--)
{
mCurReserved.push_back(*curData);
++curData;
}
curLen = mCurReserved.size();
curData = (uint8_t *)&mCurReserved[0];
//只要還有未解析的網(wǎng)絡字節(jié)流,就持續(xù)解析
while (curLen > 0)
{
bool parserBreak = false;
//解析協(xié)議頭
if (ON_PARSER_INIT == mCurParserStatus ||
ON_PARSER_BODY == mCurParserStatus)
{
if (!parserHead(&curData, curLen, parserLen, parserBreak))
{
return false;
}
if (parserBreak) break;
}
//解析完協(xié)議頭,解析協(xié)議體
if (ON_PARSER_HAED == mCurParserStatus)
{
if (!parserBody(&curData, curLen, parserLen, parserBreak))
{
return false;
}
if (parserBreak) break;
}
if (ON_PARSER_BODY == mCurParserStatus)
{
//拷貝解析完的消息體放入隊列中
MyProtoMsg * pMsg = NULL;
pMsg = new MyProtoMsg;
*pMsg = mCurMsg;
mMsgQ.push(pMsg);
}
}
if (parserLen > 0)
{
//刪除已經(jīng)被解析的網(wǎng)絡字節(jié)流
mCurReserved.erase(mCurReserved.begin(), mCurReserved.begin() + parserLen);
}
return true;
}
7.完整源碼與測試
code is easy,just run it.
7.1源碼
#include <stdint.h>
#include <stdio.h>
#include <queue>
#include <vector>
#include <iostream>
#include <string.h>
#include <json/json.h>
#include <arpa/inet.h>
using namespace std;
const uint8_t MY_PROTO_MAGIC = 88;
const uint32_t MY_PROTO_MAX_SIZE = 10 * 1024 * 1024; //10M
const uint32_t MY_PROTO_HEAD_SIZE = 8;
typedef enum MyProtoParserStatus
{
ON_PARSER_INIT = 0,
ON_PARSER_HAED = 1,
ON_PARSER_BODY = 2,
}MyProtoParserStatus;
/*
協(xié)議頭
*/
struct MyProtoHead
{
uint8_t version; //協(xié)議版本號
uint8_t magic; //協(xié)議魔數(shù)
uint16_t server; //協(xié)議復用的服務號,標識協(xié)議之上的不同服務
uint32_t len; //協(xié)議長度(協(xié)議頭長度+變長json協(xié)議體長度)
};
/*
協(xié)議消息體
*/
struct MyProtoMsg
{
MyProtoHead head; //協(xié)議頭
Json::Value body; //協(xié)議體
};
void myProtoMsgPrint(MyProtoMsg & msg)
{
string jsonStr = "";
Json::FastWriter fWriter;
jsonStr = fWriter.write(msg.body);
printf("Head[version=%d,magic=%d,server=%d,len=%d]\n"
"Body:%s", msg.head.version, msg.head.magic,
msg.head.server, msg.head.len, jsonStr.c_str());
}
/*
MyProto打包類
*/
class MyProtoEnCode
{
public:
//協(xié)議消息體打包函數(shù)
uint8_t * encode(MyProtoMsg * pMsg, uint32_t & len);
private:
//協(xié)議頭打包函數(shù)
void headEncode(uint8_t * pData, MyProtoMsg * pMsg);
};
void MyProtoEnCode::headEncode(uint8_t * pData, MyProtoMsg * pMsg)
{
//設置協(xié)議頭版本號為1
*pData = 1;
++pData;
//設置協(xié)議頭魔數(shù)
*pData = MY_PROTO_MAGIC;
++pData;
//設置協(xié)議服務號,把head.server本地字節(jié)序轉(zhuǎn)換為網(wǎng)絡字節(jié)序
*(uint16_t *)pData = htons(pMsg->head.server);
pData += 2;
//設置協(xié)議總長度,把head.len本地字節(jié)序轉(zhuǎn)換為網(wǎng)絡字節(jié)序
*(uint32_t *)pData = htonl(pMsg->head.len);
}
uint8_t * MyProtoEnCode::encode(MyProtoMsg * pMsg, uint32_t & len)
{
uint8_t * pData = NULL;
Json::FastWriter fWriter;
//協(xié)議json體序列化
string bodyStr = fWriter.write(pMsg->body);
//計算協(xié)議消息序列化后的總長度
len = MY_PROTO_HEAD_SIZE + (uint32_t)bodyStr.size();
pMsg->head.len = len;
//申請協(xié)議消息序列化需要的空間
pData = new uint8_t[len];
//打包協(xié)議頭
headEncode(pData, pMsg);
//打包協(xié)議體
memcpy(pData + MY_PROTO_HEAD_SIZE, bodyStr.data(), bodyStr.size());
return pData;
}
/*
MyProto解包類
*/
class MyProtoDeCode
{
public:
void init();
void clear();
bool parser(void * data, size_t len);
bool empty();
MyProtoMsg * front();
void pop();
private:
bool parserHead(uint8_t ** curData, uint32_t & curLen,
uint32_t & parserLen, bool & parserBreak);
bool parserBody(uint8_t ** curData, uint32_t & curLen,
uint32_t & parserLen, bool & parserBreak);
private:
MyProtoMsg mCurMsg; //當前解析中的協(xié)議消息體
queue<MyProtoMsg *> mMsgQ; //解析好的協(xié)議消息隊列
vector<uint8_t> mCurReserved; //未解析的網(wǎng)絡字節(jié)流
MyProtoParserStatus mCurParserStatus; //當前解析狀態(tài)
};
void MyProtoDeCode::init()
{
mCurParserStatus = ON_PARSER_INIT;
}
void MyProtoDeCode::clear()
{
MyProtoMsg * pMsg = NULL;
while (!mMsgQ.empty())
{
pMsg = mMsgQ.front();
delete pMsg;
mMsgQ.pop();
}
}
bool MyProtoDeCode::parserHead(uint8_t ** curData, uint32_t & curLen,
uint32_t & parserLen, bool & parserBreak)
{
parserBreak = false;
if (curLen < MY_PROTO_HEAD_SIZE)
{
parserBreak = true; //終止解析
return true;
}
uint8_t * pData = *curData;
//解析版本號
mCurMsg.head.version = *pData;
pData++;
//解析魔數(shù)
mCurMsg.head.magic = *pData;
pData++;
//魔數(shù)不一致,則返回解析失敗
if (MY_PROTO_MAGIC != mCurMsg.head.magic)
{
return false;
}
//解析服務號
mCurMsg.head.server = ntohs(*(uint16_t*)pData);
pData+=2;
//解析協(xié)議消息體的長度
mCurMsg.head.len = ntohl(*(uint32_t*)pData);
//異常大包,則返回解析失敗
if (mCurMsg.head.len > MY_PROTO_MAX_SIZE)
{
return false;
}
//解析指針向前移動MY_PROTO_HEAD_SIZE字節(jié)
(*curData) += MY_PROTO_HEAD_SIZE;
curLen -= MY_PROTO_HEAD_SIZE;
parserLen += MY_PROTO_HEAD_SIZE;
mCurParserStatus = ON_PARSER_HAED;
return true;
}
bool MyProtoDeCode::parserBody(uint8_t ** curData, uint32_t & curLen,
uint32_t & parserLen, bool & parserBreak)
{
parserBreak = false;
uint32_t jsonSize = mCurMsg.head.len - MY_PROTO_HEAD_SIZE;
if (curLen < jsonSize)
{
parserBreak = true; //終止解析
return true;
}
Json::Reader reader; //json解析類
if (!reader.parse((char *)(*curData),
(char *)((*curData) + jsonSize), mCurMsg.body, false))
{
return false;
}
//解析指針向前移動jsonSize字節(jié)
(*curData) += jsonSize;
curLen -= jsonSize;
parserLen += jsonSize;
mCurParserStatus = ON_PARSER_BODY;
return true;
}
bool MyProtoDeCode::parser(void * data, size_t len)
{
if (len <= 0)
{
return false;
}
uint32_t curLen = 0;
uint32_t parserLen = 0;
uint8_t * curData = NULL;
curData = (uint8_t *)data;
//把當前要解析的網(wǎng)絡字節(jié)流寫入未解析完字節(jié)流之后
while (len--)
{
mCurReserved.push_back(*curData);
++curData;
}
curLen = mCurReserved.size();
curData = (uint8_t *)&mCurReserved[0];
//只要還有未解析的網(wǎng)絡字節(jié)流,就持續(xù)解析
while (curLen > 0)
{
bool parserBreak = false;
//解析協(xié)議頭
if (ON_PARSER_INIT == mCurParserStatus ||
ON_PARSER_BODY == mCurParserStatus)
{
if (!parserHead(&curData, curLen, parserLen, parserBreak))
{
return false;
}
if (parserBreak) break;
}
//解析完協(xié)議頭,解析協(xié)議體
if (ON_PARSER_HAED == mCurParserStatus)
{
if (!parserBody(&curData, curLen, parserLen, parserBreak))
{
return false;
}
if (parserBreak) break;
}
if (ON_PARSER_BODY == mCurParserStatus)
{
//拷貝解析完的消息體放入隊列中
MyProtoMsg * pMsg = NULL;
pMsg = new MyProtoMsg;
*pMsg = mCurMsg;
mMsgQ.push(pMsg);
}
}
if (parserLen > 0)
{
//刪除已經(jīng)被解析的網(wǎng)絡字節(jié)流
mCurReserved.erase(mCurReserved.begin(), mCurReserved.begin() + parserLen);
}
return true;
}
bool MyProtoDeCode::empty()
{
return mMsgQ.empty();
}
MyProtoMsg * MyProtoDeCode::front()
{
MyProtoMsg * pMsg = NULL;
pMsg = mMsgQ.front();
return pMsg;
}
void MyProtoDeCode::pop()
{
mMsgQ.pop();
}
int main()
{
uint32_t len = 0;
uint8_t * pData = NULL;
MyProtoMsg msg1;
MyProtoMsg msg2;
MyProtoDeCode myDecode;
MyProtoEnCode myEncode;
msg1.head.server = 1;
msg1.body["op"] = "set";
msg1.body["key"] = "id";
msg1.body["value"] = "9856";
msg2.head.server = 2;
msg2.body["op"] = "get";
msg2.body["key"] = "id";
myDecode.init();
pData = myEncode.encode(&msg1, len);
if (!myDecode.parser(pData, len))
{
cout << "parser falied!" << endl;
}
else
{
cout << "msg1 parser successful!" << endl;
}
pData = myEncode.encode(&msg2, len);
if (!myDecode.parser(pData, len))
{
cout << "parser falied!" << endl;
}
else
{
cout << "msg2 parser successful!" << endl;
}
MyProtoMsg * pMsg = NULL;
while (!myDecode.empty())
{
pMsg = myDecode.front();
myProtoMsgPrint(*pMsg);
myDecode.pop();
}
return 0;
}
7.2運行測試

8.總結(jié)
不到350行的代碼向我們展示了一個自定義的應用層協(xié)議該如何實現(xiàn),當然這個協(xié)議是不夠完善的,還可以對其完善,比如對協(xié)議體進行加密加強協(xié)議的安全性等。
版權(quán)聲明:本文來源網(wǎng)絡,免費傳達知識,版權(quán)歸原作者所有。如涉及作品版權(quán)問題,請聯(lián)系我進行刪除。
???????????????? END ???????????????
關注我的微信公眾號,回復“加群”按規(guī)則加入技術(shù)交流群。
點擊“閱讀原文”查看更多分享,歡迎點分享、收藏、點贊、在看。
