1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "cloud_db_proxy.h"
16 #include "cloud/cloud_db_constant.h"
17 #include "db_common.h"
18 #include "db_errno.h"
19 #include "log_print.h"
20 
21 namespace DistributedDB {
CloudDBProxy()22 CloudDBProxy::CloudDBProxy()
23     : timeout_(0)
24 {
25 }
26 
SetCloudDB(const std::shared_ptr<ICloudDb> & cloudDB)27 void CloudDBProxy::SetCloudDB(const std::shared_ptr<ICloudDb> &cloudDB)
28 {
29     std::unique_lock<std::shared_mutex> writeLock(cloudMutex_);
30     if (!iCloudDb_) {
31         iCloudDb_ = cloudDB;
32     }
33 }
34 
SetCloudDB(const std::map<std::string,std::shared_ptr<ICloudDb>> & cloudDBs)35 int CloudDBProxy::SetCloudDB(const std::map<std::string, std::shared_ptr<ICloudDb>> &cloudDBs)
36 {
37     std::unique_lock<std::shared_mutex> writeLock(cloudMutex_);
38     for (const auto &item : cloudDBs) {
39         if (item.second == nullptr) {
40             LOGE("[CloudDBProxy] User %s setCloudDB with nullptr", item.first.c_str());
41             return -E_INVALID_ARGS;
42         }
43     }
44     cloudDbs_ = cloudDBs;
45     return E_OK;
46 }
47 
GetCloudDB() const48 const std::map<std::string, std::shared_ptr<ICloudDb>> CloudDBProxy::GetCloudDB() const
49 {
50     std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
51     return cloudDbs_;
52 }
53 
SwitchCloudDB(const std::string & user)54 void CloudDBProxy::SwitchCloudDB(const std::string &user)
55 {
56     std::unique_lock<std::shared_mutex> writeLock(cloudMutex_);
57     if (cloudDbs_.find(user) == cloudDbs_.end()) {
58         return;
59     }
60     iCloudDb_ = cloudDbs_[user];
61 }
62 
SetIAssetLoader(const std::shared_ptr<IAssetLoader> & loader)63 void CloudDBProxy::SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader)
64 {
65     std::unique_lock<std::shared_mutex> writeLock(assetLoaderMutex_);
66     iAssetLoader_ = loader;
67 }
68 
BatchInsert(const std::string & tableName,std::vector<VBucket> & record,std::vector<VBucket> & extend,Info & uploadInfo)69 int CloudDBProxy::BatchInsert(const std::string &tableName, std::vector<VBucket> &record,
70     std::vector<VBucket> &extend, Info &uploadInfo)
71 {
72     std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
73     if (iCloudDb_ == nullptr) {
74         return -E_CLOUD_ERROR;
75     }
76     std::shared_ptr<ICloudDb> cloudDb = iCloudDb_;
77     std::shared_ptr<CloudActionContext> context = std::make_shared<CloudActionContext>();
78     context->MoveInRecordAndExtend(record, extend);
79     context->SetTableName(tableName);
80     int errCode = InnerAction(context, cloudDb, INSERT);
81     uploadInfo = context->GetInfo();
82     context->MoveOutRecordAndExtend(record, extend);
83     return errCode;
84 }
85 
BatchUpdate(const std::string & tableName,std::vector<VBucket> & record,std::vector<VBucket> & extend,Info & uploadInfo)86 int CloudDBProxy::BatchUpdate(const std::string &tableName, std::vector<VBucket> &record,
87     std::vector<VBucket> &extend, Info &uploadInfo)
88 {
89     std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
90     if (iCloudDb_ == nullptr) {
91         return -E_CLOUD_ERROR;
92     }
93     std::shared_ptr<ICloudDb> cloudDb = iCloudDb_;
94     std::shared_ptr<CloudActionContext> context = std::make_shared<CloudActionContext>();
95     context->SetTableName(tableName);
96     context->MoveInRecordAndExtend(record, extend);
97     int errCode = InnerAction(context, cloudDb, UPDATE);
98     uploadInfo = context->GetInfo();
99     context->MoveOutRecordAndExtend(record, extend);
100     return errCode;
101 }
102 
BatchDelete(const std::string & tableName,std::vector<VBucket> & record,std::vector<VBucket> & extend,Info & uploadInfo)103 int CloudDBProxy::BatchDelete(const std::string &tableName, std::vector<VBucket> &record, std::vector<VBucket> &extend,
104     Info &uploadInfo)
105 {
106     std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
107     if (iCloudDb_ == nullptr) {
108         return -E_CLOUD_ERROR;
109     }
110     std::shared_ptr<CloudActionContext> context = std::make_shared<CloudActionContext>();
111     std::shared_ptr<ICloudDb> cloudDb = iCloudDb_;
112     context->MoveInRecordAndExtend(record, extend);
113     context->SetTableName(tableName);
114     int errCode = InnerAction(context, cloudDb, DELETE);
115     uploadInfo = context->GetInfo();
116     context->MoveOutRecordAndExtend(record, extend);
117     return errCode;
118 }
119 
Query(const std::string & tableName,VBucket & extend,std::vector<VBucket> & data)120 int CloudDBProxy::Query(const std::string &tableName, VBucket &extend, std::vector<VBucket> &data)
121 {
122     std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
123     if (iCloudDb_ == nullptr) {
124         return -E_CLOUD_ERROR;
125     }
126     std::shared_ptr<ICloudDb> cloudDb = iCloudDb_;
127     std::shared_ptr<CloudActionContext> context = std::make_shared<CloudActionContext>();
128     context->MoveInQueryExtendAndData(extend, data);
129     context->SetTableName(tableName);
130     int errCode = InnerAction(context, cloudDb, QUERY);
131     context->MoveOutQueryExtendAndData(extend, data);
132     for (auto &item : data) {
133         for (auto &row : item) {
134             auto assets = std::get_if<Assets>(&row.second);
135             if (assets == nullptr) {
136                 continue;
137             }
138             DBCommon::RemoveDuplicateAssetsData(*assets);
139         }
140     }
141     return errCode;
142 }
143 
Lock()144 std::pair<int, uint64_t> CloudDBProxy::Lock()
145 {
146     std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
147     if (iCloudDb_ == nullptr) {
148         return { -E_CLOUD_ERROR, 0u };
149     }
150     std::shared_ptr<ICloudDb> cloudDb = iCloudDb_;
151     std::shared_ptr<CloudActionContext> context = std::make_shared<CloudActionContext>();
152     std::pair<int, uint64_t> lockStatus;
153     int errCode = InnerAction(context, cloudDb, LOCK);
154     context->MoveOutLockStatus(lockStatus);
155     lockStatus.first = errCode;
156     return lockStatus;
157 }
158 
UnLock()159 int CloudDBProxy::UnLock()
160 {
161     std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
162     if (iCloudDb_ == nullptr) {
163         return -E_CLOUD_ERROR;
164     }
165     std::shared_ptr<ICloudDb> cloudDb = iCloudDb_;
166     std::shared_ptr<CloudActionContext> context = std::make_shared<CloudActionContext>();
167     return InnerAction(context, cloudDb, UNLOCK);
168 }
169 
Close()170 int CloudDBProxy::Close()
171 {
172     std::shared_ptr<ICloudDb> iCloudDb = nullptr;
173     std::vector<std::shared_ptr<ICloudDb>> waitForClose;
174     {
175         std::unique_lock<std::shared_mutex> writeLock(cloudMutex_);
176         if (iCloudDb_ != nullptr) {
177             iCloudDb = iCloudDb_;
178             iCloudDb_ = nullptr;
179         }
180         for (const auto &item : cloudDbs_) {
181             if (iCloudDb == item.second) {
182                 iCloudDb = nullptr;
183             }
184             waitForClose.push_back(item.second);
185         }
186         cloudDbs_.clear();
187     }
188     LOGD("[CloudDBProxy] call cloudDb close begin");
189     DBStatus status = OK;
190     if (iCloudDb != nullptr) {
191         status = iCloudDb->Close();
192     }
193     for (const auto &item : waitForClose) {
194         DBStatus ret = item->Close();
195         status = (status == OK ? ret : status);
196     }
197     if (status != OK) {
198         LOGW("[CloudDBProxy] cloud db close failed %d", static_cast<int>(status));
199     }
200     waitForClose.clear();
201     LOGD("[CloudDBProxy] call cloudDb close end");
202     return status == OK ? E_OK : -E_CLOUD_ERROR;
203 }
204 
HeartBeat()205 int CloudDBProxy::HeartBeat()
206 {
207     std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
208     if (iCloudDb_ == nullptr) {
209         return -E_CLOUD_ERROR;
210     }
211 
212     std::shared_ptr<ICloudDb> cloudDb = iCloudDb_;
213     std::shared_ptr<CloudActionContext> context = std::make_shared<CloudActionContext>();
214     return InnerAction(context, cloudDb, HEARTBEAT);
215 }
216 
IsNotExistCloudDB() const217 bool CloudDBProxy::IsNotExistCloudDB() const
218 {
219     std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
220     return iCloudDb_ == nullptr && cloudDbs_.empty();
221 }
222 
Download(const std::string & tableName,const std::string & gid,const Type & prefix,std::map<std::string,Assets> & assets)223 int CloudDBProxy::Download(const std::string &tableName, const std::string &gid, const Type &prefix,
224     std::map<std::string, Assets> &assets)
225 {
226     if (assets.empty()) {
227         return E_OK;
228     }
229     std::shared_lock<std::shared_mutex> readLock(assetLoaderMutex_);
230     if (iAssetLoader_ == nullptr) {
231         LOGE("Asset loader has not been set %d", -E_NOT_SET);
232         return -E_NOT_SET;
233     }
234     DBStatus status = iAssetLoader_->Download(tableName, gid, prefix, assets);
235     if (status != OK) {
236         LOGW("[CloudDBProxy] download asset failed %d", static_cast<int>(status));
237     }
238     return GetInnerErrorCode(status);
239 }
240 
RemoveLocalAssets(const std::vector<Asset> & assets)241 int CloudDBProxy::RemoveLocalAssets(const std::vector<Asset> &assets)
242 {
243     if (assets.empty()) {
244         return E_OK;
245     }
246     std::shared_lock<std::shared_mutex> readLock(assetLoaderMutex_);
247     if (iAssetLoader_ == nullptr) {
248         LOGW("Asset loader has not been set");
249         return E_OK;
250     }
251     DBStatus status = iAssetLoader_->RemoveLocalAssets(assets);
252     if (status != OK) {
253         LOGE("[CloudDBProxy] remove local asset failed %d", static_cast<int>(status));
254         return -E_REMOVE_ASSETS_FAILED;
255     }
256     return E_OK;
257 }
258 
RemoveLocalAssets(const std::string & tableName,const std::string & gid,const Type & prefix,std::map<std::string,Assets> & assets)259 int CloudDBProxy::RemoveLocalAssets(const std::string &tableName, const std::string &gid, const Type &prefix,
260     std::map<std::string, Assets> &assets)
261 {
262     if (assets.empty()) {
263         return E_OK;
264     }
265     std::shared_lock<std::shared_mutex> readLock(assetLoaderMutex_);
266     if (iAssetLoader_ == nullptr) {
267         LOGE("Asset loader has not been set %d", -E_NOT_SET);
268         return -E_NOT_SET;
269     }
270     DBStatus status = iAssetLoader_->RemoveLocalAssets(tableName, gid, prefix, assets);
271     if (status != OK) {
272         LOGE("[CloudDBProxy] remove local asset failed %d", static_cast<int>(status));
273         return -E_REMOVE_ASSETS_FAILED;
274     }
275     return E_OK;
276 }
277 
GetEmptyCursor(const std::string & tableName)278 std::pair<int, std::string> CloudDBProxy::GetEmptyCursor(const std::string &tableName)
279 {
280     std::shared_lock<std::shared_mutex> readLock(cloudMutex_);
281     if (iCloudDb_ == nullptr) {
282         return { -E_CLOUD_ERROR, "" };
283     }
284     std::shared_ptr<ICloudDb> cloudDb = iCloudDb_;
285     std::shared_ptr<CloudActionContext> context = std::make_shared<CloudActionContext>();
286     context->SetTableName(tableName);
287     int errCode = InnerAction(context, cloudDb, GET_EMPTY_CURSOR);
288     std::pair<int, std::string> cursorStatus;
289     context->MoveOutCursorStatus(cursorStatus);
290     cursorStatus.first = errCode;
291     return cursorStatus;
292 }
293 
InnerAction(const std::shared_ptr<CloudActionContext> & context,const std::shared_ptr<ICloudDb> & cloudDb,InnerActionCode action)294 int CloudDBProxy::InnerAction(const std::shared_ptr<CloudActionContext> &context,
295     const std::shared_ptr<ICloudDb> &cloudDb, InnerActionCode action)
296 {
297     if (action >= InnerActionCode::INVALID_ACTION) {
298         return -E_INVALID_ARGS;
299     }
300     InnerActionTask(context, cloudDb, action);
301     return context->GetActionRes();
302 }
303 
DMLActionTask(const std::shared_ptr<CloudActionContext> & context,const std::shared_ptr<ICloudDb> & cloudDb,InnerActionCode action)304 DBStatus CloudDBProxy::DMLActionTask(const std::shared_ptr<CloudActionContext> &context,
305     const std::shared_ptr<ICloudDb> &cloudDb, InnerActionCode action)
306 {
307     DBStatus status = OK;
308     std::vector<VBucket> record;
309     std::vector<VBucket> extend;
310     context->MoveOutRecordAndExtend(record, extend);
311 
312     switch (action) {
313         case INSERT: {
314             status = cloudDb->BatchInsert(context->GetTableName(), std::move(record), extend);
315             context->MoveInRecordAndExtend(record, extend);
316             context->SetInfo(CloudWaterType::INSERT, status);
317             break;
318         }
319         case UPDATE: {
320             status = cloudDb->BatchUpdate(context->GetTableName(), std::move(record), extend);
321             context->MoveInRecordAndExtend(record, extend);
322             context->SetInfo(CloudWaterType::UPDATE, status);
323             break;
324         }
325         case DELETE: {
326             status = cloudDb->BatchDelete(context->GetTableName(), extend);
327             context->MoveInRecordAndExtend(record, extend);
328             context->SetInfo(CloudWaterType::DELETE, status);
329             break;
330         }
331         default: {
332             LOGE("DMLActionTask can only be used on INSERT/UPDATE/DELETE.");
333             return INVALID_ARGS;
334         }
335     }
336     if (status == CLOUD_VERSION_CONFLICT) {
337         LOGI("[CloudSyncer] Version conflict during cloud batch upload.");
338     } else if (status != OK) {
339         LOGE("[CloudSyncer] Cloud BATCH UPLOAD failed.");
340     }
341     return status;
342 }
343 
InnerActionTask(const std::shared_ptr<CloudActionContext> & context,const std::shared_ptr<ICloudDb> & cloudDb,InnerActionCode action)344 void CloudDBProxy::InnerActionTask(const std::shared_ptr<CloudActionContext> &context,
345     const std::shared_ptr<ICloudDb> &cloudDb, InnerActionCode action)
346 {
347     DBStatus status = OK;
348     bool setResAlready = false;
349     LOGD("[CloudDBProxy] action %" PRIu8 " begin", static_cast<uint8_t>(action));
350     switch (action) {
351         case INSERT:
352         case UPDATE:
353         case DELETE:
354             status = DMLActionTask(context, cloudDb, action);
355             break;
356         case QUERY: {
357             status = QueryAction(context, cloudDb);
358             if (status == QUERY_END) {
359                 setResAlready = true;
360             }
361             break;
362         }
363         case GET_EMPTY_CURSOR:
364             status = InnerActionGetEmptyCursor(context, cloudDb);
365             break;
366         case LOCK:
367             status = InnerActionLock(context, cloudDb);
368             break;
369         case UNLOCK:
370             status = cloudDb->UnLock();
371             break;
372         case HEARTBEAT:
373             status = cloudDb->HeartBeat();
374             break;
375         default: // should not happen
376             status = DB_ERROR;
377     }
378     LOGD("[CloudDBProxy] action %" PRIu8 " end res:%d", static_cast<uint8_t>(action), static_cast<int>(status));
379 
380     if (!setResAlready) {
381         context->SetActionRes(GetInnerErrorCode(status));
382     }
383 
384     context->FinishAndNotify();
385 }
386 
InnerActionLock(const std::shared_ptr<CloudActionContext> & context,const std::shared_ptr<ICloudDb> & cloudDb)387 DBStatus CloudDBProxy::InnerActionLock(const std::shared_ptr<CloudActionContext> &context,
388     const std::shared_ptr<ICloudDb> &cloudDb)
389 {
390     DBStatus status = OK;
391     std::pair<int, uint64_t> lockRet;
392     std::pair<DBStatus, uint64_t> lockStatus = cloudDb->Lock();
393     if (lockStatus.first != OK) {
394         status = lockStatus.first;
395     } else if (lockStatus.second == 0) {
396         status = CLOUD_ERROR;
397     }
398     lockRet.second = lockStatus.second;
399     lockRet.first = GetInnerErrorCode(status);
400     context->MoveInLockStatus(lockRet);
401     return status;
402 }
403 
InnerActionGetEmptyCursor(const std::shared_ptr<CloudActionContext> & context,const std::shared_ptr<ICloudDb> & cloudDb)404 DBStatus CloudDBProxy::InnerActionGetEmptyCursor(const std::shared_ptr<CloudActionContext> &context,
405     const std::shared_ptr<ICloudDb> &cloudDb)
406 {
407     std::string tableName = context->GetTableName();
408     std::pair<DBStatus, std::string> cursorStatus = cloudDb->GetEmptyCursor(tableName);
409     DBStatus status = OK;
410     if (cursorStatus.first != OK) {
411         status = cursorStatus.first;
412     }
413     std::pair<int, std::string> cursorRet;
414     cursorRet.second = cursorStatus.second;
415     cursorRet.first = GetInnerErrorCode(status);
416     context->MoveInCursorStatus(cursorRet);
417     return status;
418 }
419 
GetInnerErrorCode(DBStatus status)420 int CloudDBProxy::GetInnerErrorCode(DBStatus status)
421 {
422     if (status < DB_ERROR || status >= BUTT_STATUS) {
423         return static_cast<int>(status);
424     }
425     switch (status) {
426         case OK:
427             return E_OK;
428         case CLOUD_NETWORK_ERROR:
429             return -E_CLOUD_NETWORK_ERROR;
430         case CLOUD_SYNC_UNSET:
431             return -E_CLOUD_SYNC_UNSET;
432         case CLOUD_FULL_RECORDS:
433             return -E_CLOUD_FULL_RECORDS;
434         case CLOUD_LOCK_ERROR:
435             return -E_CLOUD_LOCK_ERROR;
436         case CLOUD_ASSET_SPACE_INSUFFICIENT:
437             return -E_CLOUD_ASSET_SPACE_INSUFFICIENT;
438         case CLOUD_VERSION_CONFLICT:
439             return -E_CLOUD_VERSION_CONFLICT;
440         case CLOUD_RECORD_EXIST_CONFLICT:
441             return -E_CLOUD_RECORD_EXIST_CONFLICT;
442         default:
443             return -E_CLOUD_ERROR;
444     }
445 }
446 
QueryAction(const std::shared_ptr<CloudActionContext> & context,const std::shared_ptr<ICloudDb> & cloudDb)447 DBStatus CloudDBProxy::QueryAction(const std::shared_ptr<CloudActionContext> &context,
448     const std::shared_ptr<ICloudDb> &cloudDb)
449 {
450     VBucket queryExtend;
451     std::vector<VBucket> data;
452     context->MoveOutQueryExtendAndData(queryExtend, data);
453     DBStatus status = cloudDb->Query(context->GetTableName(), queryExtend, data);
454     context->MoveInQueryExtendAndData(queryExtend, data);
455     if (status == QUERY_END) {
456         context->SetActionRes(-E_QUERY_END);
457     }
458     return status;
459 }
460 
CloudActionContext()461 CloudDBProxy::CloudActionContext::CloudActionContext()
462     : actionFinished_(false),
463       actionRes_(OK),
464       totalCount_(0u),
465       successCount_(0u),
466       failedCount_(0u)
467 {
468 }
469 
MoveInRecordAndExtend(std::vector<VBucket> & record,std::vector<VBucket> & extend)470 void CloudDBProxy::CloudActionContext::MoveInRecordAndExtend(std::vector<VBucket> &record,
471     std::vector<VBucket> &extend)
472 {
473     std::lock_guard<std::mutex> autoLock(actionMutex_);
474     record_ = std::move(record);
475     extend_ = std::move(extend);
476 }
477 
MoveInExtend(std::vector<VBucket> & extend)478 void CloudDBProxy::CloudActionContext::MoveInExtend(std::vector<VBucket> &extend)
479 {
480     std::lock_guard<std::mutex> autoLock(actionMutex_);
481     extend_ = std::move(extend);
482 }
483 
MoveOutRecordAndExtend(std::vector<VBucket> & record,std::vector<VBucket> & extend)484 void CloudDBProxy::CloudActionContext::MoveOutRecordAndExtend(std::vector<VBucket> &record,
485     std::vector<VBucket> &extend)
486 {
487     std::lock_guard<std::mutex> autoLock(actionMutex_);
488     record = std::move(record_);
489     extend = std::move(extend_);
490 }
491 
MoveInQueryExtendAndData(VBucket & extend,std::vector<VBucket> & data)492 void CloudDBProxy::CloudActionContext::MoveInQueryExtendAndData(VBucket &extend, std::vector<VBucket> &data)
493 {
494     std::lock_guard<std::mutex> autoLock(actionMutex_);
495     queryExtend_ = std::move(extend);
496     data_ = std::move(data);
497 }
498 
MoveOutQueryExtendAndData(VBucket & extend,std::vector<VBucket> & data)499 void CloudDBProxy::CloudActionContext::MoveOutQueryExtendAndData(VBucket &extend, std::vector<VBucket> &data)
500 {
501     std::lock_guard<std::mutex> autoLock(actionMutex_);
502     extend = std::move(queryExtend_);
503     data = std::move(data_);
504 }
505 
MoveInLockStatus(std::pair<int,uint64_t> & lockStatus)506 void CloudDBProxy::CloudActionContext::MoveInLockStatus(std::pair<int, uint64_t> &lockStatus)
507 {
508     std::lock_guard<std::mutex> autoLock(actionMutex_);
509     lockStatus_ = std::move(lockStatus);
510 }
511 
MoveOutLockStatus(std::pair<int,uint64_t> & lockStatus)512 void CloudDBProxy::CloudActionContext::MoveOutLockStatus(std::pair<int, uint64_t> &lockStatus)
513 {
514     std::lock_guard<std::mutex> autoLock(actionMutex_);
515     lockStatus = std::move(lockStatus_);
516 }
517 
MoveInCursorStatus(std::pair<int,std::string> & cursorStatus)518 void CloudDBProxy::CloudActionContext::MoveInCursorStatus(std::pair<int, std::string> &cursorStatus)
519 {
520     std::lock_guard<std::mutex> autoLock(actionMutex_);
521     cursorStatus_ = std::move(cursorStatus);
522 }
523 
MoveOutCursorStatus(std::pair<int,std::string> & cursorStatus)524 void CloudDBProxy::CloudActionContext::MoveOutCursorStatus(std::pair<int, std::string> &cursorStatus)
525 {
526     std::lock_guard<std::mutex> autoLock(actionMutex_);
527     cursorStatus = std::move(cursorStatus_);
528 }
529 
FinishAndNotify()530 void CloudDBProxy::CloudActionContext::FinishAndNotify()
531 {
532     {
533         std::lock_guard<std::mutex> autoLock(actionMutex_);
534         actionFinished_ = true;
535     }
536     actionCv_.notify_all();
537 }
538 
SetActionRes(int res)539 void CloudDBProxy::CloudActionContext::SetActionRes(int res)
540 {
541     std::lock_guard<std::mutex> autoLock(actionMutex_);
542     actionRes_ = res;
543 }
544 
GetActionRes()545 int CloudDBProxy::CloudActionContext::GetActionRes()
546 {
547     std::lock_guard<std::mutex> autoLock(actionMutex_);
548     return actionRes_;
549 }
550 
GetInfo()551 Info CloudDBProxy::CloudActionContext::GetInfo()
552 {
553     std::lock_guard<std::mutex> autoLock(actionMutex_);
554     Info info;
555     info.total = totalCount_;
556     info.successCount = successCount_;
557     info.failCount = failedCount_;
558     return info;
559 }
560 
IsEmptyAssetId(const Assets & assets)561 bool CloudDBProxy::CloudActionContext::IsEmptyAssetId(const Assets &assets)
562 {
563     for (auto &asset : assets) {
564         if (asset.assetId.empty()) {
565             return true;
566         }
567     }
568     return false;
569 }
570 
IsRecordActionFail(const VBucket & extend,bool isInsert,DBStatus status)571 bool CloudDBProxy::CloudActionContext::IsRecordActionFail(const VBucket &extend, bool isInsert, DBStatus status)
572 {
573     if (extend.count(CloudDbConstant::GID_FIELD) == 0) {
574         return true;
575     }
576     if (status != OK) {
577         if (DBCommon::IsRecordError(extend) ||
578             (!DBCommon::IsRecordSuccess(extend) && !DBCommon::IsRecordIgnored(extend) &&
579             !DBCommon::IsRecordVersionConflict(extend))) {
580             return true;
581         }
582     }
583     auto gid = std::get_if<std::string>(&extend.at(CloudDbConstant::GID_FIELD));
584     if (gid == nullptr || (isInsert && (*gid).empty())) {
585         return true;
586     }
587     for (auto &entry : extend) {
588         auto asset = std::get_if<Asset>(&entry.second);
589         if (asset != nullptr && (*asset).assetId.empty()) {
590             return true;
591         }
592         auto assets = std::get_if<Assets>(&entry.second);
593         if (assets != nullptr && IsEmptyAssetId(*assets)) {
594             return true;
595         }
596     }
597     return false;
598 }
599 
SetInfo(const CloudWaterType & type,DBStatus status)600 void CloudDBProxy::CloudActionContext::SetInfo(const CloudWaterType &type, DBStatus status)
601 {
602     totalCount_ = record_.size();
603 
604     // records_ size should be equal to extend_ or batch data failed.
605     if (record_.size() != extend_.size()) {
606         failedCount_ += record_.size();
607         return;
608     }
609     for (auto &extend : extend_) {
610         if (DBCommon::IsNeedCompensatedForUpload(extend, type)) {
611             continue;
612         }
613         if (IsRecordActionFail(extend, type == CloudWaterType::INSERT, status)) {
614             failedCount_++;
615         } else {
616             successCount_++;
617         }
618     }
619 }
620 
SetTableName(const std::string & tableName)621 void CloudDBProxy::CloudActionContext::SetTableName(const std::string &tableName)
622 {
623     std::lock_guard<std::mutex> autoLock(actionMutex_);
624     tableName_ = tableName;
625 }
626 
GetTableName()627 std::string CloudDBProxy::CloudActionContext::GetTableName()
628 {
629     std::lock_guard<std::mutex> autoLock(actionMutex_);
630     return tableName_;
631 }
632 
SetGenCloudVersionCallback(const GenerateCloudVersionCallback & callback)633 void CloudDBProxy::SetGenCloudVersionCallback(const GenerateCloudVersionCallback &callback)
634 {
635     std::lock_guard<std::mutex> autoLock(genVersionMutex_);
636     genVersionCallback_ = callback;
637     LOGI("[CloudDBProxy] Set generate cloud version callback ok");
638 }
639 
IsExistCloudVersionCallback() const640 bool CloudDBProxy::IsExistCloudVersionCallback() const
641 {
642     std::lock_guard<std::mutex> autoLock(genVersionMutex_);
643     return genVersionCallback_ != nullptr;
644 }
645 
GetCloudVersion(const std::string & originVersion) const646 std::pair<int, std::string> CloudDBProxy::GetCloudVersion(const std::string &originVersion) const
647 {
648     GenerateCloudVersionCallback genVersionCallback;
649     {
650         std::lock_guard<std::mutex> autoLock(genVersionMutex_);
651         if (genVersionCallback_ == nullptr) {
652             return {-E_NOT_SUPPORT, ""};
653         }
654         genVersionCallback = genVersionCallback_;
655     }
656     LOGI("[CloudDBProxy] Begin get cloud version");
657     std::string version = genVersionCallback(originVersion);
658     LOGI("[CloudDBProxy] End get cloud version");
659     return {E_OK, version};
660 }
661 
SetPrepareTraceId(const std::string & traceId)662 void CloudDBProxy::SetPrepareTraceId(const std::string &traceId)
663 {
664     std::shared_ptr<ICloudDb> iCloudDb = nullptr;
665     std::unique_lock<std::shared_mutex> writeLock(cloudMutex_);
666     if (iCloudDb_ != nullptr) {
667         iCloudDb = iCloudDb_;
668         iCloudDb->SetPrepareTraceId(traceId);
669     }
670 }
671 }
672