1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef MULTI_VER_DATA_SYNC_H
17 #define MULTI_VER_DATA_SYNC_H
18 
19 #ifndef OMIT_MULTI_VER
20 #include <vector>
21 
22 #include "icommunicator.h"
23 #include "multi_ver_kvdb_sync_interface.h"
24 #include "multi_ver_sync_task_context.h"
25 #include "sync_task_context.h"
26 
27 namespace DistributedDB {
28 class MultiVerRequestPacket {
29 public:
MultiVerRequestPacket()30     MultiVerRequestPacket() : errCode_(E_OK) {};
~MultiVerRequestPacket()31     ~MultiVerRequestPacket() {};
32 
33     uint32_t CalculateLen() const;
34 
35     void SetCommit(MultiVerCommitNode &commit);
36 
37     void GetCommit(MultiVerCommitNode &commit) const;
38 
39     void SetErrCode(int32_t errCode);
40 
41     int32_t GetErrCode() const;
42 private:
43     MultiVerCommitNode commit_;
44     int32_t errCode_ = E_OK;
45 };
46 
// Ack packet used by the multi-version data sync: it holds a list of byte buffers
// (entries_) and an error code. Every member function declared here is defined
// outside this header.
class MultiVerAckPacket {
public:
    // errorCode_ gets 0 from its default member initializer, so no init list is needed.
    MultiVerAckPacket() = default;

    // No destructor is declared on purpose: entries_ releases its own storage, and leaving
    // the destructor implicit keeps the implicitly-declared move operations available, so
    // moving a packet transfers entries_ instead of deep-copying every buffer.

    // Returns a length for this packet.
    // NOTE(review): presumably the serialized size of the packet - confirm against the definition.
    uint32_t CalculateLen() const;

    // NOTE(review): presumably stores `data` into entries_; the parameter is a non-const
    // reference, so confirm whether the definition copies or moves from it.
    void SetData(std::vector<std::vector<uint8_t>> &data);

    // Output parameter style: the result is written into `data`.
    // NOTE(review): presumably a copy of entries_ - confirm against the definition.
    void GetData(std::vector<std::vector<uint8_t>> &data) const;

    // NOTE(review): presumably assigns errorCode_ - confirm against the definition.
    void SetErrorCode(int32_t errCode);

    // Output parameter style (unlike MultiVerRequestPacket::GetErrCode, which returns the value).
    // NOTE(review): presumably writes errorCode_ into `errCode` - confirm against the definition.
    void GetErrorCode(int32_t &errCode) const;
private:
    std::vector<std::vector<uint8_t>> entries_;
    int32_t errorCode_ = 0; // a freshly constructed packet carries error code 0
};
65 
66 class MultiVerDataSync {
67 public:
MultiVerDataSync()68     MultiVerDataSync() : storagePtr_(nullptr), communicateHandle_(nullptr) {};
69     ~MultiVerDataSync();
70     DISABLE_COPY_ASSIGN_MOVE(MultiVerDataSync);
71 
72     static int RegisterTransformFunc();
73 
74     int Initialize(MultiVerKvDBSyncInterface *storagePtr, ICommunicator *communicateHandle);
75 
76     static int Serialization(uint8_t *buffer, uint32_t length, const Message *inMsg);
77 
78     static int DeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg);
79 
80     static uint32_t CalculateLen(const Message *inMsg);
81 
82     void TimeOutCallback(MultiVerSyncTaskContext *context, const Message *message) const;
83 
84     int SyncStart(MultiVerSyncTaskContext *context);
85 
86     int RequestRecvCallback(const MultiVerSyncTaskContext *context, const Message *message);
87 
88     int AckRecvCallback(MultiVerSyncTaskContext *context, const Message *message);
89 
90     int PutCommitData(const MultiVerCommitNode &commit, const std::vector<MultiVerKvEntry *> &entries,
91         const std::string &deviceName);
92 
93     int MergeSyncCommit(const MultiVerCommitNode &commit, const std::vector<MultiVerCommitNode> &commits);
94 
95     void ReleaseKvEntry(const MultiVerKvEntry *entry);
96 
97     void SendFinishedRequest(const MultiVerSyncTaskContext *context);
98 
99 private:
100     static int RequestPacketCalculateLen(const Message *inMsg, uint32_t &len);
101 
102     static int RequestPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg);
103 
104     static int RequestPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg);
105 
106     static int AckPacketCalculateLen(const Message *inMsg, uint32_t &len);
107 
108     static int AckPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg);
109 
110     static int AckPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg);
111 
112     static bool IsPacketValid(const Message *inMsg, uint16_t messageType);
113 
114     int GetValidCommit(MultiVerSyncTaskContext *context, MultiVerCommitNode &commit);
115 
116     bool IsCommitExisted(const MultiVerCommitNode &);
117 
118     int Send(const DeviceID &deviceId, const Message *inMsg);
119 
120     int SendRequestPacket(const MultiVerSyncTaskContext *context, MultiVerCommitNode &commit);
121 
122     int SendAckPacket(const MultiVerSyncTaskContext *context, const std::vector<MultiVerKvEntry *> &dataItems,
123         int retCode, const Message *message);
124 
125     int GetCommitData(const MultiVerCommitNode &commit, std::vector<MultiVerKvEntry *> &entries);
126 
127     MultiVerKvEntry *CreateKvEntry(const std::vector<uint8_t> &entry);
128 
129     MultiVerKvDBSyncInterface *storagePtr_;
130     ICommunicator *communicateHandle_;
131 };
132 }
133 
134 #endif
135 #endif