一般對DB的CRUD操作都由com_query報文封裝并發(fā)送給DB。com_query報文如下圖所示: PacketLength:3byte表示body長度,防"粘包"。sequenceId:1byte防串包。body部分:是一個以0x00結(jié)尾的字符串,這個字符串就是想要執(zhí)行的SQL,實際操作中即是一直讀body直到讀到一個0x00皆為的字符串即停止。
如果SQL是insert、update或者是delete,則返回的是對應(yīng)的okay或者error報文。okay報文的類表示:
public class OkPacket extends MySQLPacket { public static final byte FIELD_COUNT = 0x00; public static final byte[] OK = new byte[] { 7, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0 }; public static final byte[] AUTH_OK = new byte[] { 7, 0, 0, 2, 0, 0, 0, 2, 0, 0, 0 }; public byte fieldCount = FIELD_COUNT; public long affectedRows; public long insertId; public int serverStatus; public int warningCount; public byte[] message;}error報文的類表示:
public class ErrorPacket extends MySQLPacket { public static final byte FIELD_COUNT = (byte) 0xff; PRivate static final byte SQLSTATE_MARKER = (byte) '#'; private static final byte[] DEFAULT_SQLSTATE = "HY000".getBytes(); public byte fieldCount = FIELD_COUNT; public int errno; public byte mark = SQLSTATE_MARKER; public byte[] sqlState = DEFAULT_SQLSTATE; public byte[] message;}如果執(zhí)行的SQL是select語句,則返回的報文比較復(fù)雜,不過筆者已經(jīng)整理成圖的形式。
首先ResultSet是由很多行(Row)組成,每一行(Row)就表示了一條記錄,Row格式如下所示:每一行(row)又分好field_count個字段,這個field_count將會在比Row還高一層的Result格式中描述,下面有詳解。每一個字段都是一個length-value對,length長度是3byte,其讀取方法很特殊,現(xiàn)在直接用代碼表述:
獲取了length長度之后,則可以根據(jù)此讀出來后面的value。value的類型也會在后面的Resutl格式中描述。
嚴格來說ResultSet是由多個獨立的報文以協(xié)議的形式組織起來,現(xiàn)直接放出ResultSet的協(xié)議格式圖: 從上圖中可以看到,當客戶端發(fā)送一個select的com_query包后,DB會按照下列步驟返回:Step1:返回一個ResultSetHeader報文,其中包含了fieldCount,在此圖就不例出了,現(xiàn)只給出代碼定義。
Step2:根據(jù)讀取到的fieldCount來在接下來的byte流里面讀取fieldCount個FiledPacket報文,FieldPacket報文的代碼定義:
public class FieldPacket extends MySQLPacket { private static final byte[] DEFAULT_CATALOG = "def".getBytes(); private static final byte[] FILLER = new byte[2]; public byte[] catalog = DEFAULT_CATALOG; public byte[] db; public byte[] table; public byte[] orgTable; public byte[] name; public byte[] orgName; public int charsetIndex; public long length; public int type; public int flags; public byte decimals; public byte[] definition;}具體邏輯請參照githubStep3:再讀取一個eof包表示field包流的結(jié)束eof包的代碼定義:
public class EOFPacket extends MySQLPacket { public static final byte FIELD_COUNT = (byte) 0xfe; public byte fieldCount = FIELD_COUNT; public int warningCount; public int status = 2;Step4:一直讀Row,直到讀到last eof位置,Row格式已經(jīng)在上面給過。Step5:如果讀到任何一個error包后,此此讀取結(jié)束,拋出錯誤。Step6:值得注意的是,如果eof中的status & SERVER_MORE_RESULT_EXISTS不為0,表明還有ResultSet。則繼續(xù)返回到fieldCount階段進行下一步的讀取。Step7:至此,整個ResultSet讀取完畢。下面給出上述過程的java代碼(基于Netty):
private boolean handleResultSet(BinaryPacket bin, CmdType cmdType) { boolean result = false; int type = bin.data[0]; switch (type) { case ErrorPacket.FIELD_COUNT: // 重置狀態(tài),且告訴上層當前select已經(jīng)處理完畢 resetSelect(); result = true; ErrorPacket err = new ErrorPacket(); err.read(bin); // write(bin,cmdType); getResponseHandler().errorResponse(bin); logger.error("handleResultSet errorMessage:" + new String(err.message)); break; case EOFPacket.FIELD_COUNT: EOFPacket eof = new EOFPacket(); eof.read(bin); if (selectState == BackendConnState.RESULT_SET_FIELDS) { // logger.info("eof"); // 推進狀態(tài) 需要步進兩次狀態(tài),先到field_eof,再到row selectStateStep(); selectStateStep(); // 給FieldList增加eof addToFieldList(bin); getResponseHandler().fieldListResponse(fieldList); } else { if (eof.hasStatusFlag(MySQLPacket.SERVER_MORE_RESULTS_EXISTS)) { // 重置為select的初始狀態(tài),但是還是處在select mode下 selectState = BackendConnState.RESULT_SET_FIELD_COUNT; } else { // 重置,且告訴上層當前select已經(jīng)處理完畢 resetSelect(); result = true; } getResponseHandler().lastEofResponse(bin); } break; default: switch (selectState) { case BackendConnState.RESULT_SET_FIELD_COUNT: selectStateStep(); addToFieldList(bin); break; case BackendConnState.RESULT_SET_FIELDS: addToFieldList(bin); break; case BackendConnState.RESULT_SET_ROW: getResponseHandler().rowResponse(bin); break; } } return result; }https://github.com/alchemystar/Lancelot.git
新聞熱點
疑難解答