1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifdef RELATIONAL_STORE
16 #include <gtest/gtest.h>
17 #include <iostream>
18 #include "cloud/cloud_storage_utils.h"
19 #include "cloud/cloud_db_constant.h"
20 #include "distributeddb_data_generate_unit_test.h"
21 #include "distributeddb_tools_unit_test.h"
22 #include "process_system_api_adapter_impl.h"
23 #include "relational_store_instance.h"
24 #include "relational_store_manager.h"
25 #include "runtime_config.h"
26 #include "sqlite_relational_store.h"
27 #include "sqlite_relational_utils.h"
28 #include "store_observer.h"
29 #include "time_helper.h"
30 #include "virtual_asset_loader.h"
31 #include "virtual_cloud_data_translate.h"
32 #include "virtual_cloud_db.h"
33 #include "virtual_communicator_aggregator.h"
34 #include "mock_asset_loader.h"
35 
36 using namespace testing::ext;
37 using namespace DistributedDB;
38 using namespace DistributedDBUnitTest;
39 using namespace std;
40 
41 namespace {
42     string g_storeID = "Relational_Store_SYNC";
43     const string g_tableName1 = "worker1";
44     const string g_tableName2 = "worker2";
45     const string g_tableName3 = "worker3";
46     const string g_tableName4 = "worker4";
47     const string DEVICE_CLOUD = "cloud_dev";
48     const string DB_SUFFIX = ".db";
49     const int64_t g_syncWaitTime = 60;
50     const int g_arrayHalfSub = 2;
51     int g_syncIndex = 0;
52     string g_testDir;
53     string g_storePath;
54     std::mutex g_processMutex;
55     std::condition_variable g_processCondition;
56     std::shared_ptr<VirtualCloudDb> g_virtualCloudDb;
57     std::shared_ptr<VirtualAssetLoader> g_virtualAssetLoader;
58     DistributedDB::RelationalStoreManager g_mgr(APP_ID, USER_ID);
59     RelationalStoreObserverUnitTest *g_observer = nullptr;
60     RelationalStoreDelegate *g_delegate = nullptr;
61     SyncProcess g_syncProcess;
62     using CloudSyncStatusCallback = std::function<void(const std::map<std::string, SyncProcess> &onProcess)>;
63     const std::string CREATE_LOCAL_TABLE_SQL =
64             "CREATE TABLE IF NOT EXISTS " + g_tableName1 + "(" \
65     "name TEXT PRIMARY KEY," \
66     "height REAL ," \
67     "married BOOLEAN ," \
68     "photo BLOB NOT NULL," \
69     "assert BLOB," \
70     "age INT);";
71     const std::string INTEGER_PRIMARY_KEY_TABLE_SQL =
72             "CREATE TABLE IF NOT EXISTS " + g_tableName2 + "(" \
73     "id INTEGER PRIMARY KEY," \
74     "name TEXT ," \
75     "height REAL ," \
76     "photo BLOB ," \
77     "asserts BLOB," \
78     "age INT);";
79     const std::string DROP_INTEGER_PRIMARY_KEY_TABLE_SQL = "DROP TABLE " + g_tableName2 + ";";
80     const std::string CREATE_LOCAL_TABLE_WITHOUT_PRIMARY_KEY_SQL =
81             "CREATE TABLE IF NOT EXISTS " + g_tableName3 + "(" \
82     "name TEXT," \
83     "height REAL ," \
84     "married BOOLEAN ," \
85     "photo BLOB NOT NULL," \
86     "assert BLOB," \
87     "age INT);";
88     const std::string INTEGER_PRIMARY_KEY_TABLE_SQL_WRONG_SYNC_MODE =
89             "CREATE TABLE IF NOT EXISTS " + g_tableName4 + "(" \
90     "id INTEGER PRIMARY KEY," \
91     "name TEXT ," \
92     "height REAL ," \
93     "photo BLOB ," \
94     "asserts BLOB," \
95     "age INT);";
96     const std::vector<Field> g_cloudFiled1 = {
97         {"Name", TYPE_INDEX<std::string>, true}, {"height", TYPE_INDEX<double>},
98         {"MArried", TYPE_INDEX<bool>}, {"photo", TYPE_INDEX<Bytes>, false, false},
99         {"Assert", TYPE_INDEX<Asset>}, {"age", TYPE_INDEX<int64_t>}
100     };
101     const std::vector<Field> g_invalidCloudFiled1 = {
102         {"name", TYPE_INDEX<std::string>, true}, {"height", TYPE_INDEX<int>},
103         {"married", TYPE_INDEX<bool>}, {"photo", TYPE_INDEX<Bytes>, false, false},
104         {"assert", TYPE_INDEX<Bytes>}, {"age", TYPE_INDEX<int64_t>}
105     };
106     const std::vector<Field> g_cloudFiled2 = {
107         {"id", TYPE_INDEX<int64_t>, true}, {"name", TYPE_INDEX<std::string>},
108         {"height", TYPE_INDEX<double>},  {"photo", TYPE_INDEX<Bytes>},
109         {"asserts", TYPE_INDEX<Assets>}, {"age", TYPE_INDEX<int64_t>}
110     };
111     const std::vector<Field> g_cloudFiledWithOutPrimaryKey3 = {
112         {"name", TYPE_INDEX<std::string>, false, true}, {"height", TYPE_INDEX<double>},
113         {"married", TYPE_INDEX<bool>}, {"photo", TYPE_INDEX<Bytes>, false, false},
114         {"assert", TYPE_INDEX<Bytes>}, {"age", TYPE_INDEX<int64_t>}
115     };
116     const std::vector<std::string> g_tables = {g_tableName1, g_tableName2};
117     const std::vector<std::string> g_tablesPKey = {g_cloudFiled1[0].colName, g_cloudFiled2[0].colName};
118     const std::vector<string> g_prefix = {"Local", ""};
119     const Asset g_localAsset = {
120         .version = 1, .name = "Phone", .assetId = "0", .subpath = "/local/sync", .uri = "/local/sync",
121         .modifyTime = "123456", .createTime = "", .size = "256", .hash = "ASE"
122     };
123     const Asset g_cloudAsset = {
124         .version = 2, .name = "Phone", .assetId = "0", .subpath = "/local/sync", .uri = "/cloud/sync",
125         .modifyTime = "123456", .createTime = "0", .size = "1024", .hash = "DEC"
126     };
127 
CreateUserDBAndTable(sqlite3 * & db)128     void CreateUserDBAndTable(sqlite3 *&db)
129     {
130         EXPECT_EQ(RelationalTestUtils::ExecSql(db, "PRAGMA journal_mode=WAL;"), SQLITE_OK);
131         EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_LOCAL_TABLE_SQL), SQLITE_OK);
132         EXPECT_EQ(RelationalTestUtils::ExecSql(db, INTEGER_PRIMARY_KEY_TABLE_SQL), SQLITE_OK);
133         EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_LOCAL_TABLE_WITHOUT_PRIMARY_KEY_SQL), SQLITE_OK);
134     }
135 
InsertUserTableRecord(sqlite3 * & db,int64_t begin,int64_t count,int64_t photoSize,bool assetIsNull)136     void InsertUserTableRecord(sqlite3 *&db, int64_t begin, int64_t count, int64_t photoSize, bool assetIsNull)
137     {
138         std::string photo(photoSize, 'v');
139         int errCode;
140         std::vector<uint8_t> assetBlob;
141         for (int64_t i = begin; i < begin + count; ++i) {
142             Asset asset = g_localAsset;
143             asset.name = asset.name + std::to_string(i);
144             RuntimeContext::GetInstance()->AssetToBlob(asset, assetBlob);
145             string sql = "INSERT OR REPLACE INTO " + g_tableName1
146                          + " (name, height, married, photo, assert, age) VALUES ('Local" + std::to_string(i) +
147                          "', '175.8', '0', '" + photo + "', ? , '18');";
148             sqlite3_stmt *stmt = nullptr;
149             ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
150             if (assetIsNull) {
151                 ASSERT_EQ(sqlite3_bind_null(stmt, 1), SQLITE_OK);
152             } else {
153                 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK);
154             }
155             EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
156             SQLiteUtils::ResetStatement(stmt, true, errCode);
157         }
158         for (int64_t i = begin; i < begin + count; ++i) {
159             std::vector<Asset> assets;
160             Asset asset = g_localAsset;
161             asset.name = g_localAsset.name + std::to_string(i);
162             assets.push_back(asset);
163             asset.name = g_localAsset.name + std::to_string(i + 1);
164             assets.push_back(asset);
165             RuntimeContext::GetInstance()->AssetsToBlob(assets, assetBlob);
166             string sql = "INSERT OR REPLACE INTO " + g_tableName2
167                          + " (id, name, height, photo, asserts, age) VALUES ('" + std::to_string(i) + "', 'Local"
168                          + std::to_string(i) + "', '155.10', '"+ photo + "',  ? , '21');";
169             sqlite3_stmt *stmt = nullptr;
170             ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
171             if (assetIsNull) {
172                 ASSERT_EQ(sqlite3_bind_null(stmt, 1), E_OK);
173             } else {
174                 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK);
175             }
176             EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
177             SQLiteUtils::ResetStatement(stmt, true, errCode);
178         }
179         LOGD("insert user record worker1[primary key]:[Local%" PRId64 " - Local%" PRId64
180             ") , worker2[primary key]:[%" PRId64 "- %" PRId64")", begin, count, begin, count);
181     }
182 
UpdateUserTableRecord(sqlite3 * & db,int64_t begin,int64_t count)183     void UpdateUserTableRecord(sqlite3 *&db, int64_t begin, int64_t count)
184     {
185         for (size_t i = 0; i < g_tables.size(); i++) {
186             string updateAge = "UPDATE " + g_tables[i] + " SET age = '99' where " + g_tablesPKey[i] + " in (";
187             for (int64_t j = begin; j < begin + count; ++j) {
188                 updateAge += "'" + g_prefix[i] + std::to_string(j) + "',";
189             }
190             updateAge.pop_back();
191             updateAge += ");";
192             ASSERT_EQ(RelationalTestUtils::ExecSql(db, updateAge), SQLITE_OK);
193         }
194         LOGD("update local record worker1[primary key]:[local%" PRId64 " - local%" PRId64
195             ") , worker2[primary key]:[%" PRId64 "- %" PRId64")", begin, count, begin, count);
196     }
197 
DeleteUserTableRecord(sqlite3 * & db,int64_t begin,int64_t count)198     void DeleteUserTableRecord(sqlite3 *&db, int64_t begin, int64_t count)
199     {
200         for (size_t i = 0; i < g_tables.size(); i++) {
201             string updateAge = "Delete from " + g_tables[i] + " where " + g_tablesPKey[i] + " in (";
202             for (int64_t j = begin; j < count; ++j) {
203                 updateAge += "'" + g_prefix[i] + std::to_string(j) + "',";
204             }
205             updateAge.pop_back();
206             updateAge += ");";
207             ASSERT_EQ(RelationalTestUtils::ExecSql(db, updateAge), SQLITE_OK);
208         }
209         LOGD("delete local record worker1[primary key]:[local%" PRId64 " - local%" PRId64
210             ") , worker2[primary key]:[%" PRId64 "- %" PRId64")", begin, count, begin, count);
211     }
212 
InsertRecordWithoutPk2LocalAndCloud(sqlite3 * & db,int64_t begin,int64_t count,int photoSize)213     void InsertRecordWithoutPk2LocalAndCloud(sqlite3 *&db, int64_t begin, int64_t count, int photoSize)
214     {
215         std::vector<uint8_t> photo(photoSize, 'v');
216         std::string photoLocal(photoSize, 'v');
217         Asset asset = { .version = 1, .name = "Phone" };
218         std::vector<uint8_t> assetBlob;
219         RuntimeContext::GetInstance()->BlobToAsset(assetBlob, asset);
220         std::string assetStr(assetBlob.begin(), assetBlob.end());
221         std::vector<VBucket> record1;
222         std::vector<VBucket> extend1;
223         for (int64_t i = begin; i < count; ++i) {
224             Timestamp now = TimeHelper::GetSysCurrentTime();
225             VBucket data;
226             data.insert_or_assign("name", "Cloud" + std::to_string(i));
227             data.insert_or_assign("height", 166.0); // 166.0 is random double value
228             data.insert_or_assign("married", (bool)0);
229             data.insert_or_assign("photo", photo);
230             data.insert_or_assign("assert", KEY_1);
231             data.insert_or_assign("age", 13L);
232             record1.push_back(data);
233             VBucket log;
234             log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
235             log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
236             log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
237             extend1.push_back(log);
238             std::this_thread::sleep_for(std::chrono::milliseconds(1));  // wait for 1 ms
239         }
240         int errCode = g_virtualCloudDb->BatchInsert(g_tableName3, std::move(record1), extend1);
241         ASSERT_EQ(errCode, DBStatus::OK);
242         for (int64_t i = begin; i < count; ++i) {
243             string sql = "INSERT OR REPLACE INTO " + g_tableName3
244                          + " (name, height, married, photo, assert, age) VALUES ('Local" + std::to_string(i) +
245                          "', '175.8', '0', '" + photoLocal + "', '" + assetStr + "', '18');";
246             ASSERT_EQ(RelationalTestUtils::ExecSql(db, sql), SQLITE_OK);
247         }
248     }
249 
InsertCloudTableRecord(int64_t begin,int64_t count,int64_t photoSize,bool assetIsNull)250     void InsertCloudTableRecord(int64_t begin, int64_t count, int64_t photoSize, bool assetIsNull)
251     {
252         std::vector<uint8_t> photo(photoSize, 'v');
253         std::vector<VBucket> record1;
254         std::vector<VBucket> extend1;
255         std::vector<VBucket> record2;
256         std::vector<VBucket> extend2;
257         Timestamp now = TimeHelper::GetSysCurrentTime();
258         for (int64_t i = begin; i < begin + count; ++i) {
259             VBucket data;
260             data.insert_or_assign("name", "Cloud" + std::to_string(i));
261             data.insert_or_assign("height", 166.0); // 166.0 is random double value
262             data.insert_or_assign("married", false);
263             data.insert_or_assign("photo", photo);
264             data.insert_or_assign("AGE", 13L);
265             Asset asset = g_cloudAsset;
266             asset.name = asset.name + std::to_string(i);
267             assetIsNull ? data.insert_or_assign("assert", Nil()) : data.insert_or_assign("assert", asset);
268             record1.push_back(data);
269             VBucket log;
270             log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND + i);
271             log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND + i);
272             log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
273             extend1.push_back(log);
274 
275             std::vector<Asset> assets;
276             data.insert_or_assign("id", i);
277             data.insert_or_assign("height", 180.3); // 180.3 is random double value
278             for (int64_t j = i; j <= i + 2; j++) { // 2 extra num
279                 asset.name = g_cloudAsset.name + std::to_string(j);
280                 assets.push_back(asset);
281             }
282             data.erase("assert");
283             data.erase("married");
284             assetIsNull ? data.insert_or_assign("asserts", Nil()) : data.insert_or_assign("asserts", assets);
285             record2.push_back(data);
286             extend2.push_back(log);
287         }
288         ASSERT_EQ(g_virtualCloudDb->BatchInsert(g_tableName1, std::move(record1), extend1), DBStatus::OK);
289         ASSERT_EQ(g_virtualCloudDb->BatchInsert(g_tableName2, std::move(record2), extend2), DBStatus::OK);
290         LOGD("insert cloud record worker1[primary key]:[cloud%" PRId64 " - cloud%" PRId64
291             ") , worker2[primary key]:[%" PRId64 "- %" PRId64")", begin, count, begin, count);
292         std::this_thread::sleep_for(std::chrono::milliseconds(count));
293     }
294 
UpdateAssetForTest(sqlite3 * & db,AssetOpType opType,int64_t cloudCount,int64_t rowid)295     void UpdateAssetForTest(sqlite3 *&db, AssetOpType opType, int64_t cloudCount, int64_t rowid)
296     {
297         string sql = "UPDATE " + g_tables[0] + " SET assert = ? where rowid = '" + std::to_string(rowid) + "';";
298         std::vector<uint8_t> assetBlob;
299         int errCode;
300         Asset asset = g_cloudAsset;
301         asset.name = "Phone" + std::to_string(rowid - cloudCount - 1);
302         if (opType == AssetOpType::UPDATE) {
303             asset.uri = "/data/test";
304             asset.hash = "";
305         } else if (opType == AssetOpType::INSERT) {
306             asset.name = "Test10";
307         }
308         asset.status = static_cast<uint32_t>(CloudStorageUtils::FlagToStatus(opType));
309         sqlite3_stmt *stmt = nullptr;
310         RuntimeContext::GetInstance()->AssetToBlob(asset, assetBlob);
311         ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
312         if (SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false) == E_OK) {
313             EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
314         }
315         SQLiteUtils::ResetStatement(stmt, true, errCode);
316     }
317 
UpdateAssetsForTest(sqlite3 * & db,AssetOpType opType,int64_t rowid)318     void UpdateAssetsForTest(sqlite3 *&db, AssetOpType opType, int64_t rowid)
319     {
320         string sql = "UPDATE " + g_tables[1] + " SET asserts = ? where rowid = '" + std::to_string(rowid) + "';";
321         Asset asset1 = g_localAsset;
322         Asset asset2 = g_localAsset;
323         Assets assets;
324         asset1.name = g_localAsset.name + std::to_string(rowid);
325         asset1.status = static_cast<uint32_t>(CloudStorageUtils::FlagToStatus(AssetOpType::NO_CHANGE));
326         asset2.name = g_localAsset.name + std::to_string(rowid + 1);
327         asset2.status = static_cast<uint32_t>(CloudStorageUtils::FlagToStatus(AssetOpType::NO_CHANGE));
328         if (opType == AssetOpType::UPDATE) {
329             assets.push_back(asset1);
330             asset2.uri = "/data/test";
331             asset2.hash = "";
332             asset2.status = static_cast<uint32_t>(CloudStorageUtils::FlagToStatus(opType));
333             assets.push_back(asset2);
334         } else if (opType == AssetOpType::INSERT) {
335             assets.push_back(asset1);
336             assets.push_back(asset2);
337             Asset asset3;
338             asset3.status = static_cast<uint32_t>(CloudStorageUtils::FlagToStatus(opType));
339             asset3.name = "Test10";
340             assets.push_back(asset3);
341         } else if (opType == AssetOpType::DELETE) {
342             assets.push_back(asset1);
343             asset2.status = static_cast<uint32_t>(CloudStorageUtils::FlagToStatus(opType));
344             assets.push_back(asset2);
345         } else {
346             assets.push_back(asset1);
347             assets.push_back(asset2);
348         }
349         sqlite3_stmt *stmt = nullptr;
350         std::vector<uint8_t> assetsBlob;
351         RuntimeContext::GetInstance()->AssetsToBlob(assets, assetsBlob);
352         ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
353         if (SQLiteUtils::BindBlobToStatement(stmt, 1, assetsBlob, false) == E_OK) {
354             EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
355         }
356         int errCode;
357         SQLiteUtils::ResetStatement(stmt, true, errCode);
358     }
359 
UpdateLocalAssets(sqlite3 * & db,Assets & assets,int64_t rowid)360     void UpdateLocalAssets(sqlite3 *&db, Assets &assets, int64_t rowid)
361     {
362         string sql = "UPDATE " + g_tables[1] + " SET asserts = ? where rowid = '" + std::to_string(rowid) + "';";
363         std::vector<uint8_t> assetsBlob;
364         int errCode;
365         RuntimeContext::GetInstance()->AssetsToBlob(assets, assetsBlob);
366         sqlite3_stmt *stmt = nullptr;
367         ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
368         if (SQLiteUtils::BindBlobToStatement(stmt, 1, assetsBlob, false) == E_OK) {
369             EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
370         }
371         SQLiteUtils::ResetStatement(stmt, true, errCode);
372     }
373 
UpdateDiffType(int64_t begin)374     void UpdateDiffType(int64_t begin)
375     {
376         std::vector<std::string> hash = {"DEC", "update_", "insert_"};
377         std::vector<std::string> name = {
378             g_cloudAsset.name + std::to_string(0),
379             g_cloudAsset.name + std::to_string(1),
380             g_cloudAsset.name + std::to_string(3) // 3 is insert id
381         };
382         std::vector<VBucket> record;
383         std::vector<VBucket> extend;
384         Assets assets;
385         for (int i = 0; i < 3; i ++) { // 3 is type num
386             Asset asset = g_cloudAsset;
387             asset.name = name[i];
388             asset.hash = hash[i];
389             assets.push_back(asset);
390         }
391         VBucket data;
392         data.insert_or_assign("name", "Cloud" + std::to_string(0));
393         data.insert_or_assign("id", 0L);
394         data.insert_or_assign("asserts", assets);
395         Timestamp now = TimeHelper::GetSysCurrentTime();
396         VBucket log;
397         log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
398         log.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(begin));
399         log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
400         log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
401         record.push_back(data);
402         extend.push_back(log);
403         ASSERT_EQ(g_virtualCloudDb->BatchUpdate(g_tableName2, std::move(record), extend), DBStatus::OK);
404     }
405 
CheckDiffTypeAsset(sqlite3 * & db)406     void CheckDiffTypeAsset(sqlite3 *&db)
407     {
408         std::vector<std::string> names = {
409             g_cloudAsset.name + std::to_string(0),
410             g_cloudAsset.name + std::to_string(1),
411             g_cloudAsset.name + std::to_string(3) // 3 is insert id
412         };
413         std::string sql = "SELECT asserts from " + g_tables[1] + " WHERE rowid = 0;";
414         sqlite3_stmt *stmt = nullptr;
415         int index = 0;
416         ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
417         while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
418             ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
419             Type cloudValue;
420             ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
421             std::vector<uint8_t> assetsBlob;
422             Assets assets;
423             ASSERT_EQ(CloudStorageUtils::GetValueFromOneField(cloudValue, assetsBlob), E_OK);
424             ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAssets(assetsBlob, assets), E_OK);
425             for (const Asset &asset: assets) {
426                 ASSERT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::NORMAL));
427                 ASSERT_EQ(asset.name, names[index]);
428                 LOGE("lyh_test: name: %s", names[index].c_str());
429                 index++;
430             }
431         }
432         int errCode;
433         SQLiteUtils::ResetStatement(stmt, true, errCode);
434     }
435 
CheckAssetForAssetTest006()436     void CheckAssetForAssetTest006()
437     {
438         VBucket extend;
439         extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(0);
440         std::vector<VBucket> data;
441         g_virtualCloudDb->Query(g_tables[1], extend, data);
442         for (size_t j = 0; j < data.size(); ++j) {
443             ASSERT_NE(data[j].find("asserts"), data[j].end());
444             ASSERT_TRUE((data[j]["asserts"]).index() == TYPE_INDEX<Assets>);
445             Assets &assets = std::get<Assets>(data[j]["asserts"]);
446             ASSERT_TRUE(assets.size() > 0);
447             Asset &asset = assets[0];
448             EXPECT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::DELETE));
449             EXPECT_EQ(asset.flag, static_cast<uint32_t>(AssetOpType::DELETE));
450         }
451     }
452 
CheckFillAssetForTest10(sqlite3 * & db)453     void CheckFillAssetForTest10(sqlite3 *&db)
454     {
455         std::string sql = "SELECT assert from " + g_tables[0] + " WHERE rowid in ('27','28','29','30');";
456         sqlite3_stmt *stmt = nullptr;
457         int index = 0;
458         ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
459         int suffixId = 6;
460         while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
461             if (index == 0 || index == 1 || index == 3) { // 3 is rowid index of 29
462                 ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
463                 Type cloudValue;
464                 ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Asset>, 0, cloudValue), E_OK);
465                 std::vector<uint8_t> assetBlob;
466                 Asset asset;
467                 ASSERT_EQ(CloudStorageUtils::GetValueFromOneField(cloudValue, assetBlob), E_OK);
468                 ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAsset(assetBlob, asset), E_OK);
469                 ASSERT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::NORMAL));
470                 if (index == 0) {
471                     ASSERT_EQ(asset.name, g_cloudAsset.name + std::to_string(suffixId + index));
472                 } else if (index == 1) {
473                     ASSERT_EQ(asset.name, "Test10");
474                 } else {
475                     ASSERT_EQ(asset.name, g_cloudAsset.name + std::to_string(suffixId + index));
476                     ASSERT_EQ(asset.uri, "/data/test");
477                 }
478             } else {
479                 ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_NULL);
480             }
481             index++;
482         }
483         int errCode;
484         SQLiteUtils::ResetStatement(stmt, true, errCode);
485     }
486 
CheckFillAssetsForTest10(sqlite3 * & db)487     void CheckFillAssetsForTest10(sqlite3 *&db)
488     {
489         std::string sql = "SELECT asserts from " + g_tables[1] + " WHERE rowid in ('0','1','2','3');";
490         sqlite3_stmt *stmt = nullptr;
491         int index = 0;
492         ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
493         int insertIndex = 2;
494         while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
495             ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
496             Type cloudValue;
497             ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
498             std::vector<uint8_t> assetsBlob;
499             Assets assets;
500             ASSERT_EQ(CloudStorageUtils::GetValueFromOneField(cloudValue, assetsBlob), E_OK);
501             ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAssets(assetsBlob, assets), E_OK);
502             if (index == 0) {
503                 ASSERT_EQ(assets.size(), 2u);
504                 ASSERT_EQ(assets[0].name, g_localAsset.name + std::to_string(index));
505                 ASSERT_EQ(assets[1].name, g_localAsset.name + std::to_string(index + 1));
506             } else if (index == 1) {
507                 ASSERT_EQ(assets.size(), 3u);
508                 ASSERT_EQ(assets[insertIndex].name, "Test10");
509                 ASSERT_EQ(assets[insertIndex].status, static_cast<uint32_t>(AssetStatus::NORMAL));
510             } else if (index == 2) { // 2 is the third element
511                 ASSERT_EQ(assets.size(), 1u);
512                 ASSERT_EQ(assets[0].name, g_cloudAsset.name + std::to_string(index));
513             } else {
514                 ASSERT_EQ(assets.size(), 2u);
515                 ASSERT_EQ(assets[1].uri, "/data/test");
516                 ASSERT_EQ(assets[1].status, static_cast<uint32_t>(AssetStatus::NORMAL));
517             }
518             index++;
519         }
520         int errCode;
521         SQLiteUtils::ResetStatement(stmt, true, errCode);
522     }
523 
QueryCountCallback(void * data,int count,char ** colValue,char ** colName)524     int QueryCountCallback(void *data, int count, char **colValue, char **colName)
525     {
526         if (count != 1) {
527             return 0;
528         }
529         auto expectCount = reinterpret_cast<int64_t>(data);
530         EXPECT_EQ(strtol(colValue[0], nullptr, 10), expectCount); // 10: decimal
531         return 0;
532     }
533 
CheckDownloadResult(sqlite3 * & db,std::vector<int64_t> expectCounts,std::string keyStr="")534     void CheckDownloadResult(sqlite3 *&db, std::vector<int64_t> expectCounts, std::string keyStr = "Cloud")
535     {
536         for (size_t i = 0; i < g_tables.size(); ++i) {
537             string queryDownload = "select count(*) from " + g_tables[i] + " where name "
538                                    + " like '" + keyStr + "%'";
539             EXPECT_EQ(sqlite3_exec(db, queryDownload.c_str(), QueryCountCallback,
540                 reinterpret_cast<void *>(expectCounts[i]), nullptr), SQLITE_OK);
541         }
542     }
543 
CheckCloudTotalCount(std::vector<int64_t> expectCounts)544     void CheckCloudTotalCount(std::vector<int64_t> expectCounts)
545     {
546         VBucket extend;
547         for (size_t i = 0; i < g_tables.size(); ++i) {
548             extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(0);
549             int64_t realCount = 0;
550             std::vector<VBucket> data;
551             g_virtualCloudDb->Query(g_tables[i], extend, data);
552             for (size_t j = 0; j < data.size(); ++j) {
553                 auto entry = data[j].find(CloudDbConstant::DELETE_FIELD);
554                 if (entry != data[j].end() && std::get<bool>(entry->second)) {
555                     continue;
556                 }
557                 realCount++;
558             }
559             EXPECT_EQ(realCount, expectCounts[i]); // ExpectCount represents the total amount of cloud data.
560         }
561     }
562 
GetCloudDbSchema(DataBaseSchema & dataBaseSchema)563     void GetCloudDbSchema(DataBaseSchema &dataBaseSchema)
564     {
565         TableSchema tableSchema1 = {
566             .name = g_tableName1,
567             .sharedTableName = g_tableName1 + "_shared",
568             .fields = g_cloudFiled1
569         };
570         TableSchema tableSchema2 = {
571             .name = g_tableName2,
572             .sharedTableName = g_tableName2 + "_shared",
573             .fields = g_cloudFiled2
574         };
575         TableSchema tableSchemaWithOutPrimaryKey = {
576             .name = g_tableName3,
577             .sharedTableName = g_tableName3 + "_shared",
578             .fields = g_cloudFiledWithOutPrimaryKey3
579         };
580         TableSchema tableSchema4 = {
581             .name = g_tableName4,
582             .sharedTableName = g_tableName4 + "_shared",
583             .fields = g_cloudFiled2
584         };
585         dataBaseSchema.tables.push_back(tableSchema1);
586         dataBaseSchema.tables.push_back(tableSchema2);
587         dataBaseSchema.tables.push_back(tableSchemaWithOutPrimaryKey);
588         dataBaseSchema.tables.push_back(tableSchema4);
589     }
590 
591 
GetInvalidCloudDbSchema(DataBaseSchema & dataBaseSchema)592     void GetInvalidCloudDbSchema(DataBaseSchema &dataBaseSchema)
593     {
594         TableSchema tableSchema1 = {
595             .name = g_tableName1,
596             .sharedTableName = "",
597             .fields = g_invalidCloudFiled1
598         };
599         TableSchema tableSchema2 = {
600             .name = g_tableName2,
601             .sharedTableName = "",
602             .fields = g_cloudFiled2
603         };
604         dataBaseSchema.tables.push_back(tableSchema1);
605         dataBaseSchema.tables.push_back(tableSchema2);
606     }
607 
InitProcessForTest1(const uint32_t & cloudCount,const uint32_t & localCount,std::vector<SyncProcess> & expectProcess)608     void InitProcessForTest1(const uint32_t &cloudCount, const uint32_t &localCount,
609         std::vector<SyncProcess> &expectProcess)
610     {
611         expectProcess.clear();
612         std::vector<TableProcessInfo> infos;
613         uint32_t index = 1;
614         infos.push_back(TableProcessInfo{
615             PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
616         });
617         infos.push_back(TableProcessInfo{
618             PREPARED, {0, 0, 0, 0}, {0, 0, 0, 0}
619         });
620 
621         infos.push_back(TableProcessInfo{
622             PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
623         });
624         infos.push_back(TableProcessInfo{
625             PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
626         });
627 
628         infos.push_back(TableProcessInfo{
629             FINISHED, {index, cloudCount, cloudCount, 0}, {index, localCount, localCount, 0}
630         });
631         infos.push_back(TableProcessInfo{
632             PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
633         });
634 
635         infos.push_back(TableProcessInfo{
636             FINISHED, {index, cloudCount, cloudCount, 0}, {index, localCount, localCount, 0}
637         });
638         infos.push_back(TableProcessInfo{
639             FINISHED, {index, cloudCount, cloudCount, 0}, {index, localCount, localCount, 0}
640         });
641 
642         for (size_t i = 0; i < infos.size() / g_arrayHalfSub; ++i) {
643             SyncProcess syncProcess;
644             syncProcess.errCode = OK;
645             syncProcess.process = i == infos.size() ? FINISHED : PROCESSING;
646             syncProcess.tableProcess.insert_or_assign(g_tables[0], std::move(infos[g_arrayHalfSub * i]));
647             syncProcess.tableProcess.insert_or_assign(g_tables[1], std::move(infos[g_arrayHalfSub * i + 1]));
648             expectProcess.push_back(syncProcess);
649         }
650     }
651 
InitProcessForMannualSync1(std::vector<SyncProcess> & expectProcess)652     void InitProcessForMannualSync1(std::vector<SyncProcess> &expectProcess)
653     {
654         expectProcess.clear();
655         std::vector<TableProcessInfo> infos;
656         // first notify, first table
657         infos.push_back(TableProcessInfo{
658             FINISHED, {0, 0, 0, 0}, {0, 0, 0, 0}
659         });
660         // first notify, second table
661         infos.push_back(TableProcessInfo{
662             PREPARED, {0, 0, 0, 0}, {0, 0, 0, 0}
663         });
664         // second notify, first table
665         infos.push_back(TableProcessInfo{
666             FINISHED, {0, 0, 0, 0}, {0, 0, 0, 0}
667         });
668         // second notify, second table
669         infos.push_back(TableProcessInfo{
670             FINISHED, {0, 0, 0, 0}, {0, 0, 0, 0}
671         });
672 
673         infos.push_back(TableProcessInfo{
674             FINISHED, {0, 0, 0, 0}, {0, 0, 0, 0}
675         });
676         // second notify, second table
677         infos.push_back(TableProcessInfo{
678             FINISHED, {0, 0, 0, 0}, {0, 0, 0, 0}
679         });
680         for (size_t i = 0; i < infos.size() / g_arrayHalfSub; ++i) {
681             SyncProcess syncProcess;
682             syncProcess.errCode = OK;
683             syncProcess.process = i == infos.size() ? FINISHED : PROCESSING;
684             syncProcess.tableProcess.insert_or_assign(g_tables[0], std::move(infos[g_arrayHalfSub * i]));
685             syncProcess.tableProcess.insert_or_assign(g_tables[1], std::move(infos[g_arrayHalfSub * i + 1]));
686             expectProcess.push_back(syncProcess);
687         }
688     }
689 
InitProcessForTest2(const uint32_t & cloudCount,const uint32_t & localCount,std::vector<SyncProcess> & expectProcess)690     void InitProcessForTest2(const uint32_t &cloudCount, const uint32_t &localCount,
691         std::vector<SyncProcess> &expectProcess)
692     {
693         expectProcess.clear();
694         std::vector<TableProcessInfo> infos;
695         uint32_t index = 1;
696         infos.push_back(TableProcessInfo{
697             PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
698         });
699         infos.push_back(TableProcessInfo{
700             PREPARED, {0, 0, 0, 0}, {0, 0, 0, 0}
701         });
702 
703         infos.push_back(TableProcessInfo{
704             PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
705         });
706         infos.push_back(TableProcessInfo{
707             PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
708         });
709 
710         infos.push_back(TableProcessInfo{
711             FINISHED, {index, cloudCount, cloudCount, 0}, {index, localCount, localCount, 0}
712         });
713         infos.push_back(TableProcessInfo{
714             PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
715         });
716 
717         infos.push_back(TableProcessInfo{
718             FINISHED, {index, cloudCount, cloudCount, 0}, {index, localCount, localCount, 0}
719         });
720         infos.push_back(TableProcessInfo{
721             FINISHED, {index, cloudCount, cloudCount, 0}, {index, localCount - cloudCount, localCount - cloudCount, 0}
722         });
723 
724         for (size_t i = 0; i < infos.size() / g_arrayHalfSub; ++i) {
725             SyncProcess syncProcess;
726             syncProcess.errCode = OK;
727             syncProcess.process = i == infos.size() ? FINISHED : PROCESSING;
728             syncProcess.tableProcess.insert_or_assign(g_tables[0], std::move(infos[g_arrayHalfSub * i]));
729             syncProcess.tableProcess.insert_or_assign(g_tables[1], std::move(infos[g_arrayHalfSub * i + 1]));
730             expectProcess.push_back(syncProcess);
731         }
732     }
733 
InitProcessForTest9(const uint32_t & cloudCount,const uint32_t & localCount,std::vector<SyncProcess> & expectProcess)734     void InitProcessForTest9(const uint32_t &cloudCount, const uint32_t &localCount,
735         std::vector<SyncProcess> &expectProcess)
736     {
737         expectProcess.clear();
738         std::vector<TableProcessInfo> infos;
739         uint32_t index = 1;
740         infos.push_back(TableProcessInfo{
741             PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
742         });
743         infos.push_back(TableProcessInfo{
744             PREPARED, {0, 0, 0, 0}, {0, 0, 0, 0}
745         });
746 
747         infos.push_back(TableProcessInfo{
748             PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
749         });
750         infos.push_back(TableProcessInfo{
751             PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
752         });
753 
754         infos.push_back(TableProcessInfo{
755             FINISHED, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
756         });
757         infos.push_back(TableProcessInfo{
758             PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
759         });
760 
761         infos.push_back(TableProcessInfo{
762             FINISHED, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
763         });
764         infos.push_back(TableProcessInfo{
765             FINISHED, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
766         });
767 
768         for (size_t i = 0; i < infos.size() / g_arrayHalfSub; ++i) {
769             SyncProcess syncProcess;
770             syncProcess.errCode = OK;
771             syncProcess.process = i == infos.size() ? FINISHED : PROCESSING;
772             syncProcess.tableProcess.insert_or_assign(g_tables[0], std::move(infos[g_arrayHalfSub * i]));
773             syncProcess.tableProcess.insert_or_assign(g_tables[1], std::move(infos[g_arrayHalfSub * i + 1]));
774             expectProcess.push_back(syncProcess);
775         }
776     }
GetCallback(SyncProcess & syncProcess,CloudSyncStatusCallback & callback,std::vector<SyncProcess> & expectProcess)777     void GetCallback(SyncProcess &syncProcess, CloudSyncStatusCallback &callback,
778         std::vector<SyncProcess> &expectProcess)
779     {
780         g_syncIndex = 0;
781         callback = [&syncProcess, &expectProcess](const std::map<std::string, SyncProcess> &process) {
782             LOGI("devices size = %d", process.size());
783             ASSERT_EQ(process.size(), 1u);
784             syncProcess = std::move(process.begin()->second);
785             ASSERT_EQ(process.begin()->first, DEVICE_CLOUD);
786             ASSERT_NE(syncProcess.tableProcess.empty(), true);
787             LOGI("current sync process status:%d, db status:%d ", syncProcess.process, syncProcess.errCode);
788             std::for_each(g_tables.begin(), g_tables.end(), [&](const auto &item) {
789                 auto table1 = syncProcess.tableProcess.find(item);
790                 if (table1 != syncProcess.tableProcess.end()) {
791                     LOGI("table[%s], table process status:%d, [downloadInfo](batchIndex:%u, total:%u, successCount:%u, "
792                          "failCount:%u) [uploadInfo](batchIndex:%u, total:%u, successCount:%u,failCount:%u",
793                          item.c_str(), table1->second.process, table1->second.downLoadInfo.batchIndex,
794                          table1->second.downLoadInfo.total, table1->second.downLoadInfo.successCount,
795                          table1->second.downLoadInfo.failCount, table1->second.upLoadInfo.batchIndex,
796                          table1->second.upLoadInfo.total, table1->second.upLoadInfo.successCount,
797                          table1->second.upLoadInfo.failCount);
798                 }
799             });
800             if (expectProcess.empty()) {
801                 if (syncProcess.process == FINISHED) {
802                     g_processCondition.notify_one();
803                 }
804                 return;
805             }
806             ASSERT_LE(static_cast<size_t>(g_syncIndex), expectProcess.size());
807             for (size_t i = 0; i < g_tables.size() && static_cast<size_t>(g_syncIndex) < expectProcess.size(); ++i) {
808                 SyncProcess head = expectProcess[g_syncIndex];
809                 for (auto &expect : head.tableProcess) {
810                     auto real = syncProcess.tableProcess.find(expect.first);
811                     ASSERT_NE(real, syncProcess.tableProcess.end());
812                     EXPECT_EQ(expect.second.process, real->second.process);
813                     EXPECT_EQ(expect.second.downLoadInfo.batchIndex, real->second.downLoadInfo.batchIndex);
814                     EXPECT_EQ(expect.second.downLoadInfo.total, real->second.downLoadInfo.total);
815                     EXPECT_EQ(expect.second.downLoadInfo.successCount, real->second.downLoadInfo.successCount);
816                     EXPECT_EQ(expect.second.downLoadInfo.failCount, real->second.downLoadInfo.failCount);
817                     EXPECT_EQ(expect.second.upLoadInfo.batchIndex, real->second.upLoadInfo.batchIndex);
818                     EXPECT_EQ(expect.second.upLoadInfo.total, real->second.upLoadInfo.total);
819                     EXPECT_EQ(expect.second.upLoadInfo.successCount, real->second.upLoadInfo.successCount);
820                     EXPECT_EQ(expect.second.upLoadInfo.failCount, real->second.upLoadInfo.failCount);
821                 }
822             }
823             g_syncIndex++;
824             if (syncProcess.process == FINISHED) {
825                 g_processCondition.notify_one();
826             }
827         };
828     }
829 
CheckAllAssetAfterUpload(int64_t localCount)830     void CheckAllAssetAfterUpload(int64_t localCount)
831     {
832         VBucket extend;
833         extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(0);
834         std::vector<VBucket> data1;
835         g_virtualCloudDb->Query(g_tables[0], extend, data1);
836         for (size_t j = 0; j < data1.size(); ++j) {
837             Type entry;
838             bool isExisted = CloudStorageUtils::GetTypeCaseInsensitive("assert", data1[j], entry);
839             ASSERT_TRUE(isExisted);
840             Asset asset = std::get<Asset>(entry);
841             bool isLocal = j >= (size_t)(localCount / g_arrayHalfSub);
842             Asset baseAsset = isLocal ? g_localAsset : g_cloudAsset;
843             EXPECT_EQ(asset.version, baseAsset.version);
844             EXPECT_EQ(asset.name, baseAsset.name + std::to_string(isLocal ? j - localCount / g_arrayHalfSub : j));
845             EXPECT_EQ(asset.uri, baseAsset.uri);
846             EXPECT_EQ(asset.modifyTime, baseAsset.modifyTime);
847             EXPECT_EQ(asset.createTime, baseAsset.createTime);
848             EXPECT_EQ(asset.size, baseAsset.size);
849             EXPECT_EQ(asset.hash, baseAsset.hash);
850         }
851 
852         std::vector<VBucket> data2;
853         extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(0);
854         g_virtualCloudDb->Query(g_tables[1], extend, data2);
855         for (size_t j = 0; j < data2.size(); ++j) {
856             Type entry;
857             bool isExisted = CloudStorageUtils::GetTypeCaseInsensitive("asserts", data2[j], entry);
858             ASSERT_TRUE(isExisted);
859             Assets assets = std::get<Assets>(entry);
860             Asset baseAsset = j >= (size_t)(localCount / g_arrayHalfSub) ? g_localAsset : g_cloudAsset;
861             int index = static_cast<int>(j);
862             for (const auto &asset: assets) {
863                 EXPECT_EQ(asset.version, baseAsset.version);
864                 EXPECT_EQ(asset.name, baseAsset.name + std::to_string(index++));
865                 EXPECT_EQ(asset.uri, baseAsset.uri);
866                 EXPECT_EQ(asset.modifyTime, baseAsset.modifyTime);
867                 EXPECT_EQ(asset.createTime, baseAsset.createTime);
868                 EXPECT_EQ(asset.size, baseAsset.size);
869                 EXPECT_EQ(asset.hash, baseAsset.hash);
870             }
871         }
872     }
873 
CheckAssetsAfterDownload(sqlite3 * & db,int64_t localCount)874     void CheckAssetsAfterDownload(sqlite3 *&db, int64_t localCount)
875     {
876         string queryDownload = "select asserts from " + g_tables[1] + " where rowid in (";
877         for (int64_t i = 0; i < localCount; ++i) {
878             queryDownload +=  "'" + std::to_string(i) + "',";
879         }
880         queryDownload.pop_back();
881         queryDownload += ");";
882         sqlite3_stmt *stmt = nullptr;
883         ASSERT_EQ(SQLiteUtils::GetStatement(db, queryDownload, stmt), E_OK);
884         int index = 0;
885         while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
886             std::vector<uint8_t> blobValue;
887             ASSERT_EQ(SQLiteUtils::GetColumnBlobValue(stmt, 0, blobValue), E_OK);
888             Assets assets;
889             ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAssets(blobValue, assets), E_OK);
890             bool isLocal = index >= localCount / g_arrayHalfSub;
891             Asset baseAsset = isLocal ? g_localAsset : g_cloudAsset;
892             int nameIndex = index;
893             for (const auto &asset: assets) {
894                 EXPECT_EQ(asset.version, baseAsset.version);
895                 EXPECT_EQ(asset.name, baseAsset.name + std::to_string(nameIndex));
896                 EXPECT_EQ(asset.uri, baseAsset.uri);
897                 EXPECT_EQ(asset.modifyTime, baseAsset.modifyTime);
898                 EXPECT_EQ(asset.createTime, baseAsset.createTime);
899                 EXPECT_EQ(asset.size, baseAsset.size);
900                 EXPECT_EQ(asset.hash, baseAsset.hash);
901                 EXPECT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::NORMAL));
902                 nameIndex++;
903             }
904             index++;
905         }
906         int errCode;
907         SQLiteUtils::ResetStatement(stmt, true, errCode);
908     }
909 
CheckAssetAfterDownload(sqlite3 * & db,int64_t localCount)910     void CheckAssetAfterDownload(sqlite3 *&db, int64_t localCount)
911     {
912         string queryDownload = "select assert from " + g_tables[0] + " where rowid in (";
913         for (int64_t i = 0; i < localCount; ++i) {
914             queryDownload +=  "'" + std::to_string(i) + "',";
915         }
916         queryDownload.pop_back();
917         queryDownload += ");";
918         sqlite3_stmt *stmt = nullptr;
919         ASSERT_EQ(SQLiteUtils::GetStatement(db, queryDownload, stmt), E_OK);
920         int index = 0;
921         while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
922             std::vector<uint8_t> blobValue;
923             ASSERT_EQ(SQLiteUtils::GetColumnBlobValue(stmt, 0, blobValue), E_OK);
924             Asset asset;
925             ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAsset(blobValue, asset), E_OK);
926             bool isCloud = index >= localCount;
927             Asset baseAsset = isCloud ? g_cloudAsset : g_localAsset;
928             EXPECT_EQ(asset.version, baseAsset.version);
929             EXPECT_EQ(asset.name,
930                 baseAsset.name + std::to_string(isCloud ?  index - localCount / g_arrayHalfSub : index));
931             EXPECT_EQ(asset.uri, baseAsset.uri);
932             EXPECT_EQ(asset.modifyTime, baseAsset.modifyTime);
933             EXPECT_EQ(asset.createTime, baseAsset.createTime);
934             EXPECT_EQ(asset.size, baseAsset.size);
935             EXPECT_EQ(asset.hash, baseAsset.hash);
936             EXPECT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::NORMAL));
937             index++;
938         }
939         int errCode;
940         SQLiteUtils::ResetStatement(stmt, true, errCode);
941     }
942 
UpdateCloudAssetForDownloadAssetTest003()943     void UpdateCloudAssetForDownloadAssetTest003()
944     {
945         VBucket data;
946         std::vector<uint8_t> photo(1, 'x');
947         data.insert_or_assign("name", "Cloud" + std::to_string(0));
948         data.insert_or_assign("photo", photo);
949         data.insert_or_assign("assert", g_cloudAsset);
950         Timestamp now = TimeHelper::GetSysCurrentTime();
951         VBucket log;
952         std::vector<VBucket> record;
953         std::vector<VBucket> extend;
954         log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
955         log.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(0));
956         log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
957         log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
958         record.push_back(data);
959         extend.push_back(log);
960         ASSERT_EQ(g_virtualCloudDb->BatchUpdate(g_tableName1, std::move(record), extend), DBStatus::OK);
961     }
962 
CheckAssetForDownloadAssetTest003(sqlite3 * & db)963     void CheckAssetForDownloadAssetTest003(sqlite3 *&db)
964     {
965         string queryDownload = "select assert from " + g_tables[0] + " where rowid = '11';";
966         sqlite3_stmt *stmt = nullptr;
967         ASSERT_EQ(SQLiteUtils::GetStatement(db, queryDownload, stmt), E_OK);
968         int index = 0;
969         while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
970             std::vector<uint8_t> blobValue;
971             ASSERT_EQ(SQLiteUtils::GetColumnBlobValue(stmt, 0, blobValue), E_OK);
972             Asset asset;
973             ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAsset(blobValue, asset), E_OK);
974             EXPECT_EQ(asset.name, g_cloudAsset.name);
975             EXPECT_EQ(asset.hash, g_cloudAsset.hash);
976             EXPECT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::NORMAL));
977             index++;
978         }
979         int errCode;
980         SQLiteUtils::ResetStatement(stmt, true, errCode);
981     }
982 
CheckAssetAfterDownload2(sqlite3 * & db,int64_t localCount)983     void CheckAssetAfterDownload2(sqlite3 *&db, int64_t localCount)
984     {
985         string queryDownload = "select assert from " + g_tables[0] + " where rowid in (";
986         for (int64_t i = localCount + 1; i < localCount + localCount; ++i) {
987             queryDownload +=  "'" + std::to_string(i) + "',";
988         }
989         queryDownload.pop_back();
990         queryDownload += ");";
991         sqlite3_stmt *stmt = nullptr;
992         ASSERT_EQ(SQLiteUtils::GetStatement(db, queryDownload, stmt), E_OK);
993         int index = 0;
994         while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
995             std::vector<uint8_t> blobValue;
996             ASSERT_EQ(SQLiteUtils::GetColumnBlobValue(stmt, 0, blobValue), E_OK);
997             Asset asset;
998             ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAsset(blobValue, asset), E_OK);
999             EXPECT_EQ(asset.version, g_cloudAsset.version);
1000             EXPECT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::ABNORMAL));
1001             index++;
1002         }
1003         int errCode;
1004         SQLiteUtils::ResetStatement(stmt, true, errCode);
1005     }
1006 
InsertCloudForCloudProcessNotify001(std::vector<VBucket> & record,std::vector<VBucket> & extend)1007     void InsertCloudForCloudProcessNotify001(std::vector<VBucket> &record, std::vector<VBucket> &extend)
1008     {
1009         VBucket data;
1010         std::vector<uint8_t> photo(1, 'v');
1011         data.insert_or_assign("name", "Local" + std::to_string(0));
1012         data.insert_or_assign("height", 166.0); // 166.0 is random double value
1013         data.insert_or_assign("married", false);
1014         data.insert_or_assign("age", 13L);
1015         data.insert_or_assign("photo", photo);
1016         Asset asset = g_cloudAsset;
1017         asset.name = asset.name + std::to_string(0);
1018         data.insert_or_assign("assert", asset);
1019         record.push_back(data);
1020         VBucket log;
1021         Timestamp now = TimeHelper::GetSysCurrentTime();
1022         log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
1023         log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
1024         log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
1025         log.insert_or_assign("#_gid", std::to_string(2)); // 2 is gid
1026         extend.push_back(log);
1027     }
1028 
WaitForSyncFinish(SyncProcess & syncProcess,const int64_t & waitTime)1029     void WaitForSyncFinish(SyncProcess &syncProcess, const int64_t &waitTime)
1030     {
1031         std::unique_lock<std::mutex> lock(g_processMutex);
1032         bool result = g_processCondition.wait_for(lock, std::chrono::seconds(waitTime), [&syncProcess]() {
1033             return syncProcess.process == FINISHED;
1034         });
1035         ASSERT_EQ(result, true);
1036         LOGD("-------------------sync end--------------");
1037     }
1038 
callSync(const std::vector<std::string> & tableNames,SyncMode mode,DBStatus dbStatus)1039     void callSync(const std::vector<std::string> &tableNames, SyncMode mode, DBStatus dbStatus)
1040     {
1041         g_syncProcess = {};
1042         Query query = Query::Select().FromTable(tableNames);
1043         std::vector<SyncProcess> expectProcess;
1044         CloudSyncStatusCallback callback;
1045         GetCallback(g_syncProcess, callback, expectProcess);
1046         ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, mode, query, callback, g_syncWaitTime), dbStatus);
1047         if (dbStatus == DBStatus::OK) {
1048             WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1049         }
1050     }
1051 
CloseDb()1052     void CloseDb()
1053     {
1054         delete g_observer;
1055         g_virtualCloudDb = nullptr;
1056         if (g_delegate != nullptr) {
1057             EXPECT_EQ(g_mgr.CloseStore(g_delegate), DBStatus::OK);
1058             g_delegate = nullptr;
1059         }
1060     }
1061 
InitMockAssetLoader(DBStatus & status,int & index)1062     void InitMockAssetLoader(DBStatus &status, int &index)
1063     {
1064         std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
1065         ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
1066         EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
1067             .WillRepeatedly([&status, &index](const std::string &, const std::string &gid, const Type &,
1068                 std::map<std::string, Assets> &assets) {
1069                 LOGD("Download GID:%s", gid.c_str());
1070                 for (auto &item: assets) {
1071                     for (auto &asset: item.second) {
1072                         uint32_t lowBitStatus = AssetOperationUtils::EraseBitMask(asset.status);
1073                         EXPECT_TRUE(lowBitStatus == static_cast<uint32_t>(AssetStatus::INSERT) ||
1074                             lowBitStatus == static_cast<uint32_t>(AssetStatus::UPDATE));
1075                         LOGD("asset [name]:%s, [status]:%u, [flag]:%u", asset.name.c_str(), asset.status, asset.flag);
1076                         asset.status = (index++) % 5u + 1; // 6 is AssetStatus type num, include invalid type
1077                     }
1078                 }
1079                 return status;
1080         });
1081     }
1082 
1083     class DistributedDBCloudInterfacesRelationalSyncTest : public testing::Test {
1084     public:
1085         static void SetUpTestCase(void);
1086         static void TearDownTestCase(void);
1087         void SetUp();
1088         void TearDown();
1089     protected:
1090         sqlite3 *db = nullptr;
1091         VirtualCommunicatorAggregator *communicatorAggregator_ = nullptr;
1092     };
1093 
1094 
SetUpTestCase(void)1095     void DistributedDBCloudInterfacesRelationalSyncTest::SetUpTestCase(void)
1096     {
1097         DistributedDBToolsUnitTest::TestDirInit(g_testDir);
1098         g_storePath = g_testDir + "/" + g_storeID + DB_SUFFIX;
1099         LOGI("The test db is:%s", g_testDir.c_str());
1100         RuntimeConfig::SetCloudTranslate(std::make_shared<VirtualCloudDataTranslate>());
1101     }
1102 
TearDownTestCase(void)1103     void DistributedDBCloudInterfacesRelationalSyncTest::TearDownTestCase(void)
1104     {}
1105 
SetUp(void)1106     void DistributedDBCloudInterfacesRelationalSyncTest::SetUp(void)
1107     {
1108         if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
1109             LOGE("rm test db files error.");
1110         }
1111         DistributedDBToolsUnitTest::PrintTestCaseInfo();
1112         LOGD("Test dir is %s", g_testDir.c_str());
1113         db = RelationalTestUtils::CreateDataBase(g_storePath);
1114         ASSERT_NE(db, nullptr);
1115         CreateUserDBAndTable(db);
1116         g_observer = new (std::nothrow) RelationalStoreObserverUnitTest();
1117         ASSERT_NE(g_observer, nullptr);
1118         ASSERT_EQ(g_mgr.OpenStore(g_storePath, g_storeID, RelationalStoreDelegate::Option { .observer = g_observer },
1119             g_delegate), DBStatus::OK);
1120         ASSERT_NE(g_delegate, nullptr);
1121         ASSERT_EQ(g_delegate->CreateDistributedTable(g_tableName1, CLOUD_COOPERATION), DBStatus::OK);
1122         ASSERT_EQ(g_delegate->CreateDistributedTable(g_tableName2, CLOUD_COOPERATION), DBStatus::OK);
1123         ASSERT_EQ(g_delegate->CreateDistributedTable(g_tableName3, CLOUD_COOPERATION), DBStatus::OK);
1124         g_virtualCloudDb = make_shared<VirtualCloudDb>();
1125         g_virtualAssetLoader = make_shared<VirtualAssetLoader>();
1126         g_syncProcess = {};
1127         ASSERT_EQ(g_delegate->SetCloudDB(g_virtualCloudDb), DBStatus::OK);
1128         ASSERT_EQ(g_delegate->SetIAssetLoader(g_virtualAssetLoader), DBStatus::OK);
1129         // sync before setting cloud db schema,it should return SCHEMA_MISMATCH
1130         Query query = Query::Select().FromTable(g_tables);
1131         CloudSyncStatusCallback callback;
1132         ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime),
1133             DBStatus::SCHEMA_MISMATCH);
1134         DataBaseSchema dataBaseSchema;
1135         GetCloudDbSchema(dataBaseSchema);
1136         ASSERT_EQ(g_delegate->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
1137         communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
1138         ASSERT_TRUE(communicatorAggregator_ != nullptr);
1139         RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
1140     }
1141 
TearDown(void)1142     void DistributedDBCloudInterfacesRelationalSyncTest::TearDown(void)
1143     {
1144         EXPECT_EQ(sqlite3_close_v2(db), SQLITE_OK);
1145         if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
1146             LOGE("rm test db files error.");
1147         }
1148         RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
1149         communicatorAggregator_ = nullptr;
1150         RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
1151     }
1152 
1153 /**
1154  * @tc.name: CloudSyncTest001
1155  * @tc.desc: Cloud data is older than local data.
1156  * @tc.type: FUNC
1157  * @tc.require:
1158  * @tc.author: bty
1159  */
1160 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest001, TestSize.Level0)
1161 {
1162     int64_t paddingSize = 10;
1163     int64_t cloudCount = 20;
1164     int64_t localCount = cloudCount / g_arrayHalfSub;
1165     ChangedData changedDataForTable1;
1166     ChangedData changedDataForTable2;
1167     changedDataForTable1.tableName = g_tableName1;
1168     changedDataForTable2.tableName = g_tableName2;
1169     changedDataForTable1.field.push_back(std::string("name"));
1170     changedDataForTable2.field.push_back(std::string("id"));
1171     for (int i = 0; i < cloudCount; i++) {
1172         changedDataForTable1.primaryData[ChangeType::OP_INSERT].push_back({"Cloud" + std::to_string(i)});
1173         changedDataForTable2.primaryData[ChangeType::OP_INSERT].push_back({(int64_t)i + 10});
1174     }
1175     g_observer->SetExpectedResult(changedDataForTable1);
1176     g_observer->SetExpectedResult(changedDataForTable2);
1177     InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1178     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1179     Query query = Query::Select().FromTable(g_tables);
1180     std::vector<SyncProcess> expectProcess;
1181     InitProcessForTest1(cloudCount, localCount, expectProcess);
1182     CloudSyncStatusCallback callback;
1183     GetCallback(g_syncProcess, callback, expectProcess);
1184     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1185     WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1186     EXPECT_TRUE(g_observer->IsAllChangedDataEq());
1187     g_observer->ClearChangedData();
1188     LOGD("expect download:worker1[primary key]:[cloud0 - cloud20), worker2[primary key]:[10 - 20)");
1189     CheckDownloadResult(db, {20L, 10L}); // 20 and 10 means the num of downloads from cloud db by worker1 and worker2
1190     LOGD("expect upload:worker1[primary key]:[local0 - local10), worker2[primary key]:[0 - 10)");
1191     CheckCloudTotalCount({30L, 20L}); // 30 and 20 means the total num of worker1 and worker2 from the cloud db
1192     CloseDb();
1193 }
1194 
1195 /**
1196  * @tc.name: CloudSyncTest002
1197  * @tc.desc: Local data is older than cloud data.
1198  * @tc.type: FUNC
1199  * @tc.require:
1200  * @tc.author: bty
1201  */
1202 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest002, TestSize.Level0)
1203 {
1204     int64_t localCount = 20;
1205     int64_t cloudCount = 10;
1206     int64_t paddingSize = 100;
1207     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1208     InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1209     Query query = Query::Select().FromTable(g_tables);
1210     std::vector<SyncProcess> expectProcess;
1211     InitProcessForTest2(cloudCount, localCount, expectProcess);
1212     CloudSyncStatusCallback callback;
1213     GetCallback(g_syncProcess, callback, expectProcess);
1214     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1215     WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1216     LOGD("expect download:worker1[primary key]:[cloud0 - cloud10), worker2[primary key]:[0 - 10)");
1217     CheckDownloadResult(db, {10L, 10L}); // 10 and 10 means the num of downloads from cloud db by worker1 and worker2
1218     LOGD("expect upload:worker1[primary key]:[local0 - local20), worker2[primary key]:[10 - 20)");
1219     CheckCloudTotalCount({30L, 20L}); // 30 and 20 means the total num of worker1 and worker2 from the cloud db
1220     CloseDb();
1221 }
1222 
1223 /**
1224  * @tc.name: CloudSyncTest003
1225  * @tc.desc: test with update and delete operator
1226  * @tc.type: FUNC
1227  * @tc.require:
1228  * @tc.author: bty
1229  */
1230 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest003, TestSize.Level0)
1231 {
1232     int64_t paddingSize = 10;
1233     int cloudCount = 20;
1234     InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1235     InsertUserTableRecord(db, 0, cloudCount, paddingSize, false);
1236     Query query = Query::Select().FromTable(g_tables);
1237     std::vector<SyncProcess> expectProcess;
1238     InitProcessForTest1(cloudCount, cloudCount, expectProcess);
1239     CloudSyncStatusCallback callback;
1240     GetCallback(g_syncProcess, callback, expectProcess);
1241     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1242     WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1243     CheckDownloadResult(db, {20L, 0L}); // 20 and 0 means the num of downloads from cloud db by worker1 and worker2
1244     CheckCloudTotalCount({40L, 20L}); // 40 and 20 means the total num of worker1 and worker2 from the cloud db
1245 
1246     int updateCount = 10;
1247     UpdateUserTableRecord(db, 5, updateCount); // 5 is start id to be updated
1248     g_syncProcess = {};
1249     InitProcessForTest1(cloudCount, updateCount, expectProcess);
1250     GetCallback(g_syncProcess, callback, expectProcess);
1251     LOGD("-------------------sync after update--------------");
1252     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1253     WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1254 
1255     VBucket extend;
1256     extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(0);
1257     std::vector<VBucket> data1;
1258     g_virtualCloudDb->Query(g_tables[0], extend, data1);
1259     for (int j = 25; j < 35; ++j) { // index[25, 35) in cloud db expected to be updated
1260         EXPECT_EQ(std::get<int64_t>(data1[j]["age"]), 99); // 99 is the updated age field of cloud db
1261     }
1262 
1263     std::vector<VBucket> data2;
1264     g_virtualCloudDb->Query(g_tables[1], extend, data2);
1265     for (int j = 5; j < 15; ++j) { // index[5, 15) in cloud db expected to be updated
1266         EXPECT_EQ(std::get<int64_t>(data2[j]["age"]), 99); // 99 is the updated age field of cloud db
1267     }
1268 
1269     int deleteCount = 3;
1270     DeleteUserTableRecord(db, 0, deleteCount);
1271     g_syncProcess = {};
1272     InitProcessForTest1(updateCount, deleteCount, expectProcess);
1273     GetCallback(g_syncProcess, callback, expectProcess);
1274     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1275     WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1276 
1277     CheckCloudTotalCount({37L, 17L}); // 37 and 17 means the total num of worker1 and worker2 from the cloud db
1278     CloseDb();
1279 }
1280 
1281 /**
1282  * @tc.name: CloudSyncTest004
1283  * @tc.desc: Random write of local and cloud data
1284  * @tc.type: FUNC
1285  * @tc.require:
1286  * @tc.author: bty
1287  */
1288 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest004, TestSize.Level0)
1289 {
1290     int64_t paddingSize = 1024 * 8;
1291     vector<thread> threads;
1292     int cloudCount = 1024;
1293     threads.emplace_back(InsertCloudTableRecord, 0, cloudCount, paddingSize, false);
1294     threads.emplace_back(InsertUserTableRecord, std::ref(db), 0, cloudCount, paddingSize, false);
1295     for (auto &thread: threads) {
1296         thread.join();
1297     }
1298     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1299     CloseDb();
1300 }
1301 
1302 /**
1303  * @tc.name: CloudSyncTest005
1304  * @tc.desc: sync with device sync query
1305  * @tc.type: FUNC
1306  * @tc.require:
1307  * @tc.author: bty
1308  */
1309 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest005, TestSize.Level0)
1310 {
1311     Query query = Query::Select().FromTable(g_tables).OrderBy("123", true);
1312     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, nullptr, g_syncWaitTime),
1313         DBStatus::NOT_SUPPORT);
1314 
1315     query = Query::Select();
1316     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, nullptr, g_syncWaitTime),
1317         DBStatus::INVALID_ARGS);
1318     CloseDb();
1319 }
1320 
1321 /**
1322  * @tc.name: CloudSyncTest006
1323  * @tc.desc: Firstly set a correct schema, and then null or invalid schema
1324  * @tc.type: FUNC
1325  * @tc.require:
1326  * @tc.author: wanyi
1327  */
1328 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest006, TestSize.Level0)
1329 {
1330     int64_t paddingSize = 10;
1331     int cloudCount = 20;
1332     ChangedData changedDataForTable1;
1333     ChangedData changedDataForTable2;
1334     changedDataForTable1.tableName = g_tableName1;
1335     changedDataForTable2.tableName = g_tableName2;
1336     changedDataForTable1.field.push_back(std::string("name"));
1337     changedDataForTable2.field.push_back(std::string("id"));
1338     for (int i = 0; i < cloudCount; i++) {
1339         changedDataForTable1.primaryData[ChangeType::OP_INSERT].push_back({"Cloud" + std::to_string(i)});
1340         changedDataForTable2.primaryData[ChangeType::OP_INSERT].push_back({(int64_t)i + 10});
1341     }
1342     g_observer->SetExpectedResult(changedDataForTable1);
1343     g_observer->SetExpectedResult(changedDataForTable2);
1344     InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1345     InsertUserTableRecord(db, 0, cloudCount / g_arrayHalfSub, paddingSize, false);
1346     // Set correct cloudDbSchema (correct version)
1347     DataBaseSchema correctSchema;
1348     GetCloudDbSchema(correctSchema);
1349     ASSERT_EQ(g_delegate->SetCloudDbSchema(correctSchema), DBStatus::OK);
1350     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1351     EXPECT_TRUE(g_observer->IsAllChangedDataEq());
1352     g_observer->ClearChangedData();
1353     LOGD("expect download:worker1[primary key]:[cloud0 - cloud20), worker2[primary key]:[10 - 20)");
1354     CheckDownloadResult(db, {20L, 10L}); // 20 and 10 means the num of downloads from cloud db by worker1 and worker2
1355     LOGD("expect upload:worker1[primary key]:[local0 - local10), worker2[primary key]:[0 - 10)");
1356     CheckCloudTotalCount({30L, 20L}); // 30 and 20 means the total num of worker1 and worker2 from the cloud db
1357 
1358     // Reset cloudDbSchema (invalid version - null)
1359     DataBaseSchema nullSchema;
1360     ASSERT_EQ(g_delegate->SetCloudDbSchema(nullSchema), DBStatus::OK);
1361     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::SCHEMA_MISMATCH);
1362 
1363     // Reset cloudDbSchema (invalid version - field mismatch)
1364     DataBaseSchema invalidSchema;
1365     GetInvalidCloudDbSchema(invalidSchema);
1366     ASSERT_EQ(g_delegate->SetCloudDbSchema(invalidSchema), DBStatus::OK);
1367     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::SCHEMA_MISMATCH);
1368     CloseDb();
1369 }
1370 
1371 /**
1372  * @tc.name: CloudSyncTest007
1373  * @tc.desc: Check the asset types after sync
1374  * @tc.type: FUNC
1375  * @tc.require:
1376  * @tc.author: bty
1377  */
1378 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest007, TestSize.Level1)
1379 {
1380     int64_t paddingSize = 100;
1381     int localCount = 20;
1382     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1383     InsertCloudTableRecord(0, localCount / g_arrayHalfSub, paddingSize, false);
1384     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1385 
1386     CheckAssetAfterDownload(db, localCount);
1387     CheckAllAssetAfterUpload(localCount);
1388     CheckAssetsAfterDownload(db, localCount);
1389     CloseDb();
1390 }
1391 
1392 /*
1393  * @tc.name: CloudSyncTest008
1394  * @tc.desc: Test sync with invalid param
1395  * @tc.type: FUNC
1396  * @tc.require:
1397  * @tc.author: zhangqiquan
1398  */
1399 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest008, TestSize.Level0)
1400 {
1401     ASSERT_EQ(g_delegate->SetCloudDB(nullptr), OK);   // it will not happen because cloudb has been set in SetUp()
1402     Query query = Query::Select().FromTable({g_tableName3});
1403     // clouddb has been set in SetUp() and it's not null
1404     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, nullptr, g_syncWaitTime), OK);
1405     CloseDb();
1406 }
1407 
1408 /**
1409  * @tc.name: CloudSyncTest009
1410  * @tc.desc: The second time there was no data change and sync was called.
1411  * @tc.type: FUNC
1412  * @tc.require:
1413  * @tc.author: bty
1414  */
1415 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest009, TestSize.Level0)
1416 {
1417     int64_t paddingSize = 10;
1418     int cloudCount = 20;
1419     InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1420     InsertUserTableRecord(db, 0, cloudCount, paddingSize, false);
1421     Query query = Query::Select().FromTable(g_tables);
1422     std::vector<SyncProcess> expectProcess;
1423     InitProcessForTest1(cloudCount, cloudCount, expectProcess);
1424     CloudSyncStatusCallback callback;
1425     GetCallback(g_syncProcess, callback, expectProcess);
1426     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1427     WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1428     LOGD("expect download:worker1[primary key]:[cloud0 - cloud20), worker2[primary key]:none");
1429     CheckDownloadResult(db, {20L, 0L}); // 20 and 0 means the num of downloads from cloud db by worker1 and worker2
1430     LOGD("expect upload:worker1[primary key]:[local0 - local20), worker2[primary key]:[0 - 20)");
1431     CheckCloudTotalCount({40L, 20L}); // 40 and 20 means the total num of worker1 and worker2 from the cloud db
1432 
1433     g_syncProcess = {};
1434     InitProcessForTest9(cloudCount, 0, expectProcess);
1435     GetCallback(g_syncProcess, callback, expectProcess);
1436     LOGD("--------------the second sync-------------");
1437     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1438     WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1439     CloseDb();
1440 }
1441 
1442 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest0010, TestSize.Level0)
1443 {
1444     int64_t paddingSize = 10;
1445     int cloudCount = 20;
1446     int localCount = 10;
1447     InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1448     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1449     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1450 
1451     int rowid = 27;
1452     UpdateAssetForTest(db, AssetOpType::NO_CHANGE, cloudCount, rowid++);
1453     UpdateAssetForTest(db, AssetOpType::INSERT, cloudCount, rowid++);
1454     UpdateAssetForTest(db, AssetOpType::DELETE, cloudCount, rowid++);
1455     UpdateAssetForTest(db, AssetOpType::UPDATE, cloudCount, rowid++);
1456 
1457     int id = 0;
1458     UpdateAssetsForTest(db, AssetOpType::NO_CHANGE, id++);
1459     UpdateAssetsForTest(db, AssetOpType::INSERT, id++);
1460     UpdateAssetsForTest(db, AssetOpType::DELETE, id++);
1461     UpdateAssetsForTest(db, AssetOpType::UPDATE, id++);
1462 
1463     LOGD("--------------the second sync-------------");
1464     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1465 
1466     CheckFillAssetForTest10(db);
1467     CheckFillAssetsForTest10(db);
1468     CloseDb();
1469 }
1470 
1471 /**
1472  * @tc.name: CloudSyncTest011
1473  * @tc.desc: Test sync with same table name.
1474  * @tc.type: FUNC
1475  * @tc.require:
1476  * @tc.author: zhangqiquan
1477  */
1478 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest011, TestSize.Level0)
1479 {
1480     Query query = Query::Select().FromTable({g_tableName1, g_tableName1});
1481     bool syncFinish = false;
1482     std::mutex syncMutex;
1483     std::condition_variable cv;
1484     std::atomic<int> callCount = 0;
1485     CloudSyncStatusCallback callback = [&callCount, &cv, &syncFinish, &syncMutex](
__anon239839ae0602( const std::map<std::string, SyncProcess> &onProcess) 1486         const std::map<std::string, SyncProcess> &onProcess) {
1487         ASSERT_NE(onProcess.find(DEVICE_CLOUD), onProcess.end());
1488         SyncProcess syncProcess = onProcess.at(DEVICE_CLOUD);
1489         callCount++;
1490         if (syncProcess.process == FINISHED) {
1491             std::lock_guard<std::mutex> autoLock(syncMutex);
1492             syncFinish = true;
1493         }
1494         cv.notify_all();
1495     };
1496     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1497     std::unique_lock<std::mutex> uniqueLock(syncMutex);
__anon239839ae0702() 1498     cv.wait(uniqueLock, [&syncFinish]() {
1499         return syncFinish;
1500     });
1501     RuntimeContext::GetInstance()->StopTaskPool();
1502     EXPECT_EQ(callCount, 2); // 2 is onProcess count
1503     CloseDb();
1504 }
1505 
1506 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest012, TestSize.Level0)
1507 {
1508     int64_t localCount = 20;
1509     int64_t cloudCount = 10;
1510     int64_t paddingSize = 10;
1511     InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1512     InsertUserTableRecord(db, 0, localCount, paddingSize, true);
1513     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1514 
1515     InsertCloudTableRecord(localCount + cloudCount, cloudCount, paddingSize, false);
1516     InsertUserTableRecord(db, localCount + cloudCount, localCount, paddingSize, true);
1517     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1518 
1519     InsertCloudTableRecord(2 * (localCount + cloudCount), cloudCount, paddingSize, false); // 2 is offset
1520     InsertUserTableRecord(db, 2 * (localCount + cloudCount), localCount, paddingSize, false); // 2 is offset
1521     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1522 
1523 
1524     InsertCloudTableRecord(3 * (localCount + cloudCount), cloudCount, paddingSize, true); // 3 is offset
1525     InsertUserTableRecord(db, 3 * (localCount + cloudCount), localCount, paddingSize, true); // 3 is offset
1526     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1527     CloseDb();
1528 }
1529 
1530 /*
1531  * @tc.name: CloudSyncTest013
1532  * @tc.desc: test increment watermark when cloud db query data size is 0
1533  * @tc.type: FUNC
1534  * @tc.require:
1535  * @tc.author: zhuwentao
1536  */
1537 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest013, TestSize.Level0)
1538 {
1539     /**
1540      * @tc.steps: insert some data into cloud db
1541      * @tc.expected: return ok.
1542      */
1543     int64_t paddingSize = 10;
1544     int64_t cloudCount = 10;
1545     SyncProcess syncProcess;
1546     InsertCloudTableRecord(0, cloudCount, paddingSize, true);
1547     /**
1548      * @tc.steps: try to cloud sync
1549      * @tc.expected: return ok.
1550      */
1551     Query query = Query::Select().FromTable(g_tables);
__anon239839ae0802(const std::map<std::string, SyncProcess> &process) 1552     CloudSyncStatusCallback callback = [&syncProcess](const std::map<std::string, SyncProcess> &process) {
1553         LOGI("devices size = %d", process.size());
1554         ASSERT_EQ(process.size(), 1u);
1555         syncProcess = std::move(process.begin()->second);
1556         if (syncProcess.process == FINISHED) {
1557             g_processCondition.notify_one();
1558         }
1559     };
1560     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1561     WaitForSyncFinish(syncProcess, g_syncWaitTime);
1562     uint32_t queryTimes = g_virtualCloudDb->GetQueryTimes(g_tableName1);
1563     /**
1564      * @tc.steps: insert some increment data into cloud db
1565      * @tc.expected: return ok.
1566      */
1567     VBucket data;
1568     Timestamp now = TimeHelper::GetSysCurrentTime();
1569     data.insert_or_assign("name", "Cloud" + std::to_string(0));
1570     data.insert_or_assign("height", 166.0); // 166.0 is random double value
1571     data.insert_or_assign("married", false);
1572     data.insert_or_assign("age", 13L);
1573     VBucket log;
1574     log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
1575     log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
1576     log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
1577     log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
1578     log.insert_or_assign(CloudDbConstant::CURSOR_FIELD, "0123");
1579     g_virtualCloudDb->SetIncrementData(g_tableName1, data, log);
1580     syncProcess.process = PREPARED;
1581     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1582     WaitForSyncFinish(syncProcess, g_syncWaitTime);
1583     uint32_t lastQueryTimes = g_virtualCloudDb->GetQueryTimes(g_tableName1);
1584     ASSERT_EQ(lastQueryTimes - queryTimes, 2u);
1585     CloseDb();
1586 }
1587 
TestSyncForStatus(RelationalStoreDelegate * delegate,DBStatus expectStatus)1588 void TestSyncForStatus(RelationalStoreDelegate *delegate, DBStatus expectStatus)
1589 {
1590     std::mutex dataMutex;
1591     std::condition_variable cv;
1592     bool finish = false;
1593     DBStatus res = OK;
1594     CloudSyncStatusCallback callback = [&dataMutex, &cv, &finish, &res](
1595         const std::map<std::string, SyncProcess> &process) {
1596         std::map<std::string, SyncProcess> syncProcess;
1597         {
1598             std::lock_guard<std::mutex> autoLock(dataMutex);
1599             syncProcess = process;
1600             if (syncProcess[DEVICE_CLOUD].process == FINISHED) {
1601                 finish = true;
1602             }
1603             res = syncProcess[DEVICE_CLOUD].errCode;
1604         }
1605         cv.notify_one();
1606     };
1607     Query query = Query::Select().FromTable({g_tableName3});
1608     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1609     {
1610         std::unique_lock<std::mutex> uniqueLock(dataMutex);
1611         cv.wait(uniqueLock, [&finish] {
1612             return finish;
1613         });
1614     }
1615     EXPECT_EQ(res, expectStatus);
1616 }
1617 
1618 /*
1619  * @tc.name: CloudSyncTest015
1620  * @tc.desc: Test sync with cloud error
1621  * @tc.type: FUNC
1622  * @tc.require:
1623  * @tc.author: zhangqiquan
1624  */
1625 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest015, TestSize.Level0)
1626 {
1627     g_virtualCloudDb->SetActionStatus(CLOUD_NETWORK_ERROR);
1628     TestSyncForStatus(g_delegate, CLOUD_NETWORK_ERROR);
1629 
1630     g_virtualCloudDb->SetActionStatus(CLOUD_SYNC_UNSET);
1631     TestSyncForStatus(g_delegate, CLOUD_SYNC_UNSET);
1632 
1633     g_virtualCloudDb->SetActionStatus(CLOUD_FULL_RECORDS);
1634     TestSyncForStatus(g_delegate, CLOUD_FULL_RECORDS);
1635 
1636     g_virtualCloudDb->SetActionStatus(CLOUD_LOCK_ERROR);
1637     TestSyncForStatus(g_delegate, CLOUD_LOCK_ERROR);
1638 
1639     g_virtualCloudDb->SetActionStatus(DB_ERROR);
1640     TestSyncForStatus(g_delegate, CLOUD_ERROR);
1641 
1642     g_virtualCloudDb->SetActionStatus(OK);
1643     CloseDb();
1644 }
1645 
1646 /*
1647  * @tc.name: CloudSyncTest014
1648  * @tc.desc: Test sync with s4
1649  * @tc.type: FUNC
1650  * @tc.require:
1651  * @tc.author: zhangqiquan
1652  */
1653 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest014, TestSize.Level0)
1654 {
1655     auto adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
1656     RuntimeConfig::SetProcessSystemAPIAdapter(adapter);
1657 
1658     // sync failed because get security option failed
__anon239839ae0b02(const std::string&, SecurityOption &option) 1659     adapter->ForkGetSecurityOption([](const std::string&, SecurityOption &option) {
1660         option.securityLabel = S0;
1661         return DB_ERROR;
1662     });
1663     Query query = Query::Select().FromTable({g_tableName3});
1664     EXPECT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, nullptr, g_syncWaitTime),
1665         SECURITY_OPTION_CHECK_ERROR);
1666 
1667     // sync failed because get S4
__anon239839ae0c02(const std::string&, SecurityOption &option) 1668     adapter->ForkGetSecurityOption([](const std::string&, SecurityOption &option) {
1669         option.securityLabel = S4;
1670         return NOT_SUPPORT;
1671     });
1672     Query invalidQuery = Query::Select().FromTable({g_tableName3}).PrefixKey({'k'});
1673     EXPECT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, invalidQuery, nullptr, g_syncWaitTime),
1674         NOT_SUPPORT);
1675 
1676     // sync failed because get S4
__anon239839ae0d02(const std::string&, SecurityOption &option) 1677     adapter->ForkGetSecurityOption([](const std::string&, SecurityOption &option) {
1678         option.securityLabel = S4;
1679         return OK;
1680     });
1681     EXPECT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, nullptr, g_syncWaitTime),
1682         SECURITY_OPTION_CHECK_ERROR);
1683 
1684     // sync failed because S4 has been cached
__anon239839ae0e02(const std::string&, SecurityOption &option) 1685     adapter->ForkGetSecurityOption([](const std::string&, SecurityOption &option) {
1686         option.securityLabel = S0;
1687         return OK;
1688     });
1689     EXPECT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, nullptr, g_syncWaitTime),
1690         SECURITY_OPTION_CHECK_ERROR);
1691     RuntimeConfig::SetProcessSystemAPIAdapter(nullptr);
1692     CloseDb();
1693 }
1694 
1695 /*
1696  * @tc.name: CloudSyncTest016
1697  * @tc.desc: Test sync when push before merge
1698  * @tc.type: FUNC
1699  * @tc.require:
1700  * @tc.author: chenchaohao
1701  */
1702 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest016, TestSize.Level0)
1703 {
1704     int64_t localCount = 10;
1705     int64_t paddingSize = 10;
1706     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1707     callSync(g_tables, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
1708     CheckCloudTotalCount({10L, 10L});
1709     UpdateUserTableRecord(db, 0, localCount);
1710     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1711 
1712     VBucket extend;
1713     extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(0);
1714     std::vector<VBucket> data1;
1715     g_virtualCloudDb->Query(g_tables[0], extend, data1);
1716     for (int i = 0; i < 10; ++i) { // index[0, 10) in cloud db expected to be updated
1717         EXPECT_EQ(std::get<int64_t>(data1[i]["age"]), 99); // 99 is the updated age field of cloud db
1718     }
1719 
1720     std::vector<VBucket> data2;
1721     g_virtualCloudDb->Query(g_tables[1], extend, data2);
1722     for (int i = 0; i < 10; ++i) { // index[0, 10) in cloud db expected to be updated
1723         EXPECT_EQ(std::get<int64_t>(data2[i]["age"]), 99); // 99 is the updated age field of cloud db
1724     }
1725 
1726     CloseDb();
1727 }
1728 
1729 /*
1730  * @tc.name: CloudSyncTest017
1731  * @tc.desc: Test sync to push when local data deleted and not upload to cloud
1732  * @tc.type: FUNC
1733  * @tc.require:
1734  * @tc.author: wangxiangdong
1735  */
1736 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest017, TestSize.Level0)
1737 {
1738     /**
1739      * @tc.steps: step1. make data: 20 records on local, 20 records on cloud
1740      */
1741     int64_t localCount = 20;
1742     int64_t paddingSize = 20;
1743     InsertCloudTableRecord(0, localCount, paddingSize, true);
1744     InsertUserTableRecord(db, 0, localCount, paddingSize, true);
1745     localCount = 10;
1746     /**
1747      * @tc.steps: step2. delete 10 local record before sync
1748      */
1749     DeleteUserTableRecord(db, 0, localCount);
1750     callSync(g_tables, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
1751     /**
1752      * @tc.steps: step3. check local and clod num
1753      */
1754     CheckCloudTotalCount({30L, 20L});
1755     std::string sql = "select count(*) from " + DBCommon::GetLogTableName(g_tables[0]) +
1756         " where data_key=-1 and cloud_gid='';";
1757     EXPECT_EQ(sqlite3_exec(db, sql.c_str(), QueryCountCallback,
1758         reinterpret_cast<void *>(10), nullptr), SQLITE_OK);
1759     CloseDb();
1760 }
1761 
1762 /*
1763  * @tc.name: DataNotifier001
1764  * @tc.desc: Notify data without primary key
1765  * @tc.type: FUNC
1766  * @tc.require:
1767  * @tc.author: wanyi
1768  */
1769 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, DataNotifier001, TestSize.Level0)
1770 {
1771     int64_t paddingSize = 10;
1772     int localCount = 20;
1773     InsertRecordWithoutPk2LocalAndCloud(db, 0, localCount, paddingSize);
1774     callSync({g_tableName3}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1775     CloseDb();
1776 }
1777 
1778 /**
1779  * @tc.name: CloudSyncAssetTest001
1780  * @tc.desc:
1781  * @tc.type: FUNC
1782  * @tc.require:
1783  * @tc.author: wanyi
1784  */
1785 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest001, TestSize.Level1)
1786 {
1787     int64_t paddingSize = 100;
1788     int localCount = 20;
1789     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1790     InsertCloudTableRecord(0, localCount / g_arrayHalfSub, paddingSize, false);
1791     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1792 
1793     CheckAssetAfterDownload(db, localCount);
1794     CheckAllAssetAfterUpload(localCount);
1795     CloseDb();
1796 }
1797 
1798 /*
1799  * @tc.name: MannualNotify001
1800  * @tc.desc: Test FLAG_ONLY mode of RemoveDeviceData
1801  * @tc.type: FUNC
1802  * @tc.require:
1803  * @tc.author: huangboxin
1804  */
1805 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, MannualNotify001, TestSize.Level0)
1806 {
1807     int64_t paddingSize = 10;
1808     int localCount = 10;
1809     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1810     Query query = Query::Select().FromTable(g_tables);
1811     std::vector<SyncProcess> expectProcess;
1812     InitProcessForMannualSync1(expectProcess);
1813     CloudSyncStatusCallback callback;
1814     GetCallback(g_syncProcess, callback, expectProcess);
1815     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_FORCE_PULL, query, callback, g_syncWaitTime),
1816         DBStatus::OK);
1817     WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1818     CloseDb();
1819 }
1820 
1821 /**
1822  * @tc.name: CloudProcessNotify001
1823  * @tc.desc: Test duplicate cloud records. SYNC_MODE_CLOUD_MERGE
1824  * @tc.type: FUNC
1825  * @tc.require:
1826  * @tc.author: liufuchenxing
1827  */
1828 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudProcessNotify001, TestSize.Level1)
1829 {
1830     /**
1831      * @tc.steps: step1. table work1 and work2 insert 1 record which name is local0, then sync().
1832      * @tc.expected: step 1. table work1 and work2 download result is 0. table work1 and work2 upload 1 record.
1833      */
1834     int64_t paddingSize = 10;
1835     int64_t localCount = 1;
1836     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1837     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1838     EXPECT_TRUE(g_observer->IsAllChangedDataEq());
1839     g_observer->ClearChangedData();
1840     LOGD("expect download:worker1[primary key]:[], worker2[primary key]:[]");
1841     CheckDownloadResult(db, {0L, 0L}); // 0 and 0 means the num of downloads from cloud db by worker1 and worker2
1842     LOGD("expect upload:worker1[primary key]:[local0], worker2[primary key]:[0]");
1843     CheckCloudTotalCount({1L, 1L}); // 1 and 1 means the total num of worker1 and worker2 from the cloud db
1844 
1845     /**
1846      * @tc.steps: step2. reset data
1847      * @tc.expected: step2. return ok.
1848      */
1849     std::this_thread::sleep_for(std::chrono::milliseconds(100));
1850     g_syncProcess = {};
1851     ASSERT_EQ(g_delegate->SetCloudDB(g_virtualCloudDb), DBStatus::OK);
1852 
1853     /**
1854      * @tc.steps: step3. table work1 delete record which gid is 0 and name is local0 on cloud.
1855      * @tc.expected: step3. return ok.
1856      */
1857     VBucket idMap;
1858     idMap.insert_or_assign("#_gid", std::to_string(0));
1859     ASSERT_EQ(g_virtualCloudDb->DeleteByGid(g_tableName1, idMap), DBStatus::OK);
1860 
1861     /**
1862      * @tc.steps: step4. table work1 insert record which gid is 0 and name is local0 on cloud.
1863      * @tc.expected: step4. return ok.
1864      */
1865     std::vector<VBucket> record1;
1866     std::vector<VBucket> extend1;
1867     InsertCloudForCloudProcessNotify001(record1, extend1);
1868     ASSERT_EQ(g_virtualCloudDb->BatchInsertWithGid(g_tableName1, std::move(record1), extend1), DBStatus::OK);
1869 
1870     /**
1871      * @tc.steps: step5. sync() and check local data.
1872      * @tc.expected: step5. return ok.
1873      */
1874     ChangedData changedDataForTable1;
1875     changedDataForTable1.tableName = g_tableName1;
1876     changedDataForTable1.field.push_back(std::string("name"));
1877     changedDataForTable1.primaryData[ChangeType::OP_UPDATE].push_back({"Local" + std::to_string(0)});
1878     g_observer->SetExpectedResult(changedDataForTable1);
1879 
1880     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1881     EXPECT_TRUE(g_observer->IsAllChangedDataEq());
1882     g_observer->ClearChangedData();
1883     LOGD("expect download:worker1[primary key]:[Local0], worker2[primary key]:[0]");
1884     // 1 and 1 means the num of downloads from cloud db by worker1 and worker2
1885     CheckDownloadResult(db, {1L, 1L}, "Local");
1886     LOGD("expect upload:worker1[primary key]:[local0], worker2[primary key]:[0]");
1887     CheckCloudTotalCount({1L, 1L}); // 0 and 0 means the total num of worker1 and worker2 from the cloud db
1888 
1889     /**
1890      * @tc.steps: step6. CloseDb().
1891      * @tc.expected: step6. return ok.
1892      */
1893     CloseDb();
1894 }
1895 
1896 /*
1897  * @tc.name: CloudSyncAssetTest002
1898  * @tc.desc:
1899  * @tc.type: FUNC
1900  * @tc.require:
1901  * @tc.author: huangboxin
1902  */
1903 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest002, TestSize.Level0)
1904 {
1905     int64_t paddingSize = 10;
1906     int localCount = 3;
1907     int cloudCount = 3;
1908     InsertCloudTableRecord(0, cloudCount, paddingSize, true);
1909     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1910     callSync(g_tables, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
1911     CloseDb();
1912 }
1913 
1914 /*
1915  * @tc.name: CloudSyncAssetTest003
1916  * @tc.desc:
1917  * @tc.type: FUNC
1918  * @tc.require:
1919  * @tc.author: bty
1920  */
1921 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest003, TestSize.Level0)
1922 {
1923     int64_t paddingSize = 10;
1924     int localCount = 3;
1925     int cloudCount = 3;
1926     InsertCloudTableRecord(0, cloudCount, paddingSize, true);
1927     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1928     Assets assets;
1929     assets.push_back(g_localAsset);
1930     assets.push_back(g_localAsset);
1931     UpdateLocalAssets(db, assets, 1);
1932     Query query = Query::Select().FromTable(g_tables);
1933     std::vector<SyncProcess> expectProcess;
__anon239839ae0f02(const std::map<std::string, SyncProcess> &process) 1934     CloudSyncStatusCallback callback = [](const std::map<std::string, SyncProcess> &process) {
1935         ASSERT_EQ(process.size(), 1u);
1936         g_syncProcess = std::move(process.begin()->second);
1937 
1938         if (g_syncProcess.process == FINISHED) {
1939             g_processCondition.notify_one();
1940         }
1941     };
1942     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime),
1943         DBStatus::OK);
1944     {
1945         std::unique_lock<std::mutex> lock(g_processMutex);
__anon239839ae1002() 1946         g_processCondition.wait(lock, []() {
1947             return g_syncProcess.process == FINISHED;
1948         });
1949         ASSERT_EQ(g_syncProcess.errCode, DBStatus::OK);
1950     }
1951     CloseDb();
1952 }
1953 
1954 /*
1955  * @tc.name: CloudSyncAssetTest004
1956  * @tc.desc:
1957  * @tc.type: FUNC
1958  * @tc.require:
1959  * @tc.author: bty
1960  */
1961 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest004, TestSize.Level0)
1962 {
1963     int64_t paddingSize = 10;
1964     int localCount = 3;
1965     int cloudCount = 3;
1966     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1967     InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1968     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1969 
1970     UpdateDiffType(localCount);
1971     g_syncProcess = {};
__anon239839ae1102(const std::map<std::string, SyncProcess> &process) 1972     CloudSyncStatusCallback callback1 = [](const std::map<std::string, SyncProcess> &process) {
1973         ASSERT_EQ(process.size(), 1u);
1974         g_syncProcess = std::move(process.begin()->second);
1975         if (g_syncProcess.process == FINISHED) {
1976             g_processCondition.notify_one();
1977         }
1978     };
1979     Query query = Query::Select().FromTable(g_tables);
1980     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback1, g_syncWaitTime),
1981         DBStatus::OK);
1982     {
1983         std::unique_lock<std::mutex> lock(g_processMutex);
__anon239839ae1202() 1984         g_processCondition.wait(lock, []() {
1985             return g_syncProcess.process == FINISHED;
1986         });
1987         ASSERT_EQ(g_syncProcess.errCode, DBStatus::OK);
1988     }
1989     CheckDiffTypeAsset(db);
1990     CloseDb();
1991 }
1992 
1993 /*
1994  * @tc.name: CloudSyncAssetTest005
1995  * @tc.desc: Test erase all no change Asset
1996  * @tc.type: FUNC
1997  * @tc.require:
1998  * @tc.author: bty
1999  */
2000 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest005, TestSize.Level0)
2001 {
2002     /**
2003      * @tc.steps:step1. Construct local data with asset names and hashes consistent with the cloud
2004      * @tc.expected: step1. return ok.
2005      */
2006     int64_t paddingSize = 10;
2007     int localCount = 3;
2008     int cloudCount = 3;
2009     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
2010     Assets assets;
2011     for (int64_t j = 0; j < cloudCount; j++) {
2012         Asset asset = g_cloudAsset;
2013         asset.name = g_cloudAsset.name + std::to_string(j);
2014         assets.push_back(asset);
2015     }
2016     UpdateLocalAssets(db, assets, 0);
2017     std::this_thread::sleep_for(std::chrono::milliseconds(cloudCount));
2018 
2019     /**
2020      * @tc.steps:step2. Construct cloud data
2021      * @tc.expected: step2. return ok.
2022      */
2023     InsertCloudTableRecord(0, cloudCount, paddingSize, false);
2024 
2025     /**
2026      * @tc.steps:step3. sync, expect EraseNoChangeAsset to erase all Nochange assets
2027      * @tc.expected: step3. return ok.
2028      */
2029     Query query = Query::Select().FromTable(g_tables);
2030     std::vector<SyncProcess> expectProcess;
__anon239839ae1302(const std::map<std::string, SyncProcess> &process) 2031     CloudSyncStatusCallback callback = [](const std::map<std::string, SyncProcess> &process) {
2032         ASSERT_EQ(process.size(), 1u);
2033         g_syncProcess = std::move(process.begin()->second);
2034 
2035         if (g_syncProcess.process == FINISHED) {
2036             g_processCondition.notify_one();
2037         }
2038     };
2039     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime),
2040         DBStatus::OK);
2041     {
2042         std::unique_lock<std::mutex> lock(g_processMutex);
__anon239839ae1402() 2043         g_processCondition.wait(lock, []() {
2044             return g_syncProcess.process == FINISHED;
2045         });
2046         ASSERT_EQ(g_syncProcess.errCode, DBStatus::OK);
2047     }
2048     CloseDb();
2049 }
2050 
2051 /*
2052  * @tc.name: CloudSyncAssetTest006
2053  * @tc.desc: Test upload new data without assets
2054  * @tc.type: FUNC
2055  * @tc.require:
2056  * @tc.author: bty
2057  */
2058 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest006, TestSize.Level0)
2059 {
2060     /**
2061      * @tc.steps:step1. Construct local data with NULL asset and the local count is greater than the cloud
2062      * @tc.expected: step1. return ok.
2063      */
2064     int64_t paddingSize = 10;
2065     int localCount = 6;
2066     int cloudCount = 3;
2067     InsertUserTableRecord(db, 0, localCount, paddingSize, true);
2068     std::this_thread::sleep_for(std::chrono::milliseconds(cloudCount));
2069     InsertCloudTableRecord(0, cloudCount, paddingSize, false);
2070 
2071     /**
2072      * @tc.steps:step2. sync, upload new data without assets,
2073      * @tc.expected: step2. return ok.
2074      */
2075     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2076     CloseDb();
2077 }
2078 
2079 /*
2080  * @tc.name: CloudSyncAssetTest007
2081  * @tc.desc: for expilictly set not-change assets. If an asset is deleted, and its hash is not set to empty, it will be
2082  * regarded as NO-CHANGE, rather than delete
2083  * @tc.type: FUNC
2084  * @tc.require:
2085  * @tc.author: wanyi
2086  */
2087 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest007, TestSize.Level0)
2088 {
2089     /**
2090      * @tc.steps:step1. local asset contain an asset which has a corresponding asset in cloud
2091      * @tc.expected: step1. return ok.
2092      */
2093     int64_t paddingSize = 10;
2094     int localCount = 1;
2095     int cloudCount = 1;
2096     InsertCloudTableRecord(0, cloudCount, paddingSize, false);
2097     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
2098     /**
2099      * @tc.steps:step2. local asset is set to delete, but hash is not set to empty
2100      * @tc.expected: step2. return ok.
2101      */
2102     Assets assets;
2103     for (int64_t j = 0; j < cloudCount; j++) {
2104         Asset asset = g_cloudAsset;
2105         asset.name = g_cloudAsset.name + std::to_string(j);
2106         asset.status = static_cast<uint32_t>(AssetStatus::DELETE);
2107         assets.push_back(asset);
2108     }
2109     UpdateLocalAssets(db, assets, 0);
2110     std::this_thread::sleep_for(std::chrono::milliseconds(cloudCount));
2111     /**
2112      * @tc.steps:step3. Do sync
2113      * @tc.expected: step3. return ok.
2114      */
2115     Query query = Query::Select().FromTable(g_tables);
2116     std::vector<SyncProcess> expectProcess;
__anon239839ae1502(const std::map<std::string, SyncProcess> &process) 2117     CloudSyncStatusCallback callback = [](const std::map<std::string, SyncProcess> &process) {
2118         ASSERT_EQ(process.size(), 1u);
2119         g_syncProcess = std::move(process.begin()->second);
2120 
2121         if (g_syncProcess.process == FINISHED) {
2122             g_processCondition.notify_one();
2123         }
2124     };
2125     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime),
2126         DBStatus::OK);
2127     {
2128         std::unique_lock<std::mutex> lock(g_processMutex);
__anon239839ae1602() 2129         g_processCondition.wait(lock, []() {
2130             return g_syncProcess.process == FINISHED;
2131         });
2132         ASSERT_EQ(g_syncProcess.errCode, DBStatus::OK);
2133     }
2134     /**
2135      * @tc.steps:step4. Check result. Cloud db should not contain asset.
2136      * @tc.expected: step4. return ok.
2137      */
2138     CheckAssetForAssetTest006();
2139     CloseDb();
2140 }
2141 
2142 /**
2143  * @tc.name: DownloadAssetTest001
2144  * @tc.desc: Test the sync of different Asset status out of parameters when the download is successful
2145  * @tc.type: FUNC
2146  * @tc.require:
2147  * @tc.author: bty
2148  */
2149 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, DownloadAssetTest001, TestSize.Level0)
2150 {
2151     /**
2152      * @tc.steps:step1. Set different status out of parameters, and the code returns OK
2153      * @tc.expected: step1. return ok.
2154      */
2155     DBStatus expectStatus = DBStatus::OK;
2156     int index = 0;
2157     InitMockAssetLoader(expectStatus, index);
2158 
2159     /**
2160      * @tc.steps:step2. init download data
2161      * @tc.expected: step2. return ok.
2162      */
2163     int64_t paddingSize = 1;
2164     int localCount = 120;
2165     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
2166     InsertCloudTableRecord(0, localCount / g_arrayHalfSub, paddingSize, false);
2167 
2168     /**
2169      * @tc.steps:step3. sync
2170      * @tc.expected: step3. return ok.
2171      */
2172     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2173 
2174     /**
2175      * @tc.steps:step4. Expect all states to be normal
2176      * @tc.expected: step4. return ok.
2177      */
2178     CheckAssetAfterDownload(db, localCount);
2179     CloseDb();
2180 }
2181 
2182 /*
2183  * @tc.name: CloudSyncAssetTest008
2184  * @tc.desc: sync failed with download asset
2185  * @tc.type: FUNC
2186  * @tc.require:
2187  * @tc.author: zhangqiquan
2188  */
2189 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest008, TestSize.Level0)
2190 {
2191     /**
2192      * @tc.steps:step1. prepare asset data
2193      */
2194     int64_t paddingSize = 10;
2195     int localCount = 1;
2196     int cloudCount = 1;
2197     InsertCloudTableRecord(0, cloudCount, paddingSize, false);
2198     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
2199     /**
2200      * @tc.steps:step2. set download asset status failed
2201      */
2202     g_virtualAssetLoader->SetDownloadStatus(CLOUD_ASSET_SPACE_INSUFFICIENT);
2203     Query query = Query::Select().FromTable(g_tables);
2204     std::vector<SyncProcess> expectProcess;
__anon239839ae1702(const std::map<std::string, SyncProcess> &process) 2205     CloudSyncStatusCallback callback = [](const std::map<std::string, SyncProcess> &process) {
2206         for (const auto &item: process) {
2207             g_syncProcess = item.second;
2208         }
2209         if (g_syncProcess.process == FINISHED) {
2210             g_processCondition.notify_one();
2211         }
2212     };
2213     /**
2214      * @tc.steps:step3. sync and wait sync finished.
2215      * @tc.expected: step3. sync return CLOUD_ASSET_SPACE_INSUFFICIENT.
2216      */
2217     ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime),
2218         DBStatus::OK);
2219     {
2220         std::unique_lock<std::mutex> lock(g_processMutex);
__anon239839ae1802() 2221         g_processCondition.wait(lock, []() {
2222             return g_syncProcess.process == FINISHED;
2223         });
2224         ASSERT_EQ(g_syncProcess.errCode, DBStatus::CLOUD_ASSET_SPACE_INSUFFICIENT);
2225     }
2226     /**
2227      * @tc.steps:step4. clear data.
2228      */
2229     g_virtualAssetLoader->SetDownloadStatus(OK);
2230     CloseDb();
2231 }
2232 
2233 /*
2234  * @tc.name: CloudSyncAssetTest009
2235  * @tc.desc:
2236  * @tc.type: FUNC
2237  * @tc.require:
2238  * @tc.author: zhangqiquan
2239  */
2240 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest009, TestSize.Level0)
2241 {
2242     // insert 3 data with asset 3 data without asset into local
2243     // sync them to cloud
2244     int64_t paddingSize = 10;
2245     int localCount = 3;
2246     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
2247     InsertUserTableRecord(db, localCount, localCount, paddingSize, true);
2248     callSync(g_tables, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
2249     // update these data and sync again
2250     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
2251     InsertUserTableRecord(db, localCount, localCount, paddingSize, true);
2252     callSync(g_tables, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
2253     EXPECT_EQ(g_syncProcess.errCode, DBStatus::OK);
2254     CloseDb();
2255 }
2256 
2257 /**
2258  * @tc.name: DownloadAssetTest002
2259  * @tc.desc: Test the sync of different Asset status out of parameters when the download is failed
2260  * @tc.type: FUNC
2261  * @tc.require:
2262  * @tc.author: bty
2263  */
2264 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, DownloadAssetTest002, TestSize.Level0)
2265 {
2266     /**
2267      * @tc.steps:step1. Set different status out of parameters, and the code returns CLOUD_ERROR
2268      * @tc.expected: step1. return ok.
2269      */
2270     DBStatus expectStatus = DBStatus::CLOUD_ERROR;
2271     int index = 0;
2272     InitMockAssetLoader(expectStatus, index);
2273     int64_t paddingSize = 1;
2274     int localCount = 100;
2275 
2276     /**
2277      * @tc.steps:step2. init download data
2278      * @tc.expected: step2. return ok.
2279      */
2280     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
2281     InsertCloudTableRecord(0, localCount, paddingSize, false);
2282 
2283     /**
2284      * @tc.steps:step3. sync
2285      * @tc.expected: step3. return ok.
2286      */
2287     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2288 
2289     /**
2290      * @tc.steps:step4. Those status that are not normal are all be abnormal after sync.
2291      * @tc.expected: step4. return ok.
2292      */
2293     CheckAssetAfterDownload2(db, localCount);
2294     CloseDb();
2295 }
2296 
2297 /**
2298  * @tc.name: DownloadAssetTest003
2299  * @tc.desc: Init different asset name between local and cloud, then sync to test download
2300  * @tc.type: FUNC
2301  * @tc.require:
2302  * @tc.author: bty
2303  */
2304 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, DownloadAssetTest003, TestSize.Level0)
2305 {
2306     /**
2307      * @tc.steps:step1. Init data and sync
2308      * @tc.expected: step1. return ok.
2309      */
2310     int64_t paddingSize = 1;
2311     int localCount = 10;
2312     InsertUserTableRecord(db, 0, localCount, paddingSize, false);
2313     InsertCloudTableRecord(0, localCount, paddingSize, false);
2314     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2315 
2316     /**
2317      * @tc.steps:step2. update cloud Asset where gid = 0
2318      * @tc.expected: step2. return ok.
2319      */
2320     UpdateCloudAssetForDownloadAssetTest003();
2321 
2322     /**
2323      * @tc.steps:step3. sync again
2324      * @tc.expected: step3. return ok.
2325      */
2326     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2327 
2328     /**
2329      * @tc.steps:step4. check asset after download where gid = 0
2330      * @tc.expected: step4. return ok.
2331      */
2332     CheckAssetForDownloadAssetTest003(db);
2333     CloseDb();
2334 }
2335 
2336 /**
2337  * @tc.name: DownloadAssetTest004
2338  * @tc.desc: Test total count, fail count and success count when drop table
2339  * @tc.type: FUNC
2340  * @tc.require:
2341  * @tc.author: liufuchenxing
2342  */
2343 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, DownloadAssetTest004, TestSize.Level0)
2344 {
2345     /**
2346      * @tc.steps:step1. Init data and sync
2347      * @tc.expected: step1. return ok.
2348      */
2349     int64_t paddingSize = 1;
2350     int count = 10;
2351     InsertUserTableRecord(db, 0, count, paddingSize, false);
2352     g_syncProcess = {};
__anon239839ae1902(const std::map<std::string, SyncProcess> &process) 2353     CloudSyncStatusCallback callback = [](const std::map<std::string, SyncProcess> &process) {
2354         for (const auto &item : process) {
2355             g_syncProcess = item.second;
2356         }
2357         if (g_syncProcess.process == FINISHED) {
2358             g_processCondition.notify_one();
2359         }
2360     };
2361     Query query = Query::Select().FromTable(g_tables);
2362     EXPECT_EQ(g_delegate->Sync({ DEVICE_CLOUD }, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
2363     WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
2364 
2365     /**
2366      * @tc.steps:step2. drop table work2. sync failed, check total, success and fail count.
2367      * @tc.expected: step2. total = 20, success=0, fail=20
2368      */
2369     g_syncProcess = {};
2370     InsertCloudTableRecord(0, count, paddingSize, false);
2371     EXPECT_EQ(RelationalTestUtils::ExecSql(db, DROP_INTEGER_PRIMARY_KEY_TABLE_SQL), E_OK);
2372     EXPECT_EQ(g_delegate->Sync({ DEVICE_CLOUD }, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime),
2373         DBStatus::NOT_FOUND);
2374 
2375     /**
2376      * @tc.steps:step3. close db.
2377      * @tc.expected: step3. close success.
2378      */
2379     CloseDb();
2380 }
2381 
2382 /**
2383  * @tc.name: SchemaTest001
2384  * @tc.desc: Create table with Cloud cooperation mode and do sync
2385  * @tc.type: FUNC
2386  * @tc.require:
2387  * @tc.author: wanyi
2388  */
2389 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, SchemaTest001, TestSize.Level0)
2390 {
2391     /**
2392      * @tc.steps:step1. Create table with Cloud cooperation mode
2393      * @tc.expected: step1. return ok.
2394      */
2395     EXPECT_EQ(RelationalTestUtils::ExecSql(db, INTEGER_PRIMARY_KEY_TABLE_SQL_WRONG_SYNC_MODE), SQLITE_OK);
2396     ASSERT_EQ(g_delegate->CreateDistributedTable(g_tableName4, CLOUD_COOPERATION), DBStatus::OK);
2397     /**
2398      * @tc.steps:step1. do sync
2399      * @tc.expected: step1. return ok.
2400      */
2401     callSync({g_tableName4}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2402     CloseDb();
2403 }
2404 
2405 /**
2406  * @tc.name: SchemaTest002
2407  * @tc.desc: Create table with DEVICE_COOPERATION mode and do sync
2408  * @tc.type: FUNC
2409  * @tc.require:
2410  * @tc.author: wanyi
2411  */
2412 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, SchemaTest002, TestSize.Level0)
2413 {
2414     /**
2415      * @tc.steps:step1. Create table with DEVICE_COOPERATION mode
2416      * @tc.expected: step1. return ok.
2417      */
2418     EXPECT_EQ(RelationalTestUtils::ExecSql(db, INTEGER_PRIMARY_KEY_TABLE_SQL_WRONG_SYNC_MODE), SQLITE_OK);
2419     ASSERT_EQ(g_delegate->CreateDistributedTable(g_tableName4, DEVICE_COOPERATION), DBStatus::OK);
2420     /**
2421      * @tc.steps:step1. do sync
2422      * @tc.expected: step1. return NOT_SUPPORT.
2423      */
2424     callSync({g_tableName4}, SYNC_MODE_CLOUD_MERGE, DBStatus::NOT_SUPPORT);
2425     CloseDb();
2426 }
2427 
2428 /**
2429  * @tc.name: CloudCursorTest001
2430  * @tc.desc: Init different asset name between local and cloud, then sync to test download
2431  * @tc.type: FUNC
2432  * @tc.require:
2433  * @tc.author: bty
2434  */
2435 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudCursorTest001, TestSize.Level0)
2436 {
2437     /**
2438      * @tc.steps:step1. Init data and sync
2439      * @tc.expected: step1. return ok.
2440      */
2441     int64_t paddingSize = 1;
2442     int localCount = 10;
2443     InsertUserTableRecord(db, 0, localCount, paddingSize, true);
2444     InsertCloudTableRecord(0, localCount, paddingSize, true);
2445     callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2446 
2447     /**
2448      * @tc.steps:step2. the cursor does not increase during upload, the cursor will increase during download
2449      * although it is unTrackerTable
2450      * @tc.expected: step2. return ok.
2451      */
2452     string sql = "select cursor from " + DBConstant::RELATIONAL_PREFIX + g_tableName1 + "_log";
2453     sqlite3_stmt *stmt = nullptr;
2454     EXPECT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
2455     int64_t index = 0;
2456     while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
2457         EXPECT_EQ(static_cast<int64_t>(sqlite3_column_int64(stmt, 0)), ++index);
2458     }
2459     int errCode;
2460     SQLiteUtils::ResetStatement(stmt, true, errCode);
2461     CloseDb();
2462 }
2463 }
2464 #endif // RELATIONAL_STORE
2465