1 /* 2 * Copyright (c) 2023 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef VIRTUAL_CLOUD_DB_H 17 #define VIRTUAL_CLOUD_DB_H 18 #include <atomic> 19 #include <mutex> 20 #include "icloud_db.h" 21 22 namespace DistributedDB { 23 class VirtualCloudDb : public ICloudDb { 24 public: 25 struct CloudData { 26 VBucket record; 27 VBucket extend; 28 }; 29 VirtualCloudDb() = default; 30 ~VirtualCloudDb() override = default; 31 DBStatus BatchInsert(const std::string &tableName, std::vector<VBucket> &&record, 32 std::vector<VBucket> &extend) override; 33 34 DBStatus BatchInsertWithGid(const std::string &tableName, std::vector<VBucket> &&record, 35 std::vector<VBucket> &extend); 36 37 DBStatus BatchUpdate(const std::string &tableName, std::vector<VBucket> &&record, 38 std::vector<VBucket> &extend) override; 39 40 DBStatus BatchDelete(const std::string &tableName, std::vector<VBucket> &extend) override; 41 42 DBStatus Query(const std::string &tableName, VBucket &extend, std::vector<VBucket> &data) override; 43 44 DBStatus DeleteByGid(const std::string &tableName, VBucket &extend); 45 46 std::pair<DBStatus, std::string> GetEmptyCursor(const std::string &tableName) override; 47 48 std::pair<DBStatus, uint32_t> Lock() override; 49 50 DBStatus UnLock() override; 51 52 DBStatus HeartBeat() override; 53 54 DBStatus Close() override; 55 56 void SetCloudError(bool cloudError); 57 58 void SetBlockTime(int32_t blockTime); 59 60 void ClearHeartbeatCount(); 61 62 int32_t GetHeartbeatCount() const; 63 64 bool GetLockStatus() const; 65 66 void SetHeartbeatError(bool heartbeatError); 67 68 void SetIncrementData(const std::string &tableName, const VBucket &record, const VBucket &extend); 69 70 uint32_t GetQueryTimes(const std::string &tableName); 71 72 void SetActionStatus(DBStatus status); 73 74 DBStatus GetDataStatus(const std::string &gid, bool &deleteStatus); 75 76 void ClearAllData(); 77 78 void ForkQuery(const std::function<void(const std::string &, VBucket &)> &forkFunc); 79 80 void ForkUpload(const std::function<void(const std::string &, VBucket &)> &forkUploadFunc); 81 82 void ForkBeforeBatchUpdate(const std::function<void(const std::string &, std::vector<VBucket> &, 83 std::vector<VBucket> &, bool isDelete)> &forkBeforeBatchUpdateFunc); 84 85 void ForkInsertConflict(const std::function<DBStatus(const std::string &, VBucket &, VBucket &, 86 std::vector<CloudData> &)> &forkUploadFunc); 87 88 int32_t GetLockCount() const; 89 90 void Reset(); 91 92 void SetInsertFailed(int32_t count); 93 94 void SetClearExtend(int32_t count); 95 96 void SetCloudNetworkError(bool cloudNetworkError); 97 98 void SetConflictInUpload(bool conflict); 99 100 void SetHeartbeatBlockTime(int32_t blockTime); 101 102 void SetInsertHook(const std::function<void(VBucket &)> &insertCheckFunc); 103 private: 104 DBStatus InnerBatchInsert(const std::string &tableName, std::vector<VBucket> &&record, 105 std::vector<VBucket> &extend); 106 107 DBStatus InnerUpdate(const std::string &tableName, std::vector<VBucket> &&record, 108 std::vector<VBucket> &extend, bool isDelete); 109 110 DBStatus InnerUpdateWithoutLock(const std::string &tableName, std::vector<VBucket> &&record, 111 std::vector<VBucket> &extend, bool isDelete); 112 113 DBStatus UpdateCloudData(const std::string &tableName, CloudData &&cloudData); 114 115 void GetCloudData(const std::string &cursor, bool isIncreCursor, std::vector<CloudData> allData, 116 std::vector<VBucket> &data, VBucket &extend); 117 118 bool IsCloudGidMatching(const std::vector<QueryNode> &queryNodes, VBucket &extend); 119 120 bool IsCloudGidMatchingInner(const QueryNode &queryNode, VBucket &extend); 121 122 bool IsPrimaryKeyMatching(const std::vector<QueryNode> &queryNodes, VBucket &record); 123 124 bool IsPrimaryKeyMatchingInner(const QueryNode &queryNode, VBucket &record); 125 126 void AddAssetIdForExtend(VBucket &record, VBucket &extend); 127 128 void AddAssetsIdInner(Assets &assets); 129 130 std::atomic<bool> cloudError_ = false; 131 std::atomic<bool> cloudNetworkError_ = false; 132 std::atomic<bool> heartbeatError_ = false; 133 std::atomic<bool> lockStatus_ = false; 134 std::atomic<bool> conflictInUpload_ = false; 135 std::atomic<int32_t> blockTimeMs_ = 0; 136 std::atomic<int32_t> heartbeatBlockTimeMs_ = 0; 137 std::atomic<int64_t> currentGid_ = 0; 138 std::atomic<int64_t> currentCursor_ = 1; 139 std::atomic<int64_t> currentVersion_ = 0; 140 std::atomic<int32_t> queryLimit_ = 100; 141 std::atomic<int32_t> heartbeatCount_ = 0; 142 std::atomic<int32_t> lockCount_ = 0; 143 std::atomic<int32_t> insertFailedCount_ = 0; 144 std::atomic<int32_t> missingExtendCount_ = 0; 145 std::mutex cloudDataMutex_; 146 std::map<std::string, std::vector<CloudData>> cloudData_; 147 std::map<std::string, std::vector<CloudData>> incrementCloudData_; 148 bool isSetCrementCloudData_ = false; 149 std::string increPrefix_ = "increPrefix_"; 150 std::map<std::string, uint32_t> queryTimes_; 151 DBStatus actionStatus_ = OK; 152 std::function<void(const std::string &, VBucket &)> forkFunc_; 153 std::function<void(const std::string &, VBucket &)> forkUploadFunc_; 154 std::function<void(const std::string &, std::vector<VBucket> &, std::vector<VBucket> &, 155 bool isDelete)> forkBeforeBatchUpdateFunc_; 156 std::function<DBStatus(const std::string &, VBucket &, VBucket &, 157 std::vector<CloudData> &)> forkUploadConflictFunc_; 158 std::function<void(VBucket &)> insertCheckFunc_; 159 }; 160 } 161 #endif // VIRTUAL_CLOUD_DB_H 162