1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef PROTOCOLPROTO_H
17 #define PROTOCOLPROTO_H
18 
19 #include <cstdint>
20 #include <memory>
21 #include <shared_mutex>
22 #include "communicator_type_define.h"
23 #include "frame_header.h"
24 #include "iprocess_communicator.h"
25 #include "message.h"
26 #include "message_transform.h"
27 #include "parse_result.h"
28 #include "serial_buffer.h"
29 
30 namespace DistributedDB {
31 struct PhyHeaderInfo {
32     uint64_t sourceId = 0u;
33     uint32_t frameId = 0u;
34     FrameType frameType = FrameType::EMPTY;
35     bool sendLabelExchange = true;
36 };
37 
38 struct FrameFragmentInfo {
39     uint8_t *oringinalBytesAddr = nullptr;
40     uint32_t extendHeadSize = 0u;
41     uint32_t splitLength = 0u;
42     uint16_t fragCount = 0u;
43 };
44 
45 struct FragmentPacket {
46     uint8_t *ptrPacket = nullptr;
47     uint32_t leftLength = 0u;
48 };
49 
50 class ProtocolProto {
51 public:
52     // For application layer frame
53     static uint32_t GetAppLayerFrameHeaderLength();
54     static uint32_t GetLengthBeforeSerializedData();
55 
56     // For communication layer frame
57     static uint32_t GetCommLayerFrameHeaderLength();
58 
59     // For handling application layer message. Return a heap object.
60     static SerialBuffer *ToSerialBuffer(const Message *inMsg,
61         std::shared_ptr<ExtendHeaderHandle> &extendHandle, bool onlyMsgHeader, int &outErrorNo);
62     static Message *ToMessage(const SerialBuffer *inBuff, int &outErrorNo, bool onlyMsgHeader = false);
63 
64     // For handling communication layer frame. Return a heap object.
65     static SerialBuffer *BuildEmptyFrameForVersionNegotiate(int &outErrorNo);
66     static SerialBuffer *BuildFeedbackMessageFrame(const Message *inMsg, const LabelType &inLabel, int &outErrorNo);
67     static SerialBuffer *BuildLabelExchange(uint64_t inDistinctValue, uint64_t inSequenceId,
68         const std::set<LabelType> &inLabels, int &outErrorNo);
69     static SerialBuffer *BuildLabelExchangeAck(uint64_t inDistinctValue, uint64_t inSequenceId, int &outErrorNo);
70 
71     // Return E_OK if no error happened. outPieces.size equal zero means not split, in this case, use ori buff.
72     static int SplitFrameIntoPacketsIfNeed(const SerialBuffer *inBuff, uint32_t inMtuSize,
73         std::vector<std::pair<std::vector<uint8_t>, uint32_t>> &outPieces);
74     static int AnalyzeSplitStructure(const ParseResult &inResult, uint32_t &outFragLen, uint32_t &outLastFragLen);
75 
76     // inFrame is the destination, pktBytes and pktLength are the source, fragOffset and fragLength give the boundary
77     static int CombinePacketIntoFrame(SerialBuffer *inFrame, const uint8_t *pktBytes, uint32_t pktLength,
78         uint32_t fragOffset, uint32_t fragLength);
79 
80     // Must not be called in multi-thread
81     // Return E_ALREADY_REGISTER if msgId is already registered
82     // Return E_INVALID_ARGS if member of inFunc not all valid
83     static int RegTransformFunction(uint32_t msgId, const TransformFunc &inFunc);
84 
85     static void UnRegTransformFunction(uint32_t msgId);
86 
87     // For application layer frame. In send case. Focus on frame.
88     static int SetDivergeHeader(SerialBuffer *inBuff, const LabelType &inCommLabel);
89 
90     // For both application and communication layer frame. In send case. Focus on frame.
91     static int SetPhyHeader(SerialBuffer *inBuff, const PhyHeaderInfo &inInfo);
92 
93     // In receive case, return error if parse fail.
94     static int CheckAndParsePacket(const std::string &srcTarget, const uint8_t *bytes, uint32_t length,
95         ParseResult &outResult);
96 
97     // The CommPhyHeader had already been parsed into outResult
98     static int CheckAndParseFrame(const SerialBuffer *inBuff, ParseResult &outResult);
99 
100     // Dfx method for helping debugging
101     static void DisplayPacketInformation(const uint8_t *bytes, uint32_t length);
102 
103     ProtocolProto() = delete;
104     ~ProtocolProto() = delete;
105 private:
106     static int CalculateXorSum(const uint8_t *bytes, uint32_t length, uint64_t &outSum);
107 
108     // For handling application layer message
109     static int CalculateDataSerializeLength(const Message *inMsg, uint32_t &outLength);
110     static int SerializeMessage(SerialBuffer *inBuff, const Message *inMsg);
111     static int DeSerializeMessage(const SerialBuffer *inBuff, Message *inMsg, bool onlyMsgHeader);
112     static bool IsSupportMessageVersion(uint16_t version);
113     static bool IsFeedbackErrorMessage(uint32_t errorNo);
114 
115     static int ParseCommPhyHeader(const std::string &srcTarget, const uint8_t *bytes, uint32_t length,
116         ParseResult &inResult);
117     static int ParseCommPhyHeaderCheckMagicAndVersion(const uint8_t *bytes, uint32_t length);
118     static int ParseCommPhyHeaderCheckField(const std::string &srcTarget, const CommPhyHeader &phyHeader,
119         const uint8_t *bytes, uint32_t length);
120     static int ParseCommPhyOptHeader(const uint8_t *bytes, uint32_t length, ParseResult &inResult);
121     static int ParseCommDivergeHeader(const uint8_t *bytes, uint32_t length, ParseResult &inResult);
122     static int ParseCommLayerPayload(const uint8_t *bytes, uint32_t length, ParseResult &inResult);
123     static int ParseLabelExchange(const uint8_t *bytes, uint32_t length, ParseResult &inResult);
124     static int ParseLabelExchangeAck(const uint8_t *bytes, uint32_t length, ParseResult &inResult);
125 
126     static int FrameFragmentation(const uint8_t *splitStartBytes, const FrameFragmentInfo &fragmentInfo,
127         const CommPhyHeader &framePhyHeader, std::vector<std::pair<std::vector<uint8_t>, uint32_t>> &outPieces);
128     static int FillFragmentPacket(const CommPhyHeader &phyHeader, const CommPhyOptHeader &phyOptHeader,
129         const uint8_t *fragBytes, uint32_t fragLen, FragmentPacket &outPacket);
130     static int FillFragmentPacketExtendHead(uint8_t *headBytesAddr, uint32_t headLen, FragmentPacket &outPacket);
131     static int GetExtendHeadDataSize(std::shared_ptr<ExtendHeaderHandle> &extendHandle, uint32_t &headSize);
132     static int FillExtendHeadDataIfNeed(std::shared_ptr<ExtendHeaderHandle> &extendHandle, SerialBuffer *buffer,
133         uint32_t headSize);
134 
135     static int GetTransformFunc(uint32_t messageId, TransformFunc &function);
136 
137     static std::shared_mutex msgIdMutex_;
138     static std::map<uint32_t, TransformFunc> msgIdMapFunc_;
139 };
140 } // namespace DistributedDB
141 
142 #endif // PROTOCOLPROTO_H
143