1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "storage_proxy.h"
17 
18 #include "cloud/cloud_storage_utils.h"
19 #include "cloud/schema_mgr.h"
20 #include "db_common.h"
21 #include "store_types.h"
22 
23 namespace DistributedDB {
StorageProxy(ICloudSyncStorageInterface * iCloud)24 StorageProxy::StorageProxy(ICloudSyncStorageInterface *iCloud)
25     :store_(iCloud),
26     transactionExeFlag_(false),
27     isWrite_(false)
28 {
29 }
30 
GetCloudDb(ICloudSyncStorageInterface * iCloud)31 std::shared_ptr<StorageProxy> StorageProxy::GetCloudDb(ICloudSyncStorageInterface *iCloud)
32 {
33     std::shared_ptr<StorageProxy> proxy = std::make_shared<StorageProxy>(iCloud);
34     proxy->Init();
35     return proxy;
36 }
37 
Init()38 void StorageProxy::Init()
39 {
40     cloudMetaData_ = std::make_shared<CloudMetaData>(store_);
41 }
42 
Close()43 int StorageProxy::Close()
44 {
45     std::unique_lock<std::shared_mutex> writeLock(storeMutex_);
46     if (transactionExeFlag_.load()) {
47         LOGE("the transaction has been started, storage proxy can not closed");
48         return -E_BUSY;
49     }
50     store_ = nullptr;
51     cloudMetaData_ = nullptr;
52     return E_OK;
53 }
54 
GetLocalWaterMark(const std::string & tableName,Timestamp & localMark)55 int StorageProxy::GetLocalWaterMark(const std::string &tableName, Timestamp &localMark)
56 {
57     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
58     if (cloudMetaData_ == nullptr) {
59         return -E_INVALID_DB;
60     }
61     if (transactionExeFlag_.load() && isWrite_.load()) {
62         LOGE("the write transaction has been started, can not get meta");
63         return -E_BUSY;
64     }
65     return cloudMetaData_->GetLocalWaterMark(AppendWithUserIfNeed(tableName), localMark);
66 }
67 
GetLocalWaterMarkByMode(const std::string & tableName,CloudWaterType mode,Timestamp & localMark)68 int StorageProxy::GetLocalWaterMarkByMode(const std::string &tableName, CloudWaterType mode, Timestamp &localMark)
69 {
70     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
71     if (cloudMetaData_ == nullptr) {
72         return -E_INVALID_DB;
73     }
74     if (transactionExeFlag_.load() && isWrite_.load()) {
75         LOGE("the write transaction has been started, can not get meta");
76         return -E_BUSY;
77     }
78     return cloudMetaData_->GetLocalWaterMarkByType(AppendWithUserIfNeed(tableName), mode, localMark);
79 }
80 
PutLocalWaterMark(const std::string & tableName,Timestamp & localMark)81 int StorageProxy::PutLocalWaterMark(const std::string &tableName, Timestamp &localMark)
82 {
83     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
84     if (cloudMetaData_ == nullptr) {
85         return -E_INVALID_DB;
86     }
87     if (transactionExeFlag_.load() && isWrite_.load()) {
88         LOGE("the write transaction has been started, can not put meta");
89         return -E_BUSY;
90     }
91     return cloudMetaData_->SetLocalWaterMark(AppendWithUserIfNeed(tableName), localMark);
92 }
93 
PutWaterMarkByMode(const std::string & tableName,CloudWaterType mode,Timestamp & localMark)94 int StorageProxy::PutWaterMarkByMode(const std::string &tableName, CloudWaterType mode, Timestamp &localMark)
95 {
96     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
97     if (cloudMetaData_ == nullptr) {
98         return -E_INVALID_DB;
99     }
100     if (transactionExeFlag_.load() && isWrite_.load()) {
101         LOGE("the write transaction has been started, can not put meta");
102         return -E_BUSY;
103     }
104     return cloudMetaData_->SetLocalWaterMarkByType(AppendWithUserIfNeed(tableName), mode, localMark);
105 }
106 
GetCloudWaterMark(const std::string & tableName,std::string & cloudMark)107 int StorageProxy::GetCloudWaterMark(const std::string &tableName, std::string &cloudMark)
108 {
109     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
110     if (cloudMetaData_ == nullptr) {
111         return -E_INVALID_DB;
112     }
113     return cloudMetaData_->GetCloudWaterMark(AppendWithUserIfNeed(tableName), cloudMark);
114 }
115 
SetCloudWaterMark(const std::string & tableName,std::string & cloudMark)116 int StorageProxy::SetCloudWaterMark(const std::string &tableName, std::string &cloudMark)
117 {
118     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
119     if (cloudMetaData_ == nullptr) {
120         return -E_INVALID_DB;
121     }
122     return cloudMetaData_->SetCloudWaterMark(AppendWithUserIfNeed(tableName), cloudMark);
123 }
124 
StartTransaction(TransactType type)125 int StorageProxy::StartTransaction(TransactType type)
126 {
127     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
128     if (store_ == nullptr) {
129         return -E_INVALID_DB;
130     }
131     int errCode = store_->StartTransaction(type);
132     if (errCode == E_OK) {
133         transactionExeFlag_.store(true);
134         isWrite_.store(type == TransactType::IMMEDIATE);
135     }
136     return errCode;
137 }
138 
Commit()139 int StorageProxy::Commit()
140 {
141     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
142     if (store_ == nullptr) {
143         return -E_INVALID_DB;
144     }
145     int errCode = store_->Commit();
146     if (errCode == E_OK) {
147         transactionExeFlag_.store(false);
148     }
149     return errCode;
150 }
151 
Rollback()152 int StorageProxy::Rollback()
153 {
154     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
155     if (store_ == nullptr) {
156         return -E_INVALID_DB;
157     }
158     int errCode = store_->Rollback();
159     if (errCode == E_OK) {
160         transactionExeFlag_.store(false);
161     }
162     return errCode;
163 }
164 
GetUploadCount(const QuerySyncObject & query,const bool isCloudForcePush,bool isCompensatedTask,bool isUseWaterMark,int64_t & count)165 int StorageProxy::GetUploadCount(const QuerySyncObject &query, const bool isCloudForcePush,
166     bool isCompensatedTask, bool isUseWaterMark, int64_t &count)
167 {
168     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
169     if (store_ == nullptr) {
170         return -E_INVALID_DB;
171     }
172     if (!transactionExeFlag_.load()) {
173         LOGE("the transaction has not been started");
174         return -E_TRANSACT_STATE;
175     }
176     std::vector<Timestamp> timeStampVec;
177     std::vector<CloudWaterType> waterTypeVec = DBCommon::GetWaterTypeVec();
178     for (size_t i = 0; i < waterTypeVec.size(); i++) {
179         Timestamp tmpMark = 0u;
180         if (isUseWaterMark) {
181             int errCode = cloudMetaData_->GetLocalWaterMarkByType(AppendWithUserIfNeed(query.GetTableName()),
182                 waterTypeVec[i], tmpMark);
183             if (errCode != E_OK) {
184                 return errCode;
185             }
186         }
187         timeStampVec.push_back(tmpMark);
188     }
189     return store_->GetAllUploadCount(query, timeStampVec, isCloudForcePush, isCompensatedTask, count);
190 }
191 
GetUploadCount(const std::string & tableName,const Timestamp & localMark,const bool isCloudForcePush,int64_t & count)192 int StorageProxy::GetUploadCount(const std::string &tableName, const Timestamp &localMark,
193     const bool isCloudForcePush, int64_t &count)
194 {
195     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
196     if (store_ == nullptr) {
197         return -E_INVALID_DB;
198     }
199     if (!transactionExeFlag_.load()) {
200         LOGE("the transaction has not been started");
201         return -E_TRANSACT_STATE;
202     }
203     QuerySyncObject query;
204     query.SetTableName(tableName);
205     return store_->GetUploadCount(query, localMark, isCloudForcePush, false, count);
206 }
207 
GetUploadCount(const QuerySyncObject & query,const Timestamp & localMark,bool isCloudForcePush,bool isCompensatedTask,int64_t & count)208 int StorageProxy::GetUploadCount(const QuerySyncObject &query, const Timestamp &localMark,
209     bool isCloudForcePush, bool isCompensatedTask, int64_t &count)
210 {
211     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
212     if (store_ == nullptr) {
213         return -E_INVALID_DB;
214     }
215     if (!transactionExeFlag_.load()) {
216         LOGE("the transaction has not been started");
217         return -E_TRANSACT_STATE;
218     }
219     return store_->GetUploadCount(query, localMark, isCloudForcePush, isCompensatedTask, count);
220 }
221 
GetCloudData(const std::string & tableName,const Timestamp & timeRange,ContinueToken & continueStmtToken,CloudSyncData & cloudDataResult)222 int StorageProxy::GetCloudData(const std::string &tableName, const Timestamp &timeRange,
223     ContinueToken &continueStmtToken, CloudSyncData &cloudDataResult)
224 {
225     QuerySyncObject querySyncObject;
226     querySyncObject.SetTableName(tableName);
227     return GetCloudData(querySyncObject, timeRange, continueStmtToken, cloudDataResult);
228 }
229 
GetCloudData(const QuerySyncObject & querySyncObject,const Timestamp & timeRange,ContinueToken & continueStmtToken,CloudSyncData & cloudDataResult)230 int StorageProxy::GetCloudData(const QuerySyncObject &querySyncObject, const Timestamp &timeRange,
231     ContinueToken &continueStmtToken, CloudSyncData &cloudDataResult)
232 {
233     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
234     if (store_ == nullptr) {
235         return -E_INVALID_DB;
236     }
237     if (!transactionExeFlag_.load()) {
238         LOGE("the transaction has not been started");
239         return -E_TRANSACT_STATE;
240     }
241     TableSchema tableSchema;
242     int errCode = store_->GetCloudTableSchema(querySyncObject.GetRelationTableName(), tableSchema);
243     if (errCode != E_OK) {
244         return errCode;
245     }
246     return store_->GetCloudData(tableSchema, querySyncObject, timeRange, continueStmtToken, cloudDataResult);
247 }
248 
GetCloudDataNext(ContinueToken & continueStmtToken,CloudSyncData & cloudDataResult) const249 int StorageProxy::GetCloudDataNext(ContinueToken &continueStmtToken, CloudSyncData &cloudDataResult) const
250 {
251     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
252     if (store_ == nullptr) {
253         return -E_INVALID_DB;
254     }
255     if (!transactionExeFlag_.load()) {
256         LOGE("the transaction has not been started");
257         return -E_TRANSACT_STATE;
258     }
259     return store_->GetCloudDataNext(continueStmtToken, cloudDataResult);
260 }
261 
GetCloudGid(const QuerySyncObject & querySyncObject,bool isCloudForcePush,bool isCompensatedTask,std::vector<std::string> & cloudGid)262 int StorageProxy::GetCloudGid(const QuerySyncObject &querySyncObject, bool isCloudForcePush,
263     bool isCompensatedTask, std::vector<std::string> &cloudGid)
264 {
265     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
266     if (store_ == nullptr) {
267         return -E_INVALID_DB;
268     }
269     TableSchema tableSchema;
270     int errCode = store_->GetCloudTableSchema(querySyncObject.GetRelationTableName(), tableSchema);
271     if (errCode != E_OK) {
272         return errCode;
273     }
274     return store_->GetCloudGid(tableSchema, querySyncObject, isCloudForcePush, isCompensatedTask, cloudGid);
275 }
276 
GetInfoByPrimaryKeyOrGid(const std::string & tableName,const VBucket & vBucket,DataInfoWithLog & dataInfoWithLog,VBucket & assetInfo)277 int StorageProxy::GetInfoByPrimaryKeyOrGid(const std::string &tableName, const VBucket &vBucket,
278     DataInfoWithLog &dataInfoWithLog, VBucket &assetInfo)
279 {
280     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
281     if (store_ == nullptr) {
282         return -E_INVALID_DB;
283     }
284     if (!transactionExeFlag_.load()) {
285         LOGE("the transaction has not been started");
286         return -E_TRANSACT_STATE;
287     }
288 
289     int errCode = store_->GetInfoByPrimaryKeyOrGid(tableName, vBucket, dataInfoWithLog, assetInfo);
290     if (errCode == E_OK) {
291         dataInfoWithLog.logInfo.timestamp = EraseNanoTime(dataInfoWithLog.logInfo.timestamp);
292         dataInfoWithLog.logInfo.wTimestamp = EraseNanoTime(dataInfoWithLog.logInfo.wTimestamp);
293     }
294     if ((dataInfoWithLog.logInfo.flag & static_cast<uint64_t>(LogInfoFlag::FLAG_LOGIC_DELETE)) != 0) {
295         assetInfo.clear();
296     }
297     return errCode;
298 }
299 
SetCursorIncFlag(bool flag)300 int StorageProxy::SetCursorIncFlag(bool flag)
301 {
302     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
303     if (store_ == nullptr) {
304         return -E_INVALID_DB;
305     }
306     return store_->SetCursorIncFlag(flag);
307 }
308 
PutCloudSyncData(const std::string & tableName,DownloadData & downloadData)309 int StorageProxy::PutCloudSyncData(const std::string &tableName, DownloadData &downloadData)
310 {
311     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
312     if (store_ == nullptr) {
313         return -E_INVALID_DB;
314     }
315     if (!transactionExeFlag_.load()) {
316         LOGE("the transaction has not been started");
317         return -E_TRANSACT_STATE;
318     }
319     downloadData.user = user_;
320     return store_->PutCloudSyncData(tableName, downloadData);
321 }
322 
CleanCloudData(ClearMode mode,const std::vector<std::string> & tableNameList,const RelationalSchemaObject & localSchema,std::vector<Asset> & assets)323 int StorageProxy::CleanCloudData(ClearMode mode, const std::vector<std::string> &tableNameList,
324     const RelationalSchemaObject &localSchema, std::vector<Asset> &assets)
325 {
326     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
327     if (store_ == nullptr) {
328         return -E_INVALID_DB;
329     }
330     if (!transactionExeFlag_.load()) {
331         LOGE("the transaction has not been started");
332         return -E_TRANSACT_STATE;
333     }
334     return store_->CleanCloudData(mode, tableNameList, localSchema, assets);
335 }
336 
ReleaseContinueToken(ContinueToken & continueStmtToken)337 int StorageProxy::ReleaseContinueToken(ContinueToken &continueStmtToken)
338 {
339     return store_->ReleaseCloudDataToken(continueStmtToken);
340 }
341 
CheckSchema(const TableName & tableName) const342 int StorageProxy::CheckSchema(const TableName &tableName) const
343 {
344     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
345     if (store_ == nullptr) {
346         return -E_INVALID_DB;
347     }
348     return store_->ChkSchema(tableName);
349 }
350 
CheckSchema(std::vector<std::string> & tables)351 int StorageProxy::CheckSchema(std::vector<std::string> &tables)
352 {
353     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
354     if (store_ == nullptr) {
355         return -E_INVALID_DB;
356     }
357     if (tables.empty()) {
358         return -E_INVALID_ARGS;
359     }
360     for (const auto &table : tables) {
361         int ret = store_->ChkSchema(table);
362         if (ret != E_OK) {
363             return ret;
364         }
365     }
366     return E_OK;
367 }
368 
GetPrimaryColNamesWithAssetsFields(const TableName & tableName,std::vector<std::string> & colNames,std::vector<Field> & assetFields)369 int StorageProxy::GetPrimaryColNamesWithAssetsFields(const TableName &tableName, std::vector<std::string> &colNames,
370     std::vector<Field> &assetFields)
371 {
372     if (!colNames.empty()) {
373         // output parameter should be empty
374         return -E_INVALID_ARGS;
375     }
376 
377     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
378     if (store_ == nullptr) {
379         return -E_INVALID_DB;
380     }
381     // GetTableInfo
382     TableSchema tableSchema;
383     int ret = store_->GetCloudTableSchema(tableName, tableSchema);
384     if (ret != E_OK) {
385         LOGE("Cannot get cloud table schema: %d", ret);
386         return ret;
387     }
388     for (const auto &field : tableSchema.fields) {
389         if (field.primary) {
390             colNames.push_back(field.colName);
391         }
392         if (field.type == TYPE_INDEX<Asset> || field.type == TYPE_INDEX<Assets>) {
393             assetFields.push_back(field);
394         }
395     }
396     if (colNames.empty() || colNames.size() > 1) {
397         (void)colNames.insert(colNames.begin(), CloudDbConstant::ROW_ID_FIELD_NAME);
398     }
399     return E_OK;
400 }
401 
NotifyChangedData(const std::string & deviceName,ChangedData && changedData)402 int StorageProxy::NotifyChangedData(const std::string &deviceName, ChangedData &&changedData)
403 {
404     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
405     if (store_ == nullptr) {
406         return -E_INVALID_DB;
407     }
408     ChangeProperties changeProperties;
409     store_->GetAndResetServerObserverData(changedData.tableName, changeProperties);
410     changedData.properties = changeProperties;
411     store_->TriggerObserverAction(deviceName, std::move(changedData), true);
412     return E_OK;
413 }
414 
FillCloudAssetForDownload(const std::string & tableName,VBucket & asset,bool isDownloadSuccess)415 int StorageProxy::FillCloudAssetForDownload(const std::string &tableName, VBucket &asset, bool isDownloadSuccess)
416 {
417     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
418     if (store_ == nullptr) {
419         return -E_INVALID_DB;
420     }
421     if (!transactionExeFlag_.load() || !isWrite_.load()) {
422         LOGE("the write transaction has not started before fill download assets");
423         return -E_TRANSACT_STATE;
424     }
425     return store_->FillCloudAssetForDownload(tableName, asset, isDownloadSuccess);
426 }
427 
SetLogTriggerStatus(bool status)428 int StorageProxy::SetLogTriggerStatus(bool status)
429 {
430     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
431     if (store_ == nullptr) {
432         return -E_INVALID_DB;
433     }
434     return store_->SetLogTriggerStatus(status);
435 }
436 
FillCloudLogAndAsset(OpType opType,const CloudSyncData & data)437 int StorageProxy::FillCloudLogAndAsset(OpType opType, const CloudSyncData &data)
438 {
439     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
440     if (store_ == nullptr) {
441         return -E_INVALID_DB;
442     }
443     if (!transactionExeFlag_.load()) {
444         LOGE("the transaction has not been started");
445         return -E_TRANSACT_STATE;
446     }
447     return store_->FillCloudLogAndAsset(opType, data, true, false);
448 }
449 
GetIdentify() const450 std::string StorageProxy::GetIdentify() const
451 {
452     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
453     if (store_ == nullptr) {
454         LOGW("[StorageProxy] store is nullptr return default");
455         return "";
456     }
457     return store_->GetIdentify();
458 }
459 
EraseNanoTime(DistributedDB::Timestamp localTime)460 Timestamp StorageProxy::EraseNanoTime(DistributedDB::Timestamp localTime)
461 {
462     return localTime / CloudDbConstant::TEN_THOUSAND * CloudDbConstant::TEN_THOUSAND;
463 }
464 
CleanWaterMark(const DistributedDB::TableName & tableName)465 int StorageProxy::CleanWaterMark(const DistributedDB::TableName &tableName)
466 {
467     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
468     if (cloudMetaData_ == nullptr) {
469         LOGW("[StorageProxy] meta is nullptr return default");
470         return -E_INVALID_DB;
471     }
472     return cloudMetaData_->CleanWaterMark(AppendWithUserIfNeed(tableName));
473 }
474 
CleanWaterMarkInMemory(const DistributedDB::TableName & tableName)475 int StorageProxy::CleanWaterMarkInMemory(const DistributedDB::TableName &tableName)
476 {
477     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
478     if (cloudMetaData_ == nullptr) {
479         LOGW("[StorageProxy] CleanWaterMarkInMemory is nullptr return default");
480         return -E_INVALID_DB;
481     }
482     cloudMetaData_->CleanWaterMarkInMemory(AppendWithUserIfNeed(tableName));
483     return E_OK;
484 }
485 
SetUser(const std::string & user)486 void StorageProxy::SetUser(const std::string &user)
487 {
488     user_ = user;
489     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
490     if (store_ != nullptr) {
491         store_->SetUser(user);
492     }
493 }
494 
CreateTempSyncTrigger(const std::string & tableName)495 int StorageProxy::CreateTempSyncTrigger(const std::string &tableName)
496 {
497     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
498     if (store_ == nullptr) {
499         return -E_INVALID_DB;
500     }
501     return store_->CreateTempSyncTrigger(tableName);
502 }
503 
ClearAllTempSyncTrigger()504 int StorageProxy::ClearAllTempSyncTrigger()
505 {
506     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
507     if (store_ == nullptr) {
508         return -E_INVALID_DB;
509     }
510     // Clean up all temporary triggers
511     return store_->ClearAllTempSyncTrigger();
512 }
513 
IsSharedTable(const std::string & tableName,bool & IsSharedTable)514 int StorageProxy::IsSharedTable(const std::string &tableName, bool &IsSharedTable)
515 {
516     std::unique_lock<std::shared_mutex> writeLock(storeMutex_);
517     if (store_ == nullptr) {
518         return -E_INVALID_DB;
519     }
520     IsSharedTable = store_->IsSharedTable(tableName);
521     return E_OK;
522 }
523 
FillCloudGidIfSuccess(const OpType opType,const CloudSyncData & data)524 void StorageProxy::FillCloudGidIfSuccess(const OpType opType, const CloudSyncData &data)
525 {
526     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
527     if (store_ == nullptr) {
528         LOGW("[StorageProxy] fill gid failed with store invalid");
529         return;
530     }
531     int errCode = store_->FillCloudLogAndAsset(opType, data, true, true);
532     if (errCode != E_OK) {
533         LOGW("[StorageProxy] fill gid failed %d", errCode);
534     }
535 }
536 
SetCloudTaskConfig(const CloudTaskConfig & config)537 void StorageProxy::SetCloudTaskConfig(const CloudTaskConfig &config)
538 {
539     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
540     if (store_ == nullptr) {
541         LOGW("[StorageProxy] fill gid failed with store invalid");
542         return;
543     }
544     store_->SetCloudTaskConfig(config);
545 }
546 
GetAssetsByGidOrHashKey(const std::string & tableName,const std::string & gid,const Bytes & hashKey,VBucket & assets)547 std::pair<int, uint32_t> StorageProxy::GetAssetsByGidOrHashKey(const std::string &tableName, const std::string &gid,
548     const Bytes &hashKey, VBucket &assets)
549 {
550     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
551     if (store_ == nullptr) {
552         return { -E_INVALID_DB, static_cast<uint32_t>(LockStatus::UNLOCK) };
553     }
554     TableSchema tableSchema;
555     int errCode = store_->GetCloudTableSchema(tableName, tableSchema);
556     if (errCode != E_OK) {
557         LOGE("get cloud table schema failed: %d", errCode);
558         return { errCode, static_cast<uint32_t>(LockStatus::UNLOCK) };
559     }
560     return store_->GetAssetsByGidOrHashKey(tableSchema, gid, hashKey, assets);
561 }
562 
SetIAssetLoader(const std::shared_ptr<IAssetLoader> & loader)563 int StorageProxy::SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader)
564 {
565     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
566     if (store_ == nullptr) {
567         return -E_INVALID_DB;
568     }
569     return store_->SetIAssetLoader(loader);
570 }
571 
UpdateRecordFlag(const std::string & tableName,bool recordConflict,const LogInfo & logInfo)572 int StorageProxy::UpdateRecordFlag(const std::string &tableName, bool recordConflict, const LogInfo &logInfo)
573 {
574     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
575     if (store_ == nullptr) {
576         return -E_INVALID_DB;
577     }
578     return store_->UpdateRecordFlag(tableName, recordConflict, logInfo);
579 }
580 
GetCompensatedSyncQuery(std::vector<QuerySyncObject> & syncQuery,std::vector<std::string> & users)581 int StorageProxy::GetCompensatedSyncQuery(std::vector<QuerySyncObject> &syncQuery, std::vector<std::string> &users)
582 {
583     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
584     if (store_ == nullptr) {
585         return -E_INVALID_DB;
586     }
587     return store_->GetCompensatedSyncQuery(syncQuery, users);
588 }
589 
ClearUnLockingNoNeedCompensated()590 int StorageProxy::ClearUnLockingNoNeedCompensated()
591 {
592     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
593     if (store_ == nullptr) {
594         return -E_INVALID_DB;
595     }
596     return store_->ClearUnLockingNoNeedCompensated();
597 }
598 
MarkFlagAsConsistent(const std::string & tableName,const DownloadData & downloadData,const std::set<std::string> & gidFilters)599 int StorageProxy::MarkFlagAsConsistent(const std::string &tableName, const DownloadData &downloadData,
600     const std::set<std::string> &gidFilters)
601 {
602     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
603     if (store_ == nullptr) {
604         return -E_INVALID_DB;
605     }
606     return store_->MarkFlagAsConsistent(tableName, downloadData, gidFilters);
607 }
608 
OnSyncFinish()609 void StorageProxy::OnSyncFinish()
610 {
611     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
612     if (store_ == nullptr) {
613         return;
614     }
615     store_->SyncFinishHook();
616 }
617 
OnUploadStart()618 void StorageProxy::OnUploadStart()
619 {
620     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
621     if (store_ == nullptr) {
622         return;
623     }
624     store_->DoUploadHook();
625 }
626 
CleanAllWaterMark()627 void StorageProxy::CleanAllWaterMark()
628 {
629     cloudMetaData_->CleanAllWaterMark();
630 }
631 
AppendWithUserIfNeed(const std::string & source) const632 std::string StorageProxy::AppendWithUserIfNeed(const std::string &source) const
633 {
634     if (user_.empty()) {
635         return source;
636     }
637     return source + "_" + user_;
638 }
639 
GetCloudDbSchema(std::shared_ptr<DataBaseSchema> & cloudSchema)640 int StorageProxy::GetCloudDbSchema(std::shared_ptr<DataBaseSchema> &cloudSchema)
641 {
642     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
643     if (store_ == nullptr) {
644         return -E_INVALID_DB;
645     }
646     return store_->GetCloudDbSchema(cloudSchema);
647 }
648 
GetLocalCloudVersion()649 std::pair<int, CloudSyncData> StorageProxy::GetLocalCloudVersion()
650 {
651     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
652     if (store_ == nullptr) {
653         return {-E_INTERNAL_ERROR, {}};
654     }
655     return store_->GetLocalCloudVersion();
656 }
657 
GetCloudSyncConfig() const658 CloudSyncConfig StorageProxy::GetCloudSyncConfig() const
659 {
660     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
661     if (store_ == nullptr) {
662         return {};
663     }
664     return store_->GetCloudSyncConfig();
665 }
666 
IsTableExistReference(const std::string & table)667 bool StorageProxy::IsTableExistReference(const std::string &table)
668 {
669     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
670     if (store_ == nullptr) {
671         return false;
672     }
673     return store_->IsTableExistReference(table);
674 }
675 
IsTableExistReferenceOrReferenceBy(const std::string & table)676 bool StorageProxy::IsTableExistReferenceOrReferenceBy(const std::string &table)
677 {
678     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
679     if (store_ == nullptr) {
680         return false;
681     }
682     return store_->IsTableExistReferenceOrReferenceBy(table);
683 }
684 
ReleaseUploadRecord(const std::string & table,const CloudWaterType & type,Timestamp localWaterMark)685 void StorageProxy::ReleaseUploadRecord(const std::string &table, const CloudWaterType &type, Timestamp localWaterMark)
686 {
687     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
688     if (store_ == nullptr) {
689         return;
690     }
691     store_->ReleaseUploadRecord(table, type, localWaterMark);
692 }
693 
IsTagCloudUpdateLocal(const LogInfo & localInfo,const LogInfo & cloudInfo,SingleVerConflictResolvePolicy policy)694 bool StorageProxy::IsTagCloudUpdateLocal(const LogInfo &localInfo, const LogInfo &cloudInfo,
695     SingleVerConflictResolvePolicy policy)
696 {
697     if (store_ == nullptr) {
698         return false;
699     }
700     return store_->IsTagCloudUpdateLocal(localInfo, cloudInfo, policy);
701 }
702 }
703