/*
 * Copyright (c) 2023 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
15 #include "cloud_db_proxy.h"
16 #include "cloud/cloud_db_constant.h"
17 #include "db_common.h"
18 #include "db_errno.h"
19 #include "log_print.h"
20
21 namespace DistributedDB {
CloudDBProxy()22 CloudDBProxy::CloudDBProxy()
23 : timeout_(0)
24 {
25 }
26
SetCloudDB(const std::shared_ptr<ICloudDb> & cloudDB)27 void CloudDBProxy::SetCloudDB(const std::shared_ptr<ICloudDb> &cloudDB)
28 {
29 std::unique_lock<std::shared_mutex> writeLock(cloudMutex_);
30 if (!iCloudDb_) {
31 iCloudDb_ = cloudDB;
32 }
33 }
34
SetCloudDB(const std::map<std::string,std::shared_ptr<ICloudDb>> & cloudDBs)35 int CloudDBProxy::SetCloudDB(const std::map<std::string, std::shared_ptr<ICloudDb>> &cloudDBs)
36 {
37 std::unique_lock<std::shared_mutex> writeLock(cloudMutex_);
38 for (const auto &item : cloudDBs) {
39 if (item.second == nullptr) {
40 LOGE("[CloudDBProxy] User %s setCloudDB with nullptr", item.first.c_str());
41 return -E_INVALID_ARGS;
42 }
43 }
44 cloudDbs_ = cloudDBs;
45 return E_OK;
46 }
47
GetCloudDB() const48 const std::map<std::string, std::shared_ptr<ICloudDb>> CloudDBProxy::GetCloudDB() const
49 {
50 std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
51 return cloudDbs_;
52 }
53
SwitchCloudDB(const std::string & user)54 void CloudDBProxy::SwitchCloudDB(const std::string &user)
55 {
56 std::unique_lock<std::shared_mutex> writeLock(cloudMutex_);
57 if (cloudDbs_.find(user) == cloudDbs_.end()) {
58 return;
59 }
60 iCloudDb_ = cloudDbs_[user];
61 }
62
SetIAssetLoader(const std::shared_ptr<IAssetLoader> & loader)63 void CloudDBProxy::SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader)
64 {
65 std::unique_lock<std::shared_mutex> writeLock(assetLoaderMutex_);
66 iAssetLoader_ = loader;
67 }
68
BatchInsert(const std::string & tableName,std::vector<VBucket> & record,std::vector<VBucket> & extend,Info & uploadInfo)69 int CloudDBProxy::BatchInsert(const std::string &tableName, std::vector<VBucket> &record,
70 std::vector<VBucket> &extend, Info &uploadInfo)
71 {
72 std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
73 if (iCloudDb_ == nullptr) {
74 return -E_CLOUD_ERROR;
75 }
76 std::shared_ptr<ICloudDb> cloudDb = iCloudDb_;
77 std::shared_ptr<CloudActionContext> context = std::make_shared<CloudActionContext>();
78 context->MoveInRecordAndExtend(record, extend);
79 context->SetTableName(tableName);
80 int errCode = InnerAction(context, cloudDb, INSERT);
81 uploadInfo = context->GetInfo();
82 context->MoveOutRecordAndExtend(record, extend);
83 return errCode;
84 }
85
BatchUpdate(const std::string & tableName,std::vector<VBucket> & record,std::vector<VBucket> & extend,Info & uploadInfo)86 int CloudDBProxy::BatchUpdate(const std::string &tableName, std::vector<VBucket> &record,
87 std::vector<VBucket> &extend, Info &uploadInfo)
88 {
89 std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
90 if (iCloudDb_ == nullptr) {
91 return -E_CLOUD_ERROR;
92 }
93 std::shared_ptr<ICloudDb> cloudDb = iCloudDb_;
94 std::shared_ptr<CloudActionContext> context = std::make_shared<CloudActionContext>();
95 context->SetTableName(tableName);
96 context->MoveInRecordAndExtend(record, extend);
97 int errCode = InnerAction(context, cloudDb, UPDATE);
98 uploadInfo = context->GetInfo();
99 context->MoveOutRecordAndExtend(record, extend);
100 return errCode;
101 }
102
BatchDelete(const std::string & tableName,std::vector<VBucket> & record,std::vector<VBucket> & extend,Info & uploadInfo)103 int CloudDBProxy::BatchDelete(const std::string &tableName, std::vector<VBucket> &record, std::vector<VBucket> &extend,
104 Info &uploadInfo)
105 {
106 std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
107 if (iCloudDb_ == nullptr) {
108 return -E_CLOUD_ERROR;
109 }
110 std::shared_ptr<CloudActionContext> context = std::make_shared<CloudActionContext>();
111 std::shared_ptr<ICloudDb> cloudDb = iCloudDb_;
112 context->MoveInRecordAndExtend(record, extend);
113 context->SetTableName(tableName);
114 int errCode = InnerAction(context, cloudDb, DELETE);
115 uploadInfo = context->GetInfo();
116 context->MoveOutRecordAndExtend(record, extend);
117 return errCode;
118 }
119
Query(const std::string & tableName,VBucket & extend,std::vector<VBucket> & data)120 int CloudDBProxy::Query(const std::string &tableName, VBucket &extend, std::vector<VBucket> &data)
121 {
122 std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
123 if (iCloudDb_ == nullptr) {
124 return -E_CLOUD_ERROR;
125 }
126 std::shared_ptr<ICloudDb> cloudDb = iCloudDb_;
127 std::shared_ptr<CloudActionContext> context = std::make_shared<CloudActionContext>();
128 context->MoveInQueryExtendAndData(extend, data);
129 context->SetTableName(tableName);
130 int errCode = InnerAction(context, cloudDb, QUERY);
131 context->MoveOutQueryExtendAndData(extend, data);
132 for (auto &item : data) {
133 for (auto &row : item) {
134 auto assets = std::get_if<Assets>(&row.second);
135 if (assets == nullptr) {
136 continue;
137 }
138 DBCommon::RemoveDuplicateAssetsData(*assets);
139 }
140 }
141 return errCode;
142 }
143
Lock()144 std::pair<int, uint64_t> CloudDBProxy::Lock()
145 {
146 std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
147 if (iCloudDb_ == nullptr) {
148 return { -E_CLOUD_ERROR, 0u };
149 }
150 std::shared_ptr<ICloudDb> cloudDb = iCloudDb_;
151 std::shared_ptr<CloudActionContext> context = std::make_shared<CloudActionContext>();
152 std::pair<int, uint64_t> lockStatus;
153 int errCode = InnerAction(context, cloudDb, LOCK);
154 context->MoveOutLockStatus(lockStatus);
155 lockStatus.first = errCode;
156 return lockStatus;
157 }
158
UnLock()159 int CloudDBProxy::UnLock()
160 {
161 std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
162 if (iCloudDb_ == nullptr) {
163 return -E_CLOUD_ERROR;
164 }
165 std::shared_ptr<ICloudDb> cloudDb = iCloudDb_;
166 std::shared_ptr<CloudActionContext> context = std::make_shared<CloudActionContext>();
167 return InnerAction(context, cloudDb, UNLOCK);
168 }
169
Close()170 int CloudDBProxy::Close()
171 {
172 std::shared_ptr<ICloudDb> iCloudDb = nullptr;
173 std::vector<std::shared_ptr<ICloudDb>> waitForClose;
174 {
175 std::unique_lock<std::shared_mutex> writeLock(cloudMutex_);
176 if (iCloudDb_ != nullptr) {
177 iCloudDb = iCloudDb_;
178 iCloudDb_ = nullptr;
179 }
180 for (const auto &item : cloudDbs_) {
181 if (iCloudDb == item.second) {
182 iCloudDb = nullptr;
183 }
184 waitForClose.push_back(item.second);
185 }
186 cloudDbs_.clear();
187 }
188 LOGD("[CloudDBProxy] call cloudDb close begin");
189 DBStatus status = OK;
190 if (iCloudDb != nullptr) {
191 status = iCloudDb->Close();
192 }
193 for (const auto &item : waitForClose) {
194 DBStatus ret = item->Close();
195 status = (status == OK ? ret : status);
196 }
197 if (status != OK) {
198 LOGW("[CloudDBProxy] cloud db close failed %d", static_cast<int>(status));
199 }
200 waitForClose.clear();
201 LOGD("[CloudDBProxy] call cloudDb close end");
202 return status == OK ? E_OK : -E_CLOUD_ERROR;
203 }
204
HeartBeat()205 int CloudDBProxy::HeartBeat()
206 {
207 std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
208 if (iCloudDb_ == nullptr) {
209 return -E_CLOUD_ERROR;
210 }
211
212 std::shared_ptr<ICloudDb> cloudDb = iCloudDb_;
213 std::shared_ptr<CloudActionContext> context = std::make_shared<CloudActionContext>();
214 return InnerAction(context, cloudDb, HEARTBEAT);
215 }
216
IsNotExistCloudDB() const217 bool CloudDBProxy::IsNotExistCloudDB() const
218 {
219 std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
220 return iCloudDb_ == nullptr && cloudDbs_.empty();
221 }
222
Download(const std::string & tableName,const std::string & gid,const Type & prefix,std::map<std::string,Assets> & assets)223 int CloudDBProxy::Download(const std::string &tableName, const std::string &gid, const Type &prefix,
224 std::map<std::string, Assets> &assets)
225 {
226 if (assets.empty()) {
227 return E_OK;
228 }
229 std::shared_lock<std::shared_mutex> readLock(assetLoaderMutex_);
230 if (iAssetLoader_ == nullptr) {
231 LOGE("Asset loader has not been set %d", -E_NOT_SET);
232 return -E_NOT_SET;
233 }
234 DBStatus status = iAssetLoader_->Download(tableName, gid, prefix, assets);
235 if (status != OK) {
236 LOGW("[CloudDBProxy] download asset failed %d", static_cast<int>(status));
237 }
238 return GetInnerErrorCode(status);
239 }
240
RemoveLocalAssets(const std::vector<Asset> & assets)241 int CloudDBProxy::RemoveLocalAssets(const std::vector<Asset> &assets)
242 {
243 if (assets.empty()) {
244 return E_OK;
245 }
246 std::shared_lock<std::shared_mutex> readLock(assetLoaderMutex_);
247 if (iAssetLoader_ == nullptr) {
248 LOGW("Asset loader has not been set");
249 return E_OK;
250 }
251 DBStatus status = iAssetLoader_->RemoveLocalAssets(assets);
252 if (status != OK) {
253 LOGE("[CloudDBProxy] remove local asset failed %d", static_cast<int>(status));
254 return -E_REMOVE_ASSETS_FAILED;
255 }
256 return E_OK;
257 }
258
RemoveLocalAssets(const std::string & tableName,const std::string & gid,const Type & prefix,std::map<std::string,Assets> & assets)259 int CloudDBProxy::RemoveLocalAssets(const std::string &tableName, const std::string &gid, const Type &prefix,
260 std::map<std::string, Assets> &assets)
261 {
262 if (assets.empty()) {
263 return E_OK;
264 }
265 std::shared_lock<std::shared_mutex> readLock(assetLoaderMutex_);
266 if (iAssetLoader_ == nullptr) {
267 LOGE("Asset loader has not been set %d", -E_NOT_SET);
268 return -E_NOT_SET;
269 }
270 DBStatus status = iAssetLoader_->RemoveLocalAssets(tableName, gid, prefix, assets);
271 if (status != OK) {
272 LOGE("[CloudDBProxy] remove local asset failed %d", static_cast<int>(status));
273 return -E_REMOVE_ASSETS_FAILED;
274 }
275 return E_OK;
276 }
277
GetEmptyCursor(const std::string & tableName)278 std::pair<int, std::string> CloudDBProxy::GetEmptyCursor(const std::string &tableName)
279 {
280 std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
281 if (iCloudDb_ == nullptr) {
282 return { -E_CLOUD_ERROR, "" };
283 }
284 std::shared_ptr<ICloudDb> cloudDb = iCloudDb_;
285 std::shared_ptr<CloudActionContext> context = std::make_shared<CloudActionContext>();
286 context->SetTableName(tableName);
287 int errCode = InnerAction(context, cloudDb, GET_EMPTY_CURSOR);
288 std::pair<int, std::string> cursorStatus;
289 context->MoveOutCursorStatus(cursorStatus);
290 cursorStatus.first = errCode;
291 return cursorStatus;
292 }
293
InnerAction(const std::shared_ptr<CloudActionContext> & context,const std::shared_ptr<ICloudDb> & cloudDb,InnerActionCode action)294 int CloudDBProxy::InnerAction(const std::shared_ptr<CloudActionContext> &context,
295 const std::shared_ptr<ICloudDb> &cloudDb, InnerActionCode action)
296 {
297 if (action >= InnerActionCode::INVALID_ACTION) {
298 return -E_INVALID_ARGS;
299 }
300 InnerActionTask(context, cloudDb, action);
301 return context->GetActionRes();
302 }
303
DMLActionTask(const std::shared_ptr<CloudActionContext> & context,const std::shared_ptr<ICloudDb> & cloudDb,InnerActionCode action)304 DBStatus CloudDBProxy::DMLActionTask(const std::shared_ptr<CloudActionContext> &context,
305 const std::shared_ptr<ICloudDb> &cloudDb, InnerActionCode action)
306 {
307 DBStatus status = OK;
308 std::vector<VBucket> record;
309 std::vector<VBucket> extend;
310 context->MoveOutRecordAndExtend(record, extend);
311
312 switch (action) {
313 case INSERT: {
314 status = cloudDb->BatchInsert(context->GetTableName(), std::move(record), extend);
315 context->MoveInRecordAndExtend(record, extend);
316 context->SetInfo(CloudWaterType::INSERT, status);
317 break;
318 }
319 case UPDATE: {
320 status = cloudDb->BatchUpdate(context->GetTableName(), std::move(record), extend);
321 context->MoveInRecordAndExtend(record, extend);
322 context->SetInfo(CloudWaterType::UPDATE, status);
323 break;
324 }
325 case DELETE: {
326 status = cloudDb->BatchDelete(context->GetTableName(), extend);
327 context->MoveInRecordAndExtend(record, extend);
328 context->SetInfo(CloudWaterType::DELETE, status);
329 break;
330 }
331 default: {
332 LOGE("DMLActionTask can only be used on INSERT/UPDATE/DELETE.");
333 return INVALID_ARGS;
334 }
335 }
336 if (status == CLOUD_VERSION_CONFLICT) {
337 LOGI("[CloudSyncer] Version conflict during cloud batch upload.");
338 } else if (status != OK) {
339 LOGE("[CloudSyncer] Cloud BATCH UPLOAD failed.");
340 }
341 return status;
342 }
343
InnerActionTask(const std::shared_ptr<CloudActionContext> & context,const std::shared_ptr<ICloudDb> & cloudDb,InnerActionCode action)344 void CloudDBProxy::InnerActionTask(const std::shared_ptr<CloudActionContext> &context,
345 const std::shared_ptr<ICloudDb> &cloudDb, InnerActionCode action)
346 {
347 DBStatus status = OK;
348 bool setResAlready = false;
349 LOGD("[CloudDBProxy] action %" PRIu8 " begin", static_cast<uint8_t>(action));
350 switch (action) {
351 case INSERT:
352 case UPDATE:
353 case DELETE:
354 status = DMLActionTask(context, cloudDb, action);
355 break;
356 case QUERY: {
357 status = QueryAction(context, cloudDb);
358 if (status == QUERY_END) {
359 setResAlready = true;
360 }
361 break;
362 }
363 case GET_EMPTY_CURSOR:
364 status = InnerActionGetEmptyCursor(context, cloudDb);
365 break;
366 case LOCK:
367 status = InnerActionLock(context, cloudDb);
368 break;
369 case UNLOCK:
370 status = cloudDb->UnLock();
371 break;
372 case HEARTBEAT:
373 status = cloudDb->HeartBeat();
374 break;
375 default: // should not happen
376 status = DB_ERROR;
377 }
378 LOGD("[CloudDBProxy] action %" PRIu8 " end res:%d", static_cast<uint8_t>(action), static_cast<int>(status));
379
380 if (!setResAlready) {
381 context->SetActionRes(GetInnerErrorCode(status));
382 }
383
384 context->FinishAndNotify();
385 }
386
InnerActionLock(const std::shared_ptr<CloudActionContext> & context,const std::shared_ptr<ICloudDb> & cloudDb)387 DBStatus CloudDBProxy::InnerActionLock(const std::shared_ptr<CloudActionContext> &context,
388 const std::shared_ptr<ICloudDb> &cloudDb)
389 {
390 DBStatus status = OK;
391 std::pair<int, uint64_t> lockRet;
392 std::pair<DBStatus, uint64_t> lockStatus = cloudDb->Lock();
393 if (lockStatus.first != OK) {
394 status = lockStatus.first;
395 } else if (lockStatus.second == 0) {
396 status = CLOUD_ERROR;
397 }
398 lockRet.second = lockStatus.second;
399 lockRet.first = GetInnerErrorCode(status);
400 context->MoveInLockStatus(lockRet);
401 return status;
402 }
403
InnerActionGetEmptyCursor(const std::shared_ptr<CloudActionContext> & context,const std::shared_ptr<ICloudDb> & cloudDb)404 DBStatus CloudDBProxy::InnerActionGetEmptyCursor(const std::shared_ptr<CloudActionContext> &context,
405 const std::shared_ptr<ICloudDb> &cloudDb)
406 {
407 std::string tableName = context->GetTableName();
408 std::pair<DBStatus, std::string> cursorStatus = cloudDb->GetEmptyCursor(tableName);
409 DBStatus status = OK;
410 if (cursorStatus.first != OK) {
411 status = cursorStatus.first;
412 }
413 std::pair<int, std::string> cursorRet;
414 cursorRet.second = cursorStatus.second;
415 cursorRet.first = GetInnerErrorCode(status);
416 context->MoveInCursorStatus(cursorRet);
417 return status;
418 }
419
GetInnerErrorCode(DBStatus status)420 int CloudDBProxy::GetInnerErrorCode(DBStatus status)
421 {
422 if (status < DB_ERROR || status >= BUTT_STATUS) {
423 return static_cast<int>(status);
424 }
425 switch (status) {
426 case OK:
427 return E_OK;
428 case CLOUD_NETWORK_ERROR:
429 return -E_CLOUD_NETWORK_ERROR;
430 case CLOUD_SYNC_UNSET:
431 return -E_CLOUD_SYNC_UNSET;
432 case CLOUD_FULL_RECORDS:
433 return -E_CLOUD_FULL_RECORDS;
434 case CLOUD_LOCK_ERROR:
435 return -E_CLOUD_LOCK_ERROR;
436 case CLOUD_ASSET_SPACE_INSUFFICIENT:
437 return -E_CLOUD_ASSET_SPACE_INSUFFICIENT;
438 case CLOUD_VERSION_CONFLICT:
439 return -E_CLOUD_VERSION_CONFLICT;
440 case CLOUD_RECORD_EXIST_CONFLICT:
441 return -E_CLOUD_RECORD_EXIST_CONFLICT;
442 default:
443 return -E_CLOUD_ERROR;
444 }
445 }
446
QueryAction(const std::shared_ptr<CloudActionContext> & context,const std::shared_ptr<ICloudDb> & cloudDb)447 DBStatus CloudDBProxy::QueryAction(const std::shared_ptr<CloudActionContext> &context,
448 const std::shared_ptr<ICloudDb> &cloudDb)
449 {
450 VBucket queryExtend;
451 std::vector<VBucket> data;
452 context->MoveOutQueryExtendAndData(queryExtend, data);
453 DBStatus status = cloudDb->Query(context->GetTableName(), queryExtend, data);
454 context->MoveInQueryExtendAndData(queryExtend, data);
455 if (status == QUERY_END) {
456 context->SetActionRes(-E_QUERY_END);
457 }
458 return status;
459 }
460
CloudActionContext()461 CloudDBProxy::CloudActionContext::CloudActionContext()
462 : actionFinished_(false),
463 actionRes_(OK),
464 totalCount_(0u),
465 successCount_(0u),
466 failedCount_(0u)
467 {
468 }
469
MoveInRecordAndExtend(std::vector<VBucket> & record,std::vector<VBucket> & extend)470 void CloudDBProxy::CloudActionContext::MoveInRecordAndExtend(std::vector<VBucket> &record,
471 std::vector<VBucket> &extend)
472 {
473 std::lock_guard<std::mutex> autoLock(actionMutex_);
474 record_ = std::move(record);
475 extend_ = std::move(extend);
476 }
477
MoveInExtend(std::vector<VBucket> & extend)478 void CloudDBProxy::CloudActionContext::MoveInExtend(std::vector<VBucket> &extend)
479 {
480 std::lock_guard<std::mutex> autoLock(actionMutex_);
481 extend_ = std::move(extend);
482 }
483
MoveOutRecordAndExtend(std::vector<VBucket> & record,std::vector<VBucket> & extend)484 void CloudDBProxy::CloudActionContext::MoveOutRecordAndExtend(std::vector<VBucket> &record,
485 std::vector<VBucket> &extend)
486 {
487 std::lock_guard<std::mutex> autoLock(actionMutex_);
488 record = std::move(record_);
489 extend = std::move(extend_);
490 }
491
MoveInQueryExtendAndData(VBucket & extend,std::vector<VBucket> & data)492 void CloudDBProxy::CloudActionContext::MoveInQueryExtendAndData(VBucket &extend, std::vector<VBucket> &data)
493 {
494 std::lock_guard<std::mutex> autoLock(actionMutex_);
495 queryExtend_ = std::move(extend);
496 data_ = std::move(data);
497 }
498
MoveOutQueryExtendAndData(VBucket & extend,std::vector<VBucket> & data)499 void CloudDBProxy::CloudActionContext::MoveOutQueryExtendAndData(VBucket &extend, std::vector<VBucket> &data)
500 {
501 std::lock_guard<std::mutex> autoLock(actionMutex_);
502 extend = std::move(queryExtend_);
503 data = std::move(data_);
504 }
505
MoveInLockStatus(std::pair<int,uint64_t> & lockStatus)506 void CloudDBProxy::CloudActionContext::MoveInLockStatus(std::pair<int, uint64_t> &lockStatus)
507 {
508 std::lock_guard<std::mutex> autoLock(actionMutex_);
509 lockStatus_ = std::move(lockStatus);
510 }
511
MoveOutLockStatus(std::pair<int,uint64_t> & lockStatus)512 void CloudDBProxy::CloudActionContext::MoveOutLockStatus(std::pair<int, uint64_t> &lockStatus)
513 {
514 std::lock_guard<std::mutex> autoLock(actionMutex_);
515 lockStatus = std::move(lockStatus_);
516 }
517
MoveInCursorStatus(std::pair<int,std::string> & cursorStatus)518 void CloudDBProxy::CloudActionContext::MoveInCursorStatus(std::pair<int, std::string> &cursorStatus)
519 {
520 std::lock_guard<std::mutex> autoLock(actionMutex_);
521 cursorStatus_ = std::move(cursorStatus);
522 }
523
MoveOutCursorStatus(std::pair<int,std::string> & cursorStatus)524 void CloudDBProxy::CloudActionContext::MoveOutCursorStatus(std::pair<int, std::string> &cursorStatus)
525 {
526 std::lock_guard<std::mutex> autoLock(actionMutex_);
527 cursorStatus = std::move(cursorStatus_);
528 }
529
FinishAndNotify()530 void CloudDBProxy::CloudActionContext::FinishAndNotify()
531 {
532 {
533 std::lock_guard<std::mutex> autoLock(actionMutex_);
534 actionFinished_ = true;
535 }
536 actionCv_.notify_all();
537 }
538
SetActionRes(int res)539 void CloudDBProxy::CloudActionContext::SetActionRes(int res)
540 {
541 std::lock_guard<std::mutex> autoLock(actionMutex_);
542 actionRes_ = res;
543 }
544
GetActionRes()545 int CloudDBProxy::CloudActionContext::GetActionRes()
546 {
547 std::lock_guard<std::mutex> autoLock(actionMutex_);
548 return actionRes_;
549 }
550
GetInfo()551 Info CloudDBProxy::CloudActionContext::GetInfo()
552 {
553 std::lock_guard<std::mutex> autoLock(actionMutex_);
554 Info info;
555 info.total = totalCount_;
556 info.successCount = successCount_;
557 info.failCount = failedCount_;
558 return info;
559 }
560
IsEmptyAssetId(const Assets & assets)561 bool CloudDBProxy::CloudActionContext::IsEmptyAssetId(const Assets &assets)
562 {
563 for (auto &asset : assets) {
564 if (asset.assetId.empty()) {
565 return true;
566 }
567 }
568 return false;
569 }
570
IsRecordActionFail(const VBucket & extend,bool isInsert,DBStatus status)571 bool CloudDBProxy::CloudActionContext::IsRecordActionFail(const VBucket &extend, bool isInsert, DBStatus status)
572 {
573 if (extend.count(CloudDbConstant::GID_FIELD) == 0) {
574 return true;
575 }
576 if (status != OK) {
577 if (DBCommon::IsRecordError(extend) ||
578 (!DBCommon::IsRecordSuccess(extend) && !DBCommon::IsRecordIgnored(extend) &&
579 !DBCommon::IsRecordVersionConflict(extend))) {
580 return true;
581 }
582 }
583 auto gid = std::get_if<std::string>(&extend.at(CloudDbConstant::GID_FIELD));
584 if (gid == nullptr || (isInsert && (*gid).empty())) {
585 return true;
586 }
587 for (auto &entry : extend) {
588 auto asset = std::get_if<Asset>(&entry.second);
589 if (asset != nullptr && (*asset).assetId.empty()) {
590 return true;
591 }
592 auto assets = std::get_if<Assets>(&entry.second);
593 if (assets != nullptr && IsEmptyAssetId(*assets)) {
594 return true;
595 }
596 }
597 return false;
598 }
599
SetInfo(const CloudWaterType & type,DBStatus status)600 void CloudDBProxy::CloudActionContext::SetInfo(const CloudWaterType &type, DBStatus status)
601 {
602 totalCount_ = record_.size();
603
604 // records_ size should be equal to extend_ or batch data failed.
605 if (record_.size() != extend_.size()) {
606 failedCount_ += record_.size();
607 return;
608 }
609 for (auto &extend : extend_) {
610 if (DBCommon::IsNeedCompensatedForUpload(extend, type)) {
611 continue;
612 }
613 if (IsRecordActionFail(extend, type == CloudWaterType::INSERT, status)) {
614 failedCount_++;
615 } else {
616 successCount_++;
617 }
618 }
619 }
620
SetTableName(const std::string & tableName)621 void CloudDBProxy::CloudActionContext::SetTableName(const std::string &tableName)
622 {
623 std::lock_guard<std::mutex> autoLock(actionMutex_);
624 tableName_ = tableName;
625 }
626
GetTableName()627 std::string CloudDBProxy::CloudActionContext::GetTableName()
628 {
629 std::lock_guard<std::mutex> autoLock(actionMutex_);
630 return tableName_;
631 }
632
SetGenCloudVersionCallback(const GenerateCloudVersionCallback & callback)633 void CloudDBProxy::SetGenCloudVersionCallback(const GenerateCloudVersionCallback &callback)
634 {
635 std::lock_guard<std::mutex> autoLock(genVersionMutex_);
636 genVersionCallback_ = callback;
637 LOGI("[CloudDBProxy] Set generate cloud version callback ok");
638 }
639
IsExistCloudVersionCallback() const640 bool CloudDBProxy::IsExistCloudVersionCallback() const
641 {
642 std::lock_guard<std::mutex> autoLock(genVersionMutex_);
643 return genVersionCallback_ != nullptr;
644 }
645
GetCloudVersion(const std::string & originVersion) const646 std::pair<int, std::string> CloudDBProxy::GetCloudVersion(const std::string &originVersion) const
647 {
648 GenerateCloudVersionCallback genVersionCallback;
649 {
650 std::lock_guard<std::mutex> autoLock(genVersionMutex_);
651 if (genVersionCallback_ == nullptr) {
652 return {-E_NOT_SUPPORT, ""};
653 }
654 genVersionCallback = genVersionCallback_;
655 }
656 LOGI("[CloudDBProxy] Begin get cloud version");
657 std::string version = genVersionCallback(originVersion);
658 LOGI("[CloudDBProxy] End get cloud version");
659 return {E_OK, version};
660 }
661
SetPrepareTraceId(const std::string & traceId)662 void CloudDBProxy::SetPrepareTraceId(const std::string &traceId)
663 {
664 std::shared_ptr<ICloudDb> iCloudDb = nullptr;
665 std::unique_lock<std::shared_mutex> writeLock(cloudMutex_);
666 if (iCloudDb_ != nullptr) {
667 iCloudDb = iCloudDb_;
668 iCloudDb->SetPrepareTraceId(traceId);
669 }
670 }
671 }
672