1 /* 2 * Copyright (c) 2023 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef CLOUD_DB_PROXY_H 17 #define CLOUD_DB_PROXY_H 18 #include <atomic> 19 #include <condition_variable> 20 #include <mutex> 21 #include <shared_mutex> 22 #include "cloud/cloud_db_types.h" 23 #include "cloud/cloud_db_types.h" 24 #include "cloud/icloud_db.h" 25 #include "cloud/iAssetLoader.h" 26 27 namespace DistributedDB { 28 class CloudDBProxy { 29 public: 30 CloudDBProxy(); 31 ~CloudDBProxy() = default; 32 33 void SetCloudDB(const std::shared_ptr<ICloudDb> &cloudDB); 34 35 int SetCloudDB(const std::map<std::string, std::shared_ptr<ICloudDb>> &cloudDBs); 36 37 const std::map<std::string, std::shared_ptr<ICloudDb>> GetCloudDB() const; 38 39 void SwitchCloudDB(const std::string &user); 40 41 void SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader); 42 43 int BatchInsert(const std::string &tableName, std::vector<VBucket> &record, 44 std::vector<VBucket> &extend, Info &uploadInfo); 45 46 int BatchUpdate(const std::string &tableName, std::vector<VBucket> &record, std::vector<VBucket> &extend, 47 Info &uploadInfo); 48 49 int BatchDelete(const std::string &tableName, std::vector<VBucket> &record, std::vector<VBucket> &extend, 50 Info &uploadInfo); 51 52 int Query(const std::string &tableName, VBucket &extend, std::vector<VBucket> &data); 53 54 std::pair<int, std::string> GetEmptyCursor(const std::string &tableName); 55 56 std::pair<int, uint64_t> Lock(); 57 58 int UnLock(); 59 60 int Close(); 61 62 int HeartBeat(); 63 64 bool IsNotExistCloudDB() const; 65 66 int Download(const std::string &tableName, const std::string &gid, const Type &prefix, 67 std::map<std::string, Assets> &assets); 68 69 int RemoveLocalAssets(const std::vector<Asset> &assets); 70 71 int RemoveLocalAssets(const std::string &tableName, const std::string &gid, const Type &prefix, 72 std::map<std::string, Assets> &assets); 73 74 void SetGenCloudVersionCallback(const GenerateCloudVersionCallback &callback); 75 76 bool IsExistCloudVersionCallback() const; 77 78 std::pair<int, std::string> GetCloudVersion(const std::string &originVersion) const; 79 80 void SetPrepareTraceId(const std::string &traceId); 81 protected: 82 class CloudActionContext { 83 public: 84 CloudActionContext(); 85 ~CloudActionContext() = default; 86 87 void MoveInRecordAndExtend(std::vector<VBucket> &record, std::vector<VBucket> &extend); 88 89 void MoveInExtend(std::vector<VBucket> &extend); 90 91 void MoveOutRecordAndExtend(std::vector<VBucket> &record, std::vector<VBucket> &extend); 92 93 void MoveInQueryExtendAndData(VBucket &extend, std::vector<VBucket> &data); 94 95 void MoveOutQueryExtendAndData(VBucket &extend, std::vector<VBucket> &data); 96 97 void MoveInLockStatus(std::pair<int, uint64_t> &lockStatus); 98 99 void MoveOutLockStatus(std::pair<int, uint64_t> &lockStatus); 100 101 void MoveInCursorStatus(std::pair<int, std::string> &cursorStatus); 102 103 void MoveOutCursorStatus(std::pair<int, std::string> &cursorStatus); 104 105 void SetActionRes(int res); 106 107 int GetActionRes(); 108 109 void FinishAndNotify(); 110 111 Info GetInfo(); 112 113 void SetInfo(const CloudWaterType &type, DBStatus status); 114 115 void SetTableName(const std::string &tableName); 116 117 std::string GetTableName(); 118 private: 119 static bool IsEmptyAssetId(const Assets &assets); 120 121 static bool IsRecordActionFail(const VBucket &extend, bool isInsert, DBStatus status); 122 123 std::mutex actionMutex_; 124 std::condition_variable actionCv_; 125 bool actionFinished_; 126 int actionRes_; 127 uint32_t totalCount_; 128 uint32_t successCount_; 129 uint32_t failedCount_; 130 131 std::string tableName_; 132 std::vector<VBucket> record_; 133 std::vector<VBucket> extend_; 134 VBucket queryExtend_; 135 std::vector<VBucket> data_; 136 std::pair<int, uint64_t> lockStatus_; 137 std::pair<int, std::string> cursorStatus_; 138 }; 139 enum InnerActionCode : uint8_t { 140 INSERT = 0, 141 UPDATE, 142 DELETE, 143 QUERY, 144 GET_EMPTY_CURSOR, 145 LOCK, 146 UNLOCK, 147 HEARTBEAT, 148 // add action code before INVALID_ACTION 149 INVALID_ACTION 150 }; 151 static int InnerAction(const std::shared_ptr<CloudActionContext> &context, 152 const std::shared_ptr<ICloudDb> &cloudDb, InnerActionCode action); 153 154 static DBStatus DMLActionTask(const std::shared_ptr<CloudActionContext> &context, 155 const std::shared_ptr<ICloudDb> &cloudDb, InnerActionCode action); 156 157 static void InnerActionTask(const std::shared_ptr<CloudActionContext> &context, 158 const std::shared_ptr<ICloudDb> &cloudDb, InnerActionCode action); 159 160 static DBStatus InnerActionLock(const std::shared_ptr<CloudActionContext> &context, 161 const std::shared_ptr<ICloudDb> &cloudDb); 162 163 static DBStatus InnerActionGetEmptyCursor(const std::shared_ptr<CloudActionContext> &context, 164 const std::shared_ptr<ICloudDb> &cloudDb); 165 166 static int GetInnerErrorCode(DBStatus status); 167 168 static DBStatus QueryAction(const std::shared_ptr<CloudActionContext> &context, 169 const std::shared_ptr<ICloudDb> &cloudDb); 170 171 mutable std::shared_mutex cloudMutex_; 172 mutable std::shared_mutex assetLoaderMutex_; 173 std::shared_ptr<ICloudDb> iCloudDb_; 174 std::map<std::string, std::shared_ptr<ICloudDb>> cloudDbs_; 175 std::shared_ptr<IAssetLoader> iAssetLoader_; 176 std::atomic<int64_t> timeout_; 177 178 mutable std::mutex genVersionMutex_; 179 GenerateCloudVersionCallback genVersionCallback_; 180 }; 181 } 182 #endif // CLOUD_DB_PROXY_H 183