1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef VIRTUAL_CLOUD_DB_H
17 #define VIRTUAL_CLOUD_DB_H
18 #include <atomic>
19 #include <mutex>
20 #include "icloud_db.h"
21 
22 namespace DistributedDB {
23 class VirtualCloudDb : public ICloudDb {
24 public:
25     struct CloudData {
26         VBucket record;
27         VBucket extend;
28     };
29     VirtualCloudDb() = default;
30     ~VirtualCloudDb() override = default;
31     DBStatus BatchInsert(const std::string &tableName, std::vector<VBucket> &&record,
32         std::vector<VBucket> &extend) override;
33 
34     DBStatus BatchInsertWithGid(const std::string &tableName, std::vector<VBucket> &&record,
35         std::vector<VBucket> &extend);
36 
37     DBStatus BatchUpdate(const std::string &tableName, std::vector<VBucket> &&record,
38         std::vector<VBucket> &extend) override;
39 
40     DBStatus BatchDelete(const std::string &tableName, std::vector<VBucket> &extend) override;
41 
42     DBStatus Query(const std::string &tableName, VBucket &extend, std::vector<VBucket> &data) override;
43 
44     DBStatus DeleteByGid(const std::string &tableName, VBucket &extend);
45 
46     std::pair<DBStatus, std::string> GetEmptyCursor(const std::string &tableName) override;
47 
48     std::pair<DBStatus, uint32_t> Lock() override;
49 
50     DBStatus UnLock() override;
51 
52     DBStatus HeartBeat() override;
53 
54     DBStatus Close() override;
55 
56     void SetCloudError(bool cloudError);
57 
58     void SetBlockTime(int32_t blockTime);
59 
60     void ClearHeartbeatCount();
61 
62     int32_t GetHeartbeatCount() const;
63 
64     bool GetLockStatus() const;
65 
66     void SetHeartbeatError(bool heartbeatError);
67 
68     void SetIncrementData(const std::string &tableName, const VBucket &record, const VBucket &extend);
69 
70     uint32_t GetQueryTimes(const std::string &tableName);
71 
72     void SetActionStatus(DBStatus status);
73 
74     DBStatus GetDataStatus(const std::string &gid, bool &deleteStatus);
75 
76     void ClearAllData();
77 
78     void ForkQuery(const std::function<void(const std::string &, VBucket &)> &forkFunc);
79 
80     void ForkUpload(const std::function<void(const std::string &, VBucket &)> &forkUploadFunc);
81 
82     void ForkBeforeBatchUpdate(const std::function<void(const std::string &, std::vector<VBucket> &,
83         std::vector<VBucket> &, bool isDelete)> &forkBeforeBatchUpdateFunc);
84 
85     void ForkInsertConflict(const std::function<DBStatus(const std::string &, VBucket &, VBucket &,
86         std::vector<CloudData> &)> &forkUploadFunc);
87 
88     int32_t GetLockCount() const;
89 
90     void Reset();
91 
92     void SetInsertFailed(int32_t count);
93 
94     void SetClearExtend(int32_t count);
95 
96     void SetCloudNetworkError(bool cloudNetworkError);
97 
98     void SetConflictInUpload(bool conflict);
99 
100     void SetHeartbeatBlockTime(int32_t blockTime);
101 
102     void SetInsertHook(const std::function<void(VBucket &)> &insertCheckFunc);
103 private:
104     DBStatus InnerBatchInsert(const std::string &tableName, std::vector<VBucket> &&record,
105         std::vector<VBucket> &extend);
106 
107     DBStatus InnerUpdate(const std::string &tableName, std::vector<VBucket> &&record,
108         std::vector<VBucket> &extend, bool isDelete);
109 
110     DBStatus InnerUpdateWithoutLock(const std::string &tableName, std::vector<VBucket> &&record,
111         std::vector<VBucket> &extend, bool isDelete);
112 
113     DBStatus UpdateCloudData(const std::string &tableName, CloudData &&cloudData);
114 
115     void GetCloudData(const std::string &cursor, bool isIncreCursor, std::vector<CloudData> allData,
116         std::vector<VBucket> &data, VBucket &extend);
117 
118     bool IsCloudGidMatching(const std::vector<QueryNode> &queryNodes, VBucket &extend);
119 
120     bool IsCloudGidMatchingInner(const QueryNode &queryNode, VBucket &extend);
121 
122     bool IsPrimaryKeyMatching(const std::vector<QueryNode> &queryNodes, VBucket &record);
123 
124     bool IsPrimaryKeyMatchingInner(const QueryNode &queryNode, VBucket &record);
125 
126     void AddAssetIdForExtend(VBucket &record, VBucket &extend);
127 
128     void AddAssetsIdInner(Assets &assets);
129 
130     std::atomic<bool> cloudError_ = false;
131     std::atomic<bool> cloudNetworkError_ = false;
132     std::atomic<bool> heartbeatError_ = false;
133     std::atomic<bool> lockStatus_ = false;
134     std::atomic<bool> conflictInUpload_ = false;
135     std::atomic<int32_t> blockTimeMs_ = 0;
136     std::atomic<int32_t> heartbeatBlockTimeMs_ = 0;
137     std::atomic<int64_t> currentGid_ = 0;
138     std::atomic<int64_t> currentCursor_ = 1;
139     std::atomic<int64_t> currentVersion_ = 0;
140     std::atomic<int32_t> queryLimit_ = 100;
141     std::atomic<int32_t> heartbeatCount_ = 0;
142     std::atomic<int32_t> lockCount_ = 0;
143     std::atomic<int32_t> insertFailedCount_ = 0;
144     std::atomic<int32_t> missingExtendCount_ = 0;
145     std::mutex cloudDataMutex_;
146     std::map<std::string, std::vector<CloudData>> cloudData_;
147     std::map<std::string, std::vector<CloudData>> incrementCloudData_;
148     bool isSetCrementCloudData_ = false;
149     std::string increPrefix_ = "increPrefix_";
150     std::map<std::string, uint32_t> queryTimes_;
151     DBStatus actionStatus_ = OK;
152     std::function<void(const std::string &, VBucket &)> forkFunc_;
153     std::function<void(const std::string &, VBucket &)> forkUploadFunc_;
154     std::function<void(const std::string &, std::vector<VBucket> &, std::vector<VBucket> &,
155         bool isDelete)> forkBeforeBatchUpdateFunc_;
156     std::function<DBStatus(const std::string &, VBucket &, VBucket &,
157         std::vector<CloudData> &)> forkUploadConflictFunc_;
158     std::function<void(VBucket &)> insertCheckFunc_;
159 };
160 }
161 #endif // VIRTUAL_CLOUD_DB_H
162