1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #ifdef RELATIONAL_STORE
16 #include <gtest/gtest.h>
17 #include "cloud/cloud_db_constant.h"
18 #include "cloud/cloud_db_types.h"
19 #include "cloud_db_sync_utils_test.h"
20 #include "distributeddb_data_generate_unit_test.h"
21 #include "log_print.h"
22 #include "relational_store_delegate.h"
23 #include "relational_store_manager.h"
24 #include "runtime_config.h"
25 #include "time_helper.h"
26 #include "virtual_asset_loader.h"
27 #include "virtual_cloud_data_translate.h"
28 #include "virtual_cloud_db.h"
29 #include "virtual_communicator_aggregator.h"
30 #include "sqlite_relational_utils.h"
31 #include "cloud/cloud_storage_utils.h"
32
33 namespace {
34 using namespace testing::ext;
35 using namespace DistributedDB;
36 using namespace DistributedDBUnitTest;
37 const char *g_createSQL =
38 "CREATE TABLE IF NOT EXISTS DistributedDBCloudAssetsOperationSyncTest(" \
39 "id TEXT PRIMARY KEY," \
40 "name TEXT," \
41 "height REAL ," \
42 "photo BLOB," \
43 "asset ASSET," \
44 "assets ASSETS," \
45 "age INT);";
46 const int64_t g_syncWaitTime = 60;
47 const int g_assetsNum = 3;
48 const Asset g_localAsset = {
49 .version = 2, .name = "Phone", .assetId = "0", .subpath = "/local/sync", .uri = "/cloud/sync",
50 .modifyTime = "123456", .createTime = "0", .size = "1024", .hash = "DEC"
51 };
52 SyncProcess lastProcess_;
53
CreateUserDBAndTable(sqlite3 * & db)54 void CreateUserDBAndTable(sqlite3 *&db)
55 {
56 EXPECT_EQ(RelationalTestUtils::ExecSql(db, "PRAGMA journal_mode=WAL;"), SQLITE_OK);
57 EXPECT_EQ(RelationalTestUtils::ExecSql(db, g_createSQL), SQLITE_OK);
58 }
59
BlockSync(const Query & query,RelationalStoreDelegate * delegate,SyncMode syncMode=SYNC_MODE_CLOUD_MERGE)60 void BlockSync(const Query &query, RelationalStoreDelegate *delegate, SyncMode syncMode = SYNC_MODE_CLOUD_MERGE)
61 {
62 std::mutex dataMutex;
63 std::condition_variable cv;
64 bool finish = false;
65 SyncProcess last;
66 auto callback = [&last, &cv, &dataMutex, &finish](const std::map<std::string, SyncProcess> &process) {
67 for (const auto &item: process) {
68 if (item.second.process == DistributedDB::FINISHED) {
69 {
70 std::lock_guard<std::mutex> autoLock(dataMutex);
71 finish = true;
72 }
73 last = item.second;
74 cv.notify_one();
75 }
76 }
77 };
78 LOGW("begin call sync");
79 ASSERT_EQ(delegate->Sync({ "CLOUD" }, syncMode, query, callback, g_syncWaitTime), OK);
80 std::unique_lock<std::mutex> uniqueLock(dataMutex);
81 cv.wait(uniqueLock, [&finish]() {
82 return finish;
83 });
84 lastProcess_ = last;
85 LOGW("end call sync");
86 }
87
88 class DistributedDBCloudAssetsOperationSyncTest : public testing::Test {
89 public:
90 static void SetUpTestCase();
91 static void TearDownTestCase();
92 void SetUp() override;
93 void TearDown() override;
94 void WriteDataWithoutCommitTransaction();
95 protected:
96 void InitTestDir();
97 DataBaseSchema GetSchema();
98 void CloseDb();
99 void InsertUserTableRecord(const std::string &tableName, int64_t begin, int64_t count, size_t assetCount = 2u,
100 const Assets &templateAsset = {});
101 void CheckAssetsCount(const std::vector<size_t> &expectCount, bool checkAsset = false);
102 void UpdateCloudTableRecord(int64_t begin, int64_t count, bool assetIsNull);
103 void ForkDownloadAndRemoveAsset(DBStatus removeStatus, int &downLoadCount, int &removeCount);
104 void InsertLocalAssetData(const std::string &assetHash);
105 void InsertCloudAssetData(const std::string &assetHash);
106 void PrepareForAssetOperation010();
107 std::vector<Asset> GetAssets(const std::string &baseName, const Assets &templateAsset, size_t assetCount);
108 std::string testDir_;
109 std::string storePath_;
110 sqlite3 *db_ = nullptr;
111 RelationalStoreDelegate *delegate_ = nullptr;
112 std::shared_ptr<VirtualCloudDb> virtualCloudDb_ = nullptr;
113 std::shared_ptr<VirtualAssetLoader> virtualAssetLoader_ = nullptr;
114 std::shared_ptr<VirtualCloudDataTranslate> virtualTranslator_ = nullptr;
115 std::shared_ptr<RelationalStoreManager> mgr_ = nullptr;
116 std::string tableName_ = "DistributedDBCloudAssetsOperationSyncTest";
117 VirtualCommunicatorAggregator *communicatorAggregator_ = nullptr;
118 TrackerSchema trackerSchema = {
119 .tableName = tableName_, .extendColName = "name", .trackerColNames = {"age"}
120 };
121 };
122
SetUpTestCase()123 void DistributedDBCloudAssetsOperationSyncTest::SetUpTestCase()
124 {
125 RuntimeConfig::SetCloudTranslate(std::make_shared<VirtualCloudDataTranslate>());
126 }
127
TearDownTestCase()128 void DistributedDBCloudAssetsOperationSyncTest::TearDownTestCase()
129 {}
130
SetUp()131 void DistributedDBCloudAssetsOperationSyncTest::SetUp()
132 {
133 DistributedDBToolsUnitTest::PrintTestCaseInfo();
134 InitTestDir();
135 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(testDir_) != 0) {
136 LOGE("rm test db files error.");
137 }
138 DistributedDBToolsUnitTest::PrintTestCaseInfo();
139 LOGD("Test dir is %s", testDir_.c_str());
140 db_ = RelationalTestUtils::CreateDataBase(storePath_);
141 ASSERT_NE(db_, nullptr);
142 CreateUserDBAndTable(db_);
143 mgr_ = std::make_shared<RelationalStoreManager>(APP_ID, USER_ID);
144 RelationalStoreDelegate::Option option;
145 ASSERT_EQ(mgr_->OpenStore(storePath_, STORE_ID_1, option, delegate_), DBStatus::OK);
146 ASSERT_NE(delegate_, nullptr);
147 ASSERT_EQ(delegate_->CreateDistributedTable(tableName_, CLOUD_COOPERATION), DBStatus::OK);
148 ASSERT_EQ(delegate_->SetTrackerTable(trackerSchema), DBStatus::OK);
149 virtualCloudDb_ = std::make_shared<VirtualCloudDb>();
150 virtualAssetLoader_ = std::make_shared<VirtualAssetLoader>();
151 ASSERT_EQ(delegate_->SetCloudDB(virtualCloudDb_), DBStatus::OK);
152 ASSERT_EQ(delegate_->SetIAssetLoader(virtualAssetLoader_), DBStatus::OK);
153 virtualTranslator_ = std::make_shared<VirtualCloudDataTranslate>();
154 DataBaseSchema dataBaseSchema = GetSchema();
155 ASSERT_EQ(delegate_->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
156 communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
157 ASSERT_TRUE(communicatorAggregator_ != nullptr);
158 RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
159 }
160
TearDown()161 void DistributedDBCloudAssetsOperationSyncTest::TearDown()
162 {
163 CloseDb();
164 EXPECT_EQ(sqlite3_close_v2(db_), SQLITE_OK);
165 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(testDir_) != E_OK) {
166 LOGE("rm test db files error.");
167 }
168 RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
169 communicatorAggregator_ = nullptr;
170 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
171 }
172
InitTestDir()173 void DistributedDBCloudAssetsOperationSyncTest::InitTestDir()
174 {
175 if (!testDir_.empty()) {
176 return;
177 }
178 DistributedDBToolsUnitTest::TestDirInit(testDir_);
179 storePath_ = testDir_ + "/" + STORE_ID_1 + ".db";
180 LOGI("The test db is:%s", testDir_.c_str());
181 }
182
GetSchema()183 DataBaseSchema DistributedDBCloudAssetsOperationSyncTest::GetSchema()
184 {
185 DataBaseSchema schema;
186 TableSchema tableSchema;
187 tableSchema.name = tableName_;
188 tableSchema.sharedTableName = tableName_ + "_shared";
189 tableSchema.fields = {
190 {"id", TYPE_INDEX<std::string>, true}, {"name", TYPE_INDEX<std::string>}, {"height", TYPE_INDEX<double>},
191 {"photo", TYPE_INDEX<Bytes>}, {"asset", TYPE_INDEX<Asset>}, {"assets", TYPE_INDEX<Assets>},
192 {"age", TYPE_INDEX<int64_t>}
193 };
194 schema.tables.push_back(tableSchema);
195 return schema;
196 }
197
CloseDb()198 void DistributedDBCloudAssetsOperationSyncTest::CloseDb()
199 {
200 virtualCloudDb_->ForkUpload(nullptr);
201 virtualCloudDb_ = nullptr;
202 EXPECT_EQ(mgr_->CloseStore(delegate_), DBStatus::OK);
203 delegate_ = nullptr;
204 mgr_ = nullptr;
205 }
206
InsertUserTableRecord(const std::string & tableName,int64_t begin,int64_t count,size_t assetCount,const Assets & templateAsset)207 void DistributedDBCloudAssetsOperationSyncTest::InsertUserTableRecord(const std::string &tableName, int64_t begin,
208 int64_t count, size_t assetCount, const Assets &templateAsset)
209 {
210 std::string photo = "phone";
211 int errCode;
212 std::vector<uint8_t> assetBlob;
213 std::vector<uint8_t> assetsBlob;
214 const int64_t index2 = 2;
215 for (int64_t i = begin; i < begin + count; ++i) {
216 std::string name = g_localAsset.name + std::to_string(i);
217 Asset asset = g_localAsset;
218 asset.name = name;
219 RuntimeContext::GetInstance()->AssetToBlob(asset, assetBlob);
220 std::vector<Asset> assets = GetAssets(name, templateAsset, assetCount);
221 string sql = "INSERT OR REPLACE INTO " + tableName +
222 " (id, name, height, photo, asset, assets, age) VALUES ('" + std::to_string(i) +
223 "', 'local', '178.0', '" + photo + "', ?, ?, '18');";
224 sqlite3_stmt *stmt = nullptr;
225 ASSERT_EQ(SQLiteUtils::GetStatement(db_, sql, stmt), E_OK);
226 RuntimeContext::GetInstance()->AssetsToBlob(assets, assetsBlob);
227 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK);
228 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, index2, assetsBlob, false), E_OK);
229 EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
230 SQLiteUtils::ResetStatement(stmt, true, errCode);
231 }
232 }
233
GetAssets(const std::string & baseName,const Assets & templateAsset,size_t assetCount)234 std::vector<Asset> DistributedDBCloudAssetsOperationSyncTest::GetAssets(const std::string &baseName,
235 const Assets &templateAsset, size_t assetCount)
236 {
237 std::vector<Asset> assets;
238 for (size_t i = 1; i <= assetCount; ++i) {
239 Asset asset;
240 if (i - 1 < templateAsset.size()) {
241 asset = templateAsset[i - 1];
242 } else {
243 asset = g_localAsset;
244 asset.name = baseName + "_" + std::to_string(i);
245 asset.status = static_cast<uint32_t>(AssetStatus::INSERT);
246 }
247 assets.push_back(asset);
248 }
249 return assets;
250 }
251
UpdateCloudTableRecord(int64_t begin,int64_t count,bool assetIsNull)252 void DistributedDBCloudAssetsOperationSyncTest::UpdateCloudTableRecord(int64_t begin, int64_t count, bool assetIsNull)
253 {
254 std::vector<VBucket> record;
255 std::vector<VBucket> extend;
256 Timestamp now = TimeHelper::GetSysCurrentTime();
257 const int assetCount = 2;
258 for (int64_t i = begin; i < (begin + count); ++i) {
259 VBucket data;
260 data.insert_or_assign("id", std::to_string(i));
261 data.insert_or_assign("name", "Cloud" + std::to_string(i));
262 Assets assets;
263 for (int j = 1; j <= assetCount; ++j) {
264 Asset asset;
265 asset.name = "Phone_" + std::to_string(j);
266 asset.assetId = std::to_string(j);
267 asset.status = AssetStatus::UPDATE;
268 assets.push_back(asset);
269 }
270 assetIsNull ? data.insert_or_assign("assets", Nil()) : data.insert_or_assign("assets", assets);
271 record.push_back(data);
272 VBucket log;
273 log.insert_or_assign(CloudDbConstant::CREATE_FIELD, static_cast<int64_t>(
274 now / CloudDbConstant::TEN_THOUSAND));
275 log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, static_cast<int64_t>(
276 now / CloudDbConstant::TEN_THOUSAND));
277 log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
278 log.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(i));
279 extend.push_back(log);
280 }
281
282 ASSERT_EQ(virtualCloudDb_->BatchUpdate(tableName_, std::move(record), extend), DBStatus::OK);
283 }
284
CheckAssetsCount(const std::vector<size_t> & expectCount,bool checkAsset)285 void DistributedDBCloudAssetsOperationSyncTest::CheckAssetsCount(const std::vector<size_t> &expectCount,
286 bool checkAsset)
287 {
288 std::vector<VBucket> allData;
289 auto dbSchema = GetSchema();
290 ASSERT_GT(dbSchema.tables.size(), 0u);
291 ASSERT_EQ(RelationalTestUtils::SelectData(db_, dbSchema.tables[0], allData), E_OK);
292 int index = 0;
293 ASSERT_EQ(allData.size(), expectCount.size());
294 for (const auto &data : allData) {
295 auto colIter = data.find("assets");
296 EXPECT_NE(colIter, data.end());
297 if (colIter == data.end()) {
298 index++;
299 continue;
300 }
301 Type colValue = data.at("assets");
302 auto translate = std::dynamic_pointer_cast<ICloudDataTranslate>(virtualTranslator_);
303 auto assets = RelationalTestUtils::GetAssets(colValue, translate);
304 size_t size = assets.size();
305 if (checkAsset) {
306 Type colValue1 = data.at("asset");
307 auto assets1 = RelationalTestUtils::GetAssets(colValue1, translate, true);
308 size += assets1.size();
309 }
310 LOGI("[DistributedDBCloudAssetsOperationSyncTest] Check data index %d", index);
311 EXPECT_EQ(static_cast<size_t>(size), expectCount[index]);
312 for (const auto &item : assets) {
313 LOGI("[DistributedDBCloudAssetsOperationSyncTest] Asset name %s status %" PRIu32, item.name.c_str(),
314 item.status);
315 }
316 index++;
317 }
318 }
319
ForkDownloadAndRemoveAsset(DBStatus removeStatus,int & downLoadCount,int & removeCount)320 void DistributedDBCloudAssetsOperationSyncTest::ForkDownloadAndRemoveAsset(DBStatus removeStatus, int &downLoadCount,
321 int &removeCount)
322 {
323 virtualAssetLoader_->ForkDownload([this, &downLoadCount](std::map<std::string, Assets> &assets) {
324 downLoadCount++;
325 if (downLoadCount == 1) {
326 std::string sql = "UPDATE " + tableName_ + " SET assets = NULL WHERE id = 0;";
327 ASSERT_EQ(RelationalTestUtils::ExecSql(db_, sql), SQLITE_OK);
328 }
329 });
330 virtualAssetLoader_->ForkRemoveLocalAssets([removeStatus, &removeCount](const std::vector<Asset> &assets) {
331 EXPECT_EQ(assets.size(), 2u); // one record has 2 asset
332 removeCount++;
333 return removeStatus;
334 });
335 }
336
337 /**
338 * @tc.name: SyncWithAssetOperation001
339 * @tc.desc: Delete Assets When Download
340 * @tc.type: FUNC
341 * @tc.require:
342 * @tc.author: zhangqiquan
343 */
344 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation001, TestSize.Level0)
345 {
346 const int actualCount = 10;
347 const int deleteDataCount = 5;
348 const int deleteAssetsCount = 4;
349 InsertUserTableRecord(tableName_, 0, actualCount);
350 std::string tableName = tableName_;
__anond35eef630602(const std::string &, VBucket &) 351 virtualCloudDb_->ForkUpload([this, deleteDataCount, deleteAssetsCount](const std::string &, VBucket &) {
352 for (int64_t i = 0; i < deleteDataCount; i++) {
353 std::string sql = "DELETE FROM " + tableName_ + " WHERE id = " + std::to_string(i) + ";";
354 ASSERT_EQ(RelationalTestUtils::ExecSql(db_, sql), SQLITE_OK);
355 }
356 for (int64_t i = deleteDataCount; i < deleteDataCount + deleteAssetsCount; i++) {
357 std::string sql = "UPDATE " + tableName_ + " SET asset = NULL, assets = NULL WHERE id = " +
358 std::to_string(i) + ";";
359 ASSERT_EQ(RelationalTestUtils::ExecSql(db_, sql), SQLITE_OK);
360 }
361 });
362 Query query = Query::Select().FromTable({ tableName_ });
363 BlockSync(query, delegate_);
364 virtualCloudDb_->ForkUpload(nullptr);
365 std::vector<size_t> expectCount(actualCount - deleteDataCount, 0);
366 expectCount[expectCount.size() - 1] = 2; // default one row has 2 assets
367 CheckAssetsCount(expectCount);
368 }
369
370 /**
371 * @tc.name: SyncWithAssetOperation002
372 * @tc.desc: Download Assets When local assets was removed
373 * @tc.type: FUNC
374 * @tc.require:
375 * @tc.author: zhangqiquan
376 */
377 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation002, TestSize.Level0)
378 {
379 const int actualCount = 1;
380 InsertUserTableRecord(tableName_, 0, actualCount);
381 Query query = Query::Select().FromTable({ tableName_ });
382 BlockSync(query, delegate_);
383 int downLoadCount = 0;
384 int removeCount = 0;
385 ForkDownloadAndRemoveAsset(OK, downLoadCount, removeCount);
386 UpdateCloudTableRecord(0, actualCount, false);
387 RelationalTestUtils::CloudBlockSync(query, delegate_);
388 EXPECT_EQ(downLoadCount, 1); // local asset was removed should download 1 times
389 EXPECT_EQ(removeCount, 1);
390 virtualAssetLoader_->ForkDownload(nullptr);
391 virtualAssetLoader_->ForkRemoveLocalAssets(nullptr);
392
393 std::vector<size_t> expectCount = { 0 };
394 CheckAssetsCount(expectCount);
395 }
396
397 /**
398 * @tc.name: SyncWithAssetOperation003
399 * @tc.desc: Delete Assets When Download
400 * @tc.type: FUNC
401 * @tc.require:
402 * @tc.author: bty
403 */
404 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation003, TestSize.Level0)
405 {
406 InsertUserTableRecord(tableName_, 0, 1); // 1 is count
407 int uploadCount = 0;
__anond35eef630702(const std::string &, VBucket &) 408 virtualCloudDb_->ForkUpload([this, &uploadCount](const std::string &, VBucket &) {
409 if (uploadCount > 0) {
410 return;
411 }
412 SqlCondition condition;
413 condition.sql = "UPDATE " + tableName_ + " SET age = '666' WHERE id = 0;";
414 std::vector<VBucket> records;
415 EXPECT_EQ(delegate_->ExecuteSql(condition, records), OK);
416 uploadCount++;
417 });
418 Query query = Query::Select().FromTable({ tableName_ });
419 BlockSync(query, delegate_);
420 virtualCloudDb_->ForkUpload(nullptr);
421
422 std::string sql = "SELECT assets from " + tableName_ + " where id = 0;";
423 sqlite3_stmt *stmt = nullptr;
424 ASSERT_EQ(SQLiteUtils::GetStatement(db_, sql, stmt), E_OK);
425 while (SQLiteUtils::StepWithRetry(stmt) == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
426 ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
427 Type cloudValue;
428 ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, cloudValue), E_OK);
429 std::vector<uint8_t> assetsBlob;
430 Assets assets;
431 ASSERT_EQ(CloudStorageUtils::GetValueFromOneField(cloudValue, assetsBlob), E_OK);
432 ASSERT_EQ(RuntimeContext::GetInstance()->BlobToAssets(assetsBlob, assets), E_OK);
433 ASSERT_EQ(assets.size(), 2u); // 2 is asset num
434 for (size_t i = 0; i < assets.size(); ++i) {
435 EXPECT_EQ(assets[i].status, AssetStatus::NORMAL);
436 }
437 }
438 int errCode;
439 SQLiteUtils::ResetStatement(stmt, true, errCode);
440 }
441
442 /**
443 * @tc.name: SyncWithAssetOperation004
444 * @tc.desc: Download Assets When local assets was removed
445 * @tc.type: FUNC
446 * @tc.require:
447 * @tc.author: zhangqiquan
448 */
449 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation004, TestSize.Level0)
450 {
451 const int actualCount = 5; // 5 record
452 InsertUserTableRecord(tableName_, 0, actualCount);
453 Query query = Query::Select().FromTable({ tableName_ });
454 BlockSync(query, delegate_);
455 int downLoadCount = 0;
456 int removeCount = 0;
457 ForkDownloadAndRemoveAsset(DB_ERROR, downLoadCount, removeCount);
458 UpdateCloudTableRecord(0, actualCount, false);
459 RelationalTestUtils::CloudBlockSync(query, delegate_, DBStatus::OK, DBStatus::REMOTE_ASSETS_FAIL);
460 EXPECT_EQ(downLoadCount, 5); // local asset was removed should download 5 times
461 EXPECT_EQ(removeCount, 1);
462 virtualAssetLoader_->ForkDownload(nullptr);
463 virtualAssetLoader_->ForkRemoveLocalAssets(nullptr);
464
465 std::vector<size_t> expectCount = { 0, 2, 2, 2, 2 };
466 CheckAssetsCount(expectCount);
467 }
468
InsertLocalAssetData(const std::string & assetHash)469 void DistributedDBCloudAssetsOperationSyncTest::InsertLocalAssetData(const std::string &assetHash)
470 {
471 Assets assets;
472 std::string assetNameBegin = "Phone";
473 for (int j = 1; j <= g_assetsNum; ++j) {
474 Asset asset;
475 asset.name = assetNameBegin + "_" + std::to_string(j);
476 asset.status = AssetStatus::NORMAL;
477 asset.flag = static_cast<uint32_t>(AssetOpType::NO_CHANGE);
478 asset.hash = assetHash + "_" + std::to_string(j);
479 asset.assetId = std::to_string(j);
480 assets.push_back(asset);
481 }
482 string sql = "INSERT OR REPLACE INTO " + tableName_ + " (id,name,asset,assets) VALUES('0','CloudTest0',?,?);";
483 sqlite3_stmt *stmt = nullptr;
484 ASSERT_EQ(SQLiteUtils::GetStatement(db_, sql, stmt), E_OK);
485 std::vector<uint8_t> assetBlob;
486 std::vector<uint8_t> assetsBlob;
487 RuntimeContext::GetInstance()->AssetToBlob(g_localAsset, assetBlob);
488 RuntimeContext::GetInstance()->AssetsToBlob(assets, assetsBlob);
489 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 1, assetBlob, false), E_OK);
490 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 2, assetsBlob, false), E_OK); // 2 is assetsBlob
491 EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
492 int errCode;
493 SQLiteUtils::ResetStatement(stmt, true, errCode);
494 }
495
WriteDataWithoutCommitTransaction()496 void DistributedDBCloudAssetsOperationSyncTest::WriteDataWithoutCommitTransaction()
497 {
498 ASSERT_NE(db_, nullptr);
499 SQLiteUtils::BeginTransaction(db_);
500 InsertLocalAssetData("localAsset");
501 constexpr int kSleepDurationSeconds = 3;
502 std::this_thread::sleep_for(std::chrono::seconds(kSleepDurationSeconds));
503 }
504
505 /**
506 * @tc.name: TestOpenDatabaseBusy001
507 * @tc.desc: Test open database when the database is busy.
508 * @tc.type: FUNC
509 * @tc.require:
510 * @tc.author: liufuchenxing
511 */
512 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, TestOpenDatabaseBusy001, TestSize.Level2)
513 {
514 /**
515 * @tc.steps:step1. close store.
516 * @tc.expected:step1. check ok.
517 */
518 EXPECT_EQ(mgr_->CloseStore(delegate_), DBStatus::OK);
519 delegate_ = nullptr;
520 /**
521 * @tc.steps:step2. Another thread write data into database into database without commit.
522 * @tc.expected:step2. check ok.
523 */
524 std::thread thread(&DistributedDBCloudAssetsOperationSyncTest::WriteDataWithoutCommitTransaction, this);
525 std::this_thread::sleep_for(std::chrono::seconds(1));
526 /**
527 * @tc.steps:step3. open relational delegate.
528 * @tc.expected:step3. open success.
529 */
530 RelationalStoreDelegate::Option option;
531 ASSERT_EQ(mgr_->OpenStore(storePath_, STORE_ID_1, option, delegate_), DBStatus::OK);
532 thread.join();
533 }
534
535 /**
536 * @tc.name: SyncWithAssetOperation006
537 * @tc.desc: Remove Local Datas When local assets was empty
538 * @tc.type: FUNC
539 * @tc.require:
540 * @tc.author: lijun
541 */
542 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation006, TestSize.Level0)
543 {
544 const int actualCount = 5;
545 InsertUserTableRecord(tableName_, 0, actualCount);
546 Query query = Query::Select().FromTable({ tableName_ });
547 BlockSync(query, delegate_);
548
549 UpdateCloudTableRecord(0, 2, true);
550 BlockSync(query, delegate_);
551
552 int removeCount = 0;
__anond35eef630802(const std::vector<Asset> &assets) 553 virtualAssetLoader_->ForkRemoveLocalAssets([&removeCount](const std::vector<Asset> &assets) {
554 removeCount = assets.size();
555 return DBStatus::OK;
556 });
557 std::string device = "";
558 ASSERT_EQ(delegate_->RemoveDeviceData(device, FLAG_AND_DATA), DBStatus::OK);
559 ASSERT_EQ(9, removeCount);
560 virtualAssetLoader_->ForkRemoveLocalAssets(nullptr);
561 }
562
563 /**
564 * @tc.name: SyncWithAssetOperation006
565 * @tc.desc: Test assetId fill when assetId changed
566 * @tc.type: FUNC
567 * @tc.require:
568 * @tc.author: wangxiangdong
569 */
570 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation007, TestSize.Level0)
571 {
572 /**
573 * @tc.steps:step1. Insert 5 records and sync.
574 * @tc.expected: step1. ok.
575 */
576 const int actualCount = 5;
577 std::string name = g_localAsset.name + std::to_string(0);
578 Assets expectAssets = GetAssets(name, {}, 3u); // contain 3 assets
579 expectAssets[0].hash.append("change"); // modify first asset
580 InsertUserTableRecord(tableName_, 0, actualCount, expectAssets.size(), expectAssets);
581 Query query = Query::Select().FromTable({ tableName_ });
582 BlockSync(query, delegate_);
583 /**
584 * @tc.steps:step2. modify data and sync.
585 * @tc.expected: step2. ok.
586 */
587 UpdateCloudTableRecord(0, 1, true);
588 BlockSync(query, delegate_);
589 /**
590 * @tc.steps:step3. check modified data cursor.
591 * @tc.expected: step3. ok.
592 */
593 std::string sql = "SELECT cursor FROM " + DBCommon::GetLogTableName(tableName_) + " where data_key=1";
594 EXPECT_EQ(sqlite3_exec(db_, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
595 reinterpret_cast<void *>(7), nullptr), SQLITE_OK);
596 sql = "SELECT cursor FROM " + DBCommon::GetLogTableName(tableName_) + " where data_key=5";
597 EXPECT_EQ(sqlite3_exec(db_, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
598 reinterpret_cast<void *>(5), nullptr), SQLITE_OK);
599 }
600
601 /**
602 * @tc.name: SyncWithAssetOperation009
603 * @tc.desc: Test asset remove local and check db asset empty finally.
604 * @tc.type: FUNC
605 * @tc.require:
606 * @tc.author: wangxiangdong
607 */
608 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation009, TestSize.Level0)
609 {
610 /**
611 * @tc.steps:step1. Insert 5 records and sync.
612 * @tc.expected: step1. ok.
613 */
614 const int actualCount = 5;
615 RelationalTestUtils::InsertCloudRecord(0, actualCount, tableName_, virtualCloudDb_);
616 InsertUserTableRecord(tableName_, 0, actualCount);
617 /**
618 * @tc.steps:step2. modify data and sync.
619 * @tc.expected: step2. ok.
620 */
621 UpdateCloudTableRecord(0, 1, true);
622 Query query = Query::Select().FromTable({ tableName_ });
623 BlockSync(query, delegate_);
624 /**
625 * @tc.steps:step3. check asset number.
626 * @tc.expected: step3. ok.
627 */
628 std::vector<size_t> expectCount = { 0, 3, 3, 3, 3 };
629 CheckAssetsCount(expectCount, true);
630 }
631
InsertCloudAssetData(const std::string & assetHash)632 void DistributedDBCloudAssetsOperationSyncTest::InsertCloudAssetData(const std::string &assetHash)
633 {
634 std::vector<VBucket> record;
635 std::vector<VBucket> extend;
636 Timestamp now = DistributedDB::TimeHelper::GetSysCurrentTime();
637 VBucket data;
638 data.insert_or_assign("id", "0");
639 data.insert_or_assign("name", "CloudTest0");
640 Asset asset = g_localAsset;
641 data.insert_or_assign("asset", "asset");
642 Assets assets;
643 std::string assetNameBegin = "Phone";
644 for (int j = 1; j <= g_assetsNum; ++j) {
645 Asset assetTmp;
646 assetTmp.name = assetNameBegin + "_" + std::to_string(j);
647 assetTmp.status = AssetStatus::NORMAL;
648 assetTmp.hash = assetHash + "_" + std::to_string(j);
649 assetTmp.assetId = std::to_string(j);
650 assets.push_back(assetTmp);
651 }
652 data.insert_or_assign("assets", assets);
653 record.push_back(data);
654 VBucket log;
655 log.insert_or_assign(DistributedDB::CloudDbConstant::CREATE_FIELD, static_cast<int64_t>(
656 now / DistributedDB::CloudDbConstant::TEN_THOUSAND));
657 log.insert_or_assign(DistributedDB::CloudDbConstant::MODIFY_FIELD, static_cast<int64_t>(
658 now / DistributedDB::CloudDbConstant::TEN_THOUSAND));
659 log.insert_or_assign(DistributedDB::CloudDbConstant::DELETE_FIELD, false);
660 extend.push_back(log);
661 virtualCloudDb_->BatchInsert(tableName_, std::move(record), extend);
662 }
663
PrepareForAssetOperation010()664 void DistributedDBCloudAssetsOperationSyncTest::PrepareForAssetOperation010()
665 {
666 InsertCloudAssetData("cloudAsset");
667 InsertLocalAssetData("localAsset");
668 }
669
670 /**
671 * @tc.name: SyncWithAssetOperation010
672 * @tc.desc: Test check status of asset, when the hash of asset is different.
673 * @tc.type: FUNC
674 * @tc.require:
675 * @tc.author: liufuchenxing
676 */
677 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetOperation010, TestSize.Level0)
678 {
679 /**
680 * @tc.steps:step1. prepare local and cloud asset data.
681 * @tc.expected: step1. ok.
682 */
683 PrepareForAssetOperation010();
684
685 /**
686 * @tc.steps:step2. sync and check the status of assets.
687 * @tc.expected: step2. ok.
688 */
689 virtualCloudDb_->ForkBeforeBatchUpdate([](const std::string &, std::vector<VBucket> &record,
__anond35eef630902(const std::string &, std::vector<VBucket> &record, std::vector<VBucket> &extend, bool) 690 std::vector<VBucket> &extend, bool) {
691 ASSERT_EQ(static_cast<int>(record.size()), 1);
692 VBucket &bucket = record[0];
693 ASSERT_TRUE(bucket.find("assets") != bucket.end());
694 Assets assets = std::get<Assets>(bucket["assets"]);
695 ASSERT_EQ(static_cast<int>(assets.size()), 3);
696 for (size_t i = 0; i < assets.size(); i++) {
697 ASSERT_EQ(assets[i].status, AssetStatus::UPDATE);
698 }
699 });
700
701 Query query = Query::Select().FromTable({ tableName_ });
702 BlockSync(query, delegate_, SYNC_MODE_CLOUD_FORCE_PUSH);
703 }
704
705 /**
706 * @tc.name: IgnoreRecord001
707 * @tc.desc: Download Assets When local assets was removed
708 * @tc.type: FUNC
709 * @tc.require:
710 * @tc.author: zhangqiquan
711 */
712 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, IgnoreRecord001, TestSize.Level0)
713 {
714 const int actualCount = 1;
715 InsertUserTableRecord(tableName_, 0, actualCount);
716 Query query = Query::Select().FromTable({ tableName_ });
717 BlockSync(query, delegate_);
718 std::vector<size_t> expectCount = { 2 };
719 CheckAssetsCount(expectCount);
720
721 VBucket record;
722 record["id"] = std::to_string(0);
723 record["assets"] = Assets();
724 EXPECT_EQ(delegate_->UpsertData(tableName_, { record }), OK);
725 record["id"] = std::to_string(1);
726 EXPECT_EQ(delegate_->UpsertData(tableName_, { record }), OK);
727 expectCount = { 0, 0 };
728 CheckAssetsCount(expectCount);
729
730 std::vector<VBucket> logs;
731 EXPECT_EQ(RelationalTestUtils::GetRecordLog(db_, tableName_, logs), E_OK);
732 for (const auto &log : logs) {
733 int64_t cursor = std::get<int64_t>(log.at("cursor"));
734 EXPECT_GE(cursor, 0);
735 }
736 }
737
738 /**
739 * @tc.name: IgnoreRecord002
740 * @tc.desc: Ignore Assets When Download
741 * @tc.type: FUNC
742 * @tc.require:
743 * @tc.author: zhangqiquan
744 */
745 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, IgnoreRecord002, TestSize.Level0)
746 {
747 const int actualCount = 1;
748 InsertUserTableRecord(tableName_, 0, actualCount);
749 Query query = Query::Select().FromTable({ tableName_ });
750 RelationalTestUtils::CloudBlockSync(query, delegate_);
751 UpdateCloudTableRecord(0, actualCount, false);
752
753 virtualAssetLoader_->SetDownloadStatus(DBStatus::CLOUD_RECORD_EXIST_CONFLICT);
754 RelationalTestUtils::CloudBlockSync(query, delegate_);
755 virtualAssetLoader_->SetDownloadStatus(DBStatus::OK);
756 std::vector<size_t> expectCount = { 4 };
757 CheckAssetsCount(expectCount);
758 RelationalTestUtils::CloudBlockSync(query, delegate_);
759 }
760
761 /**
762 * @tc.name: IgnoreRecord003
763 * @tc.desc: Ignore Assets When Upload
764 * @tc.type: FUNC
765 * @tc.require:
766 * @tc.author: zhangqiquan
767 */
768 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, IgnoreRecord003, TestSize.Level0)
769 {
770 const int actualCount = 1;
771 InsertUserTableRecord(tableName_, 0, actualCount);
772 Query query = Query::Select().FromTable({ tableName_ });
773 virtualCloudDb_->SetConflictInUpload(true);
774 RelationalTestUtils::CloudBlockSync(query, delegate_);
775 virtualCloudDb_->SetConflictInUpload(false);
776 std::vector<size_t> expectCount = { 2 };
777 CheckAssetsCount(expectCount);
778 RelationalTestUtils::CloudBlockSync(query, delegate_);
779 }
780
781 /**
782 * @tc.name: UpsertData001
783 * @tc.desc: Upsert data after delete it
784 * @tc.type: FUNC
785 * @tc.require:
786 * @tc.author: zhangqiquan
787 */
788 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, UpsertData001, TestSize.Level0)
789 {
790 // insert id 0 to local
791 const int actualCount = 1;
792 InsertUserTableRecord(tableName_, 0, actualCount); // 10 is phone size
793 std::vector<std::map<std::string, std::string>> conditions;
794 std::map<std::string, std::string> entries;
795 entries["id"] = "0";
796 conditions.push_back(entries);
797 // delete id 0 in local
798 RelationalTestUtils::DeleteRecord(db_, tableName_, conditions);
799 // upsert id 0 to local
800 VBucket record;
801 record["id"] = std::to_string(0);
802 record["assets"] = Assets();
803 EXPECT_EQ(delegate_->UpsertData(tableName_, { record }), OK);
804 // check id 0 exist
805 CheckAssetsCount({ 0 });
806 }
807
808 /**
809 * @tc.name: UpsertData002
810 * @tc.desc: Test sync after Upsert.
811 * @tc.type: FUNC
812 * @tc.require:
813 * @tc.author: liaoyonghuang
814 */
815 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, UpsertData002, TestSize.Level0)
816 {
817 /**
818 * @tc.steps:step1. Insert 5 records and sync.
819 * @tc.expected: step1. ok.
820 */
821 const int actualCount = 5;
822 InsertUserTableRecord(tableName_, 0, actualCount);
823 Query query = Query::Select().FromTable({ tableName_ });
824 BlockSync(query, delegate_);
825
826 /**
827 * @tc.steps:step2. UpsertData and sync.
828 * @tc.expected: step2. ok.
829 */
830 int dataCnt = -1;
831 std::string checkLogSql = "SELECT count(*) FROM " + DBCommon::GetLogTableName(tableName_) + " where cursor = 5";
__anond35eef630a02(sqlite3_stmt *stmt) 832 RelationalTestUtils::ExecSql(db_, checkLogSql, nullptr, [&dataCnt](sqlite3_stmt *stmt) {
833 dataCnt = sqlite3_column_int(stmt, 0);
834 return E_OK;
835 });
836 EXPECT_EQ(dataCnt, 1);
837 vector<VBucket> records;
838 for (int i = 0; i < actualCount; i++) {
839 VBucket record;
840 record["id"] = std::to_string(i);
841 record["name"] = std::string("UpsertName");
842 records.push_back(record);
843 }
844 EXPECT_EQ(delegate_->UpsertData(tableName_, records), OK);
845 // check cursor has been increased
846 checkLogSql = "SELECT count(*) FROM " + DBCommon::GetLogTableName(tableName_) + " where cursor = 10";
__anond35eef630b02(sqlite3_stmt *stmt) 847 RelationalTestUtils::ExecSql(db_, checkLogSql, nullptr, [&dataCnt](sqlite3_stmt *stmt) {
848 dataCnt = sqlite3_column_int(stmt, 0);
849 return E_OK;
850 });
851 EXPECT_EQ(dataCnt, 1);
852 BlockSync(query, delegate_);
853
854 /**
855 * @tc.steps:step3. Check local data.
856 * @tc.expected: step3. All local data has been merged by the cloud.
857 */
858 std::vector<VBucket> allData;
859 auto dbSchema = GetSchema();
860 ASSERT_GT(dbSchema.tables.size(), 0u);
861 ASSERT_EQ(RelationalTestUtils::SelectData(db_, dbSchema.tables[0], allData), E_OK);
862 for (const auto &data : allData) {
863 ASSERT_EQ(std::get<std::string>(data.at("name")), "local");
864 }
865 }
866
867 /**
868 * @tc.name: SyncWithAssetConflict001
869 * @tc.desc: Upload with asset no change
870 * @tc.type: FUNC
871 * @tc.require:
872 * @tc.author: zhangqiquan
873 */
874 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, SyncWithAssetConflict001, TestSize.Level0)
875 {
876 // cloud and local insert same data
877 const int actualCount = 1;
878 RelationalTestUtils::InsertCloudRecord(0, actualCount, tableName_, virtualCloudDb_);
879 std::this_thread::sleep_for(std::chrono::seconds(1)); // sleep 1s for data conflict
880 InsertUserTableRecord(tableName_, 0, actualCount);
881 // sync and local asset's status are normal
882 Query query = Query::Select().FromTable({ tableName_ });
883 RelationalTestUtils::CloudBlockSync(query, delegate_);
884 auto dbSchema = GetSchema();
885 ASSERT_GT(dbSchema.tables.size(), 0u);
886 auto assets = RelationalTestUtils::GetAllAssets(db_, dbSchema.tables[0], virtualTranslator_);
887 for (const auto &oneRow : assets) {
888 for (const auto &asset : oneRow) {
889 EXPECT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::NORMAL));
890 }
891 }
892 }
893
894 /**
895 * @tc.name: UpsertDataInvalid001
896 * @tc.desc: Upsert invalid data
897 * @tc.type: FUNC
898 * @tc.require:
899 * @tc.author: wangxiangdong
900 */
901 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, UpsertDataInvalid001, TestSize.Level0)
902 {
903 VBucket record;
904 record["id"] = std::to_string(0);
905 record["assets"] = Assets();
906 /**
907 * @tc.steps:step1. UpsertData to empty table.
908 * @tc.expected: step1. INVALID_ARGS.
909 */
910 EXPECT_EQ(delegate_->UpsertData("", { record }), INVALID_ARGS);
911 /**
912 * @tc.steps:step2. UpsertData to shared table.
913 * @tc.expected: step2. INVALID_ARGS.
914 */
915 EXPECT_EQ(delegate_->UpsertData(tableName_ + "_shared", { record }), NOT_SUPPORT);
916 /**
917 * @tc.steps:step3. UpsertData to not device table and shared table.
918 * @tc.expected: step3. NOT_FOUND.
919 */
920 const char *createSQL =
921 "CREATE TABLE IF NOT EXISTS testing(" \
922 "id TEXT PRIMARY KEY," \
923 "name TEXT," \
924 "height REAL ," \
925 "photo BLOB," \
926 "asset ASSET," \
927 "assets ASSETS," \
928 "age INT);";
929 EXPECT_EQ(RelationalTestUtils::ExecSql(db_, createSQL), SQLITE_OK);
930 EXPECT_EQ(delegate_->UpsertData("testing", { record }), NOT_FOUND);
931 /**
932 * @tc.steps:step4. UpsertData to not exist table.
933 * @tc.expected: step4. NOT_FOUND.
934 */
935 EXPECT_EQ(delegate_->UpsertData("TABLE_NOT_EXIST", { record }), NOT_FOUND);
936 }
937
938 /**
939 * @tc.name: UpsertDataInvalid002
940 * @tc.desc: Upsert device data
941 * @tc.type: FUNC
942 * @tc.require:
943 * @tc.author: wangxiangdong
944 */
945 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, UpsertDataInvalid002, TestSize.Level0)
946 {
947 VBucket record;
948 record["id"] = std::to_string(0);
949 record["assets"] = Assets();
950 /**
951 * @tc.steps:step1. create user table.
952 * @tc.expected: step1. INVALID_ARGS.
953 */
954 const char *createSQL =
955 "CREATE TABLE IF NOT EXISTS devTable(" \
956 "id TEXT PRIMARY KEY," \
957 "name TEXT," \
958 "height REAL ," \
959 "photo BLOB," \
960 "asset ASSET," \
961 "assets ASSETS," \
962 "age INT);";
963 EXPECT_EQ(RelationalTestUtils::ExecSql(db_, createSQL), SQLITE_OK);
964 /**
965 * @tc.steps:step2. create device table.
966 * @tc.expected: step2. OK.
967 */
968 RelationalStoreDelegate *delegate1 = nullptr;
969 std::shared_ptr<RelationalStoreManager> mgr1 = std::make_shared<RelationalStoreManager>(APP_ID, USER_ID);
970 RelationalStoreDelegate::Option option;
971 ASSERT_EQ(mgr1->OpenStore(storePath_, STORE_ID_1, option, delegate1), DBStatus::OK);
972 ASSERT_NE(delegate1, nullptr);
973 std::string deviceTableName = "devTable";
974 ASSERT_EQ(delegate1->CreateDistributedTable(deviceTableName, DEVICE_COOPERATION), DBStatus::OK);
975 DataBaseSchema dataBaseSchema;
976 TableSchema tableSchema;
977 tableSchema.name = deviceTableName;
978 tableSchema.sharedTableName = deviceTableName + "_shared";
979 tableSchema.fields = {
980 {"id", TYPE_INDEX<std::string>, true}, {"name", TYPE_INDEX<std::string>}, {"height", TYPE_INDEX<double>},
981 {"photo", TYPE_INDEX<Bytes>}, {"asset", TYPE_INDEX<Asset>}, {"assets", TYPE_INDEX<Assets>},
982 {"age", TYPE_INDEX<int64_t>}
983 };
984 dataBaseSchema.tables.push_back(tableSchema);
985 ASSERT_EQ(delegate1->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
986 /**
987 * @tc.steps:step3. UpsertData to device table.
988 * @tc.expected: step3. NOT_FOUND.
989 */
990 EXPECT_EQ(delegate1->UpsertData(deviceTableName, { record }), NOT_FOUND);
991 EXPECT_EQ(mgr1->CloseStore(delegate1), DBStatus::OK);
992 delegate1 = nullptr;
993 mgr1 = nullptr;
994 }
995
996 /**
997 * @tc.name: UploadAssetsTest001
998 * @tc.desc: Test upload asset with error.
999 * @tc.type: FUNC
1000 * @tc.require:
1001 * @tc.author: liaoyonghuang
1002 */
1003 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, UploadAssetsTest001, TestSize.Level1)
1004 {
1005 /**
1006 * @tc.steps:step1. Insert 10 records.
1007 * @tc.expected: step1. ok.
1008 */
1009 const int actualCount = 10;
1010 InsertUserTableRecord(tableName_, 0, actualCount);
1011 /**
1012 * @tc.steps:step2. Set callback function to cause some upstream data to fail.
1013 * @tc.expected: step2. ok.
1014 */
1015 int recordIndex = 0;
1016 Asset tempAsset = {
1017 .version = 2, .name = "Phone", .assetId = "0", .subpath = "/local/sync", .uri = "/cloud/sync",
1018 .modifyTime = "123456", .createTime = "0", .size = "1024", .hash = "DEC"
1019 };
__anond35eef630c02(const std::string &tableName, VBucket &extend) 1020 virtualCloudDb_->ForkUpload([&tempAsset, &recordIndex](const std::string &tableName, VBucket &extend) {
1021 Asset asset;
1022 Assets assets;
1023 switch (recordIndex) {
1024 case 0: // record[0] is successful because ERROR_FIELD is not verified when BatchInsert returns OK status.
1025 extend[std::string(CloudDbConstant::ERROR_FIELD)] = static_cast<int64_t>(DBStatus::CLOUD_ERROR);
1026 break;
1027 case 1: // record[1] is considered successful because it is a conflict.
1028 extend[std::string(CloudDbConstant::ERROR_FIELD)] =
1029 static_cast<int64_t>(DBStatus::CLOUD_RECORD_EXIST_CONFLICT);
1030 break;
1031 case 2: // record[2] fail because of empty gid.
1032 extend[std::string(CloudDbConstant::GID_FIELD)] = std::string("");
1033 break;
1034 case 3: // record[3] fail because of empty assetId.
1035 asset = tempAsset;
1036 asset.assetId = "";
1037 extend[std::string(CloudDbConstant::ASSET)] = asset;
1038 break;
1039 case 4: // record[4] fail because of empty assetId.
1040 assets.push_back(tempAsset);
1041 assets[0].assetId = "";
1042 extend[std::string(CloudDbConstant::ASSETS)] = assets;
1043 break;
1044 case 5: // record[5] is successful because ERROR_FIELD is not verified when BatchInsert returns OK status.
1045 extend[std::string(CloudDbConstant::ERROR_FIELD)] = std::string("");
1046 break;
1047 default:
1048 break;
1049 }
1050 recordIndex++;
1051 });
1052 /**
1053 * @tc.steps:step3. Sync and check upLoadInfo.
1054 * @tc.expected: step3. failCount is 5 and successCount is 5.
1055 */
1056 Query query = Query::Select().FromTable({ tableName_ });
1057 BlockSync(query, delegate_);
1058 for (const auto &table : lastProcess_.tableProcess) {
1059 EXPECT_EQ(table.second.upLoadInfo.total, 10u);
1060 EXPECT_EQ(table.second.upLoadInfo.failCount, 3u);
1061 EXPECT_EQ(table.second.upLoadInfo.successCount, 7u);
1062 }
1063 virtualCloudDb_->ForkUpload(nullptr);
1064 }
1065
1066 /**
1067 * @tc.name: UploadAssetsTest002
1068 * @tc.desc: Test upload asset with error.
1069 * @tc.type: FUNC
1070 * @tc.require:
1071 * @tc.author: liaoyonghuang
1072 */
1073 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, UploadAssetsTest002, TestSize.Level1)
1074 {
1075 /**
1076 * @tc.steps:step1. Insert 10 records.
1077 * @tc.expected: step1. ok.
1078 */
1079 const int actualCount = 10;
1080 InsertUserTableRecord(tableName_, 0, actualCount);
1081 Query query = Query::Select().FromTable({ tableName_ });
1082 BlockSync(query, delegate_);
1083 /**
1084 * @tc.steps:step2. Delete local data.
1085 * @tc.expected: step2. OK.
1086 */
1087 std::string sql = "delete from " + tableName_ + " where id >= " + std::to_string(actualCount / 2);
1088 EXPECT_EQ(RelationalTestUtils::ExecSql(db_, sql), SQLITE_OK);
1089 /**
1090 * @tc.steps:step3. Set callback function to cause some upstream data to fail.
1091 * @tc.expected: step3. ok.
1092 */
__anond35eef630d02(const std::string &tableName, VBucket &extend) 1093 virtualCloudDb_->ForkUpload([](const std::string &tableName, VBucket &extend) {
1094 extend[std::string(CloudDbConstant::GID_FIELD)] = "";
1095 });
1096 BlockSync(query, delegate_);
1097 for (const auto &table : lastProcess_.tableProcess) {
1098 EXPECT_EQ(table.second.upLoadInfo.total, 5u);
1099 EXPECT_EQ(table.second.upLoadInfo.failCount, 0u);
1100 EXPECT_EQ(table.second.upLoadInfo.successCount, 5u);
1101 }
1102 virtualCloudDb_->ForkUpload(nullptr);
1103 }
1104
1105 /**
1106 * @tc.name: UploadAssetsTest003
1107 * @tc.desc: Test upload asset with error CLOUD_RECORD_ALREADY_EXISTED.
1108 * @tc.type: FUNC
1109 * @tc.require:
1110 * @tc.author: liaoyonghuang
1111 */
1112 HWTEST_F(DistributedDBCloudAssetsOperationSyncTest, UploadAssetsTest003, TestSize.Level0)
1113 {
1114 /**
1115 * @tc.steps:step1. Insert 100 records.
1116 * @tc.expected: step1. ok.
1117 */
1118 const int actualCount = 100;
1119 InsertUserTableRecord(tableName_, 0, actualCount);
1120 /**
1121 * @tc.steps:step2. Set callback function to return CLOUD_RECORD_ALREADY_EXISTED in 1st batch.
1122 * @tc.expected: step2. ok.
1123 */
1124 int uploadCount = 0;
__anond35eef630e02(const std::string &tableName, VBucket &extend) 1125 virtualCloudDb_->ForkUpload([&uploadCount](const std::string &tableName, VBucket &extend) {
1126 if (uploadCount < 30) { // There are a total of 30 pieces of data in one batch of upstream data
1127 extend[std::string(CloudDbConstant::ERROR_FIELD)] =
1128 static_cast<int64_t>(DBStatus::CLOUD_RECORD_ALREADY_EXISTED);
1129 }
1130 uploadCount++;
1131 });
1132 Query query = Query::Select().FromTable({ tableName_ });
1133 BlockSync(query, delegate_);
1134 for (const auto &table : lastProcess_.tableProcess) {
1135 EXPECT_EQ(table.second.upLoadInfo.batchIndex, 4u);
1136 EXPECT_EQ(table.second.upLoadInfo.total, 70u);
1137 EXPECT_EQ(table.second.upLoadInfo.failCount, 0u);
1138 EXPECT_EQ(table.second.upLoadInfo.successCount, 70u);
1139 EXPECT_EQ(table.second.process, ProcessStatus::FINISHED);
1140 }
1141 virtualCloudDb_->ForkUpload(nullptr);
1142 }
1143 }
1144 #endif
1145