1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #ifdef RELATIONAL_STORE
16 #include <gtest/gtest.h>
17 #include <iostream>
18 #include "cloud/cloud_storage_utils.h"
19 #include "cloud/cloud_db_constant.h"
20 #include "distributeddb_data_generate_unit_test.h"
21 #include "distributeddb_tools_unit_test.h"
22 #include "process_system_api_adapter_impl.h"
23 #include "relational_store_instance.h"
24 #include "relational_store_manager.h"
25 #include "runtime_config.h"
26 #include "sqlite_relational_store.h"
27 #include "sqlite_relational_utils.h"
28 #include "store_observer.h"
29 #include "time_helper.h"
30 #include "virtual_asset_loader.h"
31 #include "virtual_cloud_data_translate.h"
32 #include "virtual_cloud_db.h"
33 #include "virtual_communicator_aggregator.h"
34 #include "mock_asset_loader.h"
35
36 using namespace testing::ext;
37 using namespace DistributedDB;
38 using namespace DistributedDBUnitTest;
39 using namespace std;
40
41 namespace {
42 string g_storeID = "Relational_Store_SYNC";
43 const string g_tableName1 = "worker1";
44 const string g_tableName2 = "worker2";
45 const string g_tableName3 = "worker3";
46 const string g_tableName4 = "worker4";
47 const string DEVICE_CLOUD = "cloud_dev";
48 const string DB_SUFFIX = ".db";
49 const int64_t g_syncWaitTime = 60;
50 const int g_arrayHalfSub = 2;
51 int g_syncIndex = 0;
52 string g_testDir;
53 string g_storePath;
54 std::mutex g_processMutex;
55 std::condition_variable g_processCondition;
56 std::shared_ptr<VirtualCloudDb> g_virtualCloudDb;
57 std::shared_ptr<VirtualAssetLoader> g_virtualAssetLoader;
58 DistributedDB::RelationalStoreManager g_mgr(APP_ID, USER_ID);
59 RelationalStoreObserverUnitTest *g_observer = nullptr;
60 RelationalStoreDelegate *g_delegate = nullptr;
61 SyncProcess g_syncProcess;
62 using CloudSyncStatusCallback = std::function<void(const std::map<std::string, SyncProcess> &onProcess)>;
63 const std::string CREATE_LOCAL_TABLE_SQL =
64 "CREATE TABLE IF NOT EXISTS " + g_tableName1 + "(" \
65 "name TEXT PRIMARY KEY," \
66 "height REAL ," \
67 "married BOOLEAN ," \
68 "photo BLOB NOT NULL," \
69 "assert BLOB," \
70 "age INT);";
71 const std::string INTEGER_PRIMARY_KEY_TABLE_SQL =
72 "CREATE TABLE IF NOT EXISTS " + g_tableName2 + "(" \
73 "id INTEGER PRIMARY KEY," \
74 "name TEXT ," \
75 "height REAL ," \
76 "photo BLOB ," \
77 "asserts BLOB," \
78 "age INT);";
79 const std::string DROP_INTEGER_PRIMARY_KEY_TABLE_SQL = "DROP TABLE " + g_tableName2 + ";";
80 const std::string CREATE_LOCAL_TABLE_WITHOUT_PRIMARY_KEY_SQL =
81 "CREATE TABLE IF NOT EXISTS " + g_tableName3 + "(" \
82 "name TEXT," \
83 "height REAL ," \
84 "married BOOLEAN ," \
85 "photo BLOB NOT NULL," \
86 "assert BLOB," \
87 "age INT);";
88 const std::string INTEGER_PRIMARY_KEY_TABLE_SQL_WRONG_SYNC_MODE =
89 "CREATE TABLE IF NOT EXISTS " + g_tableName4 + "(" \
90 "id INTEGER PRIMARY KEY," \
91 "name TEXT ," \
92 "height REAL ," \
93 "photo BLOB ," \
94 "asserts BLOB," \
95 "age INT);";
96 const std::vector<Field> g_cloudFiled1 = {
97 {"Name", TYPE_INDEX<std::string>, true}, {"height", TYPE_INDEX<double>},
98 {"MArried", TYPE_INDEX<bool>}, {"photo", TYPE_INDEX<Bytes>, false, false},
99 {"Assert", TYPE_INDEX<Asset>}, {"age", TYPE_INDEX<int64_t>}
100 };
101 const std::vector<Field> g_invalidCloudFiled1 = {
102 {"name", TYPE_INDEX<std::string>, true}, {"height", TYPE_INDEX<int>},
103 {"married", TYPE_INDEX<bool>}, {"photo", TYPE_INDEX<Bytes>, false, false},
104 {"assert", TYPE_INDEX<Bytes>}, {"age", TYPE_INDEX<int64_t>}
105 };
106 const std::vector<Field> g_cloudFiled2 = {
107 {"id", TYPE_INDEX<int64_t>, true}, {"name", TYPE_INDEX<std::string>},
108 {"height", TYPE_INDEX<double>}, {"photo", TYPE_INDEX<Bytes>},
109 {"asserts", TYPE_INDEX<Assets>}, {"age", TYPE_INDEX<int64_t>}
110 };
111 const std::vector<Field> g_cloudFiledWithOutPrimaryKey3 = {
112 {"name", TYPE_INDEX<std::string>, false, true}, {"height", TYPE_INDEX<double>},
113 {"married", TYPE_INDEX<bool>}, {"photo", TYPE_INDEX<Bytes>, false, false},
114 {"assert", TYPE_INDEX<Bytes>}, {"age", TYPE_INDEX<int64_t>}
115 };
116 const std::vector<std::string> g_tables = {g_tableName1, g_tableName2};
117 const std::vector<std::string> g_tablesPKey = {g_cloudFiled1[0].colName, g_cloudFiled2[0].colName};
118 const std::vector<string> g_prefix = {"Local", ""};
119 const Asset g_localAsset = {
120 .version = 1, .name = "Phone", .assetId = "0", .subpath = "/local/sync", .uri = "/local/sync",
121 .modifyTime = "123456", .createTime = "", .size = "256", .hash = "ASE"
122 };
123 const Asset g_cloudAsset = {
124 .version = 2, .name = "Phone", .assetId = "0", .subpath = "/local/sync", .uri = "/cloud/sync",
125 .modifyTime = "123456", .createTime = "0", .size = "1024", .hash = "DEC"
126 };
127
CreateUserDBAndTable(sqlite3 * & db)128 void CreateUserDBAndTable(sqlite3 *&db)
129 {
130 EXPECT_EQ(RelationalTestUtils::ExecSql(db, "PRAGMA journal_mode=WAL;"), SQLITE_OK);
131 EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_LOCAL_TABLE_SQL), SQLITE_OK);
132 EXPECT_EQ(RelationalTestUtils::ExecSql(db, INTEGER_PRIMARY_KEY_TABLE_SQL), SQLITE_OK);
133 EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_LOCAL_TABLE_WITHOUT_PRIMARY_KEY_SQL), SQLITE_OK);
134 }
135
InsertUserTableRecord(sqlite3 * & db,int64_t begin,int64_t count,int64_t photoSize,bool assetIsNull)136 void InsertUserTableRecord(sqlite3 *&db, int64_t begin, int64_t count, int64_t photoSize, bool assetIsNull)
137 {
138 std::string photo(photoSize, 'v');
139 int errCode;
140 std::vector<uint8_t> assetBlob;
141 for (int64_t i = begin; i < begin + count; ++i) {
142 Asset asset = g_localAsset;
143 asset.name = asset.name + std::to_string(i);
144 RuntimeContext::GetInstance()->AssetToBlob(asset, assetBlob);
145 string sql = "INSERT OR REPLACE INTO " + g_tableName1
146 + " (name, height, married, photo, assert, age) VALUES ('Local" + std::to_string(i) +
147 "', '175.8', '0', '" + photo + "', ? , '18');";
148 sqlite3_stmt *stmt = nullptr;
149 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
150 if (assetIsNull) {
151 ASSERT_EQ(sqlite3_bind_null(stmt, 1), SQLITE_OK);
152 } else {
153 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK);
154 }
155 EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
156 SQLiteUtils::ResetStatement(stmt, true, errCode);
157 }
158 for (int64_t i = begin; i < begin + count; ++i) {
159 std::vector<Asset> assets;
160 Asset asset = g_localAsset;
161 asset.name = g_localAsset.name + std::to_string(i);
162 assets.push_back(asset);
163 asset.name = g_localAsset.name + std::to_string(i + 1);
164 assets.push_back(asset);
165 RuntimeContext::GetInstance()->AssetsToBlob(assets, assetBlob);
166 string sql = "INSERT OR REPLACE INTO " + g_tableName2
167 + " (id, name, height, photo, asserts, age) VALUES ('" + std::to_string(i) + "', 'Local"
168 + std::to_string(i) + "', '155.10', '"+ photo + "', ? , '21');";
169 sqlite3_stmt *stmt = nullptr;
170 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
171 if (assetIsNull) {
172 ASSERT_EQ(sqlite3_bind_null(stmt, 1), E_OK);
173 } else {
174 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK);
175 }
176 EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
177 SQLiteUtils::ResetStatement(stmt, true, errCode);
178 }
179 LOGD("insert user record worker1[primary key]:[Local%" PRId64 " - Local%" PRId64
180 ") , worker2[primary key]:[%" PRId64 "- %" PRId64")", begin, count, begin, count);
181 }
182
UpdateUserTableRecord(sqlite3 * & db,int64_t begin,int64_t count)183 void UpdateUserTableRecord(sqlite3 *&db, int64_t begin, int64_t count)
184 {
185 for (size_t i = 0; i < g_tables.size(); i++) {
186 string updateAge = "UPDATE " + g_tables[i] + " SET age = '99' where " + g_tablesPKey[i] + " in (";
187 for (int64_t j = begin; j < begin + count; ++j) {
188 updateAge += "'" + g_prefix[i] + std::to_string(j) + "',";
189 }
190 updateAge.pop_back();
191 updateAge += ");";
192 ASSERT_EQ(RelationalTestUtils::ExecSql(db, updateAge), SQLITE_OK);
193 }
194 LOGD("update local record worker1[primary key]:[local%" PRId64 " - local%" PRId64
195 ") , worker2[primary key]:[%" PRId64 "- %" PRId64")", begin, count, begin, count);
196 }
197
DeleteUserTableRecord(sqlite3 * & db,int64_t begin,int64_t count)198 void DeleteUserTableRecord(sqlite3 *&db, int64_t begin, int64_t count)
199 {
200 for (size_t i = 0; i < g_tables.size(); i++) {
201 string updateAge = "Delete from " + g_tables[i] + " where " + g_tablesPKey[i] + " in (";
202 for (int64_t j = begin; j < count; ++j) {
203 updateAge += "'" + g_prefix[i] + std::to_string(j) + "',";
204 }
205 updateAge.pop_back();
206 updateAge += ");";
207 ASSERT_EQ(RelationalTestUtils::ExecSql(db, updateAge), SQLITE_OK);
208 }
209 LOGD("delete local record worker1[primary key]:[local%" PRId64 " - local%" PRId64
210 ") , worker2[primary key]:[%" PRId64 "- %" PRId64")", begin, count, begin, count);
211 }
212
InsertRecordWithoutPk2LocalAndCloud(sqlite3 * & db,int64_t begin,int64_t count,int photoSize)213 void InsertRecordWithoutPk2LocalAndCloud(sqlite3 *&db, int64_t begin, int64_t count, int photoSize)
214 {
215 std::vector<uint8_t> photo(photoSize, 'v');
216 std::string photoLocal(photoSize, 'v');
217 Asset asset = { .version = 1, .name = "Phone" };
218 std::vector<uint8_t> assetBlob;
219 RuntimeContext::GetInstance()->BlobToAsset(assetBlob, asset);
220 std::string assetStr(assetBlob.begin(), assetBlob.end());
221 std::vector<VBucket> record1;
222 std::vector<VBucket> extend1;
223 for (int64_t i = begin; i < count; ++i) {
224 Timestamp now = TimeHelper::GetSysCurrentTime();
225 VBucket data;
226 data.insert_or_assign("name", "Cloud" + std::to_string(i));
227 data.insert_or_assign("height", 166.0); // 166.0 is random double value
228 data.insert_or_assign("married", (bool)0);
229 data.insert_or_assign("photo", photo);
230 data.insert_or_assign("assert", KEY_1);
231 data.insert_or_assign("age", 13L);
232 record1.push_back(data);
233 VBucket log;
234 log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
235 log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
236 log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
237 extend1.push_back(log);
238 std::this_thread::sleep_for(std::chrono::milliseconds(1)); // wait for 1 ms
239 }
240 int errCode = g_virtualCloudDb->BatchInsert(g_tableName3, std::move(record1), extend1);
241 ASSERT_EQ(errCode, DBStatus::OK);
242 for (int64_t i = begin; i < count; ++i) {
243 string sql = "INSERT OR REPLACE INTO " + g_tableName3
244 + " (name, height, married, photo, assert, age) VALUES ('Local" + std::to_string(i) +
245 "', '175.8', '0', '" + photoLocal + "', '" + assetStr + "', '18');";
246 ASSERT_EQ(RelationalTestUtils::ExecSql(db, sql), SQLITE_OK);
247 }
248 }
249
InsertCloudTableRecord(int64_t begin,int64_t count,int64_t photoSize,bool assetIsNull)250 void InsertCloudTableRecord(int64_t begin, int64_t count, int64_t photoSize, bool assetIsNull)
251 {
252 std::vector<uint8_t> photo(photoSize, 'v');
253 std::vector<VBucket> record1;
254 std::vector<VBucket> extend1;
255 std::vector<VBucket> record2;
256 std::vector<VBucket> extend2;
257 Timestamp now = TimeHelper::GetSysCurrentTime();
258 for (int64_t i = begin; i < begin + count; ++i) {
259 VBucket data;
260 data.insert_or_assign("name", "Cloud" + std::to_string(i));
261 data.insert_or_assign("height", 166.0); // 166.0 is random double value
262 data.insert_or_assign("married", false);
263 data.insert_or_assign("photo", photo);
264 data.insert_or_assign("AGE", 13L);
265 Asset asset = g_cloudAsset;
266 asset.name = asset.name + std::to_string(i);
267 assetIsNull ? data.insert_or_assign("assert", Nil()) : data.insert_or_assign("assert", asset);
268 record1.push_back(data);
269 VBucket log;
270 log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND + i);
271 log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND + i);
272 log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
273 extend1.push_back(log);
274
275 std::vector<Asset> assets;
276 data.insert_or_assign("id", i);
277 data.insert_or_assign("height", 180.3); // 180.3 is random double value
278 for (int64_t j = i; j <= i + 2; j++) { // 2 extra num
279 asset.name = g_cloudAsset.name + std::to_string(j);
280 assets.push_back(asset);
281 }
282 data.erase("assert");
283 data.erase("married");
284 assetIsNull ? data.insert_or_assign("asserts", Nil()) : data.insert_or_assign("asserts", assets);
285 record2.push_back(data);
286 extend2.push_back(log);
287 }
288 ASSERT_EQ(g_virtualCloudDb->BatchInsert(g_tableName1, std::move(record1), extend1), DBStatus::OK);
289 ASSERT_EQ(g_virtualCloudDb->BatchInsert(g_tableName2, std::move(record2), extend2), DBStatus::OK);
290 LOGD("insert cloud record worker1[primary key]:[cloud%" PRId64 " - cloud%" PRId64
291 ") , worker2[primary key]:[%" PRId64 "- %" PRId64")", begin, count, begin, count);
292 std::this_thread::sleep_for(std::chrono::milliseconds(count));
293 }
294
UpdateAssetForTest(sqlite3 * & db,AssetOpType opType,int64_t cloudCount,int64_t rowid)295 void UpdateAssetForTest(sqlite3 *&db, AssetOpType opType, int64_t cloudCount, int64_t rowid)
296 {
297 string sql = "UPDATE " + g_tables[0] + " SET assert = ? where rowid = '" + std::to_string(rowid) + "';";
298 std::vector<uint8_t> assetBlob;
299 int errCode;
300 Asset asset = g_cloudAsset;
301 asset.name = "Phone" + std::to_string(rowid - cloudCount - 1);
302 if (opType == AssetOpType::UPDATE) {
303 asset.uri = "/data/test";
304 asset.hash = "";
305 } else if (opType == AssetOpType::INSERT) {
306 asset.name = "Test10";
307 }
308 asset.status = static_cast<uint32_t>(CloudStorageUtils::FlagToStatus(opType));
309 sqlite3_stmt *stmt = nullptr;
310 RuntimeContext::GetInstance()->AssetToBlob(asset, assetBlob);
311 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
312 if (SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false) == E_OK) {
313 EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
314 }
315 SQLiteUtils::ResetStatement(stmt, true, errCode);
316 }
317
UpdateAssetsForTest(sqlite3 * & db,AssetOpType opType,int64_t rowid)318 void UpdateAssetsForTest(sqlite3 *&db, AssetOpType opType, int64_t rowid)
319 {
320 string sql = "UPDATE " + g_tables[1] + " SET asserts = ? where rowid = '" + std::to_string(rowid) + "';";
321 Asset asset1 = g_localAsset;
322 Asset asset2 = g_localAsset;
323 Assets assets;
324 asset1.name = g_localAsset.name + std::to_string(rowid);
325 asset1.status = static_cast<uint32_t>(CloudStorageUtils::FlagToStatus(AssetOpType::NO_CHANGE));
326 asset2.name = g_localAsset.name + std::to_string(rowid + 1);
327 asset2.status = static_cast<uint32_t>(CloudStorageUtils::FlagToStatus(AssetOpType::NO_CHANGE));
328 if (opType == AssetOpType::UPDATE) {
329 assets.push_back(asset1);
330 asset2.uri = "/data/test";
331 asset2.hash = "";
332 asset2.status = static_cast<uint32_t>(CloudStorageUtils::FlagToStatus(opType));
333 assets.push_back(asset2);
334 } else if (opType == AssetOpType::INSERT) {
335 assets.push_back(asset1);
336 assets.push_back(asset2);
337 Asset asset3;
338 asset3.status = static_cast<uint32_t>(CloudStorageUtils::FlagToStatus(opType));
339 asset3.name = "Test10";
340 assets.push_back(asset3);
341 } else if (opType == AssetOpType::DELETE) {
342 assets.push_back(asset1);
343 asset2.status = static_cast<uint32_t>(CloudStorageUtils::FlagToStatus(opType));
344 assets.push_back(asset2);
345 } else {
346 assets.push_back(asset1);
347 assets.push_back(asset2);
348 }
349 sqlite3_stmt *stmt = nullptr;
350 std::vector<uint8_t> assetsBlob;
351 RuntimeContext::GetInstance()->AssetsToBlob(assets, assetsBlob);
352 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
353 if (SQLiteUtils::BindBlobToStatement(stmt, 1, assetsBlob, false) == E_OK) {
354 EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
355 }
356 int errCode;
357 SQLiteUtils::ResetStatement(stmt, true, errCode);
358 }
359
UpdateLocalAssets(sqlite3 * & db,Assets & assets,int64_t rowid)360 void UpdateLocalAssets(sqlite3 *&db, Assets &assets, int64_t rowid)
361 {
362 string sql = "UPDATE " + g_tables[1] + " SET asserts = ? where rowid = '" + std::to_string(rowid) + "';";
363 std::vector<uint8_t> assetsBlob;
364 int errCode;
365 RuntimeContext::GetInstance()->AssetsToBlob(assets, assetsBlob);
366 sqlite3_stmt *stmt = nullptr;
367 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
368 if (SQLiteUtils::BindBlobToStatement(stmt, 1, assetsBlob, false) == E_OK) {
369 EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
370 }
371 SQLiteUtils::ResetStatement(stmt, true, errCode);
372 }
373
UpdateDiffType(int64_t begin)374 void UpdateDiffType(int64_t begin)
375 {
376 std::vector<std::string> hash = {"DEC", "update_", "insert_"};
377 std::vector<std::string> name = {
378 g_cloudAsset.name + std::to_string(0),
379 g_cloudAsset.name + std::to_string(1),
380 g_cloudAsset.name + std::to_string(3) // 3 is insert id
381 };
382 std::vector<VBucket> record;
383 std::vector<VBucket> extend;
384 Assets assets;
385 for (int i = 0; i < 3; i ++) { // 3 is type num
386 Asset asset = g_cloudAsset;
387 asset.name = name[i];
388 asset.hash = hash[i];
389 assets.push_back(asset);
390 }
391 VBucket data;
392 data.insert_or_assign("name", "Cloud" + std::to_string(0));
393 data.insert_or_assign("id", 0L);
394 data.insert_or_assign("asserts", assets);
395 Timestamp now = TimeHelper::GetSysCurrentTime();
396 VBucket log;
397 log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
398 log.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(begin));
399 log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
400 log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
401 record.push_back(data);
402 extend.push_back(log);
403 ASSERT_EQ(g_virtualCloudDb->BatchUpdate(g_tableName2, std::move(record), extend), DBStatus::OK);
404 }
405
CheckDiffTypeAsset(sqlite3 * & db)406 void CheckDiffTypeAsset(sqlite3 *&db)
407 {
408 std::vector<std::string> names = {
409 g_cloudAsset.name + std::to_string(0),
410 g_cloudAsset.name + std::to_string(1),
411 g_cloudAsset.name + std::to_string(3) // 3 is insert id
412 };
413 std::string sql = "SELECT asserts from " + g_tables[1] + " WHERE rowid = 0;";
414 sqlite3_stmt *stmt = nullptr;
415 int index = 0;
416 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
417 while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
418 ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
419 Type cloudValue;
420 ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
421 std::vector<uint8_t> assetsBlob;
422 Assets assets;
423 ASSERT_EQ(CloudStorageUtils::GetValueFromOneField(cloudValue, assetsBlob), E_OK);
424 ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAssets(assetsBlob, assets), E_OK);
425 for (const Asset &asset: assets) {
426 ASSERT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::NORMAL));
427 ASSERT_EQ(asset.name, names[index]);
428 LOGE("lyh_test: name: %s", names[index].c_str());
429 index++;
430 }
431 }
432 int errCode;
433 SQLiteUtils::ResetStatement(stmt, true, errCode);
434 }
435
CheckAssetForAssetTest006()436 void CheckAssetForAssetTest006()
437 {
438 VBucket extend;
439 extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(0);
440 std::vector<VBucket> data;
441 g_virtualCloudDb->Query(g_tables[1], extend, data);
442 for (size_t j = 0; j < data.size(); ++j) {
443 ASSERT_NE(data[j].find("asserts"), data[j].end());
444 ASSERT_TRUE((data[j]["asserts"]).index() == TYPE_INDEX<Assets>);
445 Assets &assets = std::get<Assets>(data[j]["asserts"]);
446 ASSERT_TRUE(assets.size() > 0);
447 Asset &asset = assets[0];
448 EXPECT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::DELETE));
449 EXPECT_EQ(asset.flag, static_cast<uint32_t>(AssetOpType::DELETE));
450 }
451 }
452
CheckFillAssetForTest10(sqlite3 * & db)453 void CheckFillAssetForTest10(sqlite3 *&db)
454 {
455 std::string sql = "SELECT assert from " + g_tables[0] + " WHERE rowid in ('27','28','29','30');";
456 sqlite3_stmt *stmt = nullptr;
457 int index = 0;
458 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
459 int suffixId = 6;
460 while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
461 if (index == 0 || index == 1 || index == 3) { // 3 is rowid index of 29
462 ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
463 Type cloudValue;
464 ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Asset>, 0, cloudValue), E_OK);
465 std::vector<uint8_t> assetBlob;
466 Asset asset;
467 ASSERT_EQ(CloudStorageUtils::GetValueFromOneField(cloudValue, assetBlob), E_OK);
468 ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAsset(assetBlob, asset), E_OK);
469 ASSERT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::NORMAL));
470 if (index == 0) {
471 ASSERT_EQ(asset.name, g_cloudAsset.name + std::to_string(suffixId + index));
472 } else if (index == 1) {
473 ASSERT_EQ(asset.name, "Test10");
474 } else {
475 ASSERT_EQ(asset.name, g_cloudAsset.name + std::to_string(suffixId + index));
476 ASSERT_EQ(asset.uri, "/data/test");
477 }
478 } else {
479 ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_NULL);
480 }
481 index++;
482 }
483 int errCode;
484 SQLiteUtils::ResetStatement(stmt, true, errCode);
485 }
486
CheckFillAssetsForTest10(sqlite3 * & db)487 void CheckFillAssetsForTest10(sqlite3 *&db)
488 {
489 std::string sql = "SELECT asserts from " + g_tables[1] + " WHERE rowid in ('0','1','2','3');";
490 sqlite3_stmt *stmt = nullptr;
491 int index = 0;
492 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
493 int insertIndex = 2;
494 while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
495 ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
496 Type cloudValue;
497 ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
498 std::vector<uint8_t> assetsBlob;
499 Assets assets;
500 ASSERT_EQ(CloudStorageUtils::GetValueFromOneField(cloudValue, assetsBlob), E_OK);
501 ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAssets(assetsBlob, assets), E_OK);
502 if (index == 0) {
503 ASSERT_EQ(assets.size(), 2u);
504 ASSERT_EQ(assets[0].name, g_localAsset.name + std::to_string(index));
505 ASSERT_EQ(assets[1].name, g_localAsset.name + std::to_string(index + 1));
506 } else if (index == 1) {
507 ASSERT_EQ(assets.size(), 3u);
508 ASSERT_EQ(assets[insertIndex].name, "Test10");
509 ASSERT_EQ(assets[insertIndex].status, static_cast<uint32_t>(AssetStatus::NORMAL));
510 } else if (index == 2) { // 2 is the third element
511 ASSERT_EQ(assets.size(), 1u);
512 ASSERT_EQ(assets[0].name, g_cloudAsset.name + std::to_string(index));
513 } else {
514 ASSERT_EQ(assets.size(), 2u);
515 ASSERT_EQ(assets[1].uri, "/data/test");
516 ASSERT_EQ(assets[1].status, static_cast<uint32_t>(AssetStatus::NORMAL));
517 }
518 index++;
519 }
520 int errCode;
521 SQLiteUtils::ResetStatement(stmt, true, errCode);
522 }
523
QueryCountCallback(void * data,int count,char ** colValue,char ** colName)524 int QueryCountCallback(void *data, int count, char **colValue, char **colName)
525 {
526 if (count != 1) {
527 return 0;
528 }
529 auto expectCount = reinterpret_cast<int64_t>(data);
530 EXPECT_EQ(strtol(colValue[0], nullptr, 10), expectCount); // 10: decimal
531 return 0;
532 }
533
CheckDownloadResult(sqlite3 * & db,std::vector<int64_t> expectCounts,std::string keyStr="")534 void CheckDownloadResult(sqlite3 *&db, std::vector<int64_t> expectCounts, std::string keyStr = "Cloud")
535 {
536 for (size_t i = 0; i < g_tables.size(); ++i) {
537 string queryDownload = "select count(*) from " + g_tables[i] + " where name "
538 + " like '" + keyStr + "%'";
539 EXPECT_EQ(sqlite3_exec(db, queryDownload.c_str(), QueryCountCallback,
540 reinterpret_cast<void *>(expectCounts[i]), nullptr), SQLITE_OK);
541 }
542 }
543
CheckCloudTotalCount(std::vector<int64_t> expectCounts)544 void CheckCloudTotalCount(std::vector<int64_t> expectCounts)
545 {
546 VBucket extend;
547 for (size_t i = 0; i < g_tables.size(); ++i) {
548 extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(0);
549 int64_t realCount = 0;
550 std::vector<VBucket> data;
551 g_virtualCloudDb->Query(g_tables[i], extend, data);
552 for (size_t j = 0; j < data.size(); ++j) {
553 auto entry = data[j].find(CloudDbConstant::DELETE_FIELD);
554 if (entry != data[j].end() && std::get<bool>(entry->second)) {
555 continue;
556 }
557 realCount++;
558 }
559 EXPECT_EQ(realCount, expectCounts[i]); // ExpectCount represents the total amount of cloud data.
560 }
561 }
562
GetCloudDbSchema(DataBaseSchema & dataBaseSchema)563 void GetCloudDbSchema(DataBaseSchema &dataBaseSchema)
564 {
565 TableSchema tableSchema1 = {
566 .name = g_tableName1,
567 .sharedTableName = g_tableName1 + "_shared",
568 .fields = g_cloudFiled1
569 };
570 TableSchema tableSchema2 = {
571 .name = g_tableName2,
572 .sharedTableName = g_tableName2 + "_shared",
573 .fields = g_cloudFiled2
574 };
575 TableSchema tableSchemaWithOutPrimaryKey = {
576 .name = g_tableName3,
577 .sharedTableName = g_tableName3 + "_shared",
578 .fields = g_cloudFiledWithOutPrimaryKey3
579 };
580 TableSchema tableSchema4 = {
581 .name = g_tableName4,
582 .sharedTableName = g_tableName4 + "_shared",
583 .fields = g_cloudFiled2
584 };
585 dataBaseSchema.tables.push_back(tableSchema1);
586 dataBaseSchema.tables.push_back(tableSchema2);
587 dataBaseSchema.tables.push_back(tableSchemaWithOutPrimaryKey);
588 dataBaseSchema.tables.push_back(tableSchema4);
589 }
590
591
GetInvalidCloudDbSchema(DataBaseSchema & dataBaseSchema)592 void GetInvalidCloudDbSchema(DataBaseSchema &dataBaseSchema)
593 {
594 TableSchema tableSchema1 = {
595 .name = g_tableName1,
596 .sharedTableName = "",
597 .fields = g_invalidCloudFiled1
598 };
599 TableSchema tableSchema2 = {
600 .name = g_tableName2,
601 .sharedTableName = "",
602 .fields = g_cloudFiled2
603 };
604 dataBaseSchema.tables.push_back(tableSchema1);
605 dataBaseSchema.tables.push_back(tableSchema2);
606 }
607
InitProcessForTest1(const uint32_t & cloudCount,const uint32_t & localCount,std::vector<SyncProcess> & expectProcess)608 void InitProcessForTest1(const uint32_t &cloudCount, const uint32_t &localCount,
609 std::vector<SyncProcess> &expectProcess)
610 {
611 expectProcess.clear();
612 std::vector<TableProcessInfo> infos;
613 uint32_t index = 1;
614 infos.push_back(TableProcessInfo{
615 PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
616 });
617 infos.push_back(TableProcessInfo{
618 PREPARED, {0, 0, 0, 0}, {0, 0, 0, 0}
619 });
620
621 infos.push_back(TableProcessInfo{
622 PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
623 });
624 infos.push_back(TableProcessInfo{
625 PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
626 });
627
628 infos.push_back(TableProcessInfo{
629 FINISHED, {index, cloudCount, cloudCount, 0}, {index, localCount, localCount, 0}
630 });
631 infos.push_back(TableProcessInfo{
632 PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
633 });
634
635 infos.push_back(TableProcessInfo{
636 FINISHED, {index, cloudCount, cloudCount, 0}, {index, localCount, localCount, 0}
637 });
638 infos.push_back(TableProcessInfo{
639 FINISHED, {index, cloudCount, cloudCount, 0}, {index, localCount, localCount, 0}
640 });
641
642 for (size_t i = 0; i < infos.size() / g_arrayHalfSub; ++i) {
643 SyncProcess syncProcess;
644 syncProcess.errCode = OK;
645 syncProcess.process = i == infos.size() ? FINISHED : PROCESSING;
646 syncProcess.tableProcess.insert_or_assign(g_tables[0], std::move(infos[g_arrayHalfSub * i]));
647 syncProcess.tableProcess.insert_or_assign(g_tables[1], std::move(infos[g_arrayHalfSub * i + 1]));
648 expectProcess.push_back(syncProcess);
649 }
650 }
651
InitProcessForMannualSync1(std::vector<SyncProcess> & expectProcess)652 void InitProcessForMannualSync1(std::vector<SyncProcess> &expectProcess)
653 {
654 expectProcess.clear();
655 std::vector<TableProcessInfo> infos;
656 // first notify, first table
657 infos.push_back(TableProcessInfo{
658 FINISHED, {0, 0, 0, 0}, {0, 0, 0, 0}
659 });
660 // first notify, second table
661 infos.push_back(TableProcessInfo{
662 PREPARED, {0, 0, 0, 0}, {0, 0, 0, 0}
663 });
664 // second notify, first table
665 infos.push_back(TableProcessInfo{
666 FINISHED, {0, 0, 0, 0}, {0, 0, 0, 0}
667 });
668 // second notify, second table
669 infos.push_back(TableProcessInfo{
670 FINISHED, {0, 0, 0, 0}, {0, 0, 0, 0}
671 });
672
673 infos.push_back(TableProcessInfo{
674 FINISHED, {0, 0, 0, 0}, {0, 0, 0, 0}
675 });
676 // second notify, second table
677 infos.push_back(TableProcessInfo{
678 FINISHED, {0, 0, 0, 0}, {0, 0, 0, 0}
679 });
680 for (size_t i = 0; i < infos.size() / g_arrayHalfSub; ++i) {
681 SyncProcess syncProcess;
682 syncProcess.errCode = OK;
683 syncProcess.process = i == infos.size() ? FINISHED : PROCESSING;
684 syncProcess.tableProcess.insert_or_assign(g_tables[0], std::move(infos[g_arrayHalfSub * i]));
685 syncProcess.tableProcess.insert_or_assign(g_tables[1], std::move(infos[g_arrayHalfSub * i + 1]));
686 expectProcess.push_back(syncProcess);
687 }
688 }
689
InitProcessForTest2(const uint32_t & cloudCount,const uint32_t & localCount,std::vector<SyncProcess> & expectProcess)690 void InitProcessForTest2(const uint32_t &cloudCount, const uint32_t &localCount,
691 std::vector<SyncProcess> &expectProcess)
692 {
693 expectProcess.clear();
694 std::vector<TableProcessInfo> infos;
695 uint32_t index = 1;
696 infos.push_back(TableProcessInfo{
697 PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
698 });
699 infos.push_back(TableProcessInfo{
700 PREPARED, {0, 0, 0, 0}, {0, 0, 0, 0}
701 });
702
703 infos.push_back(TableProcessInfo{
704 PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
705 });
706 infos.push_back(TableProcessInfo{
707 PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
708 });
709
710 infos.push_back(TableProcessInfo{
711 FINISHED, {index, cloudCount, cloudCount, 0}, {index, localCount, localCount, 0}
712 });
713 infos.push_back(TableProcessInfo{
714 PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
715 });
716
717 infos.push_back(TableProcessInfo{
718 FINISHED, {index, cloudCount, cloudCount, 0}, {index, localCount, localCount, 0}
719 });
720 infos.push_back(TableProcessInfo{
721 FINISHED, {index, cloudCount, cloudCount, 0}, {index, localCount - cloudCount, localCount - cloudCount, 0}
722 });
723
724 for (size_t i = 0; i < infos.size() / g_arrayHalfSub; ++i) {
725 SyncProcess syncProcess;
726 syncProcess.errCode = OK;
727 syncProcess.process = i == infos.size() ? FINISHED : PROCESSING;
728 syncProcess.tableProcess.insert_or_assign(g_tables[0], std::move(infos[g_arrayHalfSub * i]));
729 syncProcess.tableProcess.insert_or_assign(g_tables[1], std::move(infos[g_arrayHalfSub * i + 1]));
730 expectProcess.push_back(syncProcess);
731 }
732 }
733
InitProcessForTest9(const uint32_t & cloudCount,const uint32_t & localCount,std::vector<SyncProcess> & expectProcess)734 void InitProcessForTest9(const uint32_t &cloudCount, const uint32_t &localCount,
735 std::vector<SyncProcess> &expectProcess)
736 {
737 expectProcess.clear();
738 std::vector<TableProcessInfo> infos;
739 uint32_t index = 1;
740 infos.push_back(TableProcessInfo{
741 PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
742 });
743 infos.push_back(TableProcessInfo{
744 PREPARED, {0, 0, 0, 0}, {0, 0, 0, 0}
745 });
746
747 infos.push_back(TableProcessInfo{
748 PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
749 });
750 infos.push_back(TableProcessInfo{
751 PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
752 });
753
754 infos.push_back(TableProcessInfo{
755 FINISHED, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
756 });
757 infos.push_back(TableProcessInfo{
758 PROCESSING, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
759 });
760
761 infos.push_back(TableProcessInfo{
762 FINISHED, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
763 });
764 infos.push_back(TableProcessInfo{
765 FINISHED, {index, cloudCount, cloudCount, 0}, {0, 0, 0, 0}
766 });
767
768 for (size_t i = 0; i < infos.size() / g_arrayHalfSub; ++i) {
769 SyncProcess syncProcess;
770 syncProcess.errCode = OK;
771 syncProcess.process = i == infos.size() ? FINISHED : PROCESSING;
772 syncProcess.tableProcess.insert_or_assign(g_tables[0], std::move(infos[g_arrayHalfSub * i]));
773 syncProcess.tableProcess.insert_or_assign(g_tables[1], std::move(infos[g_arrayHalfSub * i + 1]));
774 expectProcess.push_back(syncProcess);
775 }
776 }
GetCallback(SyncProcess & syncProcess,CloudSyncStatusCallback & callback,std::vector<SyncProcess> & expectProcess)777 void GetCallback(SyncProcess &syncProcess, CloudSyncStatusCallback &callback,
778 std::vector<SyncProcess> &expectProcess)
779 {
780 g_syncIndex = 0;
781 callback = [&syncProcess, &expectProcess](const std::map<std::string, SyncProcess> &process) {
782 LOGI("devices size = %d", process.size());
783 ASSERT_EQ(process.size(), 1u);
784 syncProcess = std::move(process.begin()->second);
785 ASSERT_EQ(process.begin()->first, DEVICE_CLOUD);
786 ASSERT_NE(syncProcess.tableProcess.empty(), true);
787 LOGI("current sync process status:%d, db status:%d ", syncProcess.process, syncProcess.errCode);
788 std::for_each(g_tables.begin(), g_tables.end(), [&](const auto &item) {
789 auto table1 = syncProcess.tableProcess.find(item);
790 if (table1 != syncProcess.tableProcess.end()) {
791 LOGI("table[%s], table process status:%d, [downloadInfo](batchIndex:%u, total:%u, successCount:%u, "
792 "failCount:%u) [uploadInfo](batchIndex:%u, total:%u, successCount:%u,failCount:%u",
793 item.c_str(), table1->second.process, table1->second.downLoadInfo.batchIndex,
794 table1->second.downLoadInfo.total, table1->second.downLoadInfo.successCount,
795 table1->second.downLoadInfo.failCount, table1->second.upLoadInfo.batchIndex,
796 table1->second.upLoadInfo.total, table1->second.upLoadInfo.successCount,
797 table1->second.upLoadInfo.failCount);
798 }
799 });
800 if (expectProcess.empty()) {
801 if (syncProcess.process == FINISHED) {
802 g_processCondition.notify_one();
803 }
804 return;
805 }
806 ASSERT_LE(static_cast<size_t>(g_syncIndex), expectProcess.size());
807 for (size_t i = 0; i < g_tables.size() && static_cast<size_t>(g_syncIndex) < expectProcess.size(); ++i) {
808 SyncProcess head = expectProcess[g_syncIndex];
809 for (auto &expect : head.tableProcess) {
810 auto real = syncProcess.tableProcess.find(expect.first);
811 ASSERT_NE(real, syncProcess.tableProcess.end());
812 EXPECT_EQ(expect.second.process, real->second.process);
813 EXPECT_EQ(expect.second.downLoadInfo.batchIndex, real->second.downLoadInfo.batchIndex);
814 EXPECT_EQ(expect.second.downLoadInfo.total, real->second.downLoadInfo.total);
815 EXPECT_EQ(expect.second.downLoadInfo.successCount, real->second.downLoadInfo.successCount);
816 EXPECT_EQ(expect.second.downLoadInfo.failCount, real->second.downLoadInfo.failCount);
817 EXPECT_EQ(expect.second.upLoadInfo.batchIndex, real->second.upLoadInfo.batchIndex);
818 EXPECT_EQ(expect.second.upLoadInfo.total, real->second.upLoadInfo.total);
819 EXPECT_EQ(expect.second.upLoadInfo.successCount, real->second.upLoadInfo.successCount);
820 EXPECT_EQ(expect.second.upLoadInfo.failCount, real->second.upLoadInfo.failCount);
821 }
822 }
823 g_syncIndex++;
824 if (syncProcess.process == FINISHED) {
825 g_processCondition.notify_one();
826 }
827 };
828 }
829
CheckAllAssetAfterUpload(int64_t localCount)830 void CheckAllAssetAfterUpload(int64_t localCount)
831 {
832 VBucket extend;
833 extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(0);
834 std::vector<VBucket> data1;
835 g_virtualCloudDb->Query(g_tables[0], extend, data1);
836 for (size_t j = 0; j < data1.size(); ++j) {
837 Type entry;
838 bool isExisted = CloudStorageUtils::GetTypeCaseInsensitive("assert", data1[j], entry);
839 ASSERT_TRUE(isExisted);
840 Asset asset = std::get<Asset>(entry);
841 bool isLocal = j >= (size_t)(localCount / g_arrayHalfSub);
842 Asset baseAsset = isLocal ? g_localAsset : g_cloudAsset;
843 EXPECT_EQ(asset.version, baseAsset.version);
844 EXPECT_EQ(asset.name, baseAsset.name + std::to_string(isLocal ? j - localCount / g_arrayHalfSub : j));
845 EXPECT_EQ(asset.uri, baseAsset.uri);
846 EXPECT_EQ(asset.modifyTime, baseAsset.modifyTime);
847 EXPECT_EQ(asset.createTime, baseAsset.createTime);
848 EXPECT_EQ(asset.size, baseAsset.size);
849 EXPECT_EQ(asset.hash, baseAsset.hash);
850 }
851
852 std::vector<VBucket> data2;
853 extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(0);
854 g_virtualCloudDb->Query(g_tables[1], extend, data2);
855 for (size_t j = 0; j < data2.size(); ++j) {
856 Type entry;
857 bool isExisted = CloudStorageUtils::GetTypeCaseInsensitive("asserts", data2[j], entry);
858 ASSERT_TRUE(isExisted);
859 Assets assets = std::get<Assets>(entry);
860 Asset baseAsset = j >= (size_t)(localCount / g_arrayHalfSub) ? g_localAsset : g_cloudAsset;
861 int index = static_cast<int>(j);
862 for (const auto &asset: assets) {
863 EXPECT_EQ(asset.version, baseAsset.version);
864 EXPECT_EQ(asset.name, baseAsset.name + std::to_string(index++));
865 EXPECT_EQ(asset.uri, baseAsset.uri);
866 EXPECT_EQ(asset.modifyTime, baseAsset.modifyTime);
867 EXPECT_EQ(asset.createTime, baseAsset.createTime);
868 EXPECT_EQ(asset.size, baseAsset.size);
869 EXPECT_EQ(asset.hash, baseAsset.hash);
870 }
871 }
872 }
873
CheckAssetsAfterDownload(sqlite3 * & db,int64_t localCount)874 void CheckAssetsAfterDownload(sqlite3 *&db, int64_t localCount)
875 {
876 string queryDownload = "select asserts from " + g_tables[1] + " where rowid in (";
877 for (int64_t i = 0; i < localCount; ++i) {
878 queryDownload += "'" + std::to_string(i) + "',";
879 }
880 queryDownload.pop_back();
881 queryDownload += ");";
882 sqlite3_stmt *stmt = nullptr;
883 ASSERT_EQ(SQLiteUtils::GetStatement(db, queryDownload, stmt), E_OK);
884 int index = 0;
885 while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
886 std::vector<uint8_t> blobValue;
887 ASSERT_EQ(SQLiteUtils::GetColumnBlobValue(stmt, 0, blobValue), E_OK);
888 Assets assets;
889 ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAssets(blobValue, assets), E_OK);
890 bool isLocal = index >= localCount / g_arrayHalfSub;
891 Asset baseAsset = isLocal ? g_localAsset : g_cloudAsset;
892 int nameIndex = index;
893 for (const auto &asset: assets) {
894 EXPECT_EQ(asset.version, baseAsset.version);
895 EXPECT_EQ(asset.name, baseAsset.name + std::to_string(nameIndex));
896 EXPECT_EQ(asset.uri, baseAsset.uri);
897 EXPECT_EQ(asset.modifyTime, baseAsset.modifyTime);
898 EXPECT_EQ(asset.createTime, baseAsset.createTime);
899 EXPECT_EQ(asset.size, baseAsset.size);
900 EXPECT_EQ(asset.hash, baseAsset.hash);
901 EXPECT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::NORMAL));
902 nameIndex++;
903 }
904 index++;
905 }
906 int errCode;
907 SQLiteUtils::ResetStatement(stmt, true, errCode);
908 }
909
CheckAssetAfterDownload(sqlite3 * & db,int64_t localCount)910 void CheckAssetAfterDownload(sqlite3 *&db, int64_t localCount)
911 {
912 string queryDownload = "select assert from " + g_tables[0] + " where rowid in (";
913 for (int64_t i = 0; i < localCount; ++i) {
914 queryDownload += "'" + std::to_string(i) + "',";
915 }
916 queryDownload.pop_back();
917 queryDownload += ");";
918 sqlite3_stmt *stmt = nullptr;
919 ASSERT_EQ(SQLiteUtils::GetStatement(db, queryDownload, stmt), E_OK);
920 int index = 0;
921 while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
922 std::vector<uint8_t> blobValue;
923 ASSERT_EQ(SQLiteUtils::GetColumnBlobValue(stmt, 0, blobValue), E_OK);
924 Asset asset;
925 ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAsset(blobValue, asset), E_OK);
926 bool isCloud = index >= localCount;
927 Asset baseAsset = isCloud ? g_cloudAsset : g_localAsset;
928 EXPECT_EQ(asset.version, baseAsset.version);
929 EXPECT_EQ(asset.name,
930 baseAsset.name + std::to_string(isCloud ? index - localCount / g_arrayHalfSub : index));
931 EXPECT_EQ(asset.uri, baseAsset.uri);
932 EXPECT_EQ(asset.modifyTime, baseAsset.modifyTime);
933 EXPECT_EQ(asset.createTime, baseAsset.createTime);
934 EXPECT_EQ(asset.size, baseAsset.size);
935 EXPECT_EQ(asset.hash, baseAsset.hash);
936 EXPECT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::NORMAL));
937 index++;
938 }
939 int errCode;
940 SQLiteUtils::ResetStatement(stmt, true, errCode);
941 }
942
UpdateCloudAssetForDownloadAssetTest003()943 void UpdateCloudAssetForDownloadAssetTest003()
944 {
945 VBucket data;
946 std::vector<uint8_t> photo(1, 'x');
947 data.insert_or_assign("name", "Cloud" + std::to_string(0));
948 data.insert_or_assign("photo", photo);
949 data.insert_or_assign("assert", g_cloudAsset);
950 Timestamp now = TimeHelper::GetSysCurrentTime();
951 VBucket log;
952 std::vector<VBucket> record;
953 std::vector<VBucket> extend;
954 log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
955 log.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(0));
956 log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
957 log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
958 record.push_back(data);
959 extend.push_back(log);
960 ASSERT_EQ(g_virtualCloudDb->BatchUpdate(g_tableName1, std::move(record), extend), DBStatus::OK);
961 }
962
CheckAssetForDownloadAssetTest003(sqlite3 * & db)963 void CheckAssetForDownloadAssetTest003(sqlite3 *&db)
964 {
965 string queryDownload = "select assert from " + g_tables[0] + " where rowid = '11';";
966 sqlite3_stmt *stmt = nullptr;
967 ASSERT_EQ(SQLiteUtils::GetStatement(db, queryDownload, stmt), E_OK);
968 int index = 0;
969 while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
970 std::vector<uint8_t> blobValue;
971 ASSERT_EQ(SQLiteUtils::GetColumnBlobValue(stmt, 0, blobValue), E_OK);
972 Asset asset;
973 ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAsset(blobValue, asset), E_OK);
974 EXPECT_EQ(asset.name, g_cloudAsset.name);
975 EXPECT_EQ(asset.hash, g_cloudAsset.hash);
976 EXPECT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::NORMAL));
977 index++;
978 }
979 int errCode;
980 SQLiteUtils::ResetStatement(stmt, true, errCode);
981 }
982
CheckAssetAfterDownload2(sqlite3 * & db,int64_t localCount)983 void CheckAssetAfterDownload2(sqlite3 *&db, int64_t localCount)
984 {
985 string queryDownload = "select assert from " + g_tables[0] + " where rowid in (";
986 for (int64_t i = localCount + 1; i < localCount + localCount; ++i) {
987 queryDownload += "'" + std::to_string(i) + "',";
988 }
989 queryDownload.pop_back();
990 queryDownload += ");";
991 sqlite3_stmt *stmt = nullptr;
992 ASSERT_EQ(SQLiteUtils::GetStatement(db, queryDownload, stmt), E_OK);
993 int index = 0;
994 while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
995 std::vector<uint8_t> blobValue;
996 ASSERT_EQ(SQLiteUtils::GetColumnBlobValue(stmt, 0, blobValue), E_OK);
997 Asset asset;
998 ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAsset(blobValue, asset), E_OK);
999 EXPECT_EQ(asset.version, g_cloudAsset.version);
1000 EXPECT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::ABNORMAL));
1001 index++;
1002 }
1003 int errCode;
1004 SQLiteUtils::ResetStatement(stmt, true, errCode);
1005 }
1006
InsertCloudForCloudProcessNotify001(std::vector<VBucket> & record,std::vector<VBucket> & extend)1007 void InsertCloudForCloudProcessNotify001(std::vector<VBucket> &record, std::vector<VBucket> &extend)
1008 {
1009 VBucket data;
1010 std::vector<uint8_t> photo(1, 'v');
1011 data.insert_or_assign("name", "Local" + std::to_string(0));
1012 data.insert_or_assign("height", 166.0); // 166.0 is random double value
1013 data.insert_or_assign("married", false);
1014 data.insert_or_assign("age", 13L);
1015 data.insert_or_assign("photo", photo);
1016 Asset asset = g_cloudAsset;
1017 asset.name = asset.name + std::to_string(0);
1018 data.insert_or_assign("assert", asset);
1019 record.push_back(data);
1020 VBucket log;
1021 Timestamp now = TimeHelper::GetSysCurrentTime();
1022 log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
1023 log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
1024 log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
1025 log.insert_or_assign("#_gid", std::to_string(2)); // 2 is gid
1026 extend.push_back(log);
1027 }
1028
WaitForSyncFinish(SyncProcess & syncProcess,const int64_t & waitTime)1029 void WaitForSyncFinish(SyncProcess &syncProcess, const int64_t &waitTime)
1030 {
1031 std::unique_lock<std::mutex> lock(g_processMutex);
1032 bool result = g_processCondition.wait_for(lock, std::chrono::seconds(waitTime), [&syncProcess]() {
1033 return syncProcess.process == FINISHED;
1034 });
1035 ASSERT_EQ(result, true);
1036 LOGD("-------------------sync end--------------");
1037 }
1038
callSync(const std::vector<std::string> & tableNames,SyncMode mode,DBStatus dbStatus)1039 void callSync(const std::vector<std::string> &tableNames, SyncMode mode, DBStatus dbStatus)
1040 {
1041 g_syncProcess = {};
1042 Query query = Query::Select().FromTable(tableNames);
1043 std::vector<SyncProcess> expectProcess;
1044 CloudSyncStatusCallback callback;
1045 GetCallback(g_syncProcess, callback, expectProcess);
1046 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, mode, query, callback, g_syncWaitTime), dbStatus);
1047 if (dbStatus == DBStatus::OK) {
1048 WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1049 }
1050 }
1051
CloseDb()1052 void CloseDb()
1053 {
1054 delete g_observer;
1055 g_virtualCloudDb = nullptr;
1056 if (g_delegate != nullptr) {
1057 EXPECT_EQ(g_mgr.CloseStore(g_delegate), DBStatus::OK);
1058 g_delegate = nullptr;
1059 }
1060 }
1061
InitMockAssetLoader(DBStatus & status,int & index)1062 void InitMockAssetLoader(DBStatus &status, int &index)
1063 {
1064 std::shared_ptr<MockAssetLoader> assetLoader = make_shared<MockAssetLoader>();
1065 ASSERT_EQ(g_delegate->SetIAssetLoader(assetLoader), DBStatus::OK);
1066 EXPECT_CALL(*assetLoader, Download(testing::_, testing::_, testing::_, testing::_))
1067 .WillRepeatedly([&status, &index](const std::string &, const std::string &gid, const Type &,
1068 std::map<std::string, Assets> &assets) {
1069 LOGD("Download GID:%s", gid.c_str());
1070 for (auto &item: assets) {
1071 for (auto &asset: item.second) {
1072 uint32_t lowBitStatus = AssetOperationUtils::EraseBitMask(asset.status);
1073 EXPECT_TRUE(lowBitStatus == static_cast<uint32_t>(AssetStatus::INSERT) ||
1074 lowBitStatus == static_cast<uint32_t>(AssetStatus::UPDATE));
1075 LOGD("asset [name]:%s, [status]:%u, [flag]:%u", asset.name.c_str(), asset.status, asset.flag);
1076 asset.status = (index++) % 5u + 1; // 6 is AssetStatus type num, include invalid type
1077 }
1078 }
1079 return status;
1080 });
1081 }
1082
1083 class DistributedDBCloudInterfacesRelationalSyncTest : public testing::Test {
1084 public:
1085 static void SetUpTestCase(void);
1086 static void TearDownTestCase(void);
1087 void SetUp();
1088 void TearDown();
1089 protected:
1090 sqlite3 *db = nullptr;
1091 VirtualCommunicatorAggregator *communicatorAggregator_ = nullptr;
1092 };
1093
1094
SetUpTestCase(void)1095 void DistributedDBCloudInterfacesRelationalSyncTest::SetUpTestCase(void)
1096 {
1097 DistributedDBToolsUnitTest::TestDirInit(g_testDir);
1098 g_storePath = g_testDir + "/" + g_storeID + DB_SUFFIX;
1099 LOGI("The test db is:%s", g_testDir.c_str());
1100 RuntimeConfig::SetCloudTranslate(std::make_shared<VirtualCloudDataTranslate>());
1101 }
1102
TearDownTestCase(void)1103 void DistributedDBCloudInterfacesRelationalSyncTest::TearDownTestCase(void)
1104 {}
1105
SetUp(void)1106 void DistributedDBCloudInterfacesRelationalSyncTest::SetUp(void)
1107 {
1108 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
1109 LOGE("rm test db files error.");
1110 }
1111 DistributedDBToolsUnitTest::PrintTestCaseInfo();
1112 LOGD("Test dir is %s", g_testDir.c_str());
1113 db = RelationalTestUtils::CreateDataBase(g_storePath);
1114 ASSERT_NE(db, nullptr);
1115 CreateUserDBAndTable(db);
1116 g_observer = new (std::nothrow) RelationalStoreObserverUnitTest();
1117 ASSERT_NE(g_observer, nullptr);
1118 ASSERT_EQ(g_mgr.OpenStore(g_storePath, g_storeID, RelationalStoreDelegate::Option { .observer = g_observer },
1119 g_delegate), DBStatus::OK);
1120 ASSERT_NE(g_delegate, nullptr);
1121 ASSERT_EQ(g_delegate->CreateDistributedTable(g_tableName1, CLOUD_COOPERATION), DBStatus::OK);
1122 ASSERT_EQ(g_delegate->CreateDistributedTable(g_tableName2, CLOUD_COOPERATION), DBStatus::OK);
1123 ASSERT_EQ(g_delegate->CreateDistributedTable(g_tableName3, CLOUD_COOPERATION), DBStatus::OK);
1124 g_virtualCloudDb = make_shared<VirtualCloudDb>();
1125 g_virtualAssetLoader = make_shared<VirtualAssetLoader>();
1126 g_syncProcess = {};
1127 ASSERT_EQ(g_delegate->SetCloudDB(g_virtualCloudDb), DBStatus::OK);
1128 ASSERT_EQ(g_delegate->SetIAssetLoader(g_virtualAssetLoader), DBStatus::OK);
1129 // sync before setting cloud db schema,it should return SCHEMA_MISMATCH
1130 Query query = Query::Select().FromTable(g_tables);
1131 CloudSyncStatusCallback callback;
1132 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime),
1133 DBStatus::SCHEMA_MISMATCH);
1134 DataBaseSchema dataBaseSchema;
1135 GetCloudDbSchema(dataBaseSchema);
1136 ASSERT_EQ(g_delegate->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
1137 communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
1138 ASSERT_TRUE(communicatorAggregator_ != nullptr);
1139 RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
1140 }
1141
TearDown(void)1142 void DistributedDBCloudInterfacesRelationalSyncTest::TearDown(void)
1143 {
1144 EXPECT_EQ(sqlite3_close_v2(db), SQLITE_OK);
1145 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
1146 LOGE("rm test db files error.");
1147 }
1148 RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
1149 communicatorAggregator_ = nullptr;
1150 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
1151 }
1152
1153 /**
1154 * @tc.name: CloudSyncTest001
1155 * @tc.desc: Cloud data is older than local data.
1156 * @tc.type: FUNC
1157 * @tc.require:
1158 * @tc.author: bty
1159 */
1160 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest001, TestSize.Level0)
1161 {
1162 int64_t paddingSize = 10;
1163 int64_t cloudCount = 20;
1164 int64_t localCount = cloudCount / g_arrayHalfSub;
1165 ChangedData changedDataForTable1;
1166 ChangedData changedDataForTable2;
1167 changedDataForTable1.tableName = g_tableName1;
1168 changedDataForTable2.tableName = g_tableName2;
1169 changedDataForTable1.field.push_back(std::string("name"));
1170 changedDataForTable2.field.push_back(std::string("id"));
1171 for (int i = 0; i < cloudCount; i++) {
1172 changedDataForTable1.primaryData[ChangeType::OP_INSERT].push_back({"Cloud" + std::to_string(i)});
1173 changedDataForTable2.primaryData[ChangeType::OP_INSERT].push_back({(int64_t)i + 10});
1174 }
1175 g_observer->SetExpectedResult(changedDataForTable1);
1176 g_observer->SetExpectedResult(changedDataForTable2);
1177 InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1178 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1179 Query query = Query::Select().FromTable(g_tables);
1180 std::vector<SyncProcess> expectProcess;
1181 InitProcessForTest1(cloudCount, localCount, expectProcess);
1182 CloudSyncStatusCallback callback;
1183 GetCallback(g_syncProcess, callback, expectProcess);
1184 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1185 WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1186 EXPECT_TRUE(g_observer->IsAllChangedDataEq());
1187 g_observer->ClearChangedData();
1188 LOGD("expect download:worker1[primary key]:[cloud0 - cloud20), worker2[primary key]:[10 - 20)");
1189 CheckDownloadResult(db, {20L, 10L}); // 20 and 10 means the num of downloads from cloud db by worker1 and worker2
1190 LOGD("expect upload:worker1[primary key]:[local0 - local10), worker2[primary key]:[0 - 10)");
1191 CheckCloudTotalCount({30L, 20L}); // 30 and 20 means the total num of worker1 and worker2 from the cloud db
1192 CloseDb();
1193 }
1194
1195 /**
1196 * @tc.name: CloudSyncTest002
1197 * @tc.desc: Local data is older than cloud data.
1198 * @tc.type: FUNC
1199 * @tc.require:
1200 * @tc.author: bty
1201 */
1202 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest002, TestSize.Level0)
1203 {
1204 int64_t localCount = 20;
1205 int64_t cloudCount = 10;
1206 int64_t paddingSize = 100;
1207 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1208 InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1209 Query query = Query::Select().FromTable(g_tables);
1210 std::vector<SyncProcess> expectProcess;
1211 InitProcessForTest2(cloudCount, localCount, expectProcess);
1212 CloudSyncStatusCallback callback;
1213 GetCallback(g_syncProcess, callback, expectProcess);
1214 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1215 WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1216 LOGD("expect download:worker1[primary key]:[cloud0 - cloud10), worker2[primary key]:[0 - 10)");
1217 CheckDownloadResult(db, {10L, 10L}); // 10 and 10 means the num of downloads from cloud db by worker1 and worker2
1218 LOGD("expect upload:worker1[primary key]:[local0 - local20), worker2[primary key]:[10 - 20)");
1219 CheckCloudTotalCount({30L, 20L}); // 30 and 20 means the total num of worker1 and worker2 from the cloud db
1220 CloseDb();
1221 }
1222
1223 /**
1224 * @tc.name: CloudSyncTest003
1225 * @tc.desc: test with update and delete operator
1226 * @tc.type: FUNC
1227 * @tc.require:
1228 * @tc.author: bty
1229 */
1230 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest003, TestSize.Level0)
1231 {
1232 int64_t paddingSize = 10;
1233 int cloudCount = 20;
1234 InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1235 InsertUserTableRecord(db, 0, cloudCount, paddingSize, false);
1236 Query query = Query::Select().FromTable(g_tables);
1237 std::vector<SyncProcess> expectProcess;
1238 InitProcessForTest1(cloudCount, cloudCount, expectProcess);
1239 CloudSyncStatusCallback callback;
1240 GetCallback(g_syncProcess, callback, expectProcess);
1241 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1242 WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1243 CheckDownloadResult(db, {20L, 0L}); // 20 and 0 means the num of downloads from cloud db by worker1 and worker2
1244 CheckCloudTotalCount({40L, 20L}); // 40 and 20 means the total num of worker1 and worker2 from the cloud db
1245
1246 int updateCount = 10;
1247 UpdateUserTableRecord(db, 5, updateCount); // 5 is start id to be updated
1248 g_syncProcess = {};
1249 InitProcessForTest1(cloudCount, updateCount, expectProcess);
1250 GetCallback(g_syncProcess, callback, expectProcess);
1251 LOGD("-------------------sync after update--------------");
1252 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1253 WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1254
1255 VBucket extend;
1256 extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(0);
1257 std::vector<VBucket> data1;
1258 g_virtualCloudDb->Query(g_tables[0], extend, data1);
1259 for (int j = 25; j < 35; ++j) { // index[25, 35) in cloud db expected to be updated
1260 EXPECT_EQ(std::get<int64_t>(data1[j]["age"]), 99); // 99 is the updated age field of cloud db
1261 }
1262
1263 std::vector<VBucket> data2;
1264 g_virtualCloudDb->Query(g_tables[1], extend, data2);
1265 for (int j = 5; j < 15; ++j) { // index[5, 15) in cloud db expected to be updated
1266 EXPECT_EQ(std::get<int64_t>(data2[j]["age"]), 99); // 99 is the updated age field of cloud db
1267 }
1268
1269 int deleteCount = 3;
1270 DeleteUserTableRecord(db, 0, deleteCount);
1271 g_syncProcess = {};
1272 InitProcessForTest1(updateCount, deleteCount, expectProcess);
1273 GetCallback(g_syncProcess, callback, expectProcess);
1274 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1275 WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1276
1277 CheckCloudTotalCount({37L, 17L}); // 37 and 17 means the total num of worker1 and worker2 from the cloud db
1278 CloseDb();
1279 }
1280
1281 /**
1282 * @tc.name: CloudSyncTest004
1283 * @tc.desc: Random write of local and cloud data
1284 * @tc.type: FUNC
1285 * @tc.require:
1286 * @tc.author: bty
1287 */
1288 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest004, TestSize.Level0)
1289 {
1290 int64_t paddingSize = 1024 * 8;
1291 vector<thread> threads;
1292 int cloudCount = 1024;
1293 threads.emplace_back(InsertCloudTableRecord, 0, cloudCount, paddingSize, false);
1294 threads.emplace_back(InsertUserTableRecord, std::ref(db), 0, cloudCount, paddingSize, false);
1295 for (auto &thread: threads) {
1296 thread.join();
1297 }
1298 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1299 CloseDb();
1300 }
1301
1302 /**
1303 * @tc.name: CloudSyncTest005
1304 * @tc.desc: sync with device sync query
1305 * @tc.type: FUNC
1306 * @tc.require:
1307 * @tc.author: bty
1308 */
1309 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest005, TestSize.Level0)
1310 {
1311 Query query = Query::Select().FromTable(g_tables).OrderBy("123", true);
1312 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, nullptr, g_syncWaitTime),
1313 DBStatus::NOT_SUPPORT);
1314
1315 query = Query::Select();
1316 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, nullptr, g_syncWaitTime),
1317 DBStatus::INVALID_ARGS);
1318 CloseDb();
1319 }
1320
1321 /**
1322 * @tc.name: CloudSyncTest006
1323 * @tc.desc: Firstly set a correct schema, and then null or invalid schema
1324 * @tc.type: FUNC
1325 * @tc.require:
1326 * @tc.author: wanyi
1327 */
1328 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest006, TestSize.Level0)
1329 {
1330 int64_t paddingSize = 10;
1331 int cloudCount = 20;
1332 ChangedData changedDataForTable1;
1333 ChangedData changedDataForTable2;
1334 changedDataForTable1.tableName = g_tableName1;
1335 changedDataForTable2.tableName = g_tableName2;
1336 changedDataForTable1.field.push_back(std::string("name"));
1337 changedDataForTable2.field.push_back(std::string("id"));
1338 for (int i = 0; i < cloudCount; i++) {
1339 changedDataForTable1.primaryData[ChangeType::OP_INSERT].push_back({"Cloud" + std::to_string(i)});
1340 changedDataForTable2.primaryData[ChangeType::OP_INSERT].push_back({(int64_t)i + 10});
1341 }
1342 g_observer->SetExpectedResult(changedDataForTable1);
1343 g_observer->SetExpectedResult(changedDataForTable2);
1344 InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1345 InsertUserTableRecord(db, 0, cloudCount / g_arrayHalfSub, paddingSize, false);
1346 // Set correct cloudDbSchema (correct version)
1347 DataBaseSchema correctSchema;
1348 GetCloudDbSchema(correctSchema);
1349 ASSERT_EQ(g_delegate->SetCloudDbSchema(correctSchema), DBStatus::OK);
1350 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1351 EXPECT_TRUE(g_observer->IsAllChangedDataEq());
1352 g_observer->ClearChangedData();
1353 LOGD("expect download:worker1[primary key]:[cloud0 - cloud20), worker2[primary key]:[10 - 20)");
1354 CheckDownloadResult(db, {20L, 10L}); // 20 and 10 means the num of downloads from cloud db by worker1 and worker2
1355 LOGD("expect upload:worker1[primary key]:[local0 - local10), worker2[primary key]:[0 - 10)");
1356 CheckCloudTotalCount({30L, 20L}); // 30 and 20 means the total num of worker1 and worker2 from the cloud db
1357
1358 // Reset cloudDbSchema (invalid version - null)
1359 DataBaseSchema nullSchema;
1360 ASSERT_EQ(g_delegate->SetCloudDbSchema(nullSchema), DBStatus::OK);
1361 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::SCHEMA_MISMATCH);
1362
1363 // Reset cloudDbSchema (invalid version - field mismatch)
1364 DataBaseSchema invalidSchema;
1365 GetInvalidCloudDbSchema(invalidSchema);
1366 ASSERT_EQ(g_delegate->SetCloudDbSchema(invalidSchema), DBStatus::OK);
1367 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::SCHEMA_MISMATCH);
1368 CloseDb();
1369 }
1370
1371 /**
1372 * @tc.name: CloudSyncTest007
1373 * @tc.desc: Check the asset types after sync
1374 * @tc.type: FUNC
1375 * @tc.require:
1376 * @tc.author: bty
1377 */
1378 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest007, TestSize.Level1)
1379 {
1380 int64_t paddingSize = 100;
1381 int localCount = 20;
1382 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1383 InsertCloudTableRecord(0, localCount / g_arrayHalfSub, paddingSize, false);
1384 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1385
1386 CheckAssetAfterDownload(db, localCount);
1387 CheckAllAssetAfterUpload(localCount);
1388 CheckAssetsAfterDownload(db, localCount);
1389 CloseDb();
1390 }
1391
1392 /*
1393 * @tc.name: CloudSyncTest008
1394 * @tc.desc: Test sync with invalid param
1395 * @tc.type: FUNC
1396 * @tc.require:
1397 * @tc.author: zhangqiquan
1398 */
1399 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest008, TestSize.Level0)
1400 {
1401 ASSERT_EQ(g_delegate->SetCloudDB(nullptr), OK); // it will not happen because cloudb has been set in SetUp()
1402 Query query = Query::Select().FromTable({g_tableName3});
1403 // clouddb has been set in SetUp() and it's not null
1404 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, nullptr, g_syncWaitTime), OK);
1405 CloseDb();
1406 }
1407
1408 /**
1409 * @tc.name: CloudSyncTest009
1410 * @tc.desc: The second time there was no data change and sync was called.
1411 * @tc.type: FUNC
1412 * @tc.require:
1413 * @tc.author: bty
1414 */
1415 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest009, TestSize.Level0)
1416 {
1417 int64_t paddingSize = 10;
1418 int cloudCount = 20;
1419 InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1420 InsertUserTableRecord(db, 0, cloudCount, paddingSize, false);
1421 Query query = Query::Select().FromTable(g_tables);
1422 std::vector<SyncProcess> expectProcess;
1423 InitProcessForTest1(cloudCount, cloudCount, expectProcess);
1424 CloudSyncStatusCallback callback;
1425 GetCallback(g_syncProcess, callback, expectProcess);
1426 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1427 WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1428 LOGD("expect download:worker1[primary key]:[cloud0 - cloud20), worker2[primary key]:none");
1429 CheckDownloadResult(db, {20L, 0L}); // 20 and 0 means the num of downloads from cloud db by worker1 and worker2
1430 LOGD("expect upload:worker1[primary key]:[local0 - local20), worker2[primary key]:[0 - 20)");
1431 CheckCloudTotalCount({40L, 20L}); // 40 and 20 means the total num of worker1 and worker2 from the cloud db
1432
1433 g_syncProcess = {};
1434 InitProcessForTest9(cloudCount, 0, expectProcess);
1435 GetCallback(g_syncProcess, callback, expectProcess);
1436 LOGD("--------------the second sync-------------");
1437 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1438 WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1439 CloseDb();
1440 }
1441
1442 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest0010, TestSize.Level0)
1443 {
1444 int64_t paddingSize = 10;
1445 int cloudCount = 20;
1446 int localCount = 10;
1447 InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1448 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1449 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1450
1451 int rowid = 27;
1452 UpdateAssetForTest(db, AssetOpType::NO_CHANGE, cloudCount, rowid++);
1453 UpdateAssetForTest(db, AssetOpType::INSERT, cloudCount, rowid++);
1454 UpdateAssetForTest(db, AssetOpType::DELETE, cloudCount, rowid++);
1455 UpdateAssetForTest(db, AssetOpType::UPDATE, cloudCount, rowid++);
1456
1457 int id = 0;
1458 UpdateAssetsForTest(db, AssetOpType::NO_CHANGE, id++);
1459 UpdateAssetsForTest(db, AssetOpType::INSERT, id++);
1460 UpdateAssetsForTest(db, AssetOpType::DELETE, id++);
1461 UpdateAssetsForTest(db, AssetOpType::UPDATE, id++);
1462
1463 LOGD("--------------the second sync-------------");
1464 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1465
1466 CheckFillAssetForTest10(db);
1467 CheckFillAssetsForTest10(db);
1468 CloseDb();
1469 }
1470
1471 /**
1472 * @tc.name: CloudSyncTest011
1473 * @tc.desc: Test sync with same table name.
1474 * @tc.type: FUNC
1475 * @tc.require:
1476 * @tc.author: zhangqiquan
1477 */
1478 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest011, TestSize.Level0)
1479 {
1480 Query query = Query::Select().FromTable({g_tableName1, g_tableName1});
1481 bool syncFinish = false;
1482 std::mutex syncMutex;
1483 std::condition_variable cv;
1484 std::atomic<int> callCount = 0;
1485 CloudSyncStatusCallback callback = [&callCount, &cv, &syncFinish, &syncMutex](
__anon239839ae0602( const std::map<std::string, SyncProcess> &onProcess) 1486 const std::map<std::string, SyncProcess> &onProcess) {
1487 ASSERT_NE(onProcess.find(DEVICE_CLOUD), onProcess.end());
1488 SyncProcess syncProcess = onProcess.at(DEVICE_CLOUD);
1489 callCount++;
1490 if (syncProcess.process == FINISHED) {
1491 std::lock_guard<std::mutex> autoLock(syncMutex);
1492 syncFinish = true;
1493 }
1494 cv.notify_all();
1495 };
1496 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1497 std::unique_lock<std::mutex> uniqueLock(syncMutex);
__anon239839ae0702() 1498 cv.wait(uniqueLock, [&syncFinish]() {
1499 return syncFinish;
1500 });
1501 RuntimeContext::GetInstance()->StopTaskPool();
1502 EXPECT_EQ(callCount, 2); // 2 is onProcess count
1503 CloseDb();
1504 }
1505
1506 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest012, TestSize.Level0)
1507 {
1508 int64_t localCount = 20;
1509 int64_t cloudCount = 10;
1510 int64_t paddingSize = 10;
1511 InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1512 InsertUserTableRecord(db, 0, localCount, paddingSize, true);
1513 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1514
1515 InsertCloudTableRecord(localCount + cloudCount, cloudCount, paddingSize, false);
1516 InsertUserTableRecord(db, localCount + cloudCount, localCount, paddingSize, true);
1517 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1518
1519 InsertCloudTableRecord(2 * (localCount + cloudCount), cloudCount, paddingSize, false); // 2 is offset
1520 InsertUserTableRecord(db, 2 * (localCount + cloudCount), localCount, paddingSize, false); // 2 is offset
1521 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1522
1523
1524 InsertCloudTableRecord(3 * (localCount + cloudCount), cloudCount, paddingSize, true); // 3 is offset
1525 InsertUserTableRecord(db, 3 * (localCount + cloudCount), localCount, paddingSize, true); // 3 is offset
1526 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1527 CloseDb();
1528 }
1529
1530 /*
1531 * @tc.name: CloudSyncTest013
1532 * @tc.desc: test increment watermark when cloud db query data size is 0
1533 * @tc.type: FUNC
1534 * @tc.require:
1535 * @tc.author: zhuwentao
1536 */
1537 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest013, TestSize.Level0)
1538 {
1539 /**
1540 * @tc.steps: insert some data into cloud db
1541 * @tc.expected: return ok.
1542 */
1543 int64_t paddingSize = 10;
1544 int64_t cloudCount = 10;
1545 SyncProcess syncProcess;
1546 InsertCloudTableRecord(0, cloudCount, paddingSize, true);
1547 /**
1548 * @tc.steps: try to cloud sync
1549 * @tc.expected: return ok.
1550 */
1551 Query query = Query::Select().FromTable(g_tables);
__anon239839ae0802(const std::map<std::string, SyncProcess> &process) 1552 CloudSyncStatusCallback callback = [&syncProcess](const std::map<std::string, SyncProcess> &process) {
1553 LOGI("devices size = %d", process.size());
1554 ASSERT_EQ(process.size(), 1u);
1555 syncProcess = std::move(process.begin()->second);
1556 if (syncProcess.process == FINISHED) {
1557 g_processCondition.notify_one();
1558 }
1559 };
1560 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1561 WaitForSyncFinish(syncProcess, g_syncWaitTime);
1562 uint32_t queryTimes = g_virtualCloudDb->GetQueryTimes(g_tableName1);
1563 /**
1564 * @tc.steps: insert some increment data into cloud db
1565 * @tc.expected: return ok.
1566 */
1567 VBucket data;
1568 Timestamp now = TimeHelper::GetSysCurrentTime();
1569 data.insert_or_assign("name", "Cloud" + std::to_string(0));
1570 data.insert_or_assign("height", 166.0); // 166.0 is random double value
1571 data.insert_or_assign("married", false);
1572 data.insert_or_assign("age", 13L);
1573 VBucket log;
1574 log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
1575 log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
1576 log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
1577 log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
1578 log.insert_or_assign(CloudDbConstant::CURSOR_FIELD, "0123");
1579 g_virtualCloudDb->SetIncrementData(g_tableName1, data, log);
1580 syncProcess.process = PREPARED;
1581 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1582 WaitForSyncFinish(syncProcess, g_syncWaitTime);
1583 uint32_t lastQueryTimes = g_virtualCloudDb->GetQueryTimes(g_tableName1);
1584 ASSERT_EQ(lastQueryTimes - queryTimes, 2u);
1585 CloseDb();
1586 }
1587
TestSyncForStatus(RelationalStoreDelegate * delegate,DBStatus expectStatus)1588 void TestSyncForStatus(RelationalStoreDelegate *delegate, DBStatus expectStatus)
1589 {
1590 std::mutex dataMutex;
1591 std::condition_variable cv;
1592 bool finish = false;
1593 DBStatus res = OK;
1594 CloudSyncStatusCallback callback = [&dataMutex, &cv, &finish, &res](
1595 const std::map<std::string, SyncProcess> &process) {
1596 std::map<std::string, SyncProcess> syncProcess;
1597 {
1598 std::lock_guard<std::mutex> autoLock(dataMutex);
1599 syncProcess = process;
1600 if (syncProcess[DEVICE_CLOUD].process == FINISHED) {
1601 finish = true;
1602 }
1603 res = syncProcess[DEVICE_CLOUD].errCode;
1604 }
1605 cv.notify_one();
1606 };
1607 Query query = Query::Select().FromTable({g_tableName3});
1608 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
1609 {
1610 std::unique_lock<std::mutex> uniqueLock(dataMutex);
1611 cv.wait(uniqueLock, [&finish] {
1612 return finish;
1613 });
1614 }
1615 EXPECT_EQ(res, expectStatus);
1616 }
1617
1618 /*
1619 * @tc.name: CloudSyncTest015
1620 * @tc.desc: Test sync with cloud error
1621 * @tc.type: FUNC
1622 * @tc.require:
1623 * @tc.author: zhangqiquan
1624 */
1625 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest015, TestSize.Level0)
1626 {
1627 g_virtualCloudDb->SetActionStatus(CLOUD_NETWORK_ERROR);
1628 TestSyncForStatus(g_delegate, CLOUD_NETWORK_ERROR);
1629
1630 g_virtualCloudDb->SetActionStatus(CLOUD_SYNC_UNSET);
1631 TestSyncForStatus(g_delegate, CLOUD_SYNC_UNSET);
1632
1633 g_virtualCloudDb->SetActionStatus(CLOUD_FULL_RECORDS);
1634 TestSyncForStatus(g_delegate, CLOUD_FULL_RECORDS);
1635
1636 g_virtualCloudDb->SetActionStatus(CLOUD_LOCK_ERROR);
1637 TestSyncForStatus(g_delegate, CLOUD_LOCK_ERROR);
1638
1639 g_virtualCloudDb->SetActionStatus(DB_ERROR);
1640 TestSyncForStatus(g_delegate, CLOUD_ERROR);
1641
1642 g_virtualCloudDb->SetActionStatus(OK);
1643 CloseDb();
1644 }
1645
1646 /*
1647 * @tc.name: CloudSyncTest014
1648 * @tc.desc: Test sync with s4
1649 * @tc.type: FUNC
1650 * @tc.require:
1651 * @tc.author: zhangqiquan
1652 */
1653 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest014, TestSize.Level0)
1654 {
1655 auto adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
1656 RuntimeConfig::SetProcessSystemAPIAdapter(adapter);
1657
1658 // sync failed because get security option failed
__anon239839ae0b02(const std::string&, SecurityOption &option) 1659 adapter->ForkGetSecurityOption([](const std::string&, SecurityOption &option) {
1660 option.securityLabel = S0;
1661 return DB_ERROR;
1662 });
1663 Query query = Query::Select().FromTable({g_tableName3});
1664 EXPECT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, nullptr, g_syncWaitTime),
1665 SECURITY_OPTION_CHECK_ERROR);
1666
1667 // sync failed because get S4
__anon239839ae0c02(const std::string&, SecurityOption &option) 1668 adapter->ForkGetSecurityOption([](const std::string&, SecurityOption &option) {
1669 option.securityLabel = S4;
1670 return NOT_SUPPORT;
1671 });
1672 Query invalidQuery = Query::Select().FromTable({g_tableName3}).PrefixKey({'k'});
1673 EXPECT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, invalidQuery, nullptr, g_syncWaitTime),
1674 NOT_SUPPORT);
1675
1676 // sync failed because get S4
__anon239839ae0d02(const std::string&, SecurityOption &option) 1677 adapter->ForkGetSecurityOption([](const std::string&, SecurityOption &option) {
1678 option.securityLabel = S4;
1679 return OK;
1680 });
1681 EXPECT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, nullptr, g_syncWaitTime),
1682 SECURITY_OPTION_CHECK_ERROR);
1683
1684 // sync failed because S4 has been cached
__anon239839ae0e02(const std::string&, SecurityOption &option) 1685 adapter->ForkGetSecurityOption([](const std::string&, SecurityOption &option) {
1686 option.securityLabel = S0;
1687 return OK;
1688 });
1689 EXPECT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, nullptr, g_syncWaitTime),
1690 SECURITY_OPTION_CHECK_ERROR);
1691 RuntimeConfig::SetProcessSystemAPIAdapter(nullptr);
1692 CloseDb();
1693 }
1694
1695 /*
1696 * @tc.name: CloudSyncTest016
1697 * @tc.desc: Test sync when push before merge
1698 * @tc.type: FUNC
1699 * @tc.require:
1700 * @tc.author: chenchaohao
1701 */
1702 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest016, TestSize.Level0)
1703 {
1704 int64_t localCount = 10;
1705 int64_t paddingSize = 10;
1706 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1707 callSync(g_tables, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
1708 CheckCloudTotalCount({10L, 10L});
1709 UpdateUserTableRecord(db, 0, localCount);
1710 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1711
1712 VBucket extend;
1713 extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(0);
1714 std::vector<VBucket> data1;
1715 g_virtualCloudDb->Query(g_tables[0], extend, data1);
1716 for (int i = 0; i < 10; ++i) { // index[0, 10) in cloud db expected to be updated
1717 EXPECT_EQ(std::get<int64_t>(data1[i]["age"]), 99); // 99 is the updated age field of cloud db
1718 }
1719
1720 std::vector<VBucket> data2;
1721 g_virtualCloudDb->Query(g_tables[1], extend, data2);
1722 for (int i = 0; i < 10; ++i) { // index[0, 10) in cloud db expected to be updated
1723 EXPECT_EQ(std::get<int64_t>(data2[i]["age"]), 99); // 99 is the updated age field of cloud db
1724 }
1725
1726 CloseDb();
1727 }
1728
1729 /*
1730 * @tc.name: CloudSyncTest017
1731 * @tc.desc: Test sync to push when local data deleted and not upload to cloud
1732 * @tc.type: FUNC
1733 * @tc.require:
1734 * @tc.author: wangxiangdong
1735 */
1736 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncTest017, TestSize.Level0)
1737 {
1738 /**
1739 * @tc.steps: step1. make data: 20 records on local, 20 records on cloud
1740 */
1741 int64_t localCount = 20;
1742 int64_t paddingSize = 20;
1743 InsertCloudTableRecord(0, localCount, paddingSize, true);
1744 InsertUserTableRecord(db, 0, localCount, paddingSize, true);
1745 localCount = 10;
1746 /**
1747 * @tc.steps: step2. delete 10 local record before sync
1748 */
1749 DeleteUserTableRecord(db, 0, localCount);
1750 callSync(g_tables, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
1751 /**
1752 * @tc.steps: step3. check local and clod num
1753 */
1754 CheckCloudTotalCount({30L, 20L});
1755 std::string sql = "select count(*) from " + DBCommon::GetLogTableName(g_tables[0]) +
1756 " where data_key=-1 and cloud_gid='';";
1757 EXPECT_EQ(sqlite3_exec(db, sql.c_str(), QueryCountCallback,
1758 reinterpret_cast<void *>(10), nullptr), SQLITE_OK);
1759 CloseDb();
1760 }
1761
1762 /*
1763 * @tc.name: DataNotifier001
1764 * @tc.desc: Notify data without primary key
1765 * @tc.type: FUNC
1766 * @tc.require:
1767 * @tc.author: wanyi
1768 */
1769 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, DataNotifier001, TestSize.Level0)
1770 {
1771 int64_t paddingSize = 10;
1772 int localCount = 20;
1773 InsertRecordWithoutPk2LocalAndCloud(db, 0, localCount, paddingSize);
1774 callSync({g_tableName3}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1775 CloseDb();
1776 }
1777
1778 /**
1779 * @tc.name: CloudSyncAssetTest001
1780 * @tc.desc:
1781 * @tc.type: FUNC
1782 * @tc.require:
1783 * @tc.author: wanyi
1784 */
1785 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest001, TestSize.Level1)
1786 {
1787 int64_t paddingSize = 100;
1788 int localCount = 20;
1789 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1790 InsertCloudTableRecord(0, localCount / g_arrayHalfSub, paddingSize, false);
1791 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1792
1793 CheckAssetAfterDownload(db, localCount);
1794 CheckAllAssetAfterUpload(localCount);
1795 CloseDb();
1796 }
1797
1798 /*
1799 * @tc.name: MannualNotify001
1800 * @tc.desc: Test FLAG_ONLY mode of RemoveDeviceData
1801 * @tc.type: FUNC
1802 * @tc.require:
1803 * @tc.author: huangboxin
1804 */
1805 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, MannualNotify001, TestSize.Level0)
1806 {
1807 int64_t paddingSize = 10;
1808 int localCount = 10;
1809 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1810 Query query = Query::Select().FromTable(g_tables);
1811 std::vector<SyncProcess> expectProcess;
1812 InitProcessForMannualSync1(expectProcess);
1813 CloudSyncStatusCallback callback;
1814 GetCallback(g_syncProcess, callback, expectProcess);
1815 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_FORCE_PULL, query, callback, g_syncWaitTime),
1816 DBStatus::OK);
1817 WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
1818 CloseDb();
1819 }
1820
1821 /**
1822 * @tc.name: CloudProcessNotify001
1823 * @tc.desc: Test duplicate cloud records. SYNC_MODE_CLOUD_MERGE
1824 * @tc.type: FUNC
1825 * @tc.require:
1826 * @tc.author: liufuchenxing
1827 */
1828 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudProcessNotify001, TestSize.Level1)
1829 {
1830 /**
1831 * @tc.steps: step1. table work1 and work2 insert 1 record which name is local0, then sync().
1832 * @tc.expected: step 1. table work1 and work2 download result is 0. table work1 and work2 upload 1 record.
1833 */
1834 int64_t paddingSize = 10;
1835 int64_t localCount = 1;
1836 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1837 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1838 EXPECT_TRUE(g_observer->IsAllChangedDataEq());
1839 g_observer->ClearChangedData();
1840 LOGD("expect download:worker1[primary key]:[], worker2[primary key]:[]");
1841 CheckDownloadResult(db, {0L, 0L}); // 0 and 0 means the num of downloads from cloud db by worker1 and worker2
1842 LOGD("expect upload:worker1[primary key]:[local0], worker2[primary key]:[0]");
1843 CheckCloudTotalCount({1L, 1L}); // 1 and 1 means the total num of worker1 and worker2 from the cloud db
1844
1845 /**
1846 * @tc.steps: step2. reset data
1847 * @tc.expected: step2. return ok.
1848 */
1849 std::this_thread::sleep_for(std::chrono::milliseconds(100));
1850 g_syncProcess = {};
1851 ASSERT_EQ(g_delegate->SetCloudDB(g_virtualCloudDb), DBStatus::OK);
1852
1853 /**
1854 * @tc.steps: step3. table work1 delete record which gid is 0 and name is local0 on cloud.
1855 * @tc.expected: step3. return ok.
1856 */
1857 VBucket idMap;
1858 idMap.insert_or_assign("#_gid", std::to_string(0));
1859 ASSERT_EQ(g_virtualCloudDb->DeleteByGid(g_tableName1, idMap), DBStatus::OK);
1860
1861 /**
1862 * @tc.steps: step4. table work1 insert record which gid is 0 and name is local0 on cloud.
1863 * @tc.expected: step4. return ok.
1864 */
1865 std::vector<VBucket> record1;
1866 std::vector<VBucket> extend1;
1867 InsertCloudForCloudProcessNotify001(record1, extend1);
1868 ASSERT_EQ(g_virtualCloudDb->BatchInsertWithGid(g_tableName1, std::move(record1), extend1), DBStatus::OK);
1869
1870 /**
1871 * @tc.steps: step5. sync() and check local data.
1872 * @tc.expected: step5. return ok.
1873 */
1874 ChangedData changedDataForTable1;
1875 changedDataForTable1.tableName = g_tableName1;
1876 changedDataForTable1.field.push_back(std::string("name"));
1877 changedDataForTable1.primaryData[ChangeType::OP_UPDATE].push_back({"Local" + std::to_string(0)});
1878 g_observer->SetExpectedResult(changedDataForTable1);
1879
1880 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1881 EXPECT_TRUE(g_observer->IsAllChangedDataEq());
1882 g_observer->ClearChangedData();
1883 LOGD("expect download:worker1[primary key]:[Local0], worker2[primary key]:[0]");
1884 // 1 and 1 means the num of downloads from cloud db by worker1 and worker2
1885 CheckDownloadResult(db, {1L, 1L}, "Local");
1886 LOGD("expect upload:worker1[primary key]:[local0], worker2[primary key]:[0]");
1887 CheckCloudTotalCount({1L, 1L}); // 0 and 0 means the total num of worker1 and worker2 from the cloud db
1888
1889 /**
1890 * @tc.steps: step6. CloseDb().
1891 * @tc.expected: step6. return ok.
1892 */
1893 CloseDb();
1894 }
1895
1896 /*
1897 * @tc.name: CloudSyncAssetTest002
1898 * @tc.desc:
1899 * @tc.type: FUNC
1900 * @tc.require:
1901 * @tc.author: huangboxin
1902 */
1903 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest002, TestSize.Level0)
1904 {
1905 int64_t paddingSize = 10;
1906 int localCount = 3;
1907 int cloudCount = 3;
1908 InsertCloudTableRecord(0, cloudCount, paddingSize, true);
1909 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1910 callSync(g_tables, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
1911 CloseDb();
1912 }
1913
1914 /*
1915 * @tc.name: CloudSyncAssetTest003
1916 * @tc.desc:
1917 * @tc.type: FUNC
1918 * @tc.require:
1919 * @tc.author: bty
1920 */
1921 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest003, TestSize.Level0)
1922 {
1923 int64_t paddingSize = 10;
1924 int localCount = 3;
1925 int cloudCount = 3;
1926 InsertCloudTableRecord(0, cloudCount, paddingSize, true);
1927 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1928 Assets assets;
1929 assets.push_back(g_localAsset);
1930 assets.push_back(g_localAsset);
1931 UpdateLocalAssets(db, assets, 1);
1932 Query query = Query::Select().FromTable(g_tables);
1933 std::vector<SyncProcess> expectProcess;
__anon239839ae0f02(const std::map<std::string, SyncProcess> &process) 1934 CloudSyncStatusCallback callback = [](const std::map<std::string, SyncProcess> &process) {
1935 ASSERT_EQ(process.size(), 1u);
1936 g_syncProcess = std::move(process.begin()->second);
1937
1938 if (g_syncProcess.process == FINISHED) {
1939 g_processCondition.notify_one();
1940 }
1941 };
1942 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime),
1943 DBStatus::OK);
1944 {
1945 std::unique_lock<std::mutex> lock(g_processMutex);
__anon239839ae1002() 1946 g_processCondition.wait(lock, []() {
1947 return g_syncProcess.process == FINISHED;
1948 });
1949 ASSERT_EQ(g_syncProcess.errCode, DBStatus::OK);
1950 }
1951 CloseDb();
1952 }
1953
1954 /*
1955 * @tc.name: CloudSyncAssetTest004
1956 * @tc.desc:
1957 * @tc.type: FUNC
1958 * @tc.require:
1959 * @tc.author: bty
1960 */
1961 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest004, TestSize.Level0)
1962 {
1963 int64_t paddingSize = 10;
1964 int localCount = 3;
1965 int cloudCount = 3;
1966 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
1967 InsertCloudTableRecord(0, cloudCount, paddingSize, false);
1968 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
1969
1970 UpdateDiffType(localCount);
1971 g_syncProcess = {};
__anon239839ae1102(const std::map<std::string, SyncProcess> &process) 1972 CloudSyncStatusCallback callback1 = [](const std::map<std::string, SyncProcess> &process) {
1973 ASSERT_EQ(process.size(), 1u);
1974 g_syncProcess = std::move(process.begin()->second);
1975 if (g_syncProcess.process == FINISHED) {
1976 g_processCondition.notify_one();
1977 }
1978 };
1979 Query query = Query::Select().FromTable(g_tables);
1980 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback1, g_syncWaitTime),
1981 DBStatus::OK);
1982 {
1983 std::unique_lock<std::mutex> lock(g_processMutex);
__anon239839ae1202() 1984 g_processCondition.wait(lock, []() {
1985 return g_syncProcess.process == FINISHED;
1986 });
1987 ASSERT_EQ(g_syncProcess.errCode, DBStatus::OK);
1988 }
1989 CheckDiffTypeAsset(db);
1990 CloseDb();
1991 }
1992
1993 /*
1994 * @tc.name: CloudSyncAssetTest005
1995 * @tc.desc: Test erase all no change Asset
1996 * @tc.type: FUNC
1997 * @tc.require:
1998 * @tc.author: bty
1999 */
2000 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest005, TestSize.Level0)
2001 {
2002 /**
2003 * @tc.steps:step1. Construct local data with asset names and hashes consistent with the cloud
2004 * @tc.expected: step1. return ok.
2005 */
2006 int64_t paddingSize = 10;
2007 int localCount = 3;
2008 int cloudCount = 3;
2009 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
2010 Assets assets;
2011 for (int64_t j = 0; j < cloudCount; j++) {
2012 Asset asset = g_cloudAsset;
2013 asset.name = g_cloudAsset.name + std::to_string(j);
2014 assets.push_back(asset);
2015 }
2016 UpdateLocalAssets(db, assets, 0);
2017 std::this_thread::sleep_for(std::chrono::milliseconds(cloudCount));
2018
2019 /**
2020 * @tc.steps:step2. Construct cloud data
2021 * @tc.expected: step2. return ok.
2022 */
2023 InsertCloudTableRecord(0, cloudCount, paddingSize, false);
2024
2025 /**
2026 * @tc.steps:step3. sync, expect EraseNoChangeAsset to erase all Nochange assets
2027 * @tc.expected: step3. return ok.
2028 */
2029 Query query = Query::Select().FromTable(g_tables);
2030 std::vector<SyncProcess> expectProcess;
__anon239839ae1302(const std::map<std::string, SyncProcess> &process) 2031 CloudSyncStatusCallback callback = [](const std::map<std::string, SyncProcess> &process) {
2032 ASSERT_EQ(process.size(), 1u);
2033 g_syncProcess = std::move(process.begin()->second);
2034
2035 if (g_syncProcess.process == FINISHED) {
2036 g_processCondition.notify_one();
2037 }
2038 };
2039 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime),
2040 DBStatus::OK);
2041 {
2042 std::unique_lock<std::mutex> lock(g_processMutex);
__anon239839ae1402() 2043 g_processCondition.wait(lock, []() {
2044 return g_syncProcess.process == FINISHED;
2045 });
2046 ASSERT_EQ(g_syncProcess.errCode, DBStatus::OK);
2047 }
2048 CloseDb();
2049 }
2050
2051 /*
2052 * @tc.name: CloudSyncAssetTest006
2053 * @tc.desc: Test upload new data without assets
2054 * @tc.type: FUNC
2055 * @tc.require:
2056 * @tc.author: bty
2057 */
2058 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest006, TestSize.Level0)
2059 {
2060 /**
2061 * @tc.steps:step1. Construct local data with NULL asset and the local count is greater than the cloud
2062 * @tc.expected: step1. return ok.
2063 */
2064 int64_t paddingSize = 10;
2065 int localCount = 6;
2066 int cloudCount = 3;
2067 InsertUserTableRecord(db, 0, localCount, paddingSize, true);
2068 std::this_thread::sleep_for(std::chrono::milliseconds(cloudCount));
2069 InsertCloudTableRecord(0, cloudCount, paddingSize, false);
2070
2071 /**
2072 * @tc.steps:step2. sync, upload new data without assets,
2073 * @tc.expected: step2. return ok.
2074 */
2075 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2076 CloseDb();
2077 }
2078
2079 /*
2080 * @tc.name: CloudSyncAssetTest007
2081 * @tc.desc: for expilictly set not-change assets. If an asset is deleted, and its hash is not set to empty, it will be
2082 * regarded as NO-CHANGE, rather than delete
2083 * @tc.type: FUNC
2084 * @tc.require:
2085 * @tc.author: wanyi
2086 */
2087 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest007, TestSize.Level0)
2088 {
2089 /**
2090 * @tc.steps:step1. local asset contain an asset which has a corresponding asset in cloud
2091 * @tc.expected: step1. return ok.
2092 */
2093 int64_t paddingSize = 10;
2094 int localCount = 1;
2095 int cloudCount = 1;
2096 InsertCloudTableRecord(0, cloudCount, paddingSize, false);
2097 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
2098 /**
2099 * @tc.steps:step2. local asset is set to delete, but hash is not set to empty
2100 * @tc.expected: step2. return ok.
2101 */
2102 Assets assets;
2103 for (int64_t j = 0; j < cloudCount; j++) {
2104 Asset asset = g_cloudAsset;
2105 asset.name = g_cloudAsset.name + std::to_string(j);
2106 asset.status = static_cast<uint32_t>(AssetStatus::DELETE);
2107 assets.push_back(asset);
2108 }
2109 UpdateLocalAssets(db, assets, 0);
2110 std::this_thread::sleep_for(std::chrono::milliseconds(cloudCount));
2111 /**
2112 * @tc.steps:step3. Do sync
2113 * @tc.expected: step3. return ok.
2114 */
2115 Query query = Query::Select().FromTable(g_tables);
2116 std::vector<SyncProcess> expectProcess;
__anon239839ae1502(const std::map<std::string, SyncProcess> &process) 2117 CloudSyncStatusCallback callback = [](const std::map<std::string, SyncProcess> &process) {
2118 ASSERT_EQ(process.size(), 1u);
2119 g_syncProcess = std::move(process.begin()->second);
2120
2121 if (g_syncProcess.process == FINISHED) {
2122 g_processCondition.notify_one();
2123 }
2124 };
2125 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime),
2126 DBStatus::OK);
2127 {
2128 std::unique_lock<std::mutex> lock(g_processMutex);
__anon239839ae1602() 2129 g_processCondition.wait(lock, []() {
2130 return g_syncProcess.process == FINISHED;
2131 });
2132 ASSERT_EQ(g_syncProcess.errCode, DBStatus::OK);
2133 }
2134 /**
2135 * @tc.steps:step4. Check result. Cloud db should not contain asset.
2136 * @tc.expected: step4. return ok.
2137 */
2138 CheckAssetForAssetTest006();
2139 CloseDb();
2140 }
2141
2142 /**
2143 * @tc.name: DownloadAssetTest001
2144 * @tc.desc: Test the sync of different Asset status out of parameters when the download is successful
2145 * @tc.type: FUNC
2146 * @tc.require:
2147 * @tc.author: bty
2148 */
2149 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, DownloadAssetTest001, TestSize.Level0)
2150 {
2151 /**
2152 * @tc.steps:step1. Set different status out of parameters, and the code returns OK
2153 * @tc.expected: step1. return ok.
2154 */
2155 DBStatus expectStatus = DBStatus::OK;
2156 int index = 0;
2157 InitMockAssetLoader(expectStatus, index);
2158
2159 /**
2160 * @tc.steps:step2. init download data
2161 * @tc.expected: step2. return ok.
2162 */
2163 int64_t paddingSize = 1;
2164 int localCount = 120;
2165 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
2166 InsertCloudTableRecord(0, localCount / g_arrayHalfSub, paddingSize, false);
2167
2168 /**
2169 * @tc.steps:step3. sync
2170 * @tc.expected: step3. return ok.
2171 */
2172 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2173
2174 /**
2175 * @tc.steps:step4. Expect all states to be normal
2176 * @tc.expected: step4. return ok.
2177 */
2178 CheckAssetAfterDownload(db, localCount);
2179 CloseDb();
2180 }
2181
2182 /*
2183 * @tc.name: CloudSyncAssetTest008
2184 * @tc.desc: sync failed with download asset
2185 * @tc.type: FUNC
2186 * @tc.require:
2187 * @tc.author: zhangqiquan
2188 */
2189 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest008, TestSize.Level0)
2190 {
2191 /**
2192 * @tc.steps:step1. prepare asset data
2193 */
2194 int64_t paddingSize = 10;
2195 int localCount = 1;
2196 int cloudCount = 1;
2197 InsertCloudTableRecord(0, cloudCount, paddingSize, false);
2198 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
2199 /**
2200 * @tc.steps:step2. set download asset status failed
2201 */
2202 g_virtualAssetLoader->SetDownloadStatus(CLOUD_ASSET_SPACE_INSUFFICIENT);
2203 Query query = Query::Select().FromTable(g_tables);
2204 std::vector<SyncProcess> expectProcess;
__anon239839ae1702(const std::map<std::string, SyncProcess> &process) 2205 CloudSyncStatusCallback callback = [](const std::map<std::string, SyncProcess> &process) {
2206 for (const auto &item: process) {
2207 g_syncProcess = item.second;
2208 }
2209 if (g_syncProcess.process == FINISHED) {
2210 g_processCondition.notify_one();
2211 }
2212 };
2213 /**
2214 * @tc.steps:step3. sync and wait sync finished.
2215 * @tc.expected: step3. sync return CLOUD_ASSET_SPACE_INSUFFICIENT.
2216 */
2217 ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime),
2218 DBStatus::OK);
2219 {
2220 std::unique_lock<std::mutex> lock(g_processMutex);
__anon239839ae1802() 2221 g_processCondition.wait(lock, []() {
2222 return g_syncProcess.process == FINISHED;
2223 });
2224 ASSERT_EQ(g_syncProcess.errCode, DBStatus::CLOUD_ASSET_SPACE_INSUFFICIENT);
2225 }
2226 /**
2227 * @tc.steps:step4. clear data.
2228 */
2229 g_virtualAssetLoader->SetDownloadStatus(OK);
2230 CloseDb();
2231 }
2232
2233 /*
2234 * @tc.name: CloudSyncAssetTest009
2235 * @tc.desc:
2236 * @tc.type: FUNC
2237 * @tc.require:
2238 * @tc.author: zhangqiquan
2239 */
2240 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudSyncAssetTest009, TestSize.Level0)
2241 {
2242 // insert 3 data with asset 3 data without asset into local
2243 // sync them to cloud
2244 int64_t paddingSize = 10;
2245 int localCount = 3;
2246 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
2247 InsertUserTableRecord(db, localCount, localCount, paddingSize, true);
2248 callSync(g_tables, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
2249 // update these data and sync again
2250 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
2251 InsertUserTableRecord(db, localCount, localCount, paddingSize, true);
2252 callSync(g_tables, SYNC_MODE_CLOUD_FORCE_PUSH, DBStatus::OK);
2253 EXPECT_EQ(g_syncProcess.errCode, DBStatus::OK);
2254 CloseDb();
2255 }
2256
2257 /**
2258 * @tc.name: DownloadAssetTest002
2259 * @tc.desc: Test the sync of different Asset status out of parameters when the download is failed
2260 * @tc.type: FUNC
2261 * @tc.require:
2262 * @tc.author: bty
2263 */
2264 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, DownloadAssetTest002, TestSize.Level0)
2265 {
2266 /**
2267 * @tc.steps:step1. Set different status out of parameters, and the code returns CLOUD_ERROR
2268 * @tc.expected: step1. return ok.
2269 */
2270 DBStatus expectStatus = DBStatus::CLOUD_ERROR;
2271 int index = 0;
2272 InitMockAssetLoader(expectStatus, index);
2273 int64_t paddingSize = 1;
2274 int localCount = 100;
2275
2276 /**
2277 * @tc.steps:step2. init download data
2278 * @tc.expected: step2. return ok.
2279 */
2280 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
2281 InsertCloudTableRecord(0, localCount, paddingSize, false);
2282
2283 /**
2284 * @tc.steps:step3. sync
2285 * @tc.expected: step3. return ok.
2286 */
2287 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2288
2289 /**
2290 * @tc.steps:step4. Those status that are not normal are all be abnormal after sync.
2291 * @tc.expected: step4. return ok.
2292 */
2293 CheckAssetAfterDownload2(db, localCount);
2294 CloseDb();
2295 }
2296
2297 /**
2298 * @tc.name: DownloadAssetTest003
2299 * @tc.desc: Init different asset name between local and cloud, then sync to test download
2300 * @tc.type: FUNC
2301 * @tc.require:
2302 * @tc.author: bty
2303 */
2304 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, DownloadAssetTest003, TestSize.Level0)
2305 {
2306 /**
2307 * @tc.steps:step1. Init data and sync
2308 * @tc.expected: step1. return ok.
2309 */
2310 int64_t paddingSize = 1;
2311 int localCount = 10;
2312 InsertUserTableRecord(db, 0, localCount, paddingSize, false);
2313 InsertCloudTableRecord(0, localCount, paddingSize, false);
2314 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2315
2316 /**
2317 * @tc.steps:step2. update cloud Asset where gid = 0
2318 * @tc.expected: step2. return ok.
2319 */
2320 UpdateCloudAssetForDownloadAssetTest003();
2321
2322 /**
2323 * @tc.steps:step3. sync again
2324 * @tc.expected: step3. return ok.
2325 */
2326 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2327
2328 /**
2329 * @tc.steps:step4. check asset after download where gid = 0
2330 * @tc.expected: step4. return ok.
2331 */
2332 CheckAssetForDownloadAssetTest003(db);
2333 CloseDb();
2334 }
2335
2336 /**
2337 * @tc.name: DownloadAssetTest004
2338 * @tc.desc: Test total count, fail count and success count when drop table
2339 * @tc.type: FUNC
2340 * @tc.require:
2341 * @tc.author: liufuchenxing
2342 */
2343 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, DownloadAssetTest004, TestSize.Level0)
2344 {
2345 /**
2346 * @tc.steps:step1. Init data and sync
2347 * @tc.expected: step1. return ok.
2348 */
2349 int64_t paddingSize = 1;
2350 int count = 10;
2351 InsertUserTableRecord(db, 0, count, paddingSize, false);
2352 g_syncProcess = {};
__anon239839ae1902(const std::map<std::string, SyncProcess> &process) 2353 CloudSyncStatusCallback callback = [](const std::map<std::string, SyncProcess> &process) {
2354 for (const auto &item : process) {
2355 g_syncProcess = item.second;
2356 }
2357 if (g_syncProcess.process == FINISHED) {
2358 g_processCondition.notify_one();
2359 }
2360 };
2361 Query query = Query::Select().FromTable(g_tables);
2362 EXPECT_EQ(g_delegate->Sync({ DEVICE_CLOUD }, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), DBStatus::OK);
2363 WaitForSyncFinish(g_syncProcess, g_syncWaitTime);
2364
2365 /**
2366 * @tc.steps:step2. drop table work2. sync failed, check total, success and fail count.
2367 * @tc.expected: step2. total = 20, success=0, fail=20
2368 */
2369 g_syncProcess = {};
2370 InsertCloudTableRecord(0, count, paddingSize, false);
2371 EXPECT_EQ(RelationalTestUtils::ExecSql(db, DROP_INTEGER_PRIMARY_KEY_TABLE_SQL), E_OK);
2372 EXPECT_EQ(g_delegate->Sync({ DEVICE_CLOUD }, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime),
2373 DBStatus::NOT_FOUND);
2374
2375 /**
2376 * @tc.steps:step3. close db.
2377 * @tc.expected: step3. close success.
2378 */
2379 CloseDb();
2380 }
2381
2382 /**
2383 * @tc.name: SchemaTest001
2384 * @tc.desc: Create table with Cloud cooperation mode and do sync
2385 * @tc.type: FUNC
2386 * @tc.require:
2387 * @tc.author: wanyi
2388 */
2389 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, SchemaTest001, TestSize.Level0)
2390 {
2391 /**
2392 * @tc.steps:step1. Create table with Cloud cooperation mode
2393 * @tc.expected: step1. return ok.
2394 */
2395 EXPECT_EQ(RelationalTestUtils::ExecSql(db, INTEGER_PRIMARY_KEY_TABLE_SQL_WRONG_SYNC_MODE), SQLITE_OK);
2396 ASSERT_EQ(g_delegate->CreateDistributedTable(g_tableName4, CLOUD_COOPERATION), DBStatus::OK);
2397 /**
2398 * @tc.steps:step1. do sync
2399 * @tc.expected: step1. return ok.
2400 */
2401 callSync({g_tableName4}, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2402 CloseDb();
2403 }
2404
2405 /**
2406 * @tc.name: SchemaTest002
2407 * @tc.desc: Create table with DEVICE_COOPERATION mode and do sync
2408 * @tc.type: FUNC
2409 * @tc.require:
2410 * @tc.author: wanyi
2411 */
2412 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, SchemaTest002, TestSize.Level0)
2413 {
2414 /**
2415 * @tc.steps:step1. Create table with DEVICE_COOPERATION mode
2416 * @tc.expected: step1. return ok.
2417 */
2418 EXPECT_EQ(RelationalTestUtils::ExecSql(db, INTEGER_PRIMARY_KEY_TABLE_SQL_WRONG_SYNC_MODE), SQLITE_OK);
2419 ASSERT_EQ(g_delegate->CreateDistributedTable(g_tableName4, DEVICE_COOPERATION), DBStatus::OK);
2420 /**
2421 * @tc.steps:step1. do sync
2422 * @tc.expected: step1. return NOT_SUPPORT.
2423 */
2424 callSync({g_tableName4}, SYNC_MODE_CLOUD_MERGE, DBStatus::NOT_SUPPORT);
2425 CloseDb();
2426 }
2427
2428 /**
2429 * @tc.name: CloudCursorTest001
2430 * @tc.desc: Init different asset name between local and cloud, then sync to test download
2431 * @tc.type: FUNC
2432 * @tc.require:
2433 * @tc.author: bty
2434 */
2435 HWTEST_F(DistributedDBCloudInterfacesRelationalSyncTest, CloudCursorTest001, TestSize.Level0)
2436 {
2437 /**
2438 * @tc.steps:step1. Init data and sync
2439 * @tc.expected: step1. return ok.
2440 */
2441 int64_t paddingSize = 1;
2442 int localCount = 10;
2443 InsertUserTableRecord(db, 0, localCount, paddingSize, true);
2444 InsertCloudTableRecord(0, localCount, paddingSize, true);
2445 callSync(g_tables, SYNC_MODE_CLOUD_MERGE, DBStatus::OK);
2446
2447 /**
2448 * @tc.steps:step2. the cursor does not increase during upload, the cursor will increase during download
2449 * although it is unTrackerTable
2450 * @tc.expected: step2. return ok.
2451 */
2452 string sql = "select cursor from " + DBConstant::RELATIONAL_PREFIX + g_tableName1 + "_log";
2453 sqlite3_stmt *stmt = nullptr;
2454 EXPECT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
2455 int64_t index = 0;
2456 while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
2457 EXPECT_EQ(static_cast<int64_t>(sqlite3_column_int64(stmt, 0)), ++index);
2458 }
2459 int errCode;
2460 SQLiteUtils::ResetStatement(stmt, true, errCode);
2461 CloseDb();
2462 }
2463 }
2464 #endif // RELATIONAL_STORE
2465