1 /* 2 * Copyright (c) 2023 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 #include "cloud_db_sync_utils_test.h" 16 17 using namespace DistributedDB; 18 using namespace DistributedDBUnitTest; 19 using namespace std; 20 21 namespace DistributedDB { 22 string g_storeID = "Relational_Store_SYNC"; 23 const string TABLE_NAME = "worker"; 24 const string DEVICE_CLOUD = "cloud_dev"; 25 const int64_t SYNC_WAIT_TIME = 60; 26 int g_syncIndex = 0; 27 string g_storePath = ""; 28 std::mutex g_processMutex; 29 std::condition_variable g_processCondition; 30 DistributedDB::RelationalStoreManager g_mgr(APP_ID, USER_ID); 31 SyncProcess g_syncProcess; 32 using CloudSyncStatusCallback = std::function<void(const std::map<std::string, SyncProcess> &onProcess)>; 33 const std::vector<std::string> g_tables = {TABLE_NAME}; 34 const Asset g_cloudAsset1 = { 35 .version = 2, .name = "Phone", .assetId = "0", .subpath = "/local/sync", .uri = "/cloud/sync", 36 .modifyTime = "123456", .createTime = "0", .size = "1024", .hash = "DEC" 37 }; 38 const Asset g_cloudAsset2 = { 39 .version = 2, .name = "Phone", .assetId = "0", .subpath = "/local/sync", .uri = "/cloud/sync", 40 .modifyTime = "123456", .createTime = "0", .size = "1024", .hash = "UPDATE" 41 }; 42 SetStorePath(const std::string & path)43 void CloudDBSyncUtilsTest::SetStorePath(const std::string &path) 44 { 45 g_storePath = path; 46 } 47 InitSyncUtils(const std::vector<Field> & cloudField,RelationalStoreObserverUnitTest * & observer,std::shared_ptr<VirtualCloudDb> & virtualCloudDb,std::shared_ptr<VirtualAssetLoader> & virtualAssetLoader,RelationalStoreDelegate * & delegate)48 void CloudDBSyncUtilsTest::InitSyncUtils(const std::vector<Field> &cloudField, 49 RelationalStoreObserverUnitTest *&observer, std::shared_ptr<VirtualCloudDb> &virtualCloudDb, 50 std::shared_ptr<VirtualAssetLoader> &virtualAssetLoader, RelationalStoreDelegate *&delegate) 51 { 52 observer = new (std::nothrow) RelationalStoreObserverUnitTest(); 53 ASSERT_NE(observer, nullptr); 54 ASSERT_EQ(g_mgr.OpenStore(g_storePath, g_storeID, RelationalStoreDelegate::Option { .observer = observer }, 55 delegate), DBStatus::OK); 56 ASSERT_NE(delegate, nullptr); 57 ASSERT_EQ(delegate->CreateDistributedTable(TABLE_NAME, CLOUD_COOPERATION), DBStatus::OK); 58 virtualCloudDb = make_shared<VirtualCloudDb>(); 59 virtualAssetLoader = make_shared<VirtualAssetLoader>(); 60 g_syncProcess = {}; 61 ASSERT_EQ(delegate->SetCloudDB(virtualCloudDb), DBStatus::OK); 62 ASSERT_EQ(delegate->SetIAssetLoader(virtualAssetLoader), DBStatus::OK); 63 // sync before setting cloud db schema,it should return SCHEMA_MISMATCH 64 Query query = Query::Select().FromTable(g_tables); 65 CloudSyncStatusCallback callback; 66 ASSERT_EQ(delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, SYNC_WAIT_TIME), 67 DBStatus::SCHEMA_MISMATCH); 68 DataBaseSchema dataBaseSchema; 69 GetCloudDbSchema(TABLE_NAME, cloudField, dataBaseSchema); 70 ASSERT_EQ(delegate->SetCloudDbSchema(dataBaseSchema), DBStatus::OK); 71 } 72 CreateUserDBAndTable(sqlite3 * & db,std::string sql)73 void CloudDBSyncUtilsTest::CreateUserDBAndTable(sqlite3 *&db, std::string sql) 74 { 75 EXPECT_EQ(RelationalTestUtils::ExecSql(db, "PRAGMA journal_mode=WAL;"), SQLITE_OK); 76 EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql), SQLITE_OK); 77 } 78 InsertCloudTableRecord(int64_t begin,int64_t count,int64_t photoSize,bool assetIsNull,std::shared_ptr<VirtualCloudDb> & virtualCloudDb)79 void CloudDBSyncUtilsTest::InsertCloudTableRecord(int64_t begin, int64_t count, int64_t photoSize, bool assetIsNull, 80 std::shared_ptr<VirtualCloudDb> &virtualCloudDb) 81 { 82 std::vector<uint8_t> photo(photoSize, 'v'); 83 std::vector<VBucket> record; 84 std::vector<VBucket> extend; 85 Timestamp now = TimeHelper::GetSysCurrentTime(); 86 for (int64_t i = begin; i < begin + count; ++i) { 87 VBucket data; 88 data.insert_or_assign("name", "Cloud" + std::to_string(i)); 89 data.insert_or_assign("height", 166.0); // 166.0 is random double value 90 data.insert_or_assign("married", false); 91 data.insert_or_assign("photo", photo); 92 data.insert_or_assign("age", 13L); // 13 is random int64_t value 93 Asset asset = g_cloudAsset1; 94 asset.name = asset.name + std::to_string(i); 95 assetIsNull ? data.insert_or_assign("asset", Nil()) : data.insert_or_assign("asset", asset); 96 record.push_back(data); 97 VBucket log; 98 log.insert_or_assign(CloudDbConstant::CREATE_FIELD, 99 static_cast<int64_t>(now / CloudDbConstant::TEN_THOUSAND + i)); 100 log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, 101 static_cast<int64_t>(now / CloudDbConstant::TEN_THOUSAND + i)); 102 log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false); 103 extend.push_back(log); 104 } 105 ASSERT_EQ(virtualCloudDb->BatchInsert(TABLE_NAME, std::move(record), extend), DBStatus::OK); 106 LOGD("insert cloud record worker[primary key]:[cloud%" PRId64 " - cloud%" PRId64")", begin, count); 107 std::this_thread::sleep_for(std::chrono::milliseconds(count)); 108 } 109 UpdateCloudTableRecord(int64_t begin,int64_t count,int64_t photoSize,bool assetIsNull,std::shared_ptr<VirtualCloudDb> & virtualCloudDb)110 void CloudDBSyncUtilsTest::UpdateCloudTableRecord(int64_t begin, int64_t count, int64_t photoSize, bool assetIsNull, 111 std::shared_ptr<VirtualCloudDb> &virtualCloudDb) 112 { 113 std::vector<uint8_t> photo(photoSize, 'v'); 114 std::vector<VBucket> record; 115 std::vector<VBucket> extend; 116 Timestamp now = TimeHelper::GetSysCurrentTime(); 117 for (int64_t i = begin; i < begin + count; ++i) { 118 VBucket data; 119 data.insert_or_assign("name", "Cloud" + std::to_string(i)); 120 data.insert_or_assign("height", 188.0); // 188.0 is random double value 121 data.insert_or_assign("married", false); 122 data.insert_or_assign("photo", photo); 123 data.insert_or_assign("age", 13L); // 13 is random int64_t value 124 Asset asset = g_cloudAsset2; 125 asset.name = asset.name + std::to_string(i); 126 assetIsNull ? data.insert_or_assign("asset", Nil()) : data.insert_or_assign("asset", asset); 127 record.push_back(data); 128 VBucket log; 129 log.insert_or_assign(CloudDbConstant::CREATE_FIELD, 130 static_cast<int64_t>(now / CloudDbConstant::TEN_THOUSAND + i)); 131 log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, 132 static_cast<int64_t>(now / CloudDbConstant::TEN_THOUSAND + i)); 133 log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false); 134 log.insert_or_assign(CloudDbConstant::GID_FIELD, to_string(i)); 135 extend.push_back(log); 136 } 137 ASSERT_EQ(virtualCloudDb->BatchUpdate(TABLE_NAME, std::move(record), extend), DBStatus::OK); 138 LOGD("update cloud record worker[primary key]:[cloud%" PRId64 " - cloud%" PRId64")", begin, count); 139 std::this_thread::sleep_for(std::chrono::milliseconds(count)); 140 } 141 DeleteCloudTableRecordByGid(int64_t begin,int64_t count,std::shared_ptr<VirtualCloudDb> & virtualCloudDb)142 void CloudDBSyncUtilsTest::DeleteCloudTableRecordByGid(int64_t begin, int64_t count, 143 std::shared_ptr<VirtualCloudDb> &virtualCloudDb) 144 { 145 for (int64_t i = begin; i < begin + count; ++i) { 146 VBucket data; 147 data.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(i)); 148 ASSERT_EQ(virtualCloudDb->DeleteByGid(TABLE_NAME, data), DBStatus::OK); 149 } 150 LOGD("delete cloud record worker[primary key]:[cloud%" PRId64 " - cloud%" PRId64")", begin, count); 151 std::this_thread::sleep_for(std::chrono::milliseconds(count)); 152 } 153 DeleteUserTableRecord(sqlite3 * & db,int64_t begin,int64_t count)154 void CloudDBSyncUtilsTest::DeleteUserTableRecord(sqlite3 *&db, int64_t begin, int64_t count) 155 { 156 for (size_t i = 0; i < g_tables.size(); i++) { 157 for (int64_t j = begin; j < begin + count; j++) { 158 string sql = "Delete from " + g_tables[i] + " where name = 'Cloud" + std::to_string(j) + "';"; 159 ASSERT_EQ(RelationalTestUtils::ExecSql(db, sql), SQLITE_OK); 160 } 161 } 162 } 163 GetCallback(SyncProcess & syncProcess,CloudSyncStatusCallback & callback,std::vector<SyncProcess> & expectProcess)164 void CloudDBSyncUtilsTest::GetCallback(SyncProcess &syncProcess, CloudSyncStatusCallback &callback, 165 std::vector<SyncProcess> &expectProcess) 166 { 167 g_syncIndex = 0; 168 callback = [&syncProcess, &expectProcess](const std::map<std::string, SyncProcess> &process) { 169 LOGI("devices size = %d", process.size()); 170 ASSERT_EQ(process.size(), 1u); 171 syncProcess = std::move(process.begin()->second); 172 ASSERT_EQ(process.begin()->first, DEVICE_CLOUD); 173 ASSERT_NE(syncProcess.tableProcess.empty(), true); 174 LOGI("current sync process status:%d, db status:%d ", syncProcess.process, syncProcess.errCode); 175 std::for_each(g_tables.begin(), g_tables.end(), [&](const auto &item) { 176 auto table1 = syncProcess.tableProcess.find(item); 177 if (table1 != syncProcess.tableProcess.end()) { 178 LOGI("table[%s], table process status:%d, [downloadInfo](batchIndex:%u, total:%u, successCount:%u, " 179 "failCount:%u) [uploadInfo](batchIndex:%u, total:%u, successCount:%u,failCount:%u", 180 item.c_str(), table1->second.process, table1->second.downLoadInfo.batchIndex, 181 table1->second.downLoadInfo.total, table1->second.downLoadInfo.successCount, 182 table1->second.downLoadInfo.failCount, table1->second.upLoadInfo.batchIndex, 183 table1->second.upLoadInfo.total, table1->second.upLoadInfo.successCount, 184 table1->second.upLoadInfo.failCount); 185 } 186 }); 187 if (expectProcess.empty()) { 188 if (syncProcess.process == FINISHED) { 189 g_processCondition.notify_one(); 190 } 191 return; 192 } 193 ASSERT_LE(static_cast<size_t>(g_syncIndex), expectProcess.size()); 194 for (size_t i = 0; i < g_tables.size() && static_cast<size_t>(g_syncIndex) < expectProcess.size(); ++i) { 195 SyncProcess head = expectProcess[g_syncIndex]; 196 for (auto &expect : head.tableProcess) { 197 auto real = syncProcess.tableProcess.find(expect.first); 198 ASSERT_NE(real, syncProcess.tableProcess.end()); 199 EXPECT_EQ(expect.second.process, real->second.process); 200 EXPECT_EQ(expect.second.downLoadInfo.batchIndex, real->second.downLoadInfo.batchIndex); 201 EXPECT_EQ(expect.second.downLoadInfo.total, real->second.downLoadInfo.total); 202 EXPECT_EQ(expect.second.downLoadInfo.successCount, real->second.downLoadInfo.successCount); 203 EXPECT_EQ(expect.second.downLoadInfo.failCount, real->second.downLoadInfo.failCount); 204 EXPECT_EQ(expect.second.upLoadInfo.batchIndex, real->second.upLoadInfo.batchIndex); 205 EXPECT_EQ(expect.second.upLoadInfo.total, real->second.upLoadInfo.total); 206 EXPECT_EQ(expect.second.upLoadInfo.successCount, real->second.upLoadInfo.successCount); 207 EXPECT_EQ(expect.second.upLoadInfo.failCount, real->second.upLoadInfo.failCount); 208 } 209 } 210 g_syncIndex++; 211 if (syncProcess.process == FINISHED) { 212 g_processCondition.notify_one(); 213 } 214 }; 215 } 216 CheckCloudTotalCount(std::vector<int64_t> expectCounts,const std::shared_ptr<VirtualCloudDb> & virtualCloudDb)217 void CloudDBSyncUtilsTest::CheckCloudTotalCount(std::vector<int64_t> expectCounts, 218 const std::shared_ptr<VirtualCloudDb> &virtualCloudDb) 219 { 220 VBucket extend; 221 extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(0); 222 for (size_t i = 0; i < g_tables.size(); ++i) { 223 int64_t realCount = 0; 224 std::vector<VBucket> data; 225 virtualCloudDb->Query(g_tables[i], extend, data); 226 for (size_t j = 0; j < data.size(); ++j) { 227 auto entry = data[j].find(CloudDbConstant::DELETE_FIELD); 228 if (entry != data[j].end() && std::get<bool>(entry->second)) { 229 continue; 230 } 231 realCount++; 232 } 233 EXPECT_EQ(realCount, expectCounts[i]); // ExpectCount represents the total amount of cloud data. 234 } 235 } 236 WaitForSyncFinish(SyncProcess & syncProcess,const int64_t & waitTime)237 void CloudDBSyncUtilsTest::WaitForSyncFinish(SyncProcess &syncProcess, const int64_t &waitTime) 238 { 239 std::unique_lock<std::mutex> lock(g_processMutex); 240 bool result = g_processCondition.wait_for(lock, std::chrono::seconds(waitTime), [&syncProcess]() { 241 return syncProcess.process == FINISHED; 242 }); 243 ASSERT_EQ(result, true); 244 LOGD("-------------------sync end--------------"); 245 } 246 callSync(const std::vector<std::string> & tableNames,SyncMode mode,DBStatus dbStatus,RelationalStoreDelegate * & delegate)247 void CloudDBSyncUtilsTest::callSync(const std::vector<std::string> &tableNames, SyncMode mode, DBStatus dbStatus, 248 RelationalStoreDelegate *&delegate) 249 { 250 g_syncProcess = {}; 251 Query query = Query::Select().FromTable(tableNames); 252 std::vector<SyncProcess> expectProcess; 253 CloudSyncStatusCallback callback; 254 GetCallback(g_syncProcess, callback, expectProcess); 255 ASSERT_EQ(delegate->Sync({DEVICE_CLOUD}, mode, query, callback, SYNC_WAIT_TIME), dbStatus); 256 if (dbStatus == DBStatus::OK) { 257 WaitForSyncFinish(g_syncProcess, SYNC_WAIT_TIME); 258 } 259 } 260 CloseDb(RelationalStoreObserverUnitTest * & observer,std::shared_ptr<VirtualCloudDb> & virtualCloudDb,RelationalStoreDelegate * & delegate)261 void CloudDBSyncUtilsTest::CloseDb(RelationalStoreObserverUnitTest *&observer, 262 std::shared_ptr<VirtualCloudDb> &virtualCloudDb, RelationalStoreDelegate *&delegate) 263 { 264 delete observer; 265 virtualCloudDb = nullptr; 266 if (delegate != nullptr) { 267 EXPECT_EQ(g_mgr.CloseStore(delegate), DBStatus::OK); 268 delegate = nullptr; 269 } 270 } 271 QueryCountCallback(void * data,int count,char ** colValue,char ** colName)272 int CloudDBSyncUtilsTest::QueryCountCallback(void *data, int count, char **colValue, char **colName) 273 { 274 if (count != 1) { 275 return 0; 276 } 277 auto expectCount = reinterpret_cast<int64_t>(data); 278 EXPECT_EQ(strtol(colValue[0], nullptr, 10), expectCount); // 10: decimal 279 return 0; 280 } 281 CheckDownloadResult(sqlite3 * & db,std::vector<int64_t> expectCounts,const std::string & keyStr)282 void CloudDBSyncUtilsTest::CheckDownloadResult(sqlite3 *&db, std::vector<int64_t> expectCounts, 283 const std::string &keyStr) 284 { 285 for (size_t i = 0; i < g_tables.size(); ++i) { 286 string queryDownload = "select count(*) from " + g_tables[i] + " where name " + 287 " like '" + keyStr + "%'"; 288 EXPECT_EQ(sqlite3_exec(db, queryDownload.c_str(), QueryCountCallback, 289 reinterpret_cast<void *>(expectCounts[i]), nullptr), SQLITE_OK); 290 } 291 } 292 CheckLocalRecordNum(sqlite3 * & db,const std::string & tableName,int count)293 void CloudDBSyncUtilsTest::CheckLocalRecordNum(sqlite3 *&db, const std::string &tableName, int count) 294 { 295 std::string sql = "select count(*) from " + tableName + ";"; 296 EXPECT_EQ(sqlite3_exec(db, sql.c_str(), QueryCountCallback, 297 reinterpret_cast<void *>(count), nullptr), SQLITE_OK); 298 } 299 GetCloudDbSchema(const std::string & tableName,const std::vector<Field> & cloudField,DataBaseSchema & dataBaseSchema)300 void CloudDBSyncUtilsTest::GetCloudDbSchema(const std::string &tableName, const std::vector<Field> &cloudField, 301 DataBaseSchema &dataBaseSchema) 302 { 303 TableSchema tableSchema = { 304 .name = tableName, 305 .sharedTableName = tableName + "_shared", 306 .fields = cloudField 307 }; 308 dataBaseSchema.tables.push_back(tableSchema); 309 } 310 InitStoreProp(const std::string & storePath,const std::string & appId,const std::string & userId,const std::string & storeId,RelationalDBProperties & properties)311 void CloudDBSyncUtilsTest::InitStoreProp(const std::string &storePath, const std::string &appId, 312 const std::string &userId, const std::string &storeId, RelationalDBProperties &properties) 313 { 314 properties.SetStringProp(RelationalDBProperties::DATA_DIR, storePath); 315 properties.SetStringProp(RelationalDBProperties::APP_ID, appId); 316 properties.SetStringProp(RelationalDBProperties::USER_ID, userId); 317 properties.SetStringProp(RelationalDBProperties::STORE_ID, storeId); 318 std::string identifier = userId + "-" + appId + "-" + storeId; 319 std::string hashIdentifier = DBCommon::TransferHashString(identifier); 320 properties.SetStringProp(RelationalDBProperties::IDENTIFIER_DATA, hashIdentifier); 321 } 322 CheckCount(sqlite3 * db,const std::string & sql,int64_t count)323 void CloudDBSyncUtilsTest::CheckCount(sqlite3 *db, const std::string &sql, int64_t count) 324 { 325 EXPECT_EQ(sqlite3_exec(db, sql.c_str(), QueryCountCallback, reinterpret_cast<void *>(count), nullptr), 326 SQLITE_OK); 327 } 328 GetHashKey(const std::string & tableName,const std::string & condition,sqlite3 * db,std::vector<std::vector<uint8_t>> & hashKey)329 void CloudDBSyncUtilsTest::GetHashKey(const std::string &tableName, const std::string &condition, sqlite3 *db, 330 std::vector<std::vector<uint8_t>> &hashKey) 331 { 332 sqlite3_stmt *stmt = nullptr; 333 std::string sql = "select hash_key from " + DBCommon::GetLogTableName(tableName) + " where " + condition; 334 EXPECT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK); 335 while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) { 336 std::vector<uint8_t> blob; 337 EXPECT_EQ(SQLiteUtils::GetColumnBlobValue(stmt, 0, blob), E_OK); 338 hashKey.push_back(blob); 339 } 340 int errCode; 341 SQLiteUtils::ResetStatement(stmt, true, errCode); 342 } 343 } 344