1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "cloud_syncer.h"
16 
17 #include <cstdint>
18 #include <unordered_map>
19 #include <utility>
20 
21 #include "cloud/asset_operation_utils.h"
22 #include "cloud/cloud_db_constant.h"
23 #include "cloud/cloud_storage_utils.h"
24 #include "cloud/icloud_db.h"
25 #include "cloud_sync_tag_assets.h"
26 #include "cloud_sync_utils.h"
27 #include "db_dfx_adapter.h"
28 #include "db_errno.h"
29 #include "log_print.h"
30 #include "runtime_context.h"
31 #include "storage_proxy.h"
32 #include "store_types.h"
33 #include "strategy_factory.h"
34 #include "version.h"
35 
36 namespace DistributedDB {
CloudSyncer(std::shared_ptr<StorageProxy> storageProxy,bool isKvScene,SingleVerConflictResolvePolicy policy)37 CloudSyncer::CloudSyncer(
38     std::shared_ptr<StorageProxy> storageProxy, bool isKvScene, SingleVerConflictResolvePolicy policy)
39     : lastTaskId_(INVALID_TASK_ID),
40       storageProxy_(std::move(storageProxy)),
41       queuedManualSyncLimit_(DBConstant::QUEUED_SYNC_LIMIT_DEFAULT),
42       closed_(false),
43       timerId_(0u),
44       isKvScene_(isKvScene),
45       policy_(policy)
46 {
47     if (storageProxy_ != nullptr) {
48         id_ = storageProxy_->GetIdentify();
49     }
50     InitCloudSyncStateMachine();
51 }
52 
Sync(const std::vector<DeviceID> & devices,SyncMode mode,const std::vector<std::string> & tables,const SyncProcessCallback & callback,int64_t waitTime)53 int CloudSyncer::Sync(const std::vector<DeviceID> &devices, SyncMode mode,
54     const std::vector<std::string> &tables, const SyncProcessCallback &callback, int64_t waitTime)
55 {
56     CloudTaskInfo taskInfo;
57     taskInfo.mode = mode;
58     taskInfo.table = tables;
59     taskInfo.callback = callback;
60     taskInfo.timeout = waitTime;
61     taskInfo.devices = devices;
62     for (const auto &item: tables) {
63         QuerySyncObject syncObject;
64         syncObject.SetTableName(item);
65         taskInfo.queryList.push_back(syncObject);
66     }
67     return Sync(taskInfo);
68 }
69 
Sync(const CloudTaskInfo & taskInfo)70 int CloudSyncer::Sync(const CloudTaskInfo &taskInfo)
71 {
72     int errCode = CloudSyncUtils::CheckParamValid(taskInfo.devices, taskInfo.mode);
73     if (errCode != E_OK) {
74         return errCode;
75     }
76     if (cloudDB_.IsNotExistCloudDB()) {
77         LOGE("[CloudSyncer] Not set cloudDB!");
78         return -E_CLOUD_ERROR;
79     }
80     if (closed_) {
81         LOGE("[CloudSyncer] DB is closed!");
82         return -E_DB_CLOSED;
83     }
84     CloudTaskInfo info = taskInfo;
85     info.status = ProcessStatus::PREPARED;
86     errCode = TryToAddSyncTask(std::move(info));
87     if (errCode != E_OK) {
88         return errCode;
89     }
90     if (taskInfo.priorityTask) {
91         MarkCurrentTaskPausedIfNeed();
92     }
93     return TriggerSync();
94 }
95 
SetCloudDB(const std::shared_ptr<ICloudDb> & cloudDB)96 void CloudSyncer::SetCloudDB(const std::shared_ptr<ICloudDb> &cloudDB)
97 {
98     cloudDB_.SetCloudDB(cloudDB);
99     LOGI("[CloudSyncer] SetCloudDB finish");
100 }
101 
GetCloudDB() const102 const std::map<std::string, std::shared_ptr<ICloudDb>> CloudSyncer::GetCloudDB() const
103 {
104     return cloudDB_.GetCloudDB();
105 }
106 
SetIAssetLoader(const std::shared_ptr<IAssetLoader> & loader)107 void CloudSyncer::SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader)
108 {
109     storageProxy_->SetIAssetLoader(loader);
110     cloudDB_.SetIAssetLoader(loader);
111     LOGI("[CloudSyncer] SetIAssetLoader finish");
112 }
113 
Close()114 void CloudSyncer::Close()
115 {
116     closed_ = true;
117     CloudSyncer::TaskId currentTask;
118     {
119         std::lock_guard<std::mutex> autoLock(dataLock_);
120         currentTask = currentContext_.currentTaskId;
121     }
122     // mark current task db_closed
123     SetTaskFailed(currentTask, -E_DB_CLOSED);
124     UnlockIfNeed();
125     cloudDB_.Close();
126     WaitCurTaskFinished();
127 
128     // copy all task from queue
129     std::vector<CloudTaskInfo> infoList = CopyAndClearTaskInfos();
130     for (auto &info: infoList) {
131         LOGI("[CloudSyncer] finished taskId %" PRIu64 " with db closed.", info.taskId);
132     }
133     storageProxy_->Close();
134 }
135 
StopAllTasks()136 void CloudSyncer::StopAllTasks()
137 {
138     CloudSyncer::TaskId currentTask;
139     {
140         std::lock_guard<std::mutex> autoLock(dataLock_);
141         currentTask = currentContext_.currentTaskId;
142     }
143     // mark current task user_change
144     SetTaskFailed(currentTask, -E_USER_CHANGE);
145     UnlockIfNeed();
146     WaitCurTaskFinished();
147 
148     std::vector<CloudTaskInfo> infoList = CopyAndClearTaskInfos();
149     for (auto &info: infoList) {
150         LOGI("[CloudSyncer] finished taskId %" PRIu64 " with user changed.", info.taskId);
151         auto processNotifier = std::make_shared<ProcessNotifier>(this);
152         processNotifier->Init(info.table, info.devices, info.users);
153         info.errCode = -E_USER_CHANGE;
154         info.status = ProcessStatus::FINISHED;
155         processNotifier->NotifyProcess(info, {}, true);
156     }
157 }
158 
TriggerSync()159 int CloudSyncer::TriggerSync()
160 {
161     if (closed_) {
162         return -E_DB_CLOSED;
163     }
164     RefObject::IncObjRef(this);
165     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this]() {
166         DoSyncIfNeed();
167         RefObject::DecObjRef(this);
168     });
169     if (errCode != E_OK) {
170         LOGW("[CloudSyncer] schedule sync task failed %d", errCode);
171         RefObject::DecObjRef(this);
172     }
173     return errCode;
174 }
175 
SetProxyUser(const std::string & user)176 void CloudSyncer::SetProxyUser(const std::string &user)
177 {
178     std::lock_guard<std::mutex> autoLock(dataLock_);
179     storageProxy_->SetUser(user);
180     currentContext_.notifier->SetUser(user);
181     currentContext_.currentUserIndex = currentContext_.currentUserIndex + 1;
182     cloudDB_.SwitchCloudDB(user);
183 }
184 
DoSyncIfNeed()185 void CloudSyncer::DoSyncIfNeed()
186 {
187     if (closed_) {
188         return;
189     }
190     // do all sync task in this loop
191     do {
192         // get taskId from queue
193         TaskId triggerTaskId = GetNextTaskId();
194         if (triggerTaskId == INVALID_TASK_ID) {
195             LOGD("[CloudSyncer] task queue empty");
196             break;
197         }
198         // pop taskId in queue
199         if (PrepareSync(triggerTaskId) != E_OK) {
200             break;
201         }
202         // do sync logic
203         std::vector<std::string> usersList;
204         {
205             std::lock_guard<std::mutex> autoLock(dataLock_);
206             usersList = cloudTaskInfos_[triggerTaskId].users;
207             currentContext_.currentUserIndex = 0;
208         }
209         int errCode = E_OK;
210         if (usersList.empty()) {
211             SetProxyUser("");
212             errCode = DoSync(triggerTaskId);
213         } else {
214             for (const auto &user : usersList) {
215                 SetProxyUser(user);
216                 errCode = DoSync(triggerTaskId);
217             }
218         }
219         LOGD("[CloudSyncer] DoSync finished, errCode %d", errCode);
220     } while (!closed_);
221     LOGD("[CloudSyncer] DoSyncIfNeed finished, closed status %d", static_cast<int>(closed_));
222 }
223 
DoSync(TaskId taskId)224 int CloudSyncer::DoSync(TaskId taskId)
225 {
226     std::lock_guard<std::mutex> lock(syncMutex_);
227     ResetCurrentTableUploadBatchIndex();
228     CloudTaskInfo taskInfo;
229     {
230         std::lock_guard<std::mutex> autoLock(dataLock_);
231         taskInfo = cloudTaskInfos_[taskId];
232         cloudDB_.SetPrepareTraceId(taskInfo.prepareTraceId); // SetPrepareTraceId before task started
233         LOGD("[CloudSyncer] DoSync get taskInfo, taskId is: %llu, prepareTraceId is:%s.",
234             static_cast<unsigned long long>(taskInfo.taskId), taskInfo.prepareTraceId.c_str());
235     }
236     bool needUpload = true;
237     bool isNeedFirstDownload = false;
238     {
239         std::lock_guard<std::mutex> autoLock(dataLock_);
240         needUpload = currentContext_.strategy->JudgeUpload();
241         // 1. if the locker is already exist, directly reuse the lock, no need do the first download
242         // 2. if the task(resume task) is already be tagged need upload data, no need do the first download
243         isNeedFirstDownload = (currentContext_.locker == nullptr) && (!currentContext_.isNeedUpload);
244     }
245     int errCode = E_OK;
246     bool isFirstDownload = true;
247     if (isNeedFirstDownload) {
248         // do first download
249         errCode = DoDownloadInNeed(taskInfo, needUpload, isFirstDownload);
250         SetTaskFailed(taskId, errCode);
251         if (errCode != E_OK) {
252             SyncMachineDoFinished();
253             return errCode;
254         }
255         bool isActuallyNeedUpload = false;  // whether the task actually has data to upload
256         {
257             std::lock_guard<std::mutex> autoLock(dataLock_);
258             isActuallyNeedUpload = currentContext_.isNeedUpload;
259         }
260         if (!isActuallyNeedUpload) {
261             LOGI("[CloudSyncer] no table need upload!");
262             SyncMachineDoFinished();
263             return E_OK;
264         }
265         isFirstDownload = false;
266     }
267 
268     {
269         std::lock_guard<std::mutex> autoLock(dataLock_);
270         currentContext_.isFirstDownload = isFirstDownload;
271         currentContext_.isRealNeedUpload = needUpload;
272     }
273     errCode = DoSyncInner(taskInfo);
274     return errCode;
275 }
276 
PrepareAndUpload(const CloudTaskInfo & taskInfo,size_t index)277 int CloudSyncer::PrepareAndUpload(const CloudTaskInfo &taskInfo, size_t index)
278 {
279     {
280         std::lock_guard<std::mutex> autoLock(dataLock_);
281         currentContext_.tableName = taskInfo.table[index];
282     }
283     int errCode = CheckTaskIdValid(taskInfo.taskId);
284     if (errCode != E_OK) {
285         LOGE("[CloudSyncer] task is invalid, abort sync");
286         return errCode;
287     }
288     if (taskInfo.table.empty()) {
289         LOGE("[CloudSyncer] Invalid taskInfo table");
290         return -E_INVALID_ARGS;
291     }
292     errCode = DoUpload(taskInfo.taskId, index == (taskInfo.table.size() - 1u), taskInfo.lockAction);
293     if (errCode == -E_CLOUD_VERSION_CONFLICT) {
294         {
295             std::lock_guard<std::mutex> autoLock(dataLock_);
296             currentContext_.processRecorder->MarkDownloadFinish(currentContext_.currentUserIndex,
297                 taskInfo.table[index], false);
298             LOGI("[CloudSyncer] upload version conflict, index:%zu", index);
299         }
300         return errCode;
301     }
302     if (errCode != E_OK) {
303         LOGE("[CloudSyncer] upload failed %d", errCode);
304         return errCode;
305     }
306     return errCode;
307 }
308 
DoUploadInNeed(const CloudTaskInfo & taskInfo,const bool needUpload)309 int CloudSyncer::DoUploadInNeed(const CloudTaskInfo &taskInfo, const bool needUpload)
310 {
311     if (!needUpload) {
312         return E_OK;
313     }
314     int errCode = storageProxy_->StartTransaction();
315     if (errCode != E_OK) {
316         LOGE("[CloudSyncer] start transaction failed before doing upload.");
317         return errCode;
318     }
319     for (size_t i = 0u; i < taskInfo.table.size(); ++i) {
320         LOGD("[CloudSyncer] try upload table, index: %zu", i);
321         if (IsTableFinishInUpload(taskInfo.table[i])) {
322             continue;
323         }
324         errCode = PrepareAndUpload(taskInfo, i);
325         if (errCode == -E_TASK_PAUSED) { // should re download  [paused table , last table]
326             for (size_t j = i; j < taskInfo.table.size(); ++j) {
327                 MarkDownloadFinishIfNeed(taskInfo.table[j], false);
328             }
329         }
330         if (errCode != E_OK) {
331             break;
332         }
333         MarkUploadFinishIfNeed(taskInfo.table[i]);
334     }
335     if (errCode == -E_TASK_PAUSED) {
336         std::lock_guard<std::mutex> autoLock(dataLock_);
337         resumeTaskInfos_[taskInfo.taskId].upload = true;
338     }
339     if (errCode == E_OK || errCode == -E_TASK_PAUSED) {
340         int commitErrorCode = storageProxy_->Commit();
341         if (commitErrorCode != E_OK) {
342             LOGE("[CloudSyncer] cannot commit transaction: %d.", commitErrorCode);
343         }
344     } else {
345         int rollBackErrorCode = storageProxy_->Rollback();
346         if (rollBackErrorCode != E_OK) {
347             LOGE("[CloudSyncer] cannot roll back transaction: %d.", rollBackErrorCode);
348         }
349     }
350     return errCode;
351 }
352 
SyncMachineDoDownload()353 CloudSyncEvent CloudSyncer::SyncMachineDoDownload()
354 {
355     CloudTaskInfo taskInfo;
356     bool needUpload;
357     bool isFirstDownload;
358     {
359         std::lock_guard<std::mutex> autoLock(dataLock_);
360         taskInfo = cloudTaskInfos_[currentContext_.currentTaskId];
361         needUpload = currentContext_.isRealNeedUpload;
362         isFirstDownload = currentContext_.isFirstDownload;
363     }
364     int errCode = E_OK;
365     if (IsLockInDownload()) {
366         errCode = LockCloudIfNeed(taskInfo.taskId);
367     }
368     if (errCode != E_OK) {
369         return SetCurrentTaskFailedInMachine(errCode);
370     }
371     errCode = DoDownloadInNeed(taskInfo, needUpload, isFirstDownload);
372     if (errCode != E_OK) {
373         if (errCode == -E_TASK_PAUSED) {
374             DBDfxAdapter::ReportBehavior(
375                 {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_DOWNLOAD, StageResult::CANCLE, errCode});
376         } else {
377             DBDfxAdapter::ReportBehavior(
378                 {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_DOWNLOAD, StageResult::FAIL, errCode});
379         }
380         return SetCurrentTaskFailedInMachine(errCode);
381     }
382     DBDfxAdapter::ReportBehavior(
383         {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_DOWNLOAD, StageResult::SUCC, errCode});
384     return CloudSyncEvent::DOWNLOAD_FINISHED_EVENT;
385 }
386 
SyncMachineDoUpload()387 CloudSyncEvent CloudSyncer::SyncMachineDoUpload()
388 {
389     CloudTaskInfo taskInfo;
390     bool needUpload;
391     {
392         std::lock_guard<std::mutex> autoLock(dataLock_);
393         taskInfo = cloudTaskInfos_[currentContext_.currentTaskId];
394         needUpload = currentContext_.isRealNeedUpload;
395     }
396     DBDfxAdapter::ReportBehavior(
397         {__func__, Scene::CLOUD_SYNC, State::BEGIN, Stage::CLOUD_UPLOAD, StageResult::SUCC, E_OK});
398     int errCode = DoUploadInNeed(taskInfo, needUpload);
399     if (errCode == -E_CLOUD_VERSION_CONFLICT) {
400         DBDfxAdapter::ReportBehavior(
401             {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_UPLOAD, StageResult::FAIL, errCode});
402         return CloudSyncEvent::REPEAT_CHECK_EVENT;
403     }
404     if (errCode != E_OK) {
405         {
406             std::lock_guard<std::mutex> autoLock(dataLock_);
407             cloudTaskInfos_[currentContext_.currentTaskId].errCode = errCode;
408         }
409         if (errCode == -E_TASK_PAUSED) {
410             DBDfxAdapter::ReportBehavior(
411                 {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_UPLOAD, StageResult::CANCLE, errCode});
412         } else {
413             DBDfxAdapter::ReportBehavior(
414                 {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_UPLOAD, StageResult::FAIL, errCode});
415         }
416         return CloudSyncEvent::ERROR_EVENT;
417     }
418     DBDfxAdapter::ReportBehavior(
419         {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_UPLOAD, StageResult::SUCC, errCode});
420     return CloudSyncEvent::UPLOAD_FINISHED_EVENT;
421 }
422 
SyncMachineDoFinished()423 CloudSyncEvent CloudSyncer::SyncMachineDoFinished()
424 {
425     UnlockIfNeed();
426     TaskId taskId;
427     int errCode;
428     int currentUserIndex;
429     int userListSize;
430     {
431         std::lock_guard<std::mutex> autoLock(dataLock_);
432         taskId = currentContext_.currentTaskId;
433         errCode = cloudTaskInfos_[currentContext_.currentTaskId].errCode;
434         currentUserIndex = currentContext_.currentUserIndex;
435         userListSize = static_cast<int>(cloudTaskInfos_[taskId].users.size());
436     }
437     if (currentUserIndex >= userListSize) {
438         {
439             std::lock_guard<std::mutex> autoLock(dataLock_);
440             cloudTaskInfos_[currentContext_.currentTaskId].errCode = E_OK;
441         }
442         DoFinished(taskId, errCode);
443     } else {
444         CloudTaskInfo taskInfo;
445         {
446             std::lock_guard<std::mutex> autoLock(dataLock_);
447             taskInfo = cloudTaskInfos_[currentContext_.currentTaskId];
448         }
449         taskInfo.status = ProcessStatus::FINISHED;
450         currentContext_.notifier->NotifyProcess(taskInfo, {}, true);
451         {
452             std::lock_guard<std::mutex> autoLock(dataLock_);
453             cloudTaskInfos_[currentContext_.currentTaskId].errCode = E_OK;
454         }
455     }
456     return CloudSyncEvent::ALL_TASK_FINISHED_EVENT;
457 }
458 
DoSyncInner(const CloudTaskInfo & taskInfo)459 int CloudSyncer::DoSyncInner(const CloudTaskInfo &taskInfo)
460 {
461     cloudSyncStateMachine_.SwitchStateAndStep(CloudSyncEvent::START_SYNC_EVENT);
462     DBDfxAdapter::ReportBehavior(
463         {__func__, Scene::CLOUD_SYNC, State::BEGIN, Stage::CLOUD_SYNC, StageResult::SUCC, E_OK});
464     return E_OK;
465 }
466 
DoFinished(TaskId taskId,int errCode)467 void CloudSyncer::DoFinished(TaskId taskId, int errCode)
468 {
469     storageProxy_->OnSyncFinish();
470     if (errCode == -E_TASK_PAUSED) {
471         DBDfxAdapter::ReportBehavior(
472             {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_SYNC, StageResult::CANCLE, errCode});
473         LOGD("[CloudSyncer] taskId %" PRIu64 " was paused, it won't be finished now", taskId);
474         {
475             std::lock_guard<std::mutex> autoLock(dataLock_);
476             resumeTaskInfos_[taskId].context = std::move(currentContext_);
477             currentContext_.locker = resumeTaskInfos_[taskId].context.locker;
478             resumeTaskInfos_[taskId].context.locker = nullptr;
479             ClearCurrentContextWithoutLock();
480         }
481         contextCv_.notify_one();
482         return;
483     }
484     {
485         std::lock_guard<std::mutex> autoLock(dataLock_);
486         taskQueue_.remove(taskId);
487         priorityTaskQueue_.remove(taskId);
488     }
489     ClearContextAndNotify(taskId, errCode);
490 }
491 
492 /**
493  * UpdateChangedData will be used for Insert case, which we can only get rowid after we saved data in db.
494 */
UpdateChangedData(SyncParam & param,DownloadList & assetsDownloadList)495 int CloudSyncer::UpdateChangedData(SyncParam &param, DownloadList &assetsDownloadList)
496 {
497     if (param.withoutRowIdData.insertData.empty() && param.withoutRowIdData.updateData.empty()) {
498         return E_OK;
499     }
500     int ret = E_OK;
501     for (size_t j : param.withoutRowIdData.insertData) {
502         VBucket &datum = param.downloadData.data[j];
503         std::vector<Type> primaryValues;
504         ret = CloudSyncUtils::GetCloudPkVals(datum, param.changedData.field,
505             std::get<int64_t>(datum[CloudDbConstant::ROW_ID_FIELD_NAME]), primaryValues);
506         if (ret != E_OK) {
507             LOGE("[CloudSyncer] updateChangedData cannot get primaryValues");
508             return ret;
509         }
510         param.changedData.primaryData[ChangeType::OP_INSERT].push_back(primaryValues);
511     }
512     for (const auto &tuple : param.withoutRowIdData.assetInsertData) {
513         size_t downloadIndex = std::get<0>(tuple);
514         VBucket &datum = param.downloadData.data[downloadIndex];
515         size_t insertIdx = std::get<1>(tuple);
516         std::vector<Type> &pkVal = std::get<5>(assetsDownloadList[insertIdx]); // 5 means primary key list
517         pkVal[0] = datum[CloudDbConstant::ROW_ID_FIELD_NAME];
518     }
519     for (const auto &tuple : param.withoutRowIdData.updateData) {
520         size_t downloadIndex = std::get<0>(tuple);
521         size_t updateIndex = std::get<1>(tuple);
522         VBucket &datum = param.downloadData.data[downloadIndex];
523         size_t size = param.changedData.primaryData[ChangeType::OP_UPDATE].size();
524         if (updateIndex >= size) {
525             LOGE("[CloudSyncer] updateIndex is invalid. index=%zu, size=%zu", updateIndex, size);
526             return -E_INTERNAL_ERROR;
527         }
528         if (param.changedData.primaryData[ChangeType::OP_UPDATE][updateIndex].empty()) {
529             LOGE("[CloudSyncer] primary key value list should not be empty.");
530             return -E_INTERNAL_ERROR;
531         }
532         // no primary key or composite primary key, the first element is rowid
533         param.changedData.primaryData[ChangeType::OP_UPDATE][updateIndex][0] =
534             datum[CloudDbConstant::ROW_ID_FIELD_NAME];
535     }
536     return ret;
537 }
538 
IsDataContainDuplicateAsset(const std::vector<Field> & assetFields,VBucket & data)539 bool CloudSyncer::IsDataContainDuplicateAsset(const std::vector<Field> &assetFields, VBucket &data)
540 {
541     for (const auto &assetField : assetFields) {
542         if (assetField.type == TYPE_INDEX<Assets> && data[assetField.colName].index() == TYPE_INDEX<Assets>) {
543             if (CloudStorageUtils::IsAssetsContainDuplicateAsset(std::get<Assets>(data[assetField.colName]))) {
544                 return true;
545             }
546         }
547     }
548     return false;
549 }
550 
IsDataContainAssets()551 bool CloudSyncer::IsDataContainAssets()
552 {
553     std::lock_guard<std::mutex> autoLock(dataLock_);
554     bool hasTable = (currentContext_.assetFields.find(currentContext_.tableName) != currentContext_.assetFields.end());
555     if (!hasTable) {
556         LOGW("[CloudSyncer] failed to get assetFields, because tableName doesn't exist in currentContext, %d.",
557             -E_INTERNAL_ERROR);
558             return false;
559     }
560     if (currentContext_.assetFields[currentContext_.tableName].empty()) {
561         return false;
562     }
563     return true;
564 }
565 
TagAssetsInSingleRecord(VBucket & coveredData,VBucket & beCoveredData,bool setNormalStatus,int & errCode)566 std::map<std::string, Assets> CloudSyncer::TagAssetsInSingleRecord(VBucket &coveredData, VBucket &beCoveredData,
567     bool setNormalStatus, int &errCode)
568 {
569     // Define a map to store the result
570     std::map<std::string, Assets> res = {};
571     std::vector<Field> assetFields;
572     {
573         std::lock_guard<std::mutex> autoLock(dataLock_);
574         assetFields = currentContext_.assetFields[currentContext_.tableName];
575     }
576     // For every column contain asset or assets, assetFields are in context
577     for (const Field &assetField : assetFields) {
578         Assets assets = TagAssetsInSingleCol(coveredData, beCoveredData, assetField, setNormalStatus, errCode);
579         if (!assets.empty()) {
580             res[assetField.colName] = assets;
581         }
582         if (errCode != E_OK) {
583             break;
584         }
585     }
586     return res;
587 }
588 
FillCloudAssets(const std::string & tableName,VBucket & normalAssets,VBucket & failedAssets)589 int CloudSyncer::FillCloudAssets(const std::string &tableName, VBucket &normalAssets,
590     VBucket &failedAssets)
591 {
592     int ret = E_OK;
593     if (normalAssets.size() > 1) {
594         ret = storageProxy_->FillCloudAssetForDownload(tableName, normalAssets, true);
595         if (ret != E_OK) {
596             LOGE("[CloudSyncer] Can not fill normal cloud assets for download");
597             return ret;
598         }
599     }
600     if (failedAssets.size() > 1) {
601         ret = storageProxy_->FillCloudAssetForDownload(tableName, failedAssets, false);
602         if (ret != E_OK) {
603             LOGE("[CloudSyncer] Can not fill abnormal assets for download");
604             return ret;
605         }
606     }
607     return E_OK;
608 }
609 
HandleDownloadResult(const DownloadItem & downloadItem,const std::string & tableName,DownloadCommitList & commitList,uint32_t & successCount)610 int CloudSyncer::HandleDownloadResult(const DownloadItem &downloadItem, const std::string &tableName,
611     DownloadCommitList &commitList, uint32_t &successCount)
612 {
613     successCount = 0;
614     int errCode = storageProxy_->StartTransaction(TransactType::IMMEDIATE);
615     if (errCode != E_OK) {
616         LOGE("[CloudSyncer] start transaction Failed before handle download.");
617         return errCode;
618     }
619     errCode = CommitDownloadAssets(downloadItem, tableName, commitList, successCount);
620     if (errCode != E_OK) {
621         successCount = 0;
622         int ret = E_OK;
623         if (errCode == -E_REMOVE_ASSETS_FAILED) {
624             // remove assets failed no effect to asset status, just commit
625             ret = storageProxy_->Commit();
626         } else {
627             ret = storageProxy_->Rollback();
628         }
629         LOGE("[CloudSyncer] commit download assets failed %d commit/rollback ret %d", errCode, ret);
630         return errCode;
631     }
632     errCode = storageProxy_->Commit();
633     if (errCode != E_OK) {
634         successCount = 0;
635         LOGE("[CloudSyncer] commit failed %d", errCode);
636     }
637     return errCode;
638 }
639 
CloudDbDownloadAssets(TaskId taskId,InnerProcessInfo & info,const DownloadList & downloadList,const std::set<Key> & dupHashKeySet,ChangedData & changedAssets)640 int CloudSyncer::CloudDbDownloadAssets(TaskId taskId, InnerProcessInfo &info, const DownloadList &downloadList,
641     const std::set<Key> &dupHashKeySet, ChangedData &changedAssets)
642 {
643     int downloadStatus = E_OK;
644     {
645         std::lock_guard<std::mutex> autoLock(dataLock_);
646         downloadStatus = resumeTaskInfos_[taskId].downloadStatus;
647         resumeTaskInfos_[taskId].downloadStatus = E_OK;
648     }
649     int errorCode = E_OK;
650     DownloadCommitList commitList;
651     for (size_t i = GetDownloadAssetIndex(taskId); i < downloadList.size(); i++) {
652         errorCode = CheckTaskIdValid(taskId);
653         if (errorCode != E_OK) {
654             std::lock_guard<std::mutex> autoLock(dataLock_);
655             resumeTaskInfos_[taskId].lastDownloadIndex = i;
656             resumeTaskInfos_[taskId].downloadStatus = downloadStatus;
657             break;
658         }
659         DownloadItem downloadItem;
660         GetDownloadItem(downloadList, i, downloadItem);
661         errorCode = DownloadOneAssetRecord(dupHashKeySet, downloadList, downloadItem, info, changedAssets);
662         if (errorCode == -E_NOT_SET) {
663             info.downLoadInfo.failCount += (downloadList.size() - i);
664             info.downLoadInfo.successCount -= (downloadList.size() - i);
665             return errorCode;
666         }
667         if (downloadItem.strategy == OpType::DELETE) {
668             downloadItem.assets = {};
669             downloadItem.gid = "";
670         }
671         // Process result of each asset
672         commitList.push_back(std::make_tuple(downloadItem.gid, std::move(downloadItem.assets), errorCode == E_OK));
673         downloadStatus = downloadStatus == E_OK ? errorCode : downloadStatus;
674         int ret = CommitDownloadResult(downloadItem, info, commitList, errorCode);
675         if (ret != E_OK && ret != -E_REMOVE_ASSETS_FAILED) {
676             return ret;
677         }
678         downloadStatus = downloadStatus == E_OK ? ret : downloadStatus;
679     }
680     LOGD("Download status is %d", downloadStatus);
681     return errorCode == E_OK ? downloadStatus : errorCode;
682 }
683 
DownloadAssets(InnerProcessInfo & info,const std::vector<std::string> & pKColNames,const std::set<Key> & dupHashKeySet,ChangedData & changedAssets)684 int CloudSyncer::DownloadAssets(InnerProcessInfo &info, const std::vector<std::string> &pKColNames,
685     const std::set<Key> &dupHashKeySet, ChangedData &changedAssets)
686 {
687     if (!IsDataContainAssets()) {
688         return E_OK;
689     }
690     // update changed data info
691     if (!CloudSyncUtils::IsChangeDataEmpty(changedAssets)) {
692         // changedData.primaryData should have no member inside
693         return -E_INVALID_ARGS;
694     }
695     changedAssets.tableName = info.tableName;
696     changedAssets.type = ChangedDataType::ASSET;
697     changedAssets.field = pKColNames;
698 
699     // Get AssetDownloadList
700     DownloadList changeList;
701     TaskId taskId;
702     {
703         std::lock_guard<std::mutex> autoLock(dataLock_);
704         changeList = currentContext_.assetDownloadList;
705         taskId = currentContext_.currentTaskId;
706     }
707     // Download data (include deleting) will handle return Code in this situation
708     int ret = CloudDbDownloadAssets(taskId, info, changeList, dupHashKeySet, changedAssets);
709     if (ret != E_OK) {
710         LOGE("[CloudSyncer] Can not download assets or can not handle download result %d", ret);
711     }
712     return ret;
713 }
714 
GetAssetsFromVBucket(VBucket & data)715 std::map<std::string, Assets> CloudSyncer::GetAssetsFromVBucket(VBucket &data)
716 {
717     std::map<std::string, Assets> assets;
718     std::vector<Field> fields;
719     {
720         std::lock_guard<std::mutex> autoLock(dataLock_);
721         fields = currentContext_.assetFields[currentContext_.tableName];
722     }
723     for (const auto &field : fields) {
724         if (data.find(field.colName) != data.end()) {
725             if (field.type == TYPE_INDEX<Asset> && data[field.colName].index() == TYPE_INDEX<Asset>) {
726                 assets[field.colName] = { std::get<Asset>(data[field.colName]) };
727             } else if (field.type == TYPE_INDEX<Assets> && data[field.colName].index() == TYPE_INDEX<Assets>) {
728                 assets[field.colName] = std::get<Assets>(data[field.colName]);
729             } else {
730                 Assets emptyAssets;
731                 assets[field.colName] = emptyAssets;
732             }
733         }
734     }
735     return assets;
736 }
737 
TagStatus(bool isExist,SyncParam & param,size_t idx,DataInfo & dataInfo,VBucket & localAssetInfo)738 int CloudSyncer::TagStatus(bool isExist, SyncParam &param, size_t idx, DataInfo &dataInfo, VBucket &localAssetInfo)
739 {
740     OpType strategyOpResult = OpType::NOT_HANDLE;
741     int errCode = TagStatusByStrategy(isExist, param, dataInfo, strategyOpResult);
742     if (errCode != E_OK) {
743         return errCode;
744     }
745     param.downloadData.opType[idx] = strategyOpResult;
746     if (!IsDataContainAssets()) {
747         return E_OK;
748     }
749     Key hashKey;
750     if (isExist) {
751         hashKey = dataInfo.localInfo.logInfo.hashKey;
752     }
753     return TagDownloadAssets(hashKey, idx, param, dataInfo, localAssetInfo);
754 }
755 
TagDownloadAssets(const Key & hashKey,size_t idx,SyncParam & param,const DataInfo & dataInfo,VBucket & localAssetInfo)756 int CloudSyncer::TagDownloadAssets(const Key &hashKey, size_t idx, SyncParam &param, const DataInfo &dataInfo,
757     VBucket &localAssetInfo)
758 {
759     int ret = E_OK;
760     OpType strategy = param.downloadData.opType[idx];
761     switch (strategy) {
762         case OpType::INSERT:
763         case OpType::UPDATE:
764         case OpType::DELETE:
765             ret = HandleTagAssets(hashKey, dataInfo, idx, param, localAssetInfo);
766             break;
767         case OpType::NOT_HANDLE:
768         case OpType::ONLY_UPDATE_GID:
769         case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO: { // means upload need this data
770             (void)TagAssetsInSingleRecord(localAssetInfo, param.downloadData.data[idx], true, ret);
771             for (const auto &[col, value]: localAssetInfo) {
772                 param.downloadData.data[idx].insert_or_assign(col, value);
773             }
774             break;
775         }
776         default:
777             break;
778     }
779     return ret;
780 }
781 
HandleTagAssets(const Key & hashKey,const DataInfo & dataInfo,size_t idx,SyncParam & param,VBucket & localAssetInfo)782 int CloudSyncer::HandleTagAssets(const Key &hashKey, const DataInfo &dataInfo, size_t idx, SyncParam &param,
783     VBucket &localAssetInfo)
784 {
785     Type prefix;
786     std::vector<Type> pkVals;
787     OpType strategy = param.downloadData.opType[idx];
788     bool isDelStrategy = (strategy == OpType::DELETE);
789     int ret = CloudSyncUtils::GetCloudPkVals(isDelStrategy ? dataInfo.localInfo.primaryKeys :
790         param.downloadData.data[idx], param.pkColNames, dataInfo.localInfo.logInfo.dataKey, pkVals);
791     if (ret != E_OK) {
792         LOGE("[CloudSyncer] HandleTagAssets cannot get primary key value list. %d", ret);
793         return ret;
794     }
795     prefix = param.isSinglePrimaryKey ? pkVals[0] : prefix;
796     if (param.isSinglePrimaryKey && prefix.index() == TYPE_INDEX<Nil>) {
797         LOGE("[CloudSyncer] Invalid primary key type in TagStatus, it's Nil.");
798         return -E_INTERNAL_ERROR;
799     }
800     AssetOperationUtils::FilterDeleteAsset(param.downloadData.data[idx]);
801     std::map<std::string, Assets> assetsMap = TagAssetsInSingleRecord(param.downloadData.data[idx], localAssetInfo,
802         false, ret);
803     if (ret != E_OK) {
804         LOGE("[CloudSyncer] TagAssetsInSingleRecord report ERROR in download data");
805         return ret;
806     }
807     strategy = CloudSyncUtils::CalOpType(param, idx);
808     if (!param.isSinglePrimaryKey && strategy == OpType::INSERT) {
809         param.withoutRowIdData.assetInsertData.push_back(std::make_tuple(idx, param.assetsDownloadList.size()));
810     }
811     param.assetsDownloadList.push_back(
812         std::make_tuple(dataInfo.cloudLogInfo.cloudGid, prefix, strategy, assetsMap, hashKey,
813         pkVals, dataInfo.cloudLogInfo.timestamp));
814     return ret;
815 }
816 
SaveDatum(SyncParam & param,size_t idx,std::vector<std::pair<Key,size_t>> & deletedList,std::map<std::string,LogInfo> & localLogInfoCache)817 int CloudSyncer::SaveDatum(SyncParam &param, size_t idx, std::vector<std::pair<Key, size_t>> &deletedList,
818     std::map<std::string, LogInfo> &localLogInfoCache)
819 {
820     int ret = PreHandleData(param.downloadData.data[idx], param.pkColNames);
821     if (ret != E_OK) {
822         LOGE("[CloudSyncer] Invalid download data:%d", ret);
823         return ret;
824     }
825     CloudSyncUtils::ModifyCloudDataTime(param.downloadData.data[idx]);
826     DataInfo dataInfo;
827     VBucket localAssetInfo;
828     bool isExist = true;
829     ret = GetLocalInfo(idx, param, dataInfo.localInfo, localLogInfoCache, localAssetInfo);
830     if (ret == -E_NOT_FOUND) {
831         isExist = false;
832     } else if (ret != E_OK) {
833         LOGE("[CloudSyncer] Cannot get info by primary key or gid: %d.", ret);
834         return ret;
835     }
836     // Get cloudLogInfo from cloud data
837     dataInfo.cloudLogInfo = CloudSyncUtils::GetCloudLogInfo(param.downloadData.data[idx]);
838     // Tag datum to get opType
839     ret = TagStatus(isExist, param, idx, dataInfo, localAssetInfo);
840     if (ret != E_OK) {
841         LOGE("[CloudSyncer] Cannot tag status: %d.", ret);
842         return ret;
843     }
844     CloudSyncUtils::UpdateLocalCache(param.downloadData.opType[idx], dataInfo.cloudLogInfo, dataInfo.localInfo.logInfo,
845         localLogInfoCache);
846     ret = CloudSyncUtils::SaveChangedData(param, idx, dataInfo, deletedList);
847     if (ret != E_OK) {
848         LOGE("[CloudSyncer] Cannot save changed data: %d.", ret);
849     }
850     return ret;
851 }
852 
SaveData(CloudSyncer::TaskId taskId,SyncParam & param)853 int CloudSyncer::SaveData(CloudSyncer::TaskId taskId, SyncParam &param)
854 {
855     if (!CloudSyncUtils::IsChangeDataEmpty(param.changedData)) {
856         LOGE("[CloudSyncer] changedData.primaryData should have no member inside.");
857         return -E_INVALID_ARGS;
858     }
859     // Update download batch Info
860     param.info.downLoadInfo.batchIndex += 1;
861     param.info.downLoadInfo.total += param.downloadData.data.size();
862     int ret = E_OK;
863     DownloadList assetsDownloadList;
864     param.assetsDownloadList = assetsDownloadList;
865     param.deletePrimaryKeySet.clear();
866     param.dupHashKeySet.clear();
867     CloudSyncUtils::ClearWithoutData(param);
868     std::vector<std::pair<Key, size_t>> deletedList;
869     // use for record local delete status
870     std::map<std::string, LogInfo> localLogInfoCache;
871     for (size_t i = 0; i < param.downloadData.data.size(); i++) {
872         ret = SaveDatum(param, i, deletedList, localLogInfoCache);
873         if (ret != E_OK) {
874             param.info.downLoadInfo.failCount += param.downloadData.data.size();
875             LOGE("[CloudSyncer] Cannot save datum due to error code %d", ret);
876             return ret;
877         }
878     }
879     // Save assetsMap into current context
880     {
881         std::lock_guard<std::mutex> autoLock(dataLock_);
882         currentContext_.assetDownloadList = param.assetsDownloadList;
883     }
884     // save the data to the database by batch, downloadData will return rowid when insert data.
885     ret = storageProxy_->PutCloudSyncData(param.tableName, param.downloadData);
886     if (ret != E_OK) {
887         param.info.downLoadInfo.failCount += param.downloadData.data.size();
888         LOGE("[CloudSyncer] Cannot save the data to database with error code: %d.", ret);
889         return ret;
890     }
891     ret = UpdateChangedData(param, currentContext_.assetDownloadList);
892     if (ret != E_OK) {
893         param.info.downLoadInfo.failCount += param.downloadData.data.size();
894         LOGE("[CloudSyncer] Cannot update changed data: %d.", ret);
895         return ret;
896     }
897     // Update downloadInfo
898     param.info.downLoadInfo.successCount += param.downloadData.data.size();
899     // Get latest cloudWaterMark
900     VBucket &lastData = param.downloadData.data.back();
901     if (!IsQueryListEmpty(taskId) && param.isLastBatch) {
902         // the last batch of cursor in the conditional query is useless
903         param.cloudWaterMark = {};
904     } else {
905         param.cloudWaterMark = std::get<std::string>(lastData[CloudDbConstant::CURSOR_FIELD]);
906     }
907     return UpdateFlagForSavedRecord(param);
908 }
909 
PreCheck(CloudSyncer::TaskId & taskId,const TableName & tableName)910 int CloudSyncer::PreCheck(CloudSyncer::TaskId &taskId, const TableName &tableName)
911 {
912     // Check Input and Context Validity
913     int ret = CheckTaskIdValid(taskId);
914     if (ret != E_OK) {
915         return ret;
916     }
917     {
918         std::lock_guard<std::mutex> autoLock(dataLock_);
919         if (cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) {
920             LOGE("[CloudSyncer] Cloud Task Info does not exist taskId: , %" PRIu64 ".", taskId);
921             return -E_INVALID_ARGS;
922         }
923     }
924     if (currentContext_.strategy == nullptr) {
925         LOGE("[CloudSyncer] Strategy has not been initialized");
926         return -E_INVALID_ARGS;
927     }
928     ret = storageProxy_->CheckSchema(tableName);
929     if (ret != E_OK) {
930         LOGE("[CloudSyncer] A schema error occurred on the table to be synced, %d", ret);
931         return ret;
932     }
933     return E_OK;
934 }
935 
NeedNotifyChangedData(const ChangedData & changedData)936 bool CloudSyncer::NeedNotifyChangedData(const ChangedData &changedData)
937 {
938     TaskId taskId;
939     {
940         std::lock_guard<std::mutex> autoLock(dataLock_);
941         taskId = currentContext_.currentTaskId;
942     }
943     if (IsModeForcePush(taskId)) {
944         return false;
945     }
946     // when there have no data been changed, it don't need fill back
947     if (changedData.primaryData[OP_INSERT].empty() && changedData.primaryData[OP_UPDATE].empty() &&
948         changedData.primaryData[OP_DELETE].empty()) {
949         return false;
950     }
951     return true;
952 }
953 
NotifyChangedData(ChangedData && changedData)954 int CloudSyncer::NotifyChangedData(ChangedData &&changedData)
955 {
956     if (!NeedNotifyChangedData(changedData)) {
957         return E_OK;
958     }
959     std::string deviceName;
960     {
961         std::lock_guard<std::mutex> autoLock(dataLock_);
962         std::vector<std::string> devices = currentContext_.notifier->GetDevices();
963         if (devices.empty()) {
964             DBDfxAdapter::ReportBehavior(
965                 {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_NOTIFY, StageResult::FAIL, -E_CLOUD_ERROR});
966             LOGE("[CloudSyncer] CurrentContext do not contain device info");
967             return -E_CLOUD_ERROR;
968         }
969         // We use first device name as the target of NotifyChangedData
970         deviceName = devices[0];
971     }
972     int ret = storageProxy_->NotifyChangedData(deviceName, std::move(changedData));
973     if (ret != E_OK) {
974         DBDfxAdapter::ReportBehavior(
975             {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_NOTIFY, StageResult::FAIL, ret});
976         LOGE("[CloudSyncer] Cannot notify changed data while downloading, %d.", ret);
977     }
978     DBDfxAdapter::ReportBehavior(
979         {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_NOTIFY, StageResult::FAIL, ret});
980     return ret;
981 }
982 
NotifyInDownload(CloudSyncer::TaskId taskId,SyncParam & param,bool isFirstDownload)983 void CloudSyncer::NotifyInDownload(CloudSyncer::TaskId taskId, SyncParam &param, bool isFirstDownload)
984 {
985     if (!isFirstDownload && param.downloadData.data.empty()) {
986         // if the second download and there is no download data, do not notify
987         return;
988     }
989     std::lock_guard<std::mutex> autoLock(dataLock_);
990     if (currentContext_.strategy->JudgeUpload()) {
991         currentContext_.notifier->NotifyProcess(cloudTaskInfos_[taskId], param.info);
992     } else {
993         if (param.isLastBatch) {
994             param.info.tableStatus = ProcessStatus::FINISHED;
995         }
996         if (cloudTaskInfos_[taskId].table.back() == param.tableName && param.isLastBatch) {
997             currentContext_.notifier->UpdateProcess(param.info);
998         } else {
999             currentContext_.notifier->NotifyProcess(cloudTaskInfos_[taskId], param.info);
1000         }
1001     }
1002 }
1003 
SaveDataInTransaction(CloudSyncer::TaskId taskId,SyncParam & param)1004 int CloudSyncer::SaveDataInTransaction(CloudSyncer::TaskId taskId, SyncParam &param)
1005 {
1006     int ret = storageProxy_->StartTransaction(TransactType::IMMEDIATE);
1007     if (ret != E_OK) {
1008         LOGE("[CloudSyncer] Cannot start a transaction: %d.", ret);
1009         return ret;
1010     }
1011     (void)storageProxy_->SetCursorIncFlag(true);
1012     if (!IsModeForcePush(taskId)) {
1013         param.changedData.tableName = param.info.tableName;
1014         param.changedData.field = param.pkColNames;
1015         param.changedData.type = ChangedDataType::DATA;
1016     }
1017     ret = SaveData(taskId, param);
1018     (void)storageProxy_->SetCursorIncFlag(false);
1019     param.insertPk.clear();
1020     if (ret != E_OK) {
1021         LOGE("[CloudSyncer] cannot save data: %d.", ret);
1022         int rollBackErrorCode = storageProxy_->Rollback();
1023         if (rollBackErrorCode != E_OK) {
1024             LOGE("[CloudSyncer] cannot roll back transaction: %d.", rollBackErrorCode);
1025         } else {
1026             LOGI("[CloudSyncer] Roll back transaction success: %d.", ret);
1027         }
1028         return ret;
1029     }
1030     ret = storageProxy_->Commit();
1031     if (ret != E_OK) {
1032         LOGE("[CloudSyncer] Cannot commit a transaction: %d.", ret);
1033     }
1034     return ret;
1035 }
1036 
DoDownloadAssets(bool skipSave,SyncParam & param)1037 int CloudSyncer::DoDownloadAssets(bool skipSave, SyncParam &param)
1038 {
1039     // Begin downloading assets
1040     ChangedData changedAssets;
1041     int ret = DownloadAssets(param.info, param.pkColNames, param.dupHashKeySet, changedAssets);
1042     bool isSharedTable = false;
1043     int errCode = storageProxy_->IsSharedTable(param.tableName, isSharedTable);
1044     if (errCode != E_OK) {
1045         LOGE("[CloudSyncer] HandleTagAssets cannot judge the table is a shared table. %d", errCode);
1046         return errCode;
1047     }
1048     if (!isSharedTable) {
1049         (void)NotifyChangedData(std::move(changedAssets));
1050     }
1051     if (ret == -E_TASK_PAUSED) {
1052         LOGD("[CloudSyncer] current task was paused, abort download asset");
1053         std::lock_guard<std::mutex> autoLock(dataLock_);
1054         resumeTaskInfos_[currentContext_.currentTaskId].skipQuery = true;
1055         return ret;
1056     } else if (skipSave) {
1057         std::lock_guard<std::mutex> autoLock(dataLock_);
1058         resumeTaskInfos_[currentContext_.currentTaskId].skipQuery = false;
1059     }
1060     if (ret != E_OK) {
1061         LOGE("[CloudSyncer] Cannot notify downloadAssets due to error %d", ret);
1062     }
1063     return ret;
1064 }
1065 
SaveDataNotifyProcess(CloudSyncer::TaskId taskId,SyncParam & param)1066 int CloudSyncer::SaveDataNotifyProcess(CloudSyncer::TaskId taskId, SyncParam &param)
1067 {
1068     ChangedData changedData;
1069     bool skipSave = false;
1070     {
1071         bool currentTableResume = IsCurrentTableResume(taskId, false);
1072         std::lock_guard<std::mutex> autoLock(dataLock_);
1073         if (currentTableResume && resumeTaskInfos_[currentContext_.currentTaskId].skipQuery) {
1074             skipSave = true;
1075         }
1076     }
1077     int ret;
1078     if (!skipSave) {
1079         param.changedData = changedData;
1080         param.downloadData.opType.resize(param.downloadData.data.size());
1081         param.downloadData.existDataKey.resize(param.downloadData.data.size());
1082         param.downloadData.existDataHashKey.resize(param.downloadData.data.size());
1083         ret = SaveDataInTransaction(taskId, param);
1084         if (ret != E_OK) {
1085             return ret;
1086         }
1087         // call OnChange to notify changedData object first time (without Assets)
1088         ret = NotifyChangedData(std::move(param.changedData));
1089         if (ret != E_OK) {
1090             LOGE("[CloudSyncer] Cannot notify changed data due to error %d", ret);
1091             return ret;
1092         }
1093     }
1094     ret = DoDownloadAssets(skipSave, param);
1095     if (ret != E_OK) {
1096         return ret;
1097     }
1098     UpdateCloudWaterMark(taskId, param);
1099     return E_OK;
1100 }
1101 
NotifyInBatchUpload(const UploadParam & uploadParam,const InnerProcessInfo & innerProcessInfo,bool lastBatch)1102 void CloudSyncer::NotifyInBatchUpload(const UploadParam &uploadParam, const InnerProcessInfo &innerProcessInfo,
1103     bool lastBatch)
1104 {
1105     CloudTaskInfo taskInfo;
1106     {
1107         std::lock_guard<std::mutex> autoLock(dataLock_);
1108         taskInfo = cloudTaskInfos_[uploadParam.taskId];
1109     }
1110     std::lock_guard<std::mutex> autoLock(dataLock_);
1111     if (uploadParam.lastTable && lastBatch) {
1112         currentContext_.notifier->UpdateProcess(innerProcessInfo);
1113     } else {
1114         currentContext_.notifier->NotifyProcess(taskInfo, innerProcessInfo);
1115     }
1116 }
1117 
DoDownload(CloudSyncer::TaskId taskId,bool isFirstDownload)1118 int CloudSyncer::DoDownload(CloudSyncer::TaskId taskId, bool isFirstDownload)
1119 {
1120     SyncParam param;
1121     int errCode = GetSyncParamForDownload(taskId, param);
1122     if (errCode != E_OK) {
1123         LOGE("[CloudSyncer] get sync param for download failed %d", errCode);
1124         return errCode;
1125     }
1126     (void)storageProxy_->CreateTempSyncTrigger(param.tableName);
1127     errCode = DoDownloadInner(taskId, param, isFirstDownload);
1128     (void)storageProxy_->ClearAllTempSyncTrigger();
1129     if (errCode == -E_TASK_PAUSED) {
1130         // No need to handle ret.
1131         int ret = storageProxy_->GetCloudWaterMark(param.tableName, param.cloudWaterMark);
1132         if (ret != E_OK) {
1133             LOGE("[DoDownload] Cannot get cloud watermark : %d.", ret);
1134         }
1135         std::lock_guard<std::mutex> autoLock(dataLock_);
1136         resumeTaskInfos_[taskId].syncParam = std::move(param);
1137     }
1138     return errCode;
1139 }
1140 
DoDownloadInner(CloudSyncer::TaskId taskId,SyncParam & param,bool isFirstDownload)1141 int CloudSyncer::DoDownloadInner(CloudSyncer::TaskId taskId, SyncParam &param, bool isFirstDownload)
1142 {
1143     // Query data by batch until reaching end and not more data need to be download
1144     int ret = PreCheck(taskId, param.info.tableName);
1145     if (ret != E_OK) {
1146         return ret;
1147     }
1148     do {
1149         ret = DownloadOneBatch(taskId, param, isFirstDownload);
1150         if (ret != E_OK) {
1151             return ret;
1152         }
1153     } while (!param.isLastBatch);
1154     return E_OK;
1155 }
1156 
NotifyInEmptyDownload(CloudSyncer::TaskId taskId,InnerProcessInfo & info)1157 void CloudSyncer::NotifyInEmptyDownload(CloudSyncer::TaskId taskId, InnerProcessInfo &info)
1158 {
1159     std::lock_guard<std::mutex> autoLock(dataLock_);
1160     if (currentContext_.strategy->JudgeUpload()) {
1161         currentContext_.notifier->NotifyProcess(cloudTaskInfos_[taskId], info);
1162     } else {
1163         info.tableStatus = FINISHED;
1164         if (cloudTaskInfos_[taskId].table.back() == info.tableName) {
1165             currentContext_.notifier->UpdateProcess(info);
1166         } else {
1167             currentContext_.notifier->NotifyProcess(cloudTaskInfos_[taskId], info);
1168         }
1169     }
1170 }
1171 
PreCheckUpload(CloudSyncer::TaskId & taskId,const TableName & tableName,Timestamp & localMark)1172 int CloudSyncer::PreCheckUpload(CloudSyncer::TaskId &taskId, const TableName &tableName, Timestamp &localMark)
1173 {
1174     int ret = PreCheck(taskId, tableName);
1175     if (ret != E_OK) {
1176         return ret;
1177     }
1178     {
1179         std::lock_guard<std::mutex> autoLock(dataLock_);
1180         if (cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) {
1181             LOGE("[CloudSyncer] Cloud Task Info does not exist taskId: %" PRIu64 ".", taskId);
1182             return -E_INVALID_ARGS;
1183         }
1184         if ((cloudTaskInfos_[taskId].mode < SYNC_MODE_CLOUD_MERGE) ||
1185             (cloudTaskInfos_[taskId].mode > SYNC_MODE_CLOUD_FORCE_PUSH)) {
1186             LOGE("[CloudSyncer] Upload failed, invalid sync mode: %d.",
1187                 static_cast<int>(cloudTaskInfos_[taskId].mode));
1188             return -E_INVALID_ARGS;
1189         }
1190     }
1191 
1192     return ret;
1193 }
1194 
SaveUploadData(Info & insertInfo,Info & updateInfo,Info & deleteInfo,CloudSyncData & uploadData,InnerProcessInfo & innerProcessInfo)1195 int CloudSyncer::SaveUploadData(Info &insertInfo, Info &updateInfo, Info &deleteInfo, CloudSyncData &uploadData,
1196     InnerProcessInfo &innerProcessInfo)
1197 {
1198     int errCode = E_OK;
1199     if (!uploadData.delData.record.empty() && !uploadData.delData.extend.empty()) {
1200         errCode = BatchDelete(deleteInfo, uploadData, innerProcessInfo);
1201         if (errCode != E_OK) {
1202             return errCode;
1203         }
1204     }
1205 
1206     if (!uploadData.updData.record.empty() && !uploadData.updData.extend.empty()) {
1207         errCode = BatchUpdate(updateInfo, uploadData, innerProcessInfo);
1208         if (errCode != E_OK) {
1209             return errCode;
1210         }
1211     }
1212 
1213     if (!uploadData.insData.record.empty() && !uploadData.insData.extend.empty()) {
1214         errCode = BatchInsert(insertInfo, uploadData, innerProcessInfo);
1215         if (errCode != E_OK) {
1216             return errCode;
1217         }
1218     }
1219 
1220     if (!uploadData.lockData.rowid.empty()) {
1221         errCode = storageProxy_->FillCloudLogAndAsset(OpType::LOCKED_NOT_HANDLE, uploadData);
1222     }
1223     return errCode;
1224 }
1225 
DoBatchUpload(CloudSyncData & uploadData,UploadParam & uploadParam,InnerProcessInfo & innerProcessInfo)1226 int CloudSyncer::DoBatchUpload(CloudSyncData &uploadData, UploadParam &uploadParam, InnerProcessInfo &innerProcessInfo)
1227 {
1228     int errCode = storageProxy_->FillCloudLogAndAsset(OpType::SET_UPLOADING, uploadData);
1229     if (errCode != E_OK) {
1230         return errCode;
1231     }
1232     Info insertInfo;
1233     Info updateInfo;
1234     Info deleteInfo;
1235     errCode = SaveUploadData(insertInfo, updateInfo, deleteInfo, uploadData, innerProcessInfo);
1236     if (errCode != E_OK) {
1237         return errCode;
1238     }
1239     bool lastBatch = innerProcessInfo.upLoadInfo.successCount == innerProcessInfo.upLoadInfo.total;
1240     if (lastBatch) {
1241         innerProcessInfo.tableStatus = ProcessStatus::FINISHED;
1242     }
1243     // After each batch upload successed, call NotifyProcess
1244     NotifyInBatchUpload(uploadParam, innerProcessInfo, lastBatch);
1245 
1246     // if batch upload successed, update local water mark
1247     // The cloud water mark cannot be updated here, because the cloud api doesn't return cursor here.
1248     errCode = PutWaterMarkAfterBatchUpload(uploadData.tableName, uploadParam);
1249     if (errCode != E_OK) {
1250         LOGE("[CloudSyncer] Failed to set local water mark when doing upload, %d.", errCode);
1251     }
1252     return errCode;
1253 }
1254 
PutWaterMarkAfterBatchUpload(const std::string & tableName,UploadParam & uploadParam)1255 int CloudSyncer::PutWaterMarkAfterBatchUpload(const std::string &tableName, UploadParam &uploadParam)
1256 {
1257     int errCode = E_OK;
1258     storageProxy_->ReleaseUploadRecord(tableName, uploadParam.mode, uploadParam.localMark);
1259     // if we use local cover cloud strategy, it won't update local water mark also.
1260     if (IsModeForcePush(uploadParam.taskId) || (IsPriorityTask(uploadParam.taskId) &&
1261         !IsQueryListEmpty(uploadParam.taskId))) {
1262         return E_OK;
1263     }
1264     errCode = storageProxy_->PutWaterMarkByMode(tableName, uploadParam.mode, uploadParam.localMark);
1265     if (errCode != E_OK) {
1266         LOGE("[CloudSyncer] Cannot set local water mark while Uploading, %d.", errCode);
1267     }
1268     return errCode;
1269 }
1270 
DoUpload(CloudSyncer::TaskId taskId,bool lastTable,LockAction lockAction)1271 int CloudSyncer::DoUpload(CloudSyncer::TaskId taskId, bool lastTable, LockAction lockAction)
1272 {
1273     std::string tableName;
1274     int ret = GetCurrentTableName(tableName);
1275     if (ret != E_OK) {
1276         LOGE("[CloudSyncer] Invalid table name for syncing: %d", ret);
1277         return ret;
1278     }
1279 
1280     Timestamp localMark = 0u;
1281     ret = PreCheckUpload(taskId, tableName, localMark);
1282     if (ret != E_OK) {
1283         LOGE("[CloudSyncer] Doing upload sync pre check failed, %d.", ret);
1284         return ret;
1285     }
1286     ReloadWaterMarkIfNeed(taskId, localMark);
1287     storageProxy_->OnUploadStart();
1288 
1289     int64_t count = 0;
1290     ret = storageProxy_->GetUploadCount(GetQuerySyncObject(tableName), IsModeForcePush(taskId),
1291         IsCompensatedTask(taskId), IsNeedGetLocalWater(taskId), count);
1292     LOGI("get upload count:%zu", count);
1293     if (ret != E_OK) {
1294         // GetUploadCount will return E_OK when upload count is zero.
1295         LOGE("[CloudSyncer] Failed to get Upload Data Count, %d.", ret);
1296         return ret;
1297     }
1298     if (count == 0) {
1299         UpdateProcessInfoWithoutUpload(taskId, tableName, !lastTable);
1300         return E_OK;
1301     }
1302     UploadParam param;
1303     param.count = count;
1304     param.lastTable = lastTable;
1305     param.taskId = taskId;
1306     param.lockAction = lockAction;
1307     return DoUploadInner(tableName, param);
1308 }
1309 
PreProcessBatchUpload(UploadParam & uploadParam,const InnerProcessInfo & innerProcessInfo,CloudSyncData & uploadData)1310 int CloudSyncer::PreProcessBatchUpload(UploadParam &uploadParam, const InnerProcessInfo &innerProcessInfo,
1311     CloudSyncData &uploadData)
1312 {
1313     // Precheck and calculate local water mark which would be updated if batch upload successed.
1314     int ret = CheckTaskIdValid(uploadParam.taskId);
1315     if (ret != E_OK) {
1316         return ret;
1317     }
1318     ret = CloudSyncUtils::CheckCloudSyncDataValid(uploadData, innerProcessInfo.tableName,
1319         innerProcessInfo.upLoadInfo.total);
1320     if (ret != E_OK) {
1321         LOGE("[CloudSyncer] Invalid Cloud Sync Data of Upload, %d.", ret);
1322         return ret;
1323     }
1324     TagUploadAssets(uploadData);
1325     CloudSyncUtils::UpdateAssetsFlag(uploadData);
1326     // get local water mark to be updated in future.
1327     ret = CloudSyncUtils::UpdateExtendTime(uploadData, innerProcessInfo.upLoadInfo.total,
1328         uploadParam.taskId, uploadParam.localMark);
1329     if (ret != E_OK) {
1330         LOGE("[CloudSyncer] Failed to get new local water mark in Cloud Sync Data, %d.", ret);
1331     }
1332     return ret;
1333 }
1334 
SaveCloudWaterMark(const TableName & tableName,const TaskId taskId)1335 int CloudSyncer::SaveCloudWaterMark(const TableName &tableName, const TaskId taskId)
1336 {
1337     std::string cloudWaterMark;
1338     bool isUpdateCloudCursor = true;
1339     {
1340         std::lock_guard<std::mutex> autoLock(dataLock_);
1341         if (currentContext_.cloudWaterMarks.find(currentContext_.currentUserIndex) ==
1342             currentContext_.cloudWaterMarks.end() ||
1343             currentContext_.cloudWaterMarks[currentContext_.currentUserIndex].find(tableName) ==
1344             currentContext_.cloudWaterMarks[currentContext_.currentUserIndex].end()) {
1345             LOGD("[CloudSyncer] Not found water mark just return");
1346             return E_OK;
1347         }
1348         cloudWaterMark = currentContext_.cloudWaterMarks[currentContext_.currentUserIndex][tableName];
1349         isUpdateCloudCursor = currentContext_.strategy->JudgeUpdateCursor();
1350     }
1351     isUpdateCloudCursor = isUpdateCloudCursor && !(IsPriorityTask(taskId) && !IsQueryListEmpty(taskId));
1352     if (isUpdateCloudCursor) {
1353         int errCode = storageProxy_->SetCloudWaterMark(tableName, cloudWaterMark);
1354         if (errCode != E_OK) {
1355             LOGE("[CloudSyncer] Cannot set cloud water mark, %d.", errCode);
1356         }
1357         return errCode;
1358     }
1359     return E_OK;
1360 }
1361 
SetUploadDataFlag(const TaskId taskId,CloudSyncData & uploadData)1362 void CloudSyncer::SetUploadDataFlag(const TaskId taskId, CloudSyncData& uploadData)
1363 {
1364     std::lock_guard<std::mutex> autoLock(dataLock_);
1365     uploadData.isCloudForcePushStrategy = (cloudTaskInfos_[taskId].mode == SYNC_MODE_CLOUD_FORCE_PUSH);
1366     uploadData.isCompensatedTask = cloudTaskInfos_[taskId].compensatedTask;
1367 }
1368 
IsModeForcePush(TaskId taskId)1369 bool CloudSyncer::IsModeForcePush(TaskId taskId)
1370 {
1371     std::lock_guard<std::mutex> autoLock(dataLock_);
1372     return cloudTaskInfos_[taskId].mode == SYNC_MODE_CLOUD_FORCE_PUSH;
1373 }
1374 
IsModeForcePull(const TaskId taskId)1375 bool CloudSyncer::IsModeForcePull(const TaskId taskId)
1376 {
1377     std::lock_guard<std::mutex> autoLock(dataLock_);
1378     return cloudTaskInfos_[taskId].mode == SYNC_MODE_CLOUD_FORCE_PULL;
1379 }
1380 
IsPriorityTask(TaskId taskId)1381 bool CloudSyncer::IsPriorityTask(TaskId taskId)
1382 {
1383     std::lock_guard<std::mutex> autoLock(dataLock_);
1384     return cloudTaskInfos_[taskId].priorityTask;
1385 }
1386 
PreHandleData(VBucket & datum,const std::vector<std::string> & pkColNames)1387 int CloudSyncer::PreHandleData(VBucket &datum, const std::vector<std::string> &pkColNames)
1388 {
1389     // type index of field in fields, true means mandatory filed
1390     static std::vector<std::tuple<std::string, int32_t, bool>> fieldAndIndex = {
1391         std::make_tuple(CloudDbConstant::GID_FIELD, TYPE_INDEX<std::string>, true),
1392         std::make_tuple(CloudDbConstant::CREATE_FIELD, TYPE_INDEX<int64_t>, true),
1393         std::make_tuple(CloudDbConstant::MODIFY_FIELD, TYPE_INDEX<int64_t>, true),
1394         std::make_tuple(CloudDbConstant::DELETE_FIELD, TYPE_INDEX<bool>, true),
1395         std::make_tuple(CloudDbConstant::CURSOR_FIELD, TYPE_INDEX<std::string>, true),
1396         std::make_tuple(CloudDbConstant::SHARING_RESOURCE_FIELD, TYPE_INDEX<std::string>, false)
1397     };
1398 
1399     for (const auto &fieldIndex : fieldAndIndex) {
1400         if (datum.find(std::get<0>(fieldIndex)) == datum.end()) {
1401             if (!std::get<2>(fieldIndex)) { // 2 is index of mandatory flag
1402                 continue;
1403             }
1404             LOGE("[CloudSyncer] Cloud data do not contain expected field: %s.", std::get<0>(fieldIndex).c_str());
1405             return -E_CLOUD_ERROR;
1406         }
1407         if (datum[std::get<0>(fieldIndex)].index() != static_cast<size_t>(std::get<1>(fieldIndex))) {
1408             LOGE("[CloudSyncer] Cloud data's field: %s, doesn't has expected type.", std::get<0>(fieldIndex).c_str());
1409             return -E_CLOUD_ERROR;
1410         }
1411     }
1412 
1413     if (std::get<bool>(datum[CloudDbConstant::DELETE_FIELD])) {
1414         CloudSyncUtils::RemoveDataExceptExtendInfo(datum, pkColNames);
1415     }
1416     std::lock_guard<std::mutex> autoLock(dataLock_);
1417     if (IsDataContainDuplicateAsset(currentContext_.assetFields[currentContext_.tableName], datum)) {
1418         LOGE("[CloudSyncer] Cloud data contain duplicate asset");
1419         return -E_CLOUD_ERROR;
1420     }
1421     return E_OK;
1422 }
1423 
QueryCloudData(TaskId taskId,const std::string & tableName,std::string & cloudWaterMark,DownloadData & downloadData)1424 int CloudSyncer::QueryCloudData(TaskId taskId, const std::string &tableName, std::string &cloudWaterMark,
1425     DownloadData &downloadData)
1426 {
1427     VBucket extend;
1428     int ret = FillDownloadExtend(taskId, tableName, cloudWaterMark, extend);
1429     if (ret != E_OK) {
1430         return ret;
1431     }
1432     ret = cloudDB_.Query(tableName, extend, downloadData.data);
1433     if ((ret == E_OK || ret == -E_QUERY_END) && downloadData.data.empty()) {
1434         if (extend[CloudDbConstant::CURSOR_FIELD].index() != TYPE_INDEX<std::string>) {
1435             LOGE("[CloudSyncer] cursor type is not valid=%d", extend[CloudDbConstant::CURSOR_FIELD].index());
1436             return -E_CLOUD_ERROR;
1437         }
1438         cloudWaterMark = std::get<std::string>(extend[CloudDbConstant::CURSOR_FIELD]);
1439         LOGD("[CloudSyncer] Download data is empty, try to use other cursor=%s", cloudWaterMark.c_str());
1440         return ret;
1441     }
1442     if (ret == -E_QUERY_END) {
1443         LOGD("[CloudSyncer] Download data from cloud database success and no more data need to be downloaded");
1444         return -E_QUERY_END;
1445     }
1446     if (ret != E_OK) {
1447         LOGE("[CloudSyncer] Download data from cloud database unsuccess %d", ret);
1448     }
1449     return ret;
1450 }
1451 
CheckTaskIdValid(TaskId taskId)1452 int CloudSyncer::CheckTaskIdValid(TaskId taskId)
1453 {
1454     if (closed_) {
1455         LOGE("[CloudSyncer] DB is closed.");
1456         return -E_DB_CLOSED;
1457     }
1458     std::lock_guard<std::mutex> autoLock(dataLock_);
1459     if (cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) {
1460         LOGE("[CloudSyncer] not found task.");
1461         return -E_INVALID_ARGS;
1462     }
1463     if (cloudTaskInfos_[taskId].pause) {
1464         LOGW("[CloudSyncer] check task %" PRIu64 " was paused!", taskId);
1465         return -E_TASK_PAUSED;
1466     }
1467     if (cloudTaskInfos_[taskId].errCode != E_OK) {
1468         return cloudTaskInfos_[taskId].errCode;
1469     }
1470     return currentContext_.currentTaskId == taskId ? E_OK : -E_INVALID_ARGS;
1471 }
1472 
GetCurrentTableName(std::string & tableName)1473 int CloudSyncer::GetCurrentTableName(std::string &tableName)
1474 {
1475     std::lock_guard<std::mutex> autoLock(dataLock_);
1476     if (currentContext_.tableName.empty()) {
1477         return -E_BUSY;
1478     }
1479     tableName = currentContext_.tableName;
1480     return E_OK;
1481 }
1482 
CheckQueueSizeWithNoLock(bool priorityTask)1483 int CloudSyncer::CheckQueueSizeWithNoLock(bool priorityTask)
1484 {
1485     int32_t limit = queuedManualSyncLimit_;
1486     if (!priorityTask && taskQueue_.size() >= static_cast<size_t>(limit)) {
1487         LOGW("[CloudSyncer] too much sync task");
1488         return -E_BUSY;
1489     } else if (priorityTask && priorityTaskQueue_.size() >= static_cast<size_t>(limit)) {
1490         LOGW("[CloudSyncer] too much priority sync task");
1491         return -E_BUSY;
1492     }
1493     return E_OK;
1494 }
1495 
PrepareSync(TaskId taskId)1496 int CloudSyncer::PrepareSync(TaskId taskId)
1497 {
1498     std::lock_guard<std::mutex> autoLock(dataLock_);
1499     if (closed_ || cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) {
1500         LOGW("[CloudSyncer] Abort sync because syncer is closed");
1501         return -E_DB_CLOSED;
1502     }
1503     if (closed_ || currentContext_.currentTaskId != INVALID_TASK_ID) {
1504         LOGW("[CloudSyncer] Abort sync because syncer is closed or another task is running");
1505         return -E_DB_CLOSED;
1506     }
1507     currentContext_.currentTaskId = taskId;
1508     cloudTaskInfos_[taskId].resume = cloudTaskInfos_[taskId].pause;
1509     cloudTaskInfos_[taskId].pause = false;
1510     cloudTaskInfos_[taskId].status = ProcessStatus::PROCESSING;
1511     if (cloudTaskInfos_[taskId].resume) {
1512         auto tempLocker = currentContext_.locker;
1513         currentContext_ = resumeTaskInfos_[taskId].context;
1514         currentContext_.locker = tempLocker;
1515     } else {
1516         currentContext_.notifier = std::make_shared<ProcessNotifier>(this);
1517         currentContext_.strategy =
1518             StrategyFactory::BuildSyncStrategy(cloudTaskInfos_[taskId].mode, isKvScene_, policy_);
1519         currentContext_.notifier->Init(cloudTaskInfos_[taskId].table, cloudTaskInfos_[taskId].devices,
1520             cloudTaskInfos_[taskId].users);
1521         currentContext_.processRecorder = std::make_shared<ProcessRecorder>();
1522     }
1523     LOGI("[CloudSyncer] exec storeId %.3s taskId %" PRIu64, cloudTaskInfos_[taskId].storeId.c_str(), taskId);
1524     return E_OK;
1525 }
1526 
LockCloud(TaskId taskId)1527 int CloudSyncer::LockCloud(TaskId taskId)
1528 {
1529     int period;
1530     {
1531         auto res = cloudDB_.Lock();
1532         if (res.first != E_OK) {
1533             return res.first;
1534         }
1535         period = static_cast<int>(res.second) / HEARTBEAT_PERIOD;
1536     }
1537     int errCode = StartHeartBeatTimer(period, taskId);
1538     if (errCode != E_OK) {
1539         UnlockCloud();
1540     }
1541     return errCode;
1542 }
1543 
UnlockCloud()1544 int CloudSyncer::UnlockCloud()
1545 {
1546     FinishHeartBeatTimer();
1547     return cloudDB_.UnLock();
1548 }
1549 
StartHeartBeatTimer(int period,TaskId taskId)1550 int CloudSyncer::StartHeartBeatTimer(int period, TaskId taskId)
1551 {
1552     if (timerId_ != 0u) {
1553         LOGW("[CloudSyncer] HeartBeat timer has been start!");
1554         return E_OK;
1555     }
1556     TimerId timerId = 0;
1557     int errCode = RuntimeContext::GetInstance()->SetTimer(period, [this, taskId](TimerId timerId) {
1558         HeartBeat(timerId, taskId);
1559         return E_OK;
1560     }, nullptr, timerId);
1561     if (errCode != E_OK) {
1562         LOGE("[CloudSyncer] HeartBeat timer start failed %d", errCode);
1563         return errCode;
1564     }
1565     timerId_ = timerId;
1566     return E_OK;
1567 }
1568 
FinishHeartBeatTimer()1569 void CloudSyncer::FinishHeartBeatTimer()
1570 {
1571     if (timerId_ == 0u) {
1572         return;
1573     }
1574     RuntimeContext::GetInstance()->RemoveTimer(timerId_, true);
1575     timerId_ = 0u;
1576     LOGD("[CloudSyncer] Finish heartbeat timer ok");
1577 }
1578 
HeartBeat(TimerId timerId,TaskId taskId)1579 void CloudSyncer::HeartBeat(TimerId timerId, TaskId taskId)
1580 {
1581     if (timerId_ != timerId) {
1582         return;
1583     }
1584     IncObjRef(this);
1585     {
1586         std::lock_guard<std::mutex> autoLock(heartbeatMutex_);
1587         heartbeatCount_[taskId]++;
1588     }
1589     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, taskId]() {
1590         {
1591             std::lock_guard<std::mutex> guard(dataLock_);
1592             if (currentContext_.currentTaskId != taskId) {
1593                 heartbeatCount_.erase(taskId);
1594                 failedHeartbeatCount_.erase(taskId);
1595                 DecObjRef(this);
1596                 return;
1597             }
1598         }
1599         if (heartbeatCount_[taskId] >= HEARTBEAT_PERIOD) {
1600             // heartbeat block twice should finish task now
1601             SetTaskFailed(taskId, -E_CLOUD_ERROR);
1602         } else {
1603             int ret = cloudDB_.HeartBeat();
1604             if (ret != E_OK) {
1605                 HeartBeatFailed(taskId, ret);
1606             } else {
1607                 failedHeartbeatCount_[taskId] = 0;
1608             }
1609         }
1610         {
1611             std::lock_guard<std::mutex> autoLock(heartbeatMutex_);
1612             heartbeatCount_[taskId]--;
1613             if (currentContext_.currentTaskId != taskId) {
1614                 heartbeatCount_.erase(taskId);
1615                 failedHeartbeatCount_.erase(taskId);
1616             }
1617         }
1618         DecObjRef(this);
1619     });
1620     if (errCode != E_OK) {
1621         LOGW("[CloudSyncer] schedule heartbeat task failed %d", errCode);
1622         DecObjRef(this);
1623     }
1624 }
1625 
HeartBeatFailed(TaskId taskId,int errCode)1626 void CloudSyncer::HeartBeatFailed(TaskId taskId, int errCode)
1627 {
1628     failedHeartbeatCount_[taskId]++;
1629     if (failedHeartbeatCount_[taskId] < MAX_HEARTBEAT_FAILED_LIMIT) {
1630         return;
1631     }
1632     LOGW("[CloudSyncer] heartbeat failed too much times!");
1633     FinishHeartBeatTimer();
1634     SetTaskFailed(taskId, errCode);
1635 }
1636 
SetTaskFailed(TaskId taskId,int errCode)1637 void CloudSyncer::SetTaskFailed(TaskId taskId, int errCode)
1638 {
1639     std::lock_guard<std::mutex> autoLock(dataLock_);
1640     if (cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) {
1641         return;
1642     }
1643     if (cloudTaskInfos_[taskId].errCode != E_OK) {
1644         return;
1645     }
1646     cloudTaskInfos_[taskId].errCode = errCode;
1647 }
1648 
GetCloudSyncTaskCount()1649 int32_t CloudSyncer::GetCloudSyncTaskCount()
1650 {
1651     std::lock_guard<std::mutex> autoLock(dataLock_);
1652     return static_cast<int32_t>(taskQueue_.size() + priorityTaskQueue_.size());
1653 }
1654 
CleanCloudData(ClearMode mode,const std::vector<std::string> & tableNameList,const RelationalSchemaObject & localSchema)1655 int CloudSyncer::CleanCloudData(ClearMode mode, const std::vector<std::string> &tableNameList,
1656     const RelationalSchemaObject &localSchema)
1657 {
1658     std::lock_guard<std::mutex> lock(syncMutex_);
1659     int index = 1;
1660     for (const auto &tableName: tableNameList) {
1661         LOGD("[CloudSyncer] Start clean cloud water mark. table index: %d.", index);
1662         int ret = storageProxy_->CleanWaterMark(tableName);
1663         if (ret != E_OK) {
1664         LOGE("[CloudSyncer] failed to put cloud water mark after clean cloud data, %d.", ret);
1665             return ret;
1666         }
1667         index++;
1668     }
1669     int errCode = storageProxy_->StartTransaction(TransactType::IMMEDIATE);
1670     if (errCode != E_OK) {
1671         LOGE("[CloudSyncer] failed to start Transaction before clean cloud data, %d", errCode);
1672         return errCode;
1673     }
1674 
1675     std::vector<Asset> assets;
1676     errCode = storageProxy_->CleanCloudData(mode, tableNameList, localSchema, assets);
1677     if (errCode != E_OK) {
1678         LOGE("[CloudSyncer] failed to clean cloud data, %d.", errCode);
1679         storageProxy_->Rollback();
1680         return errCode;
1681     }
1682 
1683     if (!assets.empty() && mode == FLAG_AND_DATA) {
1684         errCode = cloudDB_.RemoveLocalAssets(assets);
1685         if (errCode != E_OK) {
1686             LOGE("[Storage Executor] failed to remove local assets, %d.", errCode);
1687             storageProxy_->Rollback();
1688             return errCode;
1689         }
1690     }
1691 
1692     storageProxy_->Commit();
1693     return errCode;
1694 }
1695 
CleanWaterMarkInMemory(const std::set<std::string> & tableNameList)1696 int CloudSyncer::CleanWaterMarkInMemory(const std::set<std::string> &tableNameList)
1697 {
1698     std::lock_guard<std::mutex> lock(syncMutex_);
1699     for (const auto &tableName: tableNameList) {
1700         int ret = storageProxy_->CleanWaterMarkInMemory(tableName);
1701         if (ret != E_OK) {
1702             LOGE("[CloudSyncer] failed to clean cloud water mark in memory, %d.", ret);
1703             return ret;
1704         }
1705     }
1706     return E_OK;
1707 }
1708 
UpdateCloudWaterMark(TaskId taskId,const SyncParam & param)1709 void CloudSyncer::UpdateCloudWaterMark(TaskId taskId, const SyncParam &param)
1710 {
1711     {
1712         std::lock_guard<std::mutex> autoLock(dataLock_);
1713         currentContext_.cloudWaterMarks[currentContext_.currentUserIndex][param.info.tableName] = param.cloudWaterMark;
1714     }
1715 }
1716 
CommitDownloadResult(const DownloadItem & downloadItem,InnerProcessInfo & info,DownloadCommitList & commitList,int errCode)1717 int CloudSyncer::CommitDownloadResult(const DownloadItem &downloadItem, InnerProcessInfo &info,
1718     DownloadCommitList &commitList, int errCode)
1719 {
1720     if (commitList.empty()) {
1721         return E_OK;
1722     }
1723     uint32_t successCount = 0u;
1724     int ret = HandleDownloadResult(downloadItem, info.tableName, commitList, successCount);
1725     if (errCode == E_OK) {
1726         info.downLoadInfo.failCount += (commitList.size() - successCount);
1727         info.downLoadInfo.successCount -= (commitList.size() - successCount);
1728     }
1729     if (ret != E_OK) {
1730         LOGE("Commit download result failed.%d", ret);
1731     }
1732     commitList.clear();
1733     return ret;
1734 }
1735 
GetIdentify() const1736 std::string CloudSyncer::GetIdentify() const
1737 {
1738     return id_;
1739 }
1740 
TagStatusByStrategy(bool isExist,SyncParam & param,DataInfo & dataInfo,OpType & strategyOpResult)1741 int CloudSyncer::TagStatusByStrategy(bool isExist, SyncParam &param, DataInfo &dataInfo, OpType &strategyOpResult)
1742 {
1743     strategyOpResult = OpType::NOT_HANDLE;
1744     // ignore same record with local generate data
1745     if (dataInfo.localInfo.logInfo.device.empty() &&
1746         !CloudSyncUtils::NeedSaveData(dataInfo.localInfo.logInfo, dataInfo.cloudLogInfo)) {
1747         // not handle same data
1748         return E_OK;
1749     }
1750     {
1751         std::lock_guard<std::mutex> autoLock(dataLock_);
1752         if (!currentContext_.strategy) {
1753             LOGE("[CloudSyncer] strategy has not been set when tag status, %d.", -E_INTERNAL_ERROR);
1754             return -E_INTERNAL_ERROR;
1755         }
1756         bool isCloudWin = storageProxy_->IsTagCloudUpdateLocal(dataInfo.localInfo.logInfo,
1757             dataInfo.cloudLogInfo, policy_);
1758         strategyOpResult = currentContext_.strategy->TagSyncDataStatus(isExist, isCloudWin,
1759             dataInfo.localInfo.logInfo, dataInfo.cloudLogInfo);
1760     }
1761     if (strategyOpResult == OpType::DELETE) {
1762         param.deletePrimaryKeySet.insert(dataInfo.localInfo.logInfo.hashKey);
1763     }
1764     return E_OK;
1765 }
1766 
GetLocalInfo(size_t index,SyncParam & param,DataInfoWithLog & logInfo,std::map<std::string,LogInfo> & localLogInfoCache,VBucket & localAssetInfo)1767 int CloudSyncer::GetLocalInfo(size_t index, SyncParam &param, DataInfoWithLog &logInfo,
1768     std::map<std::string, LogInfo> &localLogInfoCache, VBucket &localAssetInfo)
1769 {
1770     int errCode = storageProxy_->GetInfoByPrimaryKeyOrGid(param.tableName, param.downloadData.data[index],
1771         logInfo, localAssetInfo);
1772     if (errCode != E_OK && errCode != -E_NOT_FOUND) {
1773         return errCode;
1774     }
1775     std::string hashKey(logInfo.logInfo.hashKey.begin(), logInfo.logInfo.hashKey.end());
1776     if (hashKey.empty()) {
1777         return errCode;
1778     }
1779     param.downloadData.existDataKey[index] = logInfo.logInfo.dataKey;
1780     param.downloadData.existDataHashKey[index] = logInfo.logInfo.hashKey;
1781     if (localLogInfoCache.find(hashKey) != localLogInfoCache.end()) {
1782         LOGD("[CloudSyncer] exist same record in one batch, override from cache record! hash=%.3s",
1783             DBCommon::TransferStringToHex(hashKey).c_str());
1784         logInfo.logInfo.flag = localLogInfoCache[hashKey].flag;
1785         logInfo.logInfo.wTimestamp = localLogInfoCache[hashKey].wTimestamp;
1786         logInfo.logInfo.timestamp = localLogInfoCache[hashKey].timestamp;
1787         logInfo.logInfo.cloudGid = localLogInfoCache[hashKey].cloudGid;
1788         logInfo.logInfo.device = localLogInfoCache[hashKey].device;
1789         logInfo.logInfo.sharingResource = localLogInfoCache[hashKey].sharingResource;
1790         logInfo.logInfo.status = localLogInfoCache[hashKey].status;
1791         // delete record should remove local asset info
1792         if ((localLogInfoCache[hashKey].flag & DataItem::DELETE_FLAG) == DataItem::DELETE_FLAG) {
1793             localAssetInfo.clear();
1794         }
1795         errCode = E_OK;
1796     }
1797     logInfo.logInfo.isNeedUpdateAsset = IsNeedUpdateAsset(localAssetInfo);
1798     return errCode;
1799 }
1800 
GetNextTaskId()1801 TaskId CloudSyncer::GetNextTaskId()
1802 {
1803     std::lock_guard<std::mutex> autoLock(dataLock_);
1804     if (!priorityTaskQueue_.empty()) {
1805         return priorityTaskQueue_.front();
1806     }
1807     if (!taskQueue_.empty()) {
1808         return taskQueue_.front();
1809     }
1810     return INVALID_TASK_ID;
1811 }
1812 
MarkCurrentTaskPausedIfNeed()1813 void CloudSyncer::MarkCurrentTaskPausedIfNeed()
1814 {
1815     std::lock_guard<std::mutex> autoLock(dataLock_);
1816     if (currentContext_.currentTaskId == INVALID_TASK_ID) {
1817         return;
1818     }
1819     if (cloudTaskInfos_.find(currentContext_.currentTaskId) == cloudTaskInfos_.end()) {
1820         return;
1821     }
1822     if (!cloudTaskInfos_[currentContext_.currentTaskId].priorityTask) {
1823         cloudTaskInfos_[currentContext_.currentTaskId].pause = true;
1824         LOGD("[CloudSyncer] Mark taskId %" PRIu64 " paused success", currentContext_.currentTaskId);
1825     }
1826 }
1827 
SetCurrentTaskFailedWithoutLock(int errCode)1828 void CloudSyncer::SetCurrentTaskFailedWithoutLock(int errCode)
1829 {
1830     if (currentContext_.currentTaskId == INVALID_TASK_ID) {
1831         return;
1832     }
1833     cloudTaskInfos_[currentContext_.currentTaskId].errCode = errCode;
1834 }
1835 
LockCloudIfNeed(TaskId taskId)1836 int CloudSyncer::LockCloudIfNeed(TaskId taskId)
1837 {
1838     {
1839         std::lock_guard<std::mutex> autoLock(dataLock_);
1840         if (currentContext_.locker != nullptr) {
1841             LOGD("[CloudSyncer] lock exist");
1842             return E_OK;
1843         }
1844     }
1845     std::shared_ptr<CloudLocker> locker = nullptr;
1846     int errCode = CloudLocker::BuildCloudLock([taskId, this]() {
1847         return LockCloud(taskId);
1848     }, [this]() {
1849         int unlockCode = UnlockCloud();
1850         if (unlockCode != E_OK) {
1851             SetCurrentTaskFailedWithoutLock(unlockCode);
1852         }
1853     }, locker);
1854     {
1855         std::lock_guard<std::mutex> autoLock(dataLock_);
1856         currentContext_.locker = locker;
1857     }
1858     return errCode;
1859 }
1860 
UnlockIfNeed()1861 void CloudSyncer::UnlockIfNeed()
1862 {
1863     std::shared_ptr<CloudLocker> cacheLocker;
1864     {
1865         std::lock_guard<std::mutex> autoLock(dataLock_);
1866         if (currentContext_.locker == nullptr) {
1867             LOGW("[CloudSyncer] locker is nullptr when unlock it"); // should not happen
1868         }
1869         cacheLocker = currentContext_.locker;
1870         currentContext_.locker = nullptr;
1871     }
1872     // unlock without mutex
1873     cacheLocker = nullptr;
1874 }
1875 
ClearCurrentContextWithoutLock()1876 void CloudSyncer::ClearCurrentContextWithoutLock()
1877 {
1878     heartbeatCount_.erase(currentContext_.currentTaskId);
1879     failedHeartbeatCount_.erase(currentContext_.currentTaskId);
1880     currentContext_.currentTaskId = INVALID_TASK_ID;
1881     currentContext_.notifier = nullptr;
1882     currentContext_.strategy = nullptr;
1883     currentContext_.processRecorder = nullptr;
1884     currentContext_.tableName.clear();
1885     currentContext_.assetDownloadList.clear();
1886     currentContext_.assetFields.clear();
1887     currentContext_.assetsInfo.clear();
1888     currentContext_.cloudWaterMarks.clear();
1889     currentContext_.isNeedUpload = false;
1890     currentContext_.currentUserIndex = 0;
1891     currentContext_.repeatCount = 0;
1892 }
1893 
ClearContextAndNotify(TaskId taskId,int errCode)1894 void CloudSyncer::ClearContextAndNotify(TaskId taskId, int errCode)
1895 {
1896     std::shared_ptr<ProcessNotifier> notifier = nullptr;
1897     CloudTaskInfo info;
1898     {
1899         // clear current context
1900         std::lock_guard<std::mutex> autoLock(dataLock_);
1901         notifier = currentContext_.notifier;
1902         ClearCurrentContextWithoutLock();
1903         if (cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) { // should not happen
1904             LOGW("[CloudSyncer] taskId %" PRIu64 " has been finished!", taskId);
1905             contextCv_.notify_one();
1906             return;
1907         }
1908         info = std::move(cloudTaskInfos_[taskId]);
1909         cloudTaskInfos_.erase(taskId);
1910         resumeTaskInfos_.erase(taskId);
1911     }
1912     int err = storageProxy_->ClearUnLockingNoNeedCompensated();
1913     if (err != E_OK) {
1914         // if clear unlocking failed, no return to avoid affecting the entire process
1915         LOGW("[CloudSyncer] clear unlocking status failed! errCode = %d", err);
1916     }
1917     contextCv_.notify_one();
1918     if (info.errCode == E_OK) {
1919         info.errCode = errCode;
1920     }
1921     LOGI("[CloudSyncer] finished storeId %.3s taskId %" PRIu64 " errCode %d", info.storeId.c_str(), taskId,
1922         info.errCode);
1923     info.status = ProcessStatus::FINISHED;
1924     if (notifier != nullptr) {
1925         notifier->NotifyProcess(info, {}, true);
1926     }
1927     // generate compensated sync
1928     if (!info.compensatedTask) {
1929         CloudTaskInfo taskInfo = CloudSyncUtils::InitCompensatedSyncTaskInfo(info);
1930         GenerateCompensatedSync(taskInfo);
1931     }
1932 }
1933 
DownloadOneBatch(TaskId taskId,SyncParam & param,bool isFirstDownload)1934 int CloudSyncer::DownloadOneBatch(TaskId taskId, SyncParam &param, bool isFirstDownload)
1935 {
1936     int ret = CheckTaskIdValid(taskId);
1937     if (ret != E_OK) {
1938         return ret;
1939     }
1940     bool abort = false;
1941     ret = DownloadDataFromCloud(taskId, param, abort, isFirstDownload);
1942     if (abort) {
1943         return ret;
1944     }
1945     // Save data in transaction, update cloud water mark, notify process and changed data
1946     ret = SaveDataNotifyProcess(taskId, param);
1947     if (ret == -E_TASK_PAUSED) {
1948         return ret;
1949     }
1950     if (ret != E_OK) {
1951         std::lock_guard<std::mutex> autoLock(dataLock_);
1952         param.info.tableStatus = ProcessStatus::FINISHED;
1953         currentContext_.notifier->UpdateProcess(param.info);
1954         return ret;
1955     }
1956     (void)NotifyInDownload(taskId, param, isFirstDownload);
1957     return SaveCloudWaterMark(param.tableName, taskId);
1958 }
1959 
DownloadOneAssetRecord(const std::set<Key> & dupHashKeySet,const DownloadList & downloadList,DownloadItem & downloadItem,InnerProcessInfo & info,ChangedData & changedAssets)1960 int CloudSyncer::DownloadOneAssetRecord(const std::set<Key> &dupHashKeySet, const DownloadList &downloadList,
1961     DownloadItem &downloadItem, InnerProcessInfo &info, ChangedData &changedAssets)
1962 {
1963     CloudStorageUtils::EraseNoChangeAsset(downloadItem.assets);
1964     if (downloadItem.assets.empty()) { // Download data (include deleting)
1965         return E_OK;
1966     }
1967     bool isSharedTable = false;
1968     int errorCode = storageProxy_->IsSharedTable(info.tableName, isSharedTable);
1969     if (errorCode != E_OK) {
1970         LOGE("[CloudSyncer] DownloadOneAssetRecord cannot judge the table is a shared table. %d", errorCode);
1971         return errorCode;
1972     }
1973     if (!isSharedTable) {
1974         errorCode = DownloadAssetsOneByOne(info, downloadItem, downloadItem.assets);
1975         if (errorCode == -E_NOT_SET) {
1976             return -E_NOT_SET;
1977         }
1978     } else {
1979         // share table will not download asset, need to reset the status
1980         for (auto &entry: downloadItem.assets) {
1981             for (auto &asset: entry.second) {
1982                 asset.status = AssetStatus::NORMAL;
1983             }
1984         }
1985     }
1986     if (errorCode != E_OK) {
1987         info.downLoadInfo.failCount += 1;
1988         if (info.downLoadInfo.successCount == 0) {
1989             LOGW("[CloudSyncer] Invalid successCount");
1990         } else {
1991             info.downLoadInfo.successCount -= 1;
1992         }
1993     }
1994     if (!downloadItem.assets.empty()) {
1995         if (dupHashKeySet.find(downloadItem.hashKey) == dupHashKeySet.end()) {
1996             changedAssets.primaryData[CloudSyncUtils::OpTypeToChangeType(downloadItem.strategy)].push_back(
1997                 downloadItem.primaryKeyValList);
1998         } else if (downloadItem.strategy == OpType::INSERT) {
1999             changedAssets.primaryData[ChangeType::OP_UPDATE].push_back(downloadItem.primaryKeyValList);
2000         }
2001     }
2002 
2003     return errorCode;
2004 }
2005 
GetSyncParamForDownload(TaskId taskId,SyncParam & param)2006 int CloudSyncer::GetSyncParamForDownload(TaskId taskId, SyncParam &param)
2007 {
2008     int ret = E_OK;
2009     if (IsCurrentTableResume(taskId, false)) {
2010         std::lock_guard<std::mutex> autoLock(dataLock_);
2011         if (resumeTaskInfos_[taskId].syncParam.tableName == currentContext_.tableName) {
2012             param = resumeTaskInfos_[taskId].syncParam;
2013             resumeTaskInfos_[taskId].syncParam = {};
2014             ret = storageProxy_->GetCloudWaterMark(param.tableName, param.cloudWaterMark);
2015             if (ret != E_OK) {
2016                 LOGE("[CloudSyncer] Cannot get cloud water level from cloud meta data when table is resume: %d.", ret);
2017             }
2018             LOGD("[CloudSyncer] Get sync param from cache");
2019             return E_OK;
2020         }
2021     }
2022     ret = GetCurrentTableName(param.tableName);
2023     if (ret != E_OK) {
2024         LOGE("[CloudSyncer] Invalid table name for syncing: %d", ret);
2025         return ret;
2026     }
2027     param.info.tableName = param.tableName;
2028     std::vector<Field> assetFields;
2029     // only no primary key and composite primary key contains rowid.
2030     ret = storageProxy_->GetPrimaryColNamesWithAssetsFields(param.tableName, param.pkColNames, assetFields);
2031     if (ret != E_OK) {
2032         LOGE("[CloudSyncer] Cannot get primary column names: %d", ret);
2033         return ret;
2034     }
2035     {
2036         std::lock_guard<std::mutex> autoLock(dataLock_);
2037         currentContext_.assetFields[currentContext_.tableName] = assetFields;
2038     }
2039     param.isSinglePrimaryKey = CloudSyncUtils::IsSinglePrimaryKey(param.pkColNames);
2040     if (!IsModeForcePull(taskId) && (!IsPriorityTask(taskId) || IsQueryListEmpty(taskId))) {
2041         ret = storageProxy_->GetCloudWaterMark(param.tableName, param.cloudWaterMark);
2042         if (ret != E_OK) {
2043             LOGE("[CloudSyncer] Cannot get cloud water level from cloud meta data: %d.", ret);
2044         }
2045         if (!IsCurrentTaskResume(taskId)) {
2046             ReloadCloudWaterMarkIfNeed(param.tableName, param.cloudWaterMark);
2047         }
2048     }
2049     currentContext_.notifier->GetDownloadInfoByTableName(param.info);
2050     return ret;
2051 }
2052 
IsCurrentTaskResume(TaskId taskId)2053 bool CloudSyncer::IsCurrentTaskResume(TaskId taskId)
2054 {
2055     std::lock_guard<std::mutex> autoLock(dataLock_);
2056     return cloudTaskInfos_[taskId].resume;
2057 }
2058 
IsCurrentTableResume(TaskId taskId,bool upload)2059 bool CloudSyncer::IsCurrentTableResume(TaskId taskId, bool upload)
2060 {
2061     std::lock_guard<std::mutex> autoLock(dataLock_);
2062     if (!cloudTaskInfos_[taskId].resume) {
2063         return false;
2064     }
2065     if (currentContext_.tableName != resumeTaskInfos_[taskId].context.tableName) {
2066         return false;
2067     }
2068     return upload == resumeTaskInfos_[taskId].upload;
2069 }
2070 
DownloadDataFromCloud(TaskId taskId,SyncParam & param,bool & abort,bool isFirstDownload)2071 int CloudSyncer::DownloadDataFromCloud(TaskId taskId, SyncParam &param, bool &abort,
2072     bool isFirstDownload)
2073 {
2074     // Get cloud data after cloud water mark
2075     param.info.tableStatus = ProcessStatus::PROCESSING;
2076     param.downloadData = {};
2077     int ret = QueryCloudData(taskId, param.info.tableName, param.cloudWaterMark, param.downloadData);
2078     if (ret == -E_QUERY_END) {
2079         // Won't break here since downloadData may not be null
2080         param.isLastBatch = true;
2081     } else if (ret != E_OK) {
2082         std::lock_guard<std::mutex> autoLock(dataLock_);
2083         param.info.tableStatus = ProcessStatus::FINISHED;
2084         currentContext_.notifier->UpdateProcess(param.info);
2085         abort = true;
2086         return ret;
2087     }
2088     if (param.downloadData.data.empty()) {
2089         if (ret == E_OK || isFirstDownload) {
2090             LOGD("[CloudSyncer] try to query cloud data use increment water mark");
2091             UpdateCloudWaterMark(taskId, param);
2092             // Cloud water may change on the cloud, it needs to be saved here
2093             SaveCloudWaterMark(param.tableName, taskId);
2094         }
2095         if (isFirstDownload) {
2096             NotifyInEmptyDownload(taskId, param.info);
2097         }
2098         abort = true;
2099     }
2100     return E_OK;
2101 }
2102 
GetDownloadAssetIndex(TaskId taskId)2103 size_t CloudSyncer::GetDownloadAssetIndex(TaskId taskId)
2104 {
2105     size_t index = 0u;
2106     std::lock_guard<std::mutex> autoLock(dataLock_);
2107     if (resumeTaskInfos_[taskId].lastDownloadIndex != 0u) {
2108         index = resumeTaskInfos_[taskId].lastDownloadIndex;
2109         resumeTaskInfos_[taskId].lastDownloadIndex = 0u;
2110     }
2111     return index;
2112 }
2113 
GetCurrentTableUploadBatchIndex()2114 uint32_t CloudSyncer::GetCurrentTableUploadBatchIndex()
2115 {
2116     std::lock_guard<std::mutex> autoLock(dataLock_);
2117     return currentContext_.notifier->GetUploadBatchIndex(currentContext_.tableName);
2118 }
2119 
ResetCurrentTableUploadBatchIndex()2120 void CloudSyncer::ResetCurrentTableUploadBatchIndex()
2121 {
2122     std::lock_guard<std::mutex> autoLock(dataLock_);
2123     currentContext_.notifier->ResetUploadBatchIndex(currentContext_.tableName);
2124 }
2125 
RecordWaterMark(TaskId taskId,Timestamp waterMark)2126 void CloudSyncer::RecordWaterMark(TaskId taskId, Timestamp waterMark)
2127 {
2128     std::lock_guard<std::mutex> autoLock(dataLock_);
2129     resumeTaskInfos_[taskId].lastLocalWatermark = waterMark;
2130 }
2131 
GetResumeWaterMark(TaskId taskId)2132 Timestamp CloudSyncer::GetResumeWaterMark(TaskId taskId)
2133 {
2134     std::lock_guard<std::mutex> autoLock(dataLock_);
2135     return resumeTaskInfos_[taskId].lastLocalWatermark;
2136 }
2137 
GetInnerProcessInfo(const std::string & tableName,UploadParam & uploadParam)2138 CloudSyncer::InnerProcessInfo CloudSyncer::GetInnerProcessInfo(const std::string &tableName, UploadParam &uploadParam)
2139 {
2140     InnerProcessInfo info;
2141     info.tableName = tableName;
2142     info.tableStatus = ProcessStatus::PROCESSING;
2143     ReloadUploadInfoIfNeed(uploadParam.taskId, uploadParam, info);
2144     return info;
2145 }
2146 
SetGenCloudVersionCallback(const GenerateCloudVersionCallback & callback)2147 void CloudSyncer::SetGenCloudVersionCallback(const GenerateCloudVersionCallback &callback)
2148 {
2149     cloudDB_.SetGenCloudVersionCallback(callback);
2150 }
2151 
CopyAndClearTaskInfos()2152 std::vector<CloudSyncer::CloudTaskInfo> CloudSyncer::CopyAndClearTaskInfos()
2153 {
2154     std::vector<CloudTaskInfo> infoList;
2155     std::lock_guard<std::mutex> autoLock(dataLock_);
2156     for (const auto &item: cloudTaskInfos_) {
2157         infoList.push_back(item.second);
2158     }
2159     taskQueue_.clear();
2160     priorityTaskQueue_.clear();
2161     cloudTaskInfos_.clear();
2162     resumeTaskInfos_.clear();
2163     currentContext_.notifier = nullptr;
2164     return infoList;
2165 }
2166 
WaitCurTaskFinished()2167 void CloudSyncer::WaitCurTaskFinished()
2168 {
2169     LOGD("[CloudSyncer] begin wait current task finished");
2170     std::unique_lock<std::mutex> uniqueLock(dataLock_);
2171     contextCv_.wait(uniqueLock, [this]() {
2172         return currentContext_.currentTaskId == INVALID_TASK_ID;
2173     });
2174     LOGD("[CloudSyncer] current task has been finished");
2175 }
2176 } // namespace DistributedDB
2177