1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "cloud/cloud_storage_utils.h"
17 #include <set>
18
19 #include "cloud/asset_operation_utils.h"
20 #include "cloud/cloud_db_constant.h"
21 #include "cloud/cloud_db_types.h"
22 #include "db_common.h"
23 #include "runtime_context.h"
24
25 namespace DistributedDB {
BindInt64(int index,const VBucket & vBucket,const Field & field,sqlite3_stmt * upsertStmt)26 int CloudStorageUtils::BindInt64(int index, const VBucket &vBucket, const Field &field,
27 sqlite3_stmt *upsertStmt)
28 {
29 int64_t val = 0;
30 int errCode = GetValueFromVBucket<int64_t>(field.colName, vBucket, val);
31 if (field.nullable && errCode == -E_NOT_FOUND) {
32 errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_null(upsertStmt, index));
33 } else {
34 if (errCode != E_OK) {
35 LOGE("get int from vbucket failed, %d", errCode);
36 return -E_CLOUD_ERROR;
37 }
38 errCode = SQLiteUtils::BindInt64ToStatement(upsertStmt, index, val);
39 }
40
41 if (errCode != E_OK) {
42 LOGE("Bind int to insert statement failed, %d", errCode);
43 }
44 return errCode;
45 }
46
BindBool(int index,const VBucket & vBucket,const Field & field,sqlite3_stmt * upsertStmt)47 int CloudStorageUtils::BindBool(int index, const VBucket &vBucket, const Field &field,
48 sqlite3_stmt *upsertStmt)
49 {
50 bool val = false;
51 int errCode = GetValueFromVBucket<bool>(field.colName, vBucket, val);
52 if (field.nullable && errCode == -E_NOT_FOUND) {
53 errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_null(upsertStmt, index));
54 } else {
55 if (errCode != E_OK) {
56 LOGE("get bool from vbucket failed, %d", errCode);
57 return -E_CLOUD_ERROR;
58 }
59 errCode = SQLiteUtils::BindInt64ToStatement(upsertStmt, index, val);
60 }
61
62 if (errCode != E_OK) {
63 LOGE("Bind bool to insert statement failed, %d", errCode);
64 }
65 return errCode;
66 }
67
BindDouble(int index,const VBucket & vBucket,const Field & field,sqlite3_stmt * upsertStmt)68 int CloudStorageUtils::BindDouble(int index, const VBucket &vBucket, const Field &field,
69 sqlite3_stmt *upsertStmt)
70 {
71 double val = 0.0;
72 int errCode = GetValueFromVBucket<double>(field.colName, vBucket, val);
73 if (field.nullable && errCode == -E_NOT_FOUND) {
74 errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_null(upsertStmt, index));
75 } else {
76 if (errCode != E_OK) {
77 LOGE("get double from vbucket failed, %d", errCode);
78 return -E_CLOUD_ERROR;
79 }
80 errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_double(upsertStmt, index, val));
81 }
82
83 if (errCode != E_OK) {
84 LOGE("Bind double to insert statement failed, %d", errCode);
85 }
86 return errCode;
87 }
88
BindText(int index,const VBucket & vBucket,const Field & field,sqlite3_stmt * upsertStmt)89 int CloudStorageUtils::BindText(int index, const VBucket &vBucket, const Field &field,
90 sqlite3_stmt *upsertStmt)
91 {
92 std::string str;
93 int errCode = GetValueFromVBucket<std::string>(field.colName, vBucket, str);
94 if (field.nullable && errCode == -E_NOT_FOUND) {
95 errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_null(upsertStmt, index));
96 } else {
97 if (errCode != E_OK) {
98 LOGE("get string from vbucket failed, %d", errCode);
99 return -E_CLOUD_ERROR;
100 }
101 errCode = SQLiteUtils::BindTextToStatement(upsertStmt, index, str);
102 }
103
104 if (errCode != E_OK) {
105 LOGE("Bind string to insert statement failed, %d", errCode);
106 }
107 return errCode;
108 }
109
BindBlob(int index,const VBucket & vBucket,const Field & field,sqlite3_stmt * upsertStmt)110 int CloudStorageUtils::BindBlob(int index, const VBucket &vBucket, const Field &field,
111 sqlite3_stmt *upsertStmt)
112 {
113 int errCode = E_OK;
114 Bytes val;
115 if (field.type == TYPE_INDEX<Bytes>) {
116 errCode = GetValueFromVBucket<Bytes>(field.colName, vBucket, val);
117 if (!(IsFieldValid(field, errCode))) {
118 goto ERROR;
119 }
120 } else if (field.type == TYPE_INDEX<Asset>) {
121 Asset asset;
122 errCode = GetValueFromVBucket(field.colName, vBucket, asset);
123 if (!(IsFieldValid(field, errCode))) {
124 goto ERROR;
125 }
126 RuntimeContext::GetInstance()->AssetToBlob(asset, val);
127 } else {
128 Assets assets;
129 errCode = GetValueFromVBucket(field.colName, vBucket, assets);
130 if (!(IsFieldValid(field, errCode))) {
131 goto ERROR;
132 }
133 RuntimeContext::GetInstance()->AssetsToBlob(assets, val);
134 }
135
136 if (errCode == -E_NOT_FOUND) {
137 errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_null(upsertStmt, index));
138 } else {
139 errCode = SQLiteUtils::BindBlobToStatement(upsertStmt, index, val);
140 }
141 if (errCode != E_OK) {
142 LOGE("Bind blob to insert statement failed, %d", errCode);
143 }
144 return errCode;
145 ERROR:
146 LOGE("get blob from vbucket failed, %d", errCode);
147 return -E_CLOUD_ERROR;
148 }
149
BindAsset(int index,const VBucket & vBucket,const Field & field,sqlite3_stmt * upsertStmt)150 int CloudStorageUtils::BindAsset(int index, const VBucket &vBucket, const Field &field, sqlite3_stmt *upsertStmt)
151 {
152 int errCode;
153 Bytes val;
154 Type entry;
155 bool isExisted = GetTypeCaseInsensitive(field.colName, vBucket, entry);
156 if (!isExisted || entry.index() == TYPE_INDEX<Nil>) {
157 if (!field.nullable) {
158 LOGE("field value is not allowed to be null, %d", -E_CLOUD_ERROR);
159 return -E_CLOUD_ERROR;
160 }
161 return SQLiteUtils::MapSQLiteErrno(sqlite3_bind_null(upsertStmt, index));
162 }
163
164 Type type = entry;
165 if (field.type == TYPE_INDEX<Asset>) {
166 Asset asset;
167 errCode = GetValueFromOneField(type, asset);
168 if (errCode != E_OK) {
169 LOGE("can not get asset from vBucket when bind, %d", errCode);
170 return errCode;
171 }
172 asset.flag = static_cast<uint32_t>(AssetOpType::NO_CHANGE);
173 errCode = RuntimeContext::GetInstance()->AssetToBlob(asset, val);
174 } else if (field.type == TYPE_INDEX<Assets>) {
175 Assets assets;
176 errCode = GetValueFromOneField(type, assets);
177 if (errCode != E_OK) {
178 LOGE("can not get assets from vBucket when bind, %d", errCode);
179 return errCode;
180 }
181 if (!assets.empty()) {
182 for (auto &asset: assets) {
183 asset.flag = static_cast<uint32_t>(AssetOpType::NO_CHANGE);
184 }
185 errCode = RuntimeContext::GetInstance()->AssetsToBlob(assets, val);
186 }
187 } else {
188 LOGE("field type is not asset or assets, %d", -E_CLOUD_ERROR);
189 return -E_CLOUD_ERROR;
190 }
191 if (errCode != E_OK) {
192 LOGE("assets or asset to blob fail, %d", -E_CLOUD_ERROR);
193 return -E_CLOUD_ERROR;
194 }
195 if (val.empty()) {
196 errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_null(upsertStmt, index));
197 } else {
198 errCode = SQLiteUtils::BindBlobToStatement(upsertStmt, index, val);
199 }
200 return errCode;
201 }
202
Int64ToVector(const VBucket & vBucket,const Field & field,CollateType collateType,std::vector<uint8_t> & value)203 int CloudStorageUtils::Int64ToVector(const VBucket &vBucket, const Field &field, CollateType collateType,
204 std::vector<uint8_t> &value)
205 {
206 (void)collateType;
207 int64_t val = 0;
208 if (CloudStorageUtils::GetValueFromVBucket(field.colName, vBucket, val) != E_OK) {
209 return -E_CLOUD_ERROR;
210 }
211 DBCommon::StringToVector(std::to_string(val), value);
212 return E_OK;
213 }
214
BoolToVector(const VBucket & vBucket,const Field & field,CollateType collateType,std::vector<uint8_t> & value)215 int CloudStorageUtils::BoolToVector(const VBucket &vBucket, const Field &field, CollateType collateType,
216 std::vector<uint8_t> &value)
217 {
218 (void)collateType;
219 bool val = false;
220 if (CloudStorageUtils::GetValueFromVBucket(field.colName, vBucket, val) != E_OK) { // LCOV_EXCL_BR_LINE
221 return -E_CLOUD_ERROR;
222 }
223 DBCommon::StringToVector(std::to_string(val ? 1 : 0), value);
224 return E_OK;
225 }
226
DoubleToVector(const VBucket & vBucket,const Field & field,CollateType collateType,std::vector<uint8_t> & value)227 int CloudStorageUtils::DoubleToVector(const VBucket &vBucket, const Field &field, CollateType collateType,
228 std::vector<uint8_t> &value)
229 {
230 (void)collateType;
231 double val = 0.0;
232 if (CloudStorageUtils::GetValueFromVBucket(field.colName, vBucket, val) != E_OK) { // LCOV_EXCL_BR_LINE
233 return -E_CLOUD_ERROR;
234 }
235 std::ostringstream s;
236 s << val;
237 DBCommon::StringToVector(s.str(), value);
238 return E_OK;
239 }
240
TextToVector(const VBucket & vBucket,const Field & field,CollateType collateType,std::vector<uint8_t> & value)241 int CloudStorageUtils::TextToVector(const VBucket &vBucket, const Field &field, CollateType collateType,
242 std::vector<uint8_t> &value)
243 {
244 std::string val;
245 if (CloudStorageUtils::GetValueFromVBucket(field.colName, vBucket, val) != E_OK) {
246 return -E_CLOUD_ERROR;
247 }
248 if (collateType == CollateType::COLLATE_NOCASE) {
249 std::transform(val.begin(), val.end(), val.begin(), ::toupper);
250 } else if (collateType == CollateType::COLLATE_RTRIM) {
251 DBCommon::RTrim(val);
252 }
253
254 DBCommon::StringToVector(val, value);
255 return E_OK;
256 }
257
BlobToVector(const VBucket & vBucket,const Field & field,CollateType collateType,std::vector<uint8_t> & value)258 int CloudStorageUtils::BlobToVector(const VBucket &vBucket, const Field &field, CollateType collateType,
259 std::vector<uint8_t> &value)
260 {
261 (void)collateType;
262 if (field.type == TYPE_INDEX<Bytes>) { // LCOV_EXCL_BR_LINE
263 return CloudStorageUtils::GetValueFromVBucket(field.colName, vBucket, value);
264 } else if (field.type == TYPE_INDEX<Asset>) {
265 Asset val;
266 if (CloudStorageUtils::GetValueFromVBucket(field.colName, vBucket, val) != E_OK) { // LCOV_EXCL_BR_LINE
267 return -E_CLOUD_ERROR;
268 }
269 int errCode = RuntimeContext::GetInstance()->AssetToBlob(val, value);
270 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
271 LOGE("asset to blob fail, %d", errCode);
272 }
273 return errCode;
274 } else {
275 Assets val;
276 if (CloudStorageUtils::GetValueFromVBucket(field.colName, vBucket, val) != E_OK) { // LCOV_EXCL_BR_LINE
277 return -E_CLOUD_ERROR;
278 }
279 int errCode = RuntimeContext::GetInstance()->AssetsToBlob(val, value);
280 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
281 LOGE("assets to blob fail, %d", errCode);
282 }
283 return errCode;
284 }
285 }
286
GetCloudPrimaryKey(const TableSchema & tableSchema)287 std::set<std::string> CloudStorageUtils::GetCloudPrimaryKey(const TableSchema &tableSchema)
288 {
289 std::set<std::string> pkSet;
290 for (const auto &field : tableSchema.fields) {
291 if (field.primary) {
292 pkSet.insert(field.colName);
293 }
294 }
295 return pkSet;
296 }
297
GetCloudAsset(const TableSchema & tableSchema)298 std::vector<Field> CloudStorageUtils::GetCloudAsset(const TableSchema &tableSchema)
299 {
300 std::vector<Field> assetFields;
301 for (const auto &item: tableSchema.fields) {
302 if (item.type != TYPE_INDEX<Asset> && item.type != TYPE_INDEX<Assets>) {
303 continue;
304 }
305 assetFields.push_back(item);
306 }
307 return assetFields;
308 }
309
GetCloudPrimaryKeyField(const TableSchema & tableSchema,bool sortByName)310 std::vector<Field> CloudStorageUtils::GetCloudPrimaryKeyField(const TableSchema &tableSchema, bool sortByName)
311 {
312 std::vector<Field> pkVec;
313 for (const auto &field : tableSchema.fields) {
314 if (field.primary) {
315 pkVec.push_back(field);
316 }
317 }
318 if (sortByName) {
319 std::sort(pkVec.begin(), pkVec.end(), [](const Field &a, const Field &b) {
320 return a.colName < b.colName;
321 });
322 }
323 return pkVec;
324 }
325
GetCloudPrimaryKeyFieldMap(const TableSchema & tableSchema,bool sortByUpper)326 std::map<std::string, Field> CloudStorageUtils::GetCloudPrimaryKeyFieldMap(const TableSchema &tableSchema,
327 bool sortByUpper)
328 {
329 std::map<std::string, Field> pkMap;
330 for (const auto &field : tableSchema.fields) {
331 if (field.primary) {
332 if (sortByUpper) {
333 pkMap[DBCommon::ToUpperCase(field.colName)] = field;
334 } else {
335 pkMap[field.colName] = field;
336 }
337 }
338 }
339 return pkMap;
340 }
341
GetAssetFieldsFromSchema(const TableSchema & tableSchema,const VBucket & vBucket,std::vector<Field> & fields)342 int CloudStorageUtils::GetAssetFieldsFromSchema(const TableSchema &tableSchema, const VBucket &vBucket,
343 std::vector<Field> &fields)
344 {
345 for (const auto &field: tableSchema.fields) {
346 Type type;
347 bool isExisted = GetTypeCaseInsensitive(field.colName, vBucket, type);
348 if (!isExisted) {
349 continue;
350 }
351 if (type.index() != TYPE_INDEX<Asset> && type.index() != TYPE_INDEX<Assets>) {
352 continue;
353 }
354 fields.push_back(field);
355 }
356 if (fields.empty()) {
357 return -E_CLOUD_ERROR;
358 }
359 return E_OK;
360 }
361
IsContainsPrimaryKey(const TableSchema & tableSchema)362 bool CloudStorageUtils::IsContainsPrimaryKey(const TableSchema &tableSchema)
363 {
364 for (const auto &field : tableSchema.fields) {
365 if (field.primary) {
366 return true;
367 }
368 }
369 return false;
370 }
371
ObtainAssetFromVBucket(const VBucket & vBucket,VBucket & asset)372 void CloudStorageUtils::ObtainAssetFromVBucket(const VBucket &vBucket, VBucket &asset)
373 {
374 for (const auto &item: vBucket) {
375 if (IsAsset(item.second)) {
376 Asset data = std::get<Asset>(item.second);
377 asset.insert_or_assign(item.first, data);
378 } else if (IsAssets(item.second)) {
379 Assets data = std::get<Assets>(item.second);
380 asset.insert_or_assign(item.first, data);
381 }
382 }
383 }
384
StatusToFlag(AssetStatus status)385 AssetOpType CloudStorageUtils::StatusToFlag(AssetStatus status)
386 {
387 switch (AssetOperationUtils::EraseBitMask(status)) {
388 case AssetStatus::INSERT:
389 return AssetOpType::INSERT;
390 case AssetStatus::DELETE:
391 return AssetOpType::DELETE;
392 case AssetStatus::UPDATE:
393 return AssetOpType::UPDATE;
394 default:
395 return AssetOpType::NO_CHANGE;
396 }
397 }
398
FlagToStatus(AssetOpType opType)399 AssetStatus CloudStorageUtils::FlagToStatus(AssetOpType opType)
400 {
401 switch (opType) {
402 case AssetOpType::INSERT:
403 return AssetStatus::INSERT;
404 case AssetOpType::DELETE:
405 return AssetStatus::DELETE;
406 case AssetOpType::UPDATE:
407 return AssetStatus::UPDATE;
408 default:
409 return AssetStatus::NORMAL;
410 }
411 }
412
ChangeAssetsOnVBucketToAsset(VBucket & vBucket,std::vector<Field> & fields)413 void CloudStorageUtils::ChangeAssetsOnVBucketToAsset(VBucket &vBucket, std::vector<Field> &fields)
414 {
415 for (const Field &field: fields) {
416 if (field.type == TYPE_INDEX<Asset>) {
417 Type asset = GetAssetFromAssets(vBucket[field.colName]);
418 vBucket[field.colName] = asset;
419 }
420 }
421 }
422
GetAssetFromAssets(Type & value)423 Type CloudStorageUtils::GetAssetFromAssets(Type &value)
424 {
425 Asset assetVal;
426 int errCode = GetValueFromType(value, assetVal);
427 if (errCode == E_OK) {
428 return assetVal;
429 }
430
431 Assets assets;
432 errCode = GetValueFromType(value, assets);
433 if (errCode != E_OK) {
434 return Nil();
435 }
436
437 for (Asset &asset: assets) {
438 uint32_t lowStatus = AssetOperationUtils::EraseBitMask(asset.status);
439 if ((asset.flag == static_cast<uint32_t>(AssetOpType::DELETE) && (lowStatus == AssetStatus::ABNORMAL ||
440 lowStatus == AssetStatus::NORMAL)) || asset.flag != static_cast<uint32_t>(AssetOpType::DELETE)) {
441 return std::move(asset);
442 }
443 }
444 return Nil();
445 }
446
FillAssetBeforeDownload(Asset & asset)447 int CloudStorageUtils::FillAssetBeforeDownload(Asset &asset)
448 {
449 AssetOpType flag = static_cast<AssetOpType>(asset.flag);
450 AssetStatus status = static_cast<AssetStatus>(asset.status);
451 uint32_t lowStatus = AssetOperationUtils::EraseBitMask(asset.status);
452 switch (flag) {
453 case AssetOpType::DELETE: {
454 // these asset no need to download, just remove before download
455 if (lowStatus == static_cast<uint32_t>(AssetStatus::DELETE) ||
456 lowStatus == static_cast<uint32_t>(AssetStatus::ABNORMAL) ||
457 (asset.status == (AssetStatus::DOWNLOADING | AssetStatus::DOWNLOAD_WITH_NULL))) {
458 return -E_NOT_FOUND;
459 }
460 break;
461 }
462 case AssetOpType::INSERT:
463 case AssetOpType::UPDATE: {
464 if (status != AssetStatus::NORMAL) {
465 asset.hash = std::string("");
466 }
467 break;
468 }
469 default:
470 break;
471 }
472 return E_OK;
473 }
474
FillAssetAfterDownload(Asset & asset,Asset & dbAsset,AssetOperationUtils::AssetOpType assetOpType)475 int CloudStorageUtils::FillAssetAfterDownload(Asset &asset, Asset &dbAsset,
476 AssetOperationUtils::AssetOpType assetOpType)
477 {
478 if (assetOpType == AssetOperationUtils::AssetOpType::NOT_HANDLE) {
479 return E_OK;
480 }
481 dbAsset = asset;
482 AssetOpType flag = static_cast<AssetOpType>(asset.flag);
483 if (asset.status != AssetStatus::NORMAL) {
484 return E_OK;
485 }
486 switch (flag) {
487 case AssetOpType::DELETE: {
488 return -E_NOT_FOUND;
489 }
490 default:
491 break;
492 }
493 return E_OK;
494 }
495
FillAssetsAfterDownload(Assets & assets,Assets & dbAssets,const std::map<std::string,AssetOperationUtils::AssetOpType> & assetOpTypeMap)496 void CloudStorageUtils::FillAssetsAfterDownload(Assets &assets, Assets &dbAssets,
497 const std::map<std::string, AssetOperationUtils::AssetOpType> &assetOpTypeMap)
498 {
499 MergeAssetWithFillFunc(assets, dbAssets, assetOpTypeMap, FillAssetAfterDownload);
500 }
501
FillAssetForUpload(Asset & asset,Asset & dbAsset,AssetOperationUtils::AssetOpType assetOpType)502 int CloudStorageUtils::FillAssetForUpload(Asset &asset, Asset &dbAsset, AssetOperationUtils::AssetOpType assetOpType)
503 {
504 if (assetOpType == AssetOperationUtils::AssetOpType::NOT_HANDLE) {
505 // db assetId may be empty, need to be based on cache
506 // Notes: Assign happened when dbAsset.assetId is empty because of asset.assetId may be empty.
507 if (dbAsset.assetId.empty()) {
508 dbAsset.assetId = asset.assetId;
509 }
510 return E_OK;
511 }
512 AssetStatus status = static_cast<AssetStatus>(dbAsset.status);
513 dbAsset = asset;
514 switch (StatusToFlag(status)) {
515 case AssetOpType::INSERT:
516 case AssetOpType::UPDATE:
517 case AssetOpType::NO_CHANGE: {
518 dbAsset.status = static_cast<uint32_t>(AssetStatus::NORMAL);
519 break;
520 }
521 case AssetOpType::DELETE: {
522 return -E_NOT_FOUND;
523 }
524 default: {
525 break;
526 }
527 }
528 dbAsset.flag = static_cast<uint32_t>(AssetOpType::NO_CHANGE);
529 return E_OK;
530 }
531
FillAssetsForUpload(Assets & assets,Assets & dbAssets,const std::map<std::string,AssetOperationUtils::AssetOpType> & assetOpTypeMap)532 void CloudStorageUtils::FillAssetsForUpload(Assets &assets, Assets &dbAssets,
533 const std::map<std::string, AssetOperationUtils::AssetOpType> &assetOpTypeMap)
534 {
535 MergeAssetWithFillFunc(assets, dbAssets, assetOpTypeMap, FillAssetForUpload);
536 }
537
FillAssetBeforeUpload(Asset & asset,Asset & dbAsset,AssetOperationUtils::AssetOpType assetOpType)538 int CloudStorageUtils::FillAssetBeforeUpload(Asset &asset, Asset &dbAsset, AssetOperationUtils::AssetOpType assetOpType)
539 {
540 if (assetOpType == AssetOperationUtils::AssetOpType::NOT_HANDLE) {
541 return E_OK;
542 }
543 dbAsset = asset;
544 switch (static_cast<AssetOpType>(asset.flag)) {
545 case AssetOpType::INSERT:
546 case AssetOpType::UPDATE:
547 case AssetOpType::DELETE:
548 case AssetOpType::NO_CHANGE:
549 dbAsset.status |= static_cast<uint32_t>(AssetStatus::UPLOADING);
550 break;
551 default:
552 break;
553 }
554 dbAsset.flag = static_cast<uint32_t>(AssetOpType::NO_CHANGE);
555 return E_OK;
556 }
557
FillAssetsBeforeUpload(Assets & assets,Assets & dbAssets,const std::map<std::string,AssetOperationUtils::AssetOpType> & assetOpTypeMap)558 void CloudStorageUtils::FillAssetsBeforeUpload(Assets &assets, Assets &dbAssets, const std::map<std::string,
559 AssetOperationUtils::AssetOpType> &assetOpTypeMap)
560 {
561 MergeAssetWithFillFunc(assets, dbAssets, assetOpTypeMap, FillAssetBeforeUpload);
562 }
563
PrepareToFillAssetFromVBucket(VBucket & vBucket,std::function<int (Asset &)> fillAsset)564 void CloudStorageUtils::PrepareToFillAssetFromVBucket(VBucket &vBucket, std::function<int(Asset &)> fillAsset)
565 {
566 for (auto &item: vBucket) {
567 if (IsAsset(item.second)) {
568 Asset asset;
569 GetValueFromType(item.second, asset);
570 fillAsset(asset);
571 vBucket[item.first] = asset;
572 } else if (IsAssets(item.second)) {
573 Assets assets;
574 GetValueFromType(item.second, assets);
575 for (auto it = assets.begin(); it != assets.end();) {
576 fillAsset(*it) == -E_NOT_FOUND ? it = assets.erase(it) : ++it;
577 }
578 vBucket[item.first] = assets;
579 }
580 }
581 }
582
FillAssetFromVBucketFinish(const AssetOperationUtils::RecordAssetOpType & assetOpType,VBucket & vBucket,VBucket & dbAssets,std::function<int (Asset &,Asset &,AssetOperationUtils::AssetOpType)> fillAsset,std::function<void (Assets &,Assets &,const std::map<std::string,AssetOperationUtils::AssetOpType> &)> fillAssets)583 void CloudStorageUtils::FillAssetFromVBucketFinish(const AssetOperationUtils::RecordAssetOpType &assetOpType,
584 VBucket &vBucket, VBucket &dbAssets,
585 std::function<int(Asset &, Asset &, AssetOperationUtils::AssetOpType)> fillAsset,
586 std::function<void(Assets &, Assets &,
587 const std::map<std::string, AssetOperationUtils::AssetOpType> &)> fillAssets)
588 {
589 for (auto &item: dbAssets) {
590 if (IsAsset(item.second)) {
591 Asset cacheItem;
592 GetValueFromType(vBucket[item.first], cacheItem);
593 Asset dbItem;
594 GetValueFromType(item.second, dbItem);
595 AssetOperationUtils::AssetOpType opType = AssetOperationUtils::AssetOpType::NOT_HANDLE;
596 auto iterCol = assetOpType.find(item.first);
597 if (iterCol != assetOpType.end() && iterCol->second.find(dbItem.name) != iterCol->second.end()) {
598 opType = iterCol->second.at(dbItem.name);
599 }
600 int errCode = fillAsset(cacheItem, dbItem, opType);
601 if (errCode != E_OK) {
602 dbAssets[item.first] = Nil();
603 } else {
604 dbAssets[item.first] = dbItem;
605 }
606 continue;
607 }
608 if (IsAssets(item.second)) {
609 Assets cacheItems;
610 GetValueFromType(vBucket[item.first], cacheItems);
611 Assets dbItems;
612 GetValueFromType(item.second, dbItems);
613 auto iterCol = assetOpType.find(item.first);
614 if (iterCol == assetOpType.end()) {
615 fillAssets(cacheItems, dbItems, {});
616 } else {
617 fillAssets(cacheItems, dbItems, iterCol->second);
618 }
619 if (dbItems.empty()) {
620 dbAssets[item.first] = Nil();
621 } else {
622 dbAssets[item.first] = dbItems;
623 }
624 }
625 }
626 }
627
IsAsset(const Type & type)628 bool CloudStorageUtils::IsAsset(const Type &type)
629 {
630 return type.index() == TYPE_INDEX<Asset>;
631 }
632
IsAssets(const Type & type)633 bool CloudStorageUtils::IsAssets(const Type &type)
634 {
635 return type.index() == TYPE_INDEX<Assets>;
636 }
637
CalculateHashKeyForOneField(const Field & field,const VBucket & vBucket,bool allowEmpty,CollateType collateType,std::vector<uint8_t> & hashValue)638 int CloudStorageUtils::CalculateHashKeyForOneField(const Field &field, const VBucket &vBucket, bool allowEmpty,
639 CollateType collateType, std::vector<uint8_t> &hashValue)
640 {
641 Type type;
642 bool isExisted = GetTypeCaseInsensitive(field.colName, vBucket, type);
643 if (allowEmpty && !isExisted) {
644 return E_OK; // if vBucket from cloud doesn't contain primary key and allowEmpty, no need to calculate hash
645 }
646 static std::map<int32_t, std::function<int(const VBucket &, const Field &, CollateType,
647 std::vector<uint8_t> &)>> toVecFunc = {
648 { TYPE_INDEX<int64_t>, &CloudStorageUtils::Int64ToVector },
649 { TYPE_INDEX<bool>, &CloudStorageUtils::BoolToVector },
650 { TYPE_INDEX<double>, &CloudStorageUtils::DoubleToVector },
651 { TYPE_INDEX<std::string>, &CloudStorageUtils::TextToVector },
652 { TYPE_INDEX<Bytes>, &CloudStorageUtils::BlobToVector },
653 { TYPE_INDEX<Asset>, &CloudStorageUtils::BlobToVector },
654 { TYPE_INDEX<Assets>, &CloudStorageUtils::BlobToVector },
655 };
656 auto it = toVecFunc.find(field.type);
657 if (it == toVecFunc.end()) {
658 LOGE("unknown cloud type when convert field to vector.");
659 return -E_CLOUD_ERROR;
660 }
661 std::vector<uint8_t> value;
662 int errCode = it->second(vBucket, field, collateType, value);
663 if (errCode != E_OK) {
664 LOGE("convert cloud field fail, %d", errCode);
665 return errCode;
666 }
667 return DBCommon::CalcValueHash(value, hashValue);
668 }
669
IsAssetsContainDuplicateAsset(Assets & assets)670 bool CloudStorageUtils::IsAssetsContainDuplicateAsset(Assets &assets)
671 {
672 std::set<std::string> set;
673 for (const auto &asset : assets) {
674 if (set.find(asset.name) != set.end()) {
675 LOGE("assets contain duplicate Asset");
676 return true;
677 }
678 set.insert(asset.name);
679 }
680 return false;
681 }
682
EraseNoChangeAsset(std::map<std::string,Assets> & assetsMap)683 void CloudStorageUtils::EraseNoChangeAsset(std::map<std::string, Assets> &assetsMap)
684 {
685 for (auto items = assetsMap.begin(); items != assetsMap.end();) {
686 for (auto item = items->second.begin(); item != items->second.end();) {
687 if (static_cast<AssetOpType>((*item).flag) == AssetOpType::NO_CHANGE) {
688 item = items->second.erase(item);
689 } else {
690 item++;
691 }
692 }
693 if (items->second.empty()) {
694 items = assetsMap.erase(items);
695 } else {
696 items++;
697 }
698 }
699 }
700
MergeDownloadAsset(std::map<std::string,Assets> & downloadAssets,std::map<std::string,Assets> & mergeAssets)701 void CloudStorageUtils::MergeDownloadAsset(std::map<std::string, Assets> &downloadAssets,
702 std::map<std::string, Assets> &mergeAssets)
703 {
704 for (auto &items: mergeAssets) {
705 auto downloadItem = downloadAssets.find(items.first);
706 if (downloadItem == downloadAssets.end()) { // LCOV_EXCL_BR_LINE
707 continue;
708 }
709 std::map<std::string, size_t> beCoveredAssetsMap = GenAssetsIndexMap(items.second);
710 for (const Asset &asset: downloadItem->second) {
711 auto it = beCoveredAssetsMap.find(asset.name);
712 if (it == beCoveredAssetsMap.end()) { // LCOV_EXCL_BR_LINE
713 continue;
714 }
715 items.second[it->second] = asset;
716 }
717 }
718 }
719
GenAssetsIndexMap(Assets & assets)720 std::map<std::string, size_t> CloudStorageUtils::GenAssetsIndexMap(Assets &assets)
721 {
722 // key of assetsIndexMap is name of asset, the value of it is index.
723 std::map<std::string, size_t> assetsIndexMap;
724 for (size_t i = 0; i < assets.size(); i++) {
725 assetsIndexMap[assets[i].name] = i;
726 }
727 return assetsIndexMap;
728 }
729
IsVbucketContainsAllPK(const VBucket & vBucket,const std::set<std::string> & pkSet)730 bool CloudStorageUtils::IsVbucketContainsAllPK(const VBucket &vBucket, const std::set<std::string> &pkSet)
731 {
732 if (pkSet.empty()) {
733 return false;
734 }
735 for (const auto &pk : pkSet) {
736 Type type;
737 bool isExisted = GetTypeCaseInsensitive(pk, vBucket, type);
738 if (!isExisted) {
739 return false;
740 }
741 }
742 return true;
743 }
744
IsSharedTable(const TableSchema & tableSchema)745 bool CloudStorageUtils::IsSharedTable(const TableSchema &tableSchema)
746 {
747 return tableSchema.sharedTableName == tableSchema.name;
748 }
749
IsViolationOfConstraints(const std::string & name,const std::vector<FieldInfo> & fieldInfos)750 static bool IsViolationOfConstraints(const std::string &name, const std::vector<FieldInfo> &fieldInfos)
751 {
752 for (const auto &field : fieldInfos) {
753 if (name != field.GetFieldName()) {
754 continue;
755 }
756 if (field.GetStorageType() == StorageType::STORAGE_TYPE_REAL) {
757 LOGE("[ConstraintsCheckForCloud] Not support create distributed table with real primary key.");
758 return true;
759 } else if (field.IsAssetType() || field.IsAssetsType()) {
760 LOGE("[ConstraintsCheckForCloud] Not support create distributed table with asset primary key.");
761 return true;
762 } else {
763 return false;
764 }
765 }
766 return false;
767 }
768
ConstraintsCheckForCloud(const TableInfo & table,const std::string & trimmedSql)769 int CloudStorageUtils::ConstraintsCheckForCloud(const TableInfo &table, const std::string &trimmedSql)
770 {
771 if (DBCommon::HasConstraint(trimmedSql, "UNIQUE", " ,", " ,)(")) {
772 LOGE("[ConstraintsCheckForCloud] Not support create distributed table with 'UNIQUE' constraint.");
773 return -E_NOT_SUPPORT;
774 }
775
776 const std::map<int, FieldName> &primaryKeys = table.GetPrimaryKey();
777 const std::vector<FieldInfo> &fieldInfos = table.GetFieldInfos();
778 for (const auto &item : primaryKeys) {
779 if (IsViolationOfConstraints(item.second, fieldInfos)) {
780 return -E_NOT_SUPPORT;
781 }
782 }
783 return E_OK;
784 }
785
CheckAssetStatus(const Assets & assets)786 bool CloudStorageUtils::CheckAssetStatus(const Assets &assets)
787 {
788 for (const Asset &asset: assets) {
789 if (AssetOperationUtils::EraseBitMask(asset.status) > static_cast<uint32_t>(AssetStatus::UPDATE)) {
790 LOGE("assets contain invalid status:[%u]", asset.status);
791 return false;
792 }
793 }
794 return true;
795 }
796
GetTableRefUpdateSql(const TableInfo & table,OpType opType)797 std::string CloudStorageUtils::GetTableRefUpdateSql(const TableInfo &table, OpType opType)
798 {
799 std::string sql;
800 std::string rowid = std::string(DBConstant::SQLITE_INNER_ROWID);
801 for (const auto &reference : table.GetTableReference()) {
802 if (reference.columns.empty()) {
803 return "";
804 }
805 std::string sourceLogName = DBCommon::GetLogTableName(reference.sourceTableName);
806 sql += " UPDATE " + sourceLogName + " SET timestamp=get_raw_sys_time(), flag=flag|0x02 WHERE ";
807 int index = 0;
808 for (const auto &itCol : reference.columns) {
809 if (opType != OpType::UPDATE) {
810 continue;
811 }
812 if (index++ != 0) {
813 sql += " OR ";
814 }
815 sql += " (OLD." + itCol.second + " IS NOT " + " NEW." + itCol.second + ")";
816 }
817 if (opType == OpType::UPDATE) {
818 sql += " AND ";
819 }
820 sql += " (flag&0x08=0x00) AND data_key IN (SELECT " + sourceLogName + ".data_key FROM " + sourceLogName +
821 " LEFT JOIN " + reference.sourceTableName + " ON " + sourceLogName + ".data_key = " +
822 reference.sourceTableName + "." + rowid + " WHERE ";
823 index = 0;
824 for (const auto &itCol : reference.columns) {
825 if (index++ != 0) {
826 sql += " OR ";
827 }
828 if (opType == OpType::UPDATE) {
829 sql += itCol.first + "=OLD." + itCol.second + " OR " + itCol.first + "=NEW." + itCol.second;
830 } else if (opType == OpType::INSERT) {
831 sql += itCol.first + "=NEW." + itCol.second;
832 } else if (opType == OpType::DELETE) {
833 sql += itCol.first + "=OLD." + itCol.second;
834 }
835 }
836 sql += ");";
837 }
838 return sql;
839 }
840
GetLeftJoinLogSql(const std::string & tableName,bool logAsTableA)841 std::string CloudStorageUtils::GetLeftJoinLogSql(const std::string &tableName, bool logAsTableA)
842 {
843 std::string sql;
844 if (logAsTableA) {
845 sql += " FROM '" + DBCommon::GetLogTableName(tableName) + "' AS a LEFT JOIN '" + tableName + "' AS b " +
846 " ON (a.data_key = b." + std::string(DBConstant::SQLITE_INNER_ROWID) + ")";
847 } else {
848 sql += " FROM '" + DBCommon::GetLogTableName(tableName) + "' AS b LEFT JOIN '" + tableName + "' AS a " +
849 " ON (b.data_key = a." + std::string(DBConstant::SQLITE_INNER_ROWID) + ")";
850 }
851 return sql;
852 }
853
GetUpdateLockChangedSql()854 std::string CloudStorageUtils::GetUpdateLockChangedSql()
855 {
856 return " status = CASE WHEN status == 2 THEN 3 ELSE status END";
857 }
858
GetDeleteLockChangedSql()859 std::string CloudStorageUtils::GetDeleteLockChangedSql()
860 {
861 return " status = CASE WHEN status == 2 or status == 3 THEN 1 ELSE status END";
862 }
863
ChkFillCloudAssetParam(const CloudSyncBatch & data,int errCode)864 bool CloudStorageUtils::ChkFillCloudAssetParam(const CloudSyncBatch &data, int errCode)
865 {
866 if (data.assets.empty()) {
867 errCode = E_OK;
868 return true;
869 }
870 if (data.rowid.empty() || data.timestamp.empty()) {
871 errCode = -E_INVALID_ARGS;
872 LOGE("param is empty when fill cloud Asset. rowidN:%u, timeN:%u", errCode, data.rowid.size(),
873 data.timestamp.size());
874 return true;
875 }
876 if (data.assets.size() != data.rowid.size() || data.assets.size() != data.timestamp.size() ||
877 data.assets.size() != data.hashKey.size() || data.assets.size() != data.extend.size()) {
878 errCode = -E_INVALID_ARGS;
879 LOGE("the num of param is invalid when fill cloud Asset. assetsN:%u, rowidN:%u, timeN:%u, "
880 "hashKeyN:%u, extendN:%u", data.assets.size(), data.rowid.size(), data.timestamp.size(),
881 data.hashKey.size(), data.extend.size());
882 return true;
883 }
884 return false;
885 }
886
GetToBeRemoveAssets(const VBucket & vBucket,const AssetOperationUtils::RecordAssetOpType & assetOpType,std::vector<Asset> & removeAssets)887 void CloudStorageUtils::GetToBeRemoveAssets(const VBucket &vBucket,
888 const AssetOperationUtils::RecordAssetOpType &assetOpType, std::vector<Asset> &removeAssets)
889 {
890 for (const auto &col: assetOpType) {
891 Type itItem;
892 bool isExisted = GetTypeCaseInsensitive(col.first, vBucket, itItem);
893 if (!isExisted) {
894 continue;
895 }
896 if (!CloudStorageUtils::IsAsset(itItem) && !CloudStorageUtils::IsAssets(itItem)) {
897 continue;
898 }
899 if (CloudStorageUtils::IsAsset(itItem)) {
900 Asset delAsset;
901 GetValueFromType(itItem, delAsset);
902 auto itOp = col.second.find(delAsset.name);
903 if (itOp != col.second.end() && itOp->second == AssetOperationUtils::AssetOpType::NOT_HANDLE
904 && delAsset.flag != static_cast<uint32_t>(AssetOpType::DELETE)) {
905 removeAssets.push_back(delAsset);
906 }
907 continue;
908 }
909 Assets assets;
910 GetValueFromType(itItem, assets);
911 for (const auto &asset: assets) {
912 auto itOp = col.second.find(asset.name);
913 if (itOp == col.second.end() || itOp->second == AssetOperationUtils::AssetOpType::HANDLE ||
914 asset.flag == static_cast<uint32_t>(AssetOpType::DELETE)) {
915 continue;
916 }
917 removeAssets.push_back(asset);
918 }
919 }
920 }
921
FillAssetForUploadFailed(Asset & asset,Asset & dbAsset,AssetOperationUtils::AssetOpType assetOpType)922 int CloudStorageUtils::FillAssetForUploadFailed(Asset &asset, Asset &dbAsset,
923 AssetOperationUtils::AssetOpType assetOpType)
924 {
925 dbAsset.assetId = asset.assetId;
926 dbAsset.status &= ~AssetStatus::UPLOADING;
927 return E_OK;
928 }
929
FillAssetsForUploadFailed(Assets & assets,Assets & dbAssets,const std::map<std::string,AssetOperationUtils::AssetOpType> & assetOpTypeMap)930 void CloudStorageUtils::FillAssetsForUploadFailed(Assets &assets, Assets &dbAssets,
931 const std::map<std::string, AssetOperationUtils::AssetOpType> &assetOpTypeMap)
932 {
933 MergeAssetWithFillFunc(assets, dbAssets, assetOpTypeMap, FillAssetForUploadFailed);
934 }
935
FillAssetAfterDownloadFail(Asset & asset,Asset & dbAsset,AssetOperationUtils::AssetOpType assetOpType)936 int CloudStorageUtils::FillAssetAfterDownloadFail(Asset &asset, Asset &dbAsset,
937 AssetOperationUtils::AssetOpType assetOpType)
938 {
939 AssetStatus status = static_cast<AssetStatus>(asset.status);
940 if (assetOpType == AssetOperationUtils::AssetOpType::NOT_HANDLE) {
941 return E_OK;
942 }
943 if (status != AssetStatus::ABNORMAL) {
944 return FillAssetAfterDownload(asset, dbAsset, assetOpType);
945 }
946 AssetOpType flag = static_cast<AssetOpType>(asset.flag);
947 dbAsset = asset;
948 switch (flag) {
949 case AssetOpType::INSERT:
950 case AssetOpType::DELETE:
951 case AssetOpType::UPDATE: {
952 dbAsset.hash = std::string("");
953 break;
954 }
955 default:
956 // other flag type do not need to clear hash
957 break;
958 }
959 return E_OK;
960 }
961
FillAssetsAfterDownloadFail(Assets & assets,Assets & dbAssets,const std::map<std::string,AssetOperationUtils::AssetOpType> & assetOpTypeMap)962 void CloudStorageUtils::FillAssetsAfterDownloadFail(Assets &assets, Assets &dbAssets,
963 const std::map<std::string, AssetOperationUtils::AssetOpType> &assetOpTypeMap)
964 {
965 MergeAssetWithFillFunc(assets, dbAssets, assetOpTypeMap, FillAssetAfterDownloadFail);
966 }
967
MergeAssetWithFillFunc(Assets & assets,Assets & dbAssets,const std::map<std::string,AssetOperationUtils::AssetOpType> & assetOpTypeMap,std::function<int (Asset &,Asset &,AssetOperationUtils::AssetOpType)> fillAsset)968 void CloudStorageUtils::MergeAssetWithFillFunc(Assets &assets, Assets &dbAssets, const std::map<std::string,
969 AssetOperationUtils::AssetOpType> &assetOpTypeMap,
970 std::function<int(Asset &, Asset &, AssetOperationUtils::AssetOpType)> fillAsset)
971 {
972 std::map<std::string, size_t> indexMap = GenAssetsIndexMap(assets);
973 for (auto dbAsset = dbAssets.begin(); dbAsset != dbAssets.end();) {
974 Asset cacheAsset;
975 auto it = indexMap.find(dbAsset->name);
976 if (it != indexMap.end()) {
977 cacheAsset = assets[it->second];
978 }
979 AssetOperationUtils::AssetOpType opType = AssetOperationUtils::AssetOpType::NOT_HANDLE;
980 auto iterOp = assetOpTypeMap.find(dbAsset->name);
981 if (iterOp != assetOpTypeMap.end()) {
982 opType = iterOp->second;
983 }
984 if (fillAsset(cacheAsset, *dbAsset, opType) == -E_NOT_FOUND) {
985 dbAsset = dbAssets.erase(dbAsset);
986 } else {
987 dbAsset++;
988 }
989 }
990 }
991
GetHashValueWithPrimaryKeyMap(const VBucket & vBucket,const TableSchema & tableSchema,const TableInfo & localTable,const std::map<std::string,Field> & pkMap,bool allowEmpty)992 std::pair<int, std::vector<uint8_t>> CloudStorageUtils::GetHashValueWithPrimaryKeyMap(const VBucket &vBucket,
993 const TableSchema &tableSchema, const TableInfo &localTable, const std::map<std::string, Field> &pkMap,
994 bool allowEmpty)
995 {
996 int errCode = E_OK;
997 std::vector<uint8_t> hashValue;
998 if (pkMap.size() == 0) {
999 LOGE("do not support get hashValue when primaryKey map is empty.");
1000 return { -E_INTERNAL_ERROR, {} };
1001 } else if (pkMap.size() == 1) {
1002 std::vector<Field> pkVec = CloudStorageUtils::GetCloudPrimaryKeyField(tableSchema);
1003 FieldInfoMap fieldInfos = localTable.GetFields();
1004 if (fieldInfos.find(pkMap.begin()->first) == fieldInfos.end()) {
1005 LOGE("localSchema doesn't contain primary key.");
1006 return { -E_INTERNAL_ERROR, {} };
1007 }
1008 CollateType collateType = fieldInfos.at(pkMap.begin()->first).GetCollateType();
1009 errCode = CloudStorageUtils::CalculateHashKeyForOneField(
1010 pkVec.at(0), vBucket, allowEmpty, collateType, hashValue);
1011 } else {
1012 std::vector<uint8_t> tempRes;
1013 for (const auto &item: pkMap) {
1014 FieldInfoMap fieldInfos = localTable.GetFields();
1015 if (fieldInfos.find(item.first) == fieldInfos.end()) {
1016 LOGE("localSchema doesn't contain primary key in multi pks.");
1017 return { -E_INTERNAL_ERROR, {} };
1018 }
1019 std::vector<uint8_t> temp;
1020 CollateType collateType = fieldInfos.at(item.first).GetCollateType();
1021 errCode = CloudStorageUtils::CalculateHashKeyForOneField(
1022 item.second, vBucket, allowEmpty, collateType, temp);
1023 if (errCode != E_OK) {
1024 LOGE("calc hash fail when there is more than one primary key. errCode = %d", errCode);
1025 return { errCode, {} };
1026 }
1027 tempRes.insert(tempRes.end(), temp.begin(), temp.end());
1028 }
1029 errCode = DBCommon::CalcValueHash(tempRes, hashValue);
1030 }
1031 return { errCode, hashValue };
1032 }
1033
CheckCloudSchemaFields(const TableSchema & tableSchema,const TableSchema & oldSchema)1034 bool CloudStorageUtils::CheckCloudSchemaFields(const TableSchema &tableSchema, const TableSchema &oldSchema)
1035 {
1036 if (tableSchema.name != oldSchema.name) {
1037 return true;
1038 }
1039 for (const auto &oldField : oldSchema.fields) {
1040 auto it = std::find_if(tableSchema.fields.begin(), tableSchema.fields.end(),
1041 [&oldField](const std::vector<Field>::value_type &field) {
1042 return oldField == field;
1043 });
1044 if (it == tableSchema.fields.end()) {
1045 return false;
1046 }
1047 }
1048 return true;
1049 }
1050
TransferFieldToLower(VBucket & vBucket)1051 void CloudStorageUtils::TransferFieldToLower(VBucket &vBucket)
1052 {
1053 for (auto it = vBucket.begin(); it != vBucket.end();) {
1054 std::string lowerField(it->first.length(), ' ');
1055 std::transform(it->first.begin(), it->first.end(), lowerField.begin(), ::tolower);
1056 if (lowerField != it->first) {
1057 vBucket[lowerField] = std::move(vBucket[it->first]);
1058 vBucket.erase(it++);
1059 } else {
1060 it++;
1061 }
1062 }
1063 }
1064
GetTypeCaseInsensitive(const std::string & fieldName,const VBucket & vBucket,Type & data)1065 bool CloudStorageUtils::GetTypeCaseInsensitive(const std::string &fieldName, const VBucket &vBucket, Type &data)
1066 {
1067 auto tmpFieldName = fieldName;
1068 auto tmpVBucket = vBucket;
1069 std::transform(tmpFieldName.begin(), tmpFieldName.end(), tmpFieldName.begin(), ::tolower);
1070 TransferFieldToLower(tmpVBucket);
1071 auto it = tmpVBucket.find(tmpFieldName);
1072 if (it == tmpVBucket.end()) {
1073 return false;
1074 }
1075 data = it->second;
1076 return true;
1077 }
1078
BindUpdateLogStmtFromVBucket(const VBucket & vBucket,const TableSchema & tableSchema,const std::vector<std::string> & colNames,sqlite3_stmt * updateLogStmt)1079 int CloudStorageUtils::BindUpdateLogStmtFromVBucket(const VBucket &vBucket, const TableSchema &tableSchema,
1080 const std::vector<std::string> &colNames, sqlite3_stmt *updateLogStmt)
1081 {
1082 int index = 0;
1083 int errCode = E_OK;
1084 for (const auto &colName : colNames) {
1085 index++;
1086 if (colName == CloudDbConstant::GID_FIELD) {
1087 if (vBucket.find(colName) == vBucket.end()) {
1088 LOGE("cloud data doesn't contain gid field when bind update log stmt.");
1089 return -E_CLOUD_ERROR;
1090 }
1091 errCode = SQLiteUtils::BindTextToStatement(updateLogStmt, index,
1092 std::get<std::string>(vBucket.at(colName)));
1093 } else if (colName == CloudDbConstant::MODIFY_FIELD) {
1094 if (vBucket.find(colName) == vBucket.end()) {
1095 LOGE("cloud data doesn't contain modify field when bind update log stmt.");
1096 return -E_CLOUD_ERROR;
1097 }
1098 errCode = SQLiteUtils::BindInt64ToStatement(updateLogStmt, index, std::get<int64_t>(vBucket.at(colName)));
1099 } else if (colName == CloudDbConstant::VERSION_FIELD) {
1100 if (vBucket.find(colName) == vBucket.end()) {
1101 errCode = SQLiteUtils::BindTextToStatement(updateLogStmt, index, std::string(""));
1102 } else {
1103 errCode = SQLiteUtils::BindTextToStatement(updateLogStmt, index,
1104 std::get<std::string>(vBucket.at(colName)));
1105 }
1106 } else if (colName == CloudDbConstant::SHARING_RESOURCE_FIELD) {
1107 if (vBucket.find(colName) == vBucket.end()) {
1108 errCode = SQLiteUtils::BindTextToStatement(updateLogStmt, index, std::string(""));
1109 } else {
1110 errCode = SQLiteUtils::BindTextToStatement(updateLogStmt, index,
1111 std::get<std::string>(vBucket.at(colName)));
1112 }
1113 } else {
1114 LOGE("invalid col name when bind value to update log statement.");
1115 return -E_INTERNAL_ERROR;
1116 }
1117 if (errCode != E_OK) {
1118 LOGE("fail to bind value to update log statement.");
1119 return errCode;
1120 }
1121 }
1122 return E_OK;
1123 }
1124
GetUpdateRecordFlagSql(const std::string & tableName,bool recordConflict,const LogInfo & logInfo,const VBucket & uploadExtend,const CloudWaterType & type)1125 std::string CloudStorageUtils::GetUpdateRecordFlagSql(const std::string &tableName, bool recordConflict,
1126 const LogInfo &logInfo, const VBucket &uploadExtend, const CloudWaterType &type)
1127 {
1128 std::string compensatedBit = std::to_string(static_cast<uint32_t>(LogInfoFlag::FLAG_WAIT_COMPENSATED_SYNC));
1129 std::string inconsistencyBit = std::to_string(static_cast<uint32_t>(LogInfoFlag::FLAG_DEVICE_CLOUD_INCONSISTENCY));
1130 bool gidEmpty = logInfo.cloudGid.empty();
1131 bool isDeleted = logInfo.dataKey == DBConstant::DEFAULT_ROW_ID;
1132 std::string sql = "UPDATE " + DBCommon::GetLogTableName(tableName) + " SET flag = (CASE WHEN timestamp = ? THEN ";
1133 bool isNeedCompensated = recordConflict || DBCommon::IsNeedCompensatedForUpload(uploadExtend, type);
1134 if (isNeedCompensated && !(isDeleted && gidEmpty)) {
1135 sql += "flag | " + compensatedBit + " ELSE flag | " + compensatedBit;
1136 } else {
1137 sql += "flag & ~" + compensatedBit + " & ~" + inconsistencyBit + " ELSE flag & ~" + compensatedBit;
1138 }
1139 sql += " END), status = (CASE WHEN status == 2 THEN 3 WHEN (status == 1 AND timestamp = ?) THEN 0 ELSE status END)";
1140 if (DBCommon::IsCloudRecordNotFound(uploadExtend) &&
1141 (type == CloudWaterType::UPDATE || type == CloudWaterType::DELETE)) {
1142 sql += ", cloud_gid = '', version = '' ";
1143 }
1144 sql += " WHERE ";
1145 if (!gidEmpty) {
1146 sql += " cloud_gid = '" + logInfo.cloudGid + "'";
1147 }
1148 if (!isDeleted) {
1149 if (!gidEmpty) {
1150 sql += " OR ";
1151 }
1152 sql += " data_key = '" + std::to_string(logInfo.dataKey) + "'";
1153 }
1154 if (gidEmpty && isDeleted) {
1155 sql += " hash_key = ?";
1156 }
1157 sql += ";";
1158 return sql;
1159 }
1160
GetUpdateRecordFlagSqlUpload(const std::string & tableName,bool recordConflict,const LogInfo & logInfo,const VBucket & uploadExtend,const CloudWaterType & type)1161 std::string CloudStorageUtils::GetUpdateRecordFlagSqlUpload(const std::string &tableName, bool recordConflict,
1162 const LogInfo &logInfo, const VBucket &uploadExtend, const CloudWaterType &type)
1163 {
1164 std::string compensatedBit = std::to_string(static_cast<uint32_t>(LogInfoFlag::FLAG_WAIT_COMPENSATED_SYNC));
1165 std::string inconsistencyBit = std::to_string(static_cast<uint32_t>(LogInfoFlag::FLAG_DEVICE_CLOUD_INCONSISTENCY));
1166 bool gidEmpty = logInfo.cloudGid.empty();
1167 bool isDeleted = logInfo.dataKey == DBConstant::DEFAULT_ROW_ID;
1168 std::string sql;
1169 bool isNeedCompensated = recordConflict || DBCommon::IsNeedCompensatedForUpload(uploadExtend, type);
1170 if (isNeedCompensated && !(isDeleted && gidEmpty)) {
1171 sql += "UPDATE " + DBCommon::GetLogTableName(tableName) + " SET flag = (CASE WHEN timestamp = ? OR " +
1172 "flag & 0x01 = 0 THEN flag | " + compensatedBit + " ELSE flag";
1173 } else {
1174 sql += "UPDATE " + DBCommon::GetLogTableName(tableName) + " SET flag = (CASE WHEN timestamp = ? THEN " +
1175 "flag & ~" + compensatedBit + " & ~" + inconsistencyBit + " ELSE flag & ~" + compensatedBit;
1176 }
1177 sql += " END), status = (CASE WHEN status == 2 THEN 3 WHEN (status == 1 AND timestamp = ?) THEN 0 ELSE status END)";
1178 if (DBCommon::IsCloudRecordNotFound(uploadExtend) &&
1179 (type == CloudWaterType::UPDATE || type == CloudWaterType::DELETE)) {
1180 sql += ", cloud_gid = '', version = '' ";
1181 }
1182 sql += " WHERE ";
1183 if (!gidEmpty) {
1184 sql += " cloud_gid = '" + logInfo.cloudGid + "'";
1185 }
1186 if (!isDeleted) {
1187 if (!gidEmpty) {
1188 sql += " OR ";
1189 }
1190 sql += " data_key = '" + std::to_string(logInfo.dataKey) + "'";
1191 }
1192 if (gidEmpty && isDeleted) {
1193 sql += " hash_key = ?";
1194 }
1195 sql += ";";
1196 return sql;
1197 }
1198
GetUpdateUploadFinishedSql(const std::string & tableName)1199 std::string CloudStorageUtils::GetUpdateUploadFinishedSql(const std::string &tableName)
1200 {
1201 std::string finishedBit = std::to_string(static_cast<uint32_t>(LogInfoFlag::FLAG_UPLOAD_FINISHED));
1202 std::string compensatedBit = std::to_string(static_cast<uint32_t>(LogInfoFlag::FLAG_WAIT_COMPENSATED_SYNC));
1203 // When the data flag is not in the compensation state and the local data does not change, the upload is finished.
1204 return "UPDATE " + DBCommon::GetLogTableName(tableName) + " SET flag = (CASE WHEN timestamp = ? AND flag & " +
1205 compensatedBit + " != " + compensatedBit + " THEN flag | " + finishedBit + " ELSE flag END) WHERE hash_key=?";
1206 }
1207
BindStepConsistentFlagStmt(sqlite3_stmt * stmt,const VBucket & data,const std::set<std::string> & gidFilters)1208 int CloudStorageUtils::BindStepConsistentFlagStmt(sqlite3_stmt *stmt, const VBucket &data,
1209 const std::set<std::string> &gidFilters)
1210 {
1211 std::string gidStr;
1212 int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, data, gidStr);
1213 if (errCode != E_OK || gidStr.empty()) {
1214 LOGE("Get gid from bucket fail when mark flag as consistent, errCode = %d", errCode);
1215 return errCode;
1216 }
1217 if (gidStr.empty()) {
1218 LOGE("Get empty gid from bucket when mark flag as consistent.");
1219 return -E_CLOUD_ERROR;
1220 }
1221 // this data has not yet downloaded asset, skipping
1222 if (gidFilters.find(gidStr) != gidFilters.end()) {
1223 return E_OK;
1224 }
1225 errCode = SQLiteUtils::BindTextToStatement(stmt, 1, gidStr); // 1 is cloud_gid
1226 if (errCode != E_OK) {
1227 LOGE("Bind cloud_gid to mark flag as consistent stmt failed, %d", errCode);
1228 return errCode;
1229 }
1230 int64_t modifyTime;
1231 errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::MODIFY_FIELD, data, modifyTime);
1232 if (errCode != E_OK) {
1233 LOGE("Get modify time from bucket fail when mark flag as consistent, errCode = %d", errCode);
1234 return errCode;
1235 }
1236 errCode = SQLiteUtils::BindInt64ToStatement(stmt, 2, modifyTime); // 2 is timestamp
1237 if (errCode != E_OK) {
1238 LOGE("Bind modify time to mark flag as consistent stmt failed, %d", errCode);
1239 return errCode;
1240 }
1241 errCode = SQLiteUtils::StepWithRetry(stmt);
1242 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1243 errCode = E_OK;
1244 } else {
1245 LOGE("[Storage Executor]Step mark flag as consistent stmt failed, %d", errCode);
1246 }
1247 return errCode;
1248 }
1249
IsCloudGidMismatch(const std::string & downloadGid,const std::string & curGid)1250 bool CloudStorageUtils::IsCloudGidMismatch(const std::string &downloadGid, const std::string &curGid)
1251 {
1252 return !downloadGid.empty() && !curGid.empty() && downloadGid != curGid;
1253 }
1254
IsGetCloudDataContinue(uint32_t curNum,uint32_t curSize,uint32_t maxSize,uint32_t maxCount)1255 bool CloudStorageUtils::IsGetCloudDataContinue(uint32_t curNum, uint32_t curSize, uint32_t maxSize, uint32_t maxCount)
1256 {
1257 if (curNum == 0) {
1258 return true;
1259 }
1260 if (curSize < maxSize && curNum < maxCount) {
1261 return true;
1262 }
1263 return false;
1264 }
1265
IdentifyCloudType(const CloudUploadRecorder & recorder,CloudSyncData & cloudSyncData,VBucket & data,VBucket & log,VBucket & flags)1266 int CloudStorageUtils::IdentifyCloudType(const CloudUploadRecorder &recorder, CloudSyncData &cloudSyncData,
1267 VBucket &data, VBucket &log, VBucket &flags)
1268 {
1269 Bytes *hashKey = std::get_if<Bytes>(&flags[CloudDbConstant::HASH_KEY]);
1270 int64_t *timeStamp = std::get_if<int64_t>(&flags[CloudDbConstant::TIMESTAMP]);
1271 if (timeStamp == nullptr || hashKey == nullptr) {
1272 return -E_INVALID_DATA;
1273 }
1274 if (recorder.IsIgnoreUploadRecord(cloudSyncData.tableName, *hashKey, cloudSyncData.mode, *timeStamp)) {
1275 cloudSyncData.ignoredCount++;
1276 return -E_IGNORE_DATA;
1277 }
1278 return IdentifyCloudTypeInner(cloudSyncData, data, log, flags);
1279 }
1280
IdentifyCloudTypeInner(CloudSyncData & cloudSyncData,VBucket & data,VBucket & log,VBucket & flags)1281 int CloudStorageUtils::IdentifyCloudTypeInner(CloudSyncData &cloudSyncData, VBucket &data, VBucket &log, VBucket &flags)
1282 {
1283 int64_t *rowid = std::get_if<int64_t>(&flags[CloudDbConstant::ROWID]);
1284 int64_t *flag = std::get_if<int64_t>(&flags[CloudDbConstant::FLAG]);
1285 int64_t *timeStamp = std::get_if<int64_t>(&flags[CloudDbConstant::TIMESTAMP]);
1286 Bytes *hashKey = std::get_if<Bytes>(&flags[CloudDbConstant::HASH_KEY]);
1287 int64_t *status = std::get_if<int64_t>(&flags[CloudDbConstant::STATUS]);
1288 if (rowid == nullptr || flag == nullptr || timeStamp == nullptr || hashKey == nullptr) {
1289 return -E_INVALID_DATA;
1290 }
1291 bool isDelete = ((static_cast<uint64_t>(*flag) & DataItem::DELETE_FLAG) != 0);
1292 bool isInsert = (!isDelete) && (log.find(CloudDbConstant::GID_FIELD) == log.end());
1293 if (status != nullptr && !isInsert && (CloudStorageUtils::IsDataLocked(*status))) {
1294 cloudSyncData.ignoredCount++;
1295 cloudSyncData.lockData.extend.push_back(log);
1296 cloudSyncData.lockData.hashKey.push_back(*hashKey);
1297 cloudSyncData.lockData.timestamp.push_back(*timeStamp);
1298 cloudSyncData.lockData.rowid.push_back(*rowid);
1299 return -E_IGNORE_DATA;
1300 }
1301 if (isDelete) {
1302 cloudSyncData.delData.record.push_back(data);
1303 cloudSyncData.delData.extend.push_back(log);
1304 cloudSyncData.delData.hashKey.push_back(*hashKey);
1305 cloudSyncData.delData.timestamp.push_back(*timeStamp);
1306 cloudSyncData.delData.rowid.push_back(*rowid);
1307 } else {
1308 if (data.empty()) {
1309 LOGE("The cloud data is empty, isInsert:%d", isInsert);
1310 return -E_INVALID_DATA;
1311 }
1312 if (IsAbnormalData(data)) {
1313 LOGW("This data is abnormal, ignore it when upload, isInsert:%d", isInsert);
1314 cloudSyncData.ignoredCount++;
1315 return -E_IGNORE_DATA;
1316 }
1317 CloudSyncBatch &opData = isInsert ? cloudSyncData.insData : cloudSyncData.updData;
1318 opData.record.push_back(data);
1319 opData.rowid.push_back(*rowid);
1320 VBucket asset;
1321 CloudStorageUtils::ObtainAssetFromVBucket(data, asset);
1322 opData.timestamp.push_back(*timeStamp);
1323 opData.assets.push_back(asset);
1324 if (isInsert) {
1325 log[CloudDbConstant::HASH_KEY_FIELD] = DBCommon::VectorToHexString(*hashKey);
1326 }
1327 opData.extend.push_back(log);
1328 opData.hashKey.push_back(*hashKey);
1329 }
1330 return E_OK;
1331 }
1332
IsAbnormalData(const VBucket & data)1333 bool CloudStorageUtils::IsAbnormalData(const VBucket &data)
1334 {
1335 for (const auto &item : data) {
1336 const Asset *asset = std::get_if<TYPE_INDEX<Asset>>(&item.second);
1337 if (asset != nullptr) {
1338 if (asset->status == static_cast<uint32_t>(AssetStatus::ABNORMAL) ||
1339 (asset->status & static_cast<uint32_t>(AssetStatus::DOWNLOAD_WITH_NULL)) != 0) {
1340 return true;
1341 }
1342 continue;
1343 }
1344 const Assets *assets = std::get_if<TYPE_INDEX<Assets>>(&item.second);
1345 if (assets == nullptr) {
1346 continue;
1347 }
1348 for (const auto &oneAsset : *assets) {
1349 if (oneAsset.status == static_cast<uint32_t>(AssetStatus::ABNORMAL) ||
1350 (oneAsset.status & static_cast<uint32_t>(AssetStatus::DOWNLOAD_WITH_NULL)) != 0) {
1351 return true;
1352 }
1353 }
1354 }
1355 return false;
1356 }
1357
GetDataItemFromCloudData(VBucket & data)1358 std::pair<int, DataItem> CloudStorageUtils::GetDataItemFromCloudData(VBucket &data)
1359 {
1360 std::pair<int, DataItem> res;
1361 auto &[errCode, dataItem] = res;
1362 GetBytesFromCloudData(CloudDbConstant::CLOUD_KV_FIELD_KEY, data, dataItem.key);
1363 GetBytesFromCloudData(CloudDbConstant::CLOUD_KV_FIELD_VALUE, data, dataItem.value);
1364 GetStringFromCloudData(CloudDbConstant::GID_FIELD, data, dataItem.gid);
1365 GetStringFromCloudData(CloudDbConstant::VERSION_FIELD, data, dataItem.version);
1366 GetStringFromCloudData(CloudDbConstant::CLOUD_KV_FIELD_DEVICE, data, dataItem.dev);
1367 GetStringFromCloudData(CloudDbConstant::CLOUD_KV_FIELD_ORI_DEVICE, data, dataItem.origDev);
1368 dataItem.flag = static_cast<uint64_t>(LogInfoFlag::FLAG_CLOUD_WRITE);
1369 GetUInt64FromCloudData(CloudDbConstant::CLOUD_KV_FIELD_DEVICE_CREATE_TIME, data, dataItem.writeTimestamp);
1370 GetUInt64FromCloudData(CloudDbConstant::MODIFY_FIELD, data, dataItem.modifyTime);
1371 errCode = GetUInt64FromCloudData(CloudDbConstant::CREATE_FIELD, data, dataItem.createTime);
1372 bool isSystemRecord = IsSystemRecord(dataItem.key);
1373 if (isSystemRecord) {
1374 dataItem.hashKey = dataItem.key;
1375 dataItem.flag |= static_cast<uint64_t>(LogInfoFlag::FLAG_SYSTEM_RECORD);
1376 } else if (!dataItem.key.empty()) {
1377 (void)DBCommon::CalcValueHash(dataItem.key, dataItem.hashKey);
1378 }
1379 return res;
1380 }
1381
GetDataItemFromCloudVersionData(VBucket & data)1382 std::pair<int, DataItem> CloudStorageUtils::GetDataItemFromCloudVersionData(VBucket &data)
1383 {
1384 std::pair<int, DataItem> res;
1385 auto &[errCode, dataItem] = res;
1386 GetBytesFromCloudData(CloudDbConstant::CLOUD_KV_FIELD_KEY, data, dataItem.key);
1387 GetBytesFromCloudData(CloudDbConstant::CLOUD_KV_FIELD_VALUE, data, dataItem.value);
1388 errCode = GetStringFromCloudData(CloudDbConstant::CLOUD_KV_FIELD_DEVICE, data, dataItem.dev);
1389 return res;
1390 }
1391
GetBytesFromCloudData(const std::string & field,VBucket & data,Bytes & bytes)1392 int CloudStorageUtils::GetBytesFromCloudData(const std::string &field, VBucket &data, Bytes &bytes)
1393 {
1394 std::string blobStr;
1395 int errCode = GetValueFromVBucket(field, data, blobStr);
1396 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
1397 LOGE("[CloudStorageUtils] Get %.3s failed %d", field.c_str(), errCode);
1398 return errCode;
1399 }
1400 DBCommon::StringToVector(blobStr, bytes);
1401 return errCode;
1402 }
1403
GetStringFromCloudData(const std::string & field,VBucket & data,std::string & str)1404 int CloudStorageUtils::GetStringFromCloudData(const std::string &field, VBucket &data, std::string &str)
1405 {
1406 int errCode = GetValueFromVBucket(field, data, str);
1407 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
1408 LOGE("[CloudStorageUtils] Get %.3s failed %d", field.c_str(), errCode);
1409 return errCode;
1410 }
1411 return errCode;
1412 }
1413
GetUInt64FromCloudData(const std::string & field,VBucket & data,uint64_t & number)1414 int CloudStorageUtils::GetUInt64FromCloudData(const std::string &field, VBucket &data, uint64_t &number)
1415 {
1416 int64_t intNum;
1417 int errCode = GetValueFromVBucket(field, data, intNum);
1418 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
1419 LOGE("[CloudStorageUtils] Get %.3s failed %d", field.c_str(), errCode);
1420 return errCode;
1421 }
1422 number = static_cast<uint64_t>(intNum);
1423 return errCode;
1424 }
1425
AddUpdateColForShare(const TableSchema & tableSchema,std::string & updateLogSql,std::vector<std::string> & updateColName)1426 void CloudStorageUtils::AddUpdateColForShare(const TableSchema &tableSchema, std::string &updateLogSql,
1427 std::vector<std::string> &updateColName)
1428 {
1429 updateLogSql += ", version = ?";
1430 updateColName.push_back(CloudDbConstant::VERSION_FIELD);
1431 updateLogSql += ", sharing_resource = ?";
1432 updateColName.push_back(CloudDbConstant::SHARING_RESOURCE_FIELD);
1433 }
1434
IsDataLocked(uint32_t status)1435 bool CloudStorageUtils::IsDataLocked(uint32_t status)
1436 {
1437 return status == static_cast<uint32_t>(LockStatus::LOCK) ||
1438 status == static_cast<uint32_t>(LockStatus::LOCK_CHANGE);
1439 }
1440
GetSystemRecordFromCloudData(VBucket & data)1441 std::pair<int, DataItem> CloudStorageUtils::GetSystemRecordFromCloudData(VBucket &data)
1442 {
1443 auto res = CloudStorageUtils::GetDataItemFromCloudData(data); // only record first one
1444 auto &[errCode, dataItem] = res;
1445 if (errCode != E_OK) {
1446 LOGE("[SqliteCloudKvExecutorUtils] Get data item failed %d", errCode);
1447 return res;
1448 }
1449 dataItem.dev = "";
1450 dataItem.origDev = "";
1451 return res;
1452 }
1453
IsSystemRecord(const Key & key)1454 bool CloudStorageUtils::IsSystemRecord(const Key &key)
1455 {
1456 std::string prefixKey = CloudDbConstant::CLOUD_VERSION_RECORD_PREFIX_KEY;
1457 if (key.size() < prefixKey.size()) {
1458 return false;
1459 }
1460 std::string keyStr(key.begin(), key.end());
1461 return keyStr.find(prefixKey) == 0;
1462 }
1463
GetSelectIncCursorSql(const std::string & tableName)1464 std::string CloudStorageUtils::GetSelectIncCursorSql(const std::string &tableName)
1465 {
1466 return "(SELECT value FROM " + DBCommon::GetMetaTableName() + " WHERE key=x'" +
1467 DBCommon::TransferStringToHex(DBCommon::GetCursorKey(tableName)) + "')";
1468 }
1469
GetCursorIncSql(const std::string & tableName)1470 std::string CloudStorageUtils::GetCursorIncSql(const std::string &tableName)
1471 {
1472 return "UPDATE " + DBCommon::GetMetaTableName() + " SET value=value+1 WHERE key=x'" +
1473 DBCommon::TransferStringToHex(DBCommon::GetCursorKey(tableName)) + "';";
1474 }
1475
GetCursorIncSqlWhenAllow(const std::string & tableName)1476 std::string CloudStorageUtils::GetCursorIncSqlWhenAllow(const std::string &tableName)
1477 {
1478 return "UPDATE " + DBConstant::RELATIONAL_PREFIX + "metadata" + " SET value= case when (select 1 from " +
1479 DBConstant::RELATIONAL_PREFIX + "metadata" + " where key='cursor_inc_flag' AND value = 'true') then value + 1" +
1480 " else value end WHERE key=x'" + DBCommon::TransferStringToHex(DBCommon::GetCursorKey(tableName)) + "';";
1481 }
1482
GetCursorUpgradeSql(const std::string & tableName)1483 std::string CloudStorageUtils::GetCursorUpgradeSql(const std::string &tableName)
1484 {
1485 return "INSERT OR REPLACE INTO " + DBCommon::GetMetaTableName() + "(key,value) VALUES (x'" +
1486 DBCommon::TransferStringToHex(DBCommon::GetCursorKey(tableName)) + "', (SELECT CASE WHEN MAX(cursor) IS" +
1487 " NULL THEN 0 ELSE MAX(cursor) END FROM " + DBCommon::GetLogTableName(tableName) + "));";
1488 }
1489
GetSyncQueryByPk(const std::string & tableName,const std::vector<VBucket> & data,bool isKv,QuerySyncObject & querySyncObject)1490 int CloudStorageUtils::GetSyncQueryByPk(const std::string &tableName, const std::vector<VBucket> &data, bool isKv,
1491 QuerySyncObject &querySyncObject)
1492 {
1493 std::map<std::string, size_t> dataIndex;
1494 std::map<std::string, std::vector<Type>> syncPk;
1495 int ignoreCount = 0;
1496 for (const auto &oneRow : data) {
1497 if (oneRow.size() >= 2u) { // mean this data has more than 2 pk
1498 LOGW("not support, size: %zu, table: %s", oneRow.size(), DBCommon::StringMiddleMasking(tableName).c_str());
1499 return -E_NOT_SUPPORT;
1500 }
1501 for (const auto &[col, value] : oneRow) {
1502 bool isFind = dataIndex.find(col) != dataIndex.end();
1503 if (!isFind && value.index() == TYPE_INDEX<Nil>) {
1504 ignoreCount++;
1505 continue;
1506 }
1507 if (!isFind && value.index() != TYPE_INDEX<Nil>) {
1508 dataIndex[col] = value.index();
1509 syncPk[col].push_back(value);
1510 continue;
1511 }
1512 if (isFind && dataIndex[col] != value.index()) {
1513 ignoreCount++;
1514 continue;
1515 }
1516 syncPk[col].push_back(value);
1517 }
1518 }
1519 LOGI("match %zu data for compensated sync, ignore %d", data.size(), ignoreCount);
1520 Query query = Query::Select().From(tableName);
1521 if (isKv) {
1522 QueryUtils::FillQueryInKeys(syncPk, dataIndex, query);
1523 } else {
1524 for (const auto &[col, pkList] : syncPk) {
1525 QueryUtils::FillQueryIn(col, pkList, dataIndex[col], query);
1526 }
1527 }
1528 auto objectList = QuerySyncObject::GetQuerySyncObject(query);
1529 if (objectList.size() != 1u) { // only support one QueryExpression
1530 return -E_INTERNAL_ERROR;
1531 }
1532 querySyncObject = objectList[0];
1533 return E_OK;
1534 }
1535 }
1536