1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef STORAGE_PROXY_H
17 #define STORAGE_PROXY_H
18 
19 #include <atomic>
20 #include <shared_mutex>
21 
22 #include "cloud/cloud_store_types.h"
23 #include "cloud/cloud_meta_data.h"
24 #include "cloud/cloud_db_constant.h"
25 #include "cloud/iAssetLoader.h"
26 #include "cloud/schema_mgr.h"
27 #include "data_transformer.h"
28 #include "icloud_sync_storage_interface.h"
29 #include "query_sync_object.h"
30 
31 
32 namespace DistributedDB {
33 class StorageProxy {
34 public:
35     StorageProxy(ICloudSyncStorageInterface *iCloud);
~StorageProxy()36     virtual ~StorageProxy() {};
37 
38     static std::shared_ptr<StorageProxy> GetCloudDb(ICloudSyncStorageInterface *iCloud);
39 
40     int Close();
41 
42     int GetLocalWaterMark(const std::string &tableName, Timestamp &localMark);
43 
44     int GetLocalWaterMarkByMode(const std::string &tableName, CloudWaterType mode, Timestamp &localMark);
45 
46     int PutLocalWaterMark(const std::string &tableName, Timestamp &localMark);
47 
48     int PutWaterMarkByMode(const std::string &tableName, CloudWaterType mode, Timestamp &localMark);
49 
50     int GetCloudWaterMark(const std::string &tableName, std::string &cloudMark);
51 
52     int SetCloudWaterMark(const std::string &tableName, std::string &cloudMark);
53 
54     int StartTransaction(TransactType type = TransactType::DEFERRED);
55 
56     int Commit();
57 
58     int Rollback();
59 
60     int GetUploadCount(const std::string &tableName, const Timestamp &timestamp, const bool isCloudForcePush,
61         int64_t &count);
62 
63     int GetUploadCount(const QuerySyncObject &query, const bool isCloudForcePush, bool isCompensatedTask,
64         bool isUseWaterMark, int64_t &count);
65 
66     int GetUploadCount(const QuerySyncObject &query, const Timestamp &localMark, bool isCloudForcePush,
67         bool isCompensatedTask, int64_t &count);
68 
69     int GetCloudData(const std::string &tableName, const Timestamp &timeRange,
70         ContinueToken &continueStmtToken, CloudSyncData &cloudDataResult);
71 
72     int GetCloudData(const QuerySyncObject &querySyncObject, const Timestamp &timeRange,
73         ContinueToken &continueStmtToken, CloudSyncData &cloudDataResult);
74 
75     int GetCloudDataNext(ContinueToken &continueStmtToken, CloudSyncData &cloudDataResult) const;
76 
77     int GetCloudGid(const QuerySyncObject &querySyncObject, bool isCloudForcePush,
78         bool isCompensatedTask, std::vector<std::string> &cloudGid);
79 
80     int GetInfoByPrimaryKeyOrGid(const std::string &tableName, const VBucket &vBucket,
81         DataInfoWithLog &dataInfoWithLog, VBucket &assetInfo);
82 
83     int PutCloudSyncData(const std::string &tableName, DownloadData &downloadData);
84 
85     int CleanCloudData(ClearMode mode, const std::vector<std::string> &tableNameList,
86         const RelationalSchemaObject &localSchema, std::vector<Asset> &assets);
87 
88     int CheckSchema(const TableName &tableName) const;
89 
90     int CheckSchema(std::vector<std::string> &tables);
91 
92     int GetPrimaryColNamesWithAssetsFields(const TableName &tableName, std::vector<std::string> &colNames,
93         std::vector<Field> &assetFields);
94 
95     int NotifyChangedData(const std::string &deviceName, ChangedData &&changedData);
96 
97     int ReleaseContinueToken(ContinueToken &continueStmtToken);
98 
99     int FillCloudAssetForDownload(const std::string &tableName, VBucket &asset, bool isDownloadSuccess);
100 
101     int SetLogTriggerStatus(bool status);
102     int SetCursorIncFlag(bool flag);
103 
104     int FillCloudLogAndAsset(OpType opType, const CloudSyncData &data);
105 
106     std::string GetIdentify() const;
107 
108     int CleanWaterMark(const TableName &tableName);
109 
110     int CleanWaterMarkInMemory(const TableName &tableName);
111 
112     int CreateTempSyncTrigger(const std::string &tableName);
113 
114     int ClearAllTempSyncTrigger();
115 
116     int IsSharedTable(const std::string &tableName, bool &isSharedTable);
117 
118     void FillCloudGidIfSuccess(const OpType opType, const CloudSyncData &data);
119 
120     void SetCloudTaskConfig(const CloudTaskConfig &config);
121 
122     std::pair<int, uint32_t> GetAssetsByGidOrHashKey(const std::string &tableName, const std::string &gid,
123         const Bytes &hashKey, VBucket &assets);
124 
125     int SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader);
126 
127     int UpdateRecordFlag(const std::string &tableName, bool recordConflict, const LogInfo &logInfo);
128 
129     int GetCompensatedSyncQuery(std::vector<QuerySyncObject> &syncQuery, std::vector<std::string> &users);
130 
131     int ClearUnLockingNoNeedCompensated();
132 
133     int MarkFlagAsConsistent(const std::string &tableName, const DownloadData &downloadData,
134         const std::set<std::string> &gidFilters);
135 
136     void SetUser(const std::string &user);
137 
138     void OnSyncFinish();
139 
140     void OnUploadStart();
141 
142     void CleanAllWaterMark();
143 
144     std::string AppendWithUserIfNeed(const std::string &source) const;
145 
146     int GetCloudDbSchema(std::shared_ptr<DataBaseSchema> &cloudSchema);
147 
148     std::pair<int, CloudSyncData> GetLocalCloudVersion();
149 
150     CloudSyncConfig GetCloudSyncConfig() const;
151 
152     bool IsTableExistReference(const std::string &table);
153 
154     bool IsTableExistReferenceOrReferenceBy(const std::string &table);
155 
156     void ReleaseUploadRecord(const std::string &table, const CloudWaterType &type, Timestamp localWaterMark);
157 
158     bool IsTagCloudUpdateLocal(const LogInfo &localInfo, const LogInfo &cloudInfo,
159         SingleVerConflictResolvePolicy policy);
160 protected:
161     void Init();
162 
163     static Timestamp EraseNanoTime(Timestamp localTime);
164 private:
165     ICloudSyncStorageInterface *store_;
166     mutable std::shared_mutex storeMutex_;
167     mutable std::shared_mutex cloudDbMutex_;
168     std::atomic<bool> transactionExeFlag_;
169     std::shared_ptr<CloudMetaData> cloudMetaData_;
170     std::atomic<bool> isWrite_;
171     std::string user_;
172 };
173 }
174 
175 #endif // STORAGE_PROXY_H
176