1 /*
2  * Copyright (c) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "sqlite_cloud_kv_executor_utils.h"
17 #include "cloud/cloud_db_constant.h"
18 #include "cloud/cloud_storage_utils.h"
19 #include "db_base64_utils.h"
20 #include "db_common.h"
21 #include "res_finalizer.h"
22 #include "runtime_context.h"
23 #include "sqlite_single_ver_storage_executor_sql.h"
24 #include "time_helper.h"
25 
26 namespace DistributedDB {
GetCloudData(const CloudSyncConfig & config,const DBParam & param,const CloudUploadRecorder & recorder,SQLiteSingleVerContinueToken & token,CloudSyncData & data)27 int SqliteCloudKvExecutorUtils::GetCloudData(const CloudSyncConfig &config, const DBParam &param,
28     const CloudUploadRecorder &recorder, SQLiteSingleVerContinueToken &token, CloudSyncData &data)
29 {
30     auto [db, isMemory] = param;
31     bool stepNext = false;
32     auto [errCode, stmt] = token.GetCloudQueryStmt(db, data.isCloudForcePushStrategy, stepNext, data.mode);
33     if (errCode != E_OK) {
34         token.ReleaseCloudQueryStmt();
35         return errCode;
36     }
37     UploadDetail detail;
38     auto &[stepNum, totalSize] = detail;
39     do {
40         if (stepNext) {
41             errCode = SQLiteUtils::StepNext(stmt, isMemory);
42             if (errCode != E_OK) {
43                 errCode = (errCode == -E_FINISHED ? E_OK : errCode);
44                 break;
45             }
46         }
47         stepNext = true;
48         errCode = GetCloudDataForSync(config, recorder, stmt, data, detail);
49         stepNum++;
50     } while (errCode == E_OK);
51     LOGI("[SqliteCloudKvExecutorUtils] Get cloud sync data, insData:%u, upData:%u, delLog:%u errCode:%d total:%" PRIu32,
52          data.insData.record.size(), data.updData.record.size(), data.delData.extend.size(), errCode, totalSize);
53     if (errCode != -E_UNFINISHED) {
54         token.ReleaseCloudQueryStmt();
55     } else if (isMemory && UpdateBeginTimeForMemoryDB(token, data)) {
56         token.ReleaseCloudQueryStmt();
57     }
58     return errCode;
59 }
60 
GetMaxTimeStamp(std::vector<VBucket> & dataExtend)61 Timestamp SqliteCloudKvExecutorUtils::GetMaxTimeStamp(std::vector<VBucket> &dataExtend)
62 {
63     Timestamp maxTimeStamp = 0;
64     VBucket lastRecord = dataExtend.back();
65     auto it = lastRecord.find(CloudDbConstant::MODIFY_FIELD);
66     if (it != lastRecord.end() && maxTimeStamp < static_cast<Timestamp>(std::get<int64_t>(it->second))) {
67         maxTimeStamp = static_cast<Timestamp>(std::get<int64_t>(it->second));
68     }
69     return maxTimeStamp;
70 }
71 
UpdateBeginTimeForMemoryDB(SQLiteSingleVerContinueToken & token,CloudSyncData & data)72 bool SqliteCloudKvExecutorUtils::UpdateBeginTimeForMemoryDB(SQLiteSingleVerContinueToken &token, CloudSyncData &data)
73 {
74     Timestamp maxTimeStamp = 0;
75     switch (data.mode) {
76         case DistributedDB::CloudWaterType::DELETE:
77             maxTimeStamp = GetMaxTimeStamp(data.delData.extend);
78             break;
79         case DistributedDB::CloudWaterType::UPDATE:
80             maxTimeStamp = GetMaxTimeStamp(data.updData.extend);
81             break;
82         case DistributedDB::CloudWaterType::INSERT:
83             maxTimeStamp = GetMaxTimeStamp(data.insData.extend);
84             break;
85         case DistributedDB::CloudWaterType::BUTT:
86         default:
87             break;
88     }
89     if (maxTimeStamp > token.GetQueryBeginTime()) {
90         token.SetNextBeginTime("", maxTimeStamp);
91         return true;
92     }
93     LOGW("[SqliteCloudKvExecutorUtils] The start time of the in memory database has not been updated.");
94     return false;
95 }
96 
GetCloudDataForSync(const CloudSyncConfig & config,const CloudUploadRecorder & recorder,sqlite3_stmt * statement,CloudSyncData & cloudDataResult,UploadDetail & detail)97 int SqliteCloudKvExecutorUtils::GetCloudDataForSync(const CloudSyncConfig &config, const CloudUploadRecorder &recorder,
98     sqlite3_stmt *statement, CloudSyncData &cloudDataResult, UploadDetail &detail)
99 {
100     auto &[stepNum, totalSize] = detail;
101     VBucket log;
102     VBucket extraLog;
103     uint32_t preSize = totalSize;
104     GetCloudLog(statement, log, totalSize);
105     GetCloudExtraLog(statement, extraLog);
106 
107     VBucket data;
108     int64_t flag = 0;
109     int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::FLAG, extraLog, flag);
110     if (errCode != E_OK) {
111         return errCode;
112     }
113 
114     if ((static_cast<uint64_t>(flag) & DataItem::DELETE_FLAG) == 0) {
115         errCode = GetCloudKvData(statement, data, totalSize);
116         if (errCode != E_OK) {
117             return errCode;
118         }
119     }
120 
121     if (CloudStorageUtils::IsGetCloudDataContinue(stepNum, totalSize, config.maxUploadSize, config.maxUploadCount)) {
122         errCode = CloudStorageUtils::IdentifyCloudType(recorder, cloudDataResult, data, log, extraLog);
123     } else {
124         errCode = -E_UNFINISHED;
125     }
126     if (errCode == -E_IGNORE_DATA) {
127         errCode = E_OK;
128         totalSize = preSize;
129         stepNum--;
130     }
131     return errCode;
132 }
133 
GetCloudLog(sqlite3_stmt * stmt,VBucket & logInfo,uint32_t & totalSize)134 void SqliteCloudKvExecutorUtils::GetCloudLog(sqlite3_stmt *stmt, VBucket &logInfo,
135     uint32_t &totalSize)
136 {
137     int64_t modifyTime = static_cast<int64_t>(sqlite3_column_int64(stmt, CLOUD_QUERY_MODIFY_TIME_INDEX));
138     uint64_t curTime = 0;
139     if (TimeHelper::GetSysCurrentRawTime(curTime) == E_OK) {
140         if (modifyTime > static_cast<int64_t>(curTime)) {
141             modifyTime = static_cast<int64_t>(curTime);
142         }
143     } else {
144         LOGW("[SqliteCloudKvExecutorUtils] get raw sys time failed.");
145     }
146     logInfo.insert_or_assign(CloudDbConstant::MODIFY_FIELD, modifyTime);
147     logInfo.insert_or_assign(CloudDbConstant::CREATE_FIELD,
148         static_cast<int64_t>(sqlite3_column_int64(stmt, CLOUD_QUERY_CREATE_TIME_INDEX)));
149     totalSize += sizeof(int64_t) + sizeof(int64_t);
150     if (sqlite3_column_text(stmt, CLOUD_QUERY_CLOUD_GID_INDEX) != nullptr) {
151         std::string cloudGid = reinterpret_cast<const std::string::value_type *>(
152             sqlite3_column_text(stmt, CLOUD_QUERY_CLOUD_GID_INDEX));
153         if (!cloudGid.empty()) {
154             logInfo.insert_or_assign(CloudDbConstant::GID_FIELD, cloudGid);
155             totalSize += cloudGid.size();
156         }
157     }
158     std::string version;
159     SQLiteUtils::GetColumnTextValue(stmt, CLOUD_QUERY_VERSION_INDEX, version);
160     logInfo.insert_or_assign(CloudDbConstant::VERSION_FIELD, version);
161     totalSize += version.size();
162 }
163 
GetCloudExtraLog(sqlite3_stmt * stmt,VBucket & flags)164 void SqliteCloudKvExecutorUtils::GetCloudExtraLog(sqlite3_stmt *stmt, VBucket &flags)
165 {
166     flags.insert_or_assign(CloudDbConstant::ROWID,
167         static_cast<int64_t>(sqlite3_column_int64(stmt, CLOUD_QUERY_ROW_ID_INDEX)));
168     flags.insert_or_assign(CloudDbConstant::TIMESTAMP,
169         static_cast<int64_t>(sqlite3_column_int64(stmt, CLOUD_QUERY_MODIFY_TIME_INDEX)));
170     flags.insert_or_assign(CloudDbConstant::FLAG,
171         static_cast<int64_t>(sqlite3_column_int64(stmt, CLOUD_QUERY_FLAG_INDEX)));
172     Bytes hashKey;
173     (void)SQLiteUtils::GetColumnBlobValue(stmt, CLOUD_QUERY_HASH_KEY_INDEX, hashKey);
174     flags.insert_or_assign(CloudDbConstant::HASH_KEY, hashKey);
175     flags.insert_or_assign(CloudDbConstant::CLOUD_FLAG,
176         static_cast<int64_t>(sqlite3_column_int64(stmt, CLOUD_QUERY_CLOUD_FLAG_INDEX)));
177 }
178 
GetCloudKvData(sqlite3_stmt * stmt,VBucket & data,uint32_t & totalSize)179 int SqliteCloudKvExecutorUtils::GetCloudKvData(sqlite3_stmt *stmt, VBucket &data, uint32_t &totalSize)
180 {
181     int errCode = GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_KEY, CLOUD_QUERY_KEY_INDEX, stmt, data, totalSize);
182     if (errCode != E_OK) {
183         return errCode;
184     }
185     errCode = GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_VALUE, CLOUD_QUERY_VALUE_INDEX, stmt, data, totalSize);
186     if (errCode != E_OK) {
187         return errCode;
188     }
189     errCode = GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_DEVICE, CLOUD_QUERY_DEV_INDEX, stmt, data, totalSize);
190     if (errCode != E_OK) {
191         return errCode;
192     }
193     errCode = GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_ORI_DEVICE, CLOUD_QUERY_ORI_DEV_INDEX, stmt, data,
194         totalSize);
195     if (errCode != E_OK) {
196         return errCode;
197     }
198     data.insert_or_assign(CloudDbConstant::CLOUD_KV_FIELD_DEVICE_CREATE_TIME,
199         static_cast<int64_t>(sqlite3_column_int64(stmt, CLOUD_QUERY_DEV_CREATE_TIME_INDEX)));
200     totalSize += sizeof(int64_t);
201     return E_OK;
202 }
203 
GetCloudKvBlobData(const std::string & keyStr,int index,sqlite3_stmt * stmt,VBucket & data,uint32_t & totalSize)204 int SqliteCloudKvExecutorUtils::GetCloudKvBlobData(const std::string &keyStr, int index, sqlite3_stmt *stmt,
205     VBucket &data, uint32_t &totalSize)
206 {
207     std::vector<uint8_t> blob;
208     int errCode = SQLiteUtils::GetColumnBlobValue(stmt, index, blob);
209     if (errCode != E_OK) {
210         LOGE("[SqliteCloudKvExecutorUtils] Get %.3s failed %d", keyStr.c_str(), errCode);
211         return errCode;
212     }
213     std::string tmp = std::string(blob.begin(), blob.end());
214     if ((keyStr == CloudDbConstant::CLOUD_KV_FIELD_DEVICE ||
215         keyStr == CloudDbConstant::CLOUD_KV_FIELD_ORI_DEVICE)) {
216         if (tmp.empty()) {
217             errCode = RuntimeContext::GetInstance()->GetLocalIdentity(tmp);
218             if (errCode != E_OK) {
219                 return errCode;
220             }
221             tmp = DBCommon::TransferHashString(tmp);
222         }
223         tmp = DBBase64Utils::Encode(std::vector<uint8_t>(tmp.begin(), tmp.end()));
224     }
225     totalSize += tmp.size();
226     data.insert_or_assign(keyStr, tmp);
227     return E_OK;
228 }
229 
GetLogInfo(sqlite3 * db,bool isMemory,const VBucket & cloudData,const std::string & userId)230 std::pair<int, DataInfoWithLog> SqliteCloudKvExecutorUtils::GetLogInfo(sqlite3 *db, bool isMemory,
231     const VBucket &cloudData, const std::string &userId)
232 {
233     std::pair<int, DataInfoWithLog> res;
234     int &errCode = res.first;
235     std::string keyStr;
236     errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::CLOUD_KV_FIELD_KEY, cloudData, keyStr);
237     if (errCode == -E_NOT_FOUND) {
238         errCode = E_OK;
239     }
240     if (errCode != E_OK) {
241         LOGE("[SqliteCloudKvExecutorUtils] Get key failed %d", errCode);
242         return res;
243     }
244     Bytes key;
245     DBCommon::StringToVector(keyStr, key);
246     Bytes hashKey;
247     DBCommon::CalcValueHash(key, hashKey);
248     sqlite3_stmt *stmt = nullptr;
249     std::tie(errCode, stmt) = GetLogInfoStmt(db, cloudData, !hashKey.empty(), userId.empty());
250     if (errCode != E_OK) {
251         LOGE("[SqliteCloudKvExecutorUtils] Get stmt failed %d", errCode);
252         return res;
253     }
254     std::string gid;
255     errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, cloudData, gid);
256     if (errCode != E_OK && errCode != -E_NOT_FOUND) {
257         LOGE("[SqliteCloudKvExecutorUtils] Get gid failed %d", errCode);
258         return res;
259     }
260     return GetLogInfoInner(stmt, isMemory, gid, hashKey, userId);
261 }
262 
GetLogInfoStmt(sqlite3 * db,const VBucket & cloudData,bool existKey,bool emptyUserId)263 std::pair<int, sqlite3_stmt*> SqliteCloudKvExecutorUtils::GetLogInfoStmt(sqlite3 *db, const VBucket &cloudData,
264     bool existKey, bool emptyUserId)
265 {
266     std::pair<int, sqlite3_stmt*> res;
267     auto &[errCode, stmt] = res;
268     std::string querySql = QUERY_CLOUD_SYNC_DATA_LOG;
269     if (!emptyUserId) {
270         querySql = QUERY_CLOUD_SYNC_DATA_LOG_WITH_USERID;
271     }
272     std::string sql = querySql;
273     sql += " WHERE cloud_gid = ?";
274     if (existKey) {
275         sql += " UNION ";
276         sql += querySql;
277         sql += " WHERE sync_data.hash_key = ?";
278     }
279     errCode = SQLiteUtils::GetStatement(db, sql, stmt);
280     return res;
281 }
282 
GetLogInfoInner(sqlite3_stmt * stmt,bool isMemory,const std::string & gid,const Bytes & key,const std::string & userId)283 std::pair<int, DataInfoWithLog> SqliteCloudKvExecutorUtils::GetLogInfoInner(sqlite3_stmt *stmt, bool isMemory,
284     const std::string &gid, const Bytes &key, const std::string &userId)
285 {
286     ResFinalizer finalizer([stmt]() {
287         sqlite3_stmt *statement = stmt;
288         int ret = E_OK;
289         SQLiteUtils::ResetStatement(statement, true, ret);
290         if (ret != E_OK) {
291             LOGW("[SqliteCloudKvExecutorUtils] Reset stmt failed %d when get log", ret);
292         }
293     });
294     std::pair<int, DataInfoWithLog> res;
295     auto &[errCode, logInfo] = res;
296     int index = 1;
297     if (!userId.empty()) {
298         errCode = SQLiteUtils::BindTextToStatement(stmt, index++, userId);
299         if (errCode != E_OK) {
300             LOGE("[SqliteCloudKvExecutorUtils] Bind 1st userId failed %d", errCode);
301             return res;
302         }
303     }
304     errCode = SQLiteUtils::BindTextToStatement(stmt, index++, gid);
305     if (errCode != E_OK) {
306         LOGE("[SqliteCloudKvExecutorUtils] Bind gid failed %d", errCode);
307         return res;
308     }
309     if (!key.empty()) {
310         if (!userId.empty()) {
311             errCode = SQLiteUtils::BindTextToStatement(stmt, index++, userId);
312             if (errCode != E_OK) {
313                 LOGE("[SqliteCloudKvExecutorUtils] Bind 2nd userId failed %d", errCode);
314                 return res;
315             }
316         }
317         errCode = SQLiteUtils::BindBlobToStatement(stmt, index++, key);
318         if (errCode != E_OK) {
319             LOGE("[SqliteCloudKvExecutorUtils] Bind key failed %d", errCode);
320             return res;
321         }
322     }
323     errCode = SQLiteUtils::StepNext(stmt, isMemory);
324     if (errCode == -E_FINISHED) {
325         errCode = -E_NOT_FOUND;
326         // not found is ok, just return error
327         return res;
328     }
329     if (errCode != E_OK) {
330         LOGE("[SqliteCloudKvExecutorUtils] Get log failed %d", errCode);
331         return res;
332     }
333     logInfo = FillLogInfoWithStmt(stmt);
334     return res;
335 }
336 
FillLogInfoWithStmt(sqlite3_stmt * stmt)337 DataInfoWithLog SqliteCloudKvExecutorUtils::FillLogInfoWithStmt(sqlite3_stmt *stmt)
338 {
339     DataInfoWithLog dataInfoWithLog;
340     int index = 0;
341     dataInfoWithLog.logInfo.dataKey = sqlite3_column_int64(stmt, index++);
342     dataInfoWithLog.logInfo.flag = static_cast<uint64_t>(sqlite3_column_int64(stmt, index++));
343     std::vector<uint8_t> device;
344     (void)SQLiteUtils::GetColumnBlobValue(stmt, index++, device);
345     DBCommon::VectorToString(device, dataInfoWithLog.logInfo.device);
346     std::vector<uint8_t> oriDev;
347     (void)SQLiteUtils::GetColumnBlobValue(stmt, index++, oriDev);
348     DBCommon::VectorToString(oriDev, dataInfoWithLog.logInfo.originDev);
349     dataInfoWithLog.logInfo.timestamp = static_cast<Timestamp>(sqlite3_column_int64(stmt, index++));
350     dataInfoWithLog.logInfo.wTimestamp = static_cast<Timestamp>(sqlite3_column_int64(stmt, index++));
351     std::string gid;
352     (void)SQLiteUtils::GetColumnTextValue(stmt, index++, gid);
353     dataInfoWithLog.logInfo.cloudGid = gid;
354     (void)SQLiteUtils::GetColumnBlobValue(stmt, index++, dataInfoWithLog.logInfo.hashKey);
355     Bytes key;
356     (void)SQLiteUtils::GetColumnBlobValue(stmt, index++, key);
357     std::string keyStr(key.begin(), key.end());
358     dataInfoWithLog.primaryKeys.insert_or_assign(CloudDbConstant::CLOUD_KV_FIELD_KEY, keyStr);
359     (void)SQLiteUtils::GetColumnTextValue(stmt, index++, dataInfoWithLog.logInfo.version);
360     dataInfoWithLog.logInfo.cloud_flag = static_cast<uint64_t>(sqlite3_column_int64(stmt, index++));
361     return dataInfoWithLog;
362 }
363 
PutCloudData(sqlite3 * db,bool isMemory,DownloadData & downloadData)364 int SqliteCloudKvExecutorUtils::PutCloudData(sqlite3 *db, bool isMemory, DownloadData &downloadData)
365 {
366     if (downloadData.data.size() != downloadData.opType.size()) {
367         LOGE("[SqliteCloudKvExecutorUtils] data size %zu != flag size %zu.", downloadData.data.size(),
368             downloadData.opType.size());
369         return -E_CLOUD_ERROR;
370     }
371     std::map<int, int> statisticMap = {};
372     int errCode = ExecutePutCloudData(db, isMemory, downloadData, statisticMap);
373     LOGI("[SqliteCloudKvExecutorUtils] save cloud data: %d, insert cnt = %d, update cnt = %d, delete cnt = %d,"
374         " only update gid cnt = %d, set LCC flag zero cnt = %d, set LCC flag one cnt = %d,"
375         " update timestamp cnt = %d, clear gid count = %d, not handle cnt = %d",
376         errCode, statisticMap[static_cast<int>(OpType::INSERT)], statisticMap[static_cast<int>(OpType::UPDATE)],
377         statisticMap[static_cast<int>(OpType::DELETE)], statisticMap[static_cast<int>(OpType::ONLY_UPDATE_GID)],
378         statisticMap[static_cast<int>(OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO)],
379         statisticMap[static_cast<int>(OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE)],
380         statisticMap[static_cast<int>(OpType::UPDATE_TIMESTAMP)], statisticMap[static_cast<int>(OpType::CLEAR_GID)],
381         statisticMap[static_cast<int>(OpType::NOT_HANDLE)]);
382     return errCode;
383 }
384 
ExecutePutCloudData(sqlite3 * db,bool isMemory,DownloadData & downloadData,std::map<int,int> & statisticMap)385 int SqliteCloudKvExecutorUtils::ExecutePutCloudData(sqlite3 *db, bool isMemory,
386     DownloadData &downloadData, std::map<int, int> &statisticMap)
387 {
388     int index = 0;
389     int errCode = E_OK;
390     for (OpType op : downloadData.opType) {
391         switch (op) {
392             case OpType::INSERT: // fallthrough
393             case OpType::UPDATE: // fallthrough
394             case OpType::DELETE: // fallthrough
395                 errCode = OperateCloudData(db, isMemory, index, op, downloadData);
396                 break;
397             case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO: // fallthrough
398             case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE:  // fallthrough
399             case OpType::UPDATE_TIMESTAMP:               // fallthrough
400                 errCode = OnlyUpdateSyncData(db, isMemory, index, op, downloadData);
401                 if (errCode != E_OK) {
402                     break;
403                 }
404                 [[fallthrough]];
405             case OpType::ONLY_UPDATE_GID:                // fallthrough
406             case OpType::NOT_HANDLE:                     // fallthrough
407             case OpType::CLEAR_GID:                      // fallthrough
408                 errCode = OnlyUpdateLogTable(db, isMemory, index, op, downloadData);
409                 break;
410             default:
411                 errCode = -E_CLOUD_ERROR;
412                 break;
413         }
414         if (errCode != E_OK) {
415             LOGE("put cloud sync data fail:%d op:%d", errCode, static_cast<int>(op));
416             return errCode;
417         }
418         statisticMap[static_cast<int>(op)]++;
419         index++;
420     }
421     return errCode;
422 }
423 
OperateCloudData(sqlite3 * db,bool isMemory,int index,OpType opType,DownloadData & downloadData)424 int SqliteCloudKvExecutorUtils::OperateCloudData(sqlite3 *db, bool isMemory, int index, OpType opType,
425     DownloadData &downloadData)
426 {
427     sqlite3_stmt *logStmt = nullptr;
428     int errCode = SQLiteUtils::GetStatement(db, GetOperateLogSql(opType), logStmt);
429     if (errCode != E_OK) {
430         LOGE("[SqliteCloudKvExecutorUtils] Get insert log statement failed %d", errCode);
431         return errCode;
432     }
433     sqlite3_stmt *dataStmt = nullptr;
434     errCode = SQLiteUtils::GetStatement(db, GetOperateDataSql(opType), dataStmt);
435     if (errCode != E_OK) {
436         int ret = E_OK;
437         SQLiteUtils::ResetStatement(logStmt, true, ret);
438         LOGE("[SqliteCloudKvExecutorUtils] Get insert data statement failed %d reset %d", errCode, ret);
439         return errCode;
440     }
441     ResFinalizer finalizerData([logStmt, dataStmt, opType]() {
442         sqlite3_stmt *statement = logStmt;
443         int ret = E_OK;
444         SQLiteUtils::ResetStatement(statement, true, ret);
445         if (ret != E_OK) {
446             LOGW("[SqliteCloudKvExecutorUtils] Reset log stmt failed %d opType %d", ret, static_cast<int>(opType));
447         }
448         statement = dataStmt;
449         SQLiteUtils::ResetStatement(statement, true, ret);
450         if (ret != E_OK) {
451             LOGW("[SqliteCloudKvExecutorUtils] Reset data stmt failed %d opType %d", ret, static_cast<int>(opType));
452         }
453     });
454     errCode = BindStmt(logStmt, dataStmt, index, opType, downloadData);
455     if (errCode != E_OK) {
456         return errCode;
457     }
458     return StepStmt(logStmt, dataStmt, isMemory);
459 }
460 
GetOperateDataSql(OpType opType)461 std::string SqliteCloudKvExecutorUtils::GetOperateDataSql(OpType opType)
462 {
463     switch (opType) {
464         case OpType::INSERT:
465             return INSERT_SYNC_SQL;
466         case OpType::UPDATE: // fallthrough
467         case OpType::DELETE:
468             return UPDATE_SYNC_SQL;
469         case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE:
470             return SET_SYNC_DATA_NO_FORCE_PUSH;
471         case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO:
472             return SET_SYNC_DATA_FORCE_PUSH;
473         case OpType::UPDATE_TIMESTAMP:
474             return UPDATE_TIMESTAMP;
475         default:
476             return "";
477     }
478 }
479 
GetOperateLogSql(OpType opType)480 std::string SqliteCloudKvExecutorUtils::GetOperateLogSql(OpType opType)
481 {
482     switch (opType) {
483         case OpType::INSERT: // fallthrough
484         case OpType::UPDATE:
485             return INSERT_CLOUD_SYNC_DATA_LOG;
486         case OpType::DELETE:
487             return UPDATE_CLOUD_SYNC_DATA_LOG;
488         case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO: // fallthrough
489         case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE:  // fallthrough
490         case OpType::UPDATE_TIMESTAMP:               // fallthrough
491         case OpType::ONLY_UPDATE_GID:                // fallthrough
492         case OpType::NOT_HANDLE:                     // fallthrough
493         case OpType::CLEAR_GID:                      // fallthrough
494             return UPSERT_CLOUD_SYNC_DATA_LOG;
495         default:
496             return "";
497     }
498 }
499 
TransToOpType(const CloudWaterType type)500 OpType SqliteCloudKvExecutorUtils::TransToOpType(const CloudWaterType type)
501 {
502     switch (type) {
503         case CloudWaterType::INSERT:
504             return OpType::INSERT;
505         case CloudWaterType::UPDATE:
506             return OpType::UPDATE;
507         case CloudWaterType::DELETE:
508             return OpType::DELETE;
509         default:
510             return OpType::NOT_HANDLE;
511     }
512 }
513 
BindOnlyUpdateLogStmt(sqlite3_stmt * logStmt,const std::string & user,const DataItem & dataItem)514 int SqliteCloudKvExecutorUtils::BindOnlyUpdateLogStmt(sqlite3_stmt *logStmt, const std::string &user,
515     const DataItem &dataItem)
516 {
517     int index = 0;
518     int errCode = SQLiteUtils::BindTextToStatement(logStmt, ++index, user);
519     if (errCode != E_OK) {
520         LOGE("[SqliteCloudKvExecutorUtils] Bind user failed %d when only insert log", errCode);
521         return errCode;
522     }
523     errCode = SQLiteUtils::BindBlobToStatement(logStmt, ++index, dataItem.hashKey);
524     if (errCode != E_OK) {
525         LOGE("[SqliteCloudKvExecutorUtils] Bind hashKey failed %d when only insert log", errCode);
526     }
527     errCode = SQLiteUtils::BindTextToStatement(logStmt, ++index, dataItem.gid);
528     if (errCode != E_OK) {
529         LOGE("[SqliteCloudKvExecutorUtils] Bind gid failed %d when only insert gid.", errCode);
530         return errCode;
531     }
532     errCode = SQLiteUtils::BindTextToStatement(logStmt, ++index, dataItem.version);
533     if (errCode != E_OK) {
534         LOGE("[SqliteCloudKvExecutorUtils] Bind version failed %d when only insert log", errCode);
535         return errCode;
536     }
537     errCode = SQLiteUtils::BindTextToStatement(logStmt, ++index, dataItem.gid);
538     if (errCode != E_OK) {
539         LOGE("[SqliteCloudKvExecutorUtils] Bind gid failed %d when only update gid.", errCode);
540         return errCode;
541     }
542     errCode = SQLiteUtils::BindTextToStatement(logStmt, ++index, dataItem.version);
543     if (errCode != E_OK) {
544         LOGE("[SqliteCloudKvExecutorUtils] Bind version failed %d when only update log", errCode);
545         return errCode;
546     }
547     return errCode;
548 }
549 
BindStmt(sqlite3_stmt * logStmt,sqlite3_stmt * dataStmt,int index,OpType opType,DownloadData & downloadData)550 int SqliteCloudKvExecutorUtils::BindStmt(sqlite3_stmt *logStmt, sqlite3_stmt *dataStmt, int index, OpType opType,
551     DownloadData &downloadData)
552 {
553     auto [errCode, dataItem] = GetDataItem(index, downloadData);
554     if (errCode != E_OK) {
555         return errCode;
556     }
557     switch (opType) {
558         case OpType::INSERT:
559             return BindInsertStmt(logStmt, dataStmt, downloadData.user, dataItem);
560         case OpType::UPDATE:
561             return BindUpdateStmt(logStmt, dataStmt, downloadData.user, dataItem);
562         case OpType::DELETE:
563             dataItem.hashKey = downloadData.existDataHashKey[index];
564             dataItem.gid.clear();
565             dataItem.version.clear();
566             return BindDeleteStmt(logStmt, dataStmt, downloadData.user, dataItem);
567         default:
568             return E_OK;
569     }
570 }
571 
BindInsertStmt(sqlite3_stmt * logStmt,sqlite3_stmt * dataStmt,const std::string & user,const DataItem & dataItem)572 int SqliteCloudKvExecutorUtils::BindInsertStmt(sqlite3_stmt *logStmt, sqlite3_stmt *dataStmt,
573     const std::string &user, const DataItem &dataItem)
574 {
575     int errCode = BindInsertLogStmt(logStmt, user, dataItem); // insert or replace LOG table for insert DATA table.
576     if (errCode != E_OK) {
577         return errCode;
578     }
579     return BindDataStmt(dataStmt, dataItem, true);
580 }
581 
BindInsertLogStmt(sqlite3_stmt * logStmt,const std::string & user,const DataItem & dataItem)582 int SqliteCloudKvExecutorUtils::BindInsertLogStmt(sqlite3_stmt *logStmt, const std::string &user,
583     const DataItem &dataItem)
584 {
585     int errCode = SQLiteUtils::BindTextToStatement(logStmt, BIND_INSERT_USER_INDEX, user);
586     if (errCode != E_OK) {
587         LOGE("[SqliteCloudKvExecutorUtils] Bind user failed %d when insert", errCode);
588         return errCode;
589     }
590     errCode = SQLiteUtils::BindBlobToStatement(logStmt, BIND_INSERT_HASH_KEY_INDEX, dataItem.hashKey);
591     if (errCode != E_OK) {
592         LOGE("[SqliteCloudKvExecutorUtils] Bind hashKey failed %d when insert", errCode);
593         return errCode;
594     }
595     errCode = SQLiteUtils::BindTextToStatement(logStmt, BIND_INSERT_CLOUD_GID_INDEX, dataItem.gid);
596     if (errCode != E_OK) {
597         LOGE("[SqliteCloudKvExecutorUtils] Bind gid failed %d when insert", errCode);
598         return errCode;
599     }
600     errCode = SQLiteUtils::BindTextToStatement(logStmt, BIND_INSERT_VERSION_INDEX, dataItem.version);
601     if (errCode != E_OK) {
602         LOGE("[SqliteCloudKvExecutorUtils] Bind version failed %d when insert", errCode);
603         return errCode;
604     }
605     errCode = SQLiteUtils::BindInt64ToStatement(logStmt, BIND_INSERT_CLOUD_FLAG_INDEX, dataItem.cloud_flag);
606     if (errCode != E_OK) {
607         LOGE("[SqliteCloudKvExecutorUtils] Bind cloud_flag failed %d when insert", errCode);
608     }
609     return errCode;
610 }
611 
BindUpdateStmt(sqlite3_stmt * logStmt,sqlite3_stmt * dataStmt,const std::string & user,const DataItem & dataItem)612 int SqliteCloudKvExecutorUtils::BindUpdateStmt(sqlite3_stmt *logStmt, sqlite3_stmt *dataStmt, const std::string &user,
613     const DataItem &dataItem)
614 {
615     int errCode = BindInsertLogStmt(logStmt, user, dataItem); // insert or replace LOG table for update DATA table.
616     if (errCode != E_OK) {
617         return errCode;
618     }
619     errCode = BindDataStmt(dataStmt, dataItem, false);
620     if (errCode != E_OK) {
621         return errCode;
622     }
623     return E_OK;
624 }
625 
BindUpdateLogStmt(sqlite3_stmt * logStmt,const std::string & user,const DataItem & dataItem)626 int SqliteCloudKvExecutorUtils::BindUpdateLogStmt(sqlite3_stmt *logStmt, const std::string &user,
627     const DataItem &dataItem)
628 {
629     int index = 1;
630     int errCode = SQLiteUtils::BindTextToStatement(logStmt, index++, dataItem.gid);
631     if (errCode != E_OK) {
632         LOGE("[SqliteCloudKvExecutorUtils] Bind gid failed %d when update", errCode);
633         return errCode;
634     }
635     errCode = SQLiteUtils::BindTextToStatement(logStmt, index++, dataItem.version);
636     if (errCode != E_OK) {
637         LOGE("[SqliteCloudKvExecutorUtils] Bind version failed %d when update", errCode);
638         return errCode;
639     }
640     errCode = SQLiteUtils::BindInt64ToStatement(logStmt, index++, dataItem.cloud_flag);
641     if (errCode != E_OK) {
642         LOGE("[SqliteCloudKvExecutorUtils] Bind cloud_flag failed %d when update", errCode);
643         return errCode;
644     }
645     errCode = SQLiteUtils::BindTextToStatement(logStmt, index++, user);
646     if (errCode != E_OK) {
647         LOGE("[SqliteCloudKvExecutorUtils] Bind user failed %d when update", errCode);
648         return errCode;
649     }
650     errCode = SQLiteUtils::BindBlobToStatement(logStmt, index++, dataItem.hashKey);
651     if (errCode != E_OK) {
652         LOGE("[SqliteCloudKvExecutorUtils] Bind hashKey failed %d when update", errCode);
653     }
654     return errCode;
655 }
656 
BindDeleteStmt(sqlite3_stmt * logStmt,sqlite3_stmt * dataStmt,const std::string & user,DataItem & dataItem)657 int SqliteCloudKvExecutorUtils::BindDeleteStmt(sqlite3_stmt *logStmt, sqlite3_stmt *dataStmt, const std::string &user,
658     DataItem &dataItem)
659 {
660     dataItem.key = {};
661     dataItem.value = {};
662     dataItem.flag |= static_cast<uint64_t>(LogInfoFlag::FLAG_DELETE);
663     int errCode = BindUpdateLogStmt(logStmt, user, dataItem); // update LOG table for delete DATA table.
664     if (errCode != E_OK) {
665         return errCode;
666     }
667     errCode = BindDataStmt(dataStmt, dataItem, false);
668     if (errCode != E_OK) {
669         return errCode;
670     }
671     return E_OK;
672 }
673 
BindDataStmt(sqlite3_stmt * dataStmt,const DataItem & dataItem,bool isInsert)674 int SqliteCloudKvExecutorUtils::BindDataStmt(sqlite3_stmt *dataStmt, const DataItem &dataItem, bool isInsert)
675 {
676     int index = 1;
677     int errCode = BindSyncDataStmt(dataStmt, dataItem, isInsert, index);
678     if (errCode != E_OK) {
679         return errCode;
680     }
681     errCode = BindCloudDataStmt(dataStmt, dataItem, index);
682     if (errCode != E_OK) {
683         return errCode;
684     }
685     if (!isInsert) {
686         errCode = SQLiteUtils::BindBlobToStatement(dataStmt, index++, dataItem.hashKey);
687         if (errCode != E_OK) {
688             LOGE("[SqliteCloudKvExecutorUtils] Bind hashKey failed %d", errCode);
689         }
690     }
691     return errCode;
692 }
693 
BindSyncDataStmt(sqlite3_stmt * dataStmt,const DataItem & dataItem,bool isInsert,int & index)694 int SqliteCloudKvExecutorUtils::BindSyncDataStmt(sqlite3_stmt *dataStmt, const DataItem &dataItem, bool isInsert,
695     int &index)
696 {
697     int errCode = SQLiteUtils::BindBlobToStatement(dataStmt, index++, dataItem.key);
698     if (errCode != E_OK) {
699         LOGE("[SqliteCloudKvExecutorUtils] Bind key failed %d", errCode);
700         return errCode;
701     }
702     errCode = SQLiteUtils::BindBlobToStatement(dataStmt, index++, dataItem.value);
703     if (errCode != E_OK) {
704         LOGE("[SqliteCloudKvExecutorUtils] Bind value failed %d", errCode);
705         return errCode;
706     }
707     errCode = SQLiteUtils::BindInt64ToStatement(dataStmt, index++, static_cast<int64_t>(dataItem.timestamp));
708     if (errCode != E_OK) {
709         LOGE("[SqliteCloudKvExecutorUtils] Bind timestamp failed %d", errCode);
710         return errCode;
711     }
712     errCode = SQLiteUtils::BindInt64ToStatement(dataStmt, index++, static_cast<int64_t>(dataItem.flag));
713     if (errCode != E_OK) {
714         LOGE("[SqliteCloudKvExecutorUtils] Bind flag failed %d", errCode);
715         return errCode;
716     }
717     Bytes bytes;
718     DBCommon::StringToVector(dataItem.dev, bytes);
719     errCode = SQLiteUtils::BindBlobToStatement(dataStmt, index++, bytes);
720     if (errCode != E_OK) {
721         LOGE("[SqliteCloudKvExecutorUtils] Bind dev failed %d", errCode);
722         return errCode;
723     }
724     DBCommon::StringToVector(dataItem.origDev, bytes);
725     errCode = SQLiteUtils::BindBlobToStatement(dataStmt, index++, bytes);
726     if (errCode != E_OK) {
727         LOGE("[SqliteCloudKvExecutorUtils] Bind oriDev failed %d", errCode);
728         return errCode;
729     }
730     if (isInsert) {
731         errCode = SQLiteUtils::BindBlobToStatement(dataStmt, index++, dataItem.hashKey);
732         if (errCode != E_OK) {
733             LOGE("[SqliteCloudKvExecutorUtils] Bind hashKey failed %d", errCode);
734             return errCode;
735         }
736     }
737     errCode = SQLiteUtils::BindInt64ToStatement(dataStmt, index++, static_cast<int64_t>(dataItem.writeTimestamp));
738     if (errCode != E_OK) {
739         LOGE("[SqliteCloudKvExecutorUtils] Bind wTime failed %d", errCode);
740     }
741     return errCode;
742 }
743 
BindCloudDataStmt(sqlite3_stmt * dataStmt,const DataItem & dataItem,int & index)744 int SqliteCloudKvExecutorUtils::BindCloudDataStmt(sqlite3_stmt *dataStmt, const DataItem &dataItem, int &index)
745 {
746     int errCode = SQLiteUtils::BindInt64ToStatement(dataStmt, index++, dataItem.modifyTime);
747     if (errCode != E_OK) {
748         LOGE("[SqliteCloudKvExecutorUtils] Bind modifyTime failed %d", errCode);
749         return errCode;
750     }
751     errCode = SQLiteUtils::BindInt64ToStatement(dataStmt, index++, dataItem.createTime);
752     if (errCode != E_OK) {
753         LOGE("[SqliteCloudKvExecutorUtils] Bind createTime failed %d", errCode);
754     }
755     return errCode;
756 }
757 
StepStmt(sqlite3_stmt * logStmt,sqlite3_stmt * dataStmt,bool isMemory)758 int SqliteCloudKvExecutorUtils::StepStmt(sqlite3_stmt *logStmt, sqlite3_stmt *dataStmt, bool isMemory)
759 {
760     int errCode = SQLiteUtils::StepNext(logStmt, isMemory);
761     if (errCode != -E_FINISHED) {
762         LOGE("[SqliteCloudKvExecutorUtils] Step insert log stmt failed %d", errCode);
763         return errCode;
764     }
765     errCode = SQLiteUtils::StepNext(dataStmt, isMemory);
766     if (errCode != -E_FINISHED) {
767         LOGE("[SqliteCloudKvExecutorUtils] Step insert data stmt failed %d", errCode);
768         return errCode;
769     }
770     return E_OK;
771 }
772 
FillCloudLog(const FillGidParam & param,OpType opType,const CloudSyncData & data,const std::string & user,CloudUploadRecorder & recorder)773 int SqliteCloudKvExecutorUtils::FillCloudLog(const FillGidParam &param, OpType opType, const CloudSyncData &data,
774     const std::string &user, CloudUploadRecorder &recorder)
775 {
776     if (param.first == nullptr) {
777         LOGE("[SqliteCloudKvExecutorUtils] Fill log got nullptr db");
778         return -E_INVALID_ARGS;
779     }
780     if (data.isCloudVersionRecord) {
781         int errCode = FillCloudVersionRecord(param.first, opType, data);
782         if (errCode != E_OK) {
783             return errCode;
784         }
785     }
786     switch (opType) {
787         case OpType::INSERT:
788             return FillCloudGid(param, data.insData, user, CloudWaterType::INSERT, recorder);
789         case OpType::UPDATE:
790             return FillCloudGid(param, data.updData, user, CloudWaterType::UPDATE, recorder);
791         case OpType::DELETE:
792             return FillCloudGid(param, data.delData, user, CloudWaterType::DELETE, recorder);
793         default:
794             return E_OK;
795     }
796 }
797 
OnlyUpdateLogTable(sqlite3 * db,bool isMemory,int index,OpType op,DownloadData & downloadData)798 int SqliteCloudKvExecutorUtils::OnlyUpdateLogTable(sqlite3 *db, bool isMemory, int index, OpType op,
799     DownloadData &downloadData)
800 {
801     if (downloadData.existDataHashKey[index].empty()) {
802         return E_OK;
803     }
804     sqlite3_stmt *logStmt = nullptr;
805     int errCode = SQLiteUtils::GetStatement(db, GetOperateLogSql(op), logStmt);
806     if (errCode != E_OK) {
807         LOGE("[SqliteCloudKvExecutorUtils] Get update sync data stmt failed %d", errCode);
808         return errCode;
809     }
810     ResFinalizer finalizerData([logStmt]() {
811         sqlite3_stmt *statement = logStmt;
812         int ret = E_OK;
813         SQLiteUtils::ResetStatement(statement, true, ret);
814         if (ret != E_OK) {
815             LOGW("[SqliteCloudKvExecutorUtils] Reset log stmt failed %d when only update log", ret);
816         }
817     });
818     auto res = CloudStorageUtils::GetDataItemFromCloudData(downloadData.data[index]);
819     if (res.first != E_OK) {
820         LOGE("[SqliteCloudKvExecutorUtils] Get data item failed %d", res.first);
821         return res.first;
822     }
823     bool clearCloudInfo = (op == OpType::CLEAR_GID);
824     if (res.second.hashKey.empty() || DBCommon::IsRecordDelete(downloadData.data[index])) {
825         res.second.hashKey = downloadData.existDataHashKey[index];
826         clearCloudInfo = true;
827     }
828     if (clearCloudInfo) {
829         res.second.gid.clear();
830         res.second.version.clear();
831     }
832     errCode = BindOnlyUpdateLogStmt(logStmt, downloadData.user, res.second);
833     if (errCode != E_OK) {
834         return errCode;
835     }
836     errCode = SQLiteUtils::StepNext(logStmt, isMemory);
837     if (errCode == -E_FINISHED) {
838         errCode = E_OK;
839     }
840     return errCode;
841 }
842 
FillCloudGid(const FillGidParam & param,const CloudSyncBatch & data,const std::string & user,const CloudWaterType & type,CloudUploadRecorder & recorder)843 int SqliteCloudKvExecutorUtils::FillCloudGid(const FillGidParam &param, const CloudSyncBatch &data,
844     const std::string &user, const CloudWaterType &type, CloudUploadRecorder &recorder)
845 {
846     auto [db, ignoreEmptyGid] = param;
847     sqlite3_stmt *logStmt = nullptr;
848     int errCode = SQLiteUtils::GetStatement(db, GetOperateLogSql(TransToOpType(type)), logStmt);
849     ResFinalizer finalizerData([logStmt]() {
850         sqlite3_stmt *statement = logStmt;
851         int ret = E_OK;
852         SQLiteUtils::ResetStatement(statement, true, ret);
853         if (ret != E_OK) {
854             LOGW("[SqliteCloudKvExecutorUtils] Reset log stmt failed %d when fill log", ret);
855         }
856     });
857     for (size_t i = 0; i < data.hashKey.size(); ++i) {
858         if (DBCommon::IsRecordError(data.extend[i]) || DBCommon::IsRecordVersionConflict(data.extend[i])) {
859             continue;
860         }
861         DataItem dataItem;
862         errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, data.extend[i], dataItem.gid);
863         if (dataItem.gid.empty() && ignoreEmptyGid) {
864             continue;
865         }
866         if (errCode != E_OK) {
867             return errCode;
868         }
869         CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::VERSION_FIELD, data.extend[i], dataItem.version);
870         dataItem.hashKey = data.hashKey[i];
871         dataItem.dataDelete = CheckDataDelete(param, data, i);
872         errCode = BindFillGidLogStmt(logStmt, user, dataItem, data.extend[i], type);
873         if (errCode != E_OK) {
874             return errCode;
875         }
876         errCode = SQLiteUtils::StepNext(logStmt, false);
877         if (errCode == -E_FINISHED) {
878             errCode = E_OK;
879         }
880         if (errCode != E_OK) {
881             LOGE("[SqliteCloudKvExecutorUtils] fill back failed %d index %zu", errCode, i);
882             return errCode;
883         }
884         SQLiteUtils::ResetStatement(logStmt, false, errCode);
885         MarkUploadSuccess(param, data, user, i);
886         // ignored version record
887         if (i >= data.timestamp.size()) {
888             continue;
889         }
890         recorder.RecordUploadRecord(CloudDbConstant::CLOUD_KV_TABLE_NAME, data.hashKey[i], type, data.timestamp[i]);
891     }
892     return E_OK;
893 }
894 
OnlyUpdateSyncData(sqlite3 * db,bool isMemory,int index,OpType opType,DownloadData & downloadData)895 int SqliteCloudKvExecutorUtils::OnlyUpdateSyncData(sqlite3 *db, bool isMemory, int index, OpType opType,
896     DownloadData &downloadData)
897 {
898     if (opType != OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO && opType != OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE &&
899         opType != OpType::UPDATE_TIMESTAMP) {
900         LOGW("[SqliteCloudKvExecutorUtils] Ignore unknown opType %d", static_cast<int>(opType));
901         return E_OK;
902     }
903     sqlite3_stmt *dataStmt = nullptr;
904     int errCode = SQLiteUtils::GetStatement(db, GetOperateDataSql(opType), dataStmt);
905     if (errCode != E_OK) {
906         LOGE("[SqliteCloudKvExecutorUtils] Get update sync data stmt failed %d", errCode);
907         return errCode;
908     }
909     ResFinalizer finalizerData([dataStmt]() {
910         sqlite3_stmt *statement = dataStmt;
911         int ret = E_OK;
912         SQLiteUtils::ResetStatement(statement, true, ret);
913         if (ret != E_OK) {
914             LOGW("[SqliteCloudKvExecutorUtils] Reset log stmt failed %d when update log", ret);
915         }
916     });
917     errCode = BindUpdateSyncDataStmt(dataStmt, index, opType, downloadData);
918     if (errCode != E_OK) {
919         LOGE("[SqliteCloudKvExecutorUtils] Bind update sync data stmt failed %d", errCode);
920         return errCode;
921     }
922     errCode = SQLiteUtils::StepNext(dataStmt, isMemory);
923     if (errCode == -E_FINISHED) {
924         errCode = E_OK;
925     }
926     return errCode;
927 }
928 
BindUpdateSyncDataStmt(sqlite3_stmt * dataStmt,int index,OpType opType,DownloadData & downloadData)929 int SqliteCloudKvExecutorUtils::BindUpdateSyncDataStmt(sqlite3_stmt *dataStmt, int index, OpType opType,
930     DownloadData &downloadData)
931 {
932     switch (opType) {
933         case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO:
934         case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE:
935             return SQLiteUtils::BindBlobToStatement(dataStmt, 1, downloadData.existDataHashKey[index]);
936         case OpType::UPDATE_TIMESTAMP:
937             return BindUpdateTimestampStmt(dataStmt, index, downloadData);
938         default:
939             return E_OK;
940     }
941 }
942 
BindUpdateTimestampStmt(sqlite3_stmt * dataStmt,int index,DownloadData & downloadData)943 int SqliteCloudKvExecutorUtils::BindUpdateTimestampStmt(sqlite3_stmt *dataStmt, int index, DownloadData &downloadData)
944 {
945     auto res = CloudStorageUtils::GetDataItemFromCloudData(downloadData.data[index]);
946     auto &[errCode, dataItem] = res;
947     if (errCode != E_OK) {
948         return errCode;
949     }
950     int currentBindIndex = 1; // bind sql index start at 1
951     errCode = SQLiteUtils::BindInt64ToStatement(dataStmt, currentBindIndex++, dataItem.timestamp);
952     if (errCode != E_OK) {
953         LOGE("[SqliteCloudKvExecutorUtils] Bind timestamp failed %d", errCode);
954         return errCode;
955     }
956     errCode = SQLiteUtils::BindInt64ToStatement(dataStmt, currentBindIndex++, dataItem.modifyTime);
957     if (errCode != E_OK) {
958         LOGE("[SqliteCloudKvExecutorUtils] Bind modifyTime failed %d", errCode);
959     }
960     errCode = SQLiteUtils::BindBlobToStatement(dataStmt, currentBindIndex++, dataItem.hashKey);
961     if (errCode != E_OK) {
962         LOGE("[SqliteCloudKvExecutorUtils] Bind hashKey failed %d", errCode);
963         return errCode;
964     }
965     return E_OK;
966 }
967 
GetDataItem(int index,DownloadData & downloadData)968 std::pair<int, DataItem> SqliteCloudKvExecutorUtils::GetDataItem(int index, DownloadData &downloadData)
969 {
970     auto res = CloudStorageUtils::GetDataItemFromCloudData(downloadData.data[index]);
971     auto &[errCode, dataItem] = res;
972     if (errCode != E_OK) {
973         LOGE("[SqliteCloudKvExecutorUtils] Get data item failed %d", errCode);
974         return res;
975     }
976     std::string dev;
977     errCode = RuntimeContext::GetInstance()->GetLocalIdentity(dev);
978     if (errCode != E_OK) {
979         return res;
980     }
981     dev = DBCommon::TransferHashString(dev);
982     auto decodeDevice = DBBase64Utils::Decode(dataItem.dev);
983     if (!decodeDevice.empty()) {
984         dataItem.dev = std::string(decodeDevice.begin(), decodeDevice.end());
985     }
986     if (dataItem.dev == dev) {
987         dataItem.dev = "";
988     }
989     decodeDevice = DBBase64Utils::Decode(dataItem.origDev);
990     if (!decodeDevice.empty()) {
991         dataItem.origDev = std::string(decodeDevice.begin(), decodeDevice.end());
992     }
993     if (dataItem.origDev == dev) {
994         dataItem.origDev = "";
995     }
996     dataItem.timestamp = static_cast<Timestamp>(static_cast<int64_t>(dataItem.modifyTime) + downloadData.timeOffset);
997     dataItem.writeTimestamp = dataItem.timestamp; // writeTimestamp is process conflict time
998     return res;
999 }
1000 
CountCloudDataInner(sqlite3 * db,bool isMemory,const Timestamp & timestamp,const std::string & user,std::string & sql)1001 std::pair<int, int64_t> SqliteCloudKvExecutorUtils::CountCloudDataInner(sqlite3 *db, bool isMemory,
1002     const Timestamp &timestamp, const std::string &user, std::string &sql)
1003 {
1004     std::pair<int, int64_t> res;
1005     auto &[errCode, count] = res;
1006     sqlite3_stmt *stmt = nullptr;
1007     errCode = SQLiteUtils::GetStatement(db, sql, stmt);
1008     if (errCode != E_OK) {
1009         LOGE("[SqliteCloudKvExecutorUtils] Count data stmt failed %d", errCode);
1010         return res;
1011     }
1012     ResFinalizer finalizer([stmt]() {
1013         sqlite3_stmt *statement = stmt;
1014         int ret = E_OK;
1015         SQLiteUtils::ResetStatement(statement, true, ret);
1016         if (ret != E_OK) {
1017             LOGW("[SqliteCloudKvExecutorUtils] Reset log stmt failed %d when count data", ret);
1018         }
1019     });
1020     errCode = SQLiteUtils::BindTextToStatement(stmt, BIND_CLOUD_USER, user);
1021     if (errCode != E_OK) {
1022         LOGE("[SqliteCloudKvExecutorUtils] Bind user failed %d", errCode);
1023         return res;
1024     }
1025     errCode = SQLiteUtils::BindInt64ToStatement(stmt, BIND_CLOUD_TIMESTAMP, static_cast<int64_t>(timestamp));
1026     if (errCode != E_OK) {
1027         LOGE("[SqliteCloudKvExecutorUtils] Bind begin time failed %d", errCode);
1028         return res;
1029     }
1030     errCode = SQLiteUtils::StepNext(stmt, isMemory);
1031     if (errCode == -E_FINISHED) {
1032         count = 0;
1033         return res;
1034     }
1035     count = sqlite3_column_int64(stmt, CLOUD_QUERY_COUNT_INDEX);
1036     LOGD("[SqliteCloudKvExecutorUtils] Get total upload count %" PRId64, count);
1037     return res;
1038 }
1039 
CountCloudData(sqlite3 * db,bool isMemory,const Timestamp & timestamp,const std::string & user,bool forcePush)1040 std::pair<int, int64_t> SqliteCloudKvExecutorUtils::CountCloudData(sqlite3 *db, bool isMemory,
1041     const Timestamp &timestamp, const std::string &user, bool forcePush)
1042 {
1043     std::string sql = SqliteQueryHelper::GetKvCloudQuerySql(true, forcePush);
1044     return CountCloudDataInner(db, isMemory, timestamp, user, sql);
1045 }
1046 
CountAllCloudData(const DBParam & param,const std::vector<Timestamp> & timestampVec,const std::string & user,bool forcePush,QuerySyncObject & querySyncObject)1047 std::pair<int, int64_t> SqliteCloudKvExecutorUtils::CountAllCloudData(const DBParam &param,
1048     const std::vector<Timestamp> &timestampVec, const std::string &user, bool forcePush,
1049     QuerySyncObject &querySyncObject)
1050 {
1051     std::pair<int, int64_t> res = { E_OK, 0 };
1052     auto &[errCode, count] = res;
1053     if (timestampVec.size() != 3) { // 3 is the number of three mode.
1054         errCode = -E_INVALID_ARGS;
1055         return res;
1056     }
1057     std::vector<CloudWaterType> typeVec = DBCommon::GetWaterTypeVec();
1058     SqliteQueryHelper helper = querySyncObject.GetQueryHelper(errCode);
1059     if (errCode != E_OK) {
1060         return res;
1061     }
1062     for (size_t i = 0; i < typeVec.size(); i++) {
1063         sqlite3_stmt *stmt = nullptr;
1064         errCode = helper.GetCountKvCloudDataStatement(param.first, forcePush, typeVec[i], stmt);
1065         if (errCode != E_OK) {
1066             return res;
1067         }
1068         // count no use watermark
1069         auto [err, cnt] = helper.BindCountKvCloudDataStatement(param.first, param.second, 0u, user, stmt);
1070         if (err != E_OK) {
1071             return { err, 0 };
1072         }
1073         count += cnt;
1074     }
1075     return res;
1076 }
1077 
FillCloudVersionRecord(sqlite3 * db,OpType opType,const CloudSyncData & data)1078 int SqliteCloudKvExecutorUtils::FillCloudVersionRecord(sqlite3 *db, OpType opType, const CloudSyncData &data)
1079 {
1080     if (opType != OpType::INSERT && opType != OpType::UPDATE) {
1081         return E_OK;
1082     }
1083     bool isInsert = (opType == OpType::INSERT);
1084     CloudSyncBatch syncBatch = isInsert ? data.insData : data.updData;
1085     if (syncBatch.record.empty()) {
1086         LOGW("[SqliteCloudKvExecutorUtils] Fill empty cloud version record");
1087         return E_OK;
1088     }
1089     syncBatch.record[0].insert(syncBatch.extend[0].begin(), syncBatch.extend[0].end());
1090     auto res = CloudStorageUtils::GetSystemRecordFromCloudData(syncBatch.record[0]); // only record first one
1091     auto &[errCode, dataItem] = res;
1092     sqlite3_stmt *dataStmt = nullptr;
1093     errCode = SQLiteUtils::GetStatement(db, GetOperateDataSql(opType), dataStmt);
1094     if (errCode != E_OK) {
1095         LOGE("[SqliteCloudKvExecutorUtils] Get insert version record statement failed %d", errCode);
1096         return errCode;
1097     }
1098     ResFinalizer finalizerData([dataStmt]() {
1099         int ret = E_OK;
1100         sqlite3_stmt *statement = dataStmt;
1101         SQLiteUtils::ResetStatement(statement, true, ret);
1102         if (ret != E_OK) {
1103             LOGW("[SqliteCloudKvExecutorUtils] Reset version record stmt failed %d", ret);
1104         }
1105     });
1106     errCode = BindDataStmt(dataStmt, dataItem, isInsert);
1107     if (errCode != E_OK) {
1108         return errCode;
1109     }
1110     errCode = SQLiteUtils::StepNext(dataStmt, false);
1111     if (errCode != -E_FINISHED) {
1112         LOGE("[SqliteCloudKvExecutorUtils] Step insert version record stmt failed %d", errCode);
1113         return errCode;
1114     }
1115     return E_OK;
1116 }
1117 
GetLocalCloudVersion(sqlite3 * db,bool isMemory,const std::string & user)1118 std::pair<int, CloudSyncData> SqliteCloudKvExecutorUtils::GetLocalCloudVersion(sqlite3 *db, bool isMemory,
1119     const std::string &user)
1120 {
1121     auto res = GetLocalCloudVersionInner(db, isMemory, user);
1122     if (res.first != E_OK) {
1123         LOGE("[SqliteCloudKvExecutorUtils] Get local cloud version failed %d", res.first);
1124     }
1125     return res;
1126 }
1127 
GetLocalCloudVersionInner(sqlite3 * db,bool isMemory,const std::string & user)1128 std::pair<int, CloudSyncData> SqliteCloudKvExecutorUtils::GetLocalCloudVersionInner(sqlite3 *db, bool isMemory,
1129     const std::string &user)
1130 {
1131     std::pair<int, CloudSyncData> res;
1132     auto &[errCode, syncData] = res;
1133     auto sql = SqliteQueryHelper::GetKvCloudRecordSql();
1134     sqlite3_stmt *stmt = nullptr;
1135     errCode = SQLiteUtils::GetStatement(db, sql, stmt);
1136     if (errCode != E_OK) {
1137         return res;
1138     }
1139     ResFinalizer finalizerData([stmt]() {
1140         int ret = E_OK;
1141         sqlite3_stmt *statement = stmt;
1142         SQLiteUtils::ResetStatement(statement, true, ret);
1143         if (ret != E_OK) {
1144             LOGW("[SqliteCloudKvExecutorUtils] Reset local version record stmt failed %d", ret);
1145         }
1146     });
1147     std::string hashDev;
1148     errCode = RuntimeContext::GetInstance()->GetLocalIdentity(hashDev);
1149     if (errCode != E_OK) {
1150         return res;
1151     }
1152     std::string tempDev = DBCommon::TransferHashString(hashDev);
1153     hashDev = DBCommon::TransferStringToHex(tempDev);
1154     std::string key = CloudDbConstant::CLOUD_VERSION_RECORD_PREFIX_KEY + hashDev;
1155     Key keyVec;
1156     DBCommon::StringToVector(key, keyVec);
1157     errCode = SQLiteUtils::BindBlobToStatement(stmt, BIND_CLOUD_VERSION_RECORD_KEY_INDEX, keyVec);
1158     if (errCode != E_OK) {
1159         return res;
1160     }
1161     errCode = SQLiteUtils::BindTextToStatement(stmt, BIND_CLOUD_VERSION_RECORD_USER_INDEX, user);
1162     if (errCode != E_OK) {
1163         return res;
1164     }
1165     errCode = GetCloudVersionRecord(isMemory, stmt, syncData);
1166     if (errCode == -E_NOT_FOUND) {
1167         InitDefaultCloudVersionRecord(key, tempDev, syncData);
1168         errCode = E_OK;
1169     }
1170     return res;
1171 }
1172 
GetCloudVersionRecord(bool isMemory,sqlite3_stmt * stmt,CloudSyncData & syncData)1173 int SqliteCloudKvExecutorUtils::GetCloudVersionRecord(bool isMemory, sqlite3_stmt *stmt, CloudSyncData &syncData)
1174 {
1175     int errCode = SQLiteUtils::StepNext(stmt, isMemory);
1176     if (errCode == -E_FINISHED) {
1177         return -E_NOT_FOUND;
1178     }
1179     if (errCode != E_OK) {
1180         LOGE("[SqliteCloudKvExecutorUtils] Get local version failed %d", errCode);
1181         return errCode;
1182     }
1183     CloudSyncConfig config;
1184     config.maxUploadSize = CloudDbConstant::MAX_UPLOAD_SIZE;
1185     config.maxUploadCount = 1;
1186     CloudUploadRecorder recorder; // ignore last record
1187     UploadDetail detail;
1188     errCode = GetCloudDataForSync(config, recorder, stmt, syncData, detail);
1189     return errCode;
1190 }
1191 
InitDefaultCloudVersionRecord(const std::string & key,const std::string & dev,CloudSyncData & syncData)1192 void SqliteCloudKvExecutorUtils::InitDefaultCloudVersionRecord(const std::string &key, const std::string &dev,
1193     CloudSyncData &syncData)
1194 {
1195     LOGI("[SqliteCloudKvExecutorUtils] Not found local version record");
1196     VBucket defaultRecord;
1197     defaultRecord[CloudDbConstant::CLOUD_KV_FIELD_KEY] = key;
1198     defaultRecord[CloudDbConstant::CLOUD_KV_FIELD_VALUE] = std::string("");
1199     auto encodeDev = DBBase64Utils::Encode(dev);
1200     defaultRecord[CloudDbConstant::CLOUD_KV_FIELD_DEVICE] = encodeDev;
1201     defaultRecord[CloudDbConstant::CLOUD_KV_FIELD_ORI_DEVICE] = encodeDev;
1202     syncData.insData.record.push_back(std::move(defaultRecord));
1203     VBucket defaultExtend;
1204     defaultExtend[CloudDbConstant::HASH_KEY_FIELD] = DBCommon::TransferStringToHex(key);
1205     syncData.insData.extend.push_back(std::move(defaultExtend));
1206     syncData.insData.assets.emplace_back();
1207     Bytes bytesHashKey;
1208     DBCommon::StringToVector(key, bytesHashKey);
1209     syncData.insData.hashKey.push_back(bytesHashKey);
1210 }
1211 
BindVersionStmt(const std::string & device,const std::string & user,sqlite3_stmt * dataStmt)1212 int SqliteCloudKvExecutorUtils::BindVersionStmt(const std::string &device, const std::string &user,
1213     sqlite3_stmt *dataStmt)
1214 {
1215     std::string hashDevice;
1216     int errCode = RuntimeContext::GetInstance()->GetLocalIdentity(hashDevice);
1217     if (errCode != E_OK) {
1218         return errCode;
1219     }
1220     Bytes bytes;
1221     if (device == hashDevice) {
1222         DBCommon::StringToVector("", bytes);
1223     } else {
1224         hashDevice = DBCommon::TransferHashString(device);
1225         DBCommon::StringToVector(hashDevice, bytes);
1226     }
1227     errCode = SQLiteUtils::BindBlobToStatement(dataStmt, BIND_CLOUD_VERSION_DEVICE_INDEX, bytes);
1228     if (errCode != E_OK) {
1229         LOGE("[SqliteCloudKvExecutorUtils] Bind device failed %d", errCode);
1230     }
1231     return errCode;
1232 }
1233 
GetCloudVersionFromCloud(sqlite3 * db,bool isMemory,const std::string & user,const std::string & device,std::vector<VBucket> & dataVector)1234 int SqliteCloudKvExecutorUtils::GetCloudVersionFromCloud(sqlite3 *db, bool isMemory, const std::string &user,
1235     const std::string &device, std::vector<VBucket> &dataVector)
1236 {
1237     sqlite3_stmt *dataStmt = nullptr;
1238     bool isDeviceEmpty = device.empty();
1239     std::string sql = SqliteQueryHelper::GetCloudVersionRecordSql(isDeviceEmpty);
1240     int errCode = SQLiteUtils::GetStatement(db, sql, dataStmt);
1241     if (errCode != E_OK) {
1242         LOGE("[SqliteCloudKvExecutorUtils] Get cloud version record statement failed %d", errCode);
1243         return errCode;
1244     }
1245     ResFinalizer finalizerData([dataStmt]() {
1246         int ret = E_OK;
1247         sqlite3_stmt *statement = dataStmt;
1248         SQLiteUtils::ResetStatement(statement, true, ret);
1249         if (ret != E_OK) {
1250             LOGW("[SqliteCloudKvExecutorUtils] Reset cloud version record stmt failed %d", ret);
1251         }
1252     });
1253     if (!isDeviceEmpty) {
1254         errCode = BindVersionStmt(device, user, dataStmt);
1255         if (errCode != E_OK) {
1256             return errCode;
1257         }
1258     }
1259     uint32_t totalSize = 0;
1260     do {
1261         errCode = SQLiteUtils::StepWithRetry(dataStmt, isMemory);
1262         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1263             errCode = E_OK;
1264             break;
1265         } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1266             LOGE("[SqliteCloudKvExecutorUtils] Get cloud version from cloud failed. %d", errCode);
1267             break;
1268         }
1269         VBucket data;
1270         errCode = GetCloudVersionRecordData(dataStmt, data, totalSize);
1271         dataVector.push_back(data);
1272     } while (errCode == E_OK);
1273     return errCode;
1274 }
1275 
GetCloudVersionRecordData(sqlite3_stmt * stmt,VBucket & data,uint32_t & totalSize)1276 int SqliteCloudKvExecutorUtils::GetCloudVersionRecordData(sqlite3_stmt *stmt, VBucket &data, uint32_t &totalSize)
1277 {
1278     int errCode = GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_KEY, CLOUD_QUERY_KEY_INDEX, stmt, data, totalSize);
1279     if (errCode != E_OK) {
1280         return errCode;
1281     }
1282     errCode = GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_VALUE, CLOUD_QUERY_VALUE_INDEX, stmt, data, totalSize);
1283     if (errCode != E_OK) {
1284         return errCode;
1285     }
1286     return GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_DEVICE, CLOUD_QUERY_DEV_INDEX, stmt, data, totalSize);
1287 }
1288 
GetWaitCompensatedSyncDataPk(sqlite3 * db,bool isMemory,std::vector<VBucket> & data)1289 int SqliteCloudKvExecutorUtils::GetWaitCompensatedSyncDataPk(sqlite3 *db, bool isMemory, std::vector<VBucket> &data)
1290 {
1291     sqlite3_stmt *stmt = nullptr;
1292     int errCode = SQLiteUtils::GetStatement(db, SELECT_COMPENSATE_SYNC_KEY_SQL, stmt);
1293     if (errCode != E_OK) {
1294         LOGE("[SqliteCloudKvExecutorUtils] Get compensate key stmt failed %d", errCode);
1295         return errCode;
1296     }
1297     ResFinalizer finalizerData([stmt]() {
1298         int ret = E_OK;
1299         sqlite3_stmt *statement = stmt;
1300         SQLiteUtils::ResetStatement(statement, true, ret);
1301         if (ret != E_OK) {
1302             LOGW("[SqliteCloudKvExecutorUtils] Reset compensate key stmt failed %d", ret);
1303         }
1304     });
1305     uint32_t totalSize = 0;
1306     do {
1307         errCode = SQLiteUtils::StepWithRetry(stmt, isMemory);
1308         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1309             errCode = E_OK;
1310             break;
1311         } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1312             LOGE("[SqliteCloudKvExecutorUtils] Get key from compensate key stmt failed. %d", errCode);
1313             break;
1314         }
1315         VBucket key;
1316         errCode = GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_KEY, CLOUD_QUERY_KEY_INDEX, stmt, key, totalSize);
1317         if (errCode != E_OK) {
1318             return errCode;
1319         }
1320         data.push_back(key);
1321     } while (errCode == E_OK);
1322     return errCode;
1323 }
1324 
GetWaitCompensatedSyncDataUserId(sqlite3 * db,bool isMemory,std::vector<VBucket> & users)1325 int SqliteCloudKvExecutorUtils::GetWaitCompensatedSyncDataUserId(sqlite3 *db, bool isMemory,
1326     std::vector<VBucket> &users)
1327 {
1328     sqlite3_stmt *stmt = nullptr;
1329     int errCode = SQLiteUtils::GetStatement(db, SELECT_COMPENSATE_SYNC_USERID_SQL, stmt);
1330     if (errCode != E_OK) {
1331         LOGE("[SqliteCloudKvExecutorUtils] Get compensate userid stmt failed %d", errCode);
1332         return errCode;
1333     }
1334     ResFinalizer finalizerData([stmt]() {
1335         int ret = E_OK;
1336         sqlite3_stmt *statement = stmt;
1337         SQLiteUtils::ResetStatement(statement, true, ret);
1338         if (ret != E_OK) {
1339             LOGW("[SqliteCloudKvExecutorUtils] Reset compensate key stmt failed %d", ret);
1340         }
1341     });
1342     uint32_t totalSize = 0;
1343     do {
1344         errCode = SQLiteUtils::StepWithRetry(stmt, isMemory);
1345         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1346             errCode = E_OK;
1347             break;
1348         } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1349             LOGE("[SqliteCloudKvExecutorUtils] Get key from compensate key stmt failed. %d", errCode);
1350             break;
1351         }
1352         VBucket key;
1353         errCode = GetCloudKvBlobData(CloudDbConstant::CLOUD_KV_FIELD_USERID, CLOUD_QUERY_KEY_INDEX, stmt,
1354             key, totalSize);
1355         if (errCode != E_OK) {
1356             return errCode;
1357         }
1358         users.push_back(key);
1359     } while (errCode == E_OK);
1360     return errCode;
1361 }
1362 
GetWaitCompensatedSyncData(sqlite3 * db,bool isMemory,std::vector<VBucket> & data,std::vector<VBucket> & users)1363 int SqliteCloudKvExecutorUtils::GetWaitCompensatedSyncData(sqlite3 *db, bool isMemory, std::vector<VBucket> &data,
1364     std::vector<VBucket> &users)
1365 {
1366     int errCode = SqliteCloudKvExecutorUtils::GetWaitCompensatedSyncDataPk(db, isMemory, data);
1367     if (errCode != E_OK) {
1368         LOGW("[GetWaitCompensatedSyncData] Get wait compensated sync date failed! errCode=%d", errCode);
1369         return errCode;
1370     }
1371     errCode = SqliteCloudKvExecutorUtils::GetWaitCompensatedSyncDataUserId(db, isMemory, users);
1372     if (errCode != E_OK) {
1373         LOGW("[GetWaitCompensatedSyncData] Get wait compensated sync date failed! errCode=%d", errCode);
1374     }
1375     return errCode;
1376 }
1377 
QueryCloudGid(sqlite3 * db,bool isMemory,const std::string & user,QuerySyncObject & querySyncObject,std::vector<std::string> & cloudGid)1378 int SqliteCloudKvExecutorUtils::QueryCloudGid(sqlite3 *db, bool isMemory, const std::string &user,
1379     QuerySyncObject &querySyncObject, std::vector<std::string> &cloudGid)
1380 {
1381     int errCode = E_OK;
1382     QuerySyncObject query = querySyncObject;
1383     SqliteQueryHelper helper = query.GetQueryHelper(errCode);
1384     if (errCode != E_OK) {
1385         return errCode;
1386     }
1387     sqlite3_stmt *stmt = nullptr;
1388     errCode = helper.GetAndBindGidKvCloudQueryStatement(user, db, stmt);
1389     if (errCode != E_OK) {
1390         return errCode;
1391     }
1392     do {
1393         errCode = SQLiteUtils::StepWithRetry(stmt, isMemory);
1394         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1395             errCode = E_OK;
1396             break;
1397         } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1398             LOGE("[SqliteCloudKvExecutorUtils] Get cloud version from cloud failed. %d", errCode);
1399             break;
1400         }
1401         std::string gid;
1402         errCode = SQLiteUtils::GetColumnTextValue(stmt, 0, gid);
1403         cloudGid.push_back(gid);
1404     } while (errCode == E_OK);
1405     int ret = E_OK;
1406     SQLiteUtils::ResetStatement(stmt, true, ret);
1407     return errCode == E_OK ? ret : errCode;
1408 }
1409 
BindFillGidLogStmt(sqlite3_stmt * logStmt,const std::string & user,const DataItem & dataItem,const VBucket & uploadExtend,const CloudWaterType & type)1410 int SqliteCloudKvExecutorUtils::BindFillGidLogStmt(sqlite3_stmt *logStmt, const std::string &user,
1411     const DataItem &dataItem, const VBucket &uploadExtend, const CloudWaterType &type)
1412 {
1413     DataItem wItem = dataItem;
1414     if (DBCommon::IsNeedCompensatedForUpload(uploadExtend, type) && !wItem.dataDelete) {
1415         wItem.cloud_flag |= static_cast<uint32_t>(LogInfoFlag::FLAG_WAIT_COMPENSATED_SYNC);
1416     }
1417     if (DBCommon::IsCloudRecordNotFound(uploadExtend) &&
1418         (type == CloudWaterType::UPDATE || type == CloudWaterType::DELETE)) {
1419         wItem.gid = {};
1420         wItem.version = {};
1421     }
1422     int errCode = E_OK;
1423     if (type == CloudWaterType::DELETE) {
1424         if (DBCommon::IsCloudRecordNotFound(uploadExtend)) {
1425             errCode = BindUpdateLogStmt(logStmt, user, wItem);
1426         }
1427     } else {
1428         errCode = BindInsertLogStmt(logStmt, user, wItem);
1429     }
1430     if (errCode != E_OK) {
1431         LOGE("[SqliteCloudKvExecutorUtils] fill cloud gid failed. %d", errCode);
1432     }
1433     return errCode;
1434 }
1435 
MarkUploadSuccess(const FillGidParam & param,const CloudSyncBatch & data,const std::string & user,size_t dataIndex)1436 void SqliteCloudKvExecutorUtils::MarkUploadSuccess(const FillGidParam &param, const CloudSyncBatch &data,
1437     const std::string &user, size_t dataIndex)
1438 {
1439     if (data.extend.size() <= dataIndex || data.hashKey.size() <= dataIndex || data.timestamp.size() <= dataIndex) {
1440         LOGW("[SqliteCloudKvExecutorUtils] invalid index %zu when mark upload success", dataIndex);
1441         return;
1442     }
1443     if (!DBCommon::IsRecordSuccess(data.extend[dataIndex])) {
1444         return;
1445     }
1446     if (CheckDataChanged(param, data, dataIndex)) {
1447         LOGW("[SqliteCloudKvExecutorUtils] %zu data changed when mark upload success", dataIndex);
1448         return;
1449     }
1450     MarkUploadSuccessInner(param, data, user, dataIndex);
1451 }
1452 
CheckDataChanged(const FillGidParam & param,const CloudSyncBatch & data,size_t dataIndex)1453 bool SqliteCloudKvExecutorUtils::CheckDataChanged(const FillGidParam &param,
1454     const CloudSyncBatch &data, size_t dataIndex)
1455 {
1456     sqlite3_stmt *checkStmt = nullptr;
1457     int errCode = SQLiteUtils::GetStatement(param.first, CHECK_DATA_CHANGED, checkStmt);
1458     ResFinalizer finalizerData([checkStmt]() {
1459         sqlite3_stmt *statement = checkStmt;
1460         int ret = E_OK;
1461         SQLiteUtils::ResetStatement(statement, true, ret);
1462         if (ret != E_OK) {
1463             LOGW("[SqliteCloudKvExecutorUtils] reset log stmt failed %d when check data changed", ret);
1464         }
1465     });
1466     int index = 1;
1467     if (data.timestamp.size() > 0) {
1468         errCode = SQLiteUtils::BindInt64ToStatement(checkStmt, index++, data.timestamp[dataIndex]);
1469         if (errCode != E_OK) {
1470             LOGW("[SqliteCloudKvExecutorUtils] bind modify time failed %d when check data changed", errCode);
1471             return true;
1472         }
1473     }
1474     if (data.hashKey.size() > 0) {
1475         errCode = SQLiteUtils::BindBlobToStatement(checkStmt, index++, data.hashKey[dataIndex]);
1476         if (errCode != E_OK) {
1477             LOGW("[SqliteCloudKvExecutorUtils] bind hashKey failed %d when check data changed", errCode);
1478             return true;
1479         }
1480     }
1481     errCode = SQLiteUtils::StepNext(checkStmt);
1482     if (errCode != E_OK) {
1483         LOGW("[SqliteCloudKvExecutorUtils] step failed %d when check data changed", errCode);
1484         return true;
1485     }
1486     return sqlite3_column_int64(checkStmt, 0) == 0; // get index start at 0, get 0 is data changed
1487 }
1488 
CheckDataDelete(const FillGidParam & param,const CloudSyncBatch & data,size_t dataIndex)1489 bool SqliteCloudKvExecutorUtils::CheckDataDelete(const FillGidParam &param,
1490     const CloudSyncBatch &data, size_t dataIndex)
1491 {
1492     sqlite3_stmt *checkStmt = nullptr;
1493     int errCode = SQLiteUtils::GetStatement(param.first, CHECK_DATA_DELETE, checkStmt);
1494     ResFinalizer finalizerData([checkStmt]() {
1495         sqlite3_stmt *statement = checkStmt;
1496         int ret = E_OK;
1497         SQLiteUtils::ResetStatement(statement, true, ret);
1498         if (ret != E_OK) {
1499             LOGW("[SqliteCloudKvExecutorUtils] reset log stmt failed %d when check data delete", ret);
1500         }
1501     });
1502     int index = 1;
1503     if (data.timestamp.size() > 0) {
1504         errCode = SQLiteUtils::BindInt64ToStatement(checkStmt, index++, data.timestamp[dataIndex]);
1505         if (errCode != E_OK) {
1506             LOGW("[SqliteCloudKvExecutorUtils] bind modify time failed %d when check data delete", errCode);
1507             return true;
1508         }
1509     }
1510     if (data.hashKey.size() > 0) {
1511         errCode = SQLiteUtils::BindBlobToStatement(checkStmt, index++, data.hashKey[dataIndex]);
1512         if (errCode != E_OK) {
1513             LOGW("[SqliteCloudKvExecutorUtils] bind hashKey failed %d when check data delete", errCode);
1514             return true;
1515         }
1516     }
1517     errCode = SQLiteUtils::StepNext(checkStmt);
1518     if (errCode != E_OK) {
1519         LOGW("[SqliteCloudKvExecutorUtils] step failed %d when check data delete", errCode);
1520         return true;
1521     }
1522     return sqlite3_column_int64(checkStmt, 0) != 0; // get index start at !0, get 0 means data delete
1523 }
1524 
MarkUploadSuccessInner(const FillGidParam & param,const CloudSyncBatch & data,const std::string & user,size_t dataIndex)1525 void SqliteCloudKvExecutorUtils::MarkUploadSuccessInner(const FillGidParam &param,
1526     const CloudSyncBatch &data, const std::string &user, size_t dataIndex)
1527 {
1528     sqlite3_stmt *logStmt = nullptr;
1529     int errCode = SQLiteUtils::GetStatement(param.first, MARK_UPLOAD_SUCCESS, logStmt);
1530     ResFinalizer finalizerData([logStmt]() {
1531         sqlite3_stmt *statement = logStmt;
1532         int ret = E_OK;
1533         SQLiteUtils::ResetStatement(statement, true, ret);
1534         if (ret != E_OK) {
1535             LOGW("[SqliteCloudKvExecutorUtils] reset log stmt failed %d when mark upload success", ret);
1536         }
1537     });
1538     int index = 1;
1539     errCode = SQLiteUtils::BindBlobToStatement(logStmt, index++, data.hashKey[dataIndex]);
1540     if (errCode != E_OK) {
1541         LOGW("[SqliteCloudKvExecutorUtils] bind hashKey failed %d when mark upload success", errCode);
1542         return;
1543     }
1544     errCode = SQLiteUtils::BindTextToStatement(logStmt, index++, user);
1545     if (errCode != E_OK) {
1546         LOGW("[SqliteCloudKvExecutorUtils] bind hashKey failed %d when mark upload success", errCode);
1547         return;
1548     }
1549     errCode = SQLiteUtils::StepNext(logStmt);
1550     if (errCode != E_OK && errCode != -E_FINISHED) {
1551         LOGW("[SqliteCloudKvExecutorUtils] step failed %d when mark upload success", errCode);
1552         return;
1553     }
1554 }
1555 }