1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "cloud_syncer.h"
16 
17 #include <cstdint>
18 #include <utility>
19 #include <unordered_map>
20 
21 #include "cloud/cloud_db_constant.h"
22 #include "cloud/cloud_storage_utils.h"
23 #include "cloud/icloud_db.h"
24 #include "cloud_sync_tag_assets.h"
25 #include "cloud_sync_utils.h"
26 #include "db_errno.h"
27 #include "kv_store_errno.h"
28 #include "log_print.h"
29 #include "runtime_context.h"
30 #include "storage_proxy.h"
31 #include "store_types.h"
32 #include "strategy_factory.h"
33 #include "version.h"
34 
35 namespace DistributedDB {
ReloadWaterMarkIfNeed(TaskId taskId,WaterMark & waterMark)36 void CloudSyncer::ReloadWaterMarkIfNeed(TaskId taskId, WaterMark &waterMark)
37 {
38     Timestamp cacheWaterMark = GetResumeWaterMark(taskId);
39     waterMark = cacheWaterMark == 0u ? waterMark : cacheWaterMark;
40     RecordWaterMark(taskId, 0u);
41 }
42 
ReloadCloudWaterMarkIfNeed(const std::string & tableName,std::string & cloudWaterMark)43 void CloudSyncer::ReloadCloudWaterMarkIfNeed(const std::string &tableName, std::string &cloudWaterMark)
44 {
45     std::lock_guard<std::mutex> autoLock(dataLock_);
46     std::string cacheCloudWaterMark = currentContext_.cloudWaterMarks[currentContext_.currentUserIndex][tableName];
47     cloudWaterMark = cacheCloudWaterMark.empty() ? cloudWaterMark : cacheCloudWaterMark;
48 }
49 
ReloadUploadInfoIfNeed(TaskId taskId,const UploadParam & param,InnerProcessInfo & info)50 void CloudSyncer::ReloadUploadInfoIfNeed(TaskId taskId, const UploadParam &param, InnerProcessInfo &info)
51 {
52     info.upLoadInfo.total = static_cast<uint32_t>(param.count);
53     {
54         std::lock_guard<std::mutex> autoLock(dataLock_);
55         if (!cloudTaskInfos_[taskId].resume) {
56             return;
57         }
58     }
59     uint32_t lastSuccessCount = GetLastUploadSuccessCount(info.tableName);
60     if (lastSuccessCount == 0) {
61         return;
62     }
63     info.upLoadInfo.total += lastSuccessCount;
64     info.upLoadInfo.successCount += lastSuccessCount;
65     LOGD("[CloudSyncer] resume upload, last success count %" PRIu32, lastSuccessCount);
66 }
67 
GetLastUploadSuccessCount(const std::string & tableName)68 uint32_t CloudSyncer::GetLastUploadSuccessCount(const std::string &tableName)
69 {
70     std::lock_guard<std::mutex> autoLock(dataLock_);
71     return currentContext_.notifier->GetLastUploadSuccessCount(tableName);
72 }
73 
FillDownloadExtend(TaskId taskId,const std::string & tableName,const std::string & cloudWaterMark,VBucket & extend)74 int CloudSyncer::FillDownloadExtend(TaskId taskId, const std::string &tableName, const std::string &cloudWaterMark,
75     VBucket &extend)
76 {
77     extend = {
78         {CloudDbConstant::CURSOR_FIELD, cloudWaterMark}
79     };
80 
81     QuerySyncObject obj = GetQuerySyncObject(tableName);
82     if (obj.IsContainQueryNodes()) {
83         int errCode = GetCloudGid(taskId, tableName, obj);
84         if (errCode != E_OK) {
85             LOGE("[CloudSyncer] Failed to get cloud gid when fill extend, %d.", errCode);
86             return errCode;
87         }
88         Bytes bytes;
89         bytes.resize(obj.CalculateParcelLen(SOFTWARE_VERSION_CURRENT));
90         Parcel parcel(bytes.data(), bytes.size());
91         errCode = obj.SerializeData(parcel, SOFTWARE_VERSION_CURRENT);
92         if (errCode != E_OK) {
93             LOGE("[CloudSyncer] Query serialize failed %d", errCode);
94             return errCode;
95         }
96         extend[CloudDbConstant::TYPE_FIELD] = static_cast<int64_t>(CloudQueryType::QUERY_FIELD);
97         extend[CloudDbConstant::QUERY_FIELD] = bytes;
98     } else {
99         extend[CloudDbConstant::TYPE_FIELD] = static_cast<int64_t>(CloudQueryType::FULL_TABLE);
100     }
101     return E_OK;
102 }
103 
GetCloudGid(TaskId taskId,const std::string & tableName,QuerySyncObject & obj)104 int CloudSyncer::GetCloudGid(TaskId taskId, const std::string &tableName, QuerySyncObject &obj)
105 {
106     std::vector<std::string> cloudGid;
107     bool isCloudForcePush = cloudTaskInfos_[taskId].mode == SYNC_MODE_CLOUD_FORCE_PUSH;
108     int errCode = storageProxy_->GetCloudGid(obj, isCloudForcePush, IsCompensatedTask(taskId), cloudGid);
109     if (errCode != E_OK) {
110         LOGE("[CloudSyncer] Failed to get cloud gid, %d.", errCode);
111     } else if (!cloudGid.empty()) {
112         obj.SetCloudGid(cloudGid);
113     }
114     LOGI("[CloudSyncer] get cloud gid size:%zu", cloudGid.size());
115     return errCode;
116 }
117 
GetQuerySyncObject(const std::string & tableName)118 QuerySyncObject CloudSyncer::GetQuerySyncObject(const std::string &tableName)
119 {
120     std::lock_guard<std::mutex> autoLock(dataLock_);
121     for (const auto &item : cloudTaskInfos_[currentContext_.currentTaskId].queryList) {
122         if (item.GetTableName() == tableName) {
123             return item;
124         }
125     }
126     LOGW("[CloudSyncer] not found query in cache");
127     QuerySyncObject querySyncObject;
128     querySyncObject.SetTableName(tableName);
129     return querySyncObject;
130 }
131 
UpdateProccessWhenUploadFailed(InnerProcessInfo & info)132 void CloudSyncer::UpdateProccessWhenUploadFailed(InnerProcessInfo &info)
133 {
134     info.tableStatus = ProcessStatus::FINISHED;
135     std::lock_guard<std::mutex> autoLock(dataLock_);
136     currentContext_.notifier->UpdateProcess(info);
137 }
138 
NotifyUploadFailed(int errCode,InnerProcessInfo & info)139 void CloudSyncer::NotifyUploadFailed(int errCode, InnerProcessInfo &info)
140 {
141     if (errCode == -E_CLOUD_VERSION_CONFLICT) {
142         LOGI("[CloudSyncer] Stop upload due to version conflict, %d", errCode);
143     } else {
144         LOGE("[CloudSyncer] Failed to do upload, %d", errCode);
145         info.upLoadInfo.failCount = info.upLoadInfo.total - info.upLoadInfo.successCount;
146     }
147     UpdateProccessWhenUploadFailed(info);
148 }
149 
BatchInsert(Info & insertInfo,CloudSyncData & uploadData,InnerProcessInfo & innerProcessInfo)150 int CloudSyncer::BatchInsert(Info &insertInfo, CloudSyncData &uploadData, InnerProcessInfo &innerProcessInfo)
151 {
152     int errCode = cloudDB_.BatchInsert(uploadData.tableName, uploadData.insData.record,
153         uploadData.insData.extend, insertInfo);
154     innerProcessInfo.upLoadInfo.successCount += insertInfo.successCount;
155     innerProcessInfo.upLoadInfo.insertCount += insertInfo.successCount;
156     innerProcessInfo.upLoadInfo.total -= insertInfo.total - insertInfo.successCount - insertInfo.failCount;
157     if (errCode != E_OK) {
158         LOGE("[CloudSyncer][BatchInsert] BatchInsert with error, ret is %d.", errCode);
159     }
160     if (uploadData.isCloudVersionRecord) {
161         return errCode;
162     }
163     bool isSharedTable = false;
164     int ret = storageProxy_->IsSharedTable(uploadData.tableName, isSharedTable);
165     if (ret != E_OK) {
166         LOGE("[CloudSyncer] DoBatchUpload cannot judge the table is shared table. %d", ret);
167         return ret;
168     }
169     if (!isSharedTable) {
170         ret = CloudSyncUtils::FillAssetIdToAssets(uploadData.insData, errCode, CloudWaterType::INSERT);
171         if (ret != errCode) {
172             LOGW("[CloudSyncer][BatchInsert] FillAssetIdToAssets with error, ret is %d.", ret);
173         }
174     }
175     if (errCode != E_OK) {
176         storageProxy_->FillCloudGidIfSuccess(OpType::INSERT, uploadData);
177         bool isSkip = CloudSyncUtils::IsSkipAssetsMissingRecord(uploadData.insData.extend);
178         if (isSkip) {
179             LOGI("[CloudSyncer][BatchInsert] Try to FillCloudLogAndAsset when assets missing. errCode: %d", errCode);
180             return E_OK;
181         } else {
182             LOGE("[CloudSyncer][BatchInsert] errCode: %d, can not skip assets missing record.", errCode);
183             return errCode;
184         }
185     }
186     // we need to fill back gid after insert data to cloud.
187     int errorCode = storageProxy_->FillCloudLogAndAsset(OpType::INSERT, uploadData);
188     if ((errorCode != E_OK) || (ret != E_OK)) {
189         LOGE("[CloudSyncer] Failed to fill back when doing upload insData, %d.", errorCode);
190         return ret == E_OK ? errorCode : ret;
191     }
192     return E_OK;
193 }
194 
BatchUpdate(Info & updateInfo,CloudSyncData & uploadData,InnerProcessInfo & innerProcessInfo)195 int CloudSyncer::BatchUpdate(Info &updateInfo, CloudSyncData &uploadData, InnerProcessInfo &innerProcessInfo)
196 {
197     int errCode = cloudDB_.BatchUpdate(uploadData.tableName, uploadData.updData.record,
198         uploadData.updData.extend, updateInfo);
199     innerProcessInfo.upLoadInfo.successCount += updateInfo.successCount;
200     innerProcessInfo.upLoadInfo.updateCount += updateInfo.successCount;
201     innerProcessInfo.upLoadInfo.total -= updateInfo.total - updateInfo.successCount - updateInfo.failCount;
202     if (errCode != E_OK) {
203         LOGE("[CloudSyncer][BatchUpdate] BatchUpdate with error, ret is %d.", errCode);
204     }
205     if (uploadData.isCloudVersionRecord) {
206         return errCode;
207     }
208     bool isSharedTable = false;
209     int ret = storageProxy_->IsSharedTable(uploadData.tableName, isSharedTable);
210     if (ret != E_OK) {
211         LOGE("[CloudSyncer] DoBatchUpload cannot judge the table is shared table. %d", ret);
212         return ret;
213     }
214     if (!isSharedTable) {
215         ret = CloudSyncUtils::FillAssetIdToAssets(uploadData.updData, errCode, CloudWaterType::UPDATE);
216         if (ret != E_OK) {
217             LOGW("[CloudSyncer][BatchUpdate] FillAssetIdToAssets with error, ret is %d.", ret);
218         }
219     }
220     if (errCode != E_OK) {
221         storageProxy_->FillCloudGidIfSuccess(OpType::UPDATE, uploadData);
222         bool isSkip = CloudSyncUtils::IsSkipAssetsMissingRecord(uploadData.updData.extend);
223         if (isSkip) {
224             LOGI("[CloudSyncer][BatchUpdate] Try to FillCloudLogAndAsset when assets missing. errCode: %d", errCode);
225             return E_OK;
226         } else {
227             LOGE("[CloudSyncer][BatchUpdate] errCode: %d, can not skip assets missing record.", errCode);
228             return errCode;
229         }
230     }
231     int errorCode = storageProxy_->FillCloudLogAndAsset(OpType::UPDATE, uploadData);
232     if ((errorCode != E_OK) || (ret != E_OK)) {
233         LOGE("[CloudSyncer] Failed to fill back when doing upload updData, %d.", errorCode);
234         return ret == E_OK ? errorCode : ret;
235     }
236     return E_OK;
237 }
238 
DownloadAssetsOneByOne(const InnerProcessInfo & info,DownloadItem & downloadItem,std::map<std::string,Assets> & downloadAssets)239 int CloudSyncer::DownloadAssetsOneByOne(const InnerProcessInfo &info, DownloadItem &downloadItem,
240     std::map<std::string, Assets> &downloadAssets)
241 {
242     bool isSharedTable = false;
243     int errCode = storageProxy_->IsSharedTable(info.tableName, isSharedTable);
244     if (errCode != E_OK) {
245         LOGE("[CloudSyncer] DownloadOneAssetRecord cannot judge the table is a shared table. %d", errCode);
246         return errCode;
247     }
248     int transactionCode = E_OK;
249     // shared table don't download, so just begin transaction once
250     if (isSharedTable) {
251         transactionCode = storageProxy_->StartTransaction(TransactType::IMMEDIATE);
252     }
253     if (transactionCode != E_OK) {
254         LOGE("[CloudSyncer] begin transaction before download failed %d", transactionCode);
255         return transactionCode;
256     }
257     errCode = DownloadAssetsOneByOneInner(isSharedTable, info, downloadItem, downloadAssets);
258     if (isSharedTable) {
259         transactionCode = storageProxy_->Commit();
260         if (transactionCode != E_OK) {
261             LOGW("[CloudSyncer] commit transaction after download failed %d", transactionCode);
262         }
263     }
264     return (errCode == E_OK) ? transactionCode : errCode;
265 }
266 
GetDBAssets(bool isSharedTable,const InnerProcessInfo & info,const DownloadItem & downloadItem,VBucket & dbAssets)267 std::pair<int, uint32_t> CloudSyncer::GetDBAssets(bool isSharedTable, const InnerProcessInfo &info,
268     const DownloadItem &downloadItem, VBucket &dbAssets)
269 {
270     std::pair<int, uint32_t> res = { E_OK, static_cast<uint32_t>(LockStatus::UNLOCK) };
271     auto &errCode = res.first;
272     if (!isSharedTable) {
273         errCode = storageProxy_->StartTransaction(TransactType::IMMEDIATE);
274     }
275     if (errCode != E_OK) {
276         LOGE("[CloudSyncer] begin transaction before download failed %d", errCode);
277         return res;
278     }
279     res = storageProxy_->GetAssetsByGidOrHashKey(info.tableName, downloadItem.gid,
280         downloadItem.hashKey, dbAssets);
281     if (errCode != E_OK && errCode != -E_NOT_FOUND) {
282         if (errCode != -E_CLOUD_GID_MISMATCH) {
283             LOGE("[CloudSyncer] get assets from db failed %d", errCode);
284         }
285         if (!isSharedTable) {
286             (void)storageProxy_->Rollback();
287         }
288         return res;
289     }
290     if (!isSharedTable) {
291         errCode = storageProxy_->Commit();
292     }
293     if (errCode != E_OK) {
294         LOGE("[CloudSyncer] commit transaction before download failed %d", errCode);
295     }
296     return res;
297 }
298 
BackFillAssetsAfterDownload(int downloadCode,int deleteCode,std::map<std::string,std::vector<uint32_t>> & tmpFlags,std::map<std::string,Assets> & tmpAssetsToDownload,std::map<std::string,Assets> & tmpAssetsToDelete)299 std::map<std::string, Assets>& CloudSyncer::BackFillAssetsAfterDownload(int downloadCode, int deleteCode,
300     std::map<std::string, std::vector<uint32_t>> &tmpFlags, std::map<std::string, Assets> &tmpAssetsToDownload,
301     std::map<std::string, Assets> &tmpAssetsToDelete)
302 {
303     std::map<std::string, Assets> &downloadAssets = tmpAssetsToDownload;
304     for (auto &[col, assets] : tmpAssetsToDownload) {
305         int i = 0;
306         for (auto &asset : assets) {
307             asset.flag = tmpFlags[col][i++];
308             if (asset.flag == static_cast<uint32_t>(AssetOpType::NO_CHANGE)) {
309                 continue;
310             }
311             if (downloadCode == E_OK) {
312                 asset.status = NORMAL;
313             } else {
314                 asset.status = (asset.status == NORMAL) ? NORMAL : ABNORMAL;
315             }
316         }
317     }
318     for (auto &[col, assets] : tmpAssetsToDelete) {
319         for (auto &asset : assets) {
320             asset.flag = static_cast<uint32_t>(AssetOpType::DELETE);
321             if (deleteCode == E_OK) {
322                 asset.status = NORMAL;
323             } else {
324                 asset.status = ABNORMAL;
325             }
326             downloadAssets[col].push_back(asset);
327         }
328     }
329     return downloadAssets;
330 }
331 
IsNeedSkipDownload(bool isSharedTable,int & errCode,const InnerProcessInfo & info,const DownloadItem & downloadItem,VBucket & dbAssets)332 int CloudSyncer::IsNeedSkipDownload(bool isSharedTable, int &errCode, const InnerProcessInfo &info,
333     const DownloadItem &downloadItem, VBucket &dbAssets)
334 {
335     auto [tmpCode, status] = GetDBAssets(isSharedTable, info, downloadItem, dbAssets);
336     if (tmpCode == -E_CLOUD_GID_MISMATCH) {
337         LOGW("[CloudSyncer] skip download asset because gid mismatch");
338         errCode = E_OK;
339         return true;
340     }
341     if (CloudStorageUtils::IsDataLocked(status)) {
342         LOGI("[CloudSyncer] skip download asset because data lock:%u", status);
343         errCode = E_OK;
344         return true;
345     }
346     if (tmpCode != E_OK) {
347         errCode = (errCode != E_OK) ? errCode : tmpCode;
348         return true;
349     }
350     return false;
351 }
352 
CheckDownloadOrDeleteCode(int & errCode,int downloadCode,int deleteCode,DownloadItem & downloadItem)353 bool CloudSyncer::CheckDownloadOrDeleteCode(int &errCode, int downloadCode, int deleteCode, DownloadItem &downloadItem)
354 {
355     if (downloadCode == -E_CLOUD_RECORD_EXIST_CONFLICT || deleteCode == -E_CLOUD_RECORD_EXIST_CONFLICT) {
356         downloadItem.recordConflict = true;
357         errCode = E_OK;
358         return false;
359     }
360     errCode = (errCode != E_OK) ? errCode : deleteCode;
361     errCode = (errCode != E_OK) ? errCode : downloadCode;
362     if (downloadCode == -E_NOT_SET || deleteCode == -E_NOT_SET) {
363         return false;
364     }
365     return true;
366 }
367 
DownloadAssetsOneByOneInner(bool isSharedTable,const InnerProcessInfo & info,DownloadItem & downloadItem,std::map<std::string,Assets> & downloadAssets)368 int CloudSyncer::DownloadAssetsOneByOneInner(bool isSharedTable, const InnerProcessInfo &info,
369     DownloadItem &downloadItem, std::map<std::string, Assets> &downloadAssets)
370 {
371     int errCode = E_OK;
372     std::map<std::string, Assets> tmpAssetsToDownload;
373     std::map<std::string, Assets> tmpAssetsToDelete;
374     std::map<std::string, std::vector<uint32_t>> tmpFlags;
375     for (auto &[col, assets] : downloadAssets) {
376         for (auto &asset : assets) {
377             VBucket dbAssets;
378             if (IsNeedSkipDownload(isSharedTable, errCode, info, downloadItem, dbAssets)) {
379                 break;
380             }
381             if (!isSharedTable && asset.flag == static_cast<uint32_t>(AssetOpType::DELETE)) {
382                 asset.status = static_cast<uint32_t>(AssetStatus::DELETE);
383                 tmpAssetsToDelete[col].push_back(asset);
384             } else if (!isSharedTable && AssetOperationUtils::CalAssetOperation(col, asset, dbAssets,
385                 AssetOperationUtils::CloudSyncAction::START_DOWNLOAD) == AssetOperationUtils::AssetOpType::HANDLE) {
386                 asset.status = asset.flag == static_cast<uint32_t>(AssetOpType::INSERT) ?
387                     static_cast<uint32_t>(AssetStatus::INSERT) : static_cast<uint32_t>(AssetStatus::UPDATE);
388                 tmpAssetsToDownload[col].push_back(asset);
389                 tmpFlags[col].push_back(asset.flag);
390             } else {
391                 LOGD("[CloudSyncer] skip download asset...");
392             }
393         }
394     }
395     auto deleteCode = cloudDB_.RemoveLocalAssets(info.tableName, downloadItem.gid, downloadItem.prefix,
396         tmpAssetsToDelete);
397     auto downloadCode = cloudDB_.Download(info.tableName, downloadItem.gid, downloadItem.prefix, tmpAssetsToDownload);
398     if (!CheckDownloadOrDeleteCode(errCode, downloadCode, deleteCode, downloadItem)) {
399         return errCode;
400     }
401 
402     // copy asset back
403     downloadAssets = BackFillAssetsAfterDownload(downloadCode, deleteCode, tmpFlags, tmpAssetsToDownload,
404         tmpAssetsToDelete);
405     return errCode;
406 }
407 
CommitDownloadAssets(const DownloadItem & downloadItem,const std::string & tableName,DownloadCommitList & commitList,uint32_t & successCount)408 int CloudSyncer::CommitDownloadAssets(const DownloadItem &downloadItem, const std::string &tableName,
409     DownloadCommitList &commitList, uint32_t &successCount)
410 {
411     int errCode = storageProxy_->SetLogTriggerStatus(false);
412     if (errCode != E_OK) {
413         return errCode;
414     }
415     for (auto &item : commitList) {
416         std::string gid = std::get<0>(item); // 0 means gid is the first element in assetsInfo
417         // 1 means assetsMap info [colName, assets] is the forth element in downloadList[i]
418         std::map<std::string, Assets> assetsMap = std::get<1>(item);
419         bool setAllNormal = std::get<2>(item); // 2 means whether the download return is E_OK
420         VBucket normalAssets;
421         VBucket failedAssets;
422         normalAssets[CloudDbConstant::GID_FIELD] = gid;
423         failedAssets[CloudDbConstant::GID_FIELD] = gid;
424         VBucket &assets = setAllNormal ? normalAssets : failedAssets;
425         for (auto &[key, asset] : assetsMap) {
426             assets[key] = std::move(asset);
427         }
428         if (!downloadItem.recordConflict) {
429             errCode = FillCloudAssets(tableName, normalAssets, failedAssets);
430             if (errCode != E_OK) {
431                 break;
432             }
433         }
434         LogInfo logInfo;
435         logInfo.cloudGid = gid;
436         // download must contain gid, just set the default value here.
437         logInfo.dataKey = DBConstant::DEFAULT_ROW_ID;
438         logInfo.hashKey = downloadItem.hashKey;
439         logInfo.timestamp = downloadItem.timestamp;
440         // there are failed assets, reset the timestamp to prevent the flag from being marked as consistent.
441         if (failedAssets.size() > 1) {
442             logInfo.timestamp = 0u;
443         }
444 
445         errCode = storageProxy_->UpdateRecordFlag(tableName, downloadItem.recordConflict, logInfo);
446         if (errCode != E_OK) {
447             break;
448         }
449         successCount++;
450     }
451     int ret = storageProxy_->SetLogTriggerStatus(true);
452     return errCode == E_OK ? ret : errCode;
453 }
454 
GenerateCompensatedSync(CloudTaskInfo & taskInfo)455 void CloudSyncer::GenerateCompensatedSync(CloudTaskInfo &taskInfo)
456 {
457     std::vector<QuerySyncObject> syncQuery;
458     std::vector<std::string> users;
459     int errCode = storageProxy_->GetCompensatedSyncQuery(syncQuery, users);
460     if (errCode != E_OK) {
461         LOGW("[CloudSyncer] Generate compensated sync failed by get query! errCode = %d", errCode);
462         return;
463     }
464     if (syncQuery.empty()) {
465         LOGD("[CloudSyncer] Not need generate compensated sync");
466         return;
467     }
468     taskInfo.users.clear();
469     auto cloudDBs = cloudDB_.GetCloudDB();
470     for (auto &[user, cloudDb] : cloudDBs) {
471         auto it = std::find(users.begin(), users.end(), user);
472         if (it != users.end()) {
473             taskInfo.users.push_back(user);
474         }
475     }
476     for (const auto &query : syncQuery) {
477         taskInfo.table.push_back(query.GetRelationTableName());
478         taskInfo.queryList.push_back(query);
479     }
480     Sync(taskInfo);
481     LOGI("[CloudSyncer] Generate compensated sync finished");
482 }
483 
ChkIgnoredProcess(InnerProcessInfo & info,const CloudSyncData & uploadData,UploadParam & uploadParam)484 void CloudSyncer::ChkIgnoredProcess(InnerProcessInfo &info, const CloudSyncData &uploadData, UploadParam &uploadParam)
485 {
486     if (uploadData.ignoredCount == 0) { // LCOV_EXCL_BR_LINE
487         return;
488     }
489     info.upLoadInfo.total -= static_cast<uint32_t>(uploadData.ignoredCount);
490     if (info.upLoadInfo.successCount + info.upLoadInfo.failCount != info.upLoadInfo.total) { // LCOV_EXCL_BR_LINE
491         return;
492     }
493     if (!CloudSyncUtils::CheckCloudSyncDataEmpty(uploadData)) { // LCOV_EXCL_BR_LINE
494         return;
495     }
496     info.tableStatus = ProcessStatus::FINISHED;
497     info.upLoadInfo.batchIndex++;
498     NotifyInBatchUpload(uploadParam, info, true);
499 }
500 
SaveCursorIfNeed(const std::string & tableName)501 int CloudSyncer::SaveCursorIfNeed(const std::string &tableName)
502 {
503     std::string cursor = "";
504     int errCode = storageProxy_->GetCloudWaterMark(tableName, cursor);
505     if (errCode != E_OK) {
506         LOGE("[CloudSyncer] get cloud water mark before download failed %d", errCode);
507         return errCode;
508     }
509     if (!cursor.empty()) {
510         return E_OK;
511     }
512     auto res = cloudDB_.GetEmptyCursor(tableName);
513     if (res.first != E_OK) {
514         LOGE("[CloudSyncer] get empty cursor failed %d", res.first);
515         return res.first;
516     }
517     if (res.second.empty()) {
518         LOGE("[CloudSyncer] get cursor is empty %d", -E_CLOUD_ERROR);
519         return -E_CLOUD_ERROR;
520     }
521     errCode = storageProxy_->SetCloudWaterMark(tableName, res.second);
522     if (errCode != E_OK) {
523         LOGE("[CloudSyncer] set cloud water mark before download failed %d", errCode);
524     }
525     return errCode;
526 }
527 
PrepareAndDownload(const std::string & table,const CloudTaskInfo & taskInfo,bool isFirstDownload)528 int CloudSyncer::PrepareAndDownload(const std::string &table, const CloudTaskInfo &taskInfo, bool isFirstDownload)
529 {
530     std::string hashDev;
531     int errCode = RuntimeContext::GetInstance()->GetLocalIdentity(hashDev);
532     if (errCode != E_OK) {
533         LOGE("[CloudSyncer] Failed to get local identity.");
534         return errCode;
535     }
536     errCode = SaveCursorIfNeed(table);
537     if (errCode != E_OK) {
538         return errCode;
539     }
540     bool isShared = false;
541     errCode = storageProxy_->IsSharedTable(table, isShared);
542     if (errCode != E_OK) {
543         LOGE("[CloudSyncer] check shared table failed %d", errCode);
544         return errCode;
545     }
546     // shared table not allow logic delete
547     storageProxy_->SetCloudTaskConfig({ !isShared });
548     errCode = CheckTaskIdValid(taskInfo.taskId);
549     if (errCode != E_OK) {
550         LOGW("[CloudSyncer] task is invalid, abort sync");
551         return errCode;
552     }
553     errCode = DoDownload(taskInfo.taskId, isFirstDownload);
554     if (errCode != E_OK) {
555         LOGE("[CloudSyncer] download failed %d", errCode);
556     }
557     return errCode;
558 }
559 
IsClosed() const560 bool CloudSyncer::IsClosed() const
561 {
562     return closed_ || IsKilled();
563 }
564 
UpdateFlagForSavedRecord(const SyncParam & param)565 int CloudSyncer::UpdateFlagForSavedRecord(const SyncParam &param)
566 {
567     DownloadList downloadList;
568     {
569         std::lock_guard<std::mutex> autoLock(dataLock_);
570         downloadList = currentContext_.assetDownloadList;
571     }
572     std::set<std::string> gidFilters;
573     for (const auto &tuple: downloadList) {
574         gidFilters.insert(std::get<CloudSyncUtils::GID_INDEX>(tuple));
575     }
576     return storageProxy_->MarkFlagAsConsistent(param.tableName, param.downloadData, gidFilters);
577 }
578 
BatchDelete(Info & deleteInfo,CloudSyncData & uploadData,InnerProcessInfo & innerProcessInfo)579 int CloudSyncer::BatchDelete(Info &deleteInfo, CloudSyncData &uploadData, InnerProcessInfo &innerProcessInfo)
580 {
581     int errCode = cloudDB_.BatchDelete(uploadData.tableName, uploadData.delData.record,
582         uploadData.delData.extend, deleteInfo);
583     innerProcessInfo.upLoadInfo.successCount += deleteInfo.successCount;
584     innerProcessInfo.upLoadInfo.deleteCount += deleteInfo.successCount;
585     innerProcessInfo.upLoadInfo.total -= deleteInfo.total - deleteInfo.successCount - deleteInfo.failCount;
586     if (errCode != E_OK) {
587         LOGE("[CloudSyncer] Failed to batch delete, %d", errCode);
588         storageProxy_->FillCloudGidIfSuccess(OpType::DELETE, uploadData);
589         return errCode;
590     }
591     errCode = storageProxy_->FillCloudLogAndAsset(OpType::DELETE, uploadData);
592     if (errCode != E_OK) {
593         LOGE("[CloudSyncer] Failed to fill back when doing upload delData, %d.", errCode);
594     }
595     return errCode;
596 }
597 
IsCompensatedTask(TaskId taskId)598 bool CloudSyncer::IsCompensatedTask(TaskId taskId)
599 {
600     std::lock_guard<std::mutex> autoLock(dataLock_);
601     return cloudTaskInfos_[taskId].compensatedTask;
602 }
603 
SetCloudDB(const std::map<std::string,std::shared_ptr<ICloudDb>> & cloudDBs)604 int CloudSyncer::SetCloudDB(const std::map<std::string, std::shared_ptr<ICloudDb>> &cloudDBs)
605 {
606     return cloudDB_.SetCloudDB(cloudDBs);
607 }
608 
CleanAllWaterMark()609 void CloudSyncer::CleanAllWaterMark()
610 {
611     storageProxy_->CleanAllWaterMark();
612 }
613 
GetDownloadItem(const DownloadList & downloadList,size_t i,DownloadItem & downloadItem)614 void CloudSyncer::GetDownloadItem(const DownloadList &downloadList, size_t i, DownloadItem &downloadItem)
615 {
616     downloadItem.gid = std::get<CloudSyncUtils::GID_INDEX>(downloadList[i]);
617     downloadItem.prefix = std::get<CloudSyncUtils::PREFIX_INDEX>(downloadList[i]);
618     downloadItem.strategy = std::get<CloudSyncUtils::STRATEGY_INDEX>(downloadList[i]);
619     downloadItem.assets = std::get<CloudSyncUtils::ASSETS_INDEX>(downloadList[i]);
620     downloadItem.hashKey = std::get<CloudSyncUtils::HASH_KEY_INDEX>(downloadList[i]);
621     downloadItem.primaryKeyValList = std::get<CloudSyncUtils::PRIMARY_KEY_INDEX>(downloadList[i]);
622     downloadItem.timestamp = std::get<CloudSyncUtils::TIMESTAMP_INDEX>(downloadList[i]);
623 }
624 
DoNotifyInNeed(const CloudSyncer::TaskId & taskId,const std::vector<std::string> & needNotifyTables,const bool isFirstDownload)625 void CloudSyncer::DoNotifyInNeed(const CloudSyncer::TaskId &taskId, const std::vector<std::string> &needNotifyTables,
626     const bool isFirstDownload)
627 {
628     bool isNeedNotify = false;
629     {
630         std::lock_guard<std::mutex> autoLock(dataLock_);
631         // only when the first download and the task no need upload actually, notify the process, otherwise,
632         // the process will notify in the upload procedure, which can guarantee the notify order of the tables
633         isNeedNotify = isFirstDownload && !currentContext_.isNeedUpload;
634     }
635     if (!isNeedNotify) {
636         return;
637     }
638     for (size_t i = 0; i < needNotifyTables.size(); ++i) {
639         UpdateProcessInfoWithoutUpload(taskId, needNotifyTables[i], i != (needNotifyTables.size() - 1u));
640     }
641 }
642 
GetUploadCountByTable(const CloudSyncer::TaskId & taskId,int64_t & count)643 int CloudSyncer::GetUploadCountByTable(const CloudSyncer::TaskId &taskId, int64_t &count)
644 {
645     std::string tableName;
646     int ret = GetCurrentTableName(tableName);
647     if (ret != E_OK) {
648         LOGE("[CloudSyncer] Invalid table name for get local water mark: %d", ret);
649         return ret;
650     }
651 
652     ret = storageProxy_->StartTransaction();
653     if (ret != E_OK) {
654         LOGE("[CloudSyncer] start transaction failed before getting upload count.");
655         return ret;
656     }
657 
658     ret = storageProxy_->GetUploadCount(GetQuerySyncObject(tableName), IsModeForcePush(taskId),
659         IsCompensatedTask(taskId), IsNeedGetLocalWater(taskId), count);
660     if (ret != E_OK) {
661         // GetUploadCount will return E_OK when upload count is zero.
662         LOGE("[CloudSyncer] Failed to get Upload Data Count, %d.", ret);
663     }
664     // No need Rollback when GetUploadCount failed
665     storageProxy_->Commit();
666     return ret;
667 }
668 
UpdateProcessInfoWithoutUpload(CloudSyncer::TaskId taskId,const std::string & tableName,bool needNotify)669 void CloudSyncer::UpdateProcessInfoWithoutUpload(CloudSyncer::TaskId taskId, const std::string &tableName,
670     bool needNotify)
671 {
672     LOGI("[CloudSyncer] There is no need to doing upload, as the upload data count is zero.");
673     InnerProcessInfo innerProcessInfo;
674     innerProcessInfo.tableName = tableName;
675     innerProcessInfo.upLoadInfo.total = 0;  // count is zero
676     innerProcessInfo.tableStatus = ProcessStatus::FINISHED;
677     {
678         std::lock_guard<std::mutex> autoLock(dataLock_);
679         if (!needNotify) {
680             currentContext_.notifier->UpdateProcess(innerProcessInfo);
681         } else {
682             currentContext_.notifier->NotifyProcess(cloudTaskInfos_[taskId], innerProcessInfo);
683         }
684     }
685 }
686 
DoDownloadInNeed(const CloudTaskInfo & taskInfo,const bool needUpload,bool isFirstDownload)687 int CloudSyncer::DoDownloadInNeed(const CloudTaskInfo &taskInfo, const bool needUpload, bool isFirstDownload)
688 {
689     std::vector<std::string> needNotifyTables;
690     for (size_t i = 0; i < taskInfo.table.size(); ++i) {
691         std::string table;
692         {
693             std::lock_guard<std::mutex> autoLock(dataLock_);
694             if (currentContext_.processRecorder->IsDownloadFinish(currentContext_.currentUserIndex,
695                 taskInfo.table[i])) {
696                 continue;
697             }
698             LOGD("[CloudSyncer] try download table, index: %zu", i);
699             currentContext_.tableName = taskInfo.table[i];
700             table = currentContext_.tableName;
701         }
702         int errCode = PrepareAndDownload(table, taskInfo, isFirstDownload);
703         if (errCode != E_OK) {
704             return errCode;
705         }
706         MarkDownloadFinishIfNeed(table);
707         // needUpload indicate that the syncMode need push
708         if (needUpload) {
709             int64_t count = 0;
710             errCode = GetUploadCountByTable(taskInfo.taskId, count);
711             if (errCode != E_OK) {
712                 LOGE("[CloudSyncer] GetUploadCountByTable failed %d", errCode);
713                 return errCode;
714             }
715             // count > 0 means current table need upload actually
716             if (count > 0) {
717                 {
718                     std::lock_guard<std::mutex> autoLock(dataLock_);
719                     currentContext_.isNeedUpload = true;
720                 }
721                 continue;
722             }
723             needNotifyTables.emplace_back(table);
724         }
725         errCode = SaveCloudWaterMark(taskInfo.table[i], taskInfo.taskId);
726         if (errCode != E_OK) {
727             LOGE("[CloudSyncer] Can not save cloud water mark after downloading %d", errCode);
728             return errCode;
729         }
730     }
731     DoNotifyInNeed(taskInfo.taskId, needNotifyTables, isFirstDownload);
732     return E_OK;
733 }
734 
IsNeedGetLocalWater(TaskId taskId)735 bool CloudSyncer::IsNeedGetLocalWater(TaskId taskId)
736 {
737     return !IsModeForcePush(taskId) && (!IsPriorityTask(taskId) || IsQueryListEmpty(taskId)) &&
738         !IsCompensatedTask(taskId);
739 }
740 
TryToAddSyncTask(CloudTaskInfo && taskInfo)741 int CloudSyncer::TryToAddSyncTask(CloudTaskInfo &&taskInfo)
742 {
743     if (closed_) {
744         LOGW("[CloudSyncer] syncer is closed, should not sync now");
745         return -E_DB_CLOSED;
746     }
747     std::shared_ptr<DataBaseSchema> cloudSchema;
748     int errCode = storageProxy_->GetCloudDbSchema(cloudSchema);
749     if (errCode != E_OK) {
750         LOGE("[CloudSyncer] Get cloud schema failed %d when add task", errCode);
751         return errCode;
752     }
753     std::lock_guard<std::mutex> autoLock(dataLock_);
754     errCode = CheckQueueSizeWithNoLock(taskInfo.priorityTask);
755     if (errCode != E_OK) {
756         return errCode;
757     }
758     errCode = GenerateTaskIdIfNeed(taskInfo);
759     if (errCode != E_OK) {
760         return errCode;
761     }
762     auto taskId = taskInfo.taskId;
763     cloudTaskInfos_[taskId] = std::move(taskInfo);
764     if (cloudTaskInfos_[taskId].priorityTask) {
765         priorityTaskQueue_.push_back(taskId);
766         LOGI("[CloudSyncer] Add priority task ok, storeId %.3s, taskId %" PRIu64,
767             cloudTaskInfos_[taskId].storeId.c_str(), cloudTaskInfos_[taskId].taskId);
768         return E_OK;
769     }
770     if (!MergeTaskInfo(cloudSchema, taskId)) {
771         taskQueue_.push_back(taskId);
772         LOGI("[CloudSyncer] Add task ok, storeId %.3s, taskId %" PRIu64, cloudTaskInfos_[taskId].storeId.c_str(),
773             cloudTaskInfos_[taskId].taskId);
774     }
775     return E_OK;
776 }
777 
MergeTaskInfo(const std::shared_ptr<DataBaseSchema> & cloudSchema,TaskId taskId)778 bool CloudSyncer::MergeTaskInfo(const std::shared_ptr<DataBaseSchema> &cloudSchema, TaskId taskId)
779 {
780     if (!cloudTaskInfos_[taskId].merge) { // LCOV_EXCL_BR_LINE
781         return false;
782     }
783     bool isMerge = false;
784     bool mergeHappen = false;
785     TaskId checkTaskId = taskId;
786     do {
787         std::tie(isMerge, checkTaskId) = TryMergeTask(cloudSchema, checkTaskId);
788         mergeHappen = mergeHappen || isMerge;
789     } while (isMerge);
790     return mergeHappen;
791 }
792 
// Looks for one queued task that can be merged with tryTaskId and, if one is found, merges the pair.
// Two tasks merge when IsTasksCanMerge holds and every table of one is contained in the other's table list.
// The task whose tables are contained (the subset) is the one dropped ("beMergeTask").
// Returns {true, surviving task id} after a merge so the caller can keep merging from the survivor, or
// {false, 0} when no merge happened.
// NOTE(review): cloudTaskInfos_ and taskQueue_ are accessed without locking here; the caller chain in this
// file (TryToAddSyncTask -> MergeTaskInfo) holds dataLock_.
std::pair<bool, TaskId> CloudSyncer::TryMergeTask(const std::shared_ptr<DataBaseSchema> &cloudSchema, TaskId tryTaskId)
{
    std::pair<bool, TaskId> res;  // value-initialized: {false, 0}
    auto &[merge, nextTryTask] = res;
    TaskId beMergeTask = INVALID_TASK_ID;
    TaskId runningTask = currentContext_.currentTaskId;
    for (const auto &taskId : taskQueue_) {
        // never merge the running task, and never merge a task with itself
        if (taskId == runningTask || taskId == tryTaskId) { // LCOV_EXCL_BR_LINE
            continue;
        }
        if (!IsTasksCanMerge(taskId, tryTaskId)) { // LCOV_EXCL_BR_LINE
            continue;
        }
        // the queued task's tables are a subset of tryTaskId's: drop the queued task
        if (MergeTaskTablesIfConsistent(taskId, tryTaskId)) { // LCOV_EXCL_BR_LINE
            beMergeTask = taskId;
            nextTryTask = tryTaskId;
            merge = true;
            break;
        }
        // tryTaskId's tables are a subset of the queued task's: drop tryTaskId
        if (MergeTaskTablesIfConsistent(tryTaskId, taskId)) { // LCOV_EXCL_BR_LINE
            beMergeTask = tryTaskId;
            nextTryTask = taskId;
            merge = true;
            break;
        }
    }
    if (!merge) { // LCOV_EXCL_BR_LINE
        return res;
    }
    // The task with the smaller id always survives. If the dropped task has the smaller id, it takes over
    // the other task's table and query lists (and the union of both users), and the two roles are swapped.
    // NOTE(review): when no swap happens, the dropped task's users are not carried over to the survivor,
    // unlike in the swap path. Confirm that this asymmetry is intended.
    if (beMergeTask < nextTryTask) { // LCOV_EXCL_BR_LINE
        std::tie(beMergeTask, nextTryTask) = SwapTwoTaskAndCopyTable(beMergeTask, nextTryTask);
    }
    // keep only the survivor's tables that exist in the cloud schema and rebuild its per-table queries
    AdjustTableBasedOnSchema(cloudSchema, cloudTaskInfos_[nextTryTask]);
    // finish the dropped task with -E_CLOUD_SYNC_TASK_MERGED, report it through its own notifier,
    // then forget it
    auto processNotifier = std::make_shared<ProcessNotifier>(this);
    processNotifier->Init(cloudTaskInfos_[beMergeTask].table, cloudTaskInfos_[beMergeTask].devices,
        cloudTaskInfos_[beMergeTask].users);
    cloudTaskInfos_[beMergeTask].errCode = -E_CLOUD_SYNC_TASK_MERGED;
    cloudTaskInfos_[beMergeTask].status = ProcessStatus::FINISHED;
    processNotifier->SetAllTableFinish();
    processNotifier->NotifyProcess(cloudTaskInfos_[beMergeTask], {}, true);
    cloudTaskInfos_.erase(beMergeTask);
    taskQueue_.remove(beMergeTask);
    LOGW("[CloudSyncer] TaskId %" PRIu64 " has been merged", beMergeTask);
    return res;
}
838 
IsTaskCanMerge(const CloudTaskInfo & taskInfo)839 bool CloudSyncer::IsTaskCanMerge(const CloudTaskInfo &taskInfo)
840 {
841     return !taskInfo.compensatedTask && !taskInfo.priorityTask &&
842         taskInfo.merge && taskInfo.mode == SYNC_MODE_CLOUD_MERGE;
843 }
844 
IsTasksCanMerge(TaskId taskId,TaskId tryMergeTaskId)845 bool CloudSyncer::IsTasksCanMerge(TaskId taskId, TaskId tryMergeTaskId)
846 {
847     const auto &taskInfo = cloudTaskInfos_[taskId];
848     const auto &tryMergeTaskInfo = cloudTaskInfos_[tryMergeTaskId];
849     return IsTaskCanMerge(taskInfo) && IsTaskCanMerge(tryMergeTaskInfo) &&
850         taskInfo.devices == tryMergeTaskInfo.devices;
851 }
852 
MergeTaskTablesIfConsistent(TaskId sourceId,TaskId targetId)853 bool CloudSyncer::MergeTaskTablesIfConsistent(TaskId sourceId, TaskId targetId)
854 {
855     const auto &source = cloudTaskInfos_[sourceId];
856     const auto &target = cloudTaskInfos_[targetId];
857     bool isMerge = true;
858     for (const auto &table : source.table) {
859         if (std::find(target.table.begin(), target.table.end(), table) == target.table.end()) { // LCOV_EXCL_BR_LINE
860             isMerge = false;
861             break;
862         }
863     }
864     return isMerge;
865 }
866 
AdjustTableBasedOnSchema(const std::shared_ptr<DataBaseSchema> & cloudSchema,CloudTaskInfo & taskInfo)867 void CloudSyncer::AdjustTableBasedOnSchema(const std::shared_ptr<DataBaseSchema> &cloudSchema,
868     CloudTaskInfo &taskInfo)
869 {
870     std::vector<std::string> tmpTables = taskInfo.table;
871     taskInfo.table.clear();
872     taskInfo.queryList.clear();
873     for (const auto &table : cloudSchema->tables) {
874         if (std::find(tmpTables.begin(), tmpTables.end(), table.name) != tmpTables.end()) { // LCOV_EXCL_BR_LINE
875             taskInfo.table.push_back(table.name);
876             QuerySyncObject querySyncObject;
877             querySyncObject.SetTableName(table.name);
878             taskInfo.queryList.push_back(querySyncObject);
879         }
880     }
881 }
882 
SwapTwoTaskAndCopyTable(TaskId source,TaskId target)883 std::pair<TaskId, TaskId> CloudSyncer::SwapTwoTaskAndCopyTable(TaskId source, TaskId target)
884 {
885     cloudTaskInfos_[source].table = cloudTaskInfos_[target].table;
886     cloudTaskInfos_[source].queryList = cloudTaskInfos_[target].queryList;
887     std::set<std::string> users;
888     users.insert(cloudTaskInfos_[source].users.begin(), cloudTaskInfos_[source].users.end());
889     users.insert(cloudTaskInfos_[target].users.begin(), cloudTaskInfos_[target].users.end());
890     cloudTaskInfos_[source].users = std::vector<std::string>(users.begin(), users.end());
891     return {target, source};
892 }
893 
IsQueryListEmpty(TaskId taskId)894 bool CloudSyncer::IsQueryListEmpty(TaskId taskId)
895 {
896     std::lock_guard<std::mutex> autoLock(dataLock_);
897     return !std::any_of(cloudTaskInfos_[taskId].queryList.begin(), cloudTaskInfos_[taskId].queryList.end(),
898         [](const auto &item) {
899             return item.IsContainQueryNodes();
900     });
901 }
902 
IsNeedLock(const UploadParam & param)903 bool CloudSyncer::IsNeedLock(const UploadParam &param)
904 {
905     return param.lockAction == LockAction::INSERT && param.mode == CloudWaterType::INSERT;
906 }
907 
GetLocalWater(const std::string & tableName,UploadParam & uploadParam)908 std::pair<int, Timestamp> CloudSyncer::GetLocalWater(const std::string &tableName, UploadParam &uploadParam)
909 {
910     std::pair<int, Timestamp> res = { E_OK, 0u };
911     if (IsNeedGetLocalWater(uploadParam.taskId)) {
912         res.first = storageProxy_->GetLocalWaterMarkByMode(tableName, uploadParam.mode, res.second);
913     }
914     uploadParam.localMark = res.second;
915     return res;
916 }
917 
// Uploads the data of one water type (uploadParam.mode) batch by batch.
// Starts with the batch already in uploadData and fetches further batches through continueStmtToken. Stops
// when a batch is empty, when no token remains, or on error.
// If IsNeedLock(uploadParam) holds, the cloud is locked once before the first upload and unlocked at the end.
// On an error other than -E_UNFINISHED, NotifyUploadFailed is called.
// Returns E_OK, the failing step's error code, or -E_UNFINISHED if the last GetCloudDataNext returned it
// together with an empty batch.
int CloudSyncer::HandleBatchUpload(UploadParam &uploadParam, InnerProcessInfo &info,
    CloudSyncData &uploadData, ContinueToken &continueStmtToken)
{
    int ret = E_OK;
    uint32_t batchIndex = GetCurrentTableUploadBatchIndex();
    bool isLocked = false;
    while (!CloudSyncUtils::CheckCloudSyncDataEmpty(uploadData)) {
        ret = PreProcessBatchUpload(uploadParam, info, uploadData);
        if (ret != E_OK) {
            break;
        }
        info.upLoadInfo.batchIndex = ++batchIndex;
        // lock at most once per call
        if (IsNeedLock(uploadParam) && !isLocked) {
            ret = LockCloudIfNeed(uploadParam.taskId);
            if (ret != E_OK) {
                break;
            }
            isLocked = true;
        }
        ret = DoBatchUpload(uploadData, uploadParam, info);
        if (ret != E_OK) {
            break;
        }
        // reset the buffer for the next batch of the same table and water type
        uploadData = CloudSyncData(uploadData.tableName, uploadParam.mode);
        if (continueStmtToken == nullptr) {
            // no further batch to fetch
            break;
        }
        SetUploadDataFlag(uploadParam.taskId, uploadData);
        // record progress; DoUploadByMode resets this mark to 0 unless the task is paused
        RecordWaterMark(uploadParam.taskId, uploadParam.localMark);
        ret = storageProxy_->GetCloudDataNext(continueStmtToken, uploadData);
        if ((ret != E_OK) && (ret != -E_UNFINISHED)) {
            LOGE("[CloudSyncer] Failed to get cloud data next when doing upload, %d.", ret);
            break;
        }
        ChkIgnoredProcess(info, uploadData, uploadParam);
    }
    if ((ret != E_OK) && (ret != -E_UNFINISHED)) {
        NotifyUploadFailed(ret, info);
    }
    // release only a lock taken in this call
    if (isLocked && IsNeedLock(uploadParam)) {
        UnlockIfNeed();
    }
    return ret;
}
962 
DoUploadInner(const std::string & tableName,UploadParam & uploadParam)963 int CloudSyncer::DoUploadInner(const std::string &tableName, UploadParam &uploadParam)
964 {
965     InnerProcessInfo info = GetInnerProcessInfo(tableName, uploadParam);
966     static std::vector<CloudWaterType> waterTypes = DBCommon::GetWaterTypeVec();
967     int errCode = E_OK;
968     for (const auto &waterType: waterTypes) {
969         uploadParam.mode = waterType;
970         errCode = DoUploadByMode(tableName, uploadParam, info);
971         if (errCode != E_OK) {
972             break;
973         }
974     }
975     int ret = E_OK;
976     if (info.upLoadInfo.successCount > 0) {
977         ret = UploadVersionRecordIfNeed(uploadParam);
978     }
979     return errCode != E_OK ? errCode : ret;
980 }
981 
UploadVersionRecordIfNeed(const UploadParam & uploadParam)982 int CloudSyncer::UploadVersionRecordIfNeed(const UploadParam &uploadParam)
983 {
984     if (uploadParam.count == 0) {
985         // no record upload
986         return E_OK;
987     }
988     if (!cloudDB_.IsExistCloudVersionCallback()) {
989         return E_OK;
990     }
991     auto [errCode, uploadData] = storageProxy_->GetLocalCloudVersion();
992     if (errCode != E_OK) {
993         return errCode;
994     }
995     bool isInsert = !uploadData.insData.record.empty();
996     CloudSyncBatch &batchData = isInsert ? uploadData.insData : uploadData.updData;
997     if (batchData.record.empty()) {
998         LOGE("[CloudSyncer] Get invalid cloud version record");
999         return -E_INTERNAL_ERROR;
1000     }
1001     std::string oriVersion;
1002     CloudStorageUtils::GetStringFromCloudData(CloudDbConstant::CLOUD_KV_FIELD_VALUE, batchData.record[0], oriVersion);
1003     std::string newVersion;
1004     std::tie(errCode, newVersion) = cloudDB_.GetCloudVersion(oriVersion);
1005     if (errCode != E_OK) {
1006         LOGE("[CloudSyncer] Get cloud version error %d", errCode);
1007         return errCode;
1008     }
1009     batchData.record[0][CloudDbConstant::CLOUD_KV_FIELD_VALUE] = newVersion;
1010     InnerProcessInfo processInfo;
1011     Info info;
1012     std::vector<VBucket> copyRecord = batchData.record;
1013     WaterMark waterMark;
1014     CloudSyncUtils::GetWaterMarkAndUpdateTime(batchData.extend, waterMark);
1015     errCode = isInsert ? BatchInsert(info, uploadData, processInfo) : BatchUpdate(info, uploadData, processInfo);
1016     batchData.record = copyRecord;
1017     CloudSyncUtils::ModifyCloudDataTime(batchData.extend[0]);
1018     auto ret = storageProxy_->FillCloudLogAndAsset(isInsert ? OpType::INSERT : OpType::UPDATE, uploadData);
1019     return errCode != E_OK ? errCode : ret;
1020 }
1021 
TagUploadAssets(CloudSyncData & uploadData)1022 void CloudSyncer::TagUploadAssets(CloudSyncData &uploadData)
1023 {
1024     if (!IsDataContainAssets()) {
1025         return;
1026     }
1027     std::vector<Field> assetFields;
1028     {
1029         std::lock_guard<std::mutex> autoLock(dataLock_);
1030         assetFields = currentContext_.assetFields[currentContext_.tableName];
1031     }
1032 
1033     for (size_t i = 0; i < uploadData.insData.extend.size(); i++) {
1034         for (const Field &assetField : assetFields) {
1035             (void)TagAssetsInSingleCol(assetField, true, uploadData.insData.record[i]);
1036         }
1037     }
1038     for (size_t i = 0; i < uploadData.updData.extend.size(); i++) {
1039         for (const Field &assetField : assetFields) {
1040             (void)TagAssetsInSingleCol(assetField, false, uploadData.updData.record[i]);
1041         }
1042     }
1043 }
1044 
IsLockInDownload()1045 bool CloudSyncer::IsLockInDownload()
1046 {
1047     std::lock_guard<std::mutex> autoLock(dataLock_);
1048     if (cloudTaskInfos_.find(currentContext_.currentTaskId) == cloudTaskInfos_.end()) {
1049         return false;
1050     }
1051     auto currentLockAction = static_cast<uint32_t>(cloudTaskInfos_[currentContext_.currentTaskId].lockAction);
1052     return (currentLockAction & static_cast<uint32_t>(LockAction::DOWNLOAD)) != 0;
1053 }
1054 
SetCurrentTaskFailedInMachine(int errCode)1055 CloudSyncEvent CloudSyncer::SetCurrentTaskFailedInMachine(int errCode)
1056 {
1057     std::lock_guard<std::mutex> autoLock(dataLock_);
1058     cloudTaskInfos_[currentContext_.currentTaskId].errCode = errCode;
1059     return CloudSyncEvent::ERROR_EVENT;
1060 }
1061 
InitCloudSyncStateMachine()1062 void CloudSyncer::InitCloudSyncStateMachine()
1063 {
1064     CloudSyncStateMachine::Initialize();
1065     cloudSyncStateMachine_.RegisterFunc(CloudSyncState::DO_DOWNLOAD, [this]() {
1066         return SyncMachineDoDownload();
1067     });
1068     cloudSyncStateMachine_.RegisterFunc(CloudSyncState::DO_UPLOAD, [this]() {
1069         return SyncMachineDoUpload();
1070     });
1071     cloudSyncStateMachine_.RegisterFunc(CloudSyncState::DO_FINISHED, [this]() {
1072         return SyncMachineDoFinished();
1073     });
1074     cloudSyncStateMachine_.RegisterFunc(CloudSyncState::DO_REPEAT_CHECK, [this]() {
1075         return SyncMachineDoRepeatCheck();
1076     });
1077 }
1078 
SyncMachineDoRepeatCheck()1079 CloudSyncEvent CloudSyncer::SyncMachineDoRepeatCheck()
1080 {
1081     auto config = storageProxy_->GetCloudSyncConfig();
1082     {
1083         std::lock_guard<std::mutex> autoLock(dataLock_);
1084         if (config.maxRetryConflictTimes < 0) { // unlimited repeat counts
1085             return CloudSyncEvent::REPEAT_DOWNLOAD_EVENT;
1086         }
1087         currentContext_.repeatCount++;
1088         if (currentContext_.repeatCount > config.maxRetryConflictTimes) {
1089             LOGD("[CloudSyncer] Repeat too much times current %d limit %" PRId32, currentContext_.repeatCount,
1090                 config.maxRetryConflictTimes);
1091             SetCurrentTaskFailedWithoutLock(-E_CLOUD_VERSION_CONFLICT);
1092             return CloudSyncEvent::ERROR_EVENT;
1093         }
1094         LOGD("[CloudSyncer] Repeat taskId %" PRIu64 " download current %d", currentContext_.currentTaskId,
1095             currentContext_.repeatCount);
1096     }
1097     return CloudSyncEvent::REPEAT_DOWNLOAD_EVENT;
1098 }
1099 
MarkDownloadFinishIfNeed(const std::string & downloadTable,bool isFinish)1100 void CloudSyncer::MarkDownloadFinishIfNeed(const std::string &downloadTable, bool isFinish)
1101 {
1102     // table exist reference should download every times
1103     if (IsLockInDownload() || storageProxy_->IsTableExistReferenceOrReferenceBy(downloadTable)) {
1104         return;
1105     }
1106     std::lock_guard<std::mutex> autoLock(dataLock_);
1107     currentContext_.processRecorder->MarkDownloadFinish(currentContext_.currentUserIndex, downloadTable, isFinish);
1108 }
1109 
DoUploadByMode(const std::string & tableName,UploadParam & uploadParam,InnerProcessInfo & info)1110 int CloudSyncer::DoUploadByMode(const std::string &tableName, UploadParam &uploadParam, InnerProcessInfo &info)
1111 {
1112     CloudSyncData uploadData(tableName, uploadParam.mode);
1113     SetUploadDataFlag(uploadParam.taskId, uploadData);
1114     auto [err, localWater] = GetLocalWater(tableName, uploadParam);
1115     if (err != E_OK) {
1116         return err;
1117     }
1118     ContinueToken continueStmtToken = nullptr;
1119     int ret = storageProxy_->GetCloudData(GetQuerySyncObject(tableName), localWater, continueStmtToken, uploadData);
1120     if ((ret != E_OK) && (ret != -E_UNFINISHED)) {
1121         LOGE("[CloudSyncer] Failed to get cloud data when upload, %d.", ret);
1122         return ret;
1123     }
1124     uploadParam.count -= uploadData.ignoredCount;
1125     info.upLoadInfo.total -= static_cast<uint32_t>(uploadData.ignoredCount);
1126     ret = HandleBatchUpload(uploadParam, info, uploadData, continueStmtToken);
1127     if (ret != -E_TASK_PAUSED) {
1128         // reset watermark to zero when task no paused
1129         RecordWaterMark(uploadParam.taskId, 0u);
1130     }
1131     if (continueStmtToken != nullptr) {
1132         storageProxy_->ReleaseContinueToken(continueStmtToken);
1133     }
1134     return ret;
1135 }
1136 
IsTableFinishInUpload(const std::string & table)1137 bool CloudSyncer::IsTableFinishInUpload(const std::string &table)
1138 {
1139     std::lock_guard<std::mutex> autoLock(dataLock_);
1140     return currentContext_.processRecorder->IsUploadFinish(currentContext_.currentUserIndex, table);
1141 }
1142 
MarkUploadFinishIfNeed(const std::string & table)1143 void CloudSyncer::MarkUploadFinishIfNeed(const std::string &table)
1144 {
1145     // table exist reference or reference by should upload every times
1146     if (storageProxy_->IsTableExistReferenceOrReferenceBy(table)) {
1147         return;
1148     }
1149     std::lock_guard<std::mutex> autoLock(dataLock_);
1150     currentContext_.processRecorder->MarkUploadFinish(currentContext_.currentUserIndex, table, true);
1151 }
1152 
IsNeedUpdateAsset(const VBucket & data)1153 bool CloudSyncer::IsNeedUpdateAsset(const VBucket &data)
1154 {
1155     for (const auto &item : data) {
1156         const Asset *asset = std::get_if<TYPE_INDEX<Asset>>(&item.second);
1157         if (asset != nullptr) {
1158             uint32_t lowBitStatus = AssetOperationUtils::EraseBitMask(asset->status);
1159             if (lowBitStatus == static_cast<uint32_t>(AssetStatus::ABNORMAL) ||
1160                 lowBitStatus == static_cast<uint32_t>(AssetStatus::DOWNLOADING)) {
1161                 return true;
1162             }
1163             continue;
1164         }
1165         const Assets *assets = std::get_if<TYPE_INDEX<Assets>>(&item.second);
1166         if (assets == nullptr) {
1167             continue;
1168         }
1169         for (const auto &oneAsset : *assets) {
1170             uint32_t lowBitStatus = AssetOperationUtils::EraseBitMask(oneAsset.status);
1171             if (lowBitStatus == static_cast<uint32_t>(AssetStatus::ABNORMAL) ||
1172                 lowBitStatus == static_cast<uint32_t>(AssetStatus::DOWNLOADING)) {
1173                 return true;
1174             }
1175         }
1176     }
1177     return false;
1178 }
1179 
GetCloudTaskStatus(uint64_t taskId) const1180 SyncProcess CloudSyncer::GetCloudTaskStatus(uint64_t taskId) const
1181 {
1182     std::lock_guard<std::mutex> autoLock(dataLock_);
1183     auto iter = cloudTaskInfos_.find(taskId);
1184     SyncProcess syncProcess;
1185     if (iter == cloudTaskInfos_.end()) {
1186         syncProcess.process = ProcessStatus::FINISHED;
1187         syncProcess.errCode = NOT_FOUND;
1188         LOGE("[CloudSyncer] Not found task %" PRIu64, taskId);
1189         return syncProcess;
1190     }
1191     syncProcess.process = iter->second.status;
1192     syncProcess.errCode = TransferDBErrno(iter->second.errCode);
1193     std::shared_ptr<ProcessNotifier> notifier = nullptr;
1194     if (currentContext_.currentTaskId == taskId) {
1195         notifier = currentContext_.notifier;
1196     }
1197     bool hasNotifier = notifier != nullptr;
1198     if (hasNotifier) {
1199         syncProcess.tableProcess = notifier->GetCurrentTableProcess();
1200     }
1201     LOGI("[CloudSyncer] Found task %" PRIu64 " storeId %.3s status %d has notifier %d", taskId,
1202         iter->second.storeId.c_str(), static_cast<int64_t>(syncProcess.process), static_cast<int>(hasNotifier));
1203     return syncProcess;
1204 }
1205 
GenerateTaskIdIfNeed(CloudTaskInfo & taskInfo)1206 int CloudSyncer::GenerateTaskIdIfNeed(CloudTaskInfo &taskInfo)
1207 {
1208     if (taskInfo.taskId != INVALID_TASK_ID) {
1209         if (cloudTaskInfos_.find(taskInfo.taskId) != cloudTaskInfos_.end()) {
1210             LOGE("[CloudSyncer] Sync with exist taskId %" PRIu64 " storeId %.3s", taskInfo.taskId,
1211                 taskInfo.storeId.c_str());
1212             return -E_INVALID_ARGS;
1213         }
1214         lastTaskId_ = std::max(lastTaskId_, taskInfo.taskId);
1215         LOGI("[CloudSyncer] Sync with taskId %" PRIu64 " storeId %.3s", taskInfo.taskId, taskInfo.storeId.c_str());
1216         return E_OK;
1217     }
1218     lastTaskId_++;
1219     if (lastTaskId_ == UINT64_MAX) {
1220         lastTaskId_ = 1u;
1221     }
1222     taskInfo.taskId = lastTaskId_;
1223     return E_OK;
1224 }
1225 }