1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "cloud/asset_operation_utils.h"
16 #include "cloud/cloud_db_constant.h"
17 #include "cloud/cloud_storage_utils.h"
18 #include "db_common.h"
19 #include "db_errno.h"
20 #include "log_print.h"
21 #include "cloud/cloud_sync_utils.h"
22
23 namespace DistributedDB {
GetCloudPkVals(const VBucket & datum,const std::vector<std::string> & pkColNames,int64_t dataKey,std::vector<Type> & cloudPkVals)24 int CloudSyncUtils::GetCloudPkVals(const VBucket &datum, const std::vector<std::string> &pkColNames, int64_t dataKey,
25 std::vector<Type> &cloudPkVals)
26 {
27 if (!cloudPkVals.empty()) {
28 LOGE("[CloudSyncer] Output parameter should be empty");
29 return -E_INVALID_ARGS;
30 }
31 for (const auto &pkColName : pkColNames) {
32 // If data is primary key or is a composite primary key, then use rowID as value
33 // The single primary key table, does not contain rowid.
34 if (pkColName == CloudDbConstant::ROW_ID_FIELD_NAME) {
35 cloudPkVals.emplace_back(dataKey);
36 continue;
37 }
38 Type type;
39 bool isExisted = CloudStorageUtils::GetTypeCaseInsensitive(pkColName, datum, type);
40 if (!isExisted) {
41 LOGE("[CloudSyncer] Cloud data do not contain expected primary field value");
42 return -E_CLOUD_ERROR;
43 }
44 cloudPkVals.push_back(type);
45 }
46 return E_OK;
47 }
48
OpTypeToChangeType(OpType strategy)49 ChangeType CloudSyncUtils::OpTypeToChangeType(OpType strategy)
50 {
51 switch (strategy) {
52 case OpType::INSERT:
53 return OP_INSERT;
54 case OpType::DELETE:
55 return OP_DELETE;
56 case OpType::UPDATE:
57 return OP_UPDATE;
58 default:
59 return OP_BUTT;
60 }
61 }
62
IsSinglePrimaryKey(const std::vector<std::string> & pkColNames)63 bool CloudSyncUtils::IsSinglePrimaryKey(const std::vector<std::string> &pkColNames)
64 {
65 return pkColNames.size() == 1 && pkColNames[0] != CloudDbConstant::ROW_ID_FIELD_NAME;
66 }
67
RemoveDataExceptExtendInfo(VBucket & datum,const std::vector<std::string> & pkColNames)68 void CloudSyncUtils::RemoveDataExceptExtendInfo(VBucket &datum, const std::vector<std::string> &pkColNames)
69 {
70 for (auto item = datum.begin(); item != datum.end();) {
71 const auto &key = item->first;
72 if (key != CloudDbConstant::GID_FIELD &&
73 key != CloudDbConstant::CREATE_FIELD &&
74 key != CloudDbConstant::MODIFY_FIELD &&
75 key != CloudDbConstant::DELETE_FIELD &&
76 key != CloudDbConstant::CURSOR_FIELD &&
77 key != CloudDbConstant::VERSION_FIELD &&
78 key != CloudDbConstant::SHARING_RESOURCE_FIELD &&
79 (std::find(pkColNames.begin(), pkColNames.end(), key) == pkColNames.end())) {
80 item = datum.erase(item);
81 } else {
82 item++;
83 }
84 }
85 }
86
StatusToFlag(AssetStatus status)87 AssetOpType CloudSyncUtils::StatusToFlag(AssetStatus status)
88 {
89 auto tmpStatus = static_cast<AssetStatus>(AssetOperationUtils::EraseBitMask(static_cast<uint32_t>(status)));
90 switch (tmpStatus) {
91 case AssetStatus::INSERT:
92 return AssetOpType::INSERT;
93 case AssetStatus::DELETE:
94 return AssetOpType::DELETE;
95 case AssetStatus::UPDATE:
96 return AssetOpType::UPDATE;
97 case AssetStatus::NORMAL:
98 return AssetOpType::NO_CHANGE;
99 default:
100 LOGW("[CloudSyncer] Unexpected Situation and won't be handled"
101 ", Caller should ensure that current situation won't occur");
102 return AssetOpType::NO_CHANGE;
103 }
104 }
105
StatusToFlagForAsset(Asset & asset)106 void CloudSyncUtils::StatusToFlagForAsset(Asset &asset)
107 {
108 asset.flag = static_cast<uint32_t>(StatusToFlag(static_cast<AssetStatus>(asset.status)));
109 asset.status = static_cast<uint32_t>(AssetStatus::NORMAL);
110 }
111
StatusToFlagForAssets(Assets & assets)112 void CloudSyncUtils::StatusToFlagForAssets(Assets &assets)
113 {
114 for (Asset &asset : assets) {
115 StatusToFlagForAsset(asset);
116 }
117 }
118
StatusToFlagForAssetsInRecord(const std::vector<Field> & fields,VBucket & record)119 void CloudSyncUtils::StatusToFlagForAssetsInRecord(const std::vector<Field> &fields, VBucket &record)
120 {
121 for (const Field &field : fields) {
122 if (field.type == TYPE_INDEX<Assets> && record[field.colName].index() == TYPE_INDEX<Assets>) {
123 StatusToFlagForAssets(std::get<Assets>(record[field.colName]));
124 } else if (field.type == TYPE_INDEX<Asset> && record[field.colName].index() == TYPE_INDEX<Asset>) {
125 StatusToFlagForAsset(std::get<Asset>(record[field.colName]));
126 }
127 }
128 }
129
IsChangeDataEmpty(const ChangedData & changedData)130 bool CloudSyncUtils::IsChangeDataEmpty(const ChangedData &changedData)
131 {
132 return changedData.primaryData[ChangeType::OP_INSERT].empty() ||
133 changedData.primaryData[ChangeType::OP_UPDATE].empty() ||
134 changedData.primaryData[ChangeType::OP_DELETE].empty();
135 }
136
EqualInMsLevel(const Timestamp cmp,const Timestamp beCmp)137 bool CloudSyncUtils::EqualInMsLevel(const Timestamp cmp, const Timestamp beCmp)
138 {
139 return (cmp / CloudDbConstant::TEN_THOUSAND) == (beCmp / CloudDbConstant::TEN_THOUSAND);
140 }
141
NeedSaveData(const LogInfo & localLogInfo,const LogInfo & cloudLogInfo)142 bool CloudSyncUtils::NeedSaveData(const LogInfo &localLogInfo, const LogInfo &cloudLogInfo)
143 {
144 // If timeStamp, write timestamp, cloudGid are all the same,
145 // We thought that the datum is mostly be the same between cloud and local
146 // However, there are still slightly possibility that it may be created from different device,
147 // So, during the strategy policy [i.e. TagSyncDataStatus], the datum was tagged as UPDATE
148 // But we won't notify the datum
149 bool isSame = localLogInfo.timestamp == cloudLogInfo.timestamp &&
150 EqualInMsLevel(localLogInfo.wTimestamp, cloudLogInfo.wTimestamp) &&
151 localLogInfo.cloudGid == cloudLogInfo.cloudGid &&
152 localLogInfo.sharingResource == cloudLogInfo.sharingResource &&
153 localLogInfo.version == cloudLogInfo.version &&
154 (localLogInfo.flag & static_cast<uint64_t>(LogInfoFlag::FLAG_WAIT_COMPENSATED_SYNC)) == 0;
155 return !isSame;
156 }
157
CheckParamValid(const std::vector<DeviceID> & devices,SyncMode mode)158 int CloudSyncUtils::CheckParamValid(const std::vector<DeviceID> &devices, SyncMode mode)
159 {
160 if (devices.size() != 1) {
161 LOGE("[CloudSyncer] invalid devices size %zu", devices.size());
162 return -E_INVALID_ARGS;
163 }
164 for (const auto &dev: devices) {
165 if (dev.empty() || dev.size() > DBConstant::MAX_DEV_LENGTH) {
166 LOGE("[CloudSyncer] invalid device, size %zu", dev.size());
167 return -E_INVALID_ARGS;
168 }
169 }
170 if (mode >= SyncMode::SYNC_MODE_PUSH_ONLY && mode < SyncMode::SYNC_MODE_CLOUD_MERGE) {
171 LOGE("[CloudSyncer] not support mode %d", static_cast<int>(mode));
172 return -E_NOT_SUPPORT;
173 }
174 if (mode < SyncMode::SYNC_MODE_PUSH_ONLY || mode > SyncMode::SYNC_MODE_CLOUD_FORCE_PULL) {
175 LOGE("[CloudSyncer] invalid mode %d", static_cast<int>(mode));
176 return -E_INVALID_ARGS;
177 }
178 return E_OK;
179 }
180
GetCloudLogInfo(DistributedDB::VBucket & datum)181 LogInfo CloudSyncUtils::GetCloudLogInfo(DistributedDB::VBucket &datum)
182 {
183 LogInfo cloudLogInfo;
184 cloudLogInfo.dataKey = 0;
185 cloudLogInfo.timestamp = (Timestamp)std::get<int64_t>(datum[CloudDbConstant::MODIFY_FIELD]);
186 cloudLogInfo.wTimestamp = (Timestamp)std::get<int64_t>(datum[CloudDbConstant::CREATE_FIELD]);
187 cloudLogInfo.flag = (std::get<bool>(datum[CloudDbConstant::DELETE_FIELD])) ? 1u : 0u;
188 cloudLogInfo.cloudGid = std::get<std::string>(datum[CloudDbConstant::GID_FIELD]);
189 CloudStorageUtils::GetStringFromCloudData(CloudDbConstant::CLOUD_KV_FIELD_DEVICE, datum, cloudLogInfo.device);
190 (void)CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::SHARING_RESOURCE_FIELD,
191 datum, cloudLogInfo.sharingResource);
192 (void)CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::VERSION_FIELD,
193 datum, cloudLogInfo.version);
194 return cloudLogInfo;
195 }
196
SaveChangedDataByType(const VBucket & datum,ChangedData & changedData,const DataInfoWithLog & localInfo,ChangeType type)197 int CloudSyncUtils::SaveChangedDataByType(const VBucket &datum, ChangedData &changedData,
198 const DataInfoWithLog &localInfo, ChangeType type)
199 {
200 int ret = E_OK;
201 std::vector<Type> cloudPkVals;
202 if (type == ChangeType::OP_DELETE) {
203 ret = CloudSyncUtils::GetCloudPkVals(localInfo.primaryKeys, changedData.field, localInfo.logInfo.dataKey,
204 cloudPkVals);
205 } else {
206 ret = CloudSyncUtils::GetCloudPkVals(datum, changedData.field, localInfo.logInfo.dataKey, cloudPkVals);
207 }
208 if (ret != E_OK) {
209 return ret;
210 }
211 InsertOrReplaceChangedDataByType(type, cloudPkVals, changedData);
212 return E_OK;
213 }
214
CheckCloudSyncDataValid(const CloudSyncData & uploadData,const std::string & tableName,int64_t count)215 int CloudSyncUtils::CheckCloudSyncDataValid(const CloudSyncData &uploadData, const std::string &tableName,
216 int64_t count)
217 {
218 size_t insRecordLen = uploadData.insData.record.size();
219 size_t insExtendLen = uploadData.insData.extend.size();
220 size_t updRecordLen = uploadData.updData.record.size();
221 size_t updExtendLen = uploadData.updData.extend.size();
222 size_t delRecordLen = uploadData.delData.record.size();
223 size_t delExtendLen = uploadData.delData.extend.size();
224
225 bool syncDataValid = (uploadData.tableName == tableName) &&
226 ((insRecordLen > 0 && insExtendLen > 0 && insRecordLen == insExtendLen) ||
227 (updRecordLen > 0 && updExtendLen > 0 && updRecordLen == updExtendLen) ||
228 (delRecordLen > 0 && delExtendLen > 0 && delRecordLen == delExtendLen) ||
229 (uploadData.lockData.extend.size() > 0));
230 if (!syncDataValid) {
231 LOGE("[CloudSyncUtils] upload data is empty but upload count is not zero or upload table name"
232 " is not the same as table name of sync data.");
233 return -E_INTERNAL_ERROR;
234 }
235 int64_t syncDataCount = static_cast<int64_t>(insRecordLen) + static_cast<int64_t>(updRecordLen) +
236 static_cast<int64_t>(delRecordLen);
237 if (syncDataCount > count) {
238 LOGE("[CloudSyncUtils] Size of a batch of sync data is greater than upload data size. count %d", count);
239 return -E_INTERNAL_ERROR;
240 }
241 return E_OK;
242 }
243
ClearCloudSyncData(CloudSyncData & uploadData)244 void CloudSyncUtils::ClearCloudSyncData(CloudSyncData &uploadData)
245 {
246 std::vector<VBucket>().swap(uploadData.insData.record);
247 std::vector<VBucket>().swap(uploadData.insData.extend);
248 std::vector<int64_t>().swap(uploadData.insData.rowid);
249 std::vector<VBucket>().swap(uploadData.updData.record);
250 std::vector<VBucket>().swap(uploadData.updData.extend);
251 std::vector<VBucket>().swap(uploadData.delData.record);
252 std::vector<VBucket>().swap(uploadData.delData.extend);
253 }
254
GetWaterMarkAndUpdateTime(std::vector<VBucket> & extend,Timestamp & waterMark)255 int CloudSyncUtils::GetWaterMarkAndUpdateTime(std::vector<VBucket> &extend, Timestamp &waterMark)
256 {
257 for (auto &extendData: extend) {
258 if (extendData.empty() || extendData.find(CloudDbConstant::MODIFY_FIELD) == extendData.end()) {
259 LOGE("[CloudSyncer] VBucket is empty or MODIFY_FIELD doesn't exist.");
260 return -E_INTERNAL_ERROR;
261 }
262 if (TYPE_INDEX<int64_t> != extendData.at(CloudDbConstant::MODIFY_FIELD).index()) {
263 LOGE("[CloudSyncer] VBucket's MODIFY_FIELD doesn't fit int64_t.");
264 return -E_INTERNAL_ERROR;
265 }
266 if (extendData.empty() || extendData.find(CloudDbConstant::CREATE_FIELD) == extendData.end()) {
267 LOGE("[CloudSyncer] VBucket is empty or CREATE_FIELD doesn't exist.");
268 return -E_INTERNAL_ERROR;
269 }
270 if (TYPE_INDEX<int64_t> != extendData.at(CloudDbConstant::CREATE_FIELD).index()) {
271 LOGE("[CloudSyncer] VBucket's CREATE_FIELD doesn't fit int64_t.");
272 return -E_INTERNAL_ERROR;
273 }
274 waterMark = std::max(int64_t(waterMark), std::get<int64_t>(extendData.at(CloudDbConstant::MODIFY_FIELD)));
275 int64_t modifyTime =
276 std::get<int64_t>(extendData.at(CloudDbConstant::MODIFY_FIELD)) / CloudDbConstant::TEN_THOUSAND;
277 int64_t createTime =
278 std::get<int64_t>(extendData.at(CloudDbConstant::CREATE_FIELD)) / CloudDbConstant::TEN_THOUSAND;
279 extendData.insert_or_assign(CloudDbConstant::MODIFY_FIELD, modifyTime);
280 extendData.insert_or_assign(CloudDbConstant::CREATE_FIELD, createTime);
281 }
282 return E_OK;
283 }
284
CheckCloudSyncDataEmpty(const CloudSyncData & uploadData)285 bool CloudSyncUtils::CheckCloudSyncDataEmpty(const CloudSyncData &uploadData)
286 {
287 return uploadData.insData.extend.empty() && uploadData.insData.record.empty() &&
288 uploadData.updData.extend.empty() && uploadData.updData.record.empty() &&
289 uploadData.delData.extend.empty() && uploadData.delData.record.empty() &&
290 uploadData.lockData.rowid.empty();
291 }
292
ModifyCloudDataTime(DistributedDB::VBucket & data)293 void CloudSyncUtils::ModifyCloudDataTime(DistributedDB::VBucket &data)
294 {
295 // data already check field modify_field and create_field
296 int64_t modifyTime = std::get<int64_t>(data[CloudDbConstant::MODIFY_FIELD]) * CloudDbConstant::TEN_THOUSAND;
297 int64_t createTime = std::get<int64_t>(data[CloudDbConstant::CREATE_FIELD]) * CloudDbConstant::TEN_THOUSAND;
298 data[CloudDbConstant::MODIFY_FIELD] = modifyTime;
299 data[CloudDbConstant::CREATE_FIELD] = createTime;
300 }
301
302 // After doing a batch upload, we need to use CloudSyncData's maximum timestamp to update the water mark;
UpdateExtendTime(CloudSyncData & uploadData,const int64_t & count,uint64_t taskId,Timestamp & waterMark)303 int CloudSyncUtils::UpdateExtendTime(CloudSyncData &uploadData, const int64_t &count, uint64_t taskId,
304 Timestamp &waterMark)
305 {
306 int ret = CloudSyncUtils::CheckCloudSyncDataValid(uploadData, uploadData.tableName, count);
307 if (ret != E_OK) {
308 LOGE("[CloudSyncer] Invalid Sync Data when get local water mark.");
309 return ret;
310 }
311 if (!uploadData.insData.extend.empty()) {
312 if (uploadData.insData.record.size() != uploadData.insData.extend.size()) {
313 LOGE("[CloudSyncer] Inconsistent size of inserted data.");
314 return -E_INTERNAL_ERROR;
315 }
316 ret = CloudSyncUtils::GetWaterMarkAndUpdateTime(uploadData.insData.extend, waterMark);
317 if (ret != E_OK) {
318 return ret;
319 }
320 }
321
322 if (!uploadData.updData.extend.empty()) {
323 if (uploadData.updData.record.size() != uploadData.updData.extend.size()) {
324 LOGE("[CloudSyncer] Inconsistent size of updated data, %d.", -E_INTERNAL_ERROR);
325 return -E_INTERNAL_ERROR;
326 }
327 ret = CloudSyncUtils::GetWaterMarkAndUpdateTime(uploadData.updData.extend, waterMark);
328 if (ret != E_OK) {
329 return ret;
330 }
331 }
332
333 if (!uploadData.delData.extend.empty()) {
334 if (uploadData.delData.record.size() != uploadData.delData.extend.size()) {
335 LOGE("[CloudSyncer] Inconsistent size of deleted data, %d.", -E_INTERNAL_ERROR);
336 return -E_INTERNAL_ERROR;
337 }
338 ret = CloudSyncUtils::GetWaterMarkAndUpdateTime(uploadData.delData.extend, waterMark);
339 if (ret != E_OK) {
340 return ret;
341 }
342 }
343 return E_OK;
344 }
345
UpdateLocalCache(OpType opType,const LogInfo & cloudInfo,const LogInfo & localInfo,std::map<std::string,LogInfo> & localLogInfoCache)346 void CloudSyncUtils::UpdateLocalCache(OpType opType, const LogInfo &cloudInfo, const LogInfo &localInfo,
347 std::map<std::string, LogInfo> &localLogInfoCache)
348 {
349 LogInfo updateLogInfo;
350 std::string hashKey(localInfo.hashKey.begin(), localInfo.hashKey.end());
351 bool updateCache = true;
352 switch (opType) {
353 case OpType::INSERT :
354 case OpType::UPDATE :
355 case OpType::DELETE: {
356 updateLogInfo = cloudInfo;
357 updateLogInfo.device = CloudDbConstant::DEFAULT_CLOUD_DEV;
358 updateLogInfo.hashKey = localInfo.hashKey;
359 if (opType == OpType::DELETE) {
360 updateLogInfo.flag |= static_cast<uint64_t>(LogInfoFlag::FLAG_DELETE);
361 } else if (opType == OpType::INSERT) {
362 updateLogInfo.originDev = CloudDbConstant::DEFAULT_CLOUD_DEV;
363 }
364 break;
365 }
366 case OpType::CLEAR_GID:
367 case OpType::UPDATE_TIMESTAMP: {
368 updateLogInfo = localInfo;
369 updateLogInfo.cloudGid.clear();
370 updateLogInfo.sharingResource.clear();
371 break;
372 }
373 default:
374 updateCache = false;
375 break;
376 }
377 if (updateCache) {
378 localLogInfoCache[hashKey] = updateLogInfo;
379 }
380 }
381
SaveChangedData(ICloudSyncer::SyncParam & param,size_t dataIndex,const ICloudSyncer::DataInfo & dataInfo,std::vector<std::pair<Key,size_t>> & deletedList)382 int CloudSyncUtils::SaveChangedData(ICloudSyncer::SyncParam ¶m, size_t dataIndex,
383 const ICloudSyncer::DataInfo &dataInfo, std::vector<std::pair<Key, size_t>> &deletedList)
384 {
385 OpType opType = CalOpType(param, dataIndex);
386 Key hashKey = dataInfo.localInfo.logInfo.hashKey;
387 if (param.deletePrimaryKeySet.find(hashKey) != param.deletePrimaryKeySet.end()) {
388 if (opType == OpType::INSERT) {
389 (void)param.dupHashKeySet.insert(hashKey);
390 opType = OpType::UPDATE;
391 // only composite primary key needs to be processed.
392 if (!param.isSinglePrimaryKey) {
393 param.withoutRowIdData.updateData.emplace_back(dataIndex,
394 param.changedData.primaryData[ChangeType::OP_UPDATE].size());
395 }
396 }
397 }
398 // INSERT: for no primary key or composite primary key situation
399 if (!param.isSinglePrimaryKey && opType == OpType::INSERT) {
400 param.info.downLoadInfo.insertCount++;
401 param.withoutRowIdData.insertData.push_back(dataIndex);
402 return E_OK;
403 }
404 switch (opType) {
405 // INSERT: only for single primary key situation
406 case OpType::INSERT:
407 param.info.downLoadInfo.insertCount++;
408 return CloudSyncUtils::SaveChangedDataByType(
409 param.downloadData.data[dataIndex], param.changedData, dataInfo.localInfo, ChangeType::OP_INSERT);
410 case OpType::UPDATE:
411 param.info.downLoadInfo.updateCount++;
412 if (CloudSyncUtils::NeedSaveData(dataInfo.localInfo.logInfo, dataInfo.cloudLogInfo)) {
413 return CloudSyncUtils::SaveChangedDataByType(param.downloadData.data[dataIndex], param.changedData,
414 dataInfo.localInfo, ChangeType::OP_UPDATE);
415 }
416 return E_OK;
417 case OpType::DELETE:
418 param.info.downLoadInfo.deleteCount++;
419 return CloudSyncUtils::SaveChangedDataByType(param.downloadData.data[dataIndex], param.changedData,
420 dataInfo.localInfo, ChangeType::OP_DELETE);
421 default:
422 return E_OK;
423 }
424 }
425
ClearWithoutData(ICloudSyncer::SyncParam & param)426 void CloudSyncUtils::ClearWithoutData(ICloudSyncer::SyncParam ¶m)
427 {
428 param.withoutRowIdData.insertData.clear();
429 param.withoutRowIdData.updateData.clear();
430 param.withoutRowIdData.assetInsertData.clear();
431 }
432
IsSkipAssetsMissingRecord(const std::vector<VBucket> & extend)433 bool CloudSyncUtils::IsSkipAssetsMissingRecord(const std::vector<VBucket> &extend)
434 {
435 if (extend.empty()) {
436 return false;
437 }
438 for (size_t i = 0; i < extend.size(); ++i) {
439 if (DBCommon::IsIntTypeRecordError(extend[i]) && !DBCommon::IsRecordAssetsMissing(extend[i])) {
440 return false;
441 }
442 }
443 return true;
444 }
445
FillAssetIdToAssets(CloudSyncBatch & data,int errorCode,const CloudWaterType & type)446 int CloudSyncUtils::FillAssetIdToAssets(CloudSyncBatch &data, int errorCode, const CloudWaterType &type)
447 {
448 if (data.extend.size() != data.assets.size()) {
449 LOGE("[CloudSyncUtils] size not match, extend:%zu assets:%zu.", data.extend.size(), data.assets.size());
450 return -E_CLOUD_ERROR;
451 }
452 int errCode = E_OK;
453 for (size_t i = 0; i < data.assets.size(); i++) {
454 if (data.assets[i].empty() || DBCommon::IsRecordIgnored(data.extend[i]) ||
455 (errorCode != E_OK &&
456 (DBCommon::IsRecordError(data.extend[i]) || DBCommon::IsRecordAssetsMissing(data.extend[i]))) ||
457 DBCommon::IsNeedCompensatedForUpload(data.extend[i], type)) {
458 if (errCode != E_OK && DBCommon::IsRecordAssetsMissing(data.extend[i])) {
459 LOGI("[CloudSyncUtils][FileAssetIdToAssets] errCode with assets missing, skip fill assets id");
460 }
461 continue;
462 }
463 for (auto it = data.assets[i].begin(); it != data.assets[i].end();) {
464 auto &[col, value] = *it;
465 if (!CheckIfContainsInsertAssets(value)) {
466 ++it;
467 continue;
468 }
469 auto extendIt = data.extend[i].find(col);
470 if (extendIt == data.extend[i].end()) {
471 LOGI("[CloudSyncUtils] Asset field name can not find in extend. key is:%s.", col.c_str());
472 it = data.assets[i].erase(it);
473 continue;
474 }
475 if (extendIt->second.index() != value.index()) {
476 LOGE("[CloudSyncUtils] Asset field type not same. extend:%zu, data:%zu",
477 extendIt->second.index(), value.index());
478 errCode = -E_CLOUD_ERROR;
479 ++it;
480 continue;
481 }
482 int ret = FillAssetIdToAssetData(extendIt->second, value);
483 if (ret != E_OK) {
484 LOGE("[CloudSyncUtils] fail to fill assetId, %d.", ret);
485 errCode = -E_CLOUD_ERROR;
486 }
487 ++it;
488 }
489 }
490 return errCode;
491 }
492
FillAssetIdToAssetData(const Type & extend,Type & assetData)493 int CloudSyncUtils::FillAssetIdToAssetData(const Type &extend, Type &assetData)
494 {
495 if (extend.index() == TYPE_INDEX<Asset>) {
496 if (std::get<Asset>(assetData).name != std::get<Asset>(extend).name) {
497 LOGE("[CloudSyncUtils][FillAssetIdToAssetData] Asset name can not find in extend.");
498 return -E_CLOUD_ERROR;
499 }
500 if (std::get<Asset>(extend).assetId.empty()) {
501 LOGE("[CloudSyncUtils][FillAssetIdToAssetData] Asset id is empty.");
502 return -E_CLOUD_ERROR;
503 }
504 std::get<Asset>(assetData).assetId = std::get<Asset>(extend).assetId;
505 }
506 if (extend.index() == TYPE_INDEX<Assets>) {
507 FillAssetIdToAssetsData(std::get<Assets>(extend), std::get<Assets>(assetData));
508 }
509 return E_OK;
510 }
511
FillAssetIdToAssetsData(const Assets & extend,Assets & assets)512 void CloudSyncUtils::FillAssetIdToAssetsData(const Assets &extend, Assets &assets)
513 {
514 for (auto it = assets.begin(); it != assets.end();) {
515 auto &asset = *it;
516 if (asset.flag != static_cast<uint32_t>(AssetOpType::INSERT)) {
517 ++it;
518 continue;
519 }
520 auto extendAssets = extend;
521 bool isAssetExisted = false;
522 for (const auto &extendAsset : extendAssets) {
523 if (asset.name == extendAsset.name && !extendAsset.assetId.empty()) {
524 asset.assetId = extendAsset.assetId;
525 isAssetExisted = true;
526 break;
527 }
528 }
529 if (!isAssetExisted) {
530 LOGI("Unable to sync local asset, skip fill assetId.");
531 it = assets.erase(it);
532 } else {
533 ++it;
534 }
535 }
536 }
537
CheckIfContainsInsertAssets(const Type & assetData)538 bool CloudSyncUtils::CheckIfContainsInsertAssets(const Type &assetData)
539 {
540 if (assetData.index() == TYPE_INDEX<Asset>) {
541 if (std::get<Asset>(assetData).flag != static_cast<uint32_t>(AssetOpType::INSERT)) {
542 return false;
543 }
544 } else if (assetData.index() == TYPE_INDEX<Assets>) {
545 bool hasInsertAsset = false;
546 for (const auto &asset : std::get<Assets>(assetData)) {
547 if (asset.flag == static_cast<uint32_t>(AssetOpType::INSERT)) {
548 hasInsertAsset = true;
549 break;
550 }
551 }
552 if (!hasInsertAsset) {
553 return false;
554 }
555 }
556 return true;
557 }
558
UpdateAssetsFlag(CloudSyncData & uploadData)559 void CloudSyncUtils::UpdateAssetsFlag(CloudSyncData &uploadData)
560 {
561 AssetOperationUtils::UpdateAssetsFlag(uploadData.insData.record, uploadData.insData.assets);
562 AssetOperationUtils::UpdateAssetsFlag(uploadData.updData.record, uploadData.updData.assets);
563 AssetOperationUtils::UpdateAssetsFlag(uploadData.delData.record, uploadData.delData.assets);
564 }
565
InsertOrReplaceChangedDataByType(ChangeType type,std::vector<Type> & pkVal,ChangedData & changedData)566 void CloudSyncUtils::InsertOrReplaceChangedDataByType(ChangeType type, std::vector<Type> &pkVal,
567 ChangedData &changedData)
568 {
569 // erase old changedData if exist
570 for (auto &changePkValList : changedData.primaryData) {
571 changePkValList.erase(std::remove_if(changePkValList.begin(), changePkValList.end(),
572 [&pkVal](const std::vector<Type> &existPkVal) {
573 return existPkVal == pkVal;
574 }), changePkValList.end());
575 }
576 // insert new changeData
577 changedData.primaryData[type].emplace_back(std::move(pkVal));
578 }
579
CalOpType(ICloudSyncer::SyncParam & param,size_t dataIndex)580 OpType CloudSyncUtils::CalOpType(ICloudSyncer::SyncParam ¶m, size_t dataIndex)
581 {
582 OpType opType = param.downloadData.opType[dataIndex];
583 if (opType != OpType::INSERT && opType != OpType::UPDATE) {
584 return opType;
585 }
586
587 std::vector<Type> cloudPkVal;
588 // use dataIndex as dataKey avoid get same pk with no pk schema
589 int errCode = CloudSyncUtils::GetCloudPkVals(param.downloadData.data[dataIndex], param.changedData.field, dataIndex,
590 cloudPkVal);
591 if (errCode != E_OK) {
592 LOGW("[CloudSyncUtils] Get pk from download data failed %d", errCode);
593 // use origin opType
594 return opType;
595 }
596 auto iter = std::find_if(param.insertPk.begin(), param.insertPk.end(), [&cloudPkVal](const auto &item) {
597 return item == cloudPkVal;
598 });
599 if (opType == OpType::INSERT) {
600 // record all insert pk in one batch
601 if (iter == param.insertPk.end()) {
602 param.insertPk.push_back(cloudPkVal);
603 }
604 return OpType::INSERT;
605 }
606 // notify with insert because this data not exist in local before query
607 return (iter == param.insertPk.end()) ? OpType::UPDATE : OpType::INSERT;
608 }
609
InitCompensatedSyncTaskInfo()610 CloudSyncer::CloudTaskInfo CloudSyncUtils::InitCompensatedSyncTaskInfo()
611 {
612 CloudSyncer::CloudTaskInfo taskInfo;
613 taskInfo.priorityTask = true;
614 taskInfo.timeout = CloudDbConstant::CLOUD_DEFAULT_TIMEOUT;
615 taskInfo.mode = SyncMode::SYNC_MODE_CLOUD_MERGE;
616 taskInfo.callback = nullptr;
617 taskInfo.compensatedTask = true;
618 return taskInfo;
619 }
620
InitCompensatedSyncTaskInfo(const CloudSyncOption & option,const SyncProcessCallback & onProcess)621 CloudSyncer::CloudTaskInfo CloudSyncUtils::InitCompensatedSyncTaskInfo(const CloudSyncOption &option,
622 const SyncProcessCallback &onProcess)
623 {
624 CloudSyncer::CloudTaskInfo taskInfo = InitCompensatedSyncTaskInfo();
625 taskInfo.callback = onProcess;
626 taskInfo.devices = option.devices;
627 taskInfo.prepareTraceId = option.prepareTraceId;
628 if (option.users.empty()) {
629 taskInfo.users.push_back("");
630 } else {
631 taskInfo.users = option.users;
632 }
633 taskInfo.lockAction = option.lockAction;
634 return taskInfo;
635 }
636
// Creates a compensated sync task that follows an existing task: devices, users, store id, prepare trace id
// and lock action are copied from oriTaskInfo, everything else comes from the parameterless overload.
CloudSyncer::CloudTaskInfo CloudSyncUtils::InitCompensatedSyncTaskInfo(const CloudSyncer::CloudTaskInfo &oriTaskInfo)
{
    CloudSyncer::CloudTaskInfo compensatedTask = InitCompensatedSyncTaskInfo();
    compensatedTask.devices = oriTaskInfo.devices;
    compensatedTask.users = oriTaskInfo.users;
    compensatedTask.storeId = oriTaskInfo.storeId;
    compensatedTask.prepareTraceId = oriTaskInfo.prepareTraceId;
    compensatedTask.lockAction = oriTaskInfo.lockAction;
    return compensatedTask;
}
647 }