1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef CLOUD_DB_PROXY_H
17 #define CLOUD_DB_PROXY_H
18 #include <atomic>
19 #include <condition_variable>
20 #include <mutex>
21 #include <shared_mutex>
22 #include "cloud/cloud_db_types.h"
23 #include "cloud/cloud_db_types.h"
24 #include "cloud/icloud_db.h"
25 #include "cloud/iAssetLoader.h"
26 
27 namespace DistributedDB {
28 class CloudDBProxy {
29 public:
30     CloudDBProxy();
31     ~CloudDBProxy() = default;
32 
33     void SetCloudDB(const std::shared_ptr<ICloudDb> &cloudDB);
34 
35     int SetCloudDB(const std::map<std::string, std::shared_ptr<ICloudDb>> &cloudDBs);
36 
37     const std::map<std::string, std::shared_ptr<ICloudDb>> GetCloudDB() const;
38 
39     void SwitchCloudDB(const std::string &user);
40 
41     void SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader);
42 
43     int BatchInsert(const std::string &tableName, std::vector<VBucket> &record,
44         std::vector<VBucket> &extend, Info &uploadInfo);
45 
46     int BatchUpdate(const std::string &tableName, std::vector<VBucket> &record, std::vector<VBucket> &extend,
47         Info &uploadInfo);
48 
49     int BatchDelete(const std::string &tableName, std::vector<VBucket> &record, std::vector<VBucket> &extend,
50         Info &uploadInfo);
51 
52     int Query(const std::string &tableName, VBucket &extend, std::vector<VBucket> &data);
53 
54     std::pair<int, std::string> GetEmptyCursor(const std::string &tableName);
55 
56     std::pair<int, uint64_t> Lock();
57 
58     int UnLock();
59 
60     int Close();
61 
62     int HeartBeat();
63 
64     bool IsNotExistCloudDB() const;
65 
66     int Download(const std::string &tableName, const std::string &gid, const Type &prefix,
67         std::map<std::string, Assets> &assets);
68 
69     int RemoveLocalAssets(const std::vector<Asset> &assets);
70 
71     int RemoveLocalAssets(const std::string &tableName, const std::string &gid, const Type &prefix,
72         std::map<std::string, Assets> &assets);
73 
74     void SetGenCloudVersionCallback(const GenerateCloudVersionCallback &callback);
75 
76     bool IsExistCloudVersionCallback() const;
77 
78     std::pair<int, std::string> GetCloudVersion(const std::string &originVersion) const;
79 
80     void SetPrepareTraceId(const std::string &traceId);
81 protected:
82     class CloudActionContext {
83     public:
84         CloudActionContext();
85         ~CloudActionContext() = default;
86 
87         void MoveInRecordAndExtend(std::vector<VBucket> &record, std::vector<VBucket> &extend);
88 
89         void MoveInExtend(std::vector<VBucket> &extend);
90 
91         void MoveOutRecordAndExtend(std::vector<VBucket> &record, std::vector<VBucket> &extend);
92 
93         void MoveInQueryExtendAndData(VBucket &extend, std::vector<VBucket> &data);
94 
95         void MoveOutQueryExtendAndData(VBucket &extend, std::vector<VBucket> &data);
96 
97         void MoveInLockStatus(std::pair<int, uint64_t> &lockStatus);
98 
99         void MoveOutLockStatus(std::pair<int, uint64_t> &lockStatus);
100 
101         void MoveInCursorStatus(std::pair<int, std::string> &cursorStatus);
102 
103         void MoveOutCursorStatus(std::pair<int, std::string> &cursorStatus);
104 
105         void SetActionRes(int res);
106 
107         int GetActionRes();
108 
109         void FinishAndNotify();
110 
111         Info GetInfo();
112 
113         void SetInfo(const CloudWaterType &type, DBStatus status);
114 
115         void SetTableName(const std::string &tableName);
116 
117         std::string GetTableName();
118     private:
119         static bool IsEmptyAssetId(const Assets &assets);
120 
121         static bool IsRecordActionFail(const VBucket &extend, bool isInsert, DBStatus status);
122 
123         std::mutex actionMutex_;
124         std::condition_variable actionCv_;
125         bool actionFinished_;
126         int actionRes_;
127         uint32_t totalCount_;
128         uint32_t successCount_;
129         uint32_t failedCount_;
130 
131         std::string tableName_;
132         std::vector<VBucket> record_;
133         std::vector<VBucket> extend_;
134         VBucket queryExtend_;
135         std::vector<VBucket> data_;
136         std::pair<int, uint64_t> lockStatus_;
137         std::pair<int, std::string> cursorStatus_;
138     };
139     enum InnerActionCode : uint8_t {
140         INSERT = 0,
141         UPDATE,
142         DELETE,
143         QUERY,
144         GET_EMPTY_CURSOR,
145         LOCK,
146         UNLOCK,
147         HEARTBEAT,
148         // add action code before INVALID_ACTION
149         INVALID_ACTION
150     };
151     static int InnerAction(const std::shared_ptr<CloudActionContext> &context,
152         const std::shared_ptr<ICloudDb> &cloudDb, InnerActionCode action);
153 
154     static DBStatus DMLActionTask(const std::shared_ptr<CloudActionContext> &context,
155         const std::shared_ptr<ICloudDb> &cloudDb, InnerActionCode action);
156 
157     static void InnerActionTask(const std::shared_ptr<CloudActionContext> &context,
158         const std::shared_ptr<ICloudDb> &cloudDb, InnerActionCode action);
159 
160     static DBStatus InnerActionLock(const std::shared_ptr<CloudActionContext> &context,
161         const std::shared_ptr<ICloudDb> &cloudDb);
162 
163     static DBStatus InnerActionGetEmptyCursor(const std::shared_ptr<CloudActionContext> &context,
164         const std::shared_ptr<ICloudDb> &cloudDb);
165 
166     static int GetInnerErrorCode(DBStatus status);
167 
168     static DBStatus QueryAction(const std::shared_ptr<CloudActionContext> &context,
169         const std::shared_ptr<ICloudDb> &cloudDb);
170 
171     mutable std::shared_mutex cloudMutex_;
172     mutable std::shared_mutex assetLoaderMutex_;
173     std::shared_ptr<ICloudDb> iCloudDb_;
174     std::map<std::string, std::shared_ptr<ICloudDb>> cloudDbs_;
175     std::shared_ptr<IAssetLoader> iAssetLoader_;
176     std::atomic<int64_t> timeout_;
177 
178     mutable std::mutex genVersionMutex_;
179     GenerateCloudVersionCallback genVersionCallback_;
180 };
181 }
182 #endif // CLOUD_DB_PROXY_H
183