1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "sqlite_cloud_kv_executor_utils.h"
17 #include "cloud/cloud_db_constant.h"
18 #include "cloud/cloud_storage_utils.h"
19 #include "db_base64_utils.h"
20 #include "db_common.h"
21 #include "res_finalizer.h"
22 #include "runtime_context.h"
23 #include "sqlite_single_ver_storage_executor_sql.h"
24 #include "time_helper.h"
25
26 namespace DistributedDB {
GetCloudData(const CloudSyncConfig & config,const DBParam & param,const CloudUploadRecorder & recorder,SQLiteSingleVerContinueToken & token,CloudSyncData & data)27 int SqliteCloudKvExecutorUtils::GetCloudData(const CloudSyncConfig &config, const DBParam ¶m,
28 const CloudUploadRecorder &recorder, SQLiteSingleVerContinueToken &token, CloudSyncData &data)
29 {
30 auto [db, isMemory] = param;
31 bool stepNext = false;
32 auto [errCode, stmt] = token.GetCloudQueryStmt(db, data.isCloudForcePushStrategy, stepNext, data.mode);
33 if (errCode != E_OK) {
34 token.ReleaseCloudQueryStmt();
35 return errCode;
36 }
37 UploadDetail detail;
38 auto &[stepNum, totalSize] = detail;
39 do {
40 if (stepNext) {
41 errCode = SQLiteUtils::StepNext(stmt, isMemory);
42 if (errCode != E_OK) {
43 errCode = (errCode == -E_FINISHED ? E_OK : errCode);
44 break;
45 }
46 }
47 stepNext = true;
48 errCode = GetCloudDataForSync(config, recorder, stmt, data, detail);
49 stepNum++;
50 } while (errCode == E_OK);
51 LOGI("[SqliteCloudKvExecutorUtils] Get cloud sync data, insData:%u, upData:%u, delLog:%u errCode:%d total:%" PRIu32,
52 data.insData.record.size(), data.updData.record.size(), data.delData.extend.size(), errCode, totalSize);
53 if (errCode != -E_UNFINISHED) {
54 token.ReleaseCloudQueryStmt();
55 } else if (isMemory && UpdateBeginTimeForMemoryDB(token, data)) {
56 token.ReleaseCloudQueryStmt();
57 }
58 return errCode;
59 }
60
GetMaxTimeStamp(std::vector<VBucket> & dataExtend)61 Timestamp SqliteCloudKvExecutorUtils::GetMaxTimeStamp(std::vector<VBucket> &dataExtend)
62 {
63 Timestamp maxTimeStamp = 0;
64 VBucket lastRecord = dataExtend.back();
65 auto it = lastRecord.find(CloudDbConstant::MODIFY_FIELD);
66 if (it != lastRecord.end() && maxTimeStamp < static_cast<Timestamp>(std::get<int64_t>(it->second))) {
67 maxTimeStamp = static_cast<Timestamp>(std::get<int64_t>(it->second));
68 }
69 return maxTimeStamp;
70 }
71
UpdateBeginTimeForMemoryDB(SQLiteSingleVerContinueToken & token,CloudSyncData & data)72 bool SqliteCloudKvExecutorUtils::UpdateBeginTimeForMemoryDB(SQLiteSingleVerContinueToken &token, CloudSyncData &data)
73 {
74 Timestamp maxTimeStamp = 0;
75 switch (data.mode) {
76 case DistributedDB::CloudWaterType::DELETE:
77 maxTimeStamp = GetMaxTimeStamp(data.delData.extend);
78 break;
79 case DistributedDB::CloudWaterType::UPDATE:
80 maxTimeStamp = GetMaxTimeStamp(data.updData.extend);
81 break;
82 case DistributedDB::CloudWaterType::INSERT:
83 maxTimeStamp = GetMaxTimeStamp(data.insData.extend);
84 break;
85 case DistributedDB::CloudWaterType::BUTT:
86 default:
87 break;
88 }
89 if (maxTimeStamp > token.GetQueryBeginTime()) {
90 token.SetNextBeginTime("", maxTimeStamp);
91 return true;
92 }
93 LOGW("[SqliteCloudKvExecutorUtils] The start time of the in memory database has not been updated.");
94 return false;
95 }
96
GetCloudDataForSync(const CloudSyncConfig & config,const CloudUploadRecorder & recorder,sqlite3_stmt * statement,CloudSyncData & cloudDataResult,UploadDetail & detail)97 int SqliteCloudKvExecutorUtils::GetCloudDataForSync(const CloudSyncConfig &config, const CloudUploadRecorder &recorder,
98 sqlite3_stmt *statement, CloudSyncData &cloudDataResult, UploadDetail &detail)
99 {
100 auto &[stepNum, totalSize] = detail;
101 VBucket log;
102 VBucket extraLog;
103 uint32_t preSize = totalSize;
104 GetCloudLog(statement, log, totalSize);
105 GetCloudExtraLog(statement, extraLog);
106
107 VBucket data;
108 int64_t flag = 0;
109 int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::FLAG, extraLog, flag);
110 if (errCode != E_OK) {
111 return errCode;
112 }
113
114 if ((static_cast<uint64_t>(flag) & DataItem::DELETE_FLAG) == 0) {
115 errCode = GetCloudKvData(statement, data, totalSize);
116 if (errCode != E_OK) {
117 return errCode;
118 }
119 }
120
121 if (CloudStorageUtils::IsGetCloudDataContinue(stepNum, totalSize, config.maxUploadSize, config.maxUploadCount)) {
122 errCode = CloudStorageUtils::IdentifyCloudType(recorder, cloudDataResult, data, log, extraLog);
123 } else {
124 errCode = -E_UNFINISHED;
125 }
126 if (errCode == -E_IGNORE_DATA) {
127 errCode = E_OK;
128 totalSize = preSize;
129 stepNum--;
130 }
131 return errCode;
132 }
133
GetCloudLog(sqlite3_stmt * stmt,VBucket & logInfo,uint32_t & totalSize)134 void SqliteCloudKvExecutorUtils::GetCloudLog(sqlite3_stmt *stmt, VBucket &logInfo,
135 uint32_t &totalSize)
136 {
137 int64_t modifyTime = static_cast<int64_t>(sqlite3_column_int64(stmt, CLOUD_QUERY_MODIFY_TIME_INDEX));
138 uint64_t curTime = 0;
139 if (TimeHelper::GetSysCurrentRawTime(curTime) == E_OK) {
140 if (modifyTime > static_cast<int64_t>(curTime)) {
141 modifyTime = static_cast<int64_t>(curTime);
142 }
143 } else {
144 LOGW("[SqliteCloudKvExecutorUtils] get raw sys time failed.");
145 }
146 logInfo.insert_or_assign(CloudDbConstant::MODIFY_FIELD, modifyTime);
147 logInfo.insert_or_assign(CloudDbConstant::CREATE_FIELD,
148 static_cast<int64_t>(sqlite3_column_int64(stmt, CLOUD_QUERY_CREATE_TIME_INDEX)));
149 totalSize += sizeof(int64_t) + sizeof(int64_t);
150 if (sqlite3_column_text(stmt, CLOUD_QUERY_CLOUD_GID_INDEX) != nullptr) {
151 std::string cloudGid = reinterpret_cast<const std::string::value_type *>(
152 sqlite3_column_text(stmt, CLOUD_QUERY_CLOUD_GID_INDEX));
153 if (!cloudGid.empty()) {
154 logInfo.insert_or_assign(CloudDbConstant::GID_FIELD, cloudGid);
155 totalSize += cloudGid.size();
156 }
157 }
158 std::string version;
159 SQLiteUtils::GetColumnTextValue(stmt, CLOUD_QUERY_VERSION_INDEX, version);
160 logInfo.insert_or_assign(CloudDbConstant::VERSION_FIELD, version);
161 totalSize += version.size();
162 }
163
GetCloudExtraLog(sqlite3_stmt * stmt,VBucket & flags)164 void SqliteCloudKvExecutorUtils::GetCloudExtraLog(sqlite3_stmt *stmt, VBucket &flags)
165 {
166 flags.insert_or_assign(CloudDbConstant::ROWID,
167 static_cast<int64_t>(sqlite3_column_int64(stmt, CLOUD_QUERY_ROW_ID_INDEX)));
168 flags.insert_or_assign(CloudDbConstant::TIMESTAMP,
169 static_cast<int64_t>(sqlite3_column_int64(stmt, CLOUD_QUERY_MODIFY_TIME_INDEX)));
170 flags.insert_or_assign(CloudDbConstant::FLAG,
171 static_cast<int64_t>(sqlite3_column_int64(stmt, CLOUD_QUERY_FLAG_INDEX)));
172 Bytes hashKey;
173 (void)SQLiteUtils::GetColumnBlobValue(stmt, CLOUD_QUERY_HASH_KEY_INDEX, hashKey);
174 flags.insert_or_assign(CloudDbConstant::HASH_KEY, hashKey);
175 flags.insert_or_assign(CloudDbConstant::CLOUD_FLAG,
176 static_cast<int64_t>(sqlite3_column_int64(stmt, CLOUD_QUERY_CLOUD_FLAG_INDEX)));
177 }
178
GetCloudKvData(sqlite3_stmt * stmt,VBucket & data,uint32_t & totalSize)179 int SqliteCloudKvExecutorUtils::GetCloudKvData(sqlite3_stmt *stmt, VBucket &data, uint32_t &totalSize)
180 {
181 int errCode = GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_KEY, CLOUD_QUERY_KEY_INDEX, stmt, data, totalSize);
182 if (errCode != E_OK) {
183 return errCode;
184 }
185 errCode = GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_VALUE, CLOUD_QUERY_VALUE_INDEX, stmt, data, totalSize);
186 if (errCode != E_OK) {
187 return errCode;
188 }
189 errCode = GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_DEVICE, CLOUD_QUERY_DEV_INDEX, stmt, data, totalSize);
190 if (errCode != E_OK) {
191 return errCode;
192 }
193 errCode = GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_ORI_DEVICE, CLOUD_QUERY_ORI_DEV_INDEX, stmt, data,
194 totalSize);
195 if (errCode != E_OK) {
196 return errCode;
197 }
198 data.insert_or_assign(CloudDbConstant::CLOUD_KV_FIELD_DEVICE_CREATE_TIME,
199 static_cast<int64_t>(sqlite3_column_int64(stmt, CLOUD_QUERY_DEV_CREATE_TIME_INDEX)));
200 totalSize += sizeof(int64_t);
201 return E_OK;
202 }
203
GetCloudKvBlobData(const std::string & keyStr,int index,sqlite3_stmt * stmt,VBucket & data,uint32_t & totalSize)204 int SqliteCloudKvExecutorUtils::GetCloudKvBlobData(const std::string &keyStr, int index, sqlite3_stmt *stmt,
205 VBucket &data, uint32_t &totalSize)
206 {
207 std::vector<uint8_t> blob;
208 int errCode = SQLiteUtils::GetColumnBlobValue(stmt, index, blob);
209 if (errCode != E_OK) {
210 LOGE("[SqliteCloudKvExecutorUtils] Get %.3s failed %d", keyStr.c_str(), errCode);
211 return errCode;
212 }
213 std::string tmp = std::string(blob.begin(), blob.end());
214 if ((keyStr == CloudDbConstant::CLOUD_KV_FIELD_DEVICE ||
215 keyStr == CloudDbConstant::CLOUD_KV_FIELD_ORI_DEVICE)) {
216 if (tmp.empty()) {
217 errCode = RuntimeContext::GetInstance()->GetLocalIdentity(tmp);
218 if (errCode != E_OK) {
219 return errCode;
220 }
221 tmp = DBCommon::TransferHashString(tmp);
222 }
223 tmp = DBBase64Utils::Encode(std::vector<uint8_t>(tmp.begin(), tmp.end()));
224 }
225 totalSize += tmp.size();
226 data.insert_or_assign(keyStr, tmp);
227 return E_OK;
228 }
229
GetLogInfo(sqlite3 * db,bool isMemory,const VBucket & cloudData,const std::string & userId)230 std::pair<int, DataInfoWithLog> SqliteCloudKvExecutorUtils::GetLogInfo(sqlite3 *db, bool isMemory,
231 const VBucket &cloudData, const std::string &userId)
232 {
233 std::pair<int, DataInfoWithLog> res;
234 int &errCode = res.first;
235 std::string keyStr;
236 errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::CLOUD_KV_FIELD_KEY, cloudData, keyStr);
237 if (errCode == -E_NOT_FOUND) {
238 errCode = E_OK;
239 }
240 if (errCode != E_OK) {
241 LOGE("[SqliteCloudKvExecutorUtils] Get key failed %d", errCode);
242 return res;
243 }
244 Bytes key;
245 DBCommon::StringToVector(keyStr, key);
246 Bytes hashKey;
247 DBCommon::CalcValueHash(key, hashKey);
248 sqlite3_stmt *stmt = nullptr;
249 std::tie(errCode, stmt) = GetLogInfoStmt(db, cloudData, !hashKey.empty(), userId.empty());
250 if (errCode != E_OK) {
251 LOGE("[SqliteCloudKvExecutorUtils] Get stmt failed %d", errCode);
252 return res;
253 }
254 std::string gid;
255 errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, cloudData, gid);
256 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
257 LOGE("[SqliteCloudKvExecutorUtils] Get gid failed %d", errCode);
258 return res;
259 }
260 return GetLogInfoInner(stmt, isMemory, gid, hashKey, userId);
261 }
262
GetLogInfoStmt(sqlite3 * db,const VBucket & cloudData,bool existKey,bool emptyUserId)263 std::pair<int, sqlite3_stmt*> SqliteCloudKvExecutorUtils::GetLogInfoStmt(sqlite3 *db, const VBucket &cloudData,
264 bool existKey, bool emptyUserId)
265 {
266 std::pair<int, sqlite3_stmt*> res;
267 auto &[errCode, stmt] = res;
268 std::string querySql = QUERY_CLOUD_SYNC_DATA_LOG;
269 if (!emptyUserId) {
270 querySql = QUERY_CLOUD_SYNC_DATA_LOG_WITH_USERID;
271 }
272 std::string sql = querySql;
273 sql += " WHERE cloud_gid = ?";
274 if (existKey) {
275 sql += " UNION ";
276 sql += querySql;
277 sql += " WHERE sync_data.hash_key = ?";
278 }
279 errCode = SQLiteUtils::GetStatement(db, sql, stmt);
280 return res;
281 }
282
GetLogInfoInner(sqlite3_stmt * stmt,bool isMemory,const std::string & gid,const Bytes & key,const std::string & userId)283 std::pair<int, DataInfoWithLog> SqliteCloudKvExecutorUtils::GetLogInfoInner(sqlite3_stmt *stmt, bool isMemory,
284 const std::string &gid, const Bytes &key, const std::string &userId)
285 {
286 ResFinalizer finalizer([stmt]() {
287 sqlite3_stmt *statement = stmt;
288 int ret = E_OK;
289 SQLiteUtils::ResetStatement(statement, true, ret);
290 if (ret != E_OK) {
291 LOGW("[SqliteCloudKvExecutorUtils] Reset stmt failed %d when get log", ret);
292 }
293 });
294 std::pair<int, DataInfoWithLog> res;
295 auto &[errCode, logInfo] = res;
296 int index = 1;
297 if (!userId.empty()) {
298 errCode = SQLiteUtils::BindTextToStatement(stmt, index++, userId);
299 if (errCode != E_OK) {
300 LOGE("[SqliteCloudKvExecutorUtils] Bind 1st userId failed %d", errCode);
301 return res;
302 }
303 }
304 errCode = SQLiteUtils::BindTextToStatement(stmt, index++, gid);
305 if (errCode != E_OK) {
306 LOGE("[SqliteCloudKvExecutorUtils] Bind gid failed %d", errCode);
307 return res;
308 }
309 if (!key.empty()) {
310 if (!userId.empty()) {
311 errCode = SQLiteUtils::BindTextToStatement(stmt, index++, userId);
312 if (errCode != E_OK) {
313 LOGE("[SqliteCloudKvExecutorUtils] Bind 2nd userId failed %d", errCode);
314 return res;
315 }
316 }
317 errCode = SQLiteUtils::BindBlobToStatement(stmt, index++, key);
318 if (errCode != E_OK) {
319 LOGE("[SqliteCloudKvExecutorUtils] Bind key failed %d", errCode);
320 return res;
321 }
322 }
323 errCode = SQLiteUtils::StepNext(stmt, isMemory);
324 if (errCode == -E_FINISHED) {
325 errCode = -E_NOT_FOUND;
326 // not found is ok, just return error
327 return res;
328 }
329 if (errCode != E_OK) {
330 LOGE("[SqliteCloudKvExecutorUtils] Get log failed %d", errCode);
331 return res;
332 }
333 logInfo = FillLogInfoWithStmt(stmt);
334 return res;
335 }
336
FillLogInfoWithStmt(sqlite3_stmt * stmt)337 DataInfoWithLog SqliteCloudKvExecutorUtils::FillLogInfoWithStmt(sqlite3_stmt *stmt)
338 {
339 DataInfoWithLog dataInfoWithLog;
340 int index = 0;
341 dataInfoWithLog.logInfo.dataKey = sqlite3_column_int64(stmt, index++);
342 dataInfoWithLog.logInfo.flag = static_cast<uint64_t>(sqlite3_column_int64(stmt, index++));
343 std::vector<uint8_t> device;
344 (void)SQLiteUtils::GetColumnBlobValue(stmt, index++, device);
345 DBCommon::VectorToString(device, dataInfoWithLog.logInfo.device);
346 std::vector<uint8_t> oriDev;
347 (void)SQLiteUtils::GetColumnBlobValue(stmt, index++, oriDev);
348 DBCommon::VectorToString(oriDev, dataInfoWithLog.logInfo.originDev);
349 dataInfoWithLog.logInfo.timestamp = static_cast<Timestamp>(sqlite3_column_int64(stmt, index++));
350 dataInfoWithLog.logInfo.wTimestamp = static_cast<Timestamp>(sqlite3_column_int64(stmt, index++));
351 std::string gid;
352 (void)SQLiteUtils::GetColumnTextValue(stmt, index++, gid);
353 dataInfoWithLog.logInfo.cloudGid = gid;
354 (void)SQLiteUtils::GetColumnBlobValue(stmt, index++, dataInfoWithLog.logInfo.hashKey);
355 Bytes key;
356 (void)SQLiteUtils::GetColumnBlobValue(stmt, index++, key);
357 std::string keyStr(key.begin(), key.end());
358 dataInfoWithLog.primaryKeys.insert_or_assign(CloudDbConstant::CLOUD_KV_FIELD_KEY, keyStr);
359 (void)SQLiteUtils::GetColumnTextValue(stmt, index++, dataInfoWithLog.logInfo.version);
360 dataInfoWithLog.logInfo.cloud_flag = static_cast<uint64_t>(sqlite3_column_int64(stmt, index++));
361 return dataInfoWithLog;
362 }
363
PutCloudData(sqlite3 * db,bool isMemory,DownloadData & downloadData)364 int SqliteCloudKvExecutorUtils::PutCloudData(sqlite3 *db, bool isMemory, DownloadData &downloadData)
365 {
366 if (downloadData.data.size() != downloadData.opType.size()) {
367 LOGE("[SqliteCloudKvExecutorUtils] data size %zu != flag size %zu.", downloadData.data.size(),
368 downloadData.opType.size());
369 return -E_CLOUD_ERROR;
370 }
371 std::map<int, int> statisticMap = {};
372 int errCode = ExecutePutCloudData(db, isMemory, downloadData, statisticMap);
373 LOGI("[SqliteCloudKvExecutorUtils] save cloud data: %d, insert cnt = %d, update cnt = %d, delete cnt = %d,"
374 " only update gid cnt = %d, set LCC flag zero cnt = %d, set LCC flag one cnt = %d,"
375 " update timestamp cnt = %d, clear gid count = %d, not handle cnt = %d",
376 errCode, statisticMap[static_cast<int>(OpType::INSERT)], statisticMap[static_cast<int>(OpType::UPDATE)],
377 statisticMap[static_cast<int>(OpType::DELETE)], statisticMap[static_cast<int>(OpType::ONLY_UPDATE_GID)],
378 statisticMap[static_cast<int>(OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO)],
379 statisticMap[static_cast<int>(OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE)],
380 statisticMap[static_cast<int>(OpType::UPDATE_TIMESTAMP)], statisticMap[static_cast<int>(OpType::CLEAR_GID)],
381 statisticMap[static_cast<int>(OpType::NOT_HANDLE)]);
382 return errCode;
383 }
384
ExecutePutCloudData(sqlite3 * db,bool isMemory,DownloadData & downloadData,std::map<int,int> & statisticMap)385 int SqliteCloudKvExecutorUtils::ExecutePutCloudData(sqlite3 *db, bool isMemory,
386 DownloadData &downloadData, std::map<int, int> &statisticMap)
387 {
388 int index = 0;
389 int errCode = E_OK;
390 for (OpType op : downloadData.opType) {
391 switch (op) {
392 case OpType::INSERT: // fallthrough
393 case OpType::UPDATE: // fallthrough
394 case OpType::DELETE: // fallthrough
395 errCode = OperateCloudData(db, isMemory, index, op, downloadData);
396 break;
397 case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO: // fallthrough
398 case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE: // fallthrough
399 case OpType::UPDATE_TIMESTAMP: // fallthrough
400 errCode = OnlyUpdateSyncData(db, isMemory, index, op, downloadData);
401 if (errCode != E_OK) {
402 break;
403 }
404 [[fallthrough]];
405 case OpType::ONLY_UPDATE_GID: // fallthrough
406 case OpType::NOT_HANDLE: // fallthrough
407 case OpType::CLEAR_GID: // fallthrough
408 errCode = OnlyUpdateLogTable(db, isMemory, index, op, downloadData);
409 break;
410 default:
411 errCode = -E_CLOUD_ERROR;
412 break;
413 }
414 if (errCode != E_OK) {
415 LOGE("put cloud sync data fail:%d op:%d", errCode, static_cast<int>(op));
416 return errCode;
417 }
418 statisticMap[static_cast<int>(op)]++;
419 index++;
420 }
421 return errCode;
422 }
423
OperateCloudData(sqlite3 * db,bool isMemory,int index,OpType opType,DownloadData & downloadData)424 int SqliteCloudKvExecutorUtils::OperateCloudData(sqlite3 *db, bool isMemory, int index, OpType opType,
425 DownloadData &downloadData)
426 {
427 sqlite3_stmt *logStmt = nullptr;
428 int errCode = SQLiteUtils::GetStatement(db, GetOperateLogSql(opType), logStmt);
429 if (errCode != E_OK) {
430 LOGE("[SqliteCloudKvExecutorUtils] Get insert log statement failed %d", errCode);
431 return errCode;
432 }
433 sqlite3_stmt *dataStmt = nullptr;
434 errCode = SQLiteUtils::GetStatement(db, GetOperateDataSql(opType), dataStmt);
435 if (errCode != E_OK) {
436 int ret = E_OK;
437 SQLiteUtils::ResetStatement(logStmt, true, ret);
438 LOGE("[SqliteCloudKvExecutorUtils] Get insert data statement failed %d reset %d", errCode, ret);
439 return errCode;
440 }
441 ResFinalizer finalizerData([logStmt, dataStmt, opType]() {
442 sqlite3_stmt *statement = logStmt;
443 int ret = E_OK;
444 SQLiteUtils::ResetStatement(statement, true, ret);
445 if (ret != E_OK) {
446 LOGW("[SqliteCloudKvExecutorUtils] Reset log stmt failed %d opType %d", ret, static_cast<int>(opType));
447 }
448 statement = dataStmt;
449 SQLiteUtils::ResetStatement(statement, true, ret);
450 if (ret != E_OK) {
451 LOGW("[SqliteCloudKvExecutorUtils] Reset data stmt failed %d opType %d", ret, static_cast<int>(opType));
452 }
453 });
454 errCode = BindStmt(logStmt, dataStmt, index, opType, downloadData);
455 if (errCode != E_OK) {
456 return errCode;
457 }
458 return StepStmt(logStmt, dataStmt, isMemory);
459 }
460
GetOperateDataSql(OpType opType)461 std::string SqliteCloudKvExecutorUtils::GetOperateDataSql(OpType opType)
462 {
463 switch (opType) {
464 case OpType::INSERT:
465 return INSERT_SYNC_SQL;
466 case OpType::UPDATE: // fallthrough
467 case OpType::DELETE:
468 return UPDATE_SYNC_SQL;
469 case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE:
470 return SET_SYNC_DATA_NO_FORCE_PUSH;
471 case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO:
472 return SET_SYNC_DATA_FORCE_PUSH;
473 case OpType::UPDATE_TIMESTAMP:
474 return UPDATE_TIMESTAMP;
475 default:
476 return "";
477 }
478 }
479
GetOperateLogSql(OpType opType)480 std::string SqliteCloudKvExecutorUtils::GetOperateLogSql(OpType opType)
481 {
482 switch (opType) {
483 case OpType::INSERT: // fallthrough
484 case OpType::UPDATE:
485 return INSERT_CLOUD_SYNC_DATA_LOG;
486 case OpType::DELETE:
487 return UPDATE_CLOUD_SYNC_DATA_LOG;
488 case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO: // fallthrough
489 case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE: // fallthrough
490 case OpType::UPDATE_TIMESTAMP: // fallthrough
491 case OpType::ONLY_UPDATE_GID: // fallthrough
492 case OpType::NOT_HANDLE: // fallthrough
493 case OpType::CLEAR_GID: // fallthrough
494 return UPSERT_CLOUD_SYNC_DATA_LOG;
495 default:
496 return "";
497 }
498 }
499
TransToOpType(const CloudWaterType type)500 OpType SqliteCloudKvExecutorUtils::TransToOpType(const CloudWaterType type)
501 {
502 switch (type) {
503 case CloudWaterType::INSERT:
504 return OpType::INSERT;
505 case CloudWaterType::UPDATE:
506 return OpType::UPDATE;
507 case CloudWaterType::DELETE:
508 return OpType::DELETE;
509 default:
510 return OpType::NOT_HANDLE;
511 }
512 }
513
BindOnlyUpdateLogStmt(sqlite3_stmt * logStmt,const std::string & user,const DataItem & dataItem)514 int SqliteCloudKvExecutorUtils::BindOnlyUpdateLogStmt(sqlite3_stmt *logStmt, const std::string &user,
515 const DataItem &dataItem)
516 {
517 int index = 0;
518 int errCode = SQLiteUtils::BindTextToStatement(logStmt, ++index, user);
519 if (errCode != E_OK) {
520 LOGE("[SqliteCloudKvExecutorUtils] Bind user failed %d when only insert log", errCode);
521 return errCode;
522 }
523 errCode = SQLiteUtils::BindBlobToStatement(logStmt, ++index, dataItem.hashKey);
524 if (errCode != E_OK) {
525 LOGE("[SqliteCloudKvExecutorUtils] Bind hashKey failed %d when only insert log", errCode);
526 }
527 errCode = SQLiteUtils::BindTextToStatement(logStmt, ++index, dataItem.gid);
528 if (errCode != E_OK) {
529 LOGE("[SqliteCloudKvExecutorUtils] Bind gid failed %d when only insert gid.", errCode);
530 return errCode;
531 }
532 errCode = SQLiteUtils::BindTextToStatement(logStmt, ++index, dataItem.version);
533 if (errCode != E_OK) {
534 LOGE("[SqliteCloudKvExecutorUtils] Bind version failed %d when only insert log", errCode);
535 return errCode;
536 }
537 errCode = SQLiteUtils::BindTextToStatement(logStmt, ++index, dataItem.gid);
538 if (errCode != E_OK) {
539 LOGE("[SqliteCloudKvExecutorUtils] Bind gid failed %d when only update gid.", errCode);
540 return errCode;
541 }
542 errCode = SQLiteUtils::BindTextToStatement(logStmt, ++index, dataItem.version);
543 if (errCode != E_OK) {
544 LOGE("[SqliteCloudKvExecutorUtils] Bind version failed %d when only update log", errCode);
545 return errCode;
546 }
547 return errCode;
548 }
549
BindStmt(sqlite3_stmt * logStmt,sqlite3_stmt * dataStmt,int index,OpType opType,DownloadData & downloadData)550 int SqliteCloudKvExecutorUtils::BindStmt(sqlite3_stmt *logStmt, sqlite3_stmt *dataStmt, int index, OpType opType,
551 DownloadData &downloadData)
552 {
553 auto [errCode, dataItem] = GetDataItem(index, downloadData);
554 if (errCode != E_OK) {
555 return errCode;
556 }
557 switch (opType) {
558 case OpType::INSERT:
559 return BindInsertStmt(logStmt, dataStmt, downloadData.user, dataItem);
560 case OpType::UPDATE:
561 return BindUpdateStmt(logStmt, dataStmt, downloadData.user, dataItem);
562 case OpType::DELETE:
563 dataItem.hashKey = downloadData.existDataHashKey[index];
564 dataItem.gid.clear();
565 dataItem.version.clear();
566 return BindDeleteStmt(logStmt, dataStmt, downloadData.user, dataItem);
567 default:
568 return E_OK;
569 }
570 }
571
BindInsertStmt(sqlite3_stmt * logStmt,sqlite3_stmt * dataStmt,const std::string & user,const DataItem & dataItem)572 int SqliteCloudKvExecutorUtils::BindInsertStmt(sqlite3_stmt *logStmt, sqlite3_stmt *dataStmt,
573 const std::string &user, const DataItem &dataItem)
574 {
575 int errCode = BindInsertLogStmt(logStmt, user, dataItem); // insert or replace LOG table for insert DATA table.
576 if (errCode != E_OK) {
577 return errCode;
578 }
579 return BindDataStmt(dataStmt, dataItem, true);
580 }
581
BindInsertLogStmt(sqlite3_stmt * logStmt,const std::string & user,const DataItem & dataItem)582 int SqliteCloudKvExecutorUtils::BindInsertLogStmt(sqlite3_stmt *logStmt, const std::string &user,
583 const DataItem &dataItem)
584 {
585 int errCode = SQLiteUtils::BindTextToStatement(logStmt, BIND_INSERT_USER_INDEX, user);
586 if (errCode != E_OK) {
587 LOGE("[SqliteCloudKvExecutorUtils] Bind user failed %d when insert", errCode);
588 return errCode;
589 }
590 errCode = SQLiteUtils::BindBlobToStatement(logStmt, BIND_INSERT_HASH_KEY_INDEX, dataItem.hashKey);
591 if (errCode != E_OK) {
592 LOGE("[SqliteCloudKvExecutorUtils] Bind hashKey failed %d when insert", errCode);
593 return errCode;
594 }
595 errCode = SQLiteUtils::BindTextToStatement(logStmt, BIND_INSERT_CLOUD_GID_INDEX, dataItem.gid);
596 if (errCode != E_OK) {
597 LOGE("[SqliteCloudKvExecutorUtils] Bind gid failed %d when insert", errCode);
598 return errCode;
599 }
600 errCode = SQLiteUtils::BindTextToStatement(logStmt, BIND_INSERT_VERSION_INDEX, dataItem.version);
601 if (errCode != E_OK) {
602 LOGE("[SqliteCloudKvExecutorUtils] Bind version failed %d when insert", errCode);
603 return errCode;
604 }
605 errCode = SQLiteUtils::BindInt64ToStatement(logStmt, BIND_INSERT_CLOUD_FLAG_INDEX, dataItem.cloud_flag);
606 if (errCode != E_OK) {
607 LOGE("[SqliteCloudKvExecutorUtils] Bind cloud_flag failed %d when insert", errCode);
608 }
609 return errCode;
610 }
611
BindUpdateStmt(sqlite3_stmt * logStmt,sqlite3_stmt * dataStmt,const std::string & user,const DataItem & dataItem)612 int SqliteCloudKvExecutorUtils::BindUpdateStmt(sqlite3_stmt *logStmt, sqlite3_stmt *dataStmt, const std::string &user,
613 const DataItem &dataItem)
614 {
615 int errCode = BindInsertLogStmt(logStmt, user, dataItem); // insert or replace LOG table for update DATA table.
616 if (errCode != E_OK) {
617 return errCode;
618 }
619 errCode = BindDataStmt(dataStmt, dataItem, false);
620 if (errCode != E_OK) {
621 return errCode;
622 }
623 return E_OK;
624 }
625
BindUpdateLogStmt(sqlite3_stmt * logStmt,const std::string & user,const DataItem & dataItem)626 int SqliteCloudKvExecutorUtils::BindUpdateLogStmt(sqlite3_stmt *logStmt, const std::string &user,
627 const DataItem &dataItem)
628 {
629 int index = 1;
630 int errCode = SQLiteUtils::BindTextToStatement(logStmt, index++, dataItem.gid);
631 if (errCode != E_OK) {
632 LOGE("[SqliteCloudKvExecutorUtils] Bind gid failed %d when update", errCode);
633 return errCode;
634 }
635 errCode = SQLiteUtils::BindTextToStatement(logStmt, index++, dataItem.version);
636 if (errCode != E_OK) {
637 LOGE("[SqliteCloudKvExecutorUtils] Bind version failed %d when update", errCode);
638 return errCode;
639 }
640 errCode = SQLiteUtils::BindInt64ToStatement(logStmt, index++, dataItem.cloud_flag);
641 if (errCode != E_OK) {
642 LOGE("[SqliteCloudKvExecutorUtils] Bind cloud_flag failed %d when update", errCode);
643 return errCode;
644 }
645 errCode = SQLiteUtils::BindTextToStatement(logStmt, index++, user);
646 if (errCode != E_OK) {
647 LOGE("[SqliteCloudKvExecutorUtils] Bind user failed %d when update", errCode);
648 return errCode;
649 }
650 errCode = SQLiteUtils::BindBlobToStatement(logStmt, index++, dataItem.hashKey);
651 if (errCode != E_OK) {
652 LOGE("[SqliteCloudKvExecutorUtils] Bind hashKey failed %d when update", errCode);
653 }
654 return errCode;
655 }
656
BindDeleteStmt(sqlite3_stmt * logStmt,sqlite3_stmt * dataStmt,const std::string & user,DataItem & dataItem)657 int SqliteCloudKvExecutorUtils::BindDeleteStmt(sqlite3_stmt *logStmt, sqlite3_stmt *dataStmt, const std::string &user,
658 DataItem &dataItem)
659 {
660 dataItem.key = {};
661 dataItem.value = {};
662 dataItem.flag |= static_cast<uint64_t>(LogInfoFlag::FLAG_DELETE);
663 int errCode = BindUpdateLogStmt(logStmt, user, dataItem); // update LOG table for delete DATA table.
664 if (errCode != E_OK) {
665 return errCode;
666 }
667 errCode = BindDataStmt(dataStmt, dataItem, false);
668 if (errCode != E_OK) {
669 return errCode;
670 }
671 return E_OK;
672 }
673
BindDataStmt(sqlite3_stmt * dataStmt,const DataItem & dataItem,bool isInsert)674 int SqliteCloudKvExecutorUtils::BindDataStmt(sqlite3_stmt *dataStmt, const DataItem &dataItem, bool isInsert)
675 {
676 int index = 1;
677 int errCode = BindSyncDataStmt(dataStmt, dataItem, isInsert, index);
678 if (errCode != E_OK) {
679 return errCode;
680 }
681 errCode = BindCloudDataStmt(dataStmt, dataItem, index);
682 if (errCode != E_OK) {
683 return errCode;
684 }
685 if (!isInsert) {
686 errCode = SQLiteUtils::BindBlobToStatement(dataStmt, index++, dataItem.hashKey);
687 if (errCode != E_OK) {
688 LOGE("[SqliteCloudKvExecutorUtils] Bind hashKey failed %d", errCode);
689 }
690 }
691 return errCode;
692 }
693
BindSyncDataStmt(sqlite3_stmt * dataStmt,const DataItem & dataItem,bool isInsert,int & index)694 int SqliteCloudKvExecutorUtils::BindSyncDataStmt(sqlite3_stmt *dataStmt, const DataItem &dataItem, bool isInsert,
695 int &index)
696 {
697 int errCode = SQLiteUtils::BindBlobToStatement(dataStmt, index++, dataItem.key);
698 if (errCode != E_OK) {
699 LOGE("[SqliteCloudKvExecutorUtils] Bind key failed %d", errCode);
700 return errCode;
701 }
702 errCode = SQLiteUtils::BindBlobToStatement(dataStmt, index++, dataItem.value);
703 if (errCode != E_OK) {
704 LOGE("[SqliteCloudKvExecutorUtils] Bind value failed %d", errCode);
705 return errCode;
706 }
707 errCode = SQLiteUtils::BindInt64ToStatement(dataStmt, index++, static_cast<int64_t>(dataItem.timestamp));
708 if (errCode != E_OK) {
709 LOGE("[SqliteCloudKvExecutorUtils] Bind timestamp failed %d", errCode);
710 return errCode;
711 }
712 errCode = SQLiteUtils::BindInt64ToStatement(dataStmt, index++, static_cast<int64_t>(dataItem.flag));
713 if (errCode != E_OK) {
714 LOGE("[SqliteCloudKvExecutorUtils] Bind flag failed %d", errCode);
715 return errCode;
716 }
717 Bytes bytes;
718 DBCommon::StringToVector(dataItem.dev, bytes);
719 errCode = SQLiteUtils::BindBlobToStatement(dataStmt, index++, bytes);
720 if (errCode != E_OK) {
721 LOGE("[SqliteCloudKvExecutorUtils] Bind dev failed %d", errCode);
722 return errCode;
723 }
724 DBCommon::StringToVector(dataItem.origDev, bytes);
725 errCode = SQLiteUtils::BindBlobToStatement(dataStmt, index++, bytes);
726 if (errCode != E_OK) {
727 LOGE("[SqliteCloudKvExecutorUtils] Bind oriDev failed %d", errCode);
728 return errCode;
729 }
730 if (isInsert) {
731 errCode = SQLiteUtils::BindBlobToStatement(dataStmt, index++, dataItem.hashKey);
732 if (errCode != E_OK) {
733 LOGE("[SqliteCloudKvExecutorUtils] Bind hashKey failed %d", errCode);
734 return errCode;
735 }
736 }
737 errCode = SQLiteUtils::BindInt64ToStatement(dataStmt, index++, static_cast<int64_t>(dataItem.writeTimestamp));
738 if (errCode != E_OK) {
739 LOGE("[SqliteCloudKvExecutorUtils] Bind wTime failed %d", errCode);
740 }
741 return errCode;
742 }
743
BindCloudDataStmt(sqlite3_stmt * dataStmt,const DataItem & dataItem,int & index)744 int SqliteCloudKvExecutorUtils::BindCloudDataStmt(sqlite3_stmt *dataStmt, const DataItem &dataItem, int &index)
745 {
746 int errCode = SQLiteUtils::BindInt64ToStatement(dataStmt, index++, dataItem.modifyTime);
747 if (errCode != E_OK) {
748 LOGE("[SqliteCloudKvExecutorUtils] Bind modifyTime failed %d", errCode);
749 return errCode;
750 }
751 errCode = SQLiteUtils::BindInt64ToStatement(dataStmt, index++, dataItem.createTime);
752 if (errCode != E_OK) {
753 LOGE("[SqliteCloudKvExecutorUtils] Bind createTime failed %d", errCode);
754 }
755 return errCode;
756 }
757
StepStmt(sqlite3_stmt * logStmt,sqlite3_stmt * dataStmt,bool isMemory)758 int SqliteCloudKvExecutorUtils::StepStmt(sqlite3_stmt *logStmt, sqlite3_stmt *dataStmt, bool isMemory)
759 {
760 int errCode = SQLiteUtils::StepNext(logStmt, isMemory);
761 if (errCode != -E_FINISHED) {
762 LOGE("[SqliteCloudKvExecutorUtils] Step insert log stmt failed %d", errCode);
763 return errCode;
764 }
765 errCode = SQLiteUtils::StepNext(dataStmt, isMemory);
766 if (errCode != -E_FINISHED) {
767 LOGE("[SqliteCloudKvExecutorUtils] Step insert data stmt failed %d", errCode);
768 return errCode;
769 }
770 return E_OK;
771 }
772
FillCloudLog(const FillGidParam & param,OpType opType,const CloudSyncData & data,const std::string & user,CloudUploadRecorder & recorder)773 int SqliteCloudKvExecutorUtils::FillCloudLog(const FillGidParam ¶m, OpType opType, const CloudSyncData &data,
774 const std::string &user, CloudUploadRecorder &recorder)
775 {
776 if (param.first == nullptr) {
777 LOGE("[SqliteCloudKvExecutorUtils] Fill log got nullptr db");
778 return -E_INVALID_ARGS;
779 }
780 if (data.isCloudVersionRecord) {
781 int errCode = FillCloudVersionRecord(param.first, opType, data);
782 if (errCode != E_OK) {
783 return errCode;
784 }
785 }
786 switch (opType) {
787 case OpType::INSERT:
788 return FillCloudGid(param, data.insData, user, CloudWaterType::INSERT, recorder);
789 case OpType::UPDATE:
790 return FillCloudGid(param, data.updData, user, CloudWaterType::UPDATE, recorder);
791 case OpType::DELETE:
792 return FillCloudGid(param, data.delData, user, CloudWaterType::DELETE, recorder);
793 default:
794 return E_OK;
795 }
796 }
797
OnlyUpdateLogTable(sqlite3 * db,bool isMemory,int index,OpType op,DownloadData & downloadData)798 int SqliteCloudKvExecutorUtils::OnlyUpdateLogTable(sqlite3 *db, bool isMemory, int index, OpType op,
799 DownloadData &downloadData)
800 {
801 if (downloadData.existDataHashKey[index].empty()) {
802 return E_OK;
803 }
804 sqlite3_stmt *logStmt = nullptr;
805 int errCode = SQLiteUtils::GetStatement(db, GetOperateLogSql(op), logStmt);
806 if (errCode != E_OK) {
807 LOGE("[SqliteCloudKvExecutorUtils] Get update sync data stmt failed %d", errCode);
808 return errCode;
809 }
810 ResFinalizer finalizerData([logStmt]() {
811 sqlite3_stmt *statement = logStmt;
812 int ret = E_OK;
813 SQLiteUtils::ResetStatement(statement, true, ret);
814 if (ret != E_OK) {
815 LOGW("[SqliteCloudKvExecutorUtils] Reset log stmt failed %d when only update log", ret);
816 }
817 });
818 auto res = CloudStorageUtils::GetDataItemFromCloudData(downloadData.data[index]);
819 if (res.first != E_OK) {
820 LOGE("[SqliteCloudKvExecutorUtils] Get data item failed %d", res.first);
821 return res.first;
822 }
823 bool clearCloudInfo = (op == OpType::CLEAR_GID);
824 if (res.second.hashKey.empty() || DBCommon::IsRecordDelete(downloadData.data[index])) {
825 res.second.hashKey = downloadData.existDataHashKey[index];
826 clearCloudInfo = true;
827 }
828 if (clearCloudInfo) {
829 res.second.gid.clear();
830 res.second.version.clear();
831 }
832 errCode = BindOnlyUpdateLogStmt(logStmt, downloadData.user, res.second);
833 if (errCode != E_OK) {
834 return errCode;
835 }
836 errCode = SQLiteUtils::StepNext(logStmt, isMemory);
837 if (errCode == -E_FINISHED) {
838 errCode = E_OK;
839 }
840 return errCode;
841 }
842
FillCloudGid(const FillGidParam & param,const CloudSyncBatch & data,const std::string & user,const CloudWaterType & type,CloudUploadRecorder & recorder)843 int SqliteCloudKvExecutorUtils::FillCloudGid(const FillGidParam ¶m, const CloudSyncBatch &data,
844 const std::string &user, const CloudWaterType &type, CloudUploadRecorder &recorder)
845 {
846 auto [db, ignoreEmptyGid] = param;
847 sqlite3_stmt *logStmt = nullptr;
848 int errCode = SQLiteUtils::GetStatement(db, GetOperateLogSql(TransToOpType(type)), logStmt);
849 ResFinalizer finalizerData([logStmt]() {
850 sqlite3_stmt *statement = logStmt;
851 int ret = E_OK;
852 SQLiteUtils::ResetStatement(statement, true, ret);
853 if (ret != E_OK) {
854 LOGW("[SqliteCloudKvExecutorUtils] Reset log stmt failed %d when fill log", ret);
855 }
856 });
857 for (size_t i = 0; i < data.hashKey.size(); ++i) {
858 if (DBCommon::IsRecordError(data.extend[i]) || DBCommon::IsRecordVersionConflict(data.extend[i])) {
859 continue;
860 }
861 DataItem dataItem;
862 errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, data.extend[i], dataItem.gid);
863 if (dataItem.gid.empty() && ignoreEmptyGid) {
864 continue;
865 }
866 if (errCode != E_OK) {
867 return errCode;
868 }
869 CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::VERSION_FIELD, data.extend[i], dataItem.version);
870 dataItem.hashKey = data.hashKey[i];
871 dataItem.dataDelete = CheckDataDelete(param, data, i);
872 errCode = BindFillGidLogStmt(logStmt, user, dataItem, data.extend[i], type);
873 if (errCode != E_OK) {
874 return errCode;
875 }
876 errCode = SQLiteUtils::StepNext(logStmt, false);
877 if (errCode == -E_FINISHED) {
878 errCode = E_OK;
879 }
880 if (errCode != E_OK) {
881 LOGE("[SqliteCloudKvExecutorUtils] fill back failed %d index %zu", errCode, i);
882 return errCode;
883 }
884 SQLiteUtils::ResetStatement(logStmt, false, errCode);
885 MarkUploadSuccess(param, data, user, i);
886 // ignored version record
887 if (i >= data.timestamp.size()) {
888 continue;
889 }
890 recorder.RecordUploadRecord(CloudDbConstant::CLOUD_KV_TABLE_NAME, data.hashKey[i], type, data.timestamp[i]);
891 }
892 return E_OK;
893 }
894
OnlyUpdateSyncData(sqlite3 * db,bool isMemory,int index,OpType opType,DownloadData & downloadData)895 int SqliteCloudKvExecutorUtils::OnlyUpdateSyncData(sqlite3 *db, bool isMemory, int index, OpType opType,
896 DownloadData &downloadData)
897 {
898 if (opType != OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO && opType != OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE &&
899 opType != OpType::UPDATE_TIMESTAMP) {
900 LOGW("[SqliteCloudKvExecutorUtils] Ignore unknown opType %d", static_cast<int>(opType));
901 return E_OK;
902 }
903 sqlite3_stmt *dataStmt = nullptr;
904 int errCode = SQLiteUtils::GetStatement(db, GetOperateDataSql(opType), dataStmt);
905 if (errCode != E_OK) {
906 LOGE("[SqliteCloudKvExecutorUtils] Get update sync data stmt failed %d", errCode);
907 return errCode;
908 }
909 ResFinalizer finalizerData([dataStmt]() {
910 sqlite3_stmt *statement = dataStmt;
911 int ret = E_OK;
912 SQLiteUtils::ResetStatement(statement, true, ret);
913 if (ret != E_OK) {
914 LOGW("[SqliteCloudKvExecutorUtils] Reset log stmt failed %d when update log", ret);
915 }
916 });
917 errCode = BindUpdateSyncDataStmt(dataStmt, index, opType, downloadData);
918 if (errCode != E_OK) {
919 LOGE("[SqliteCloudKvExecutorUtils] Bind update sync data stmt failed %d", errCode);
920 return errCode;
921 }
922 errCode = SQLiteUtils::StepNext(dataStmt, isMemory);
923 if (errCode == -E_FINISHED) {
924 errCode = E_OK;
925 }
926 return errCode;
927 }
928
BindUpdateSyncDataStmt(sqlite3_stmt * dataStmt,int index,OpType opType,DownloadData & downloadData)929 int SqliteCloudKvExecutorUtils::BindUpdateSyncDataStmt(sqlite3_stmt *dataStmt, int index, OpType opType,
930 DownloadData &downloadData)
931 {
932 switch (opType) {
933 case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO:
934 case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE:
935 return SQLiteUtils::BindBlobToStatement(dataStmt, 1, downloadData.existDataHashKey[index]);
936 case OpType::UPDATE_TIMESTAMP:
937 return BindUpdateTimestampStmt(dataStmt, index, downloadData);
938 default:
939 return E_OK;
940 }
941 }
942
BindUpdateTimestampStmt(sqlite3_stmt * dataStmt,int index,DownloadData & downloadData)943 int SqliteCloudKvExecutorUtils::BindUpdateTimestampStmt(sqlite3_stmt *dataStmt, int index, DownloadData &downloadData)
944 {
945 auto res = CloudStorageUtils::GetDataItemFromCloudData(downloadData.data[index]);
946 auto &[errCode, dataItem] = res;
947 if (errCode != E_OK) {
948 return errCode;
949 }
950 int currentBindIndex = 1; // bind sql index start at 1
951 errCode = SQLiteUtils::BindInt64ToStatement(dataStmt, currentBindIndex++, dataItem.timestamp);
952 if (errCode != E_OK) {
953 LOGE("[SqliteCloudKvExecutorUtils] Bind timestamp failed %d", errCode);
954 return errCode;
955 }
956 errCode = SQLiteUtils::BindInt64ToStatement(dataStmt, currentBindIndex++, dataItem.modifyTime);
957 if (errCode != E_OK) {
958 LOGE("[SqliteCloudKvExecutorUtils] Bind modifyTime failed %d", errCode);
959 }
960 errCode = SQLiteUtils::BindBlobToStatement(dataStmt, currentBindIndex++, dataItem.hashKey);
961 if (errCode != E_OK) {
962 LOGE("[SqliteCloudKvExecutorUtils] Bind hashKey failed %d", errCode);
963 return errCode;
964 }
965 return E_OK;
966 }
967
GetDataItem(int index,DownloadData & downloadData)968 std::pair<int, DataItem> SqliteCloudKvExecutorUtils::GetDataItem(int index, DownloadData &downloadData)
969 {
970 auto res = CloudStorageUtils::GetDataItemFromCloudData(downloadData.data[index]);
971 auto &[errCode, dataItem] = res;
972 if (errCode != E_OK) {
973 LOGE("[SqliteCloudKvExecutorUtils] Get data item failed %d", errCode);
974 return res;
975 }
976 std::string dev;
977 errCode = RuntimeContext::GetInstance()->GetLocalIdentity(dev);
978 if (errCode != E_OK) {
979 return res;
980 }
981 dev = DBCommon::TransferHashString(dev);
982 auto decodeDevice = DBBase64Utils::Decode(dataItem.dev);
983 if (!decodeDevice.empty()) {
984 dataItem.dev = std::string(decodeDevice.begin(), decodeDevice.end());
985 }
986 if (dataItem.dev == dev) {
987 dataItem.dev = "";
988 }
989 decodeDevice = DBBase64Utils::Decode(dataItem.origDev);
990 if (!decodeDevice.empty()) {
991 dataItem.origDev = std::string(decodeDevice.begin(), decodeDevice.end());
992 }
993 if (dataItem.origDev == dev) {
994 dataItem.origDev = "";
995 }
996 dataItem.timestamp = static_cast<Timestamp>(static_cast<int64_t>(dataItem.modifyTime) + downloadData.timeOffset);
997 dataItem.writeTimestamp = dataItem.timestamp; // writeTimestamp is process conflict time
998 return res;
999 }
1000
CountCloudDataInner(sqlite3 * db,bool isMemory,const Timestamp & timestamp,const std::string & user,std::string & sql)1001 std::pair<int, int64_t> SqliteCloudKvExecutorUtils::CountCloudDataInner(sqlite3 *db, bool isMemory,
1002 const Timestamp ×tamp, const std::string &user, std::string &sql)
1003 {
1004 std::pair<int, int64_t> res;
1005 auto &[errCode, count] = res;
1006 sqlite3_stmt *stmt = nullptr;
1007 errCode = SQLiteUtils::GetStatement(db, sql, stmt);
1008 if (errCode != E_OK) {
1009 LOGE("[SqliteCloudKvExecutorUtils] Count data stmt failed %d", errCode);
1010 return res;
1011 }
1012 ResFinalizer finalizer([stmt]() {
1013 sqlite3_stmt *statement = stmt;
1014 int ret = E_OK;
1015 SQLiteUtils::ResetStatement(statement, true, ret);
1016 if (ret != E_OK) {
1017 LOGW("[SqliteCloudKvExecutorUtils] Reset log stmt failed %d when count data", ret);
1018 }
1019 });
1020 errCode = SQLiteUtils::BindTextToStatement(stmt, BIND_CLOUD_USER, user);
1021 if (errCode != E_OK) {
1022 LOGE("[SqliteCloudKvExecutorUtils] Bind user failed %d", errCode);
1023 return res;
1024 }
1025 errCode = SQLiteUtils::BindInt64ToStatement(stmt, BIND_CLOUD_TIMESTAMP, static_cast<int64_t>(timestamp));
1026 if (errCode != E_OK) {
1027 LOGE("[SqliteCloudKvExecutorUtils] Bind begin time failed %d", errCode);
1028 return res;
1029 }
1030 errCode = SQLiteUtils::StepNext(stmt, isMemory);
1031 if (errCode == -E_FINISHED) {
1032 count = 0;
1033 return res;
1034 }
1035 count = sqlite3_column_int64(stmt, CLOUD_QUERY_COUNT_INDEX);
1036 LOGD("[SqliteCloudKvExecutorUtils] Get total upload count %" PRId64, count);
1037 return res;
1038 }
1039
CountCloudData(sqlite3 * db,bool isMemory,const Timestamp & timestamp,const std::string & user,bool forcePush)1040 std::pair<int, int64_t> SqliteCloudKvExecutorUtils::CountCloudData(sqlite3 *db, bool isMemory,
1041 const Timestamp ×tamp, const std::string &user, bool forcePush)
1042 {
1043 std::string sql = SqliteQueryHelper::GetKvCloudQuerySql(true, forcePush);
1044 return CountCloudDataInner(db, isMemory, timestamp, user, sql);
1045 }
1046
CountAllCloudData(const DBParam & param,const std::vector<Timestamp> & timestampVec,const std::string & user,bool forcePush,QuerySyncObject & querySyncObject)1047 std::pair<int, int64_t> SqliteCloudKvExecutorUtils::CountAllCloudData(const DBParam ¶m,
1048 const std::vector<Timestamp> ×tampVec, const std::string &user, bool forcePush,
1049 QuerySyncObject &querySyncObject)
1050 {
1051 std::pair<int, int64_t> res = { E_OK, 0 };
1052 auto &[errCode, count] = res;
1053 if (timestampVec.size() != 3) { // 3 is the number of three mode.
1054 errCode = -E_INVALID_ARGS;
1055 return res;
1056 }
1057 std::vector<CloudWaterType> typeVec = DBCommon::GetWaterTypeVec();
1058 SqliteQueryHelper helper = querySyncObject.GetQueryHelper(errCode);
1059 if (errCode != E_OK) {
1060 return res;
1061 }
1062 for (size_t i = 0; i < typeVec.size(); i++) {
1063 sqlite3_stmt *stmt = nullptr;
1064 errCode = helper.GetCountKvCloudDataStatement(param.first, forcePush, typeVec[i], stmt);
1065 if (errCode != E_OK) {
1066 return res;
1067 }
1068 // count no use watermark
1069 auto [err, cnt] = helper.BindCountKvCloudDataStatement(param.first, param.second, 0u, user, stmt);
1070 if (err != E_OK) {
1071 return { err, 0 };
1072 }
1073 count += cnt;
1074 }
1075 return res;
1076 }
1077
FillCloudVersionRecord(sqlite3 * db,OpType opType,const CloudSyncData & data)1078 int SqliteCloudKvExecutorUtils::FillCloudVersionRecord(sqlite3 *db, OpType opType, const CloudSyncData &data)
1079 {
1080 if (opType != OpType::INSERT && opType != OpType::UPDATE) {
1081 return E_OK;
1082 }
1083 bool isInsert = (opType == OpType::INSERT);
1084 CloudSyncBatch syncBatch = isInsert ? data.insData : data.updData;
1085 if (syncBatch.record.empty()) {
1086 LOGW("[SqliteCloudKvExecutorUtils] Fill empty cloud version record");
1087 return E_OK;
1088 }
1089 syncBatch.record[0].insert(syncBatch.extend[0].begin(), syncBatch.extend[0].end());
1090 auto res = CloudStorageUtils::GetSystemRecordFromCloudData(syncBatch.record[0]); // only record first one
1091 auto &[errCode, dataItem] = res;
1092 sqlite3_stmt *dataStmt = nullptr;
1093 errCode = SQLiteUtils::GetStatement(db, GetOperateDataSql(opType), dataStmt);
1094 if (errCode != E_OK) {
1095 LOGE("[SqliteCloudKvExecutorUtils] Get insert version record statement failed %d", errCode);
1096 return errCode;
1097 }
1098 ResFinalizer finalizerData([dataStmt]() {
1099 int ret = E_OK;
1100 sqlite3_stmt *statement = dataStmt;
1101 SQLiteUtils::ResetStatement(statement, true, ret);
1102 if (ret != E_OK) {
1103 LOGW("[SqliteCloudKvExecutorUtils] Reset version record stmt failed %d", ret);
1104 }
1105 });
1106 errCode = BindDataStmt(dataStmt, dataItem, isInsert);
1107 if (errCode != E_OK) {
1108 return errCode;
1109 }
1110 errCode = SQLiteUtils::StepNext(dataStmt, false);
1111 if (errCode != -E_FINISHED) {
1112 LOGE("[SqliteCloudKvExecutorUtils] Step insert version record stmt failed %d", errCode);
1113 return errCode;
1114 }
1115 return E_OK;
1116 }
1117
GetLocalCloudVersion(sqlite3 * db,bool isMemory,const std::string & user)1118 std::pair<int, CloudSyncData> SqliteCloudKvExecutorUtils::GetLocalCloudVersion(sqlite3 *db, bool isMemory,
1119 const std::string &user)
1120 {
1121 auto res = GetLocalCloudVersionInner(db, isMemory, user);
1122 if (res.first != E_OK) {
1123 LOGE("[SqliteCloudKvExecutorUtils] Get local cloud version failed %d", res.first);
1124 }
1125 return res;
1126 }
1127
GetLocalCloudVersionInner(sqlite3 * db,bool isMemory,const std::string & user)1128 std::pair<int, CloudSyncData> SqliteCloudKvExecutorUtils::GetLocalCloudVersionInner(sqlite3 *db, bool isMemory,
1129 const std::string &user)
1130 {
1131 std::pair<int, CloudSyncData> res;
1132 auto &[errCode, syncData] = res;
1133 auto sql = SqliteQueryHelper::GetKvCloudRecordSql();
1134 sqlite3_stmt *stmt = nullptr;
1135 errCode = SQLiteUtils::GetStatement(db, sql, stmt);
1136 if (errCode != E_OK) {
1137 return res;
1138 }
1139 ResFinalizer finalizerData([stmt]() {
1140 int ret = E_OK;
1141 sqlite3_stmt *statement = stmt;
1142 SQLiteUtils::ResetStatement(statement, true, ret);
1143 if (ret != E_OK) {
1144 LOGW("[SqliteCloudKvExecutorUtils] Reset local version record stmt failed %d", ret);
1145 }
1146 });
1147 std::string hashDev;
1148 errCode = RuntimeContext::GetInstance()->GetLocalIdentity(hashDev);
1149 if (errCode != E_OK) {
1150 return res;
1151 }
1152 std::string tempDev = DBCommon::TransferHashString(hashDev);
1153 hashDev = DBCommon::TransferStringToHex(tempDev);
1154 std::string key = CloudDbConstant::CLOUD_VERSION_RECORD_PREFIX_KEY + hashDev;
1155 Key keyVec;
1156 DBCommon::StringToVector(key, keyVec);
1157 errCode = SQLiteUtils::BindBlobToStatement(stmt, BIND_CLOUD_VERSION_RECORD_KEY_INDEX, keyVec);
1158 if (errCode != E_OK) {
1159 return res;
1160 }
1161 errCode = SQLiteUtils::BindTextToStatement(stmt, BIND_CLOUD_VERSION_RECORD_USER_INDEX, user);
1162 if (errCode != E_OK) {
1163 return res;
1164 }
1165 errCode = GetCloudVersionRecord(isMemory, stmt, syncData);
1166 if (errCode == -E_NOT_FOUND) {
1167 InitDefaultCloudVersionRecord(key, tempDev, syncData);
1168 errCode = E_OK;
1169 }
1170 return res;
1171 }
1172
GetCloudVersionRecord(bool isMemory,sqlite3_stmt * stmt,CloudSyncData & syncData)1173 int SqliteCloudKvExecutorUtils::GetCloudVersionRecord(bool isMemory, sqlite3_stmt *stmt, CloudSyncData &syncData)
1174 {
1175 int errCode = SQLiteUtils::StepNext(stmt, isMemory);
1176 if (errCode == -E_FINISHED) {
1177 return -E_NOT_FOUND;
1178 }
1179 if (errCode != E_OK) {
1180 LOGE("[SqliteCloudKvExecutorUtils] Get local version failed %d", errCode);
1181 return errCode;
1182 }
1183 CloudSyncConfig config;
1184 config.maxUploadSize = CloudDbConstant::MAX_UPLOAD_SIZE;
1185 config.maxUploadCount = 1;
1186 CloudUploadRecorder recorder; // ignore last record
1187 UploadDetail detail;
1188 errCode = GetCloudDataForSync(config, recorder, stmt, syncData, detail);
1189 return errCode;
1190 }
1191
InitDefaultCloudVersionRecord(const std::string & key,const std::string & dev,CloudSyncData & syncData)1192 void SqliteCloudKvExecutorUtils::InitDefaultCloudVersionRecord(const std::string &key, const std::string &dev,
1193 CloudSyncData &syncData)
1194 {
1195 LOGI("[SqliteCloudKvExecutorUtils] Not found local version record");
1196 VBucket defaultRecord;
1197 defaultRecord[CloudDbConstant::CLOUD_KV_FIELD_KEY] = key;
1198 defaultRecord[CloudDbConstant::CLOUD_KV_FIELD_VALUE] = std::string("");
1199 auto encodeDev = DBBase64Utils::Encode(dev);
1200 defaultRecord[CloudDbConstant::CLOUD_KV_FIELD_DEVICE] = encodeDev;
1201 defaultRecord[CloudDbConstant::CLOUD_KV_FIELD_ORI_DEVICE] = encodeDev;
1202 syncData.insData.record.push_back(std::move(defaultRecord));
1203 VBucket defaultExtend;
1204 defaultExtend[CloudDbConstant::HASH_KEY_FIELD] = DBCommon::TransferStringToHex(key);
1205 syncData.insData.extend.push_back(std::move(defaultExtend));
1206 syncData.insData.assets.emplace_back();
1207 Bytes bytesHashKey;
1208 DBCommon::StringToVector(key, bytesHashKey);
1209 syncData.insData.hashKey.push_back(bytesHashKey);
1210 }
1211
BindVersionStmt(const std::string & device,const std::string & user,sqlite3_stmt * dataStmt)1212 int SqliteCloudKvExecutorUtils::BindVersionStmt(const std::string &device, const std::string &user,
1213 sqlite3_stmt *dataStmt)
1214 {
1215 std::string hashDevice;
1216 int errCode = RuntimeContext::GetInstance()->GetLocalIdentity(hashDevice);
1217 if (errCode != E_OK) {
1218 return errCode;
1219 }
1220 Bytes bytes;
1221 if (device == hashDevice) {
1222 DBCommon::StringToVector("", bytes);
1223 } else {
1224 hashDevice = DBCommon::TransferHashString(device);
1225 DBCommon::StringToVector(hashDevice, bytes);
1226 }
1227 errCode = SQLiteUtils::BindBlobToStatement(dataStmt, BIND_CLOUD_VERSION_DEVICE_INDEX, bytes);
1228 if (errCode != E_OK) {
1229 LOGE("[SqliteCloudKvExecutorUtils] Bind device failed %d", errCode);
1230 }
1231 return errCode;
1232 }
1233
GetCloudVersionFromCloud(sqlite3 * db,bool isMemory,const std::string & user,const std::string & device,std::vector<VBucket> & dataVector)1234 int SqliteCloudKvExecutorUtils::GetCloudVersionFromCloud(sqlite3 *db, bool isMemory, const std::string &user,
1235 const std::string &device, std::vector<VBucket> &dataVector)
1236 {
1237 sqlite3_stmt *dataStmt = nullptr;
1238 bool isDeviceEmpty = device.empty();
1239 std::string sql = SqliteQueryHelper::GetCloudVersionRecordSql(isDeviceEmpty);
1240 int errCode = SQLiteUtils::GetStatement(db, sql, dataStmt);
1241 if (errCode != E_OK) {
1242 LOGE("[SqliteCloudKvExecutorUtils] Get cloud version record statement failed %d", errCode);
1243 return errCode;
1244 }
1245 ResFinalizer finalizerData([dataStmt]() {
1246 int ret = E_OK;
1247 sqlite3_stmt *statement = dataStmt;
1248 SQLiteUtils::ResetStatement(statement, true, ret);
1249 if (ret != E_OK) {
1250 LOGW("[SqliteCloudKvExecutorUtils] Reset cloud version record stmt failed %d", ret);
1251 }
1252 });
1253 if (!isDeviceEmpty) {
1254 errCode = BindVersionStmt(device, user, dataStmt);
1255 if (errCode != E_OK) {
1256 return errCode;
1257 }
1258 }
1259 uint32_t totalSize = 0;
1260 do {
1261 errCode = SQLiteUtils::StepWithRetry(dataStmt, isMemory);
1262 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1263 errCode = E_OK;
1264 break;
1265 } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1266 LOGE("[SqliteCloudKvExecutorUtils] Get cloud version from cloud failed. %d", errCode);
1267 break;
1268 }
1269 VBucket data;
1270 errCode = GetCloudVersionRecordData(dataStmt, data, totalSize);
1271 dataVector.push_back(data);
1272 } while (errCode == E_OK);
1273 return errCode;
1274 }
1275
GetCloudVersionRecordData(sqlite3_stmt * stmt,VBucket & data,uint32_t & totalSize)1276 int SqliteCloudKvExecutorUtils::GetCloudVersionRecordData(sqlite3_stmt *stmt, VBucket &data, uint32_t &totalSize)
1277 {
1278 int errCode = GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_KEY, CLOUD_QUERY_KEY_INDEX, stmt, data, totalSize);
1279 if (errCode != E_OK) {
1280 return errCode;
1281 }
1282 errCode = GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_VALUE, CLOUD_QUERY_VALUE_INDEX, stmt, data, totalSize);
1283 if (errCode != E_OK) {
1284 return errCode;
1285 }
1286 return GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_DEVICE, CLOUD_QUERY_DEV_INDEX, stmt, data, totalSize);
1287 }
1288
GetWaitCompensatedSyncDataPk(sqlite3 * db,bool isMemory,std::vector<VBucket> & data)1289 int SqliteCloudKvExecutorUtils::GetWaitCompensatedSyncDataPk(sqlite3 *db, bool isMemory, std::vector<VBucket> &data)
1290 {
1291 sqlite3_stmt *stmt = nullptr;
1292 int errCode = SQLiteUtils::GetStatement(db, SELECT_COMPENSATE_SYNC_KEY_SQL, stmt);
1293 if (errCode != E_OK) {
1294 LOGE("[SqliteCloudKvExecutorUtils] Get compensate key stmt failed %d", errCode);
1295 return errCode;
1296 }
1297 ResFinalizer finalizerData([stmt]() {
1298 int ret = E_OK;
1299 sqlite3_stmt *statement = stmt;
1300 SQLiteUtils::ResetStatement(statement, true, ret);
1301 if (ret != E_OK) {
1302 LOGW("[SqliteCloudKvExecutorUtils] Reset compensate key stmt failed %d", ret);
1303 }
1304 });
1305 uint32_t totalSize = 0;
1306 do {
1307 errCode = SQLiteUtils::StepWithRetry(stmt, isMemory);
1308 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1309 errCode = E_OK;
1310 break;
1311 } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1312 LOGE("[SqliteCloudKvExecutorUtils] Get key from compensate key stmt failed. %d", errCode);
1313 break;
1314 }
1315 VBucket key;
1316 errCode = GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_KEY, CLOUD_QUERY_KEY_INDEX, stmt, key, totalSize);
1317 if (errCode != E_OK) {
1318 return errCode;
1319 }
1320 data.push_back(key);
1321 } while (errCode == E_OK);
1322 return errCode;
1323 }
1324
GetWaitCompensatedSyncDataUserId(sqlite3 * db,bool isMemory,std::vector<VBucket> & users)1325 int SqliteCloudKvExecutorUtils::GetWaitCompensatedSyncDataUserId(sqlite3 *db, bool isMemory,
1326 std::vector<VBucket> &users)
1327 {
1328 sqlite3_stmt *stmt = nullptr;
1329 int errCode = SQLiteUtils::GetStatement(db, SELECT_COMPENSATE_SYNC_USERID_SQL, stmt);
1330 if (errCode != E_OK) {
1331 LOGE("[SqliteCloudKvExecutorUtils] Get compensate userid stmt failed %d", errCode);
1332 return errCode;
1333 }
1334 ResFinalizer finalizerData([stmt]() {
1335 int ret = E_OK;
1336 sqlite3_stmt *statement = stmt;
1337 SQLiteUtils::ResetStatement(statement, true, ret);
1338 if (ret != E_OK) {
1339 LOGW("[SqliteCloudKvExecutorUtils] Reset compensate key stmt failed %d", ret);
1340 }
1341 });
1342 uint32_t totalSize = 0;
1343 do {
1344 errCode = SQLiteUtils::StepWithRetry(stmt, isMemory);
1345 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1346 errCode = E_OK;
1347 break;
1348 } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1349 LOGE("[SqliteCloudKvExecutorUtils] Get key from compensate key stmt failed. %d", errCode);
1350 break;
1351 }
1352 VBucket key;
1353 errCode = GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_USERID, CLOUD_QUERY_KEY_INDEX, stmt,
1354 key, totalSize);
1355 if (errCode != E_OK) {
1356 return errCode;
1357 }
1358 users.push_back(key);
1359 } while (errCode == E_OK);
1360 return errCode;
1361 }
1362
GetWaitCompensatedSyncData(sqlite3 * db,bool isMemory,std::vector<VBucket> & data,std::vector<VBucket> & users)1363 int SqliteCloudKvExecutorUtils::GetWaitCompensatedSyncData(sqlite3 *db, bool isMemory, std::vector<VBucket> &data,
1364 std::vector<VBucket> &users)
1365 {
1366 int errCode = SqliteCloudKvExecutorUtils::GetWaitCompensatedSyncDataPk(db, isMemory, data);
1367 if (errCode != E_OK) {
1368 LOGW("[GetWaitCompensatedSyncData] Get wait compensated sync date failed! errCode=%d", errCode);
1369 return errCode;
1370 }
1371 errCode = SqliteCloudKvExecutorUtils::GetWaitCompensatedSyncDataUserId(db, isMemory, users);
1372 if (errCode != E_OK) {
1373 LOGW("[GetWaitCompensatedSyncData] Get wait compensated sync date failed! errCode=%d", errCode);
1374 }
1375 return errCode;
1376 }
1377
QueryCloudGid(sqlite3 * db,bool isMemory,const std::string & user,QuerySyncObject & querySyncObject,std::vector<std::string> & cloudGid)1378 int SqliteCloudKvExecutorUtils::QueryCloudGid(sqlite3 *db, bool isMemory, const std::string &user,
1379 QuerySyncObject &querySyncObject, std::vector<std::string> &cloudGid)
1380 {
1381 int errCode = E_OK;
1382 QuerySyncObject query = querySyncObject;
1383 SqliteQueryHelper helper = query.GetQueryHelper(errCode);
1384 if (errCode != E_OK) {
1385 return errCode;
1386 }
1387 sqlite3_stmt *stmt = nullptr;
1388 errCode = helper.GetAndBindGidKvCloudQueryStatement(user, db, stmt);
1389 if (errCode != E_OK) {
1390 return errCode;
1391 }
1392 do {
1393 errCode = SQLiteUtils::StepWithRetry(stmt, isMemory);
1394 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1395 errCode = E_OK;
1396 break;
1397 } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1398 LOGE("[SqliteCloudKvExecutorUtils] Get cloud version from cloud failed. %d", errCode);
1399 break;
1400 }
1401 std::string gid;
1402 errCode = SQLiteUtils::GetColumnTextValue(stmt, 0, gid);
1403 cloudGid.push_back(gid);
1404 } while (errCode == E_OK);
1405 int ret = E_OK;
1406 SQLiteUtils::ResetStatement(stmt, true, ret);
1407 return errCode == E_OK ? ret : errCode;
1408 }
1409
BindFillGidLogStmt(sqlite3_stmt * logStmt,const std::string & user,const DataItem & dataItem,const VBucket & uploadExtend,const CloudWaterType & type)1410 int SqliteCloudKvExecutorUtils::BindFillGidLogStmt(sqlite3_stmt *logStmt, const std::string &user,
1411 const DataItem &dataItem, const VBucket &uploadExtend, const CloudWaterType &type)
1412 {
1413 DataItem wItem = dataItem;
1414 if (DBCommon::IsNeedCompensatedForUpload(uploadExtend, type) && !wItem.dataDelete) {
1415 wItem.cloud_flag |= static_cast<uint32_t>(LogInfoFlag::FLAG_WAIT_COMPENSATED_SYNC);
1416 }
1417 if (DBCommon::IsCloudRecordNotFound(uploadExtend) &&
1418 (type == CloudWaterType::UPDATE || type == CloudWaterType::DELETE)) {
1419 wItem.gid = {};
1420 wItem.version = {};
1421 }
1422 int errCode = E_OK;
1423 if (type == CloudWaterType::DELETE) {
1424 if (DBCommon::IsCloudRecordNotFound(uploadExtend)) {
1425 errCode = BindUpdateLogStmt(logStmt, user, wItem);
1426 }
1427 } else {
1428 errCode = BindInsertLogStmt(logStmt, user, wItem);
1429 }
1430 if (errCode != E_OK) {
1431 LOGE("[SqliteCloudKvExecutorUtils] fill cloud gid failed. %d", errCode);
1432 }
1433 return errCode;
1434 }
1435
MarkUploadSuccess(const FillGidParam & param,const CloudSyncBatch & data,const std::string & user,size_t dataIndex)1436 void SqliteCloudKvExecutorUtils::MarkUploadSuccess(const FillGidParam ¶m, const CloudSyncBatch &data,
1437 const std::string &user, size_t dataIndex)
1438 {
1439 if (data.extend.size() <= dataIndex || data.hashKey.size() <= dataIndex || data.timestamp.size() <= dataIndex) {
1440 LOGW("[SqliteCloudKvExecutorUtils] invalid index %zu when mark upload success", dataIndex);
1441 return;
1442 }
1443 if (!DBCommon::IsRecordSuccess(data.extend[dataIndex])) {
1444 return;
1445 }
1446 if (CheckDataChanged(param, data, dataIndex)) {
1447 LOGW("[SqliteCloudKvExecutorUtils] %zu data changed when mark upload success", dataIndex);
1448 return;
1449 }
1450 MarkUploadSuccessInner(param, data, user, dataIndex);
1451 }
1452
CheckDataChanged(const FillGidParam & param,const CloudSyncBatch & data,size_t dataIndex)1453 bool SqliteCloudKvExecutorUtils::CheckDataChanged(const FillGidParam ¶m,
1454 const CloudSyncBatch &data, size_t dataIndex)
1455 {
1456 sqlite3_stmt *checkStmt = nullptr;
1457 int errCode = SQLiteUtils::GetStatement(param.first, CHECK_DATA_CHANGED, checkStmt);
1458 ResFinalizer finalizerData([checkStmt]() {
1459 sqlite3_stmt *statement = checkStmt;
1460 int ret = E_OK;
1461 SQLiteUtils::ResetStatement(statement, true, ret);
1462 if (ret != E_OK) {
1463 LOGW("[SqliteCloudKvExecutorUtils] reset log stmt failed %d when check data changed", ret);
1464 }
1465 });
1466 int index = 1;
1467 if (data.timestamp.size() > 0) {
1468 errCode = SQLiteUtils::BindInt64ToStatement(checkStmt, index++, data.timestamp[dataIndex]);
1469 if (errCode != E_OK) {
1470 LOGW("[SqliteCloudKvExecutorUtils] bind modify time failed %d when check data changed", errCode);
1471 return true;
1472 }
1473 }
1474 if (data.hashKey.size() > 0) {
1475 errCode = SQLiteUtils::BindBlobToStatement(checkStmt, index++, data.hashKey[dataIndex]);
1476 if (errCode != E_OK) {
1477 LOGW("[SqliteCloudKvExecutorUtils] bind hashKey failed %d when check data changed", errCode);
1478 return true;
1479 }
1480 }
1481 errCode = SQLiteUtils::StepNext(checkStmt);
1482 if (errCode != E_OK) {
1483 LOGW("[SqliteCloudKvExecutorUtils] step failed %d when check data changed", errCode);
1484 return true;
1485 }
1486 return sqlite3_column_int64(checkStmt, 0) == 0; // get index start at 0, get 0 is data changed
1487 }
1488
CheckDataDelete(const FillGidParam & param,const CloudSyncBatch & data,size_t dataIndex)1489 bool SqliteCloudKvExecutorUtils::CheckDataDelete(const FillGidParam ¶m,
1490 const CloudSyncBatch &data, size_t dataIndex)
1491 {
1492 sqlite3_stmt *checkStmt = nullptr;
1493 int errCode = SQLiteUtils::GetStatement(param.first, CHECK_DATA_DELETE, checkStmt);
1494 ResFinalizer finalizerData([checkStmt]() {
1495 sqlite3_stmt *statement = checkStmt;
1496 int ret = E_OK;
1497 SQLiteUtils::ResetStatement(statement, true, ret);
1498 if (ret != E_OK) {
1499 LOGW("[SqliteCloudKvExecutorUtils] reset log stmt failed %d when check data delete", ret);
1500 }
1501 });
1502 int index = 1;
1503 if (data.timestamp.size() > 0) {
1504 errCode = SQLiteUtils::BindInt64ToStatement(checkStmt, index++, data.timestamp[dataIndex]);
1505 if (errCode != E_OK) {
1506 LOGW("[SqliteCloudKvExecutorUtils] bind modify time failed %d when check data delete", errCode);
1507 return true;
1508 }
1509 }
1510 if (data.hashKey.size() > 0) {
1511 errCode = SQLiteUtils::BindBlobToStatement(checkStmt, index++, data.hashKey[dataIndex]);
1512 if (errCode != E_OK) {
1513 LOGW("[SqliteCloudKvExecutorUtils] bind hashKey failed %d when check data delete", errCode);
1514 return true;
1515 }
1516 }
1517 errCode = SQLiteUtils::StepNext(checkStmt);
1518 if (errCode != E_OK) {
1519 LOGW("[SqliteCloudKvExecutorUtils] step failed %d when check data delete", errCode);
1520 return true;
1521 }
1522 return sqlite3_column_int64(checkStmt, 0) != 0; // get index start at !0, get 0 means data delete
1523 }
1524
MarkUploadSuccessInner(const FillGidParam & param,const CloudSyncBatch & data,const std::string & user,size_t dataIndex)1525 void SqliteCloudKvExecutorUtils::MarkUploadSuccessInner(const FillGidParam ¶m,
1526 const CloudSyncBatch &data, const std::string &user, size_t dataIndex)
1527 {
1528 sqlite3_stmt *logStmt = nullptr;
1529 int errCode = SQLiteUtils::GetStatement(param.first, MARK_UPLOAD_SUCCESS, logStmt);
1530 ResFinalizer finalizerData([logStmt]() {
1531 sqlite3_stmt *statement = logStmt;
1532 int ret = E_OK;
1533 SQLiteUtils::ResetStatement(statement, true, ret);
1534 if (ret != E_OK) {
1535 LOGW("[SqliteCloudKvExecutorUtils] reset log stmt failed %d when mark upload success", ret);
1536 }
1537 });
1538 int index = 1;
1539 errCode = SQLiteUtils::BindBlobToStatement(logStmt, index++, data.hashKey[dataIndex]);
1540 if (errCode != E_OK) {
1541 LOGW("[SqliteCloudKvExecutorUtils] bind hashKey failed %d when mark upload success", errCode);
1542 return;
1543 }
1544 errCode = SQLiteUtils::BindTextToStatement(logStmt, index++, user);
1545 if (errCode != E_OK) {
1546 LOGW("[SqliteCloudKvExecutorUtils] bind hashKey failed %d when mark upload success", errCode);
1547 return;
1548 }
1549 errCode = SQLiteUtils::StepNext(logStmt);
1550 if (errCode != E_OK && errCode != -E_FINISHED) {
1551 LOGW("[SqliteCloudKvExecutorUtils] step failed %d when mark upload success", errCode);
1552 return;
1553 }
1554 }
1555 }