1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "cloud_syncer.h"
16
17 #include <cstdint>
18 #include <utility>
19 #include <unordered_map>
20
21 #include "cloud/cloud_db_constant.h"
22 #include "cloud/cloud_storage_utils.h"
23 #include "cloud/icloud_db.h"
24 #include "cloud_sync_tag_assets.h"
25 #include "cloud_sync_utils.h"
26 #include "db_errno.h"
27 #include "kv_store_errno.h"
28 #include "log_print.h"
29 #include "runtime_context.h"
30 #include "storage_proxy.h"
31 #include "store_types.h"
32 #include "strategy_factory.h"
33 #include "version.h"
34
35 namespace DistributedDB {
ReloadWaterMarkIfNeed(TaskId taskId,WaterMark & waterMark)36 void CloudSyncer::ReloadWaterMarkIfNeed(TaskId taskId, WaterMark &waterMark)
37 {
38 Timestamp cacheWaterMark = GetResumeWaterMark(taskId);
39 waterMark = cacheWaterMark == 0u ? waterMark : cacheWaterMark;
40 RecordWaterMark(taskId, 0u);
41 }
42
ReloadCloudWaterMarkIfNeed(const std::string & tableName,std::string & cloudWaterMark)43 void CloudSyncer::ReloadCloudWaterMarkIfNeed(const std::string &tableName, std::string &cloudWaterMark)
44 {
45 std::lock_guard<std::mutex> autoLock(dataLock_);
46 std::string cacheCloudWaterMark = currentContext_.cloudWaterMarks[currentContext_.currentUserIndex][tableName];
47 cloudWaterMark = cacheCloudWaterMark.empty() ? cloudWaterMark : cacheCloudWaterMark;
48 }
49
ReloadUploadInfoIfNeed(TaskId taskId,const UploadParam & param,InnerProcessInfo & info)50 void CloudSyncer::ReloadUploadInfoIfNeed(TaskId taskId, const UploadParam ¶m, InnerProcessInfo &info)
51 {
52 info.upLoadInfo.total = static_cast<uint32_t>(param.count);
53 {
54 std::lock_guard<std::mutex> autoLock(dataLock_);
55 if (!cloudTaskInfos_[taskId].resume) {
56 return;
57 }
58 }
59 uint32_t lastSuccessCount = GetLastUploadSuccessCount(info.tableName);
60 if (lastSuccessCount == 0) {
61 return;
62 }
63 info.upLoadInfo.total += lastSuccessCount;
64 info.upLoadInfo.successCount += lastSuccessCount;
65 LOGD("[CloudSyncer] resume upload, last success count %" PRIu32, lastSuccessCount);
66 }
67
GetLastUploadSuccessCount(const std::string & tableName)68 uint32_t CloudSyncer::GetLastUploadSuccessCount(const std::string &tableName)
69 {
70 std::lock_guard<std::mutex> autoLock(dataLock_);
71 return currentContext_.notifier->GetLastUploadSuccessCount(tableName);
72 }
73
FillDownloadExtend(TaskId taskId,const std::string & tableName,const std::string & cloudWaterMark,VBucket & extend)74 int CloudSyncer::FillDownloadExtend(TaskId taskId, const std::string &tableName, const std::string &cloudWaterMark,
75 VBucket &extend)
76 {
77 extend = {
78 {CloudDbConstant::CURSOR_FIELD, cloudWaterMark}
79 };
80
81 QuerySyncObject obj = GetQuerySyncObject(tableName);
82 if (obj.IsContainQueryNodes()) {
83 int errCode = GetCloudGid(taskId, tableName, obj);
84 if (errCode != E_OK) {
85 LOGE("[CloudSyncer] Failed to get cloud gid when fill extend, %d.", errCode);
86 return errCode;
87 }
88 Bytes bytes;
89 bytes.resize(obj.CalculateParcelLen(SOFTWARE_VERSION_CURRENT));
90 Parcel parcel(bytes.data(), bytes.size());
91 errCode = obj.SerializeData(parcel, SOFTWARE_VERSION_CURRENT);
92 if (errCode != E_OK) {
93 LOGE("[CloudSyncer] Query serialize failed %d", errCode);
94 return errCode;
95 }
96 extend[CloudDbConstant::TYPE_FIELD] = static_cast<int64_t>(CloudQueryType::QUERY_FIELD);
97 extend[CloudDbConstant::QUERY_FIELD] = bytes;
98 } else {
99 extend[CloudDbConstant::TYPE_FIELD] = static_cast<int64_t>(CloudQueryType::FULL_TABLE);
100 }
101 return E_OK;
102 }
103
GetCloudGid(TaskId taskId,const std::string & tableName,QuerySyncObject & obj)104 int CloudSyncer::GetCloudGid(TaskId taskId, const std::string &tableName, QuerySyncObject &obj)
105 {
106 std::vector<std::string> cloudGid;
107 bool isCloudForcePush = cloudTaskInfos_[taskId].mode == SYNC_MODE_CLOUD_FORCE_PUSH;
108 int errCode = storageProxy_->GetCloudGid(obj, isCloudForcePush, IsCompensatedTask(taskId), cloudGid);
109 if (errCode != E_OK) {
110 LOGE("[CloudSyncer] Failed to get cloud gid, %d.", errCode);
111 } else if (!cloudGid.empty()) {
112 obj.SetCloudGid(cloudGid);
113 }
114 LOGI("[CloudSyncer] get cloud gid size:%zu", cloudGid.size());
115 return errCode;
116 }
117
GetQuerySyncObject(const std::string & tableName)118 QuerySyncObject CloudSyncer::GetQuerySyncObject(const std::string &tableName)
119 {
120 std::lock_guard<std::mutex> autoLock(dataLock_);
121 for (const auto &item : cloudTaskInfos_[currentContext_.currentTaskId].queryList) {
122 if (item.GetTableName() == tableName) {
123 return item;
124 }
125 }
126 LOGW("[CloudSyncer] not found query in cache");
127 QuerySyncObject querySyncObject;
128 querySyncObject.SetTableName(tableName);
129 return querySyncObject;
130 }
131
UpdateProccessWhenUploadFailed(InnerProcessInfo & info)132 void CloudSyncer::UpdateProccessWhenUploadFailed(InnerProcessInfo &info)
133 {
134 info.tableStatus = ProcessStatus::FINISHED;
135 std::lock_guard<std::mutex> autoLock(dataLock_);
136 currentContext_.notifier->UpdateProcess(info);
137 }
138
NotifyUploadFailed(int errCode,InnerProcessInfo & info)139 void CloudSyncer::NotifyUploadFailed(int errCode, InnerProcessInfo &info)
140 {
141 if (errCode == -E_CLOUD_VERSION_CONFLICT) {
142 LOGI("[CloudSyncer] Stop upload due to version conflict, %d", errCode);
143 } else {
144 LOGE("[CloudSyncer] Failed to do upload, %d", errCode);
145 info.upLoadInfo.failCount = info.upLoadInfo.total - info.upLoadInfo.successCount;
146 }
147 UpdateProccessWhenUploadFailed(info);
148 }
149
BatchInsert(Info & insertInfo,CloudSyncData & uploadData,InnerProcessInfo & innerProcessInfo)150 int CloudSyncer::BatchInsert(Info &insertInfo, CloudSyncData &uploadData, InnerProcessInfo &innerProcessInfo)
151 {
152 int errCode = cloudDB_.BatchInsert(uploadData.tableName, uploadData.insData.record,
153 uploadData.insData.extend, insertInfo);
154 innerProcessInfo.upLoadInfo.successCount += insertInfo.successCount;
155 innerProcessInfo.upLoadInfo.insertCount += insertInfo.successCount;
156 innerProcessInfo.upLoadInfo.total -= insertInfo.total - insertInfo.successCount - insertInfo.failCount;
157 if (errCode != E_OK) {
158 LOGE("[CloudSyncer][BatchInsert] BatchInsert with error, ret is %d.", errCode);
159 }
160 if (uploadData.isCloudVersionRecord) {
161 return errCode;
162 }
163 bool isSharedTable = false;
164 int ret = storageProxy_->IsSharedTable(uploadData.tableName, isSharedTable);
165 if (ret != E_OK) {
166 LOGE("[CloudSyncer] DoBatchUpload cannot judge the table is shared table. %d", ret);
167 return ret;
168 }
169 if (!isSharedTable) {
170 ret = CloudSyncUtils::FillAssetIdToAssets(uploadData.insData, errCode, CloudWaterType::INSERT);
171 if (ret != errCode) {
172 LOGW("[CloudSyncer][BatchInsert] FillAssetIdToAssets with error, ret is %d.", ret);
173 }
174 }
175 if (errCode != E_OK) {
176 storageProxy_->FillCloudGidIfSuccess(OpType::INSERT, uploadData);
177 bool isSkip = CloudSyncUtils::IsSkipAssetsMissingRecord(uploadData.insData.extend);
178 if (isSkip) {
179 LOGI("[CloudSyncer][BatchInsert] Try to FillCloudLogAndAsset when assets missing. errCode: %d", errCode);
180 return E_OK;
181 } else {
182 LOGE("[CloudSyncer][BatchInsert] errCode: %d, can not skip assets missing record.", errCode);
183 return errCode;
184 }
185 }
186 // we need to fill back gid after insert data to cloud.
187 int errorCode = storageProxy_->FillCloudLogAndAsset(OpType::INSERT, uploadData);
188 if ((errorCode != E_OK) || (ret != E_OK)) {
189 LOGE("[CloudSyncer] Failed to fill back when doing upload insData, %d.", errorCode);
190 return ret == E_OK ? errorCode : ret;
191 }
192 return E_OK;
193 }
194
BatchUpdate(Info & updateInfo,CloudSyncData & uploadData,InnerProcessInfo & innerProcessInfo)195 int CloudSyncer::BatchUpdate(Info &updateInfo, CloudSyncData &uploadData, InnerProcessInfo &innerProcessInfo)
196 {
197 int errCode = cloudDB_.BatchUpdate(uploadData.tableName, uploadData.updData.record,
198 uploadData.updData.extend, updateInfo);
199 innerProcessInfo.upLoadInfo.successCount += updateInfo.successCount;
200 innerProcessInfo.upLoadInfo.updateCount += updateInfo.successCount;
201 innerProcessInfo.upLoadInfo.total -= updateInfo.total - updateInfo.successCount - updateInfo.failCount;
202 if (errCode != E_OK) {
203 LOGE("[CloudSyncer][BatchUpdate] BatchUpdate with error, ret is %d.", errCode);
204 }
205 if (uploadData.isCloudVersionRecord) {
206 return errCode;
207 }
208 bool isSharedTable = false;
209 int ret = storageProxy_->IsSharedTable(uploadData.tableName, isSharedTable);
210 if (ret != E_OK) {
211 LOGE("[CloudSyncer] DoBatchUpload cannot judge the table is shared table. %d", ret);
212 return ret;
213 }
214 if (!isSharedTable) {
215 ret = CloudSyncUtils::FillAssetIdToAssets(uploadData.updData, errCode, CloudWaterType::UPDATE);
216 if (ret != E_OK) {
217 LOGW("[CloudSyncer][BatchUpdate] FillAssetIdToAssets with error, ret is %d.", ret);
218 }
219 }
220 if (errCode != E_OK) {
221 storageProxy_->FillCloudGidIfSuccess(OpType::UPDATE, uploadData);
222 bool isSkip = CloudSyncUtils::IsSkipAssetsMissingRecord(uploadData.updData.extend);
223 if (isSkip) {
224 LOGI("[CloudSyncer][BatchUpdate] Try to FillCloudLogAndAsset when assets missing. errCode: %d", errCode);
225 return E_OK;
226 } else {
227 LOGE("[CloudSyncer][BatchUpdate] errCode: %d, can not skip assets missing record.", errCode);
228 return errCode;
229 }
230 }
231 int errorCode = storageProxy_->FillCloudLogAndAsset(OpType::UPDATE, uploadData);
232 if ((errorCode != E_OK) || (ret != E_OK)) {
233 LOGE("[CloudSyncer] Failed to fill back when doing upload updData, %d.", errorCode);
234 return ret == E_OK ? errorCode : ret;
235 }
236 return E_OK;
237 }
238
DownloadAssetsOneByOne(const InnerProcessInfo & info,DownloadItem & downloadItem,std::map<std::string,Assets> & downloadAssets)239 int CloudSyncer::DownloadAssetsOneByOne(const InnerProcessInfo &info, DownloadItem &downloadItem,
240 std::map<std::string, Assets> &downloadAssets)
241 {
242 bool isSharedTable = false;
243 int errCode = storageProxy_->IsSharedTable(info.tableName, isSharedTable);
244 if (errCode != E_OK) {
245 LOGE("[CloudSyncer] DownloadOneAssetRecord cannot judge the table is a shared table. %d", errCode);
246 return errCode;
247 }
248 int transactionCode = E_OK;
249 // shared table don't download, so just begin transaction once
250 if (isSharedTable) {
251 transactionCode = storageProxy_->StartTransaction(TransactType::IMMEDIATE);
252 }
253 if (transactionCode != E_OK) {
254 LOGE("[CloudSyncer] begin transaction before download failed %d", transactionCode);
255 return transactionCode;
256 }
257 errCode = DownloadAssetsOneByOneInner(isSharedTable, info, downloadItem, downloadAssets);
258 if (isSharedTable) {
259 transactionCode = storageProxy_->Commit();
260 if (transactionCode != E_OK) {
261 LOGW("[CloudSyncer] commit transaction after download failed %d", transactionCode);
262 }
263 }
264 return (errCode == E_OK) ? transactionCode : errCode;
265 }
266
GetDBAssets(bool isSharedTable,const InnerProcessInfo & info,const DownloadItem & downloadItem,VBucket & dbAssets)267 std::pair<int, uint32_t> CloudSyncer::GetDBAssets(bool isSharedTable, const InnerProcessInfo &info,
268 const DownloadItem &downloadItem, VBucket &dbAssets)
269 {
270 std::pair<int, uint32_t> res = { E_OK, static_cast<uint32_t>(LockStatus::UNLOCK) };
271 auto &errCode = res.first;
272 if (!isSharedTable) {
273 errCode = storageProxy_->StartTransaction(TransactType::IMMEDIATE);
274 }
275 if (errCode != E_OK) {
276 LOGE("[CloudSyncer] begin transaction before download failed %d", errCode);
277 return res;
278 }
279 res = storageProxy_->GetAssetsByGidOrHashKey(info.tableName, downloadItem.gid,
280 downloadItem.hashKey, dbAssets);
281 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
282 if (errCode != -E_CLOUD_GID_MISMATCH) {
283 LOGE("[CloudSyncer] get assets from db failed %d", errCode);
284 }
285 if (!isSharedTable) {
286 (void)storageProxy_->Rollback();
287 }
288 return res;
289 }
290 if (!isSharedTable) {
291 errCode = storageProxy_->Commit();
292 }
293 if (errCode != E_OK) {
294 LOGE("[CloudSyncer] commit transaction before download failed %d", errCode);
295 }
296 return res;
297 }
298
BackFillAssetsAfterDownload(int downloadCode,int deleteCode,std::map<std::string,std::vector<uint32_t>> & tmpFlags,std::map<std::string,Assets> & tmpAssetsToDownload,std::map<std::string,Assets> & tmpAssetsToDelete)299 std::map<std::string, Assets>& CloudSyncer::BackFillAssetsAfterDownload(int downloadCode, int deleteCode,
300 std::map<std::string, std::vector<uint32_t>> &tmpFlags, std::map<std::string, Assets> &tmpAssetsToDownload,
301 std::map<std::string, Assets> &tmpAssetsToDelete)
302 {
303 std::map<std::string, Assets> &downloadAssets = tmpAssetsToDownload;
304 for (auto &[col, assets] : tmpAssetsToDownload) {
305 int i = 0;
306 for (auto &asset : assets) {
307 asset.flag = tmpFlags[col][i++];
308 if (asset.flag == static_cast<uint32_t>(AssetOpType::NO_CHANGE)) {
309 continue;
310 }
311 if (downloadCode == E_OK) {
312 asset.status = NORMAL;
313 } else {
314 asset.status = (asset.status == NORMAL) ? NORMAL : ABNORMAL;
315 }
316 }
317 }
318 for (auto &[col, assets] : tmpAssetsToDelete) {
319 for (auto &asset : assets) {
320 asset.flag = static_cast<uint32_t>(AssetOpType::DELETE);
321 if (deleteCode == E_OK) {
322 asset.status = NORMAL;
323 } else {
324 asset.status = ABNORMAL;
325 }
326 downloadAssets[col].push_back(asset);
327 }
328 }
329 return downloadAssets;
330 }
331
IsNeedSkipDownload(bool isSharedTable,int & errCode,const InnerProcessInfo & info,const DownloadItem & downloadItem,VBucket & dbAssets)332 int CloudSyncer::IsNeedSkipDownload(bool isSharedTable, int &errCode, const InnerProcessInfo &info,
333 const DownloadItem &downloadItem, VBucket &dbAssets)
334 {
335 auto [tmpCode, status] = GetDBAssets(isSharedTable, info, downloadItem, dbAssets);
336 if (tmpCode == -E_CLOUD_GID_MISMATCH) {
337 LOGW("[CloudSyncer] skip download asset because gid mismatch");
338 errCode = E_OK;
339 return true;
340 }
341 if (CloudStorageUtils::IsDataLocked(status)) {
342 LOGI("[CloudSyncer] skip download asset because data lock:%u", status);
343 errCode = E_OK;
344 return true;
345 }
346 if (tmpCode != E_OK) {
347 errCode = (errCode != E_OK) ? errCode : tmpCode;
348 return true;
349 }
350 return false;
351 }
352
CheckDownloadOrDeleteCode(int & errCode,int downloadCode,int deleteCode,DownloadItem & downloadItem)353 bool CloudSyncer::CheckDownloadOrDeleteCode(int &errCode, int downloadCode, int deleteCode, DownloadItem &downloadItem)
354 {
355 if (downloadCode == -E_CLOUD_RECORD_EXIST_CONFLICT || deleteCode == -E_CLOUD_RECORD_EXIST_CONFLICT) {
356 downloadItem.recordConflict = true;
357 errCode = E_OK;
358 return false;
359 }
360 errCode = (errCode != E_OK) ? errCode : deleteCode;
361 errCode = (errCode != E_OK) ? errCode : downloadCode;
362 if (downloadCode == -E_NOT_SET || deleteCode == -E_NOT_SET) {
363 return false;
364 }
365 return true;
366 }
367
DownloadAssetsOneByOneInner(bool isSharedTable,const InnerProcessInfo & info,DownloadItem & downloadItem,std::map<std::string,Assets> & downloadAssets)368 int CloudSyncer::DownloadAssetsOneByOneInner(bool isSharedTable, const InnerProcessInfo &info,
369 DownloadItem &downloadItem, std::map<std::string, Assets> &downloadAssets)
370 {
371 int errCode = E_OK;
372 std::map<std::string, Assets> tmpAssetsToDownload;
373 std::map<std::string, Assets> tmpAssetsToDelete;
374 std::map<std::string, std::vector<uint32_t>> tmpFlags;
375 for (auto &[col, assets] : downloadAssets) {
376 for (auto &asset : assets) {
377 VBucket dbAssets;
378 if (IsNeedSkipDownload(isSharedTable, errCode, info, downloadItem, dbAssets)) {
379 break;
380 }
381 if (!isSharedTable && asset.flag == static_cast<uint32_t>(AssetOpType::DELETE)) {
382 asset.status = static_cast<uint32_t>(AssetStatus::DELETE);
383 tmpAssetsToDelete[col].push_back(asset);
384 } else if (!isSharedTable && AssetOperationUtils::CalAssetOperation(col, asset, dbAssets,
385 AssetOperationUtils::CloudSyncAction::START_DOWNLOAD) == AssetOperationUtils::AssetOpType::HANDLE) {
386 asset.status = asset.flag == static_cast<uint32_t>(AssetOpType::INSERT) ?
387 static_cast<uint32_t>(AssetStatus::INSERT) : static_cast<uint32_t>(AssetStatus::UPDATE);
388 tmpAssetsToDownload[col].push_back(asset);
389 tmpFlags[col].push_back(asset.flag);
390 } else {
391 LOGD("[CloudSyncer] skip download asset...");
392 }
393 }
394 }
395 auto deleteCode = cloudDB_.RemoveLocalAssets(info.tableName, downloadItem.gid, downloadItem.prefix,
396 tmpAssetsToDelete);
397 auto downloadCode = cloudDB_.Download(info.tableName, downloadItem.gid, downloadItem.prefix, tmpAssetsToDownload);
398 if (!CheckDownloadOrDeleteCode(errCode, downloadCode, deleteCode, downloadItem)) {
399 return errCode;
400 }
401
402 // copy asset back
403 downloadAssets = BackFillAssetsAfterDownload(downloadCode, deleteCode, tmpFlags, tmpAssetsToDownload,
404 tmpAssetsToDelete);
405 return errCode;
406 }
407
CommitDownloadAssets(const DownloadItem & downloadItem,const std::string & tableName,DownloadCommitList & commitList,uint32_t & successCount)408 int CloudSyncer::CommitDownloadAssets(const DownloadItem &downloadItem, const std::string &tableName,
409 DownloadCommitList &commitList, uint32_t &successCount)
410 {
411 int errCode = storageProxy_->SetLogTriggerStatus(false);
412 if (errCode != E_OK) {
413 return errCode;
414 }
415 for (auto &item : commitList) {
416 std::string gid = std::get<0>(item); // 0 means gid is the first element in assetsInfo
417 // 1 means assetsMap info [colName, assets] is the forth element in downloadList[i]
418 std::map<std::string, Assets> assetsMap = std::get<1>(item);
419 bool setAllNormal = std::get<2>(item); // 2 means whether the download return is E_OK
420 VBucket normalAssets;
421 VBucket failedAssets;
422 normalAssets[CloudDbConstant::GID_FIELD] = gid;
423 failedAssets[CloudDbConstant::GID_FIELD] = gid;
424 VBucket &assets = setAllNormal ? normalAssets : failedAssets;
425 for (auto &[key, asset] : assetsMap) {
426 assets[key] = std::move(asset);
427 }
428 if (!downloadItem.recordConflict) {
429 errCode = FillCloudAssets(tableName, normalAssets, failedAssets);
430 if (errCode != E_OK) {
431 break;
432 }
433 }
434 LogInfo logInfo;
435 logInfo.cloudGid = gid;
436 // download must contain gid, just set the default value here.
437 logInfo.dataKey = DBConstant::DEFAULT_ROW_ID;
438 logInfo.hashKey = downloadItem.hashKey;
439 logInfo.timestamp = downloadItem.timestamp;
440 // there are failed assets, reset the timestamp to prevent the flag from being marked as consistent.
441 if (failedAssets.size() > 1) {
442 logInfo.timestamp = 0u;
443 }
444
445 errCode = storageProxy_->UpdateRecordFlag(tableName, downloadItem.recordConflict, logInfo);
446 if (errCode != E_OK) {
447 break;
448 }
449 successCount++;
450 }
451 int ret = storageProxy_->SetLogTriggerStatus(true);
452 return errCode == E_OK ? ret : errCode;
453 }
454
GenerateCompensatedSync(CloudTaskInfo & taskInfo)455 void CloudSyncer::GenerateCompensatedSync(CloudTaskInfo &taskInfo)
456 {
457 std::vector<QuerySyncObject> syncQuery;
458 std::vector<std::string> users;
459 int errCode = storageProxy_->GetCompensatedSyncQuery(syncQuery, users);
460 if (errCode != E_OK) {
461 LOGW("[CloudSyncer] Generate compensated sync failed by get query! errCode = %d", errCode);
462 return;
463 }
464 if (syncQuery.empty()) {
465 LOGD("[CloudSyncer] Not need generate compensated sync");
466 return;
467 }
468 taskInfo.users.clear();
469 auto cloudDBs = cloudDB_.GetCloudDB();
470 for (auto &[user, cloudDb] : cloudDBs) {
471 auto it = std::find(users.begin(), users.end(), user);
472 if (it != users.end()) {
473 taskInfo.users.push_back(user);
474 }
475 }
476 for (const auto &query : syncQuery) {
477 taskInfo.table.push_back(query.GetRelationTableName());
478 taskInfo.queryList.push_back(query);
479 }
480 Sync(taskInfo);
481 LOGI("[CloudSyncer] Generate compensated sync finished");
482 }
483
ChkIgnoredProcess(InnerProcessInfo & info,const CloudSyncData & uploadData,UploadParam & uploadParam)484 void CloudSyncer::ChkIgnoredProcess(InnerProcessInfo &info, const CloudSyncData &uploadData, UploadParam &uploadParam)
485 {
486 if (uploadData.ignoredCount == 0) { // LCOV_EXCL_BR_LINE
487 return;
488 }
489 info.upLoadInfo.total -= static_cast<uint32_t>(uploadData.ignoredCount);
490 if (info.upLoadInfo.successCount + info.upLoadInfo.failCount != info.upLoadInfo.total) { // LCOV_EXCL_BR_LINE
491 return;
492 }
493 if (!CloudSyncUtils::CheckCloudSyncDataEmpty(uploadData)) { // LCOV_EXCL_BR_LINE
494 return;
495 }
496 info.tableStatus = ProcessStatus::FINISHED;
497 info.upLoadInfo.batchIndex++;
498 NotifyInBatchUpload(uploadParam, info, true);
499 }
500
SaveCursorIfNeed(const std::string & tableName)501 int CloudSyncer::SaveCursorIfNeed(const std::string &tableName)
502 {
503 std::string cursor = "";
504 int errCode = storageProxy_->GetCloudWaterMark(tableName, cursor);
505 if (errCode != E_OK) {
506 LOGE("[CloudSyncer] get cloud water mark before download failed %d", errCode);
507 return errCode;
508 }
509 if (!cursor.empty()) {
510 return E_OK;
511 }
512 auto res = cloudDB_.GetEmptyCursor(tableName);
513 if (res.first != E_OK) {
514 LOGE("[CloudSyncer] get empty cursor failed %d", res.first);
515 return res.first;
516 }
517 if (res.second.empty()) {
518 LOGE("[CloudSyncer] get cursor is empty %d", -E_CLOUD_ERROR);
519 return -E_CLOUD_ERROR;
520 }
521 errCode = storageProxy_->SetCloudWaterMark(tableName, res.second);
522 if (errCode != E_OK) {
523 LOGE("[CloudSyncer] set cloud water mark before download failed %d", errCode);
524 }
525 return errCode;
526 }
527
PrepareAndDownload(const std::string & table,const CloudTaskInfo & taskInfo,bool isFirstDownload)528 int CloudSyncer::PrepareAndDownload(const std::string &table, const CloudTaskInfo &taskInfo, bool isFirstDownload)
529 {
530 std::string hashDev;
531 int errCode = RuntimeContext::GetInstance()->GetLocalIdentity(hashDev);
532 if (errCode != E_OK) {
533 LOGE("[CloudSyncer] Failed to get local identity.");
534 return errCode;
535 }
536 errCode = SaveCursorIfNeed(table);
537 if (errCode != E_OK) {
538 return errCode;
539 }
540 bool isShared = false;
541 errCode = storageProxy_->IsSharedTable(table, isShared);
542 if (errCode != E_OK) {
543 LOGE("[CloudSyncer] check shared table failed %d", errCode);
544 return errCode;
545 }
546 // shared table not allow logic delete
547 storageProxy_->SetCloudTaskConfig({ !isShared });
548 errCode = CheckTaskIdValid(taskInfo.taskId);
549 if (errCode != E_OK) {
550 LOGW("[CloudSyncer] task is invalid, abort sync");
551 return errCode;
552 }
553 errCode = DoDownload(taskInfo.taskId, isFirstDownload);
554 if (errCode != E_OK) {
555 LOGE("[CloudSyncer] download failed %d", errCode);
556 }
557 return errCode;
558 }
559
IsClosed() const560 bool CloudSyncer::IsClosed() const
561 {
562 return closed_ || IsKilled();
563 }
564
UpdateFlagForSavedRecord(const SyncParam & param)565 int CloudSyncer::UpdateFlagForSavedRecord(const SyncParam ¶m)
566 {
567 DownloadList downloadList;
568 {
569 std::lock_guard<std::mutex> autoLock(dataLock_);
570 downloadList = currentContext_.assetDownloadList;
571 }
572 std::set<std::string> gidFilters;
573 for (const auto &tuple: downloadList) {
574 gidFilters.insert(std::get<CloudSyncUtils::GID_INDEX>(tuple));
575 }
576 return storageProxy_->MarkFlagAsConsistent(param.tableName, param.downloadData, gidFilters);
577 }
578
BatchDelete(Info & deleteInfo,CloudSyncData & uploadData,InnerProcessInfo & innerProcessInfo)579 int CloudSyncer::BatchDelete(Info &deleteInfo, CloudSyncData &uploadData, InnerProcessInfo &innerProcessInfo)
580 {
581 int errCode = cloudDB_.BatchDelete(uploadData.tableName, uploadData.delData.record,
582 uploadData.delData.extend, deleteInfo);
583 innerProcessInfo.upLoadInfo.successCount += deleteInfo.successCount;
584 innerProcessInfo.upLoadInfo.deleteCount += deleteInfo.successCount;
585 innerProcessInfo.upLoadInfo.total -= deleteInfo.total - deleteInfo.successCount - deleteInfo.failCount;
586 if (errCode != E_OK) {
587 LOGE("[CloudSyncer] Failed to batch delete, %d", errCode);
588 storageProxy_->FillCloudGidIfSuccess(OpType::DELETE, uploadData);
589 return errCode;
590 }
591 errCode = storageProxy_->FillCloudLogAndAsset(OpType::DELETE, uploadData);
592 if (errCode != E_OK) {
593 LOGE("[CloudSyncer] Failed to fill back when doing upload delData, %d.", errCode);
594 }
595 return errCode;
596 }
597
IsCompensatedTask(TaskId taskId)598 bool CloudSyncer::IsCompensatedTask(TaskId taskId)
599 {
600 std::lock_guard<std::mutex> autoLock(dataLock_);
601 return cloudTaskInfos_[taskId].compensatedTask;
602 }
603
SetCloudDB(const std::map<std::string,std::shared_ptr<ICloudDb>> & cloudDBs)604 int CloudSyncer::SetCloudDB(const std::map<std::string, std::shared_ptr<ICloudDb>> &cloudDBs)
605 {
606 return cloudDB_.SetCloudDB(cloudDBs);
607 }
608
CleanAllWaterMark()609 void CloudSyncer::CleanAllWaterMark()
610 {
611 storageProxy_->CleanAllWaterMark();
612 }
613
GetDownloadItem(const DownloadList & downloadList,size_t i,DownloadItem & downloadItem)614 void CloudSyncer::GetDownloadItem(const DownloadList &downloadList, size_t i, DownloadItem &downloadItem)
615 {
616 downloadItem.gid = std::get<CloudSyncUtils::GID_INDEX>(downloadList[i]);
617 downloadItem.prefix = std::get<CloudSyncUtils::PREFIX_INDEX>(downloadList[i]);
618 downloadItem.strategy = std::get<CloudSyncUtils::STRATEGY_INDEX>(downloadList[i]);
619 downloadItem.assets = std::get<CloudSyncUtils::ASSETS_INDEX>(downloadList[i]);
620 downloadItem.hashKey = std::get<CloudSyncUtils::HASH_KEY_INDEX>(downloadList[i]);
621 downloadItem.primaryKeyValList = std::get<CloudSyncUtils::PRIMARY_KEY_INDEX>(downloadList[i]);
622 downloadItem.timestamp = std::get<CloudSyncUtils::TIMESTAMP_INDEX>(downloadList[i]);
623 }
624
DoNotifyInNeed(const CloudSyncer::TaskId & taskId,const std::vector<std::string> & needNotifyTables,const bool isFirstDownload)625 void CloudSyncer::DoNotifyInNeed(const CloudSyncer::TaskId &taskId, const std::vector<std::string> &needNotifyTables,
626 const bool isFirstDownload)
627 {
628 bool isNeedNotify = false;
629 {
630 std::lock_guard<std::mutex> autoLock(dataLock_);
631 // only when the first download and the task no need upload actually, notify the process, otherwise,
632 // the process will notify in the upload procedure, which can guarantee the notify order of the tables
633 isNeedNotify = isFirstDownload && !currentContext_.isNeedUpload;
634 }
635 if (!isNeedNotify) {
636 return;
637 }
638 for (size_t i = 0; i < needNotifyTables.size(); ++i) {
639 UpdateProcessInfoWithoutUpload(taskId, needNotifyTables[i], i != (needNotifyTables.size() - 1u));
640 }
641 }
642
GetUploadCountByTable(const CloudSyncer::TaskId & taskId,int64_t & count)643 int CloudSyncer::GetUploadCountByTable(const CloudSyncer::TaskId &taskId, int64_t &count)
644 {
645 std::string tableName;
646 int ret = GetCurrentTableName(tableName);
647 if (ret != E_OK) {
648 LOGE("[CloudSyncer] Invalid table name for get local water mark: %d", ret);
649 return ret;
650 }
651
652 ret = storageProxy_->StartTransaction();
653 if (ret != E_OK) {
654 LOGE("[CloudSyncer] start transaction failed before getting upload count.");
655 return ret;
656 }
657
658 ret = storageProxy_->GetUploadCount(GetQuerySyncObject(tableName), IsModeForcePush(taskId),
659 IsCompensatedTask(taskId), IsNeedGetLocalWater(taskId), count);
660 if (ret != E_OK) {
661 // GetUploadCount will return E_OK when upload count is zero.
662 LOGE("[CloudSyncer] Failed to get Upload Data Count, %d.", ret);
663 }
664 // No need Rollback when GetUploadCount failed
665 storageProxy_->Commit();
666 return ret;
667 }
668
UpdateProcessInfoWithoutUpload(CloudSyncer::TaskId taskId,const std::string & tableName,bool needNotify)669 void CloudSyncer::UpdateProcessInfoWithoutUpload(CloudSyncer::TaskId taskId, const std::string &tableName,
670 bool needNotify)
671 {
672 LOGI("[CloudSyncer] There is no need to doing upload, as the upload data count is zero.");
673 InnerProcessInfo innerProcessInfo;
674 innerProcessInfo.tableName = tableName;
675 innerProcessInfo.upLoadInfo.total = 0; // count is zero
676 innerProcessInfo.tableStatus = ProcessStatus::FINISHED;
677 {
678 std::lock_guard<std::mutex> autoLock(dataLock_);
679 if (!needNotify) {
680 currentContext_.notifier->UpdateProcess(innerProcessInfo);
681 } else {
682 currentContext_.notifier->NotifyProcess(cloudTaskInfos_[taskId], innerProcessInfo);
683 }
684 }
685 }
686
DoDownloadInNeed(const CloudTaskInfo & taskInfo,const bool needUpload,bool isFirstDownload)687 int CloudSyncer::DoDownloadInNeed(const CloudTaskInfo &taskInfo, const bool needUpload, bool isFirstDownload)
688 {
689 std::vector<std::string> needNotifyTables;
690 for (size_t i = 0; i < taskInfo.table.size(); ++i) {
691 std::string table;
692 {
693 std::lock_guard<std::mutex> autoLock(dataLock_);
694 if (currentContext_.processRecorder->IsDownloadFinish(currentContext_.currentUserIndex,
695 taskInfo.table[i])) {
696 continue;
697 }
698 LOGD("[CloudSyncer] try download table, index: %zu", i);
699 currentContext_.tableName = taskInfo.table[i];
700 table = currentContext_.tableName;
701 }
702 int errCode = PrepareAndDownload(table, taskInfo, isFirstDownload);
703 if (errCode != E_OK) {
704 return errCode;
705 }
706 MarkDownloadFinishIfNeed(table);
707 // needUpload indicate that the syncMode need push
708 if (needUpload) {
709 int64_t count = 0;
710 errCode = GetUploadCountByTable(taskInfo.taskId, count);
711 if (errCode != E_OK) {
712 LOGE("[CloudSyncer] GetUploadCountByTable failed %d", errCode);
713 return errCode;
714 }
715 // count > 0 means current table need upload actually
716 if (count > 0) {
717 {
718 std::lock_guard<std::mutex> autoLock(dataLock_);
719 currentContext_.isNeedUpload = true;
720 }
721 continue;
722 }
723 needNotifyTables.emplace_back(table);
724 }
725 errCode = SaveCloudWaterMark(taskInfo.table[i], taskInfo.taskId);
726 if (errCode != E_OK) {
727 LOGE("[CloudSyncer] Can not save cloud water mark after downloading %d", errCode);
728 return errCode;
729 }
730 }
731 DoNotifyInNeed(taskInfo.taskId, needNotifyTables, isFirstDownload);
732 return E_OK;
733 }
734
IsNeedGetLocalWater(TaskId taskId)735 bool CloudSyncer::IsNeedGetLocalWater(TaskId taskId)
736 {
737 return !IsModeForcePush(taskId) && (!IsPriorityTask(taskId) || IsQueryListEmpty(taskId)) &&
738 !IsCompensatedTask(taskId);
739 }
740
TryToAddSyncTask(CloudTaskInfo && taskInfo)741 int CloudSyncer::TryToAddSyncTask(CloudTaskInfo &&taskInfo)
742 {
743 if (closed_) {
744 LOGW("[CloudSyncer] syncer is closed, should not sync now");
745 return -E_DB_CLOSED;
746 }
747 std::shared_ptr<DataBaseSchema> cloudSchema;
748 int errCode = storageProxy_->GetCloudDbSchema(cloudSchema);
749 if (errCode != E_OK) {
750 LOGE("[CloudSyncer] Get cloud schema failed %d when add task", errCode);
751 return errCode;
752 }
753 std::lock_guard<std::mutex> autoLock(dataLock_);
754 errCode = CheckQueueSizeWithNoLock(taskInfo.priorityTask);
755 if (errCode != E_OK) {
756 return errCode;
757 }
758 errCode = GenerateTaskIdIfNeed(taskInfo);
759 if (errCode != E_OK) {
760 return errCode;
761 }
762 auto taskId = taskInfo.taskId;
763 cloudTaskInfos_[taskId] = std::move(taskInfo);
764 if (cloudTaskInfos_[taskId].priorityTask) {
765 priorityTaskQueue_.push_back(taskId);
766 LOGI("[CloudSyncer] Add priority task ok, storeId %.3s, taskId %" PRIu64,
767 cloudTaskInfos_[taskId].storeId.c_str(), cloudTaskInfos_[taskId].taskId);
768 return E_OK;
769 }
770 if (!MergeTaskInfo(cloudSchema, taskId)) {
771 taskQueue_.push_back(taskId);
772 LOGI("[CloudSyncer] Add task ok, storeId %.3s, taskId %" PRIu64, cloudTaskInfos_[taskId].storeId.c_str(),
773 cloudTaskInfos_[taskId].taskId);
774 }
775 return E_OK;
776 }
777
MergeTaskInfo(const std::shared_ptr<DataBaseSchema> & cloudSchema,TaskId taskId)778 bool CloudSyncer::MergeTaskInfo(const std::shared_ptr<DataBaseSchema> &cloudSchema, TaskId taskId)
779 {
780 if (!cloudTaskInfos_[taskId].merge) { // LCOV_EXCL_BR_LINE
781 return false;
782 }
783 bool isMerge = false;
784 bool mergeHappen = false;
785 TaskId checkTaskId = taskId;
786 do {
787 std::tie(isMerge, checkTaskId) = TryMergeTask(cloudSchema, checkTaskId);
788 mergeHappen = mergeHappen || isMerge;
789 } while (isMerge);
790 return mergeHappen;
791 }
792
TryMergeTask(const std::shared_ptr<DataBaseSchema> & cloudSchema,TaskId tryTaskId)793 std::pair<bool, TaskId> CloudSyncer::TryMergeTask(const std::shared_ptr<DataBaseSchema> &cloudSchema, TaskId tryTaskId)
794 {
795 std::pair<bool, TaskId> res;
796 auto &[merge, nextTryTask] = res;
797 TaskId beMergeTask = INVALID_TASK_ID;
798 TaskId runningTask = currentContext_.currentTaskId;
799 for (const auto &taskId : taskQueue_) {
800 if (taskId == runningTask || taskId == tryTaskId) { // LCOV_EXCL_BR_LINE
801 continue;
802 }
803 if (!IsTasksCanMerge(taskId, tryTaskId)) { // LCOV_EXCL_BR_LINE
804 continue;
805 }
806 if (MergeTaskTablesIfConsistent(taskId, tryTaskId)) { // LCOV_EXCL_BR_LINE
807 beMergeTask = taskId;
808 nextTryTask = tryTaskId;
809 merge = true;
810 break;
811 }
812 if (MergeTaskTablesIfConsistent(tryTaskId, taskId)) { // LCOV_EXCL_BR_LINE
813 beMergeTask = tryTaskId;
814 nextTryTask = taskId;
815 merge = true;
816 break;
817 }
818 }
819 if (!merge) { // LCOV_EXCL_BR_LINE
820 return res;
821 }
822 if (beMergeTask < nextTryTask) { // LCOV_EXCL_BR_LINE
823 std::tie(beMergeTask, nextTryTask) = SwapTwoTaskAndCopyTable(beMergeTask, nextTryTask);
824 }
825 AdjustTableBasedOnSchema(cloudSchema, cloudTaskInfos_[nextTryTask]);
826 auto processNotifier = std::make_shared<ProcessNotifier>(this);
827 processNotifier->Init(cloudTaskInfos_[beMergeTask].table, cloudTaskInfos_[beMergeTask].devices,
828 cloudTaskInfos_[beMergeTask].users);
829 cloudTaskInfos_[beMergeTask].errCode = -E_CLOUD_SYNC_TASK_MERGED;
830 cloudTaskInfos_[beMergeTask].status = ProcessStatus::FINISHED;
831 processNotifier->SetAllTableFinish();
832 processNotifier->NotifyProcess(cloudTaskInfos_[beMergeTask], {}, true);
833 cloudTaskInfos_.erase(beMergeTask);
834 taskQueue_.remove(beMergeTask);
835 LOGW("[CloudSyncer] TaskId %" PRIu64 " has been merged", beMergeTask);
836 return res;
837 }
838
IsTaskCanMerge(const CloudTaskInfo & taskInfo)839 bool CloudSyncer::IsTaskCanMerge(const CloudTaskInfo &taskInfo)
840 {
841 return !taskInfo.compensatedTask && !taskInfo.priorityTask &&
842 taskInfo.merge && taskInfo.mode == SYNC_MODE_CLOUD_MERGE;
843 }
844
IsTasksCanMerge(TaskId taskId,TaskId tryMergeTaskId)845 bool CloudSyncer::IsTasksCanMerge(TaskId taskId, TaskId tryMergeTaskId)
846 {
847 const auto &taskInfo = cloudTaskInfos_[taskId];
848 const auto &tryMergeTaskInfo = cloudTaskInfos_[tryMergeTaskId];
849 return IsTaskCanMerge(taskInfo) && IsTaskCanMerge(tryMergeTaskInfo) &&
850 taskInfo.devices == tryMergeTaskInfo.devices;
851 }
852
MergeTaskTablesIfConsistent(TaskId sourceId,TaskId targetId)853 bool CloudSyncer::MergeTaskTablesIfConsistent(TaskId sourceId, TaskId targetId)
854 {
855 const auto &source = cloudTaskInfos_[sourceId];
856 const auto &target = cloudTaskInfos_[targetId];
857 bool isMerge = true;
858 for (const auto &table : source.table) {
859 if (std::find(target.table.begin(), target.table.end(), table) == target.table.end()) { // LCOV_EXCL_BR_LINE
860 isMerge = false;
861 break;
862 }
863 }
864 return isMerge;
865 }
866
AdjustTableBasedOnSchema(const std::shared_ptr<DataBaseSchema> & cloudSchema,CloudTaskInfo & taskInfo)867 void CloudSyncer::AdjustTableBasedOnSchema(const std::shared_ptr<DataBaseSchema> &cloudSchema,
868 CloudTaskInfo &taskInfo)
869 {
870 std::vector<std::string> tmpTables = taskInfo.table;
871 taskInfo.table.clear();
872 taskInfo.queryList.clear();
873 for (const auto &table : cloudSchema->tables) {
874 if (std::find(tmpTables.begin(), tmpTables.end(), table.name) != tmpTables.end()) { // LCOV_EXCL_BR_LINE
875 taskInfo.table.push_back(table.name);
876 QuerySyncObject querySyncObject;
877 querySyncObject.SetTableName(table.name);
878 taskInfo.queryList.push_back(querySyncObject);
879 }
880 }
881 }
882
SwapTwoTaskAndCopyTable(TaskId source,TaskId target)883 std::pair<TaskId, TaskId> CloudSyncer::SwapTwoTaskAndCopyTable(TaskId source, TaskId target)
884 {
885 cloudTaskInfos_[source].table = cloudTaskInfos_[target].table;
886 cloudTaskInfos_[source].queryList = cloudTaskInfos_[target].queryList;
887 std::set<std::string> users;
888 users.insert(cloudTaskInfos_[source].users.begin(), cloudTaskInfos_[source].users.end());
889 users.insert(cloudTaskInfos_[target].users.begin(), cloudTaskInfos_[target].users.end());
890 cloudTaskInfos_[source].users = std::vector<std::string>(users.begin(), users.end());
891 return {target, source};
892 }
893
IsQueryListEmpty(TaskId taskId)894 bool CloudSyncer::IsQueryListEmpty(TaskId taskId)
895 {
896 std::lock_guard<std::mutex> autoLock(dataLock_);
897 return !std::any_of(cloudTaskInfos_[taskId].queryList.begin(), cloudTaskInfos_[taskId].queryList.end(),
898 [](const auto &item) {
899 return item.IsContainQueryNodes();
900 });
901 }
902
IsNeedLock(const UploadParam & param)903 bool CloudSyncer::IsNeedLock(const UploadParam ¶m)
904 {
905 return param.lockAction == LockAction::INSERT && param.mode == CloudWaterType::INSERT;
906 }
907
GetLocalWater(const std::string & tableName,UploadParam & uploadParam)908 std::pair<int, Timestamp> CloudSyncer::GetLocalWater(const std::string &tableName, UploadParam &uploadParam)
909 {
910 std::pair<int, Timestamp> res = { E_OK, 0u };
911 if (IsNeedGetLocalWater(uploadParam.taskId)) {
912 res.first = storageProxy_->GetLocalWaterMarkByMode(tableName, uploadParam.mode, res.second);
913 }
914 uploadParam.localMark = res.second;
915 return res;
916 }
917
HandleBatchUpload(UploadParam & uploadParam,InnerProcessInfo & info,CloudSyncData & uploadData,ContinueToken & continueStmtToken)918 int CloudSyncer::HandleBatchUpload(UploadParam &uploadParam, InnerProcessInfo &info,
919 CloudSyncData &uploadData, ContinueToken &continueStmtToken)
920 {
921 int ret = E_OK;
922 uint32_t batchIndex = GetCurrentTableUploadBatchIndex();
923 bool isLocked = false;
924 while (!CloudSyncUtils::CheckCloudSyncDataEmpty(uploadData)) {
925 ret = PreProcessBatchUpload(uploadParam, info, uploadData);
926 if (ret != E_OK) {
927 break;
928 }
929 info.upLoadInfo.batchIndex = ++batchIndex;
930 if (IsNeedLock(uploadParam) && !isLocked) {
931 ret = LockCloudIfNeed(uploadParam.taskId);
932 if (ret != E_OK) {
933 break;
934 }
935 isLocked = true;
936 }
937 ret = DoBatchUpload(uploadData, uploadParam, info);
938 if (ret != E_OK) {
939 break;
940 }
941 uploadData = CloudSyncData(uploadData.tableName, uploadParam.mode);
942 if (continueStmtToken == nullptr) {
943 break;
944 }
945 SetUploadDataFlag(uploadParam.taskId, uploadData);
946 RecordWaterMark(uploadParam.taskId, uploadParam.localMark);
947 ret = storageProxy_->GetCloudDataNext(continueStmtToken, uploadData);
948 if ((ret != E_OK) && (ret != -E_UNFINISHED)) {
949 LOGE("[CloudSyncer] Failed to get cloud data next when doing upload, %d.", ret);
950 break;
951 }
952 ChkIgnoredProcess(info, uploadData, uploadParam);
953 }
954 if ((ret != E_OK) && (ret != -E_UNFINISHED)) {
955 NotifyUploadFailed(ret, info);
956 }
957 if (isLocked && IsNeedLock(uploadParam)) {
958 UnlockIfNeed();
959 }
960 return ret;
961 }
962
DoUploadInner(const std::string & tableName,UploadParam & uploadParam)963 int CloudSyncer::DoUploadInner(const std::string &tableName, UploadParam &uploadParam)
964 {
965 InnerProcessInfo info = GetInnerProcessInfo(tableName, uploadParam);
966 static std::vector<CloudWaterType> waterTypes = DBCommon::GetWaterTypeVec();
967 int errCode = E_OK;
968 for (const auto &waterType: waterTypes) {
969 uploadParam.mode = waterType;
970 errCode = DoUploadByMode(tableName, uploadParam, info);
971 if (errCode != E_OK) {
972 break;
973 }
974 }
975 int ret = E_OK;
976 if (info.upLoadInfo.successCount > 0) {
977 ret = UploadVersionRecordIfNeed(uploadParam);
978 }
979 return errCode != E_OK ? errCode : ret;
980 }
981
UploadVersionRecordIfNeed(const UploadParam & uploadParam)982 int CloudSyncer::UploadVersionRecordIfNeed(const UploadParam &uploadParam)
983 {
984 if (uploadParam.count == 0) {
985 // no record upload
986 return E_OK;
987 }
988 if (!cloudDB_.IsExistCloudVersionCallback()) {
989 return E_OK;
990 }
991 auto [errCode, uploadData] = storageProxy_->GetLocalCloudVersion();
992 if (errCode != E_OK) {
993 return errCode;
994 }
995 bool isInsert = !uploadData.insData.record.empty();
996 CloudSyncBatch &batchData = isInsert ? uploadData.insData : uploadData.updData;
997 if (batchData.record.empty()) {
998 LOGE("[CloudSyncer] Get invalid cloud version record");
999 return -E_INTERNAL_ERROR;
1000 }
1001 std::string oriVersion;
1002 CloudStorageUtils::GetStringFromCloudData(CloudDbConstant::CLOUD_KV_FIELD_VALUE, batchData.record[0], oriVersion);
1003 std::string newVersion;
1004 std::tie(errCode, newVersion) = cloudDB_.GetCloudVersion(oriVersion);
1005 if (errCode != E_OK) {
1006 LOGE("[CloudSyncer] Get cloud version error %d", errCode);
1007 return errCode;
1008 }
1009 batchData.record[0][CloudDbConstant::CLOUD_KV_FIELD_VALUE] = newVersion;
1010 InnerProcessInfo processInfo;
1011 Info info;
1012 std::vector<VBucket> copyRecord = batchData.record;
1013 WaterMark waterMark;
1014 CloudSyncUtils::GetWaterMarkAndUpdateTime(batchData.extend, waterMark);
1015 errCode = isInsert ? BatchInsert(info, uploadData, processInfo) : BatchUpdate(info, uploadData, processInfo);
1016 batchData.record = copyRecord;
1017 CloudSyncUtils::ModifyCloudDataTime(batchData.extend[0]);
1018 auto ret = storageProxy_->FillCloudLogAndAsset(isInsert ? OpType::INSERT : OpType::UPDATE, uploadData);
1019 return errCode != E_OK ? errCode : ret;
1020 }
1021
TagUploadAssets(CloudSyncData & uploadData)1022 void CloudSyncer::TagUploadAssets(CloudSyncData &uploadData)
1023 {
1024 if (!IsDataContainAssets()) {
1025 return;
1026 }
1027 std::vector<Field> assetFields;
1028 {
1029 std::lock_guard<std::mutex> autoLock(dataLock_);
1030 assetFields = currentContext_.assetFields[currentContext_.tableName];
1031 }
1032
1033 for (size_t i = 0; i < uploadData.insData.extend.size(); i++) {
1034 for (const Field &assetField : assetFields) {
1035 (void)TagAssetsInSingleCol(assetField, true, uploadData.insData.record[i]);
1036 }
1037 }
1038 for (size_t i = 0; i < uploadData.updData.extend.size(); i++) {
1039 for (const Field &assetField : assetFields) {
1040 (void)TagAssetsInSingleCol(assetField, false, uploadData.updData.record[i]);
1041 }
1042 }
1043 }
1044
IsLockInDownload()1045 bool CloudSyncer::IsLockInDownload()
1046 {
1047 std::lock_guard<std::mutex> autoLock(dataLock_);
1048 if (cloudTaskInfos_.find(currentContext_.currentTaskId) == cloudTaskInfos_.end()) {
1049 return false;
1050 }
1051 auto currentLockAction = static_cast<uint32_t>(cloudTaskInfos_[currentContext_.currentTaskId].lockAction);
1052 return (currentLockAction & static_cast<uint32_t>(LockAction::DOWNLOAD)) != 0;
1053 }
1054
SetCurrentTaskFailedInMachine(int errCode)1055 CloudSyncEvent CloudSyncer::SetCurrentTaskFailedInMachine(int errCode)
1056 {
1057 std::lock_guard<std::mutex> autoLock(dataLock_);
1058 cloudTaskInfos_[currentContext_.currentTaskId].errCode = errCode;
1059 return CloudSyncEvent::ERROR_EVENT;
1060 }
1061
InitCloudSyncStateMachine()1062 void CloudSyncer::InitCloudSyncStateMachine()
1063 {
1064 CloudSyncStateMachine::Initialize();
1065 cloudSyncStateMachine_.RegisterFunc(CloudSyncState::DO_DOWNLOAD, [this]() {
1066 return SyncMachineDoDownload();
1067 });
1068 cloudSyncStateMachine_.RegisterFunc(CloudSyncState::DO_UPLOAD, [this]() {
1069 return SyncMachineDoUpload();
1070 });
1071 cloudSyncStateMachine_.RegisterFunc(CloudSyncState::DO_FINISHED, [this]() {
1072 return SyncMachineDoFinished();
1073 });
1074 cloudSyncStateMachine_.RegisterFunc(CloudSyncState::DO_REPEAT_CHECK, [this]() {
1075 return SyncMachineDoRepeatCheck();
1076 });
1077 }
1078
SyncMachineDoRepeatCheck()1079 CloudSyncEvent CloudSyncer::SyncMachineDoRepeatCheck()
1080 {
1081 auto config = storageProxy_->GetCloudSyncConfig();
1082 {
1083 std::lock_guard<std::mutex> autoLock(dataLock_);
1084 if (config.maxRetryConflictTimes < 0) { // unlimited repeat counts
1085 return CloudSyncEvent::REPEAT_DOWNLOAD_EVENT;
1086 }
1087 currentContext_.repeatCount++;
1088 if (currentContext_.repeatCount > config.maxRetryConflictTimes) {
1089 LOGD("[CloudSyncer] Repeat too much times current %d limit %" PRId32, currentContext_.repeatCount,
1090 config.maxRetryConflictTimes);
1091 SetCurrentTaskFailedWithoutLock(-E_CLOUD_VERSION_CONFLICT);
1092 return CloudSyncEvent::ERROR_EVENT;
1093 }
1094 LOGD("[CloudSyncer] Repeat taskId %" PRIu64 " download current %d", currentContext_.currentTaskId,
1095 currentContext_.repeatCount);
1096 }
1097 return CloudSyncEvent::REPEAT_DOWNLOAD_EVENT;
1098 }
1099
MarkDownloadFinishIfNeed(const std::string & downloadTable,bool isFinish)1100 void CloudSyncer::MarkDownloadFinishIfNeed(const std::string &downloadTable, bool isFinish)
1101 {
1102 // table exist reference should download every times
1103 if (IsLockInDownload() || storageProxy_->IsTableExistReferenceOrReferenceBy(downloadTable)) {
1104 return;
1105 }
1106 std::lock_guard<std::mutex> autoLock(dataLock_);
1107 currentContext_.processRecorder->MarkDownloadFinish(currentContext_.currentUserIndex, downloadTable, isFinish);
1108 }
1109
DoUploadByMode(const std::string & tableName,UploadParam & uploadParam,InnerProcessInfo & info)1110 int CloudSyncer::DoUploadByMode(const std::string &tableName, UploadParam &uploadParam, InnerProcessInfo &info)
1111 {
1112 CloudSyncData uploadData(tableName, uploadParam.mode);
1113 SetUploadDataFlag(uploadParam.taskId, uploadData);
1114 auto [err, localWater] = GetLocalWater(tableName, uploadParam);
1115 if (err != E_OK) {
1116 return err;
1117 }
1118 ContinueToken continueStmtToken = nullptr;
1119 int ret = storageProxy_->GetCloudData(GetQuerySyncObject(tableName), localWater, continueStmtToken, uploadData);
1120 if ((ret != E_OK) && (ret != -E_UNFINISHED)) {
1121 LOGE("[CloudSyncer] Failed to get cloud data when upload, %d.", ret);
1122 return ret;
1123 }
1124 uploadParam.count -= uploadData.ignoredCount;
1125 info.upLoadInfo.total -= static_cast<uint32_t>(uploadData.ignoredCount);
1126 ret = HandleBatchUpload(uploadParam, info, uploadData, continueStmtToken);
1127 if (ret != -E_TASK_PAUSED) {
1128 // reset watermark to zero when task no paused
1129 RecordWaterMark(uploadParam.taskId, 0u);
1130 }
1131 if (continueStmtToken != nullptr) {
1132 storageProxy_->ReleaseContinueToken(continueStmtToken);
1133 }
1134 return ret;
1135 }
1136
IsTableFinishInUpload(const std::string & table)1137 bool CloudSyncer::IsTableFinishInUpload(const std::string &table)
1138 {
1139 std::lock_guard<std::mutex> autoLock(dataLock_);
1140 return currentContext_.processRecorder->IsUploadFinish(currentContext_.currentUserIndex, table);
1141 }
1142
MarkUploadFinishIfNeed(const std::string & table)1143 void CloudSyncer::MarkUploadFinishIfNeed(const std::string &table)
1144 {
1145 // table exist reference or reference by should upload every times
1146 if (storageProxy_->IsTableExistReferenceOrReferenceBy(table)) {
1147 return;
1148 }
1149 std::lock_guard<std::mutex> autoLock(dataLock_);
1150 currentContext_.processRecorder->MarkUploadFinish(currentContext_.currentUserIndex, table, true);
1151 }
1152
IsNeedUpdateAsset(const VBucket & data)1153 bool CloudSyncer::IsNeedUpdateAsset(const VBucket &data)
1154 {
1155 for (const auto &item : data) {
1156 const Asset *asset = std::get_if<TYPE_INDEX<Asset>>(&item.second);
1157 if (asset != nullptr) {
1158 uint32_t lowBitStatus = AssetOperationUtils::EraseBitMask(asset->status);
1159 if (lowBitStatus == static_cast<uint32_t>(AssetStatus::ABNORMAL) ||
1160 lowBitStatus == static_cast<uint32_t>(AssetStatus::DOWNLOADING)) {
1161 return true;
1162 }
1163 continue;
1164 }
1165 const Assets *assets = std::get_if<TYPE_INDEX<Assets>>(&item.second);
1166 if (assets == nullptr) {
1167 continue;
1168 }
1169 for (const auto &oneAsset : *assets) {
1170 uint32_t lowBitStatus = AssetOperationUtils::EraseBitMask(oneAsset.status);
1171 if (lowBitStatus == static_cast<uint32_t>(AssetStatus::ABNORMAL) ||
1172 lowBitStatus == static_cast<uint32_t>(AssetStatus::DOWNLOADING)) {
1173 return true;
1174 }
1175 }
1176 }
1177 return false;
1178 }
1179
GetCloudTaskStatus(uint64_t taskId) const1180 SyncProcess CloudSyncer::GetCloudTaskStatus(uint64_t taskId) const
1181 {
1182 std::lock_guard<std::mutex> autoLock(dataLock_);
1183 auto iter = cloudTaskInfos_.find(taskId);
1184 SyncProcess syncProcess;
1185 if (iter == cloudTaskInfos_.end()) {
1186 syncProcess.process = ProcessStatus::FINISHED;
1187 syncProcess.errCode = NOT_FOUND;
1188 LOGE("[CloudSyncer] Not found task %" PRIu64, taskId);
1189 return syncProcess;
1190 }
1191 syncProcess.process = iter->second.status;
1192 syncProcess.errCode = TransferDBErrno(iter->second.errCode);
1193 std::shared_ptr<ProcessNotifier> notifier = nullptr;
1194 if (currentContext_.currentTaskId == taskId) {
1195 notifier = currentContext_.notifier;
1196 }
1197 bool hasNotifier = notifier != nullptr;
1198 if (hasNotifier) {
1199 syncProcess.tableProcess = notifier->GetCurrentTableProcess();
1200 }
1201 LOGI("[CloudSyncer] Found task %" PRIu64 " storeId %.3s status %d has notifier %d", taskId,
1202 iter->second.storeId.c_str(), static_cast<int64_t>(syncProcess.process), static_cast<int>(hasNotifier));
1203 return syncProcess;
1204 }
1205
GenerateTaskIdIfNeed(CloudTaskInfo & taskInfo)1206 int CloudSyncer::GenerateTaskIdIfNeed(CloudTaskInfo &taskInfo)
1207 {
1208 if (taskInfo.taskId != INVALID_TASK_ID) {
1209 if (cloudTaskInfos_.find(taskInfo.taskId) != cloudTaskInfos_.end()) {
1210 LOGE("[CloudSyncer] Sync with exist taskId %" PRIu64 " storeId %.3s", taskInfo.taskId,
1211 taskInfo.storeId.c_str());
1212 return -E_INVALID_ARGS;
1213 }
1214 lastTaskId_ = std::max(lastTaskId_, taskInfo.taskId);
1215 LOGI("[CloudSyncer] Sync with taskId %" PRIu64 " storeId %.3s", taskInfo.taskId, taskInfo.storeId.c_str());
1216 return E_OK;
1217 }
1218 lastTaskId_++;
1219 if (lastTaskId_ == UINT64_MAX) {
1220 lastTaskId_ = 1u;
1221 }
1222 taskInfo.taskId = lastTaskId_;
1223 return E_OK;
1224 }
1225 }