1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifdef RELATIONAL_STORE
16 #include <gtest/gtest.h>
17 #include "cloud/cloud_db_constant.h"
18 #include "cloud/cloud_db_types.h"
19 #include "cloud_db_sync_utils_test.h"
20 #include "distributeddb_data_generate_unit_test.h"
21 #include "log_print.h"
22 #include "relational_store_delegate.h"
23 #include "relational_store_manager.h"
24 #include "runtime_config.h"
25 #include "time_helper.h"
26 #include "virtual_asset_loader.h"
27 #include "virtual_cloud_data_translate.h"
28 #include "virtual_cloud_db.h"
29 #include "virtual_communicator_aggregator.h"
30 #include "sqlite_relational_utils.h"
31 #include "cloud/cloud_storage_utils.h"
32 
33 namespace {
34 using namespace testing::ext;
35 using namespace DistributedDB;
36 using namespace DistributedDBUnitTest;
37 const char *g_createSQL =
38     "CREATE TABLE IF NOT EXISTS DistributedDBCloudAssetsOperationSyncTest(" \
39     "id TEXT PRIMARY KEY," \
40     "name TEXT," \
41     "height REAL ," \
42     "photo BLOB," \
43     "asset ASSET," \
44     "assets ASSETS," \
45     "age INT);";
46 const int64_t g_syncWaitTime = 60;
47 const int g_assetsNum = 3;
48 const Asset g_localAsset = {
49     .version = 2, .name = "Phone", .assetId = "0", .subpath = "/local/sync", .uri = "/cloud/sync",
50     .modifyTime = "123456", .createTime = "0", .size = "1024", .hash = "DEC"
51 };
52 SyncProcess lastProcess_;
53 
CreateUserDBAndTable(sqlite3 * & db)54 void CreateUserDBAndTable(sqlite3 *&db)
55 {
56     EXPECT_EQ(RelationalTestUtils::ExecSql(db, "PRAGMA journal_mode=WAL;"), SQLITE_OK);
57     EXPECT_EQ(RelationalTestUtils::ExecSql(db, g_createSQL), SQLITE_OK);
58 }
59 
BlockSync(const Query & query,RelationalStoreDelegate * delegate,SyncMode syncMode=SYNC_MODE_CLOUD_MERGE)60 void BlockSync(const Query &query, RelationalStoreDelegate *delegate, SyncMode syncMode = SYNC_MODE_CLOUD_MERGE)
61 {
62     std::mutex dataMutex;
63     std::condition_variable cv;
64     bool finish = false;
65     SyncProcess last;
66     auto callback = [&last, &cv, &dataMutex, &finish](const std::map<std::string, SyncProcess> &process) {
67         for (const auto &item: process) {
68             if (item.second.process == DistributedDB::FINISHED) {
69                 {
70                     std::lock_guard<std::mutex> autoLock(dataMutex);
71                     finish = true;
72                 }
73                 last = item.second;
74                 cv.notify_one();
75             }
76         }
77     };
78     LOGW("begin call sync");
79     ASSERT_EQ(delegate->Sync({ "CLOUD" }, syncMode, query, callback, g_syncWaitTime), OK);
80     std::unique_lock<std::mutex> uniqueLock(dataMutex);
81     cv.wait(uniqueLock, [&finish]() {
82         return finish;
83     });
84     lastProcess_ = last;
85     LOGW("end call sync");
86 }
87 
88 class DistributedDBCloudAssetsOperationSyncTest : public testing::Test {
89 public:
90     static void SetUpTestCase();
91     static void TearDownTestCase();
92     void SetUp() override;
93     void TearDown() override;
94     void WriteDataWithoutCommitTransaction();
95 protected:
96     void InitTestDir();
97     DataBaseSchema GetSchema();
98     void CloseDb();
99     void InsertUserTableRecord(const std::string &tableName, int64_t begin, int64_t count, size_t assetCount = 2u,
100         const Assets &templateAsset = {});
101     void CheckAssetsCount(const std::vector<size_t> &expectCount, bool checkAsset = false);
102     void UpdateCloudTableRecord(int64_t begin, int64_t count, bool assetIsNull);
103     void ForkDownloadAndRemoveAsset(DBStatus removeStatus, int &downLoadCount, int &removeCount);
104     void InsertLocalAssetData(const std::string &assetHash);
105     void InsertCloudAssetData(const std::string &assetHash);
106     void PrepareForAssetOperation010();
107     std::vector<Asset> GetAssets(const std::string &baseName, const Assets &templateAsset, size_t assetCount);
108     std::string testDir_;
109     std::string storePath_;
110     sqlite3 *db_ = nullptr;
111     RelationalStoreDelegate *delegate_ = nullptr;
112     std::shared_ptr<VirtualCloudDb> virtualCloudDb_ = nullptr;
113     std::shared_ptr<VirtualAssetLoader> virtualAssetLoader_ = nullptr;
114     std::shared_ptr<VirtualCloudDataTranslate> virtualTranslator_ = nullptr;
115     std::shared_ptr<RelationalStoreManager> mgr_ = nullptr;
116     std::string tableName_ = "DistributedDBCloudAssetsOperationSyncTest";
117     VirtualCommunicatorAggregator *communicatorAggregator_ = nullptr;
118     TrackerSchema trackerSchema = {
119         .tableName = tableName_, .extendColName = "name", .trackerColNames = {"age"}
120     };
121 };
122 
SetUpTestCase()123 void DistributedDBCloudAssetsOperationSyncTest::SetUpTestCase()
124 {
125     RuntimeConfig::SetCloudTranslate(std::make_shared<VirtualCloudDataTranslate>());
126 }
127 
TearDownTestCase()128 void DistributedDBCloudAssetsOperationSyncTest::TearDownTestCase()
129 {}
130 
SetUp()131 void DistributedDBCloudAssetsOperationSyncTest::SetUp()
132 {
133     DistributedDBToolsUnitTest::PrintTestCaseInfo();
134     InitTestDir();
135     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(testDir_) != 0) {
136         LOGE("rm test db files error.");
137     }
138     DistributedDBToolsUnitTest::PrintTestCaseInfo();
139     LOGD("Test dir is %s", testDir_.c_str());
140     db_ = RelationalTestUtils::CreateDataBase(storePath_);
141     ASSERT_NE(db_, nullptr);
142     CreateUserDBAndTable(db_);
143     mgr_ = std::make_shared<RelationalStoreManager>(APP_ID, USER_ID);
144     RelationalStoreDelegate::Option option;
145     ASSERT_EQ(mgr_->OpenStore(storePath_, STORE_ID_1, option, delegate_), DBStatus::OK);
146     ASSERT_NE(delegate_, nullptr);
147     ASSERT_EQ(delegate_->CreateDistributedTable(tableName_, CLOUD_COOPERATION), DBStatus::OK);
148     ASSERT_EQ(delegate_->SetTrackerTable(trackerSchema), DBStatus::OK);
149     virtualCloudDb_ = std::make_shared<VirtualCloudDb>();
150     virtualAssetLoader_ = std::make_shared<VirtualAssetLoader>();
151     ASSERT_EQ(delegate_->SetCloudDB(virtualCloudDb_), DBStatus::OK);
152     ASSERT_EQ(delegate_->SetIAssetLoader(virtualAssetLoader_), DBStatus::OK);
153     virtualTranslator_ = std::make_shared<VirtualCloudDataTranslate>();
154     DataBaseSchema dataBaseSchema = GetSchema();
155     ASSERT_EQ(delegate_->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
156     communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
157     ASSERT_TRUE(communicatorAggregator_ != nullptr);
158     RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
159 }
160 
TearDown()161 void DistributedDBCloudAssetsOperationSyncTest::TearDown()
162 {
163     CloseDb();
164     EXPECT_EQ(sqlite3_close_v2(db_), SQLITE_OK);
165     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(testDir_) != E_OK) {
166         LOGE("rm test db files error.");
167     }
168     RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
169     communicatorAggregator_ = nullptr;
170     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
171 }
172 
InitTestDir()173 void DistributedDBCloudAssetsOperationSyncTest::InitTestDir()
174 {
175     if (!testDir_.empty()) {
176         return;
177     }
178     DistributedDBToolsUnitTest::TestDirInit(testDir_);
179     storePath_ = testDir_ + "/" + STORE_ID_1 + ".db";
180     LOGI("The test db is:%s", testDir_.c_str());
181 }
182 
GetSchema()183 DataBaseSchema DistributedDBCloudAssetsOperationSyncTest::GetSchema()
184 {
185     DataBaseSchema schema;
186     TableSchema tableSchema;
187     tableSchema.name = tableName_;
188     tableSchema.sharedTableName = tableName_ + "_shared";
189     tableSchema.fields = {
190         {"id", TYPE_INDEX<std::string>, true}, {"name", TYPE_INDEX<std::string>}, {"height", TYPE_INDEX<double>},
191         {"photo", TYPE_INDEX<Bytes>}, {"asset", TYPE_INDEX<Asset>}, {"assets", TYPE_INDEX<Assets>},
192         {"age", TYPE_INDEX<int64_t>}
193     };
194     schema.tables.push_back(tableSchema);
195     return schema;
196 }
197 
CloseDb()198 void DistributedDBCloudAssetsOperationSyncTest::CloseDb()
199 {
200     virtualCloudDb_->ForkUpload(nullptr);
201     virtualCloudDb_ = nullptr;
202     EXPECT_EQ(mgr_->CloseStore(delegate_), DBStatus::OK);
203     delegate_ = nullptr;
204     mgr_ = nullptr;
205 }
206 
InsertUserTableRecord(const std::string & tableName,int64_t begin,int64_t count,size_t assetCount,const Assets & templateAsset)207 void DistributedDBCloudAssetsOperationSyncTest::InsertUserTableRecord(const std::string &tableName, int64_t begin,
208     int64_t count, size_t assetCount, const Assets &templateAsset)
209 {
210     std::string photo = "phone";
211     int errCode;
212     std::vector<uint8_t> assetBlob;
213     std::vector<uint8_t> assetsBlob;
214     const int64_t index2 = 2;
215     for (int64_t i = begin; i < begin + count; ++i) {
216         std::string name = g_localAsset.name + std::to_string(i);
217         Asset asset = g_localAsset;
218         asset.name = name;
219         RuntimeContext::GetInstance()->AssetToBlob(asset, assetBlob);
220         std::vector<Asset> assets = GetAssets(name, templateAsset, assetCount);
221         string sql = "INSERT OR REPLACE INTO " + tableName +
222             " (id, name, height, photo, asset, assets, age) VALUES ('" + std::to_string(i) +
223             "', 'local', '178.0', '" + photo + "', ?, ?, '18');";
224         sqlite3_stmt *stmt = nullptr;
225         ASSERT_EQ(SQLiteUtils::GetStatement(db_, sql, stmt), E_OK);
226         RuntimeContext::GetInstance()->AssetsToBlob(assets, assetsBlob);
227         ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK);
228         ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, index2, assetsBlob, false), E_OK);
229         EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
230         SQLiteUtils::ResetStatement(stmt, true, errCode);
231     }
232 }
233 
GetAssets(const std::string & baseName,const Assets & templateAsset,size_t assetCount)234 std::vector<Asset> DistributedDBCloudAssetsOperationSyncTest::GetAssets(const std::string &baseName,
235     const Assets &templateAsset, size_t assetCount)
236 {
237     std::vector<Asset> assets;
238     for (size_t i = 1; i <= assetCount; ++i) {
239         Asset asset;
240         if (i - 1 < templateAsset.size()) {
241             asset = templateAsset[i - 1];
242         } else {
243             asset = g_localAsset;
244             asset.name = baseName + "_" + std::to_string(i);
245             asset.status = static_cast<uint32_t>(AssetStatus::INSERT);
246         }
247         assets.push_back(asset);
248     }
249     return assets;
250 }
251 
UpdateCloudTableRecord(int64_t begin,int64_t count,bool assetIsNull)252 void DistributedDBCloudAssetsOperationSyncTest::UpdateCloudTableRecord(int64_t begin, int64_t count, bool assetIsNull)
253 {
254     std::vector<VBucket> record;
255     std::vector<VBucket> extend;
256     Timestamp now = TimeHelper::GetSysCurrentTime();
257     const int assetCount = 2;
258     for (int64_t i = begin; i < (begin + count); ++i) {
259         VBucket data;
260         data.insert_or_assign("id", std::to_string(i));
261         data.insert_or_assign("name", "Cloud" + std::to_string(i));
262         Assets assets;
263         for (int j = 1; j <= assetCount; ++j) {
264             Asset asset;
265             asset.name = "Phone_" + std::to_string(j);
266             asset.assetId = std::to_string(j);
267             asset.status = AssetStatus::UPDATE;
268             assets.push_back(asset);
269         }
270         assetIsNull ? data.insert_or_assign("assets", Nil()) : data.insert_or_assign("assets", assets);
271         record.push_back(data);
272         VBucket log;
273         log.insert_or_assign(CloudDbConstant::CREATE_FIELD, static_cast<int64_t>(
274             now / CloudDbConstant::TEN_THOUSAND));
275         log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, static_cast<int64_t>(
276             now / CloudDbConstant::TEN_THOUSAND));
277         log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
278         log.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(i));
279         extend.push_back(log);
280     }
281 
282     ASSERT_EQ(virtualCloudDb_->BatchUpdate(tableName_, std::move(record), extend), DBStatus::OK);
283 }
284 
CheckAssetsCount(const std::vector<size_t> & expectCount,bool checkAsset)285 void DistributedDBCloudAssetsOperationSyncTest::CheckAssetsCount(const std::vector<size_t> &expectCount,
286     bool checkAsset)
287 {
288     std::vector<VBucket> allData;
289     auto dbSchema = GetSchema();
290     ASSERT_GT(dbSchema.tables.size(), 0u);
291     ASSERT_EQ(RelationalTestUtils::SelectData(db_, dbSchema.tables[0], allData), E_OK);
292     int index = 0;
293     ASSERT_EQ(allData.size(), expectCount.size());
294     for (const auto &data : allData) {
295         auto colIter = data.find("assets");
296         EXPECT_NE(colIter, data.end());
297         if (colIter == data.end()) {
298             index++;
299             continue;
300         }
301         Type colValue = data.at("assets");
302         auto translate = std::dynamic_pointer_cast<ICloudDataTranslate>(virtualTranslator_);
303         auto assets = RelationalTestUtils::GetAssets(colValue, translate);
304         size_t size = assets.size();
305         if (checkAsset) {
306             Type colValue1 = data.at("asset");
307             auto assets1 = RelationalTestUtils::GetAssets(colValue1, translate, true);
308             size += assets1.size();
309         }
310         LOGI("[DistributedDBCloudAssetsOperationSyncTest] Check data index %d", index);
311         EXPECT_EQ(static_cast<size_t>(size), expectCount[index]);
312         for (const auto &item : assets) {
313             LOGI("[DistributedDBCloudAssetsOperationSyncTest] Asset name %s status %" PRIu32, item.name.c_str(),
314                 item.status);
315         }
316         index++;
317     }
318 }
319 
ForkDownloadAndRemoveAsset(DBStatus removeStatus,int & downLoadCount,int & removeCount)320 void DistributedDBCloudAssetsOperationSyncTest::ForkDownloadAndRemoveAsset(DBStatus removeStatus, int &downLoadCount,
321     int &removeCount)
322 {
323     virtualAssetLoader_->ForkDownload([this, &downLoadCount](std::map<std::string, Assets> &assets) {
324         downLoadCount++;
325         if (downLoadCount == 1) {
326             std::string sql = "UPDATE " + tableName_ + " SET assets = NULL WHERE id = 0;";
327             ASSERT_EQ(RelationalTestUtils::ExecSql(db_, sql), SQLITE_OK);
328         }
329     });
330     virtualAssetLoader_->ForkRemoveLocalAssets([removeStatus, &removeCount](const std::vector<Asset> &assets) {
331         EXPECT_EQ(assets.size(), 2u); // one record has 2 asset
332         removeCount++;
333         return removeStatus;
334     });
335 }
336 
337 /**
338  * @tc.name: SyncWithAssetOperation001
339  * @tc.desc: Delete Assets When Download
340  * @tc.type: FUNC
341  * @tc.require:
342  * @tc.author: zhangqiquan
343  */
344 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation001, TestSize.Level0)
345 {
346     const int actualCount = 10;
347     const int deleteDataCount = 5;
348     const int deleteAssetsCount = 4;
349     InsertUserTableRecord(tableName_, 0, actualCount);
350     std::string tableName = tableName_;
__anond35eef630602(const std::string &, VBucket &) 351     virtualCloudDb_->ForkUpload([this, deleteDataCount, deleteAssetsCount](const std::string &, VBucket &) {
352         for (int64_t i = 0; i < deleteDataCount; i++) {
353             std::string sql = "DELETE FROM " + tableName_ + " WHERE id = " + std::to_string(i) + ";";
354             ASSERT_EQ(RelationalTestUtils::ExecSql(db_, sql), SQLITE_OK);
355         }
356         for (int64_t i = deleteDataCount; i < deleteDataCount + deleteAssetsCount; i++) {
357             std::string sql = "UPDATE " + tableName_ + " SET asset = NULL, assets = NULL WHERE id = " +
358                 std::to_string(i) + ";";
359             ASSERT_EQ(RelationalTestUtils::ExecSql(db_, sql), SQLITE_OK);
360         }
361     });
362     Query query = Query::Select().FromTable({ tableName_ });
363     BlockSync(query, delegate_);
364     virtualCloudDb_->ForkUpload(nullptr);
365     std::vector<size_t> expectCount(actualCount - deleteDataCount, 0);
366     expectCount[expectCount.size() - 1] = 2; // default one row has 2 assets
367     CheckAssetsCount(expectCount);
368 }
369 
370 /**
371  * @tc.name: SyncWithAssetOperation002
372  * @tc.desc: Download Assets When local assets was removed
373  * @tc.type: FUNC
374  * @tc.require:
375  * @tc.author: zhangqiquan
376  */
377 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation002, TestSize.Level0)
378 {
379     const int actualCount = 1;
380     InsertUserTableRecord(tableName_, 0, actualCount);
381     Query query = Query::Select().FromTable({ tableName_ });
382     BlockSync(query, delegate_);
383     int downLoadCount = 0;
384     int removeCount = 0;
385     ForkDownloadAndRemoveAsset(OK, downLoadCount, removeCount);
386     UpdateCloudTableRecord(0, actualCount, false);
387     RelationalTestUtils::CloudBlockSync(query, delegate_);
388     EXPECT_EQ(downLoadCount, 1); // local asset was removed should download 1 times
389     EXPECT_EQ(removeCount, 1);
390     virtualAssetLoader_->ForkDownload(nullptr);
391     virtualAssetLoader_->ForkRemoveLocalAssets(nullptr);
392 
393     std::vector<size_t> expectCount = { 0 };
394     CheckAssetsCount(expectCount);
395 }
396 
397 /**
398  * @tc.name: SyncWithAssetOperation003
399  * @tc.desc: Delete Assets When Download
400  * @tc.type: FUNC
401  * @tc.require:
402  * @tc.author: bty
403  */
404 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation003, TestSize.Level0)
405 {
406     InsertUserTableRecord(tableName_, 0, 1); // 1 is count
407     int uploadCount = 0;
__anond35eef630702(const std::string &, VBucket &) 408     virtualCloudDb_->ForkUpload([this, &uploadCount](const std::string &, VBucket &) {
409         if (uploadCount > 0) {
410             return;
411         }
412         SqlCondition condition;
413         condition.sql = "UPDATE " + tableName_ + " SET age = '666' WHERE id = 0;";
414         std::vector<VBucket> records;
415         EXPECT_EQ(delegate_->ExecuteSql(condition, records), OK);
416         uploadCount++;
417     });
418     Query query = Query::Select().FromTable({ tableName_ });
419     BlockSync(query, delegate_);
420     virtualCloudDb_->ForkUpload(nullptr);
421 
422     std::string sql = "SELECT assets from " + tableName_ + " where id = 0;";
423     sqlite3_stmt *stmt = nullptr;
424     ASSERT_EQ(SQLiteUtils::GetStatement(db_, sql, stmt), E_OK);
425     while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
426         ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
427         Type cloudValue;
428         ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
429         std::vector<uint8_t> assetsBlob;
430         Assets assets;
431         ASSERT_EQ(CloudStorageUtils::GetValueFromOneField(cloudValue, assetsBlob), E_OK);
432         ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAssets(assetsBlob, assets), E_OK);
433         ASSERT_EQ(assets.size(), 2u); // 2 is asset num
434         for (size_t i = 0; i < assets.size(); ++i) {
435             EXPECT_EQ(assets[i].status, AssetStatus::NORMAL);
436         }
437     }
438     int errCode;
439     SQLiteUtils::ResetStatement(stmt, true, errCode);
440 }
441 
442 /**
443  * @tc.name: SyncWithAssetOperation004
444  * @tc.desc: Download Assets When local assets was removed
445  * @tc.type: FUNC
446  * @tc.require:
447  * @tc.author: zhangqiquan
448  */
449 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation004, TestSize.Level0)
450 {
451     const int actualCount = 5; // 5 record
452     InsertUserTableRecord(tableName_, 0, actualCount);
453     Query query = Query::Select().FromTable({ tableName_ });
454     BlockSync(query, delegate_);
455     int downLoadCount = 0;
456     int removeCount = 0;
457     ForkDownloadAndRemoveAsset(DB_ERROR, downLoadCount, removeCount);
458     UpdateCloudTableRecord(0, actualCount, false);
459     RelationalTestUtils::CloudBlockSync(query, delegate_, DBStatus::OK, DBStatus::REMOTE_ASSETS_FAIL);
460     EXPECT_EQ(downLoadCount, 5); // local asset was removed should download 5 times
461     EXPECT_EQ(removeCount, 1);
462     virtualAssetLoader_->ForkDownload(nullptr);
463     virtualAssetLoader_->ForkRemoveLocalAssets(nullptr);
464 
465     std::vector<size_t> expectCount = { 0, 2, 2, 2, 2 };
466     CheckAssetsCount(expectCount);
467 }
468 
InsertLocalAssetData(const std::string & assetHash)469 void DistributedDBCloudAssetsOperationSyncTest::InsertLocalAssetData(const std::string &assetHash)
470 {
471     Assets assets;
472     std::string assetNameBegin = "Phone";
473     for (int j = 1; j <= g_assetsNum; ++j) {
474         Asset asset;
475         asset.name = assetNameBegin + "_" + std::to_string(j);
476         asset.status = AssetStatus::NORMAL;
477         asset.flag = static_cast<uint32_t>(AssetOpType::NO_CHANGE);
478         asset.hash = assetHash + "_" + std::to_string(j);
479         asset.assetId = std::to_string(j);
480         assets.push_back(asset);
481     }
482     string sql = "INSERT OR REPLACE INTO " + tableName_ + " (id,name,asset,assets) VALUES('0','CloudTest0',?,?);";
483     sqlite3_stmt *stmt = nullptr;
484     ASSERT_EQ(SQLiteUtils::GetStatement(db_, sql, stmt), E_OK);
485     std::vector<uint8_t> assetBlob;
486     std::vector<uint8_t> assetsBlob;
487     RuntimeContext::GetInstance()->AssetToBlob(g_localAsset, assetBlob);
488     RuntimeContext::GetInstance()->AssetsToBlob(assets, assetsBlob);
489     ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK);
490     ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 2, assetsBlob, false), E_OK); // 2 is assetsBlob
491     EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
492     int errCode;
493     SQLiteUtils::ResetStatement(stmt, true, errCode);
494 }
495 
WriteDataWithoutCommitTransaction()496 void DistributedDBCloudAssetsOperationSyncTest::WriteDataWithoutCommitTransaction()
497 {
498     ASSERT_NE(db_, nullptr);
499     SQLiteUtils::BeginTransaction(db_);
500     InsertLocalAssetData("localAsset");
501     constexpr int kSleepDurationSeconds = 3;
502     std::this_thread::sleep_for(std::chrono::seconds(kSleepDurationSeconds));
503 }
504 
505 /**
506  * @tc.name: TestOpenDatabaseBusy001
507  * @tc.desc: Test open database when the database is busy.
508  * @tc.type: FUNC
509  * @tc.require:
510  * @tc.author: liufuchenxing
511  */
512 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, TestOpenDatabaseBusy001, TestSize.Level2)
513 {
514     /**
515      * @tc.steps:step1. close store.
516      * @tc.expected:step1. check ok.
517      */
518     EXPECT_EQ(mgr_->CloseStore(delegate_), DBStatus::OK);
519     delegate_ = nullptr;
520     /**
521      * @tc.steps:step2. Another thread write data into database into database without commit.
522      * @tc.expected:step2. check ok.
523      */
524     std::thread thread(&DistributedDBCloudAssetsOperationSyncTest::WriteDataWithoutCommitTransaction, this);
525     std::this_thread::sleep_for(std::chrono::seconds(1));
526     /**
527      * @tc.steps:step3. open relational delegate.
528      * @tc.expected:step3. open success.
529      */
530     RelationalStoreDelegate::Option option;
531     ASSERT_EQ(mgr_->OpenStore(storePath_, STORE_ID_1, option, delegate_), DBStatus::OK);
532     thread.join();
533 }
534 
535 /**
536  * @tc.name: SyncWithAssetOperation006
537  * @tc.desc: Remove Local Datas When local assets was empty
538  * @tc.type: FUNC
539  * @tc.require:
540  * @tc.author: lijun
541  */
542 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation006, TestSize.Level0)
543 {
544     const int actualCount = 5;
545     InsertUserTableRecord(tableName_, 0, actualCount);
546     Query query = Query::Select().FromTable({ tableName_ });
547     BlockSync(query, delegate_);
548 
549     UpdateCloudTableRecord(0, 2, true);
550     BlockSync(query, delegate_);
551 
552     int removeCount = 0;
__anond35eef630802(const std::vector<Asset> &assets) 553     virtualAssetLoader_->ForkRemoveLocalAssets([&removeCount](const std::vector<Asset> &assets) {
554         removeCount = assets.size();
555         return DBStatus::OK;
556     });
557     std::string device = "";
558     ASSERT_EQ(delegate_->RemoveDeviceData(device, FLAG_AND_DATA), DBStatus::OK);
559     ASSERT_EQ(9, removeCount);
560     virtualAssetLoader_->ForkRemoveLocalAssets(nullptr);
561 }
562 
563  /**
564  * @tc.name: SyncWithAssetOperation006
565  * @tc.desc: Test assetId fill when assetId changed
566  * @tc.type: FUNC
567  * @tc.require:
568  * @tc.author: wangxiangdong
569  */
570 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation007, TestSize.Level0)
571 {
572     /**
573      * @tc.steps:step1. Insert 5 records and sync.
574      * @tc.expected: step1. ok.
575      */
576     const int actualCount = 5;
577     std::string name = g_localAsset.name + std::to_string(0);
578     Assets expectAssets = GetAssets(name, {}, 3u); // contain 3 assets
579     expectAssets[0].hash.append("change"); // modify first asset
580     InsertUserTableRecord(tableName_, 0, actualCount, expectAssets.size(), expectAssets);
581     Query query = Query::Select().FromTable({ tableName_ });
582     BlockSync(query, delegate_);
583     /**
584      * @tc.steps:step2. modify data and sync.
585      * @tc.expected: step2. ok.
586      */
587     UpdateCloudTableRecord(0, 1, true);
588     BlockSync(query, delegate_);
589     /**
590      * @tc.steps:step3. check modified data cursor.
591      * @tc.expected: step3. ok.
592      */
593     std::string sql = "SELECT cursor FROM " + DBCommon::GetLogTableName(tableName_) + " where data_key=1";
594     EXPECT_EQ(sqlite3_exec(db_, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
595         reinterpret_cast<void *>(7), nullptr), SQLITE_OK);
596     sql = "SELECT cursor FROM " + DBCommon::GetLogTableName(tableName_) + " where data_key=5";
597     EXPECT_EQ(sqlite3_exec(db_, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
598         reinterpret_cast<void *>(5), nullptr), SQLITE_OK);
599 }
600 
601 /**
602  * @tc.name: SyncWithAssetOperation009
603  * @tc.desc: Test asset remove local and check db asset empty finally.
604  * @tc.type: FUNC
605  * @tc.require:
606  * @tc.author: wangxiangdong
607  */
608 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation009, TestSize.Level0)
609 {
610     /**
611      * @tc.steps:step1. Insert 5 records and sync.
612      * @tc.expected: step1. ok.
613      */
614     const int actualCount = 5;
615     RelationalTestUtils::InsertCloudRecord(0, actualCount, tableName_, virtualCloudDb_);
616     InsertUserTableRecord(tableName_, 0, actualCount);
617     /**
618      * @tc.steps:step2. modify data and sync.
619      * @tc.expected: step2. ok.
620      */
621     UpdateCloudTableRecord(0, 1, true);
622     Query query = Query::Select().FromTable({ tableName_ });
623     BlockSync(query, delegate_);
624     /**
625      * @tc.steps:step3. check asset number.
626      * @tc.expected: step3. ok.
627      */
628     std::vector<size_t> expectCount = { 0, 3, 3, 3, 3 };
629     CheckAssetsCount(expectCount, true);
630 }
631 
InsertCloudAssetData(const std::string & assetHash)632 void DistributedDBCloudAssetsOperationSyncTest::InsertCloudAssetData(const std::string &assetHash)
633 {
634     std::vector<VBucket> record;
635     std::vector<VBucket> extend;
636     Timestamp now = DistributedDB::TimeHelper::GetSysCurrentTime();
637     VBucket data;
638     data.insert_or_assign("id", "0");
639     data.insert_or_assign("name", "CloudTest0");
640     Asset asset = g_localAsset;
641     data.insert_or_assign("asset", "asset");
642     Assets assets;
643     std::string assetNameBegin = "Phone";
644     for (int j = 1; j <= g_assetsNum; ++j) {
645         Asset assetTmp;
646         assetTmp.name = assetNameBegin + "_" + std::to_string(j);
647         assetTmp.status = AssetStatus::NORMAL;
648         assetTmp.hash = assetHash + "_" + std::to_string(j);
649         assetTmp.assetId = std::to_string(j);
650         assets.push_back(assetTmp);
651     }
652     data.insert_or_assign("assets", assets);
653     record.push_back(data);
654     VBucket log;
655     log.insert_or_assign(DistributedDB::CloudDbConstant::CREATE_FIELD, static_cast<int64_t>(
656         now / DistributedDB::CloudDbConstant::TEN_THOUSAND));
657     log.insert_or_assign(DistributedDB::CloudDbConstant::MODIFY_FIELD, static_cast<int64_t>(
658         now / DistributedDB::CloudDbConstant::TEN_THOUSAND));
659     log.insert_or_assign(DistributedDB::CloudDbConstant::DELETE_FIELD, false);
660     extend.push_back(log);
661     virtualCloudDb_->BatchInsert(tableName_, std::move(record), extend);
662 }
663 
PrepareForAssetOperation010()664 void DistributedDBCloudAssetsOperationSyncTest::PrepareForAssetOperation010()
665 {
666     InsertCloudAssetData("cloudAsset");
667     InsertLocalAssetData("localAsset");
668 }
669 
670 /**
671  * @tc.name: SyncWithAssetOperation010
672  * @tc.desc: Test check status of asset, when the hash of asset is different.
673  * @tc.type: FUNC
674  * @tc.require:
675  * @tc.author: liufuchenxing
676  */
677 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation010, TestSize.Level0)
678 {
679     /**
680      * @tc.steps:step1. prepare local and cloud asset data.
681      * @tc.expected: step1. ok.
682      */
683     PrepareForAssetOperation010();
684 
685     /**
686      * @tc.steps:step2. sync and check the status of assets.
687      * @tc.expected: step2. ok.
688      */
689     virtualCloudDb_->ForkBeforeBatchUpdate([](const std::string &, std::vector<VBucket> &record,
__anond35eef630902(const std::string &, std::vector<VBucket> &record, std::vector<VBucket> &extend, bool) 690         std::vector<VBucket> &extend, bool) {
691         ASSERT_EQ(static_cast<int>(record.size()), 1);
692         VBucket &bucket = record[0];
693         ASSERT_TRUE(bucket.find("assets") != bucket.end());
694         Assets assets = std::get<Assets>(bucket["assets"]);
695         ASSERT_EQ(static_cast<int>(assets.size()), 3);
696         for (size_t i = 0; i < assets.size(); i++) {
697             ASSERT_EQ(assets[i].status, AssetStatus::UPDATE);
698         }
699     });
700 
701     Query query = Query::Select().FromTable({ tableName_ });
702     BlockSync(query, delegate_, SYNC_MODE_CLOUD_FORCE_PUSH);
703 }
704 
705 /**
706  * @tc.name: IgnoreRecord001
707  * @tc.desc: Download Assets When local assets was removed
708  * @tc.type: FUNC
709  * @tc.require:
710  * @tc.author: zhangqiquan
711  */
712 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, IgnoreRecord001, TestSize.Level0)
713 {
714     const int actualCount = 1;
715     InsertUserTableRecord(tableName_, 0, actualCount);
716     Query query = Query::Select().FromTable({ tableName_ });
717     BlockSync(query, delegate_);
718     std::vector<size_t> expectCount = { 2 };
719     CheckAssetsCount(expectCount);
720 
721     VBucket record;
722     record["id"] = std::to_string(0);
723     record["assets"] = Assets();
724     EXPECT_EQ(delegate_->UpsertData(tableName_, { record }), OK);
725     record["id"] = std::to_string(1);
726     EXPECT_EQ(delegate_->UpsertData(tableName_, { record }), OK);
727     expectCount = { 0, 0 };
728     CheckAssetsCount(expectCount);
729 
730     std::vector<VBucket> logs;
731     EXPECT_EQ(RelationalTestUtils::GetRecordLog(db_, tableName_, logs), E_OK);
732     for (const auto &log : logs) {
733         int64_t cursor = std::get<int64_t>(log.at("cursor"));
734         EXPECT_GE(cursor, 0);
735     }
736 }
737 
738 /**
739  * @tc.name: IgnoreRecord002
740  * @tc.desc: Ignore Assets When Download
741  * @tc.type: FUNC
742  * @tc.require:
743  * @tc.author: zhangqiquan
744  */
745 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, IgnoreRecord002, TestSize.Level0)
746 {
747     const int actualCount = 1;
748     InsertUserTableRecord(tableName_, 0, actualCount);
749     Query query = Query::Select().FromTable({ tableName_ });
750     RelationalTestUtils::CloudBlockSync(query, delegate_);
751     UpdateCloudTableRecord(0, actualCount, false);
752 
753     virtualAssetLoader_->SetDownloadStatus(DBStatus::CLOUD_RECORD_EXIST_CONFLICT);
754     RelationalTestUtils::CloudBlockSync(query, delegate_);
755     virtualAssetLoader_->SetDownloadStatus(DBStatus::OK);
756     std::vector<size_t> expectCount = { 4 };
757     CheckAssetsCount(expectCount);
758     RelationalTestUtils::CloudBlockSync(query, delegate_);
759 }
760 
761 /**
762  * @tc.name: IgnoreRecord003
763  * @tc.desc: Ignore Assets When Upload
764  * @tc.type: FUNC
765  * @tc.require:
766  * @tc.author: zhangqiquan
767  */
768 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, IgnoreRecord003, TestSize.Level0)
769 {
770     const int actualCount = 1;
771     InsertUserTableRecord(tableName_, 0, actualCount);
772     Query query = Query::Select().FromTable({ tableName_ });
773     virtualCloudDb_->SetConflictInUpload(true);
774     RelationalTestUtils::CloudBlockSync(query, delegate_);
775     virtualCloudDb_->SetConflictInUpload(false);
776     std::vector<size_t> expectCount = { 2 };
777     CheckAssetsCount(expectCount);
778     RelationalTestUtils::CloudBlockSync(query, delegate_);
779 }
780 
781 /**
782  * @tc.name: UpsertData001
783  * @tc.desc: Upsert data after delete it
784  * @tc.type: FUNC
785  * @tc.require:
786  * @tc.author: zhangqiquan
787  */
788 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, UpsertData001, TestSize.Level0)
789 {
790     // insert id 0 to local
791     const int actualCount = 1;
792     InsertUserTableRecord(tableName_, 0, actualCount); // 10 is phone size
793     std::vector<std::map<std::string, std::string>> conditions;
794     std::map<std::string, std::string> entries;
795     entries["id"] = "0";
796     conditions.push_back(entries);
797     // delete id 0 in local
798     RelationalTestUtils::DeleteRecord(db_, tableName_, conditions);
799     // upsert id 0 to local
800     VBucket record;
801     record["id"] = std::to_string(0);
802     record["assets"] = Assets();
803     EXPECT_EQ(delegate_->UpsertData(tableName_, { record }), OK);
804     // check id 0 exist
805     CheckAssetsCount({ 0 });
806 }
807 
808 /**
809  * @tc.name: UpsertData002
810  * @tc.desc: Test sync after Upsert.
811  * @tc.type: FUNC
812  * @tc.require:
813  * @tc.author: liaoyonghuang
814  */
815 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, UpsertData002, TestSize.Level0)
816 {
817     /**
818      * @tc.steps:step1. Insert 5 records and sync.
819      * @tc.expected: step1. ok.
820      */
821     const int actualCount = 5;
822     InsertUserTableRecord(tableName_, 0, actualCount);
823     Query query = Query::Select().FromTable({ tableName_ });
824     BlockSync(query, delegate_);
825 
826     /**
827      * @tc.steps:step2. UpsertData and sync.
828      * @tc.expected: step2. ok.
829      */
830     int dataCnt = -1;
831     std::string checkLogSql = "SELECT count(*) FROM " + DBCommon::GetLogTableName(tableName_)  + " where cursor = 5";
__anond35eef630a02(sqlite3_stmt *stmt) 832     RelationalTestUtils::ExecSql(db_, checkLogSql, nullptr, [&dataCnt](sqlite3_stmt *stmt) {
833         dataCnt = sqlite3_column_int(stmt, 0);
834         return E_OK;
835     });
836     EXPECT_EQ(dataCnt, 1);
837     vector<VBucket> records;
838     for (int i = 0; i < actualCount; i++) {
839         VBucket record;
840         record["id"] = std::to_string(i);
841         record["name"] = std::string("UpsertName");
842         records.push_back(record);
843     }
844     EXPECT_EQ(delegate_->UpsertData(tableName_, records), OK);
845     // check cursor has been increased
846     checkLogSql = "SELECT count(*) FROM " + DBCommon::GetLogTableName(tableName_)  + " where cursor = 10";
__anond35eef630b02(sqlite3_stmt *stmt) 847     RelationalTestUtils::ExecSql(db_, checkLogSql, nullptr, [&dataCnt](sqlite3_stmt *stmt) {
848         dataCnt = sqlite3_column_int(stmt, 0);
849         return E_OK;
850     });
851     EXPECT_EQ(dataCnt, 1);
852     BlockSync(query, delegate_);
853 
854     /**
855      * @tc.steps:step3. Check local data.
856      * @tc.expected: step3. All local data has been merged by the cloud.
857      */
858     std::vector<VBucket> allData;
859     auto dbSchema = GetSchema();
860     ASSERT_GT(dbSchema.tables.size(), 0u);
861     ASSERT_EQ(RelationalTestUtils::SelectData(db_, dbSchema.tables[0], allData), E_OK);
862     for (const auto &data : allData) {
863         ASSERT_EQ(std::get<std::string>(data.at("name")), "local");
864     }
865 }
866 
867 /**
868  * @tc.name: SyncWithAssetConflict001
869  * @tc.desc: Upload with asset no change
870  * @tc.type: FUNC
871  * @tc.require:
872  * @tc.author: zhangqiquan
873  */
874 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetConflict001, TestSize.Level0)
875 {
876     // cloud and local insert same data
877     const int actualCount = 1;
878     RelationalTestUtils::InsertCloudRecord(0, actualCount, tableName_, virtualCloudDb_);
879     std::this_thread::sleep_for(std::chrono::seconds(1)); // sleep 1s for data conflict
880     InsertUserTableRecord(tableName_, 0, actualCount);
881     // sync and local asset's status are normal
882     Query query = Query::Select().FromTable({ tableName_ });
883     RelationalTestUtils::CloudBlockSync(query, delegate_);
884     auto dbSchema = GetSchema();
885     ASSERT_GT(dbSchema.tables.size(), 0u);
886     auto assets = RelationalTestUtils::GetAllAssets(db_, dbSchema.tables[0], virtualTranslator_);
887     for (const auto &oneRow : assets) {
888         for (const auto &asset : oneRow) {
889             EXPECT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::NORMAL));
890         }
891     }
892 }
893 
894 /**
895  * @tc.name: UpsertDataInvalid001
896  * @tc.desc: Upsert invalid data
897  * @tc.type: FUNC
898  * @tc.require:
899  * @tc.author: wangxiangdong
900  */
901 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, UpsertDataInvalid001, TestSize.Level0)
902 {
903     VBucket record;
904     record["id"] = std::to_string(0);
905     record["assets"] = Assets();
906     /**
907      * @tc.steps:step1. UpsertData to empty table.
908      * @tc.expected: step1. INVALID_ARGS.
909      */
910     EXPECT_EQ(delegate_->UpsertData("", { record }), INVALID_ARGS);
911     /**
912      * @tc.steps:step2. UpsertData to shared table.
913      * @tc.expected: step2. INVALID_ARGS.
914      */
915     EXPECT_EQ(delegate_->UpsertData(tableName_ + "_shared", { record }), NOT_SUPPORT);
916     /**
917      * @tc.steps:step3. UpsertData to not device table and shared table.
918      * @tc.expected: step3. NOT_FOUND.
919      */
920     const char *createSQL =
921         "CREATE TABLE IF NOT EXISTS testing(" \
922         "id TEXT PRIMARY KEY," \
923         "name TEXT," \
924         "height REAL ," \
925         "photo BLOB," \
926         "asset ASSET," \
927         "assets ASSETS," \
928         "age INT);";
929     EXPECT_EQ(RelationalTestUtils::ExecSql(db_, createSQL), SQLITE_OK);
930     EXPECT_EQ(delegate_->UpsertData("testing", { record }), NOT_FOUND);
931     /**
932      * @tc.steps:step4. UpsertData to not exist table.
933      * @tc.expected: step4. NOT_FOUND.
934      */
935     EXPECT_EQ(delegate_->UpsertData("TABLE_NOT_EXIST", { record }), NOT_FOUND);
936 }
937 
938 /**
939  * @tc.name: UpsertDataInvalid002
940  * @tc.desc: Upsert device data
941  * @tc.type: FUNC
942  * @tc.require:
943  * @tc.author: wangxiangdong
944  */
945 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, UpsertDataInvalid002, TestSize.Level0)
946 {
947     VBucket record;
948     record["id"] = std::to_string(0);
949     record["assets"] = Assets();
950     /**
951      * @tc.steps:step1. create user table.
952      * @tc.expected: step1. INVALID_ARGS.
953      */
954     const char *createSQL =
955         "CREATE TABLE IF NOT EXISTS devTable(" \
956         "id TEXT PRIMARY KEY," \
957         "name TEXT," \
958         "height REAL ," \
959         "photo BLOB," \
960         "asset ASSET," \
961         "assets ASSETS," \
962         "age INT);";
963     EXPECT_EQ(RelationalTestUtils::ExecSql(db_, createSQL), SQLITE_OK);
964     /**
965      * @tc.steps:step2. create device table.
966      * @tc.expected: step2. OK.
967      */
968     RelationalStoreDelegate *delegate1 = nullptr;
969     std::shared_ptr<RelationalStoreManager> mgr1 = std::make_shared<RelationalStoreManager>(APP_ID, USER_ID);
970     RelationalStoreDelegate::Option option;
971     ASSERT_EQ(mgr1->OpenStore(storePath_, STORE_ID_1, option, delegate1), DBStatus::OK);
972     ASSERT_NE(delegate1, nullptr);
973     std::string deviceTableName = "devTable";
974     ASSERT_EQ(delegate1->CreateDistributedTable(deviceTableName, DEVICE_COOPERATION), DBStatus::OK);
975     DataBaseSchema dataBaseSchema;
976     TableSchema tableSchema;
977     tableSchema.name = deviceTableName;
978     tableSchema.sharedTableName = deviceTableName + "_shared";
979     tableSchema.fields = {
980         {"id", TYPE_INDEX<std::string>, true}, {"name", TYPE_INDEX<std::string>}, {"height", TYPE_INDEX<double>},
981         {"photo", TYPE_INDEX<Bytes>}, {"asset", TYPE_INDEX<Asset>}, {"assets", TYPE_INDEX<Assets>},
982         {"age", TYPE_INDEX<int64_t>}
983     };
984     dataBaseSchema.tables.push_back(tableSchema);
985     ASSERT_EQ(delegate1->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
986     /**
987      * @tc.steps:step3. UpsertData to device table.
988      * @tc.expected: step3. NOT_FOUND.
989      */
990     EXPECT_EQ(delegate1->UpsertData(deviceTableName, { record }), NOT_FOUND);
991     EXPECT_EQ(mgr1->CloseStore(delegate1), DBStatus::OK);
992     delegate1 = nullptr;
993     mgr1 = nullptr;
994 }
995 
996 /**
997  * @tc.name: UploadAssetsTest001
998  * @tc.desc: Test upload asset with error.
999  * @tc.type: FUNC
1000  * @tc.require:
1001  * @tc.author: liaoyonghuang
1002  */
1003 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, UploadAssetsTest001, TestSize.Level1)
1004 {
1005     /**
1006      * @tc.steps:step1. Insert 10 records.
1007      * @tc.expected: step1. ok.
1008      */
1009     const int actualCount = 10;
1010     InsertUserTableRecord(tableName_, 0, actualCount);
1011     /**
1012      * @tc.steps:step2. Set callback function to cause some upstream data to fail.
1013      * @tc.expected: step2. ok.
1014      */
1015     int recordIndex = 0;
1016     Asset tempAsset = {
1017             .version = 2, .name = "Phone", .assetId = "0", .subpath = "/local/sync", .uri = "/cloud/sync",
1018             .modifyTime = "123456", .createTime = "0", .size = "1024", .hash = "DEC"
1019     };
__anond35eef630c02(const std::string &tableName, VBucket &extend) 1020     virtualCloudDb_->ForkUpload([&tempAsset, &recordIndex](const std::string &tableName, VBucket &extend) {
1021         Asset asset;
1022         Assets assets;
1023         switch (recordIndex) {
1024             case 0: // record[0] is successful because ERROR_FIELD is not verified when BatchInsert returns OK status.
1025                 extend[std::string(CloudDbConstant::ERROR_FIELD)] = static_cast<int64_t>(DBStatus::CLOUD_ERROR);
1026                 break;
1027             case 1: // record[1] is considered successful because it is a conflict.
1028                 extend[std::string(CloudDbConstant::ERROR_FIELD)] =
1029                     static_cast<int64_t>(DBStatus::CLOUD_RECORD_EXIST_CONFLICT);
1030                 break;
1031             case 2: // record[2] fail because of empty gid.
1032                 extend[std::string(CloudDbConstant::GID_FIELD)] = std::string("");
1033                 break;
1034             case 3: // record[3] fail because of empty assetId.
1035                 asset = tempAsset;
1036                 asset.assetId = "";
1037                 extend[std::string(CloudDbConstant::ASSET)] = asset;
1038                 break;
1039             case 4: // record[4] fail because of empty assetId.
1040                 assets.push_back(tempAsset);
1041                 assets[0].assetId = "";
1042                 extend[std::string(CloudDbConstant::ASSETS)] = assets;
1043                 break;
1044             case 5: // record[5] is successful because ERROR_FIELD is not verified when BatchInsert returns OK status.
1045                 extend[std::string(CloudDbConstant::ERROR_FIELD)] = std::string("");
1046                 break;
1047             default:
1048                 break;
1049         }
1050         recordIndex++;
1051     });
1052     /**
1053      * @tc.steps:step3. Sync and check upLoadInfo.
1054      * @tc.expected: step3. failCount is 5 and successCount is 5.
1055      */
1056     Query query = Query::Select().FromTable({ tableName_ });
1057     BlockSync(query, delegate_);
1058     for (const auto &table : lastProcess_.tableProcess) {
1059         EXPECT_EQ(table.second.upLoadInfo.total, 10u);
1060         EXPECT_EQ(table.second.upLoadInfo.failCount, 3u);
1061         EXPECT_EQ(table.second.upLoadInfo.successCount, 7u);
1062     }
1063     virtualCloudDb_->ForkUpload(nullptr);
1064 }
1065 
1066 /**
1067  * @tc.name: UploadAssetsTest002
1068  * @tc.desc: Test upload asset with error.
1069  * @tc.type: FUNC
1070  * @tc.require:
1071  * @tc.author: liaoyonghuang
1072  */
1073 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, UploadAssetsTest002, TestSize.Level1)
1074 {
1075     /**
1076      * @tc.steps:step1. Insert 10 records.
1077      * @tc.expected: step1. ok.
1078      */
1079     const int actualCount = 10;
1080     InsertUserTableRecord(tableName_, 0, actualCount);
1081     Query query = Query::Select().FromTable({ tableName_ });
1082     BlockSync(query, delegate_);
1083     /**
1084      * @tc.steps:step2. Delete local data.
1085      * @tc.expected: step2. OK.
1086      */
1087     std::string sql = "delete from " + tableName_ + " where id >= " + std::to_string(actualCount / 2);
1088     EXPECT_EQ(RelationalTestUtils::ExecSql(db_, sql), SQLITE_OK);
1089     /**
1090      * @tc.steps:step3. Set callback function to cause some upstream data to fail.
1091      * @tc.expected: step3. ok.
1092      */
__anond35eef630d02(const std::string &tableName, VBucket &extend) 1093     virtualCloudDb_->ForkUpload([](const std::string &tableName, VBucket &extend) {
1094         extend[std::string(CloudDbConstant::GID_FIELD)] = "";
1095     });
1096     BlockSync(query, delegate_);
1097     for (const auto &table : lastProcess_.tableProcess) {
1098         EXPECT_EQ(table.second.upLoadInfo.total, 5u);
1099         EXPECT_EQ(table.second.upLoadInfo.failCount, 0u);
1100         EXPECT_EQ(table.second.upLoadInfo.successCount, 5u);
1101     }
1102     virtualCloudDb_->ForkUpload(nullptr);
1103 }
1104 
1105 /**
1106  * @tc.name: UploadAssetsTest003
1107  * @tc.desc: Test upload asset with error CLOUD_RECORD_ALREADY_EXISTED.
1108  * @tc.type: FUNC
1109  * @tc.require:
1110  * @tc.author: liaoyonghuang
1111  */
1112 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, UploadAssetsTest003, TestSize.Level0)
1113 {
1114     /**
1115      * @tc.steps:step1. Insert 100 records.
1116      * @tc.expected: step1. ok.
1117      */
1118     const int actualCount = 100;
1119     InsertUserTableRecord(tableName_, 0, actualCount);
1120     /**
1121      * @tc.steps:step2. Set callback function to return CLOUD_RECORD_ALREADY_EXISTED in 1st batch.
1122      * @tc.expected: step2. ok.
1123      */
1124     int uploadCount = 0;
__anond35eef630e02(const std::string &tableName, VBucket &extend) 1125     virtualCloudDb_->ForkUpload([&uploadCount](const std::string &tableName, VBucket &extend) {
1126         if (uploadCount < 30) { // There are a total of 30 pieces of data in one batch of upstream data
1127             extend[std::string(CloudDbConstant::ERROR_FIELD)] =
1128                 static_cast<int64_t>(DBStatus::CLOUD_RECORD_ALREADY_EXISTED);
1129         }
1130         uploadCount++;
1131     });
1132     Query query = Query::Select().FromTable({ tableName_ });
1133     BlockSync(query, delegate_);
1134     for (const auto &table : lastProcess_.tableProcess) {
1135         EXPECT_EQ(table.second.upLoadInfo.batchIndex, 4u);
1136         EXPECT_EQ(table.second.upLoadInfo.total, 70u);
1137         EXPECT_EQ(table.second.upLoadInfo.failCount, 0u);
1138         EXPECT_EQ(table.second.upLoadInfo.successCount, 70u);
1139         EXPECT_EQ(table.second.process, ProcessStatus::FINISHED);
1140     }
1141     virtualCloudDb_->ForkUpload(nullptr);
1142 }
1143 }
1144 #endif
1145