1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "cloud_db_sync_utils_test.h"
16 
17 using namespace DistributedDB;
18 using namespace DistributedDBUnitTest;
19 using namespace std;
20 
21 namespace DistributedDB {
22     string g_storeID = "Relational_Store_SYNC";
23     const string TABLE_NAME = "worker";
24     const string DEVICE_CLOUD = "cloud_dev";
25     const int64_t SYNC_WAIT_TIME = 60;
26     int g_syncIndex = 0;
27     string g_storePath = "";
28     std::mutex g_processMutex;
29     std::condition_variable g_processCondition;
30     DistributedDB::RelationalStoreManager g_mgr(APP_ID, USER_ID);
31     SyncProcess g_syncProcess;
32     using CloudSyncStatusCallback = std::function<void(const std::map<std::string, SyncProcess> &onProcess)>;
33     const std::vector<std::string> g_tables = {TABLE_NAME};
34     const Asset g_cloudAsset1 = {
35         .version = 2, .name = "Phone", .assetId = "0", .subpath = "/local/sync", .uri = "/cloud/sync",
36         .modifyTime = "123456", .createTime = "0", .size = "1024", .hash = "DEC"
37     };
38     const Asset g_cloudAsset2 = {
39         .version = 2, .name = "Phone", .assetId = "0", .subpath = "/local/sync", .uri = "/cloud/sync",
40         .modifyTime = "123456", .createTime = "0", .size = "1024", .hash = "UPDATE"
41     };
42 
SetStorePath(const std::string & path)43     void CloudDBSyncUtilsTest::SetStorePath(const std::string &path)
44     {
45         g_storePath = path;
46     }
47 
InitSyncUtils(const std::vector<Field> & cloudField,RelationalStoreObserverUnitTest * & observer,std::shared_ptr<VirtualCloudDb> & virtualCloudDb,std::shared_ptr<VirtualAssetLoader> & virtualAssetLoader,RelationalStoreDelegate * & delegate)48     void CloudDBSyncUtilsTest::InitSyncUtils(const std::vector<Field> &cloudField,
49         RelationalStoreObserverUnitTest *&observer, std::shared_ptr<VirtualCloudDb> &virtualCloudDb,
50         std::shared_ptr<VirtualAssetLoader> &virtualAssetLoader, RelationalStoreDelegate *&delegate)
51     {
52         observer = new (std::nothrow) RelationalStoreObserverUnitTest();
53         ASSERT_NE(observer, nullptr);
54         ASSERT_EQ(g_mgr.OpenStore(g_storePath, g_storeID, RelationalStoreDelegate::Option { .observer = observer },
55             delegate), DBStatus::OK);
56         ASSERT_NE(delegate, nullptr);
57         ASSERT_EQ(delegate->CreateDistributedTable(TABLE_NAME, CLOUD_COOPERATION), DBStatus::OK);
58         virtualCloudDb = make_shared<VirtualCloudDb>();
59         virtualAssetLoader = make_shared<VirtualAssetLoader>();
60         g_syncProcess = {};
61         ASSERT_EQ(delegate->SetCloudDB(virtualCloudDb), DBStatus::OK);
62         ASSERT_EQ(delegate->SetIAssetLoader(virtualAssetLoader), DBStatus::OK);
63         // sync before setting cloud db schema,it should return SCHEMA_MISMATCH
64         Query query = Query::Select().FromTable(g_tables);
65         CloudSyncStatusCallback callback;
66         ASSERT_EQ(delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, SYNC_WAIT_TIME),
67             DBStatus::SCHEMA_MISMATCH);
68         DataBaseSchema dataBaseSchema;
69         GetCloudDbSchema(TABLE_NAME, cloudField, dataBaseSchema);
70         ASSERT_EQ(delegate->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
71     }
72 
CreateUserDBAndTable(sqlite3 * & db,std::string sql)73     void CloudDBSyncUtilsTest::CreateUserDBAndTable(sqlite3 *&db, std::string sql)
74     {
75         EXPECT_EQ(RelationalTestUtils::ExecSql(db, "PRAGMA journal_mode=WAL;"), SQLITE_OK);
76         EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), SQLITE_OK);
77     }
78 
InsertCloudTableRecord(int64_t begin,int64_t count,int64_t photoSize,bool assetIsNull,std::shared_ptr<VirtualCloudDb> & virtualCloudDb)79     void CloudDBSyncUtilsTest::InsertCloudTableRecord(int64_t begin, int64_t count, int64_t photoSize, bool assetIsNull,
80         std::shared_ptr<VirtualCloudDb> &virtualCloudDb)
81     {
82         std::vector<uint8_t> photo(photoSize, 'v');
83         std::vector<VBucket> record;
84         std::vector<VBucket> extend;
85         Timestamp now = TimeHelper::GetSysCurrentTime();
86         for (int64_t i = begin; i < begin + count; ++i) {
87             VBucket data;
88             data.insert_or_assign("name", "Cloud" + std::to_string(i));
89             data.insert_or_assign("height", 166.0); // 166.0 is random double value
90             data.insert_or_assign("married", false);
91             data.insert_or_assign("photo", photo);
92             data.insert_or_assign("age", 13L); // 13 is random int64_t value
93             Asset asset = g_cloudAsset1;
94             asset.name = asset.name + std::to_string(i);
95             assetIsNull ? data.insert_or_assign("asset", Nil()) : data.insert_or_assign("asset", asset);
96             record.push_back(data);
97             VBucket log;
98             log.insert_or_assign(CloudDbConstant::CREATE_FIELD,
99                 static_cast<int64_t>(now / CloudDbConstant::TEN_THOUSAND + i));
100             log.insert_or_assign(CloudDbConstant::MODIFY_FIELD,
101                 static_cast<int64_t>(now / CloudDbConstant::TEN_THOUSAND + i));
102             log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
103             extend.push_back(log);
104         }
105         ASSERT_EQ(virtualCloudDb->BatchInsert(TABLE_NAME, std::move(record), extend), DBStatus::OK);
106         LOGD("insert cloud record worker[primary key]:[cloud%" PRId64 " - cloud%" PRId64")", begin, count);
107         std::this_thread::sleep_for(std::chrono::milliseconds(count));
108     }
109 
UpdateCloudTableRecord(int64_t begin,int64_t count,int64_t photoSize,bool assetIsNull,std::shared_ptr<VirtualCloudDb> & virtualCloudDb)110     void CloudDBSyncUtilsTest::UpdateCloudTableRecord(int64_t begin, int64_t count, int64_t photoSize, bool assetIsNull,
111         std::shared_ptr<VirtualCloudDb> &virtualCloudDb)
112     {
113         std::vector<uint8_t> photo(photoSize, 'v');
114         std::vector<VBucket> record;
115         std::vector<VBucket> extend;
116         Timestamp now = TimeHelper::GetSysCurrentTime();
117         for (int64_t i = begin; i < begin + count; ++i) {
118             VBucket data;
119             data.insert_or_assign("name", "Cloud" + std::to_string(i));
120             data.insert_or_assign("height", 188.0); // 188.0 is random double value
121             data.insert_or_assign("married", false);
122             data.insert_or_assign("photo", photo);
123             data.insert_or_assign("age", 13L); // 13 is random int64_t value
124             Asset asset = g_cloudAsset2;
125             asset.name = asset.name + std::to_string(i);
126             assetIsNull ? data.insert_or_assign("asset", Nil()) : data.insert_or_assign("asset", asset);
127             record.push_back(data);
128             VBucket log;
129             log.insert_or_assign(CloudDbConstant::CREATE_FIELD,
130                 static_cast<int64_t>(now / CloudDbConstant::TEN_THOUSAND + i));
131             log.insert_or_assign(CloudDbConstant::MODIFY_FIELD,
132                 static_cast<int64_t>(now / CloudDbConstant::TEN_THOUSAND + i));
133             log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
134             log.insert_or_assign(CloudDbConstant::GID_FIELD, to_string(i));
135             extend.push_back(log);
136         }
137         ASSERT_EQ(virtualCloudDb->BatchUpdate(TABLE_NAME, std::move(record), extend), DBStatus::OK);
138         LOGD("update cloud record worker[primary key]:[cloud%" PRId64 " - cloud%" PRId64")", begin, count);
139         std::this_thread::sleep_for(std::chrono::milliseconds(count));
140     }
141 
DeleteCloudTableRecordByGid(int64_t begin,int64_t count,std::shared_ptr<VirtualCloudDb> & virtualCloudDb)142     void CloudDBSyncUtilsTest::DeleteCloudTableRecordByGid(int64_t begin, int64_t count,
143         std::shared_ptr<VirtualCloudDb> &virtualCloudDb)
144     {
145         for (int64_t i = begin; i < begin + count; ++i) {
146             VBucket data;
147             data.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(i));
148             ASSERT_EQ(virtualCloudDb->DeleteByGid(TABLE_NAME, data), DBStatus::OK);
149         }
150         LOGD("delete cloud record worker[primary key]:[cloud%" PRId64 " - cloud%" PRId64")", begin, count);
151         std::this_thread::sleep_for(std::chrono::milliseconds(count));
152     }
153 
DeleteUserTableRecord(sqlite3 * & db,int64_t begin,int64_t count)154     void CloudDBSyncUtilsTest::DeleteUserTableRecord(sqlite3 *&db, int64_t begin, int64_t count)
155     {
156         for (size_t i = 0; i < g_tables.size(); i++) {
157             for (int64_t j = begin; j < begin + count; j++) {
158                 string sql = "Delete from " + g_tables[i] + " where name = 'Cloud" + std::to_string(j) + "';";
159                 ASSERT_EQ(RelationalTestUtils::ExecSql(db, sql), SQLITE_OK);
160             }
161         }
162     }
163 
GetCallback(SyncProcess & syncProcess,CloudSyncStatusCallback & callback,std::vector<SyncProcess> & expectProcess)164     void CloudDBSyncUtilsTest::GetCallback(SyncProcess &syncProcess, CloudSyncStatusCallback &callback,
165         std::vector<SyncProcess> &expectProcess)
166     {
167         g_syncIndex = 0;
168         callback = [&syncProcess, &expectProcess](const std::map<std::string, SyncProcess> &process) {
169             LOGI("devices size = %d", process.size());
170             ASSERT_EQ(process.size(), 1u);
171             syncProcess = std::move(process.begin()->second);
172             ASSERT_EQ(process.begin()->first, DEVICE_CLOUD);
173             ASSERT_NE(syncProcess.tableProcess.empty(), true);
174             LOGI("current sync process status:%d, db status:%d ", syncProcess.process, syncProcess.errCode);
175             std::for_each(g_tables.begin(), g_tables.end(), [&](const auto &item) {
176                 auto table1 = syncProcess.tableProcess.find(item);
177                 if (table1 != syncProcess.tableProcess.end()) {
178                     LOGI("table[%s], table process status:%d, [downloadInfo](batchIndex:%u, total:%u, successCount:%u, "
179                          "failCount:%u) [uploadInfo](batchIndex:%u, total:%u, successCount:%u,failCount:%u",
180                          item.c_str(), table1->second.process, table1->second.downLoadInfo.batchIndex,
181                          table1->second.downLoadInfo.total, table1->second.downLoadInfo.successCount,
182                          table1->second.downLoadInfo.failCount, table1->second.upLoadInfo.batchIndex,
183                          table1->second.upLoadInfo.total, table1->second.upLoadInfo.successCount,
184                          table1->second.upLoadInfo.failCount);
185                 }
186             });
187             if (expectProcess.empty()) {
188                 if (syncProcess.process == FINISHED) {
189                     g_processCondition.notify_one();
190                 }
191                 return;
192             }
193             ASSERT_LE(static_cast<size_t>(g_syncIndex), expectProcess.size());
194             for (size_t i = 0; i < g_tables.size() && static_cast<size_t>(g_syncIndex) < expectProcess.size(); ++i) {
195                 SyncProcess head = expectProcess[g_syncIndex];
196                 for (auto &expect : head.tableProcess) {
197                     auto real = syncProcess.tableProcess.find(expect.first);
198                     ASSERT_NE(real, syncProcess.tableProcess.end());
199                     EXPECT_EQ(expect.second.process, real->second.process);
200                     EXPECT_EQ(expect.second.downLoadInfo.batchIndex, real->second.downLoadInfo.batchIndex);
201                     EXPECT_EQ(expect.second.downLoadInfo.total, real->second.downLoadInfo.total);
202                     EXPECT_EQ(expect.second.downLoadInfo.successCount, real->second.downLoadInfo.successCount);
203                     EXPECT_EQ(expect.second.downLoadInfo.failCount, real->second.downLoadInfo.failCount);
204                     EXPECT_EQ(expect.second.upLoadInfo.batchIndex, real->second.upLoadInfo.batchIndex);
205                     EXPECT_EQ(expect.second.upLoadInfo.total, real->second.upLoadInfo.total);
206                     EXPECT_EQ(expect.second.upLoadInfo.successCount, real->second.upLoadInfo.successCount);
207                     EXPECT_EQ(expect.second.upLoadInfo.failCount, real->second.upLoadInfo.failCount);
208                 }
209             }
210             g_syncIndex++;
211             if (syncProcess.process == FINISHED) {
212                 g_processCondition.notify_one();
213             }
214         };
215     }
216 
CheckCloudTotalCount(std::vector<int64_t> expectCounts,const std::shared_ptr<VirtualCloudDb> & virtualCloudDb)217     void CloudDBSyncUtilsTest::CheckCloudTotalCount(std::vector<int64_t> expectCounts,
218         const std::shared_ptr<VirtualCloudDb> &virtualCloudDb)
219     {
220         VBucket extend;
221         extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(0);
222         for (size_t i = 0; i < g_tables.size(); ++i) {
223             int64_t realCount = 0;
224             std::vector<VBucket> data;
225             virtualCloudDb->Query(g_tables[i], extend, data);
226             for (size_t j = 0; j < data.size(); ++j) {
227                 auto entry = data[j].find(CloudDbConstant::DELETE_FIELD);
228                 if (entry != data[j].end() && std::get<bool>(entry->second)) {
229                     continue;
230                 }
231                 realCount++;
232             }
233             EXPECT_EQ(realCount, expectCounts[i]); // ExpectCount represents the total amount of cloud data.
234         }
235     }
236 
WaitForSyncFinish(SyncProcess & syncProcess,const int64_t & waitTime)237     void CloudDBSyncUtilsTest::WaitForSyncFinish(SyncProcess &syncProcess, const int64_t &waitTime)
238     {
239         std::unique_lock<std::mutex> lock(g_processMutex);
240         bool result = g_processCondition.wait_for(lock, std::chrono::seconds(waitTime), [&syncProcess]() {
241             return syncProcess.process == FINISHED;
242         });
243         ASSERT_EQ(result, true);
244         LOGD("-------------------sync end--------------");
245     }
246 
callSync(const std::vector<std::string> & tableNames,SyncMode mode,DBStatus dbStatus,RelationalStoreDelegate * & delegate)247     void CloudDBSyncUtilsTest::callSync(const std::vector<std::string> &tableNames, SyncMode mode, DBStatus dbStatus,
248         RelationalStoreDelegate *&delegate)
249     {
250         g_syncProcess = {};
251         Query query = Query::Select().FromTable(tableNames);
252         std::vector<SyncProcess> expectProcess;
253         CloudSyncStatusCallback callback;
254         GetCallback(g_syncProcess, callback, expectProcess);
255         ASSERT_EQ(delegate->Sync({DEVICE_CLOUD}, mode, query, callback, SYNC_WAIT_TIME), dbStatus);
256         if (dbStatus == DBStatus::OK) {
257             WaitForSyncFinish(g_syncProcess, SYNC_WAIT_TIME);
258         }
259     }
260 
CloseDb(RelationalStoreObserverUnitTest * & observer,std::shared_ptr<VirtualCloudDb> & virtualCloudDb,RelationalStoreDelegate * & delegate)261     void CloudDBSyncUtilsTest::CloseDb(RelationalStoreObserverUnitTest *&observer,
262         std::shared_ptr<VirtualCloudDb> &virtualCloudDb, RelationalStoreDelegate *&delegate)
263     {
264         delete observer;
265         virtualCloudDb = nullptr;
266         if (delegate != nullptr) {
267             EXPECT_EQ(g_mgr.CloseStore(delegate), DBStatus::OK);
268             delegate = nullptr;
269         }
270     }
271 
QueryCountCallback(void * data,int count,char ** colValue,char ** colName)272     int CloudDBSyncUtilsTest::QueryCountCallback(void *data, int count, char **colValue, char **colName)
273     {
274         if (count != 1) {
275             return 0;
276         }
277         auto expectCount = reinterpret_cast<int64_t>(data);
278         EXPECT_EQ(strtol(colValue[0], nullptr, 10), expectCount); // 10: decimal
279         return 0;
280     }
281 
CheckDownloadResult(sqlite3 * & db,std::vector<int64_t> expectCounts,const std::string & keyStr)282     void CloudDBSyncUtilsTest::CheckDownloadResult(sqlite3 *&db, std::vector<int64_t> expectCounts,
283         const std::string &keyStr)
284     {
285         for (size_t i = 0; i < g_tables.size(); ++i) {
286             string queryDownload = "select count(*) from " + g_tables[i] + " where name " +
287                 " like '" + keyStr + "%'";
288             EXPECT_EQ(sqlite3_exec(db, queryDownload.c_str(), QueryCountCallback,
289                 reinterpret_cast<void *>(expectCounts[i]), nullptr), SQLITE_OK);
290         }
291     }
292 
CheckLocalRecordNum(sqlite3 * & db,const std::string & tableName,int count)293     void CloudDBSyncUtilsTest::CheckLocalRecordNum(sqlite3 *&db, const std::string &tableName, int count)
294     {
295         std::string sql = "select count(*) from " + tableName + ";";
296         EXPECT_EQ(sqlite3_exec(db, sql.c_str(), QueryCountCallback,
297             reinterpret_cast<void *>(count), nullptr), SQLITE_OK);
298     }
299 
GetCloudDbSchema(const std::string & tableName,const std::vector<Field> & cloudField,DataBaseSchema & dataBaseSchema)300     void CloudDBSyncUtilsTest::GetCloudDbSchema(const std::string &tableName, const std::vector<Field> &cloudField,
301         DataBaseSchema &dataBaseSchema)
302     {
303         TableSchema tableSchema = {
304             .name = tableName,
305             .sharedTableName = tableName + "_shared",
306             .fields = cloudField
307         };
308         dataBaseSchema.tables.push_back(tableSchema);
309     }
310 
InitStoreProp(const std::string & storePath,const std::string & appId,const std::string & userId,const std::string & storeId,RelationalDBProperties & properties)311     void CloudDBSyncUtilsTest::InitStoreProp(const std::string &storePath, const std::string &appId,
312         const std::string &userId, const std::string &storeId, RelationalDBProperties &properties)
313     {
314         properties.SetStringProp(RelationalDBProperties::DATA_DIR, storePath);
315         properties.SetStringProp(RelationalDBProperties::APP_ID, appId);
316         properties.SetStringProp(RelationalDBProperties::USER_ID, userId);
317         properties.SetStringProp(RelationalDBProperties::STORE_ID, storeId);
318         std::string identifier = userId + "-" + appId + "-" + storeId;
319         std::string hashIdentifier = DBCommon::TransferHashString(identifier);
320         properties.SetStringProp(RelationalDBProperties::IDENTIFIER_DATA, hashIdentifier);
321     }
322 
CheckCount(sqlite3 * db,const std::string & sql,int64_t count)323     void CloudDBSyncUtilsTest::CheckCount(sqlite3 *db, const std::string &sql, int64_t count)
324     {
325         EXPECT_EQ(sqlite3_exec(db, sql.c_str(), QueryCountCallback, reinterpret_cast<void *>(count), nullptr),
326             SQLITE_OK);
327     }
328 
GetHashKey(const std::string & tableName,const std::string & condition,sqlite3 * db,std::vector<std::vector<uint8_t>> & hashKey)329     void CloudDBSyncUtilsTest::GetHashKey(const std::string &tableName, const std::string &condition, sqlite3 *db,
330         std::vector<std::vector<uint8_t>> &hashKey)
331     {
332         sqlite3_stmt *stmt = nullptr;
333         std::string sql = "select hash_key from " + DBCommon::GetLogTableName(tableName) + " where " + condition;
334         EXPECT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
335         while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
336             std::vector<uint8_t> blob;
337             EXPECT_EQ(SQLiteUtils::GetColumnBlobValue(stmt, 0, blob), E_OK);
338             hashKey.push_back(blob);
339         }
340         int errCode;
341         SQLiteUtils::ResetStatement(stmt, true, errCode);
342     }
343 }
344