1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "storage_proxy.h"
17
18 #include "cloud/cloud_storage_utils.h"
19 #include "cloud/schema_mgr.h"
20 #include "db_common.h"
21 #include "store_types.h"
22
23 namespace DistributedDB {
StorageProxy(ICloudSyncStorageInterface * iCloud)24 StorageProxy::StorageProxy(ICloudSyncStorageInterface *iCloud)
25 :store_(iCloud),
26 transactionExeFlag_(false),
27 isWrite_(false)
28 {
29 }
30
GetCloudDb(ICloudSyncStorageInterface * iCloud)31 std::shared_ptr<StorageProxy> StorageProxy::GetCloudDb(ICloudSyncStorageInterface *iCloud)
32 {
33 std::shared_ptr<StorageProxy> proxy = std::make_shared<StorageProxy>(iCloud);
34 proxy->Init();
35 return proxy;
36 }
37
Init()38 void StorageProxy::Init()
39 {
40 cloudMetaData_ = std::make_shared<CloudMetaData>(store_);
41 }
42
Close()43 int StorageProxy::Close()
44 {
45 std::unique_lock<std::shared_mutex> writeLock(storeMutex_);
46 if (transactionExeFlag_.load()) {
47 LOGE("the transaction has been started, storage proxy can not closed");
48 return -E_BUSY;
49 }
50 store_ = nullptr;
51 cloudMetaData_ = nullptr;
52 return E_OK;
53 }
54
GetLocalWaterMark(const std::string & tableName,Timestamp & localMark)55 int StorageProxy::GetLocalWaterMark(const std::string &tableName, Timestamp &localMark)
56 {
57 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
58 if (cloudMetaData_ == nullptr) {
59 return -E_INVALID_DB;
60 }
61 if (transactionExeFlag_.load() && isWrite_.load()) {
62 LOGE("the write transaction has been started, can not get meta");
63 return -E_BUSY;
64 }
65 return cloudMetaData_->GetLocalWaterMark(AppendWithUserIfNeed(tableName), localMark);
66 }
67
GetLocalWaterMarkByMode(const std::string & tableName,CloudWaterType mode,Timestamp & localMark)68 int StorageProxy::GetLocalWaterMarkByMode(const std::string &tableName, CloudWaterType mode, Timestamp &localMark)
69 {
70 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
71 if (cloudMetaData_ == nullptr) {
72 return -E_INVALID_DB;
73 }
74 if (transactionExeFlag_.load() && isWrite_.load()) {
75 LOGE("the write transaction has been started, can not get meta");
76 return -E_BUSY;
77 }
78 return cloudMetaData_->GetLocalWaterMarkByType(AppendWithUserIfNeed(tableName), mode, localMark);
79 }
80
PutLocalWaterMark(const std::string & tableName,Timestamp & localMark)81 int StorageProxy::PutLocalWaterMark(const std::string &tableName, Timestamp &localMark)
82 {
83 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
84 if (cloudMetaData_ == nullptr) {
85 return -E_INVALID_DB;
86 }
87 if (transactionExeFlag_.load() && isWrite_.load()) {
88 LOGE("the write transaction has been started, can not put meta");
89 return -E_BUSY;
90 }
91 return cloudMetaData_->SetLocalWaterMark(AppendWithUserIfNeed(tableName), localMark);
92 }
93
PutWaterMarkByMode(const std::string & tableName,CloudWaterType mode,Timestamp & localMark)94 int StorageProxy::PutWaterMarkByMode(const std::string &tableName, CloudWaterType mode, Timestamp &localMark)
95 {
96 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
97 if (cloudMetaData_ == nullptr) {
98 return -E_INVALID_DB;
99 }
100 if (transactionExeFlag_.load() && isWrite_.load()) {
101 LOGE("the write transaction has been started, can not put meta");
102 return -E_BUSY;
103 }
104 return cloudMetaData_->SetLocalWaterMarkByType(AppendWithUserIfNeed(tableName), mode, localMark);
105 }
106
GetCloudWaterMark(const std::string & tableName,std::string & cloudMark)107 int StorageProxy::GetCloudWaterMark(const std::string &tableName, std::string &cloudMark)
108 {
109 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
110 if (cloudMetaData_ == nullptr) {
111 return -E_INVALID_DB;
112 }
113 return cloudMetaData_->GetCloudWaterMark(AppendWithUserIfNeed(tableName), cloudMark);
114 }
115
SetCloudWaterMark(const std::string & tableName,std::string & cloudMark)116 int StorageProxy::SetCloudWaterMark(const std::string &tableName, std::string &cloudMark)
117 {
118 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
119 if (cloudMetaData_ == nullptr) {
120 return -E_INVALID_DB;
121 }
122 return cloudMetaData_->SetCloudWaterMark(AppendWithUserIfNeed(tableName), cloudMark);
123 }
124
StartTransaction(TransactType type)125 int StorageProxy::StartTransaction(TransactType type)
126 {
127 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
128 if (store_ == nullptr) {
129 return -E_INVALID_DB;
130 }
131 int errCode = store_->StartTransaction(type);
132 if (errCode == E_OK) {
133 transactionExeFlag_.store(true);
134 isWrite_.store(type == TransactType::IMMEDIATE);
135 }
136 return errCode;
137 }
138
Commit()139 int StorageProxy::Commit()
140 {
141 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
142 if (store_ == nullptr) {
143 return -E_INVALID_DB;
144 }
145 int errCode = store_->Commit();
146 if (errCode == E_OK) {
147 transactionExeFlag_.store(false);
148 }
149 return errCode;
150 }
151
Rollback()152 int StorageProxy::Rollback()
153 {
154 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
155 if (store_ == nullptr) {
156 return -E_INVALID_DB;
157 }
158 int errCode = store_->Rollback();
159 if (errCode == E_OK) {
160 transactionExeFlag_.store(false);
161 }
162 return errCode;
163 }
164
GetUploadCount(const QuerySyncObject & query,const bool isCloudForcePush,bool isCompensatedTask,bool isUseWaterMark,int64_t & count)165 int StorageProxy::GetUploadCount(const QuerySyncObject &query, const bool isCloudForcePush,
166 bool isCompensatedTask, bool isUseWaterMark, int64_t &count)
167 {
168 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
169 if (store_ == nullptr) {
170 return -E_INVALID_DB;
171 }
172 if (!transactionExeFlag_.load()) {
173 LOGE("the transaction has not been started");
174 return -E_TRANSACT_STATE;
175 }
176 std::vector<Timestamp> timeStampVec;
177 std::vector<CloudWaterType> waterTypeVec = DBCommon::GetWaterTypeVec();
178 for (size_t i = 0; i < waterTypeVec.size(); i++) {
179 Timestamp tmpMark = 0u;
180 if (isUseWaterMark) {
181 int errCode = cloudMetaData_->GetLocalWaterMarkByType(AppendWithUserIfNeed(query.GetTableName()),
182 waterTypeVec[i], tmpMark);
183 if (errCode != E_OK) {
184 return errCode;
185 }
186 }
187 timeStampVec.push_back(tmpMark);
188 }
189 return store_->GetAllUploadCount(query, timeStampVec, isCloudForcePush, isCompensatedTask, count);
190 }
191
GetUploadCount(const std::string & tableName,const Timestamp & localMark,const bool isCloudForcePush,int64_t & count)192 int StorageProxy::GetUploadCount(const std::string &tableName, const Timestamp &localMark,
193 const bool isCloudForcePush, int64_t &count)
194 {
195 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
196 if (store_ == nullptr) {
197 return -E_INVALID_DB;
198 }
199 if (!transactionExeFlag_.load()) {
200 LOGE("the transaction has not been started");
201 return -E_TRANSACT_STATE;
202 }
203 QuerySyncObject query;
204 query.SetTableName(tableName);
205 return store_->GetUploadCount(query, localMark, isCloudForcePush, false, count);
206 }
207
GetUploadCount(const QuerySyncObject & query,const Timestamp & localMark,bool isCloudForcePush,bool isCompensatedTask,int64_t & count)208 int StorageProxy::GetUploadCount(const QuerySyncObject &query, const Timestamp &localMark,
209 bool isCloudForcePush, bool isCompensatedTask, int64_t &count)
210 {
211 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
212 if (store_ == nullptr) {
213 return -E_INVALID_DB;
214 }
215 if (!transactionExeFlag_.load()) {
216 LOGE("the transaction has not been started");
217 return -E_TRANSACT_STATE;
218 }
219 return store_->GetUploadCount(query, localMark, isCloudForcePush, isCompensatedTask, count);
220 }
221
GetCloudData(const std::string & tableName,const Timestamp & timeRange,ContinueToken & continueStmtToken,CloudSyncData & cloudDataResult)222 int StorageProxy::GetCloudData(const std::string &tableName, const Timestamp &timeRange,
223 ContinueToken &continueStmtToken, CloudSyncData &cloudDataResult)
224 {
225 QuerySyncObject querySyncObject;
226 querySyncObject.SetTableName(tableName);
227 return GetCloudData(querySyncObject, timeRange, continueStmtToken, cloudDataResult);
228 }
229
GetCloudData(const QuerySyncObject & querySyncObject,const Timestamp & timeRange,ContinueToken & continueStmtToken,CloudSyncData & cloudDataResult)230 int StorageProxy::GetCloudData(const QuerySyncObject &querySyncObject, const Timestamp &timeRange,
231 ContinueToken &continueStmtToken, CloudSyncData &cloudDataResult)
232 {
233 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
234 if (store_ == nullptr) {
235 return -E_INVALID_DB;
236 }
237 if (!transactionExeFlag_.load()) {
238 LOGE("the transaction has not been started");
239 return -E_TRANSACT_STATE;
240 }
241 TableSchema tableSchema;
242 int errCode = store_->GetCloudTableSchema(querySyncObject.GetRelationTableName(), tableSchema);
243 if (errCode != E_OK) {
244 return errCode;
245 }
246 return store_->GetCloudData(tableSchema, querySyncObject, timeRange, continueStmtToken, cloudDataResult);
247 }
248
GetCloudDataNext(ContinueToken & continueStmtToken,CloudSyncData & cloudDataResult) const249 int StorageProxy::GetCloudDataNext(ContinueToken &continueStmtToken, CloudSyncData &cloudDataResult) const
250 {
251 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
252 if (store_ == nullptr) {
253 return -E_INVALID_DB;
254 }
255 if (!transactionExeFlag_.load()) {
256 LOGE("the transaction has not been started");
257 return -E_TRANSACT_STATE;
258 }
259 return store_->GetCloudDataNext(continueStmtToken, cloudDataResult);
260 }
261
GetCloudGid(const QuerySyncObject & querySyncObject,bool isCloudForcePush,bool isCompensatedTask,std::vector<std::string> & cloudGid)262 int StorageProxy::GetCloudGid(const QuerySyncObject &querySyncObject, bool isCloudForcePush,
263 bool isCompensatedTask, std::vector<std::string> &cloudGid)
264 {
265 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
266 if (store_ == nullptr) {
267 return -E_INVALID_DB;
268 }
269 TableSchema tableSchema;
270 int errCode = store_->GetCloudTableSchema(querySyncObject.GetRelationTableName(), tableSchema);
271 if (errCode != E_OK) {
272 return errCode;
273 }
274 return store_->GetCloudGid(tableSchema, querySyncObject, isCloudForcePush, isCompensatedTask, cloudGid);
275 }
276
GetInfoByPrimaryKeyOrGid(const std::string & tableName,const VBucket & vBucket,DataInfoWithLog & dataInfoWithLog,VBucket & assetInfo)277 int StorageProxy::GetInfoByPrimaryKeyOrGid(const std::string &tableName, const VBucket &vBucket,
278 DataInfoWithLog &dataInfoWithLog, VBucket &assetInfo)
279 {
280 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
281 if (store_ == nullptr) {
282 return -E_INVALID_DB;
283 }
284 if (!transactionExeFlag_.load()) {
285 LOGE("the transaction has not been started");
286 return -E_TRANSACT_STATE;
287 }
288
289 int errCode = store_->GetInfoByPrimaryKeyOrGid(tableName, vBucket, dataInfoWithLog, assetInfo);
290 if (errCode == E_OK) {
291 dataInfoWithLog.logInfo.timestamp = EraseNanoTime(dataInfoWithLog.logInfo.timestamp);
292 dataInfoWithLog.logInfo.wTimestamp = EraseNanoTime(dataInfoWithLog.logInfo.wTimestamp);
293 }
294 if ((dataInfoWithLog.logInfo.flag & static_cast<uint64_t>(LogInfoFlag::FLAG_LOGIC_DELETE)) != 0) {
295 assetInfo.clear();
296 }
297 return errCode;
298 }
299
SetCursorIncFlag(bool flag)300 int StorageProxy::SetCursorIncFlag(bool flag)
301 {
302 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
303 if (store_ == nullptr) {
304 return -E_INVALID_DB;
305 }
306 return store_->SetCursorIncFlag(flag);
307 }
308
PutCloudSyncData(const std::string & tableName,DownloadData & downloadData)309 int StorageProxy::PutCloudSyncData(const std::string &tableName, DownloadData &downloadData)
310 {
311 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
312 if (store_ == nullptr) {
313 return -E_INVALID_DB;
314 }
315 if (!transactionExeFlag_.load()) {
316 LOGE("the transaction has not been started");
317 return -E_TRANSACT_STATE;
318 }
319 downloadData.user = user_;
320 return store_->PutCloudSyncData(tableName, downloadData);
321 }
322
CleanCloudData(ClearMode mode,const std::vector<std::string> & tableNameList,const RelationalSchemaObject & localSchema,std::vector<Asset> & assets)323 int StorageProxy::CleanCloudData(ClearMode mode, const std::vector<std::string> &tableNameList,
324 const RelationalSchemaObject &localSchema, std::vector<Asset> &assets)
325 {
326 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
327 if (store_ == nullptr) {
328 return -E_INVALID_DB;
329 }
330 if (!transactionExeFlag_.load()) {
331 LOGE("the transaction has not been started");
332 return -E_TRANSACT_STATE;
333 }
334 return store_->CleanCloudData(mode, tableNameList, localSchema, assets);
335 }
336
ReleaseContinueToken(ContinueToken & continueStmtToken)337 int StorageProxy::ReleaseContinueToken(ContinueToken &continueStmtToken)
338 {
339 return store_->ReleaseCloudDataToken(continueStmtToken);
340 }
341
CheckSchema(const TableName & tableName) const342 int StorageProxy::CheckSchema(const TableName &tableName) const
343 {
344 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
345 if (store_ == nullptr) {
346 return -E_INVALID_DB;
347 }
348 return store_->ChkSchema(tableName);
349 }
350
CheckSchema(std::vector<std::string> & tables)351 int StorageProxy::CheckSchema(std::vector<std::string> &tables)
352 {
353 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
354 if (store_ == nullptr) {
355 return -E_INVALID_DB;
356 }
357 if (tables.empty()) {
358 return -E_INVALID_ARGS;
359 }
360 for (const auto &table : tables) {
361 int ret = store_->ChkSchema(table);
362 if (ret != E_OK) {
363 return ret;
364 }
365 }
366 return E_OK;
367 }
368
GetPrimaryColNamesWithAssetsFields(const TableName & tableName,std::vector<std::string> & colNames,std::vector<Field> & assetFields)369 int StorageProxy::GetPrimaryColNamesWithAssetsFields(const TableName &tableName, std::vector<std::string> &colNames,
370 std::vector<Field> &assetFields)
371 {
372 if (!colNames.empty()) {
373 // output parameter should be empty
374 return -E_INVALID_ARGS;
375 }
376
377 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
378 if (store_ == nullptr) {
379 return -E_INVALID_DB;
380 }
381 // GetTableInfo
382 TableSchema tableSchema;
383 int ret = store_->GetCloudTableSchema(tableName, tableSchema);
384 if (ret != E_OK) {
385 LOGE("Cannot get cloud table schema: %d", ret);
386 return ret;
387 }
388 for (const auto &field : tableSchema.fields) {
389 if (field.primary) {
390 colNames.push_back(field.colName);
391 }
392 if (field.type == TYPE_INDEX<Asset> || field.type == TYPE_INDEX<Assets>) {
393 assetFields.push_back(field);
394 }
395 }
396 if (colNames.empty() || colNames.size() > 1) {
397 (void)colNames.insert(colNames.begin(), CloudDbConstant::ROW_ID_FIELD_NAME);
398 }
399 return E_OK;
400 }
401
NotifyChangedData(const std::string & deviceName,ChangedData && changedData)402 int StorageProxy::NotifyChangedData(const std::string &deviceName, ChangedData &&changedData)
403 {
404 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
405 if (store_ == nullptr) {
406 return -E_INVALID_DB;
407 }
408 ChangeProperties changeProperties;
409 store_->GetAndResetServerObserverData(changedData.tableName, changeProperties);
410 changedData.properties = changeProperties;
411 store_->TriggerObserverAction(deviceName, std::move(changedData), true);
412 return E_OK;
413 }
414
FillCloudAssetForDownload(const std::string & tableName,VBucket & asset,bool isDownloadSuccess)415 int StorageProxy::FillCloudAssetForDownload(const std::string &tableName, VBucket &asset, bool isDownloadSuccess)
416 {
417 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
418 if (store_ == nullptr) {
419 return -E_INVALID_DB;
420 }
421 if (!transactionExeFlag_.load() || !isWrite_.load()) {
422 LOGE("the write transaction has not started before fill download assets");
423 return -E_TRANSACT_STATE;
424 }
425 return store_->FillCloudAssetForDownload(tableName, asset, isDownloadSuccess);
426 }
427
SetLogTriggerStatus(bool status)428 int StorageProxy::SetLogTriggerStatus(bool status)
429 {
430 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
431 if (store_ == nullptr) {
432 return -E_INVALID_DB;
433 }
434 return store_->SetLogTriggerStatus(status);
435 }
436
FillCloudLogAndAsset(OpType opType,const CloudSyncData & data)437 int StorageProxy::FillCloudLogAndAsset(OpType opType, const CloudSyncData &data)
438 {
439 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
440 if (store_ == nullptr) {
441 return -E_INVALID_DB;
442 }
443 if (!transactionExeFlag_.load()) {
444 LOGE("the transaction has not been started");
445 return -E_TRANSACT_STATE;
446 }
447 return store_->FillCloudLogAndAsset(opType, data, true, false);
448 }
449
GetIdentify() const450 std::string StorageProxy::GetIdentify() const
451 {
452 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
453 if (store_ == nullptr) {
454 LOGW("[StorageProxy] store is nullptr return default");
455 return "";
456 }
457 return store_->GetIdentify();
458 }
459
EraseNanoTime(DistributedDB::Timestamp localTime)460 Timestamp StorageProxy::EraseNanoTime(DistributedDB::Timestamp localTime)
461 {
462 return localTime / CloudDbConstant::TEN_THOUSAND * CloudDbConstant::TEN_THOUSAND;
463 }
464
CleanWaterMark(const DistributedDB::TableName & tableName)465 int StorageProxy::CleanWaterMark(const DistributedDB::TableName &tableName)
466 {
467 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
468 if (cloudMetaData_ == nullptr) {
469 LOGW("[StorageProxy] meta is nullptr return default");
470 return -E_INVALID_DB;
471 }
472 return cloudMetaData_->CleanWaterMark(AppendWithUserIfNeed(tableName));
473 }
474
CleanWaterMarkInMemory(const DistributedDB::TableName & tableName)475 int StorageProxy::CleanWaterMarkInMemory(const DistributedDB::TableName &tableName)
476 {
477 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
478 if (cloudMetaData_ == nullptr) {
479 LOGW("[StorageProxy] CleanWaterMarkInMemory is nullptr return default");
480 return -E_INVALID_DB;
481 }
482 cloudMetaData_->CleanWaterMarkInMemory(AppendWithUserIfNeed(tableName));
483 return E_OK;
484 }
485
SetUser(const std::string & user)486 void StorageProxy::SetUser(const std::string &user)
487 {
488 user_ = user;
489 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
490 if (store_ != nullptr) {
491 store_->SetUser(user);
492 }
493 }
494
CreateTempSyncTrigger(const std::string & tableName)495 int StorageProxy::CreateTempSyncTrigger(const std::string &tableName)
496 {
497 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
498 if (store_ == nullptr) {
499 return -E_INVALID_DB;
500 }
501 return store_->CreateTempSyncTrigger(tableName);
502 }
503
ClearAllTempSyncTrigger()504 int StorageProxy::ClearAllTempSyncTrigger()
505 {
506 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
507 if (store_ == nullptr) {
508 return -E_INVALID_DB;
509 }
510 // Clean up all temporary triggers
511 return store_->ClearAllTempSyncTrigger();
512 }
513
IsSharedTable(const std::string & tableName,bool & IsSharedTable)514 int StorageProxy::IsSharedTable(const std::string &tableName, bool &IsSharedTable)
515 {
516 std::unique_lock<std::shared_mutex> writeLock(storeMutex_);
517 if (store_ == nullptr) {
518 return -E_INVALID_DB;
519 }
520 IsSharedTable = store_->IsSharedTable(tableName);
521 return E_OK;
522 }
523
FillCloudGidIfSuccess(const OpType opType,const CloudSyncData & data)524 void StorageProxy::FillCloudGidIfSuccess(const OpType opType, const CloudSyncData &data)
525 {
526 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
527 if (store_ == nullptr) {
528 LOGW("[StorageProxy] fill gid failed with store invalid");
529 return;
530 }
531 int errCode = store_->FillCloudLogAndAsset(opType, data, true, true);
532 if (errCode != E_OK) {
533 LOGW("[StorageProxy] fill gid failed %d", errCode);
534 }
535 }
536
SetCloudTaskConfig(const CloudTaskConfig & config)537 void StorageProxy::SetCloudTaskConfig(const CloudTaskConfig &config)
538 {
539 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
540 if (store_ == nullptr) {
541 LOGW("[StorageProxy] fill gid failed with store invalid");
542 return;
543 }
544 store_->SetCloudTaskConfig(config);
545 }
546
GetAssetsByGidOrHashKey(const std::string & tableName,const std::string & gid,const Bytes & hashKey,VBucket & assets)547 std::pair<int, uint32_t> StorageProxy::GetAssetsByGidOrHashKey(const std::string &tableName, const std::string &gid,
548 const Bytes &hashKey, VBucket &assets)
549 {
550 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
551 if (store_ == nullptr) {
552 return { -E_INVALID_DB, static_cast<uint32_t>(LockStatus::UNLOCK) };
553 }
554 TableSchema tableSchema;
555 int errCode = store_->GetCloudTableSchema(tableName, tableSchema);
556 if (errCode != E_OK) {
557 LOGE("get cloud table schema failed: %d", errCode);
558 return { errCode, static_cast<uint32_t>(LockStatus::UNLOCK) };
559 }
560 return store_->GetAssetsByGidOrHashKey(tableSchema, gid, hashKey, assets);
561 }
562
SetIAssetLoader(const std::shared_ptr<IAssetLoader> & loader)563 int StorageProxy::SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader)
564 {
565 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
566 if (store_ == nullptr) {
567 return -E_INVALID_DB;
568 }
569 return store_->SetIAssetLoader(loader);
570 }
571
UpdateRecordFlag(const std::string & tableName,bool recordConflict,const LogInfo & logInfo)572 int StorageProxy::UpdateRecordFlag(const std::string &tableName, bool recordConflict, const LogInfo &logInfo)
573 {
574 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
575 if (store_ == nullptr) {
576 return -E_INVALID_DB;
577 }
578 return store_->UpdateRecordFlag(tableName, recordConflict, logInfo);
579 }
580
GetCompensatedSyncQuery(std::vector<QuerySyncObject> & syncQuery,std::vector<std::string> & users)581 int StorageProxy::GetCompensatedSyncQuery(std::vector<QuerySyncObject> &syncQuery, std::vector<std::string> &users)
582 {
583 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
584 if (store_ == nullptr) {
585 return -E_INVALID_DB;
586 }
587 return store_->GetCompensatedSyncQuery(syncQuery, users);
588 }
589
ClearUnLockingNoNeedCompensated()590 int StorageProxy::ClearUnLockingNoNeedCompensated()
591 {
592 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
593 if (store_ == nullptr) {
594 return -E_INVALID_DB;
595 }
596 return store_->ClearUnLockingNoNeedCompensated();
597 }
598
MarkFlagAsConsistent(const std::string & tableName,const DownloadData & downloadData,const std::set<std::string> & gidFilters)599 int StorageProxy::MarkFlagAsConsistent(const std::string &tableName, const DownloadData &downloadData,
600 const std::set<std::string> &gidFilters)
601 {
602 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
603 if (store_ == nullptr) {
604 return -E_INVALID_DB;
605 }
606 return store_->MarkFlagAsConsistent(tableName, downloadData, gidFilters);
607 }
608
OnSyncFinish()609 void StorageProxy::OnSyncFinish()
610 {
611 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
612 if (store_ == nullptr) {
613 return;
614 }
615 store_->SyncFinishHook();
616 }
617
OnUploadStart()618 void StorageProxy::OnUploadStart()
619 {
620 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
621 if (store_ == nullptr) {
622 return;
623 }
624 store_->DoUploadHook();
625 }
626
CleanAllWaterMark()627 void StorageProxy::CleanAllWaterMark()
628 {
629 cloudMetaData_->CleanAllWaterMark();
630 }
631
AppendWithUserIfNeed(const std::string & source) const632 std::string StorageProxy::AppendWithUserIfNeed(const std::string &source) const
633 {
634 if (user_.empty()) {
635 return source;
636 }
637 return source + "_" + user_;
638 }
639
GetCloudDbSchema(std::shared_ptr<DataBaseSchema> & cloudSchema)640 int StorageProxy::GetCloudDbSchema(std::shared_ptr<DataBaseSchema> &cloudSchema)
641 {
642 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
643 if (store_ == nullptr) {
644 return -E_INVALID_DB;
645 }
646 return store_->GetCloudDbSchema(cloudSchema);
647 }
648
GetLocalCloudVersion()649 std::pair<int, CloudSyncData> StorageProxy::GetLocalCloudVersion()
650 {
651 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
652 if (store_ == nullptr) {
653 return {-E_INTERNAL_ERROR, {}};
654 }
655 return store_->GetLocalCloudVersion();
656 }
657
GetCloudSyncConfig() const658 CloudSyncConfig StorageProxy::GetCloudSyncConfig() const
659 {
660 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
661 if (store_ == nullptr) {
662 return {};
663 }
664 return store_->GetCloudSyncConfig();
665 }
666
IsTableExistReference(const std::string & table)667 bool StorageProxy::IsTableExistReference(const std::string &table)
668 {
669 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
670 if (store_ == nullptr) {
671 return false;
672 }
673 return store_->IsTableExistReference(table);
674 }
675
IsTableExistReferenceOrReferenceBy(const std::string & table)676 bool StorageProxy::IsTableExistReferenceOrReferenceBy(const std::string &table)
677 {
678 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
679 if (store_ == nullptr) {
680 return false;
681 }
682 return store_->IsTableExistReferenceOrReferenceBy(table);
683 }
684
ReleaseUploadRecord(const std::string & table,const CloudWaterType & type,Timestamp localWaterMark)685 void StorageProxy::ReleaseUploadRecord(const std::string &table, const CloudWaterType &type, Timestamp localWaterMark)
686 {
687 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
688 if (store_ == nullptr) {
689 return;
690 }
691 store_->ReleaseUploadRecord(table, type, localWaterMark);
692 }
693
IsTagCloudUpdateLocal(const LogInfo & localInfo,const LogInfo & cloudInfo,SingleVerConflictResolvePolicy policy)694 bool StorageProxy::IsTagCloudUpdateLocal(const LogInfo &localInfo, const LogInfo &cloudInfo,
695 SingleVerConflictResolvePolicy policy)
696 {
697 if (store_ == nullptr) {
698 return false;
699 }
700 return store_->IsTagCloudUpdateLocal(localInfo, cloudInfo, policy);
701 }
702 }
703