1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "cloud_syncer.h"
16
17 #include <cstdint>
18 #include <unordered_map>
19 #include <utility>
20
21 #include "cloud/asset_operation_utils.h"
22 #include "cloud/cloud_db_constant.h"
23 #include "cloud/cloud_storage_utils.h"
24 #include "cloud/icloud_db.h"
25 #include "cloud_sync_tag_assets.h"
26 #include "cloud_sync_utils.h"
27 #include "db_dfx_adapter.h"
28 #include "db_errno.h"
29 #include "log_print.h"
30 #include "runtime_context.h"
31 #include "storage_proxy.h"
32 #include "store_types.h"
33 #include "strategy_factory.h"
34 #include "version.h"
35
36 namespace DistributedDB {
// Construct a syncer bound to one storage proxy.
// storageProxy: local storage access layer (may be null; guarded below).
// isKvScene: true when syncing a KV store rather than a relational store.
// policy: conflict-resolution policy applied during sync.
CloudSyncer::CloudSyncer(
    std::shared_ptr<StorageProxy> storageProxy, bool isKvScene, SingleVerConflictResolvePolicy policy)
    : lastTaskId_(INVALID_TASK_ID),
      storageProxy_(std::move(storageProxy)),
      queuedManualSyncLimit_(DBConstant::QUEUED_SYNC_LIMIT_DEFAULT),
      closed_(false),
      timerId_(0u),
      isKvScene_(isKvScene),
      policy_(policy)
{
    // Cache the store identifier up front; guard against a null proxy.
    if (storageProxy_ != nullptr) {
        id_ = storageProxy_->GetIdentify();
    }
    InitCloudSyncStateMachine();
}
52
Sync(const std::vector<DeviceID> & devices,SyncMode mode,const std::vector<std::string> & tables,const SyncProcessCallback & callback,int64_t waitTime)53 int CloudSyncer::Sync(const std::vector<DeviceID> &devices, SyncMode mode,
54 const std::vector<std::string> &tables, const SyncProcessCallback &callback, int64_t waitTime)
55 {
56 CloudTaskInfo taskInfo;
57 taskInfo.mode = mode;
58 taskInfo.table = tables;
59 taskInfo.callback = callback;
60 taskInfo.timeout = waitTime;
61 taskInfo.devices = devices;
62 for (const auto &item: tables) {
63 QuerySyncObject syncObject;
64 syncObject.SetTableName(item);
65 taskInfo.queryList.push_back(syncObject);
66 }
67 return Sync(taskInfo);
68 }
69
Sync(const CloudTaskInfo & taskInfo)70 int CloudSyncer::Sync(const CloudTaskInfo &taskInfo)
71 {
72 int errCode = CloudSyncUtils::CheckParamValid(taskInfo.devices, taskInfo.mode);
73 if (errCode != E_OK) {
74 return errCode;
75 }
76 if (cloudDB_.IsNotExistCloudDB()) {
77 LOGE("[CloudSyncer] Not set cloudDB!");
78 return -E_CLOUD_ERROR;
79 }
80 if (closed_) {
81 LOGE("[CloudSyncer] DB is closed!");
82 return -E_DB_CLOSED;
83 }
84 CloudTaskInfo info = taskInfo;
85 info.status = ProcessStatus::PREPARED;
86 errCode = TryToAddSyncTask(std::move(info));
87 if (errCode != E_OK) {
88 return errCode;
89 }
90 if (taskInfo.priorityTask) {
91 MarkCurrentTaskPausedIfNeed();
92 }
93 return TriggerSync();
94 }
95
SetCloudDB(const std::shared_ptr<ICloudDb> & cloudDB)96 void CloudSyncer::SetCloudDB(const std::shared_ptr<ICloudDb> &cloudDB)
97 {
98 cloudDB_.SetCloudDB(cloudDB);
99 LOGI("[CloudSyncer] SetCloudDB finish");
100 }
101
GetCloudDB() const102 const std::map<std::string, std::shared_ptr<ICloudDb>> CloudSyncer::GetCloudDB() const
103 {
104 return cloudDB_.GetCloudDB();
105 }
106
SetIAssetLoader(const std::shared_ptr<IAssetLoader> & loader)107 void CloudSyncer::SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader)
108 {
109 storageProxy_->SetIAssetLoader(loader);
110 cloudDB_.SetIAssetLoader(loader);
111 LOGI("[CloudSyncer] SetIAssetLoader finish");
112 }
113
// Shut the syncer down: fail the running task, close the cloud DB, wait for
// the in-flight task to drain, then discard every queued task and close the
// storage proxy. Ordering here is deliberate and must not be changed.
void CloudSyncer::Close()
{
    closed_ = true;
    CloudSyncer::TaskId currentTask;
    {
        std::lock_guard<std::mutex> autoLock(dataLock_);
        currentTask = currentContext_.currentTaskId;
    }
    // mark current task db_closed
    SetTaskFailed(currentTask, -E_DB_CLOSED);
    // Release the cloud lock (if held) so the failing task cannot stall.
    UnlockIfNeed();
    cloudDB_.Close();
    // Block until the current task has fully finished before draining queues.
    WaitCurTaskFinished();

    // copy all task from queue
    std::vector<CloudTaskInfo> infoList = CopyAndClearTaskInfos();
    for (auto &info: infoList) {
        LOGI("[CloudSyncer] finished taskId %" PRIu64 " with db closed.", info.taskId);
    }
    storageProxy_->Close();
}
135
// Abort the running task and every queued task with -E_USER_CHANGE (called on
// account switch), notifying each queued task's observers that it finished.
void CloudSyncer::StopAllTasks()
{
    CloudSyncer::TaskId currentTask;
    {
        std::lock_guard<std::mutex> autoLock(dataLock_);
        currentTask = currentContext_.currentTaskId;
    }
    // mark current task user_change
    SetTaskFailed(currentTask, -E_USER_CHANGE);
    // Release the cloud lock so the failing task cannot block the wait below.
    UnlockIfNeed();
    WaitCurTaskFinished();

    std::vector<CloudTaskInfo> infoList = CopyAndClearTaskInfos();
    for (auto &info: infoList) {
        LOGI("[CloudSyncer] finished taskId %" PRIu64 " with user changed.", info.taskId);
        // Each queued task gets a fresh notifier so its observers receive a
        // FINISHED process carrying the user-change error.
        auto processNotifier = std::make_shared<ProcessNotifier>(this);
        processNotifier->Init(info.table, info.devices, info.users);
        info.errCode = -E_USER_CHANGE;
        info.status = ProcessStatus::FINISHED;
        processNotifier->NotifyProcess(info, {}, true);
    }
}
158
TriggerSync()159 int CloudSyncer::TriggerSync()
160 {
161 if (closed_) {
162 return -E_DB_CLOSED;
163 }
164 RefObject::IncObjRef(this);
165 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this]() {
166 DoSyncIfNeed();
167 RefObject::DecObjRef(this);
168 });
169 if (errCode != E_OK) {
170 LOGW("[CloudSyncer] schedule sync task failed %d", errCode);
171 RefObject::DecObjRef(this);
172 }
173 return errCode;
174 }
175
// Switch every user-scoped component (storage proxy, notifier, cloud DB) to
// the given user and advance the per-task user cursor. Called once per user
// of a multi-user task; all updates happen atomically under dataLock_.
void CloudSyncer::SetProxyUser(const std::string &user)
{
    std::lock_guard<std::mutex> autoLock(dataLock_);
    storageProxy_->SetUser(user);
    currentContext_.notifier->SetUser(user);
    currentContext_.currentUserIndex = currentContext_.currentUserIndex + 1;
    cloudDB_.SwitchCloudDB(user);
}
184
DoSyncIfNeed()185 void CloudSyncer::DoSyncIfNeed()
186 {
187 if (closed_) {
188 return;
189 }
190 // do all sync task in this loop
191 do {
192 // get taskId from queue
193 TaskId triggerTaskId = GetNextTaskId();
194 if (triggerTaskId == INVALID_TASK_ID) {
195 LOGD("[CloudSyncer] task queue empty");
196 break;
197 }
198 // pop taskId in queue
199 if (PrepareSync(triggerTaskId) != E_OK) {
200 break;
201 }
202 // do sync logic
203 std::vector<std::string> usersList;
204 {
205 std::lock_guard<std::mutex> autoLock(dataLock_);
206 usersList = cloudTaskInfos_[triggerTaskId].users;
207 currentContext_.currentUserIndex = 0;
208 }
209 int errCode = E_OK;
210 if (usersList.empty()) {
211 SetProxyUser("");
212 errCode = DoSync(triggerTaskId);
213 } else {
214 for (const auto &user : usersList) {
215 SetProxyUser(user);
216 errCode = DoSync(triggerTaskId);
217 }
218 }
219 LOGD("[CloudSyncer] DoSync finished, errCode %d", errCode);
220 } while (!closed_);
221 LOGD("[CloudSyncer] DoSyncIfNeed finished, closed status %d", static_cast<int>(closed_));
222 }
223
// Run one full sync pass for taskId: an optional "first download" phase,
// then the state-machine-driven download/upload/finish phases. Serialized by
// syncMutex_ so only one task executes at a time.
int CloudSyncer::DoSync(TaskId taskId)
{
    std::lock_guard<std::mutex> lock(syncMutex_);
    ResetCurrentTableUploadBatchIndex();
    CloudTaskInfo taskInfo;
    {
        std::lock_guard<std::mutex> autoLock(dataLock_);
        taskInfo = cloudTaskInfos_[taskId];
        cloudDB_.SetPrepareTraceId(taskInfo.prepareTraceId); // SetPrepareTraceId before task started
        LOGD("[CloudSyncer] DoSync get taskInfo, taskId is: %llu, prepareTraceId is:%s.",
            static_cast<unsigned long long>(taskInfo.taskId), taskInfo.prepareTraceId.c_str());
    }
    bool needUpload = true;
    bool isNeedFirstDownload = false;
    {
        std::lock_guard<std::mutex> autoLock(dataLock_);
        needUpload = currentContext_.strategy->JudgeUpload();
        // 1. if the locker is already exist, directly reuse the lock, no need do the first download
        // 2. if the task(resume task) is already be tagged need upload data, no need do the first download
        isNeedFirstDownload = (currentContext_.locker == nullptr) && (!currentContext_.isNeedUpload);
    }
    int errCode = E_OK;
    bool isFirstDownload = true;
    if (isNeedFirstDownload) {
        // do first download
        errCode = DoDownloadInNeed(taskInfo, needUpload, isFirstDownload);
        // NOTE(review): invoked even when errCode == E_OK — presumably a
        // no-op for E_OK; confirm SetTaskFailed ignores non-errors.
        SetTaskFailed(taskId, errCode);
        if (errCode != E_OK) {
            SyncMachineDoFinished();
            return errCode;
        }
        bool isActuallyNeedUpload = false; // whether the task actually has data to upload
        {
            std::lock_guard<std::mutex> autoLock(dataLock_);
            isActuallyNeedUpload = currentContext_.isNeedUpload;
        }
        if (!isActuallyNeedUpload) {
            // Nothing to upload: the task is complete after the download.
            LOGI("[CloudSyncer] no table need upload!");
            SyncMachineDoFinished();
            return E_OK;
        }
        isFirstDownload = false;
    }

    {
        std::lock_guard<std::mutex> autoLock(dataLock_);
        currentContext_.isFirstDownload = isFirstDownload;
        currentContext_.isRealNeedUpload = needUpload;
    }
    errCode = DoSyncInner(taskInfo);
    return errCode;
}
276
PrepareAndUpload(const CloudTaskInfo & taskInfo,size_t index)277 int CloudSyncer::PrepareAndUpload(const CloudTaskInfo &taskInfo, size_t index)
278 {
279 {
280 std::lock_guard<std::mutex> autoLock(dataLock_);
281 currentContext_.tableName = taskInfo.table[index];
282 }
283 int errCode = CheckTaskIdValid(taskInfo.taskId);
284 if (errCode != E_OK) {
285 LOGE("[CloudSyncer] task is invalid, abort sync");
286 return errCode;
287 }
288 if (taskInfo.table.empty()) {
289 LOGE("[CloudSyncer] Invalid taskInfo table");
290 return -E_INVALID_ARGS;
291 }
292 errCode = DoUpload(taskInfo.taskId, index == (taskInfo.table.size() - 1u), taskInfo.lockAction);
293 if (errCode == -E_CLOUD_VERSION_CONFLICT) {
294 {
295 std::lock_guard<std::mutex> autoLock(dataLock_);
296 currentContext_.processRecorder->MarkDownloadFinish(currentContext_.currentUserIndex,
297 taskInfo.table[index], false);
298 LOGI("[CloudSyncer] upload version conflict, index:%zu", index);
299 }
300 return errCode;
301 }
302 if (errCode != E_OK) {
303 LOGE("[CloudSyncer] upload failed %d", errCode);
304 return errCode;
305 }
306 return errCode;
307 }
308
// Upload every table of the task inside a single storage transaction.
// Commits on success AND on pause (a pause keeps already-uploaded progress
// for the resumed run); rolls back on any other failure.
int CloudSyncer::DoUploadInNeed(const CloudTaskInfo &taskInfo, const bool needUpload)
{
    if (!needUpload) {
        return E_OK;
    }
    int errCode = storageProxy_->StartTransaction();
    if (errCode != E_OK) {
        LOGE("[CloudSyncer] start transaction failed before doing upload.");
        return errCode;
    }
    for (size_t i = 0u; i < taskInfo.table.size(); ++i) {
        LOGD("[CloudSyncer] try upload table, index: %zu", i);
        // Skip tables already uploaded by a previous (paused/resumed) run.
        if (IsTableFinishInUpload(taskInfo.table[i])) {
            continue;
        }
        errCode = PrepareAndUpload(taskInfo, i);
        if (errCode == -E_TASK_PAUSED) { // should re download [paused table , last table]
            for (size_t j = i; j < taskInfo.table.size(); ++j) {
                MarkDownloadFinishIfNeed(taskInfo.table[j], false);
            }
        }
        if (errCode != E_OK) {
            break;
        }
        MarkUploadFinishIfNeed(taskInfo.table[i]);
    }
    if (errCode == -E_TASK_PAUSED) {
        // Remember that the resumed task should continue in the upload phase.
        std::lock_guard<std::mutex> autoLock(dataLock_);
        resumeTaskInfos_[taskInfo.taskId].upload = true;
    }
    if (errCode == E_OK || errCode == -E_TASK_PAUSED) {
        int commitErrorCode = storageProxy_->Commit();
        if (commitErrorCode != E_OK) {
            LOGE("[CloudSyncer] cannot commit transaction: %d.", commitErrorCode);
        }
    } else {
        int rollBackErrorCode = storageProxy_->Rollback();
        if (rollBackErrorCode != E_OK) {
            LOGE("[CloudSyncer] cannot roll back transaction: %d.", rollBackErrorCode);
        }
    }
    return errCode;
}
352
SyncMachineDoDownload()353 CloudSyncEvent CloudSyncer::SyncMachineDoDownload()
354 {
355 CloudTaskInfo taskInfo;
356 bool needUpload;
357 bool isFirstDownload;
358 {
359 std::lock_guard<std::mutex> autoLock(dataLock_);
360 taskInfo = cloudTaskInfos_[currentContext_.currentTaskId];
361 needUpload = currentContext_.isRealNeedUpload;
362 isFirstDownload = currentContext_.isFirstDownload;
363 }
364 int errCode = E_OK;
365 if (IsLockInDownload()) {
366 errCode = LockCloudIfNeed(taskInfo.taskId);
367 }
368 if (errCode != E_OK) {
369 return SetCurrentTaskFailedInMachine(errCode);
370 }
371 errCode = DoDownloadInNeed(taskInfo, needUpload, isFirstDownload);
372 if (errCode != E_OK) {
373 if (errCode == -E_TASK_PAUSED) {
374 DBDfxAdapter::ReportBehavior(
375 {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_DOWNLOAD, StageResult::CANCLE, errCode});
376 } else {
377 DBDfxAdapter::ReportBehavior(
378 {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_DOWNLOAD, StageResult::FAIL, errCode});
379 }
380 return SetCurrentTaskFailedInMachine(errCode);
381 }
382 DBDfxAdapter::ReportBehavior(
383 {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_DOWNLOAD, StageResult::SUCC, errCode});
384 return CloudSyncEvent::DOWNLOAD_FINISHED_EVENT;
385 }
386
SyncMachineDoUpload()387 CloudSyncEvent CloudSyncer::SyncMachineDoUpload()
388 {
389 CloudTaskInfo taskInfo;
390 bool needUpload;
391 {
392 std::lock_guard<std::mutex> autoLock(dataLock_);
393 taskInfo = cloudTaskInfos_[currentContext_.currentTaskId];
394 needUpload = currentContext_.isRealNeedUpload;
395 }
396 DBDfxAdapter::ReportBehavior(
397 {__func__, Scene::CLOUD_SYNC, State::BEGIN, Stage::CLOUD_UPLOAD, StageResult::SUCC, E_OK});
398 int errCode = DoUploadInNeed(taskInfo, needUpload);
399 if (errCode == -E_CLOUD_VERSION_CONFLICT) {
400 DBDfxAdapter::ReportBehavior(
401 {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_UPLOAD, StageResult::FAIL, errCode});
402 return CloudSyncEvent::REPEAT_CHECK_EVENT;
403 }
404 if (errCode != E_OK) {
405 {
406 std::lock_guard<std::mutex> autoLock(dataLock_);
407 cloudTaskInfos_[currentContext_.currentTaskId].errCode = errCode;
408 }
409 if (errCode == -E_TASK_PAUSED) {
410 DBDfxAdapter::ReportBehavior(
411 {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_UPLOAD, StageResult::CANCLE, errCode});
412 } else {
413 DBDfxAdapter::ReportBehavior(
414 {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_UPLOAD, StageResult::FAIL, errCode});
415 }
416 return CloudSyncEvent::ERROR_EVENT;
417 }
418 DBDfxAdapter::ReportBehavior(
419 {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_UPLOAD, StageResult::SUCC, errCode});
420 return CloudSyncEvent::UPLOAD_FINISHED_EVENT;
421 }
422
// Finish the current user's pass. When every user of the task has been
// processed (currentUserIndex >= userListSize), finish the whole task;
// otherwise only notify progress and reset the stored task error so the
// next user starts clean.
CloudSyncEvent CloudSyncer::SyncMachineDoFinished()
{
    UnlockIfNeed();
    TaskId taskId;
    int errCode;
    int currentUserIndex;
    int userListSize;
    {
        std::lock_guard<std::mutex> autoLock(dataLock_);
        taskId = currentContext_.currentTaskId;
        errCode = cloudTaskInfos_[currentContext_.currentTaskId].errCode;
        currentUserIndex = currentContext_.currentUserIndex;
        userListSize = static_cast<int>(cloudTaskInfos_[taskId].users.size());
    }
    if (currentUserIndex >= userListSize) {
        // Last (or only) user: clear the stored error, then finish the task
        // with the error value captured above.
        {
            std::lock_guard<std::mutex> autoLock(dataLock_);
            cloudTaskInfos_[currentContext_.currentTaskId].errCode = E_OK;
        }
        DoFinished(taskId, errCode);
    } else {
        CloudTaskInfo taskInfo;
        {
            std::lock_guard<std::mutex> autoLock(dataLock_);
            taskInfo = cloudTaskInfos_[currentContext_.currentTaskId];
        }
        taskInfo.status = ProcessStatus::FINISHED;
        // NOTE(review): currentContext_.notifier is read here without holding
        // dataLock_, unlike every other currentContext_ access in this
        // function — confirm this is intentional and safe.
        currentContext_.notifier->NotifyProcess(taskInfo, {}, true);
        {
            std::lock_guard<std::mutex> autoLock(dataLock_);
            cloudTaskInfos_[currentContext_.currentTaskId].errCode = E_OK;
        }
    }
    return CloudSyncEvent::ALL_TASK_FINISHED_EVENT;
}
458
// Kick off the state machine for the prepared task; the machine drives the
// download/upload/finish phases from here on. Always returns E_OK — phase
// errors are recorded on the task itself by the machine states.
int CloudSyncer::DoSyncInner(const CloudTaskInfo &taskInfo)
{
    cloudSyncStateMachine_.SwitchStateAndStep(CloudSyncEvent::START_SYNC_EVENT);
    DBDfxAdapter::ReportBehavior(
        {__func__, Scene::CLOUD_SYNC, State::BEGIN, Stage::CLOUD_SYNC, StageResult::SUCC, E_OK});
    return E_OK;
}
466
// Finalize taskId. A paused task is NOT finished: its context is stashed in
// resumeTaskInfos_ for later resumption (keeping only the cloud locker in
// the live context). Any other result dequeues the task and notifies.
void CloudSyncer::DoFinished(TaskId taskId, int errCode)
{
    storageProxy_->OnSyncFinish();
    if (errCode == -E_TASK_PAUSED) {
        DBDfxAdapter::ReportBehavior(
            {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_SYNC, StageResult::CANCLE, errCode});
        LOGD("[CloudSyncer] taskId %" PRIu64 " was paused, it won't be finished now", taskId);
        {
            std::lock_guard<std::mutex> autoLock(dataLock_);
            // Stash the whole context, then move the locker back: the cloud
            // lock stays with the live context, not the paused snapshot.
            resumeTaskInfos_[taskId].context = std::move(currentContext_);
            currentContext_.locker = resumeTaskInfos_[taskId].context.locker;
            resumeTaskInfos_[taskId].context.locker = nullptr;
            ClearCurrentContextWithoutLock();
        }
        // Wake any thread waiting for the context to become free.
        contextCv_.notify_one();
        return;
    }
    {
        std::lock_guard<std::mutex> autoLock(dataLock_);
        taskQueue_.remove(taskId);
        priorityTaskQueue_.remove(taskId);
    }
    ClearContextAndNotify(taskId, errCode);
}
491
492 /**
493 * UpdateChangedData will be used for Insert case, which we can only get rowid after we saved data in db.
494 */
UpdateChangedData(SyncParam & param,DownloadList & assetsDownloadList)495 int CloudSyncer::UpdateChangedData(SyncParam ¶m, DownloadList &assetsDownloadList)
496 {
497 if (param.withoutRowIdData.insertData.empty() && param.withoutRowIdData.updateData.empty()) {
498 return E_OK;
499 }
500 int ret = E_OK;
501 for (size_t j : param.withoutRowIdData.insertData) {
502 VBucket &datum = param.downloadData.data[j];
503 std::vector<Type> primaryValues;
504 ret = CloudSyncUtils::GetCloudPkVals(datum, param.changedData.field,
505 std::get<int64_t>(datum[CloudDbConstant::ROW_ID_FIELD_NAME]), primaryValues);
506 if (ret != E_OK) {
507 LOGE("[CloudSyncer] updateChangedData cannot get primaryValues");
508 return ret;
509 }
510 param.changedData.primaryData[ChangeType::OP_INSERT].push_back(primaryValues);
511 }
512 for (const auto &tuple : param.withoutRowIdData.assetInsertData) {
513 size_t downloadIndex = std::get<0>(tuple);
514 VBucket &datum = param.downloadData.data[downloadIndex];
515 size_t insertIdx = std::get<1>(tuple);
516 std::vector<Type> &pkVal = std::get<5>(assetsDownloadList[insertIdx]); // 5 means primary key list
517 pkVal[0] = datum[CloudDbConstant::ROW_ID_FIELD_NAME];
518 }
519 for (const auto &tuple : param.withoutRowIdData.updateData) {
520 size_t downloadIndex = std::get<0>(tuple);
521 size_t updateIndex = std::get<1>(tuple);
522 VBucket &datum = param.downloadData.data[downloadIndex];
523 size_t size = param.changedData.primaryData[ChangeType::OP_UPDATE].size();
524 if (updateIndex >= size) {
525 LOGE("[CloudSyncer] updateIndex is invalid. index=%zu, size=%zu", updateIndex, size);
526 return -E_INTERNAL_ERROR;
527 }
528 if (param.changedData.primaryData[ChangeType::OP_UPDATE][updateIndex].empty()) {
529 LOGE("[CloudSyncer] primary key value list should not be empty.");
530 return -E_INTERNAL_ERROR;
531 }
532 // no primary key or composite primary key, the first element is rowid
533 param.changedData.primaryData[ChangeType::OP_UPDATE][updateIndex][0] =
534 datum[CloudDbConstant::ROW_ID_FIELD_NAME];
535 }
536 return ret;
537 }
538
IsDataContainDuplicateAsset(const std::vector<Field> & assetFields,VBucket & data)539 bool CloudSyncer::IsDataContainDuplicateAsset(const std::vector<Field> &assetFields, VBucket &data)
540 {
541 for (const auto &assetField : assetFields) {
542 if (assetField.type == TYPE_INDEX<Assets> && data[assetField.colName].index() == TYPE_INDEX<Assets>) {
543 if (CloudStorageUtils::IsAssetsContainDuplicateAsset(std::get<Assets>(data[assetField.colName]))) {
544 return true;
545 }
546 }
547 }
548 return false;
549 }
550
IsDataContainAssets()551 bool CloudSyncer::IsDataContainAssets()
552 {
553 std::lock_guard<std::mutex> autoLock(dataLock_);
554 bool hasTable = (currentContext_.assetFields.find(currentContext_.tableName) != currentContext_.assetFields.end());
555 if (!hasTable) {
556 LOGW("[CloudSyncer] failed to get assetFields, because tableName doesn't exist in currentContext, %d.",
557 -E_INTERNAL_ERROR);
558 return false;
559 }
560 if (currentContext_.assetFields[currentContext_.tableName].empty()) {
561 return false;
562 }
563 return true;
564 }
565
TagAssetsInSingleRecord(VBucket & coveredData,VBucket & beCoveredData,bool setNormalStatus,int & errCode)566 std::map<std::string, Assets> CloudSyncer::TagAssetsInSingleRecord(VBucket &coveredData, VBucket &beCoveredData,
567 bool setNormalStatus, int &errCode)
568 {
569 // Define a map to store the result
570 std::map<std::string, Assets> res = {};
571 std::vector<Field> assetFields;
572 {
573 std::lock_guard<std::mutex> autoLock(dataLock_);
574 assetFields = currentContext_.assetFields[currentContext_.tableName];
575 }
576 // For every column contain asset or assets, assetFields are in context
577 for (const Field &assetField : assetFields) {
578 Assets assets = TagAssetsInSingleCol(coveredData, beCoveredData, assetField, setNormalStatus, errCode);
579 if (!assets.empty()) {
580 res[assetField.colName] = assets;
581 }
582 if (errCode != E_OK) {
583 break;
584 }
585 }
586 return res;
587 }
588
FillCloudAssets(const std::string & tableName,VBucket & normalAssets,VBucket & failedAssets)589 int CloudSyncer::FillCloudAssets(const std::string &tableName, VBucket &normalAssets,
590 VBucket &failedAssets)
591 {
592 int ret = E_OK;
593 if (normalAssets.size() > 1) {
594 ret = storageProxy_->FillCloudAssetForDownload(tableName, normalAssets, true);
595 if (ret != E_OK) {
596 LOGE("[CloudSyncer] Can not fill normal cloud assets for download");
597 return ret;
598 }
599 }
600 if (failedAssets.size() > 1) {
601 ret = storageProxy_->FillCloudAssetForDownload(tableName, failedAssets, false);
602 if (ret != E_OK) {
603 LOGE("[CloudSyncer] Can not fill abnormal assets for download");
604 return ret;
605 }
606 }
607 return E_OK;
608 }
609
// Commit the asset download results of one batch inside an IMMEDIATE
// transaction. successCount is zeroed on every failure path so callers never
// observe a stale count.
int CloudSyncer::HandleDownloadResult(const DownloadItem &downloadItem, const std::string &tableName,
    DownloadCommitList &commitList, uint32_t &successCount)
{
    successCount = 0;
    int errCode = storageProxy_->StartTransaction(TransactType::IMMEDIATE);
    if (errCode != E_OK) {
        LOGE("[CloudSyncer] start transaction Failed before handle download.");
        return errCode;
    }
    errCode = CommitDownloadAssets(downloadItem, tableName, commitList, successCount);
    if (errCode != E_OK) {
        successCount = 0;
        int ret = E_OK;
        if (errCode == -E_REMOVE_ASSETS_FAILED) {
            // remove assets failed no effect to asset status, just commit
            ret = storageProxy_->Commit();
        } else {
            ret = storageProxy_->Rollback();
        }
        LOGE("[CloudSyncer] commit download assets failed %d commit/rollback ret %d", errCode, ret);
        return errCode;
    }
    errCode = storageProxy_->Commit();
    if (errCode != E_OK) {
        successCount = 0;
        LOGE("[CloudSyncer] commit failed %d", errCode);
    }
    return errCode;
}
639
// Download the assets of every record in downloadList, starting from the
// resume index of a previously paused run. Each record's result is committed
// individually; the first non-OK status is latched in downloadStatus and
// returned once the loop ends (a hard error returns immediately).
int CloudSyncer::CloudDbDownloadAssets(TaskId taskId, InnerProcessInfo &info, const DownloadList &downloadList,
    const std::set<Key> &dupHashKeySet, ChangedData &changedAssets)
{
    int downloadStatus = E_OK;
    {
        std::lock_guard<std::mutex> autoLock(dataLock_);
        // Pick up the latched status of a paused run and clear it.
        downloadStatus = resumeTaskInfos_[taskId].downloadStatus;
        resumeTaskInfos_[taskId].downloadStatus = E_OK;
    }
    int errorCode = E_OK;
    DownloadCommitList commitList;
    for (size_t i = GetDownloadAssetIndex(taskId); i < downloadList.size(); i++) {
        errorCode = CheckTaskIdValid(taskId);
        if (errorCode != E_OK) {
            // Task paused/cancelled: remember where to resume from.
            std::lock_guard<std::mutex> autoLock(dataLock_);
            resumeTaskInfos_[taskId].lastDownloadIndex = i;
            resumeTaskInfos_[taskId].downloadStatus = downloadStatus;
            break;
        }
        DownloadItem downloadItem;
        GetDownloadItem(downloadList, i, downloadItem);
        errorCode = DownloadOneAssetRecord(dupHashKeySet, downloadList, downloadItem, info, changedAssets);
        if (errorCode == -E_NOT_SET) {
            // Loader not set: count the remaining records as failed.
            // NOTE(review): successCount may underflow when it is smaller
            // than the remaining count — confirm with DownloadOneAssetRecord.
            info.downLoadInfo.failCount += (downloadList.size() - i);
            info.downLoadInfo.successCount -= (downloadList.size() - i);
            return errorCode;
        }
        if (downloadItem.strategy == OpType::DELETE) {
            // Deleted records contribute neither assets nor gid to the commit.
            downloadItem.assets = {};
            downloadItem.gid = "";
        }
        // Process result of each asset
        commitList.push_back(std::make_tuple(downloadItem.gid, std::move(downloadItem.assets), errorCode == E_OK));
        downloadStatus = downloadStatus == E_OK ? errorCode : downloadStatus;
        int ret = CommitDownloadResult(downloadItem, info, commitList, errorCode);
        if (ret != E_OK && ret != -E_REMOVE_ASSETS_FAILED) {
            return ret;
        }
        downloadStatus = downloadStatus == E_OK ? ret : downloadStatus;
    }
    LOGD("Download status is %d", downloadStatus);
    return errorCode == E_OK ? downloadStatus : errorCode;
}
683
DownloadAssets(InnerProcessInfo & info,const std::vector<std::string> & pKColNames,const std::set<Key> & dupHashKeySet,ChangedData & changedAssets)684 int CloudSyncer::DownloadAssets(InnerProcessInfo &info, const std::vector<std::string> &pKColNames,
685 const std::set<Key> &dupHashKeySet, ChangedData &changedAssets)
686 {
687 if (!IsDataContainAssets()) {
688 return E_OK;
689 }
690 // update changed data info
691 if (!CloudSyncUtils::IsChangeDataEmpty(changedAssets)) {
692 // changedData.primaryData should have no member inside
693 return -E_INVALID_ARGS;
694 }
695 changedAssets.tableName = info.tableName;
696 changedAssets.type = ChangedDataType::ASSET;
697 changedAssets.field = pKColNames;
698
699 // Get AssetDownloadList
700 DownloadList changeList;
701 TaskId taskId;
702 {
703 std::lock_guard<std::mutex> autoLock(dataLock_);
704 changeList = currentContext_.assetDownloadList;
705 taskId = currentContext_.currentTaskId;
706 }
707 // Download data (include deleting) will handle return Code in this situation
708 int ret = CloudDbDownloadAssets(taskId, info, changeList, dupHashKeySet, changedAssets);
709 if (ret != E_OK) {
710 LOGE("[CloudSyncer] Can not download assets or can not handle download result %d", ret);
711 }
712 return ret;
713 }
714
GetAssetsFromVBucket(VBucket & data)715 std::map<std::string, Assets> CloudSyncer::GetAssetsFromVBucket(VBucket &data)
716 {
717 std::map<std::string, Assets> assets;
718 std::vector<Field> fields;
719 {
720 std::lock_guard<std::mutex> autoLock(dataLock_);
721 fields = currentContext_.assetFields[currentContext_.tableName];
722 }
723 for (const auto &field : fields) {
724 if (data.find(field.colName) != data.end()) {
725 if (field.type == TYPE_INDEX<Asset> && data[field.colName].index() == TYPE_INDEX<Asset>) {
726 assets[field.colName] = { std::get<Asset>(data[field.colName]) };
727 } else if (field.type == TYPE_INDEX<Assets> && data[field.colName].index() == TYPE_INDEX<Assets>) {
728 assets[field.colName] = std::get<Assets>(data[field.colName]);
729 } else {
730 Assets emptyAssets;
731 assets[field.colName] = emptyAssets;
732 }
733 }
734 }
735 return assets;
736 }
737
TagStatus(bool isExist,SyncParam & param,size_t idx,DataInfo & dataInfo,VBucket & localAssetInfo)738 int CloudSyncer::TagStatus(bool isExist, SyncParam ¶m, size_t idx, DataInfo &dataInfo, VBucket &localAssetInfo)
739 {
740 OpType strategyOpResult = OpType::NOT_HANDLE;
741 int errCode = TagStatusByStrategy(isExist, param, dataInfo, strategyOpResult);
742 if (errCode != E_OK) {
743 return errCode;
744 }
745 param.downloadData.opType[idx] = strategyOpResult;
746 if (!IsDataContainAssets()) {
747 return E_OK;
748 }
749 Key hashKey;
750 if (isExist) {
751 hashKey = dataInfo.localInfo.logInfo.hashKey;
752 }
753 return TagDownloadAssets(hashKey, idx, param, dataInfo, localAssetInfo);
754 }
755
TagDownloadAssets(const Key & hashKey,size_t idx,SyncParam & param,const DataInfo & dataInfo,VBucket & localAssetInfo)756 int CloudSyncer::TagDownloadAssets(const Key &hashKey, size_t idx, SyncParam ¶m, const DataInfo &dataInfo,
757 VBucket &localAssetInfo)
758 {
759 int ret = E_OK;
760 OpType strategy = param.downloadData.opType[idx];
761 switch (strategy) {
762 case OpType::INSERT:
763 case OpType::UPDATE:
764 case OpType::DELETE:
765 ret = HandleTagAssets(hashKey, dataInfo, idx, param, localAssetInfo);
766 break;
767 case OpType::NOT_HANDLE:
768 case OpType::ONLY_UPDATE_GID:
769 case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO: { // means upload need this data
770 (void)TagAssetsInSingleRecord(localAssetInfo, param.downloadData.data[idx], true, ret);
771 for (const auto &[col, value]: localAssetInfo) {
772 param.downloadData.data[idx].insert_or_assign(col, value);
773 }
774 break;
775 }
776 default:
777 break;
778 }
779 return ret;
780 }
781
// Tag the asset columns of record idx for INSERT/UPDATE/DELETE strategies
// and append the record to param.assetsDownloadList for the download phase.
int CloudSyncer::HandleTagAssets(const Key &hashKey, const DataInfo &dataInfo, size_t idx, SyncParam &param,
    VBucket &localAssetInfo)
{
    Type prefix;
    std::vector<Type> pkVals;
    OpType strategy = param.downloadData.opType[idx];
    bool isDelStrategy = (strategy == OpType::DELETE);
    // For deletes the cloud record carries no pk values, so use the local ones.
    int ret = CloudSyncUtils::GetCloudPkVals(isDelStrategy ? dataInfo.localInfo.primaryKeys :
        param.downloadData.data[idx], param.pkColNames, dataInfo.localInfo.logInfo.dataKey, pkVals);
    if (ret != E_OK) {
        LOGE("[CloudSyncer] HandleTagAssets cannot get primary key value list. %d", ret);
        return ret;
    }
    // Single-pk tables use the pk value itself as the download prefix.
    prefix = param.isSinglePrimaryKey ? pkVals[0] : prefix;
    if (param.isSinglePrimaryKey && prefix.index() == TYPE_INDEX<Nil>) {
        LOGE("[CloudSyncer] Invalid primary key type in TagStatus, it's Nil.");
        return -E_INTERNAL_ERROR;
    }
    AssetOperationUtils::FilterDeleteAsset(param.downloadData.data[idx]);
    std::map<std::string, Assets> assetsMap = TagAssetsInSingleRecord(param.downloadData.data[idx], localAssetInfo,
        false, ret);
    if (ret != E_OK) {
        LOGE("[CloudSyncer] TagAssetsInSingleRecord report ERROR in download data");
        return ret;
    }
    strategy = CloudSyncUtils::CalOpType(param, idx);
    // The rowid of a no-pk insert is unknown until saved; remember the slot
    // so UpdateChangedData can patch it afterwards.
    if (!param.isSinglePrimaryKey && strategy == OpType::INSERT) {
        param.withoutRowIdData.assetInsertData.push_back(std::make_tuple(idx, param.assetsDownloadList.size()));
    }
    param.assetsDownloadList.push_back(
        std::make_tuple(dataInfo.cloudLogInfo.cloudGid, prefix, strategy, assetsMap, hashKey,
            pkVals, dataInfo.cloudLogInfo.timestamp));
    return ret;
}
816
// Process one downloaded record: validate it, look up the matching local
// row, tag the operation type, refresh the local log cache and record the
// resulting changed data. The call sequence is order-dependent.
int CloudSyncer::SaveDatum(SyncParam &param, size_t idx, std::vector<std::pair<Key, size_t>> &deletedList,
    std::map<std::string, LogInfo> &localLogInfoCache)
{
    int ret = PreHandleData(param.downloadData.data[idx], param.pkColNames);
    if (ret != E_OK) {
        LOGE("[CloudSyncer] Invalid download data:%d", ret);
        return ret;
    }
    // Normalize cloud timestamps before comparing against local data.
    CloudSyncUtils::ModifyCloudDataTime(param.downloadData.data[idx]);
    DataInfo dataInfo;
    VBucket localAssetInfo;
    bool isExist = true;
    ret = GetLocalInfo(idx, param, dataInfo.localInfo, localLogInfoCache, localAssetInfo);
    if (ret == -E_NOT_FOUND) {
        // No local counterpart: this record will be treated as an insert.
        isExist = false;
    } else if (ret != E_OK) {
        LOGE("[CloudSyncer] Cannot get info by primary key or gid: %d.", ret);
        return ret;
    }
    // Get cloudLogInfo from cloud data
    dataInfo.cloudLogInfo = CloudSyncUtils::GetCloudLogInfo(param.downloadData.data[idx]);
    // Tag datum to get opType
    ret = TagStatus(isExist, param, idx, dataInfo, localAssetInfo);
    if (ret != E_OK) {
        LOGE("[CloudSyncer] Cannot tag status: %d.", ret);
        return ret;
    }
    CloudSyncUtils::UpdateLocalCache(param.downloadData.opType[idx], dataInfo.cloudLogInfo, dataInfo.localInfo.logInfo,
        localLogInfoCache);
    ret = CloudSyncUtils::SaveChangedData(param, idx, dataInfo, deletedList);
    if (ret != E_OK) {
        LOGE("[CloudSyncer] Cannot save changed data: %d.", ret);
    }
    return ret;
}
852
SaveData(CloudSyncer::TaskId taskId,SyncParam & param)853 int CloudSyncer::SaveData(CloudSyncer::TaskId taskId, SyncParam ¶m)
854 {
855 if (!CloudSyncUtils::IsChangeDataEmpty(param.changedData)) {
856 LOGE("[CloudSyncer] changedData.primaryData should have no member inside.");
857 return -E_INVALID_ARGS;
858 }
859 // Update download batch Info
860 param.info.downLoadInfo.batchIndex += 1;
861 param.info.downLoadInfo.total += param.downloadData.data.size();
862 int ret = E_OK;
863 DownloadList assetsDownloadList;
864 param.assetsDownloadList = assetsDownloadList;
865 param.deletePrimaryKeySet.clear();
866 param.dupHashKeySet.clear();
867 CloudSyncUtils::ClearWithoutData(param);
868 std::vector<std::pair<Key, size_t>> deletedList;
869 // use for record local delete status
870 std::map<std::string, LogInfo> localLogInfoCache;
871 for (size_t i = 0; i < param.downloadData.data.size(); i++) {
872 ret = SaveDatum(param, i, deletedList, localLogInfoCache);
873 if (ret != E_OK) {
874 param.info.downLoadInfo.failCount += param.downloadData.data.size();
875 LOGE("[CloudSyncer] Cannot save datum due to error code %d", ret);
876 return ret;
877 }
878 }
879 // Save assetsMap into current context
880 {
881 std::lock_guard<std::mutex> autoLock(dataLock_);
882 currentContext_.assetDownloadList = param.assetsDownloadList;
883 }
884 // save the data to the database by batch, downloadData will return rowid when insert data.
885 ret = storageProxy_->PutCloudSyncData(param.tableName, param.downloadData);
886 if (ret != E_OK) {
887 param.info.downLoadInfo.failCount += param.downloadData.data.size();
888 LOGE("[CloudSyncer] Cannot save the data to database with error code: %d.", ret);
889 return ret;
890 }
891 ret = UpdateChangedData(param, currentContext_.assetDownloadList);
892 if (ret != E_OK) {
893 param.info.downLoadInfo.failCount += param.downloadData.data.size();
894 LOGE("[CloudSyncer] Cannot update changed data: %d.", ret);
895 return ret;
896 }
897 // Update downloadInfo
898 param.info.downLoadInfo.successCount += param.downloadData.data.size();
899 // Get latest cloudWaterMark
900 VBucket &lastData = param.downloadData.data.back();
901 if (!IsQueryListEmpty(taskId) && param.isLastBatch) {
902 // the last batch of cursor in the conditional query is useless
903 param.cloudWaterMark = {};
904 } else {
905 param.cloudWaterMark = std::get<std::string>(lastData[CloudDbConstant::CURSOR_FIELD]);
906 }
907 return UpdateFlagForSavedRecord(param);
908 }
909
PreCheck(CloudSyncer::TaskId & taskId,const TableName & tableName)910 int CloudSyncer::PreCheck(CloudSyncer::TaskId &taskId, const TableName &tableName)
911 {
912 // Check Input and Context Validity
913 int ret = CheckTaskIdValid(taskId);
914 if (ret != E_OK) {
915 return ret;
916 }
917 {
918 std::lock_guard<std::mutex> autoLock(dataLock_);
919 if (cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) {
920 LOGE("[CloudSyncer] Cloud Task Info does not exist taskId: , %" PRIu64 ".", taskId);
921 return -E_INVALID_ARGS;
922 }
923 }
924 if (currentContext_.strategy == nullptr) {
925 LOGE("[CloudSyncer] Strategy has not been initialized");
926 return -E_INVALID_ARGS;
927 }
928 ret = storageProxy_->CheckSchema(tableName);
929 if (ret != E_OK) {
930 LOGE("[CloudSyncer] A schema error occurred on the table to be synced, %d", ret);
931 return ret;
932 }
933 return E_OK;
934 }
935
NeedNotifyChangedData(const ChangedData & changedData)936 bool CloudSyncer::NeedNotifyChangedData(const ChangedData &changedData)
937 {
938 TaskId taskId;
939 {
940 std::lock_guard<std::mutex> autoLock(dataLock_);
941 taskId = currentContext_.currentTaskId;
942 }
943 if (IsModeForcePush(taskId)) {
944 return false;
945 }
946 // when there have no data been changed, it don't need fill back
947 if (changedData.primaryData[OP_INSERT].empty() && changedData.primaryData[OP_UPDATE].empty() &&
948 changedData.primaryData[OP_DELETE].empty()) {
949 return false;
950 }
951 return true;
952 }
953
NotifyChangedData(ChangedData && changedData)954 int CloudSyncer::NotifyChangedData(ChangedData &&changedData)
955 {
956 if (!NeedNotifyChangedData(changedData)) {
957 return E_OK;
958 }
959 std::string deviceName;
960 {
961 std::lock_guard<std::mutex> autoLock(dataLock_);
962 std::vector<std::string> devices = currentContext_.notifier->GetDevices();
963 if (devices.empty()) {
964 DBDfxAdapter::ReportBehavior(
965 {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_NOTIFY, StageResult::FAIL, -E_CLOUD_ERROR});
966 LOGE("[CloudSyncer] CurrentContext do not contain device info");
967 return -E_CLOUD_ERROR;
968 }
969 // We use first device name as the target of NotifyChangedData
970 deviceName = devices[0];
971 }
972 int ret = storageProxy_->NotifyChangedData(deviceName, std::move(changedData));
973 if (ret != E_OK) {
974 DBDfxAdapter::ReportBehavior(
975 {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_NOTIFY, StageResult::FAIL, ret});
976 LOGE("[CloudSyncer] Cannot notify changed data while downloading, %d.", ret);
977 }
978 DBDfxAdapter::ReportBehavior(
979 {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_NOTIFY, StageResult::FAIL, ret});
980 return ret;
981 }
982
NotifyInDownload(CloudSyncer::TaskId taskId,SyncParam & param,bool isFirstDownload)983 void CloudSyncer::NotifyInDownload(CloudSyncer::TaskId taskId, SyncParam ¶m, bool isFirstDownload)
984 {
985 if (!isFirstDownload && param.downloadData.data.empty()) {
986 // if the second download and there is no download data, do not notify
987 return;
988 }
989 std::lock_guard<std::mutex> autoLock(dataLock_);
990 if (currentContext_.strategy->JudgeUpload()) {
991 currentContext_.notifier->NotifyProcess(cloudTaskInfos_[taskId], param.info);
992 } else {
993 if (param.isLastBatch) {
994 param.info.tableStatus = ProcessStatus::FINISHED;
995 }
996 if (cloudTaskInfos_[taskId].table.back() == param.tableName && param.isLastBatch) {
997 currentContext_.notifier->UpdateProcess(param.info);
998 } else {
999 currentContext_.notifier->NotifyProcess(cloudTaskInfos_[taskId], param.info);
1000 }
1001 }
1002 }
1003
SaveDataInTransaction(CloudSyncer::TaskId taskId,SyncParam & param)1004 int CloudSyncer::SaveDataInTransaction(CloudSyncer::TaskId taskId, SyncParam ¶m)
1005 {
1006 int ret = storageProxy_->StartTransaction(TransactType::IMMEDIATE);
1007 if (ret != E_OK) {
1008 LOGE("[CloudSyncer] Cannot start a transaction: %d.", ret);
1009 return ret;
1010 }
1011 (void)storageProxy_->SetCursorIncFlag(true);
1012 if (!IsModeForcePush(taskId)) {
1013 param.changedData.tableName = param.info.tableName;
1014 param.changedData.field = param.pkColNames;
1015 param.changedData.type = ChangedDataType::DATA;
1016 }
1017 ret = SaveData(taskId, param);
1018 (void)storageProxy_->SetCursorIncFlag(false);
1019 param.insertPk.clear();
1020 if (ret != E_OK) {
1021 LOGE("[CloudSyncer] cannot save data: %d.", ret);
1022 int rollBackErrorCode = storageProxy_->Rollback();
1023 if (rollBackErrorCode != E_OK) {
1024 LOGE("[CloudSyncer] cannot roll back transaction: %d.", rollBackErrorCode);
1025 } else {
1026 LOGI("[CloudSyncer] Roll back transaction success: %d.", ret);
1027 }
1028 return ret;
1029 }
1030 ret = storageProxy_->Commit();
1031 if (ret != E_OK) {
1032 LOGE("[CloudSyncer] Cannot commit a transaction: %d.", ret);
1033 }
1034 return ret;
1035 }
1036
DoDownloadAssets(bool skipSave,SyncParam & param)1037 int CloudSyncer::DoDownloadAssets(bool skipSave, SyncParam ¶m)
1038 {
1039 // Begin downloading assets
1040 ChangedData changedAssets;
1041 int ret = DownloadAssets(param.info, param.pkColNames, param.dupHashKeySet, changedAssets);
1042 bool isSharedTable = false;
1043 int errCode = storageProxy_->IsSharedTable(param.tableName, isSharedTable);
1044 if (errCode != E_OK) {
1045 LOGE("[CloudSyncer] HandleTagAssets cannot judge the table is a shared table. %d", errCode);
1046 return errCode;
1047 }
1048 if (!isSharedTable) {
1049 (void)NotifyChangedData(std::move(changedAssets));
1050 }
1051 if (ret == -E_TASK_PAUSED) {
1052 LOGD("[CloudSyncer] current task was paused, abort download asset");
1053 std::lock_guard<std::mutex> autoLock(dataLock_);
1054 resumeTaskInfos_[currentContext_.currentTaskId].skipQuery = true;
1055 return ret;
1056 } else if (skipSave) {
1057 std::lock_guard<std::mutex> autoLock(dataLock_);
1058 resumeTaskInfos_[currentContext_.currentTaskId].skipQuery = false;
1059 }
1060 if (ret != E_OK) {
1061 LOGE("[CloudSyncer] Cannot notify downloadAssets due to error %d", ret);
1062 }
1063 return ret;
1064 }
1065
SaveDataNotifyProcess(CloudSyncer::TaskId taskId,SyncParam & param)1066 int CloudSyncer::SaveDataNotifyProcess(CloudSyncer::TaskId taskId, SyncParam ¶m)
1067 {
1068 ChangedData changedData;
1069 bool skipSave = false;
1070 {
1071 bool currentTableResume = IsCurrentTableResume(taskId, false);
1072 std::lock_guard<std::mutex> autoLock(dataLock_);
1073 if (currentTableResume && resumeTaskInfos_[currentContext_.currentTaskId].skipQuery) {
1074 skipSave = true;
1075 }
1076 }
1077 int ret;
1078 if (!skipSave) {
1079 param.changedData = changedData;
1080 param.downloadData.opType.resize(param.downloadData.data.size());
1081 param.downloadData.existDataKey.resize(param.downloadData.data.size());
1082 param.downloadData.existDataHashKey.resize(param.downloadData.data.size());
1083 ret = SaveDataInTransaction(taskId, param);
1084 if (ret != E_OK) {
1085 return ret;
1086 }
1087 // call OnChange to notify changedData object first time (without Assets)
1088 ret = NotifyChangedData(std::move(param.changedData));
1089 if (ret != E_OK) {
1090 LOGE("[CloudSyncer] Cannot notify changed data due to error %d", ret);
1091 return ret;
1092 }
1093 }
1094 ret = DoDownloadAssets(skipSave, param);
1095 if (ret != E_OK) {
1096 return ret;
1097 }
1098 UpdateCloudWaterMark(taskId, param);
1099 return E_OK;
1100 }
1101
NotifyInBatchUpload(const UploadParam & uploadParam,const InnerProcessInfo & innerProcessInfo,bool lastBatch)1102 void CloudSyncer::NotifyInBatchUpload(const UploadParam &uploadParam, const InnerProcessInfo &innerProcessInfo,
1103 bool lastBatch)
1104 {
1105 CloudTaskInfo taskInfo;
1106 {
1107 std::lock_guard<std::mutex> autoLock(dataLock_);
1108 taskInfo = cloudTaskInfos_[uploadParam.taskId];
1109 }
1110 std::lock_guard<std::mutex> autoLock(dataLock_);
1111 if (uploadParam.lastTable && lastBatch) {
1112 currentContext_.notifier->UpdateProcess(innerProcessInfo);
1113 } else {
1114 currentContext_.notifier->NotifyProcess(taskInfo, innerProcessInfo);
1115 }
1116 }
1117
DoDownload(CloudSyncer::TaskId taskId,bool isFirstDownload)1118 int CloudSyncer::DoDownload(CloudSyncer::TaskId taskId, bool isFirstDownload)
1119 {
1120 SyncParam param;
1121 int errCode = GetSyncParamForDownload(taskId, param);
1122 if (errCode != E_OK) {
1123 LOGE("[CloudSyncer] get sync param for download failed %d", errCode);
1124 return errCode;
1125 }
1126 (void)storageProxy_->CreateTempSyncTrigger(param.tableName);
1127 errCode = DoDownloadInner(taskId, param, isFirstDownload);
1128 (void)storageProxy_->ClearAllTempSyncTrigger();
1129 if (errCode == -E_TASK_PAUSED) {
1130 // No need to handle ret.
1131 int ret = storageProxy_->GetCloudWaterMark(param.tableName, param.cloudWaterMark);
1132 if (ret != E_OK) {
1133 LOGE("[DoDownload] Cannot get cloud watermark : %d.", ret);
1134 }
1135 std::lock_guard<std::mutex> autoLock(dataLock_);
1136 resumeTaskInfos_[taskId].syncParam = std::move(param);
1137 }
1138 return errCode;
1139 }
1140
DoDownloadInner(CloudSyncer::TaskId taskId,SyncParam & param,bool isFirstDownload)1141 int CloudSyncer::DoDownloadInner(CloudSyncer::TaskId taskId, SyncParam ¶m, bool isFirstDownload)
1142 {
1143 // Query data by batch until reaching end and not more data need to be download
1144 int ret = PreCheck(taskId, param.info.tableName);
1145 if (ret != E_OK) {
1146 return ret;
1147 }
1148 do {
1149 ret = DownloadOneBatch(taskId, param, isFirstDownload);
1150 if (ret != E_OK) {
1151 return ret;
1152 }
1153 } while (!param.isLastBatch);
1154 return E_OK;
1155 }
1156
NotifyInEmptyDownload(CloudSyncer::TaskId taskId,InnerProcessInfo & info)1157 void CloudSyncer::NotifyInEmptyDownload(CloudSyncer::TaskId taskId, InnerProcessInfo &info)
1158 {
1159 std::lock_guard<std::mutex> autoLock(dataLock_);
1160 if (currentContext_.strategy->JudgeUpload()) {
1161 currentContext_.notifier->NotifyProcess(cloudTaskInfos_[taskId], info);
1162 } else {
1163 info.tableStatus = FINISHED;
1164 if (cloudTaskInfos_[taskId].table.back() == info.tableName) {
1165 currentContext_.notifier->UpdateProcess(info);
1166 } else {
1167 currentContext_.notifier->NotifyProcess(cloudTaskInfos_[taskId], info);
1168 }
1169 }
1170 }
1171
PreCheckUpload(CloudSyncer::TaskId & taskId,const TableName & tableName,Timestamp & localMark)1172 int CloudSyncer::PreCheckUpload(CloudSyncer::TaskId &taskId, const TableName &tableName, Timestamp &localMark)
1173 {
1174 int ret = PreCheck(taskId, tableName);
1175 if (ret != E_OK) {
1176 return ret;
1177 }
1178 {
1179 std::lock_guard<std::mutex> autoLock(dataLock_);
1180 if (cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) {
1181 LOGE("[CloudSyncer] Cloud Task Info does not exist taskId: %" PRIu64 ".", taskId);
1182 return -E_INVALID_ARGS;
1183 }
1184 if ((cloudTaskInfos_[taskId].mode < SYNC_MODE_CLOUD_MERGE) ||
1185 (cloudTaskInfos_[taskId].mode > SYNC_MODE_CLOUD_FORCE_PUSH)) {
1186 LOGE("[CloudSyncer] Upload failed, invalid sync mode: %d.",
1187 static_cast<int>(cloudTaskInfos_[taskId].mode));
1188 return -E_INVALID_ARGS;
1189 }
1190 }
1191
1192 return ret;
1193 }
1194
SaveUploadData(Info & insertInfo,Info & updateInfo,Info & deleteInfo,CloudSyncData & uploadData,InnerProcessInfo & innerProcessInfo)1195 int CloudSyncer::SaveUploadData(Info &insertInfo, Info &updateInfo, Info &deleteInfo, CloudSyncData &uploadData,
1196 InnerProcessInfo &innerProcessInfo)
1197 {
1198 int errCode = E_OK;
1199 if (!uploadData.delData.record.empty() && !uploadData.delData.extend.empty()) {
1200 errCode = BatchDelete(deleteInfo, uploadData, innerProcessInfo);
1201 if (errCode != E_OK) {
1202 return errCode;
1203 }
1204 }
1205
1206 if (!uploadData.updData.record.empty() && !uploadData.updData.extend.empty()) {
1207 errCode = BatchUpdate(updateInfo, uploadData, innerProcessInfo);
1208 if (errCode != E_OK) {
1209 return errCode;
1210 }
1211 }
1212
1213 if (!uploadData.insData.record.empty() && !uploadData.insData.extend.empty()) {
1214 errCode = BatchInsert(insertInfo, uploadData, innerProcessInfo);
1215 if (errCode != E_OK) {
1216 return errCode;
1217 }
1218 }
1219
1220 if (!uploadData.lockData.rowid.empty()) {
1221 errCode = storageProxy_->FillCloudLogAndAsset(OpType::LOCKED_NOT_HANDLE, uploadData);
1222 }
1223 return errCode;
1224 }
1225
DoBatchUpload(CloudSyncData & uploadData,UploadParam & uploadParam,InnerProcessInfo & innerProcessInfo)1226 int CloudSyncer::DoBatchUpload(CloudSyncData &uploadData, UploadParam &uploadParam, InnerProcessInfo &innerProcessInfo)
1227 {
1228 int errCode = storageProxy_->FillCloudLogAndAsset(OpType::SET_UPLOADING, uploadData);
1229 if (errCode != E_OK) {
1230 return errCode;
1231 }
1232 Info insertInfo;
1233 Info updateInfo;
1234 Info deleteInfo;
1235 errCode = SaveUploadData(insertInfo, updateInfo, deleteInfo, uploadData, innerProcessInfo);
1236 if (errCode != E_OK) {
1237 return errCode;
1238 }
1239 bool lastBatch = innerProcessInfo.upLoadInfo.successCount == innerProcessInfo.upLoadInfo.total;
1240 if (lastBatch) {
1241 innerProcessInfo.tableStatus = ProcessStatus::FINISHED;
1242 }
1243 // After each batch upload successed, call NotifyProcess
1244 NotifyInBatchUpload(uploadParam, innerProcessInfo, lastBatch);
1245
1246 // if batch upload successed, update local water mark
1247 // The cloud water mark cannot be updated here, because the cloud api doesn't return cursor here.
1248 errCode = PutWaterMarkAfterBatchUpload(uploadData.tableName, uploadParam);
1249 if (errCode != E_OK) {
1250 LOGE("[CloudSyncer] Failed to set local water mark when doing upload, %d.", errCode);
1251 }
1252 return errCode;
1253 }
1254
PutWaterMarkAfterBatchUpload(const std::string & tableName,UploadParam & uploadParam)1255 int CloudSyncer::PutWaterMarkAfterBatchUpload(const std::string &tableName, UploadParam &uploadParam)
1256 {
1257 int errCode = E_OK;
1258 storageProxy_->ReleaseUploadRecord(tableName, uploadParam.mode, uploadParam.localMark);
1259 // if we use local cover cloud strategy, it won't update local water mark also.
1260 if (IsModeForcePush(uploadParam.taskId) || (IsPriorityTask(uploadParam.taskId) &&
1261 !IsQueryListEmpty(uploadParam.taskId))) {
1262 return E_OK;
1263 }
1264 errCode = storageProxy_->PutWaterMarkByMode(tableName, uploadParam.mode, uploadParam.localMark);
1265 if (errCode != E_OK) {
1266 LOGE("[CloudSyncer] Cannot set local water mark while Uploading, %d.", errCode);
1267 }
1268 return errCode;
1269 }
1270
DoUpload(CloudSyncer::TaskId taskId,bool lastTable,LockAction lockAction)1271 int CloudSyncer::DoUpload(CloudSyncer::TaskId taskId, bool lastTable, LockAction lockAction)
1272 {
1273 std::string tableName;
1274 int ret = GetCurrentTableName(tableName);
1275 if (ret != E_OK) {
1276 LOGE("[CloudSyncer] Invalid table name for syncing: %d", ret);
1277 return ret;
1278 }
1279
1280 Timestamp localMark = 0u;
1281 ret = PreCheckUpload(taskId, tableName, localMark);
1282 if (ret != E_OK) {
1283 LOGE("[CloudSyncer] Doing upload sync pre check failed, %d.", ret);
1284 return ret;
1285 }
1286 ReloadWaterMarkIfNeed(taskId, localMark);
1287 storageProxy_->OnUploadStart();
1288
1289 int64_t count = 0;
1290 ret = storageProxy_->GetUploadCount(GetQuerySyncObject(tableName), IsModeForcePush(taskId),
1291 IsCompensatedTask(taskId), IsNeedGetLocalWater(taskId), count);
1292 LOGI("get upload count:%zu", count);
1293 if (ret != E_OK) {
1294 // GetUploadCount will return E_OK when upload count is zero.
1295 LOGE("[CloudSyncer] Failed to get Upload Data Count, %d.", ret);
1296 return ret;
1297 }
1298 if (count == 0) {
1299 UpdateProcessInfoWithoutUpload(taskId, tableName, !lastTable);
1300 return E_OK;
1301 }
1302 UploadParam param;
1303 param.count = count;
1304 param.lastTable = lastTable;
1305 param.taskId = taskId;
1306 param.lockAction = lockAction;
1307 return DoUploadInner(tableName, param);
1308 }
1309
PreProcessBatchUpload(UploadParam & uploadParam,const InnerProcessInfo & innerProcessInfo,CloudSyncData & uploadData)1310 int CloudSyncer::PreProcessBatchUpload(UploadParam &uploadParam, const InnerProcessInfo &innerProcessInfo,
1311 CloudSyncData &uploadData)
1312 {
1313 // Precheck and calculate local water mark which would be updated if batch upload successed.
1314 int ret = CheckTaskIdValid(uploadParam.taskId);
1315 if (ret != E_OK) {
1316 return ret;
1317 }
1318 ret = CloudSyncUtils::CheckCloudSyncDataValid(uploadData, innerProcessInfo.tableName,
1319 innerProcessInfo.upLoadInfo.total);
1320 if (ret != E_OK) {
1321 LOGE("[CloudSyncer] Invalid Cloud Sync Data of Upload, %d.", ret);
1322 return ret;
1323 }
1324 TagUploadAssets(uploadData);
1325 CloudSyncUtils::UpdateAssetsFlag(uploadData);
1326 // get local water mark to be updated in future.
1327 ret = CloudSyncUtils::UpdateExtendTime(uploadData, innerProcessInfo.upLoadInfo.total,
1328 uploadParam.taskId, uploadParam.localMark);
1329 if (ret != E_OK) {
1330 LOGE("[CloudSyncer] Failed to get new local water mark in Cloud Sync Data, %d.", ret);
1331 }
1332 return ret;
1333 }
1334
SaveCloudWaterMark(const TableName & tableName,const TaskId taskId)1335 int CloudSyncer::SaveCloudWaterMark(const TableName &tableName, const TaskId taskId)
1336 {
1337 std::string cloudWaterMark;
1338 bool isUpdateCloudCursor = true;
1339 {
1340 std::lock_guard<std::mutex> autoLock(dataLock_);
1341 if (currentContext_.cloudWaterMarks.find(currentContext_.currentUserIndex) ==
1342 currentContext_.cloudWaterMarks.end() ||
1343 currentContext_.cloudWaterMarks[currentContext_.currentUserIndex].find(tableName) ==
1344 currentContext_.cloudWaterMarks[currentContext_.currentUserIndex].end()) {
1345 LOGD("[CloudSyncer] Not found water mark just return");
1346 return E_OK;
1347 }
1348 cloudWaterMark = currentContext_.cloudWaterMarks[currentContext_.currentUserIndex][tableName];
1349 isUpdateCloudCursor = currentContext_.strategy->JudgeUpdateCursor();
1350 }
1351 isUpdateCloudCursor = isUpdateCloudCursor && !(IsPriorityTask(taskId) && !IsQueryListEmpty(taskId));
1352 if (isUpdateCloudCursor) {
1353 int errCode = storageProxy_->SetCloudWaterMark(tableName, cloudWaterMark);
1354 if (errCode != E_OK) {
1355 LOGE("[CloudSyncer] Cannot set cloud water mark, %d.", errCode);
1356 }
1357 return errCode;
1358 }
1359 return E_OK;
1360 }
1361
SetUploadDataFlag(const TaskId taskId,CloudSyncData & uploadData)1362 void CloudSyncer::SetUploadDataFlag(const TaskId taskId, CloudSyncData& uploadData)
1363 {
1364 std::lock_guard<std::mutex> autoLock(dataLock_);
1365 uploadData.isCloudForcePushStrategy = (cloudTaskInfos_[taskId].mode == SYNC_MODE_CLOUD_FORCE_PUSH);
1366 uploadData.isCompensatedTask = cloudTaskInfos_[taskId].compensatedTask;
1367 }
1368
IsModeForcePush(TaskId taskId)1369 bool CloudSyncer::IsModeForcePush(TaskId taskId)
1370 {
1371 std::lock_guard<std::mutex> autoLock(dataLock_);
1372 return cloudTaskInfos_[taskId].mode == SYNC_MODE_CLOUD_FORCE_PUSH;
1373 }
1374
IsModeForcePull(const TaskId taskId)1375 bool CloudSyncer::IsModeForcePull(const TaskId taskId)
1376 {
1377 std::lock_guard<std::mutex> autoLock(dataLock_);
1378 return cloudTaskInfos_[taskId].mode == SYNC_MODE_CLOUD_FORCE_PULL;
1379 }
1380
IsPriorityTask(TaskId taskId)1381 bool CloudSyncer::IsPriorityTask(TaskId taskId)
1382 {
1383 std::lock_guard<std::mutex> autoLock(dataLock_);
1384 return cloudTaskInfos_[taskId].priorityTask;
1385 }
1386
PreHandleData(VBucket & datum,const std::vector<std::string> & pkColNames)1387 int CloudSyncer::PreHandleData(VBucket &datum, const std::vector<std::string> &pkColNames)
1388 {
1389 // type index of field in fields, true means mandatory filed
1390 static std::vector<std::tuple<std::string, int32_t, bool>> fieldAndIndex = {
1391 std::make_tuple(CloudDbConstant::GID_FIELD, TYPE_INDEX<std::string>, true),
1392 std::make_tuple(CloudDbConstant::CREATE_FIELD, TYPE_INDEX<int64_t>, true),
1393 std::make_tuple(CloudDbConstant::MODIFY_FIELD, TYPE_INDEX<int64_t>, true),
1394 std::make_tuple(CloudDbConstant::DELETE_FIELD, TYPE_INDEX<bool>, true),
1395 std::make_tuple(CloudDbConstant::CURSOR_FIELD, TYPE_INDEX<std::string>, true),
1396 std::make_tuple(CloudDbConstant::SHARING_RESOURCE_FIELD, TYPE_INDEX<std::string>, false)
1397 };
1398
1399 for (const auto &fieldIndex : fieldAndIndex) {
1400 if (datum.find(std::get<0>(fieldIndex)) == datum.end()) {
1401 if (!std::get<2>(fieldIndex)) { // 2 is index of mandatory flag
1402 continue;
1403 }
1404 LOGE("[CloudSyncer] Cloud data do not contain expected field: %s.", std::get<0>(fieldIndex).c_str());
1405 return -E_CLOUD_ERROR;
1406 }
1407 if (datum[std::get<0>(fieldIndex)].index() != static_cast<size_t>(std::get<1>(fieldIndex))) {
1408 LOGE("[CloudSyncer] Cloud data's field: %s, doesn't has expected type.", std::get<0>(fieldIndex).c_str());
1409 return -E_CLOUD_ERROR;
1410 }
1411 }
1412
1413 if (std::get<bool>(datum[CloudDbConstant::DELETE_FIELD])) {
1414 CloudSyncUtils::RemoveDataExceptExtendInfo(datum, pkColNames);
1415 }
1416 std::lock_guard<std::mutex> autoLock(dataLock_);
1417 if (IsDataContainDuplicateAsset(currentContext_.assetFields[currentContext_.tableName], datum)) {
1418 LOGE("[CloudSyncer] Cloud data contain duplicate asset");
1419 return -E_CLOUD_ERROR;
1420 }
1421 return E_OK;
1422 }
1423
QueryCloudData(TaskId taskId,const std::string & tableName,std::string & cloudWaterMark,DownloadData & downloadData)1424 int CloudSyncer::QueryCloudData(TaskId taskId, const std::string &tableName, std::string &cloudWaterMark,
1425 DownloadData &downloadData)
1426 {
1427 VBucket extend;
1428 int ret = FillDownloadExtend(taskId, tableName, cloudWaterMark, extend);
1429 if (ret != E_OK) {
1430 return ret;
1431 }
1432 ret = cloudDB_.Query(tableName, extend, downloadData.data);
1433 if ((ret == E_OK || ret == -E_QUERY_END) && downloadData.data.empty()) {
1434 if (extend[CloudDbConstant::CURSOR_FIELD].index() != TYPE_INDEX<std::string>) {
1435 LOGE("[CloudSyncer] cursor type is not valid=%d", extend[CloudDbConstant::CURSOR_FIELD].index());
1436 return -E_CLOUD_ERROR;
1437 }
1438 cloudWaterMark = std::get<std::string>(extend[CloudDbConstant::CURSOR_FIELD]);
1439 LOGD("[CloudSyncer] Download data is empty, try to use other cursor=%s", cloudWaterMark.c_str());
1440 return ret;
1441 }
1442 if (ret == -E_QUERY_END) {
1443 LOGD("[CloudSyncer] Download data from cloud database success and no more data need to be downloaded");
1444 return -E_QUERY_END;
1445 }
1446 if (ret != E_OK) {
1447 LOGE("[CloudSyncer] Download data from cloud database unsuccess %d", ret);
1448 }
1449 return ret;
1450 }
1451
CheckTaskIdValid(TaskId taskId)1452 int CloudSyncer::CheckTaskIdValid(TaskId taskId)
1453 {
1454 if (closed_) {
1455 LOGE("[CloudSyncer] DB is closed.");
1456 return -E_DB_CLOSED;
1457 }
1458 std::lock_guard<std::mutex> autoLock(dataLock_);
1459 if (cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) {
1460 LOGE("[CloudSyncer] not found task.");
1461 return -E_INVALID_ARGS;
1462 }
1463 if (cloudTaskInfos_[taskId].pause) {
1464 LOGW("[CloudSyncer] check task %" PRIu64 " was paused!", taskId);
1465 return -E_TASK_PAUSED;
1466 }
1467 if (cloudTaskInfos_[taskId].errCode != E_OK) {
1468 return cloudTaskInfos_[taskId].errCode;
1469 }
1470 return currentContext_.currentTaskId == taskId ? E_OK : -E_INVALID_ARGS;
1471 }
1472
GetCurrentTableName(std::string & tableName)1473 int CloudSyncer::GetCurrentTableName(std::string &tableName)
1474 {
1475 std::lock_guard<std::mutex> autoLock(dataLock_);
1476 if (currentContext_.tableName.empty()) {
1477 return -E_BUSY;
1478 }
1479 tableName = currentContext_.tableName;
1480 return E_OK;
1481 }
1482
CheckQueueSizeWithNoLock(bool priorityTask)1483 int CloudSyncer::CheckQueueSizeWithNoLock(bool priorityTask)
1484 {
1485 int32_t limit = queuedManualSyncLimit_;
1486 if (!priorityTask && taskQueue_.size() >= static_cast<size_t>(limit)) {
1487 LOGW("[CloudSyncer] too much sync task");
1488 return -E_BUSY;
1489 } else if (priorityTask && priorityTaskQueue_.size() >= static_cast<size_t>(limit)) {
1490 LOGW("[CloudSyncer] too much priority sync task");
1491 return -E_BUSY;
1492 }
1493 return E_OK;
1494 }
1495
// Promote a queued task to the running task: mark it PROCESSING and either
// restore its saved context (resume after pause) or build a fresh notifier,
// strategy and process recorder for it.
// Returns -E_DB_CLOSED when the syncer is closed, the task is unknown, or
// another task already occupies the context.
int CloudSyncer::PrepareSync(TaskId taskId)
{
    std::lock_guard<std::mutex> autoLock(dataLock_);
    if (closed_ || cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) {
        LOGW("[CloudSyncer] Abort sync because syncer is closed");
        return -E_DB_CLOSED;
    }
    if (closed_ || currentContext_.currentTaskId != INVALID_TASK_ID) {
        LOGW("[CloudSyncer] Abort sync because syncer is closed or another task is running");
        return -E_DB_CLOSED;
    }
    currentContext_.currentTaskId = taskId;
    // A task that was paused is now being resumed; clear the pause flag so the
    // resume decision below uses the value it had on entry.
    cloudTaskInfos_[taskId].resume = cloudTaskInfos_[taskId].pause;
    cloudTaskInfos_[taskId].pause = false;
    cloudTaskInfos_[taskId].status = ProcessStatus::PROCESSING;
    if (cloudTaskInfos_[taskId].resume) {
        // Restore the saved context but keep the lock currently held by this
        // syncer instance — the saved one belongs to the previous run.
        auto tempLocker = currentContext_.locker;
        currentContext_ = resumeTaskInfos_[taskId].context;
        currentContext_.locker = tempLocker;
    } else {
        // Fresh task: build notifier/strategy/recorder from the task's settings.
        currentContext_.notifier = std::make_shared<ProcessNotifier>(this);
        currentContext_.strategy =
            StrategyFactory::BuildSyncStrategy(cloudTaskInfos_[taskId].mode, isKvScene_, policy_);
        currentContext_.notifier->Init(cloudTaskInfos_[taskId].table, cloudTaskInfos_[taskId].devices,
            cloudTaskInfos_[taskId].users);
        currentContext_.processRecorder = std::make_shared<ProcessRecorder>();
    }
    LOGI("[CloudSyncer] exec storeId %.3s taskId %" PRIu64, cloudTaskInfos_[taskId].storeId.c_str(), taskId);
    return E_OK;
}
1526
LockCloud(TaskId taskId)1527 int CloudSyncer::LockCloud(TaskId taskId)
1528 {
1529 int period;
1530 {
1531 auto res = cloudDB_.Lock();
1532 if (res.first != E_OK) {
1533 return res.first;
1534 }
1535 period = static_cast<int>(res.second) / HEARTBEAT_PERIOD;
1536 }
1537 int errCode = StartHeartBeatTimer(period, taskId);
1538 if (errCode != E_OK) {
1539 UnlockCloud();
1540 }
1541 return errCode;
1542 }
1543
// Stop the keep-alive heartbeat first, then release the cloud-side lock.
int CloudSyncer::UnlockCloud()
{
    FinishHeartBeatTimer();
    return cloudDB_.UnLock();
}
1549
StartHeartBeatTimer(int period,TaskId taskId)1550 int CloudSyncer::StartHeartBeatTimer(int period, TaskId taskId)
1551 {
1552 if (timerId_ != 0u) {
1553 LOGW("[CloudSyncer] HeartBeat timer has been start!");
1554 return E_OK;
1555 }
1556 TimerId timerId = 0;
1557 int errCode = RuntimeContext::GetInstance()->SetTimer(period, [this, taskId](TimerId timerId) {
1558 HeartBeat(timerId, taskId);
1559 return E_OK;
1560 }, nullptr, timerId);
1561 if (errCode != E_OK) {
1562 LOGE("[CloudSyncer] HeartBeat timer start failed %d", errCode);
1563 return errCode;
1564 }
1565 timerId_ = timerId;
1566 return E_OK;
1567 }
1568
FinishHeartBeatTimer()1569 void CloudSyncer::FinishHeartBeatTimer()
1570 {
1571 if (timerId_ == 0u) {
1572 return;
1573 }
1574 RuntimeContext::GetInstance()->RemoveTimer(timerId_, true);
1575 timerId_ = 0u;
1576 LOGD("[CloudSyncer] Finish heartbeat timer ok");
1577 }
1578
// Timer callback that keeps the cloud-side lock alive while a task runs.
// Schedules the actual heartbeat on a worker thread; pending-callback counts
// are tracked in heartbeatCount_ so a blocked cloud call can be detected.
void CloudSyncer::HeartBeat(TimerId timerId, TaskId taskId)
{
    if (timerId_ != timerId) {
        // A stale timer fired after FinishHeartBeatTimer reset timerId_.
        return;
    }
    IncObjRef(this); // keep this syncer alive until the scheduled task runs
    {
        std::lock_guard<std::mutex> autoLock(heartbeatMutex_);
        heartbeatCount_[taskId]++;
    }
    int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, taskId]() {
        {
            std::lock_guard<std::mutex> guard(dataLock_);
            if (currentContext_.currentTaskId != taskId) {
                // Task finished or was replaced: drop its heartbeat bookkeeping.
                heartbeatCount_.erase(taskId);
                failedHeartbeatCount_.erase(taskId);
                DecObjRef(this);
                return;
            }
        }
        // NOTE(review): heartbeatCount_ is read here without heartbeatMutex_
        // held — looks like a benign race, but confirm.
        if (heartbeatCount_[taskId] >= HEARTBEAT_PERIOD) {
            // heartbeat block twice should finish task now
            SetTaskFailed(taskId, -E_CLOUD_ERROR);
        } else {
            int ret = cloudDB_.HeartBeat();
            if (ret != E_OK) {
                HeartBeatFailed(taskId, ret);
            } else {
                failedHeartbeatCount_[taskId] = 0;
            }
        }
        {
            std::lock_guard<std::mutex> autoLock(heartbeatMutex_);
            heartbeatCount_[taskId]--;
            // NOTE(review): currentContext_ is read under heartbeatMutex_
            // rather than dataLock_ here — verify this is intentional.
            if (currentContext_.currentTaskId != taskId) {
                heartbeatCount_.erase(taskId);
                failedHeartbeatCount_.erase(taskId);
            }
        }
        DecObjRef(this);
    });
    if (errCode != E_OK) {
        LOGW("[CloudSyncer] schedule heartbeat task failed %d", errCode);
        DecObjRef(this); // scheduling failed: release the reference taken above
    }
}
1625
HeartBeatFailed(TaskId taskId,int errCode)1626 void CloudSyncer::HeartBeatFailed(TaskId taskId, int errCode)
1627 {
1628 failedHeartbeatCount_[taskId]++;
1629 if (failedHeartbeatCount_[taskId] < MAX_HEARTBEAT_FAILED_LIMIT) {
1630 return;
1631 }
1632 LOGW("[CloudSyncer] heartbeat failed too much times!");
1633 FinishHeartBeatTimer();
1634 SetTaskFailed(taskId, errCode);
1635 }
1636
SetTaskFailed(TaskId taskId,int errCode)1637 void CloudSyncer::SetTaskFailed(TaskId taskId, int errCode)
1638 {
1639 std::lock_guard<std::mutex> autoLock(dataLock_);
1640 if (cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) {
1641 return;
1642 }
1643 if (cloudTaskInfos_[taskId].errCode != E_OK) {
1644 return;
1645 }
1646 cloudTaskInfos_[taskId].errCode = errCode;
1647 }
1648
GetCloudSyncTaskCount()1649 int32_t CloudSyncer::GetCloudSyncTaskCount()
1650 {
1651 std::lock_guard<std::mutex> autoLock(dataLock_);
1652 return static_cast<int32_t>(taskQueue_.size() + priorityTaskQueue_.size());
1653 }
1654
CleanCloudData(ClearMode mode,const std::vector<std::string> & tableNameList,const RelationalSchemaObject & localSchema)1655 int CloudSyncer::CleanCloudData(ClearMode mode, const std::vector<std::string> &tableNameList,
1656 const RelationalSchemaObject &localSchema)
1657 {
1658 std::lock_guard<std::mutex> lock(syncMutex_);
1659 int index = 1;
1660 for (const auto &tableName: tableNameList) {
1661 LOGD("[CloudSyncer] Start clean cloud water mark. table index: %d.", index);
1662 int ret = storageProxy_->CleanWaterMark(tableName);
1663 if (ret != E_OK) {
1664 LOGE("[CloudSyncer] failed to put cloud water mark after clean cloud data, %d.", ret);
1665 return ret;
1666 }
1667 index++;
1668 }
1669 int errCode = storageProxy_->StartTransaction(TransactType::IMMEDIATE);
1670 if (errCode != E_OK) {
1671 LOGE("[CloudSyncer] failed to start Transaction before clean cloud data, %d", errCode);
1672 return errCode;
1673 }
1674
1675 std::vector<Asset> assets;
1676 errCode = storageProxy_->CleanCloudData(mode, tableNameList, localSchema, assets);
1677 if (errCode != E_OK) {
1678 LOGE("[CloudSyncer] failed to clean cloud data, %d.", errCode);
1679 storageProxy_->Rollback();
1680 return errCode;
1681 }
1682
1683 if (!assets.empty() && mode == FLAG_AND_DATA) {
1684 errCode = cloudDB_.RemoveLocalAssets(assets);
1685 if (errCode != E_OK) {
1686 LOGE("[Storage Executor] failed to remove local assets, %d.", errCode);
1687 storageProxy_->Rollback();
1688 return errCode;
1689 }
1690 }
1691
1692 storageProxy_->Commit();
1693 return errCode;
1694 }
1695
CleanWaterMarkInMemory(const std::set<std::string> & tableNameList)1696 int CloudSyncer::CleanWaterMarkInMemory(const std::set<std::string> &tableNameList)
1697 {
1698 std::lock_guard<std::mutex> lock(syncMutex_);
1699 for (const auto &tableName: tableNameList) {
1700 int ret = storageProxy_->CleanWaterMarkInMemory(tableName);
1701 if (ret != E_OK) {
1702 LOGE("[CloudSyncer] failed to clean cloud water mark in memory, %d.", ret);
1703 return ret;
1704 }
1705 }
1706 return E_OK;
1707 }
1708
UpdateCloudWaterMark(TaskId taskId,const SyncParam & param)1709 void CloudSyncer::UpdateCloudWaterMark(TaskId taskId, const SyncParam ¶m)
1710 {
1711 {
1712 std::lock_guard<std::mutex> autoLock(dataLock_);
1713 currentContext_.cloudWaterMarks[currentContext_.currentUserIndex][param.info.tableName] = param.cloudWaterMark;
1714 }
1715 }
1716
CommitDownloadResult(const DownloadItem & downloadItem,InnerProcessInfo & info,DownloadCommitList & commitList,int errCode)1717 int CloudSyncer::CommitDownloadResult(const DownloadItem &downloadItem, InnerProcessInfo &info,
1718 DownloadCommitList &commitList, int errCode)
1719 {
1720 if (commitList.empty()) {
1721 return E_OK;
1722 }
1723 uint32_t successCount = 0u;
1724 int ret = HandleDownloadResult(downloadItem, info.tableName, commitList, successCount);
1725 if (errCode == E_OK) {
1726 info.downLoadInfo.failCount += (commitList.size() - successCount);
1727 info.downLoadInfo.successCount -= (commitList.size() - successCount);
1728 }
1729 if (ret != E_OK) {
1730 LOGE("Commit download result failed.%d", ret);
1731 }
1732 commitList.clear();
1733 return ret;
1734 }
1735
// Returns the storage identifier cached from StorageProxy at construction.
std::string CloudSyncer::GetIdentify() const
{
    return id_;
}
1740
TagStatusByStrategy(bool isExist,SyncParam & param,DataInfo & dataInfo,OpType & strategyOpResult)1741 int CloudSyncer::TagStatusByStrategy(bool isExist, SyncParam ¶m, DataInfo &dataInfo, OpType &strategyOpResult)
1742 {
1743 strategyOpResult = OpType::NOT_HANDLE;
1744 // ignore same record with local generate data
1745 if (dataInfo.localInfo.logInfo.device.empty() &&
1746 !CloudSyncUtils::NeedSaveData(dataInfo.localInfo.logInfo, dataInfo.cloudLogInfo)) {
1747 // not handle same data
1748 return E_OK;
1749 }
1750 {
1751 std::lock_guard<std::mutex> autoLock(dataLock_);
1752 if (!currentContext_.strategy) {
1753 LOGE("[CloudSyncer] strategy has not been set when tag status, %d.", -E_INTERNAL_ERROR);
1754 return -E_INTERNAL_ERROR;
1755 }
1756 bool isCloudWin = storageProxy_->IsTagCloudUpdateLocal(dataInfo.localInfo.logInfo,
1757 dataInfo.cloudLogInfo, policy_);
1758 strategyOpResult = currentContext_.strategy->TagSyncDataStatus(isExist, isCloudWin,
1759 dataInfo.localInfo.logInfo, dataInfo.cloudLogInfo);
1760 }
1761 if (strategyOpResult == OpType::DELETE) {
1762 param.deletePrimaryKeySet.insert(dataInfo.localInfo.logInfo.hashKey);
1763 }
1764 return E_OK;
1765 }
1766
GetLocalInfo(size_t index,SyncParam & param,DataInfoWithLog & logInfo,std::map<std::string,LogInfo> & localLogInfoCache,VBucket & localAssetInfo)1767 int CloudSyncer::GetLocalInfo(size_t index, SyncParam ¶m, DataInfoWithLog &logInfo,
1768 std::map<std::string, LogInfo> &localLogInfoCache, VBucket &localAssetInfo)
1769 {
1770 int errCode = storageProxy_->GetInfoByPrimaryKeyOrGid(param.tableName, param.downloadData.data[index],
1771 logInfo, localAssetInfo);
1772 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
1773 return errCode;
1774 }
1775 std::string hashKey(logInfo.logInfo.hashKey.begin(), logInfo.logInfo.hashKey.end());
1776 if (hashKey.empty()) {
1777 return errCode;
1778 }
1779 param.downloadData.existDataKey[index] = logInfo.logInfo.dataKey;
1780 param.downloadData.existDataHashKey[index] = logInfo.logInfo.hashKey;
1781 if (localLogInfoCache.find(hashKey) != localLogInfoCache.end()) {
1782 LOGD("[CloudSyncer] exist same record in one batch, override from cache record! hash=%.3s",
1783 DBCommon::TransferStringToHex(hashKey).c_str());
1784 logInfo.logInfo.flag = localLogInfoCache[hashKey].flag;
1785 logInfo.logInfo.wTimestamp = localLogInfoCache[hashKey].wTimestamp;
1786 logInfo.logInfo.timestamp = localLogInfoCache[hashKey].timestamp;
1787 logInfo.logInfo.cloudGid = localLogInfoCache[hashKey].cloudGid;
1788 logInfo.logInfo.device = localLogInfoCache[hashKey].device;
1789 logInfo.logInfo.sharingResource = localLogInfoCache[hashKey].sharingResource;
1790 logInfo.logInfo.status = localLogInfoCache[hashKey].status;
1791 // delete record should remove local asset info
1792 if ((localLogInfoCache[hashKey].flag & DataItem::DELETE_FLAG) == DataItem::DELETE_FLAG) {
1793 localAssetInfo.clear();
1794 }
1795 errCode = E_OK;
1796 }
1797 logInfo.logInfo.isNeedUpdateAsset = IsNeedUpdateAsset(localAssetInfo);
1798 return errCode;
1799 }
1800
GetNextTaskId()1801 TaskId CloudSyncer::GetNextTaskId()
1802 {
1803 std::lock_guard<std::mutex> autoLock(dataLock_);
1804 if (!priorityTaskQueue_.empty()) {
1805 return priorityTaskQueue_.front();
1806 }
1807 if (!taskQueue_.empty()) {
1808 return taskQueue_.front();
1809 }
1810 return INVALID_TASK_ID;
1811 }
1812
MarkCurrentTaskPausedIfNeed()1813 void CloudSyncer::MarkCurrentTaskPausedIfNeed()
1814 {
1815 std::lock_guard<std::mutex> autoLock(dataLock_);
1816 if (currentContext_.currentTaskId == INVALID_TASK_ID) {
1817 return;
1818 }
1819 if (cloudTaskInfos_.find(currentContext_.currentTaskId) == cloudTaskInfos_.end()) {
1820 return;
1821 }
1822 if (!cloudTaskInfos_[currentContext_.currentTaskId].priorityTask) {
1823 cloudTaskInfos_[currentContext_.currentTaskId].pause = true;
1824 LOGD("[CloudSyncer] Mark taskId %" PRIu64 " paused success", currentContext_.currentTaskId);
1825 }
1826 }
1827
SetCurrentTaskFailedWithoutLock(int errCode)1828 void CloudSyncer::SetCurrentTaskFailedWithoutLock(int errCode)
1829 {
1830 if (currentContext_.currentTaskId == INVALID_TASK_ID) {
1831 return;
1832 }
1833 cloudTaskInfos_[currentContext_.currentTaskId].errCode = errCode;
1834 }
1835
LockCloudIfNeed(TaskId taskId)1836 int CloudSyncer::LockCloudIfNeed(TaskId taskId)
1837 {
1838 {
1839 std::lock_guard<std::mutex> autoLock(dataLock_);
1840 if (currentContext_.locker != nullptr) {
1841 LOGD("[CloudSyncer] lock exist");
1842 return E_OK;
1843 }
1844 }
1845 std::shared_ptr<CloudLocker> locker = nullptr;
1846 int errCode = CloudLocker::BuildCloudLock([taskId, this]() {
1847 return LockCloud(taskId);
1848 }, [this]() {
1849 int unlockCode = UnlockCloud();
1850 if (unlockCode != E_OK) {
1851 SetCurrentTaskFailedWithoutLock(unlockCode);
1852 }
1853 }, locker);
1854 {
1855 std::lock_guard<std::mutex> autoLock(dataLock_);
1856 currentContext_.locker = locker;
1857 }
1858 return errCode;
1859 }
1860
UnlockIfNeed()1861 void CloudSyncer::UnlockIfNeed()
1862 {
1863 std::shared_ptr<CloudLocker> cacheLocker;
1864 {
1865 std::lock_guard<std::mutex> autoLock(dataLock_);
1866 if (currentContext_.locker == nullptr) {
1867 LOGW("[CloudSyncer] locker is nullptr when unlock it"); // should not happen
1868 }
1869 cacheLocker = currentContext_.locker;
1870 currentContext_.locker = nullptr;
1871 }
1872 // unlock without mutex
1873 cacheLocker = nullptr;
1874 }
1875
ClearCurrentContextWithoutLock()1876 void CloudSyncer::ClearCurrentContextWithoutLock()
1877 {
1878 heartbeatCount_.erase(currentContext_.currentTaskId);
1879 failedHeartbeatCount_.erase(currentContext_.currentTaskId);
1880 currentContext_.currentTaskId = INVALID_TASK_ID;
1881 currentContext_.notifier = nullptr;
1882 currentContext_.strategy = nullptr;
1883 currentContext_.processRecorder = nullptr;
1884 currentContext_.tableName.clear();
1885 currentContext_.assetDownloadList.clear();
1886 currentContext_.assetFields.clear();
1887 currentContext_.assetsInfo.clear();
1888 currentContext_.cloudWaterMarks.clear();
1889 currentContext_.isNeedUpload = false;
1890 currentContext_.currentUserIndex = 0;
1891 currentContext_.repeatCount = 0;
1892 }
1893
// Tear down the finished task: clear the running context, remove the task's
// bookkeeping, wake waiters, notify the process observer with FINISHED, and
// schedule a compensated sync for non-compensated tasks.
// @param taskId  the task being finished
// @param errCode the result to report when the task has not already failed
void CloudSyncer::ClearContextAndNotify(TaskId taskId, int errCode)
{
    std::shared_ptr<ProcessNotifier> notifier = nullptr;
    CloudTaskInfo info;
    {
        // clear current context
        std::lock_guard<std::mutex> autoLock(dataLock_);
        notifier = currentContext_.notifier;
        ClearCurrentContextWithoutLock();
        if (cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) { // should not happen
            LOGW("[CloudSyncer] taskId %" PRIu64 " has been finished!", taskId);
            contextCv_.notify_one();
            return;
        }
        // take ownership of the task info before erasing the map entry
        info = std::move(cloudTaskInfos_[taskId]);
        cloudTaskInfos_.erase(taskId);
        resumeTaskInfos_.erase(taskId);
    }
    int err = storageProxy_->ClearUnLockingNoNeedCompensated();
    if (err != E_OK) {
        // if clear unlocking failed, no return to avoid affecting the entire process
        LOGW("[CloudSyncer] clear unlocking status failed! errCode = %d", err);
    }
    // wake threads blocked in WaitCurTaskFinished
    contextCv_.notify_one();
    if (info.errCode == E_OK) {
        // task had no recorded failure: report the caller-provided result
        info.errCode = errCode;
    }
    LOGI("[CloudSyncer] finished storeId %.3s taskId %" PRIu64 " errCode %d", info.storeId.c_str(), taskId,
        info.errCode);
    info.status = ProcessStatus::FINISHED;
    if (notifier != nullptr) {
        notifier->NotifyProcess(info, {}, true);
    }
    // generate compensated sync
    if (!info.compensatedTask) {
        CloudTaskInfo taskInfo = CloudSyncUtils::InitCompensatedSyncTaskInfo(info);
        GenerateCompensatedSync(taskInfo);
    }
}
1933
DownloadOneBatch(TaskId taskId,SyncParam & param,bool isFirstDownload)1934 int CloudSyncer::DownloadOneBatch(TaskId taskId, SyncParam ¶m, bool isFirstDownload)
1935 {
1936 int ret = CheckTaskIdValid(taskId);
1937 if (ret != E_OK) {
1938 return ret;
1939 }
1940 bool abort = false;
1941 ret = DownloadDataFromCloud(taskId, param, abort, isFirstDownload);
1942 if (abort) {
1943 return ret;
1944 }
1945 // Save data in transaction, update cloud water mark, notify process and changed data
1946 ret = SaveDataNotifyProcess(taskId, param);
1947 if (ret == -E_TASK_PAUSED) {
1948 return ret;
1949 }
1950 if (ret != E_OK) {
1951 std::lock_guard<std::mutex> autoLock(dataLock_);
1952 param.info.tableStatus = ProcessStatus::FINISHED;
1953 currentContext_.notifier->UpdateProcess(param.info);
1954 return ret;
1955 }
1956 (void)NotifyInDownload(taskId, param, isFirstDownload);
1957 return SaveCloudWaterMark(param.tableName, taskId);
1958 }
1959
DownloadOneAssetRecord(const std::set<Key> & dupHashKeySet,const DownloadList & downloadList,DownloadItem & downloadItem,InnerProcessInfo & info,ChangedData & changedAssets)1960 int CloudSyncer::DownloadOneAssetRecord(const std::set<Key> &dupHashKeySet, const DownloadList &downloadList,
1961 DownloadItem &downloadItem, InnerProcessInfo &info, ChangedData &changedAssets)
1962 {
1963 CloudStorageUtils::EraseNoChangeAsset(downloadItem.assets);
1964 if (downloadItem.assets.empty()) { // Download data (include deleting)
1965 return E_OK;
1966 }
1967 bool isSharedTable = false;
1968 int errorCode = storageProxy_->IsSharedTable(info.tableName, isSharedTable);
1969 if (errorCode != E_OK) {
1970 LOGE("[CloudSyncer] DownloadOneAssetRecord cannot judge the table is a shared table. %d", errorCode);
1971 return errorCode;
1972 }
1973 if (!isSharedTable) {
1974 errorCode = DownloadAssetsOneByOne(info, downloadItem, downloadItem.assets);
1975 if (errorCode == -E_NOT_SET) {
1976 return -E_NOT_SET;
1977 }
1978 } else {
1979 // share table will not download asset, need to reset the status
1980 for (auto &entry: downloadItem.assets) {
1981 for (auto &asset: entry.second) {
1982 asset.status = AssetStatus::NORMAL;
1983 }
1984 }
1985 }
1986 if (errorCode != E_OK) {
1987 info.downLoadInfo.failCount += 1;
1988 if (info.downLoadInfo.successCount == 0) {
1989 LOGW("[CloudSyncer] Invalid successCount");
1990 } else {
1991 info.downLoadInfo.successCount -= 1;
1992 }
1993 }
1994 if (!downloadItem.assets.empty()) {
1995 if (dupHashKeySet.find(downloadItem.hashKey) == dupHashKeySet.end()) {
1996 changedAssets.primaryData[CloudSyncUtils::OpTypeToChangeType(downloadItem.strategy)].push_back(
1997 downloadItem.primaryKeyValList);
1998 } else if (downloadItem.strategy == OpType::INSERT) {
1999 changedAssets.primaryData[ChangeType::OP_UPDATE].push_back(downloadItem.primaryKeyValList);
2000 }
2001 }
2002
2003 return errorCode;
2004 }
2005
GetSyncParamForDownload(TaskId taskId,SyncParam & param)2006 int CloudSyncer::GetSyncParamForDownload(TaskId taskId, SyncParam ¶m)
2007 {
2008 int ret = E_OK;
2009 if (IsCurrentTableResume(taskId, false)) {
2010 std::lock_guard<std::mutex> autoLock(dataLock_);
2011 if (resumeTaskInfos_[taskId].syncParam.tableName == currentContext_.tableName) {
2012 param = resumeTaskInfos_[taskId].syncParam;
2013 resumeTaskInfos_[taskId].syncParam = {};
2014 ret = storageProxy_->GetCloudWaterMark(param.tableName, param.cloudWaterMark);
2015 if (ret != E_OK) {
2016 LOGE("[CloudSyncer] Cannot get cloud water level from cloud meta data when table is resume: %d.", ret);
2017 }
2018 LOGD("[CloudSyncer] Get sync param from cache");
2019 return E_OK;
2020 }
2021 }
2022 ret = GetCurrentTableName(param.tableName);
2023 if (ret != E_OK) {
2024 LOGE("[CloudSyncer] Invalid table name for syncing: %d", ret);
2025 return ret;
2026 }
2027 param.info.tableName = param.tableName;
2028 std::vector<Field> assetFields;
2029 // only no primary key and composite primary key contains rowid.
2030 ret = storageProxy_->GetPrimaryColNamesWithAssetsFields(param.tableName, param.pkColNames, assetFields);
2031 if (ret != E_OK) {
2032 LOGE("[CloudSyncer] Cannot get primary column names: %d", ret);
2033 return ret;
2034 }
2035 {
2036 std::lock_guard<std::mutex> autoLock(dataLock_);
2037 currentContext_.assetFields[currentContext_.tableName] = assetFields;
2038 }
2039 param.isSinglePrimaryKey = CloudSyncUtils::IsSinglePrimaryKey(param.pkColNames);
2040 if (!IsModeForcePull(taskId) && (!IsPriorityTask(taskId) || IsQueryListEmpty(taskId))) {
2041 ret = storageProxy_->GetCloudWaterMark(param.tableName, param.cloudWaterMark);
2042 if (ret != E_OK) {
2043 LOGE("[CloudSyncer] Cannot get cloud water level from cloud meta data: %d.", ret);
2044 }
2045 if (!IsCurrentTaskResume(taskId)) {
2046 ReloadCloudWaterMarkIfNeed(param.tableName, param.cloudWaterMark);
2047 }
2048 }
2049 currentContext_.notifier->GetDownloadInfoByTableName(param.info);
2050 return ret;
2051 }
2052
// Whether the task was marked to resume after a pause.
// NOTE(review): operator[] default-inserts an entry for an unknown taskId —
// callers are expected to pass an id that is already queued.
bool CloudSyncer::IsCurrentTaskResume(TaskId taskId)
{
    std::lock_guard<std::mutex> autoLock(dataLock_);
    return cloudTaskInfos_[taskId].resume;
}
2058
IsCurrentTableResume(TaskId taskId,bool upload)2059 bool CloudSyncer::IsCurrentTableResume(TaskId taskId, bool upload)
2060 {
2061 std::lock_guard<std::mutex> autoLock(dataLock_);
2062 if (!cloudTaskInfos_[taskId].resume) {
2063 return false;
2064 }
2065 if (currentContext_.tableName != resumeTaskInfos_[taskId].context.tableName) {
2066 return false;
2067 }
2068 return upload == resumeTaskInfos_[taskId].upload;
2069 }
2070
DownloadDataFromCloud(TaskId taskId,SyncParam & param,bool & abort,bool isFirstDownload)2071 int CloudSyncer::DownloadDataFromCloud(TaskId taskId, SyncParam ¶m, bool &abort,
2072 bool isFirstDownload)
2073 {
2074 // Get cloud data after cloud water mark
2075 param.info.tableStatus = ProcessStatus::PROCESSING;
2076 param.downloadData = {};
2077 int ret = QueryCloudData(taskId, param.info.tableName, param.cloudWaterMark, param.downloadData);
2078 if (ret == -E_QUERY_END) {
2079 // Won't break here since downloadData may not be null
2080 param.isLastBatch = true;
2081 } else if (ret != E_OK) {
2082 std::lock_guard<std::mutex> autoLock(dataLock_);
2083 param.info.tableStatus = ProcessStatus::FINISHED;
2084 currentContext_.notifier->UpdateProcess(param.info);
2085 abort = true;
2086 return ret;
2087 }
2088 if (param.downloadData.data.empty()) {
2089 if (ret == E_OK || isFirstDownload) {
2090 LOGD("[CloudSyncer] try to query cloud data use increment water mark");
2091 UpdateCloudWaterMark(taskId, param);
2092 // Cloud water may change on the cloud, it needs to be saved here
2093 SaveCloudWaterMark(param.tableName, taskId);
2094 }
2095 if (isFirstDownload) {
2096 NotifyInEmptyDownload(taskId, param.info);
2097 }
2098 abort = true;
2099 }
2100 return E_OK;
2101 }
2102
GetDownloadAssetIndex(TaskId taskId)2103 size_t CloudSyncer::GetDownloadAssetIndex(TaskId taskId)
2104 {
2105 size_t index = 0u;
2106 std::lock_guard<std::mutex> autoLock(dataLock_);
2107 if (resumeTaskInfos_[taskId].lastDownloadIndex != 0u) {
2108 index = resumeTaskInfos_[taskId].lastDownloadIndex;
2109 resumeTaskInfos_[taskId].lastDownloadIndex = 0u;
2110 }
2111 return index;
2112 }
2113
// Current upload batch index of the table being synced, as tracked by the
// process notifier.
uint32_t CloudSyncer::GetCurrentTableUploadBatchIndex()
{
    std::lock_guard<std::mutex> autoLock(dataLock_);
    return currentContext_.notifier->GetUploadBatchIndex(currentContext_.tableName);
}
2119
// Reset the upload batch index of the table being synced back to its
// starting value in the process notifier.
void CloudSyncer::ResetCurrentTableUploadBatchIndex()
{
    std::lock_guard<std::mutex> autoLock(dataLock_);
    currentContext_.notifier->ResetUploadBatchIndex(currentContext_.tableName);
}
2125
RecordWaterMark(TaskId taskId,Timestamp waterMark)2126 void CloudSyncer::RecordWaterMark(TaskId taskId, Timestamp waterMark)
2127 {
2128 std::lock_guard<std::mutex> autoLock(dataLock_);
2129 resumeTaskInfos_[taskId].lastLocalWatermark = waterMark;
2130 }
2131
GetResumeWaterMark(TaskId taskId)2132 Timestamp CloudSyncer::GetResumeWaterMark(TaskId taskId)
2133 {
2134 std::lock_guard<std::mutex> autoLock(dataLock_);
2135 return resumeTaskInfos_[taskId].lastLocalWatermark;
2136 }
2137
GetInnerProcessInfo(const std::string & tableName,UploadParam & uploadParam)2138 CloudSyncer::InnerProcessInfo CloudSyncer::GetInnerProcessInfo(const std::string &tableName, UploadParam &uploadParam)
2139 {
2140 InnerProcessInfo info;
2141 info.tableName = tableName;
2142 info.tableStatus = ProcessStatus::PROCESSING;
2143 ReloadUploadInfoIfNeed(uploadParam.taskId, uploadParam, info);
2144 return info;
2145 }
2146
// Forward the cloud-version generator callback to the cloud DB proxy.
void CloudSyncer::SetGenCloudVersionCallback(const GenerateCloudVersionCallback &callback)
{
    cloudDB_.SetGenCloudVersionCallback(callback);
}
2151
CopyAndClearTaskInfos()2152 std::vector<CloudSyncer::CloudTaskInfo> CloudSyncer::CopyAndClearTaskInfos()
2153 {
2154 std::vector<CloudTaskInfo> infoList;
2155 std::lock_guard<std::mutex> autoLock(dataLock_);
2156 for (const auto &item: cloudTaskInfos_) {
2157 infoList.push_back(item.second);
2158 }
2159 taskQueue_.clear();
2160 priorityTaskQueue_.clear();
2161 cloudTaskInfos_.clear();
2162 resumeTaskInfos_.clear();
2163 currentContext_.notifier = nullptr;
2164 return infoList;
2165 }
2166
WaitCurTaskFinished()2167 void CloudSyncer::WaitCurTaskFinished()
2168 {
2169 LOGD("[CloudSyncer] begin wait current task finished");
2170 std::unique_lock<std::mutex> uniqueLock(dataLock_);
2171 contextCv_.wait(uniqueLock, [this]() {
2172 return currentContext_.currentTaskId == INVALID_TASK_ID;
2173 });
2174 LOGD("[CloudSyncer] current task has been finished");
2175 }
2176 } // namespace DistributedDB
2177