1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "cloud/asset_operation_utils.h"
16 #include "cloud/cloud_db_constant.h"
17 #include "cloud/cloud_storage_utils.h"
18 #include "db_common.h"
19 #include "db_errno.h"
20 #include "log_print.h"
21 #include "cloud/cloud_sync_utils.h"
22 
23 namespace DistributedDB {
GetCloudPkVals(const VBucket & datum,const std::vector<std::string> & pkColNames,int64_t dataKey,std::vector<Type> & cloudPkVals)24 int CloudSyncUtils::GetCloudPkVals(const VBucket &datum, const std::vector<std::string> &pkColNames, int64_t dataKey,
25     std::vector<Type> &cloudPkVals)
26 {
27     if (!cloudPkVals.empty()) {
28         LOGE("[CloudSyncer] Output parameter should be empty");
29         return -E_INVALID_ARGS;
30     }
31     for (const auto &pkColName : pkColNames) {
32         // If data is primary key or is a composite primary key, then use rowID as value
33         // The single primary key table, does not contain rowid.
34         if (pkColName == CloudDbConstant::ROW_ID_FIELD_NAME) {
35             cloudPkVals.emplace_back(dataKey);
36             continue;
37         }
38         Type type;
39         bool isExisted = CloudStorageUtils::GetTypeCaseInsensitive(pkColName, datum, type);
40         if (!isExisted) {
41             LOGE("[CloudSyncer] Cloud data do not contain expected primary field value");
42             return -E_CLOUD_ERROR;
43         }
44         cloudPkVals.push_back(type);
45     }
46     return E_OK;
47 }
48 
OpTypeToChangeType(OpType strategy)49 ChangeType CloudSyncUtils::OpTypeToChangeType(OpType strategy)
50 {
51     switch (strategy) {
52         case OpType::INSERT:
53             return OP_INSERT;
54         case OpType::DELETE:
55             return OP_DELETE;
56         case OpType::UPDATE:
57             return OP_UPDATE;
58         default:
59             return OP_BUTT;
60     }
61 }
62 
IsSinglePrimaryKey(const std::vector<std::string> & pkColNames)63 bool CloudSyncUtils::IsSinglePrimaryKey(const std::vector<std::string> &pkColNames)
64 {
65     return pkColNames.size() == 1 && pkColNames[0] != CloudDbConstant::ROW_ID_FIELD_NAME;
66 }
67 
RemoveDataExceptExtendInfo(VBucket & datum,const std::vector<std::string> & pkColNames)68 void CloudSyncUtils::RemoveDataExceptExtendInfo(VBucket &datum, const std::vector<std::string> &pkColNames)
69 {
70     for (auto item = datum.begin(); item != datum.end();) {
71         const auto &key = item->first;
72         if (key != CloudDbConstant::GID_FIELD &&
73             key != CloudDbConstant::CREATE_FIELD &&
74             key != CloudDbConstant::MODIFY_FIELD &&
75             key != CloudDbConstant::DELETE_FIELD &&
76             key != CloudDbConstant::CURSOR_FIELD &&
77             key != CloudDbConstant::VERSION_FIELD &&
78             key != CloudDbConstant::SHARING_RESOURCE_FIELD &&
79             (std::find(pkColNames.begin(), pkColNames.end(), key) == pkColNames.end())) {
80                 item = datum.erase(item);
81             } else {
82                 item++;
83             }
84     }
85 }
86 
StatusToFlag(AssetStatus status)87 AssetOpType CloudSyncUtils::StatusToFlag(AssetStatus status)
88 {
89     auto tmpStatus = static_cast<AssetStatus>(AssetOperationUtils::EraseBitMask(static_cast<uint32_t>(status)));
90     switch (tmpStatus) {
91         case AssetStatus::INSERT:
92             return AssetOpType::INSERT;
93         case AssetStatus::DELETE:
94             return AssetOpType::DELETE;
95         case AssetStatus::UPDATE:
96             return AssetOpType::UPDATE;
97         case AssetStatus::NORMAL:
98             return AssetOpType::NO_CHANGE;
99         default:
100             LOGW("[CloudSyncer] Unexpected Situation and won't be handled"
101                 ", Caller should ensure that current situation won't occur");
102             return AssetOpType::NO_CHANGE;
103     }
104 }
105 
StatusToFlagForAsset(Asset & asset)106 void CloudSyncUtils::StatusToFlagForAsset(Asset &asset)
107 {
108     asset.flag = static_cast<uint32_t>(StatusToFlag(static_cast<AssetStatus>(asset.status)));
109     asset.status = static_cast<uint32_t>(AssetStatus::NORMAL);
110 }
111 
StatusToFlagForAssets(Assets & assets)112 void CloudSyncUtils::StatusToFlagForAssets(Assets &assets)
113 {
114     for (Asset &asset : assets) {
115         StatusToFlagForAsset(asset);
116     }
117 }
118 
StatusToFlagForAssetsInRecord(const std::vector<Field> & fields,VBucket & record)119 void CloudSyncUtils::StatusToFlagForAssetsInRecord(const std::vector<Field> &fields, VBucket &record)
120 {
121     for (const Field &field : fields) {
122         if (field.type == TYPE_INDEX<Assets> && record[field.colName].index() == TYPE_INDEX<Assets>) {
123             StatusToFlagForAssets(std::get<Assets>(record[field.colName]));
124         } else if (field.type == TYPE_INDEX<Asset> && record[field.colName].index() == TYPE_INDEX<Asset>) {
125             StatusToFlagForAsset(std::get<Asset>(record[field.colName]));
126         }
127     }
128 }
129 
IsChangeDataEmpty(const ChangedData & changedData)130 bool CloudSyncUtils::IsChangeDataEmpty(const ChangedData &changedData)
131 {
132     return changedData.primaryData[ChangeType::OP_INSERT].empty() ||
133            changedData.primaryData[ChangeType::OP_UPDATE].empty() ||
134            changedData.primaryData[ChangeType::OP_DELETE].empty();
135 }
136 
EqualInMsLevel(const Timestamp cmp,const Timestamp beCmp)137 bool CloudSyncUtils::EqualInMsLevel(const Timestamp cmp, const Timestamp beCmp)
138 {
139     return (cmp / CloudDbConstant::TEN_THOUSAND) == (beCmp / CloudDbConstant::TEN_THOUSAND);
140 }
141 
NeedSaveData(const LogInfo & localLogInfo,const LogInfo & cloudLogInfo)142 bool CloudSyncUtils::NeedSaveData(const LogInfo &localLogInfo, const LogInfo &cloudLogInfo)
143 {
144     // If timeStamp, write timestamp, cloudGid are all the same,
145     // We thought that the datum is mostly be the same between cloud and local
146     // However, there are still slightly possibility that it may be created from different device,
147     // So, during the strategy policy [i.e. TagSyncDataStatus], the datum was tagged as UPDATE
148     // But we won't notify the datum
149     bool isSame = localLogInfo.timestamp == cloudLogInfo.timestamp &&
150         EqualInMsLevel(localLogInfo.wTimestamp, cloudLogInfo.wTimestamp) &&
151         localLogInfo.cloudGid == cloudLogInfo.cloudGid &&
152         localLogInfo.sharingResource == cloudLogInfo.sharingResource &&
153         localLogInfo.version == cloudLogInfo.version &&
154         (localLogInfo.flag & static_cast<uint64_t>(LogInfoFlag::FLAG_WAIT_COMPENSATED_SYNC)) == 0;
155     return !isSame;
156 }
157 
CheckParamValid(const std::vector<DeviceID> & devices,SyncMode mode)158 int CloudSyncUtils::CheckParamValid(const std::vector<DeviceID> &devices, SyncMode mode)
159 {
160     if (devices.size() != 1) {
161         LOGE("[CloudSyncer] invalid devices size %zu", devices.size());
162         return -E_INVALID_ARGS;
163     }
164     for (const auto &dev: devices) {
165         if (dev.empty() || dev.size() > DBConstant::MAX_DEV_LENGTH) {
166             LOGE("[CloudSyncer] invalid device, size %zu", dev.size());
167             return -E_INVALID_ARGS;
168         }
169     }
170     if (mode >= SyncMode::SYNC_MODE_PUSH_ONLY && mode < SyncMode::SYNC_MODE_CLOUD_MERGE) {
171         LOGE("[CloudSyncer] not support mode %d", static_cast<int>(mode));
172         return -E_NOT_SUPPORT;
173     }
174     if (mode < SyncMode::SYNC_MODE_PUSH_ONLY || mode > SyncMode::SYNC_MODE_CLOUD_FORCE_PULL) {
175         LOGE("[CloudSyncer] invalid mode %d", static_cast<int>(mode));
176         return -E_INVALID_ARGS;
177     }
178     return E_OK;
179 }
180 
GetCloudLogInfo(DistributedDB::VBucket & datum)181 LogInfo CloudSyncUtils::GetCloudLogInfo(DistributedDB::VBucket &datum)
182 {
183     LogInfo cloudLogInfo;
184     cloudLogInfo.dataKey = 0;
185     cloudLogInfo.timestamp = (Timestamp)std::get<int64_t>(datum[CloudDbConstant::MODIFY_FIELD]);
186     cloudLogInfo.wTimestamp = (Timestamp)std::get<int64_t>(datum[CloudDbConstant::CREATE_FIELD]);
187     cloudLogInfo.flag = (std::get<bool>(datum[CloudDbConstant::DELETE_FIELD])) ? 1u : 0u;
188     cloudLogInfo.cloudGid = std::get<std::string>(datum[CloudDbConstant::GID_FIELD]);
189     CloudStorageUtils::GetStringFromCloudData(CloudDbConstant::CLOUD_KV_FIELD_DEVICE, datum, cloudLogInfo.device);
190     (void)CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::SHARING_RESOURCE_FIELD,
191         datum, cloudLogInfo.sharingResource);
192     (void)CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::VERSION_FIELD,
193         datum, cloudLogInfo.version);
194     return cloudLogInfo;
195 }
196 
SaveChangedDataByType(const VBucket & datum,ChangedData & changedData,const DataInfoWithLog & localInfo,ChangeType type)197 int CloudSyncUtils::SaveChangedDataByType(const VBucket &datum, ChangedData &changedData,
198     const DataInfoWithLog &localInfo, ChangeType type)
199 {
200     int ret = E_OK;
201     std::vector<Type> cloudPkVals;
202     if (type == ChangeType::OP_DELETE) {
203         ret = CloudSyncUtils::GetCloudPkVals(localInfo.primaryKeys, changedData.field, localInfo.logInfo.dataKey,
204             cloudPkVals);
205     } else {
206         ret = CloudSyncUtils::GetCloudPkVals(datum, changedData.field, localInfo.logInfo.dataKey, cloudPkVals);
207     }
208     if (ret != E_OK) {
209         return ret;
210     }
211     InsertOrReplaceChangedDataByType(type, cloudPkVals, changedData);
212     return E_OK;
213 }
214 
CheckCloudSyncDataValid(const CloudSyncData & uploadData,const std::string & tableName,int64_t count)215 int CloudSyncUtils::CheckCloudSyncDataValid(const CloudSyncData &uploadData, const std::string &tableName,
216     int64_t count)
217 {
218     size_t insRecordLen = uploadData.insData.record.size();
219     size_t insExtendLen = uploadData.insData.extend.size();
220     size_t updRecordLen = uploadData.updData.record.size();
221     size_t updExtendLen = uploadData.updData.extend.size();
222     size_t delRecordLen = uploadData.delData.record.size();
223     size_t delExtendLen = uploadData.delData.extend.size();
224 
225     bool syncDataValid = (uploadData.tableName == tableName) &&
226         ((insRecordLen > 0 && insExtendLen > 0 && insRecordLen == insExtendLen) ||
227         (updRecordLen > 0 && updExtendLen > 0 && updRecordLen == updExtendLen) ||
228         (delRecordLen > 0 && delExtendLen > 0 && delRecordLen == delExtendLen) ||
229         (uploadData.lockData.extend.size() > 0));
230     if (!syncDataValid) {
231         LOGE("[CloudSyncUtils] upload data is empty but upload count is not zero or upload table name"
232             " is not the same as table name of sync data.");
233         return -E_INTERNAL_ERROR;
234     }
235     int64_t syncDataCount = static_cast<int64_t>(insRecordLen) + static_cast<int64_t>(updRecordLen) +
236         static_cast<int64_t>(delRecordLen);
237     if (syncDataCount > count) {
238         LOGE("[CloudSyncUtils] Size of a batch of sync data is greater than upload data size. count %d", count);
239         return -E_INTERNAL_ERROR;
240     }
241     return E_OK;
242 }
243 
ClearCloudSyncData(CloudSyncData & uploadData)244 void CloudSyncUtils::ClearCloudSyncData(CloudSyncData &uploadData)
245 {
246     std::vector<VBucket>().swap(uploadData.insData.record);
247     std::vector<VBucket>().swap(uploadData.insData.extend);
248     std::vector<int64_t>().swap(uploadData.insData.rowid);
249     std::vector<VBucket>().swap(uploadData.updData.record);
250     std::vector<VBucket>().swap(uploadData.updData.extend);
251     std::vector<VBucket>().swap(uploadData.delData.record);
252     std::vector<VBucket>().swap(uploadData.delData.extend);
253 }
254 
GetWaterMarkAndUpdateTime(std::vector<VBucket> & extend,Timestamp & waterMark)255 int CloudSyncUtils::GetWaterMarkAndUpdateTime(std::vector<VBucket> &extend, Timestamp &waterMark)
256 {
257     for (auto &extendData: extend) {
258         if (extendData.empty() || extendData.find(CloudDbConstant::MODIFY_FIELD) == extendData.end()) {
259             LOGE("[CloudSyncer] VBucket is empty or MODIFY_FIELD doesn't exist.");
260             return -E_INTERNAL_ERROR;
261         }
262         if (TYPE_INDEX<int64_t> != extendData.at(CloudDbConstant::MODIFY_FIELD).index()) {
263             LOGE("[CloudSyncer] VBucket's MODIFY_FIELD doesn't fit int64_t.");
264             return -E_INTERNAL_ERROR;
265         }
266         if (extendData.empty() || extendData.find(CloudDbConstant::CREATE_FIELD) == extendData.end()) {
267             LOGE("[CloudSyncer] VBucket is empty or CREATE_FIELD doesn't exist.");
268             return -E_INTERNAL_ERROR;
269         }
270         if (TYPE_INDEX<int64_t> != extendData.at(CloudDbConstant::CREATE_FIELD).index()) {
271             LOGE("[CloudSyncer] VBucket's CREATE_FIELD doesn't fit int64_t.");
272             return -E_INTERNAL_ERROR;
273         }
274         waterMark = std::max(int64_t(waterMark), std::get<int64_t>(extendData.at(CloudDbConstant::MODIFY_FIELD)));
275         int64_t modifyTime =
276             std::get<int64_t>(extendData.at(CloudDbConstant::MODIFY_FIELD)) / CloudDbConstant::TEN_THOUSAND;
277         int64_t createTime =
278             std::get<int64_t>(extendData.at(CloudDbConstant::CREATE_FIELD)) / CloudDbConstant::TEN_THOUSAND;
279         extendData.insert_or_assign(CloudDbConstant::MODIFY_FIELD, modifyTime);
280         extendData.insert_or_assign(CloudDbConstant::CREATE_FIELD, createTime);
281     }
282     return E_OK;
283 }
284 
CheckCloudSyncDataEmpty(const CloudSyncData & uploadData)285 bool CloudSyncUtils::CheckCloudSyncDataEmpty(const CloudSyncData &uploadData)
286 {
287     return uploadData.insData.extend.empty() && uploadData.insData.record.empty() &&
288         uploadData.updData.extend.empty() && uploadData.updData.record.empty() &&
289         uploadData.delData.extend.empty() && uploadData.delData.record.empty() &&
290         uploadData.lockData.rowid.empty();
291 }
292 
ModifyCloudDataTime(DistributedDB::VBucket & data)293 void CloudSyncUtils::ModifyCloudDataTime(DistributedDB::VBucket &data)
294 {
295     // data already check field modify_field and create_field
296     int64_t modifyTime = std::get<int64_t>(data[CloudDbConstant::MODIFY_FIELD]) * CloudDbConstant::TEN_THOUSAND;
297     int64_t createTime = std::get<int64_t>(data[CloudDbConstant::CREATE_FIELD]) * CloudDbConstant::TEN_THOUSAND;
298     data[CloudDbConstant::MODIFY_FIELD] = modifyTime;
299     data[CloudDbConstant::CREATE_FIELD] = createTime;
300 }
301 
// After a batch upload, the maximum timestamp in CloudSyncData is used to update the water mark.
UpdateExtendTime(CloudSyncData & uploadData,const int64_t & count,uint64_t taskId,Timestamp & waterMark)303 int CloudSyncUtils::UpdateExtendTime(CloudSyncData &uploadData, const int64_t &count, uint64_t taskId,
304     Timestamp &waterMark)
305 {
306     int ret = CloudSyncUtils::CheckCloudSyncDataValid(uploadData, uploadData.tableName, count);
307     if (ret != E_OK) {
308         LOGE("[CloudSyncer] Invalid Sync Data when get local water mark.");
309         return ret;
310     }
311     if (!uploadData.insData.extend.empty()) {
312         if (uploadData.insData.record.size() != uploadData.insData.extend.size()) {
313             LOGE("[CloudSyncer] Inconsistent size of inserted data.");
314             return -E_INTERNAL_ERROR;
315         }
316         ret = CloudSyncUtils::GetWaterMarkAndUpdateTime(uploadData.insData.extend, waterMark);
317         if (ret != E_OK) {
318             return ret;
319         }
320     }
321 
322     if (!uploadData.updData.extend.empty()) {
323         if (uploadData.updData.record.size() != uploadData.updData.extend.size()) {
324             LOGE("[CloudSyncer] Inconsistent size of updated data, %d.", -E_INTERNAL_ERROR);
325             return -E_INTERNAL_ERROR;
326         }
327         ret = CloudSyncUtils::GetWaterMarkAndUpdateTime(uploadData.updData.extend, waterMark);
328         if (ret != E_OK) {
329             return ret;
330         }
331     }
332 
333     if (!uploadData.delData.extend.empty()) {
334         if (uploadData.delData.record.size() != uploadData.delData.extend.size()) {
335             LOGE("[CloudSyncer] Inconsistent size of deleted data, %d.", -E_INTERNAL_ERROR);
336             return -E_INTERNAL_ERROR;
337         }
338         ret = CloudSyncUtils::GetWaterMarkAndUpdateTime(uploadData.delData.extend, waterMark);
339         if (ret != E_OK) {
340             return ret;
341         }
342     }
343     return E_OK;
344 }
345 
UpdateLocalCache(OpType opType,const LogInfo & cloudInfo,const LogInfo & localInfo,std::map<std::string,LogInfo> & localLogInfoCache)346 void CloudSyncUtils::UpdateLocalCache(OpType opType, const LogInfo &cloudInfo, const LogInfo &localInfo,
347     std::map<std::string, LogInfo> &localLogInfoCache)
348 {
349     LogInfo updateLogInfo;
350     std::string hashKey(localInfo.hashKey.begin(), localInfo.hashKey.end());
351     bool updateCache = true;
352     switch (opType) {
353         case OpType::INSERT :
354         case OpType::UPDATE :
355         case OpType::DELETE: {
356             updateLogInfo = cloudInfo;
357             updateLogInfo.device = CloudDbConstant::DEFAULT_CLOUD_DEV;
358             updateLogInfo.hashKey = localInfo.hashKey;
359             if (opType == OpType::DELETE) {
360                 updateLogInfo.flag |= static_cast<uint64_t>(LogInfoFlag::FLAG_DELETE);
361             } else if (opType == OpType::INSERT) {
362                 updateLogInfo.originDev = CloudDbConstant::DEFAULT_CLOUD_DEV;
363             }
364             break;
365         }
366         case OpType::CLEAR_GID:
367         case OpType::UPDATE_TIMESTAMP: {
368             updateLogInfo = localInfo;
369             updateLogInfo.cloudGid.clear();
370             updateLogInfo.sharingResource.clear();
371             break;
372         }
373         default:
374             updateCache = false;
375             break;
376     }
377     if (updateCache) {
378         localLogInfoCache[hashKey] = updateLogInfo;
379     }
380 }
381 
SaveChangedData(ICloudSyncer::SyncParam & param,size_t dataIndex,const ICloudSyncer::DataInfo & dataInfo,std::vector<std::pair<Key,size_t>> & deletedList)382 int CloudSyncUtils::SaveChangedData(ICloudSyncer::SyncParam &param, size_t dataIndex,
383     const ICloudSyncer::DataInfo &dataInfo, std::vector<std::pair<Key, size_t>> &deletedList)
384 {
385     OpType opType = CalOpType(param, dataIndex);
386     Key hashKey = dataInfo.localInfo.logInfo.hashKey;
387     if (param.deletePrimaryKeySet.find(hashKey) != param.deletePrimaryKeySet.end()) {
388         if (opType == OpType::INSERT) {
389             (void)param.dupHashKeySet.insert(hashKey);
390             opType = OpType::UPDATE;
391             // only composite primary key needs to be processed.
392             if (!param.isSinglePrimaryKey) {
393                 param.withoutRowIdData.updateData.emplace_back(dataIndex,
394                     param.changedData.primaryData[ChangeType::OP_UPDATE].size());
395             }
396         }
397     }
398     // INSERT: for no primary key or composite primary key situation
399     if (!param.isSinglePrimaryKey && opType == OpType::INSERT) {
400         param.info.downLoadInfo.insertCount++;
401         param.withoutRowIdData.insertData.push_back(dataIndex);
402         return E_OK;
403     }
404     switch (opType) {
405         // INSERT: only for single primary key situation
406         case OpType::INSERT:
407             param.info.downLoadInfo.insertCount++;
408             return CloudSyncUtils::SaveChangedDataByType(
409                 param.downloadData.data[dataIndex], param.changedData, dataInfo.localInfo, ChangeType::OP_INSERT);
410         case OpType::UPDATE:
411             param.info.downLoadInfo.updateCount++;
412             if (CloudSyncUtils::NeedSaveData(dataInfo.localInfo.logInfo, dataInfo.cloudLogInfo)) {
413                 return CloudSyncUtils::SaveChangedDataByType(param.downloadData.data[dataIndex], param.changedData,
414                     dataInfo.localInfo, ChangeType::OP_UPDATE);
415             }
416             return E_OK;
417         case OpType::DELETE:
418             param.info.downLoadInfo.deleteCount++;
419             return CloudSyncUtils::SaveChangedDataByType(param.downloadData.data[dataIndex], param.changedData,
420                 dataInfo.localInfo, ChangeType::OP_DELETE);
421         default:
422             return E_OK;
423     }
424 }
425 
ClearWithoutData(ICloudSyncer::SyncParam & param)426 void CloudSyncUtils::ClearWithoutData(ICloudSyncer::SyncParam &param)
427 {
428     param.withoutRowIdData.insertData.clear();
429     param.withoutRowIdData.updateData.clear();
430     param.withoutRowIdData.assetInsertData.clear();
431 }
432 
IsSkipAssetsMissingRecord(const std::vector<VBucket> & extend)433 bool CloudSyncUtils::IsSkipAssetsMissingRecord(const std::vector<VBucket> &extend)
434 {
435     if (extend.empty()) {
436         return false;
437     }
438     for (size_t i = 0; i < extend.size(); ++i) {
439         if (DBCommon::IsIntTypeRecordError(extend[i]) && !DBCommon::IsRecordAssetsMissing(extend[i])) {
440             return false;
441         }
442     }
443     return true;
444 }
445 
FillAssetIdToAssets(CloudSyncBatch & data,int errorCode,const CloudWaterType & type)446 int CloudSyncUtils::FillAssetIdToAssets(CloudSyncBatch &data, int errorCode, const CloudWaterType &type)
447 {
448     if (data.extend.size() != data.assets.size()) {
449         LOGE("[CloudSyncUtils] size not match, extend:%zu assets:%zu.", data.extend.size(), data.assets.size());
450         return -E_CLOUD_ERROR;
451     }
452     int errCode = E_OK;
453     for (size_t i = 0; i < data.assets.size(); i++) {
454         if (data.assets[i].empty() || DBCommon::IsRecordIgnored(data.extend[i]) ||
455             (errorCode != E_OK &&
456                 (DBCommon::IsRecordError(data.extend[i]) || DBCommon::IsRecordAssetsMissing(data.extend[i]))) ||
457             DBCommon::IsNeedCompensatedForUpload(data.extend[i], type)) {
458             if (errCode != E_OK && DBCommon::IsRecordAssetsMissing(data.extend[i])) {
459                 LOGI("[CloudSyncUtils][FileAssetIdToAssets] errCode with assets missing, skip fill assets id");
460             }
461             continue;
462         }
463         for (auto it = data.assets[i].begin(); it != data.assets[i].end();) {
464             auto &[col, value] = *it;
465             if (!CheckIfContainsInsertAssets(value)) {
466                 ++it;
467                 continue;
468             }
469             auto extendIt = data.extend[i].find(col);
470             if (extendIt == data.extend[i].end()) {
471                 LOGI("[CloudSyncUtils] Asset field name can not find in extend. key is:%s.", col.c_str());
472                 it = data.assets[i].erase(it);
473                 continue;
474             }
475             if (extendIt->second.index() != value.index()) {
476                 LOGE("[CloudSyncUtils] Asset field type not same. extend:%zu, data:%zu",
477                     extendIt->second.index(), value.index());
478                 errCode = -E_CLOUD_ERROR;
479                 ++it;
480                 continue;
481             }
482             int ret = FillAssetIdToAssetData(extendIt->second, value);
483             if (ret != E_OK) {
484                 LOGE("[CloudSyncUtils] fail to fill assetId, %d.", ret);
485                 errCode = -E_CLOUD_ERROR;
486             }
487             ++it;
488         }
489     }
490     return errCode;
491 }
492 
FillAssetIdToAssetData(const Type & extend,Type & assetData)493 int CloudSyncUtils::FillAssetIdToAssetData(const Type &extend, Type &assetData)
494 {
495     if (extend.index() == TYPE_INDEX<Asset>) {
496         if (std::get<Asset>(assetData).name != std::get<Asset>(extend).name) {
497             LOGE("[CloudSyncUtils][FillAssetIdToAssetData] Asset name can not find in extend.");
498             return -E_CLOUD_ERROR;
499         }
500         if (std::get<Asset>(extend).assetId.empty()) {
501             LOGE("[CloudSyncUtils][FillAssetIdToAssetData] Asset id is empty.");
502             return -E_CLOUD_ERROR;
503         }
504         std::get<Asset>(assetData).assetId = std::get<Asset>(extend).assetId;
505     }
506     if (extend.index() == TYPE_INDEX<Assets>) {
507         FillAssetIdToAssetsData(std::get<Assets>(extend), std::get<Assets>(assetData));
508     }
509     return E_OK;
510 }
511 
FillAssetIdToAssetsData(const Assets & extend,Assets & assets)512 void CloudSyncUtils::FillAssetIdToAssetsData(const Assets &extend, Assets &assets)
513 {
514     for (auto it = assets.begin(); it != assets.end();) {
515         auto &asset = *it;
516         if (asset.flag != static_cast<uint32_t>(AssetOpType::INSERT)) {
517             ++it;
518             continue;
519         }
520         auto extendAssets = extend;
521         bool isAssetExisted = false;
522         for (const auto &extendAsset : extendAssets) {
523             if (asset.name == extendAsset.name && !extendAsset.assetId.empty()) {
524                 asset.assetId = extendAsset.assetId;
525                 isAssetExisted = true;
526                 break;
527             }
528         }
529         if (!isAssetExisted) {
530             LOGI("Unable to sync local asset, skip fill assetId.");
531             it = assets.erase(it);
532         } else {
533             ++it;
534         }
535     }
536 }
537 
CheckIfContainsInsertAssets(const Type & assetData)538 bool CloudSyncUtils::CheckIfContainsInsertAssets(const Type &assetData)
539 {
540     if (assetData.index() == TYPE_INDEX<Asset>) {
541         if (std::get<Asset>(assetData).flag != static_cast<uint32_t>(AssetOpType::INSERT)) {
542             return false;
543         }
544     } else if (assetData.index() == TYPE_INDEX<Assets>) {
545         bool hasInsertAsset = false;
546         for (const auto &asset : std::get<Assets>(assetData)) {
547             if (asset.flag == static_cast<uint32_t>(AssetOpType::INSERT)) {
548                 hasInsertAsset = true;
549                 break;
550             }
551         }
552         if (!hasInsertAsset) {
553             return false;
554         }
555     }
556     return true;
557 }
558 
UpdateAssetsFlag(CloudSyncData & uploadData)559 void CloudSyncUtils::UpdateAssetsFlag(CloudSyncData &uploadData)
560 {
561     AssetOperationUtils::UpdateAssetsFlag(uploadData.insData.record, uploadData.insData.assets);
562     AssetOperationUtils::UpdateAssetsFlag(uploadData.updData.record, uploadData.updData.assets);
563     AssetOperationUtils::UpdateAssetsFlag(uploadData.delData.record, uploadData.delData.assets);
564 }
565 
InsertOrReplaceChangedDataByType(ChangeType type,std::vector<Type> & pkVal,ChangedData & changedData)566 void CloudSyncUtils::InsertOrReplaceChangedDataByType(ChangeType type, std::vector<Type> &pkVal,
567     ChangedData &changedData)
568 {
569     // erase old changedData if exist
570     for (auto &changePkValList : changedData.primaryData) {
571         changePkValList.erase(std::remove_if(changePkValList.begin(), changePkValList.end(),
572             [&pkVal](const std::vector<Type> &existPkVal) {
573             return existPkVal == pkVal;
574             }), changePkValList.end());
575     }
576     // insert new changeData
577     changedData.primaryData[type].emplace_back(std::move(pkVal));
578 }
579 
CalOpType(ICloudSyncer::SyncParam & param,size_t dataIndex)580 OpType CloudSyncUtils::CalOpType(ICloudSyncer::SyncParam &param, size_t dataIndex)
581 {
582     OpType opType = param.downloadData.opType[dataIndex];
583     if (opType != OpType::INSERT && opType != OpType::UPDATE) {
584         return opType;
585     }
586 
587     std::vector<Type> cloudPkVal;
588     // use dataIndex as dataKey avoid get same pk with no pk schema
589     int errCode = CloudSyncUtils::GetCloudPkVals(param.downloadData.data[dataIndex], param.changedData.field, dataIndex,
590         cloudPkVal);
591     if (errCode != E_OK) {
592         LOGW("[CloudSyncUtils] Get pk from download data failed %d", errCode);
593         // use origin opType
594         return opType;
595     }
596     auto iter = std::find_if(param.insertPk.begin(), param.insertPk.end(), [&cloudPkVal](const auto &item) {
597         return item == cloudPkVal;
598     });
599     if (opType == OpType::INSERT) {
600         // record all insert pk in one batch
601         if (iter == param.insertPk.end()) {
602             param.insertPk.push_back(cloudPkVal);
603         }
604         return OpType::INSERT;
605     }
606     // notify with insert because this data not exist in local before query
607     return (iter == param.insertPk.end()) ? OpType::UPDATE : OpType::INSERT;
608 }
609 
InitCompensatedSyncTaskInfo()610 CloudSyncer::CloudTaskInfo CloudSyncUtils::InitCompensatedSyncTaskInfo()
611 {
612     CloudSyncer::CloudTaskInfo taskInfo;
613     taskInfo.priorityTask = true;
614     taskInfo.timeout = CloudDbConstant::CLOUD_DEFAULT_TIMEOUT;
615     taskInfo.mode = SyncMode::SYNC_MODE_CLOUD_MERGE;
616     taskInfo.callback = nullptr;
617     taskInfo.compensatedTask = true;
618     return taskInfo;
619 }
620 
InitCompensatedSyncTaskInfo(const CloudSyncOption & option,const SyncProcessCallback & onProcess)621 CloudSyncer::CloudTaskInfo CloudSyncUtils::InitCompensatedSyncTaskInfo(const CloudSyncOption &option,
622     const SyncProcessCallback &onProcess)
623 {
624     CloudSyncer::CloudTaskInfo taskInfo = InitCompensatedSyncTaskInfo();
625     taskInfo.callback = onProcess;
626     taskInfo.devices = option.devices;
627     taskInfo.prepareTraceId = option.prepareTraceId;
628     if (option.users.empty()) {
629         taskInfo.users.push_back("");
630     } else {
631         taskInfo.users = option.users;
632     }
633     taskInfo.lockAction = option.lockAction;
634     return taskInfo;
635 }
636 
// Creates a compensated sync task derived from an existing task.
// Starts from the base compensated task info and copies lockAction, users, devices, storeId and
// prepareTraceId from oriTaskInfo; the callback of the original task is not copied.
CloudSyncer::CloudTaskInfo CloudSyncUtils::InitCompensatedSyncTaskInfo(const CloudSyncer::CloudTaskInfo &oriTaskInfo)
{
    CloudSyncer::CloudTaskInfo info = InitCompensatedSyncTaskInfo();
    info.storeId = oriTaskInfo.storeId;
    info.devices = oriTaskInfo.devices;
    info.users = oriTaskInfo.users;
    info.lockAction = oriTaskInfo.lockAction;
    info.prepareTraceId = oriTaskInfo.prepareTraceId;
    return info;
}
647 }