1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include <gtest/gtest.h>
16
17 #include <utility>
18 #include "cloud/cloud_db_constant.h"
19 #include "cloud/cloud_db_data_utils.h"
20 #include "cloud/cloud_db_proxy.h"
21 #include "cloud/cloud_db_types.h"
22 #include "cloud/cloud_sync_utils.h"
23 #include "distributeddb_tools_unit_test.h"
24 #include "kv_store_errno.h"
25 #include "mock_icloud_sync_storage_interface.h"
26 #include "virtual_cloud_db.h"
27 #include "virtual_cloud_syncer.h"
28 #include "virtual_communicator_aggregator.h"
29
30 using namespace std;
31 using namespace testing::ext;
32 using namespace DistributedDB;
33
34 namespace {
35 constexpr const char *TABLE_NAME = "Table";
GetFields()36 std::vector<Field> GetFields()
37 {
38 return {
39 {
40 .colName = "col1",
41 .type = TYPE_INDEX<int64_t>,
42 .primary = true,
43 .nullable = false
44 },
45 {
46 .colName = "col2",
47 .type = TYPE_INDEX<std::string>,
48 .primary = false
49 },
50 {
51 .colName = "col3",
52 .type = TYPE_INDEX<Bytes>,
53 .primary = false
54 }
55 };
56 }
57
ModifyRecords(std::vector<VBucket> & expectRecord)58 void ModifyRecords(std::vector<VBucket> &expectRecord)
59 {
60 std::vector<VBucket> tempRecord;
61 for (const auto &record: expectRecord) {
62 VBucket bucket;
63 for (auto &[field, val] : record) {
64 LOGD("modify field %s", field.c_str());
65 if (val.index() == TYPE_INDEX<int64_t>) {
66 int64_t v = std::get<int64_t>(val);
67 bucket.insert({ field, static_cast<int64_t>(v + 1) });
68 } else {
69 bucket.insert({ field, val });
70 }
71 }
72 tempRecord.push_back(bucket);
73 }
74 expectRecord = tempRecord;
75 }
76
Sync(CloudSyncer * cloudSyncer,int & callCount)77 DBStatus Sync(CloudSyncer *cloudSyncer, int &callCount)
78 {
79 std::mutex processMutex;
80 std::condition_variable cv;
81 SyncProcess syncProcess;
82 const auto callback = [&callCount, &syncProcess, &processMutex, &cv](
83 const std::map<std::string, SyncProcess> &process) {
84 {
85 std::lock_guard<std::mutex> autoLock(processMutex);
86 syncProcess = process.begin()->second;
87 if (!process.empty()) {
88 syncProcess = process.begin()->second;
89 } else {
90 SyncProcess tmpProcess;
91 syncProcess = tmpProcess;
92 }
93 callCount++;
94 }
95 cv.notify_all();
96 };
97 EXPECT_EQ(cloudSyncer->Sync({ "cloud" }, SyncMode::SYNC_MODE_CLOUD_MERGE, { TABLE_NAME }, callback, 0), E_OK);
98 {
99 LOGI("begin to wait sync");
100 std::unique_lock<std::mutex> uniqueLock(processMutex);
101 cv.wait(uniqueLock, [&syncProcess]() {
102 return syncProcess.process == ProcessStatus::FINISHED;
103 });
104 LOGI("end to wait sync");
105 }
106 return syncProcess.errCode;
107 }
108
109 class DistributedDBCloudDBProxyTest : public testing::Test {
110 public:
111 static void SetUpTestCase();
112 static void TearDownTestCase();
113 void SetUp() override;
114 void TearDown() override;
115
116 protected:
117 std::shared_ptr<VirtualCloudDb> virtualCloudDb_ = nullptr;
118 VirtualCommunicatorAggregator *communicatorAggregator_ = nullptr;
119 };
120
SetUpTestCase()121 void DistributedDBCloudDBProxyTest::SetUpTestCase()
122 {
123 }
124
TearDownTestCase()125 void DistributedDBCloudDBProxyTest::TearDownTestCase()
126 {
127 }
128
SetUp()129 void DistributedDBCloudDBProxyTest::SetUp()
130 {
131 DistributedDBUnitTest::DistributedDBToolsUnitTest::PrintTestCaseInfo();
132 virtualCloudDb_ = std::make_shared<VirtualCloudDb>();
133 communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
134 ASSERT_TRUE(communicatorAggregator_ != nullptr);
135 RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
136 }
137
TearDown()138 void DistributedDBCloudDBProxyTest::TearDown()
139 {
140 virtualCloudDb_ = nullptr;
141 RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
142 communicatorAggregator_ = nullptr;
143 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
144 }
145
146 /**
147 * @tc.name: CloudDBProxyTest001
148 * @tc.desc: Verify cloud db init and close function.
149 * @tc.type: FUNC
150 * @tc.require:
151 * @tc.author: zhangqiquan
152 */
153 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest001, TestSize.Level0)
154 {
155 /**
156 * @tc.steps: step1. set cloud db to proxy
157 * @tc.expected: step1. E_OK
158 */
159 CloudDBProxy proxy;
160 proxy.SetCloudDB(virtualCloudDb_);
161 /**
162 * @tc.steps: step2. proxy close cloud db with cloud error
163 * @tc.expected: step2. -E_CLOUD_ERROR
164 */
165 virtualCloudDb_->SetCloudError(true);
166 EXPECT_EQ(proxy.Close(), -E_CLOUD_ERROR);
167 /**
168 * @tc.steps: step3. proxy close cloud db again
169 * @tc.expected: step3. E_OK because cloud db has been set nullptr
170 */
171 EXPECT_EQ(proxy.Close(), E_OK);
172 virtualCloudDb_->SetCloudError(false);
173 EXPECT_EQ(proxy.Close(), E_OK);
174 }
175
176 /**
177 * @tc.name: CloudDBProxyTest002
178 * @tc.desc: Verify cloud db insert function.
179 * @tc.type: FUNC
180 * @tc.require:
181 * @tc.author: zhangqiquan
182 */
183 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest002, TestSize.Level0)
184 {
185 /**
186 * @tc.steps: step1. set cloud db to proxy
187 * @tc.expected: step1. E_OK
188 */
189 CloudDBProxy proxy;
190 proxy.SetCloudDB(virtualCloudDb_);
191 /**
192 * @tc.steps: step2. insert data to cloud db
193 * @tc.expected: step2. OK
194 */
195 TableSchema schema = {
196 .name = TABLE_NAME,
197 .sharedTableName = "",
198 .fields = GetFields()
199 };
200 std::vector<VBucket> expectRecords = CloudDBDataUtils::GenerateRecords(10, schema); // generate 10 records
201 std::vector<VBucket> expectExtends = CloudDBDataUtils::GenerateExtends(10); // generate 10 extends
202 Info uploadInfo;
203 std::vector<VBucket> insert = expectRecords;
204 EXPECT_EQ(proxy.BatchInsert(TABLE_NAME, insert, expectExtends, uploadInfo), E_OK);
205
206 VBucket extend;
207 extend[CloudDbConstant::CURSOR_FIELD] = std::string("");
208 std::vector<VBucket> actualRecords;
209 EXPECT_EQ(proxy.Query(TABLE_NAME, extend, actualRecords), -E_QUERY_END);
210 /**
211 * @tc.steps: step3. proxy query data
212 * @tc.expected: step3. data is equal to expect
213 */
214 ASSERT_EQ(actualRecords.size(), expectRecords.size());
215 for (size_t i = 0; i < actualRecords.size(); ++i) {
216 for (const auto &field: schema.fields) {
217 Type expect = expectRecords[i][field.colName];
218 Type actual = actualRecords[i][field.colName];
219 EXPECT_EQ(expect.index(), actual.index());
220 }
221 }
222 /**
223 * @tc.steps: step4. proxy close cloud db
224 * @tc.expected: step4. E_OK
225 */
226 EXPECT_EQ(proxy.Close(), E_OK);
227 }
228
229 /**
230 * @tc.name: CloudDBProxyTest003
231 * @tc.desc: Verify cloud db update function.
232 * @tc.type: FUNC
233 * @tc.require:
234 * @tc.author: zhangqiquan
235 */
236 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest003, TestSize.Level0)
237 {
238 TableSchema schema = {
239 .name = TABLE_NAME,
240 .sharedTableName = "",
241 .fields = GetFields()
242 };
243 /**
244 * @tc.steps: step1. set cloud db to proxy
245 * @tc.expected: step1. E_OK
246 */
247 CloudDBProxy proxy;
248 proxy.SetCloudDB(virtualCloudDb_);
249 /**
250 * @tc.steps: step2. insert data to cloud db
251 * @tc.expected: step2. OK
252 */
253 std::vector<VBucket> expectRecords = CloudDBDataUtils::GenerateRecords(10, schema); // generate 10 records
254 std::vector<VBucket> expectExtends = CloudDBDataUtils::GenerateExtends(10); // generate 10 extends
255 Info uploadInfo;
256 std::vector<VBucket> insert = expectRecords;
257 EXPECT_EQ(proxy.BatchInsert(TABLE_NAME, insert, expectExtends, uploadInfo), E_OK);
258 /**
259 * @tc.steps: step3. update data to cloud db
260 * @tc.expected: step3. E_OK
261 */
262 ModifyRecords(expectRecords);
263 std::vector<VBucket> update = expectRecords;
264 EXPECT_EQ(proxy.BatchUpdate(TABLE_NAME, update, expectExtends, uploadInfo), E_OK);
265 /**
266 * @tc.steps: step3. proxy close cloud db
267 * @tc.expected: step3. E_OK
268 */
269 VBucket extend;
270 extend[CloudDbConstant::CURSOR_FIELD] = std::string("");
271 std::vector<VBucket> actualRecords;
272 EXPECT_EQ(proxy.Query(TABLE_NAME, extend, actualRecords), -E_QUERY_END);
273 ASSERT_EQ(actualRecords.size(), expectRecords.size());
274 for (size_t i = 0; i < actualRecords.size(); ++i) {
275 for (const auto &field: schema.fields) {
276 Type expect = expectRecords[i][field.colName];
277 Type actual = actualRecords[i][field.colName];
278 EXPECT_EQ(expect.index(), actual.index());
279 }
280 }
281 /**
282 * @tc.steps: step4. proxy close cloud db
283 * @tc.expected: step4. E_OK
284 */
285 EXPECT_EQ(proxy.Close(), E_OK);
286 }
287
288 /**
289 * @tc.name: CloudDBProxyTest005
290 * @tc.desc: Verify sync failed after cloud error.
291 * @tc.type: FUNC
292 * @tc.require:
293 * @tc.author: zhangqiquan
294 */
295 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest005, TestSize.Level0)
296 {
297 /**
298 * @tc.steps: step1. set cloud db to proxy and sleep 5s when download
299 * @tc.expected: step1. E_OK
300 */
301 auto iCloud = std::make_shared<MockICloudSyncStorageInterface>();
302 auto cloudSyncer = new(std::nothrow) VirtualCloudSyncer(StorageProxy::GetCloudDb(iCloud.get()));
303 EXPECT_CALL(*iCloud, StartTransaction).WillRepeatedly(testing::Return(E_OK));
304 EXPECT_CALL(*iCloud, Commit).WillRepeatedly(testing::Return(E_OK));
305 ASSERT_NE(cloudSyncer, nullptr);
306 cloudSyncer->SetCloudDB(virtualCloudDb_);
307 cloudSyncer->SetSyncAction(true, false);
308 virtualCloudDb_->SetCloudError(true);
309 /**
310 * @tc.steps: step2. call sync and wait sync finish
311 * @tc.expected: step2. CLOUD_ERROR by lock error
312 */
313 int callCount = 0;
314 EXPECT_EQ(Sync(cloudSyncer, callCount), CLOUD_ERROR);
315 /**
316 * @tc.steps: step3. get cloud lock status and heartbeat count
317 * @tc.expected: step3. cloud is unlock and no heartbeat
318 */
319 EXPECT_FALSE(virtualCloudDb_->GetLockStatus());
320 EXPECT_GE(virtualCloudDb_->GetHeartbeatCount(), 0);
321 virtualCloudDb_->ClearHeartbeatCount();
322 cloudSyncer->Close();
323 RefObject::KillAndDecObjRef(cloudSyncer);
324 }
325
326 /**
327 * @tc.name: CloudDBProxyTest008
328 * @tc.desc: Verify cloud db heartbeat with diff status.
329 * @tc.type: FUNC
330 * @tc.require:
331 * @tc.author: zhangqiquan
332 */
333 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest008, TestSize.Level0)
334 {
335 /**
336 * @tc.steps: step1. set cloud db to proxy
337 * @tc.expected: step1. E_OK
338 */
339 CloudDBProxy proxy;
340 proxy.SetCloudDB(virtualCloudDb_);
341 /**
342 * @tc.steps: step2. proxy heartbeat with diff status
343 */
344 virtualCloudDb_->SetActionStatus(CLOUD_NETWORK_ERROR);
345 int errCode = proxy.HeartBeat();
346 EXPECT_EQ(errCode, -E_CLOUD_NETWORK_ERROR);
347 EXPECT_EQ(TransferDBErrno(errCode), CLOUD_NETWORK_ERROR);
348
349 virtualCloudDb_->SetActionStatus(CLOUD_SYNC_UNSET);
350 errCode = proxy.HeartBeat();
351 EXPECT_EQ(errCode, -E_CLOUD_SYNC_UNSET);
352 EXPECT_EQ(TransferDBErrno(errCode), CLOUD_SYNC_UNSET);
353
354 virtualCloudDb_->SetActionStatus(CLOUD_FULL_RECORDS);
355 errCode = proxy.HeartBeat();
356 EXPECT_EQ(errCode, -E_CLOUD_FULL_RECORDS);
357 EXPECT_EQ(TransferDBErrno(errCode), CLOUD_FULL_RECORDS);
358
359 virtualCloudDb_->SetActionStatus(CLOUD_LOCK_ERROR);
360 errCode = proxy.HeartBeat();
361 EXPECT_EQ(errCode, -E_CLOUD_LOCK_ERROR);
362 EXPECT_EQ(TransferDBErrno(errCode), CLOUD_LOCK_ERROR);
363
364 virtualCloudDb_->SetActionStatus(DB_ERROR);
365 errCode = proxy.HeartBeat();
366 EXPECT_EQ(errCode, -E_CLOUD_ERROR);
367 EXPECT_EQ(TransferDBErrno(errCode), CLOUD_ERROR);
368
369 /**
370 * @tc.steps: step3. proxy close cloud db
371 * @tc.expected: step3. E_OK
372 */
373 EXPECT_EQ(proxy.Close(), E_OK);
374 }
375
376 /**
377 * @tc.name: CloudDBProxyTest009
378 * @tc.desc: Verify cloud db closed and current task exit .
379 * @tc.type: FUNC
380 * @tc.require:
381 * @tc.author: zhangqiquan
382 */
383 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest009, TestSize.Level3)
384 {
385 /**
386 * @tc.steps: step1. set cloud db to proxy and sleep 5s when download
387 * @tc.expected: step1. E_OK
388 */
389 auto iCloud = std::make_shared<MockICloudSyncStorageInterface>();
390 ASSERT_NE(iCloud, nullptr);
391 EXPECT_CALL(*iCloud, Commit).WillRepeatedly(testing::Return(E_OK));
392 EXPECT_CALL(*iCloud, StartTransaction).WillRepeatedly(testing::Return(E_OK));
393 EXPECT_CALL(*iCloud, Rollback).WillRepeatedly(testing::Return(E_OK));
394 auto cloudSyncer = new(std::nothrow) VirtualCloudSyncer(StorageProxy::GetCloudDb(iCloud.get()));
395 ASSERT_NE(cloudSyncer, nullptr);
396 cloudSyncer->SetCloudDB(virtualCloudDb_);
397 cloudSyncer->SetSyncAction(true, false);
__anon9cf3a58a0402() 398 cloudSyncer->SetDownloadFunc([]() {
399 std::this_thread::sleep_for(std::chrono::seconds(5)); // sleep 5s
400 return -E_CLOUD_ERROR;
401 });
402 /**
403 * @tc.steps: step2. call sync and wait sync finish
404 * @tc.expected: step2. E_OK
405 */
406 std::mutex processMutex;
407 bool finished = false;
408 std::condition_variable cv;
409 LOGI("[CloudDBProxyTest009] Call cloud sync");
__anon9cf3a58a0502(const std::map<std::string, SyncProcess> &process) 410 const auto callback = [&finished, &processMutex, &cv](const std::map<std::string, SyncProcess> &process) {
411 {
412 std::lock_guard<std::mutex> autoLock(processMutex);
413 for (const auto &item: process) {
414 if (item.second.process == DistributedDB::FINISHED) {
415 finished = true;
416 EXPECT_EQ(item.second.errCode, DB_CLOSED);
417 }
418 }
419 }
420 cv.notify_all();
421 };
422 EXPECT_EQ(cloudSyncer->Sync({ "cloud" }, SyncMode::SYNC_MODE_CLOUD_MERGE, { TABLE_NAME }, callback, 0), E_OK);
423 std::this_thread::sleep_for(std::chrono::seconds(1));
424 cloudSyncer->Close();
425 {
426 LOGI("[CloudDBProxyTest009] begin to wait sync");
427 std::unique_lock<std::mutex> uniqueLock(processMutex);
__anon9cf3a58a0602() 428 cv.wait_for(uniqueLock, std::chrono::milliseconds(DBConstant::MIN_TIMEOUT), [&finished]() {
429 return finished;
430 });
431 LOGI("[CloudDBProxyTest009] end to wait sync");
432 }
433 RefObject::KillAndDecObjRef(cloudSyncer);
434 }
435
436 /**
437 * @tc.name: CloudDBProxyTest010
438 * @tc.desc: Verify cloud db lock with diff status.
439 * @tc.type: FUNC
440 * @tc.require:
441 * @tc.author: zhangqiquan
442 */
443 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest010, TestSize.Level0)
444 {
445 /**
446 * @tc.steps: step1. set cloud db to proxy
447 * @tc.expected: step1. E_OK
448 */
449 CloudDBProxy proxy;
450 proxy.SetCloudDB(virtualCloudDb_);
451 /**
452 * @tc.steps: step2. proxy lock with diff status
453 */
454 virtualCloudDb_->SetActionStatus(CLOUD_NETWORK_ERROR);
455 auto ret = proxy.Lock();
456 EXPECT_EQ(ret.first, -E_CLOUD_NETWORK_ERROR);
457 EXPECT_EQ(TransferDBErrno(ret.first), CLOUD_NETWORK_ERROR);
458
459 virtualCloudDb_->SetActionStatus(CLOUD_LOCK_ERROR);
460 ret = proxy.Lock();
461 EXPECT_EQ(ret.first, -E_CLOUD_LOCK_ERROR);
462 EXPECT_EQ(TransferDBErrno(ret.first), CLOUD_LOCK_ERROR);
463 /**
464 * @tc.steps: step3. proxy close cloud db
465 * @tc.expected: step3. E_OK
466 */
467 EXPECT_EQ(proxy.Close(), E_OK);
468 }
469
470 /**
471 * @tc.name: CloudDBProxyTest008
472 * @tc.desc: Verify cloud db heartbeat with diff status.
473 * @tc.type: FUNC
474 * @tc.require:
475 * @tc.author: zhangqiquan
476 */
477 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest011, TestSize.Level2)
478 {
479 /**
480 * @tc.steps: step1. set cloud db to proxy
481 * @tc.expected: step1. E_OK
482 */
483 CloudDBProxy proxy;
484 proxy.SetCloudDB(virtualCloudDb_);
485 virtualCloudDb_->SetHeartbeatBlockTime(100); // block 100 ms
486 std::mutex waitMutex;
487 std::condition_variable waitCv;
488 const int scheduleCount = 12;
489 int currentCount = 0;
490 for (int i = 0; i < scheduleCount; ++i) {
__anon9cf3a58a0702() 491 RuntimeContext::GetInstance()->ScheduleTask([&proxy, &waitMutex, &waitCv, ¤tCount]() {
492 proxy.HeartBeat();
493 {
494 std::lock_guard<std::mutex> autoLock(waitMutex);
495 currentCount++;
496 LOGI("[CloudDBProxyTest011] CurrentCount %d", currentCount);
497 }
498 waitCv.notify_all();
499 });
500 }
501 LOGI("[CloudDBProxyTest011] Begin wait all task finish");
502 std::unique_lock<std::mutex> uniqueLock(waitMutex);
__anon9cf3a58a0802() 503 waitCv.wait_for(uniqueLock, std::chrono::milliseconds(DBConstant::MAX_TIMEOUT), [¤tCount, scheduleCount]() {
504 return currentCount >= scheduleCount;
505 });
506 LOGI("[CloudDBProxyTest011] End wait all task finish");
507 EXPECT_EQ(currentCount, scheduleCount);
508 }
509
510 /**
511 * @tc.name: CloudDBProxyTest012
512 * @tc.desc: Asset data deduplication.
513 * @tc.type: FUNC
514 * @tc.require:
515 * @tc.author: tankaisheng
516 */
517 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest012, TestSize.Level2)
518 {
519 /**
520 * @tc.steps: step1. set cloud db to proxy
521 * @tc.expected: step1. E_OK
522 */
523 Assets assets;
524 Asset asset1;
525 asset1.name = "assetName1";
526 asset1.assetId = "";
527 asset1.modifyTime = "20240730";
528 assets.push_back(asset1);
529
530 Asset asset2;
531 asset2.name = "assetName1";
532 asset2.assetId = "1";
533 asset2.modifyTime = "20240730";
534 assets.push_back(asset2);
535
536 Asset asset3;
537 asset3.name = "assetName2";
538 asset3.assetId = "2";
539 asset3.modifyTime = "20240730";
540 assets.push_back(asset3);
541
542 Asset asset4;
543 asset4.name = "assetName2";
544 asset4.assetId = "3";
545 asset4.modifyTime = "20240731";
546 assets.push_back(asset4);
547
548 Asset asset5;
549 asset5.name = "assetName3";
550 asset5.assetId = "4";
551 asset5.modifyTime = "20240730";
552 assets.push_back(asset5);
553
554 Asset asset6;
555 asset6.name = "assetName3";
556 asset6.assetId = "5";
557 asset6.modifyTime = "20240730";
558 assets.push_back(asset6);
559
560 Asset asset7;
561 asset7.name = "assetName1";
562 asset7.assetId = "6";
563 asset7.modifyTime = "20240731";
564 assets.push_back(asset7);
565
566 DBCommon::RemoveDuplicateAssetsData(assets);
567
568 /**
569 * @tc.steps: step2. check data
570 * @tc.expected: step2. E_OK
571 */
572 std::string assetNameArr[] = {"assetName2", "assetName3", "assetName1"};
573 std::string assetIdArr[] = {"3", "5", "6"};
574 EXPECT_EQ(assets.size(), 3u);
575 for (std::vector<DistributedDB::Asset>::size_type i = 0; i < assets.size(); ++i) {
576 EXPECT_EQ(assets.at(i).name, assetNameArr[i]);
577 EXPECT_EQ(assets.at(i).assetId, assetIdArr[i]);
578 }
579 }
580
581 /**
582 * @tc.name: CloudDBProxyTest014
583 * @tc.desc: Test asset deduplication with empty assetId.
584 * @tc.type: FUNC
585 * @tc.require:
586 * @tc.author: liaoyonghuang
587 */
588 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest014, TestSize.Level0)
589 {
590 /**
591 * @tc.steps: step1. set cloud db to proxy
592 * @tc.expected: step1. E_OK
593 */
594 Assets assets;
595 Asset asset1;
596 asset1.name = "assetName";
597 asset1.assetId = "";
598 asset1.modifyTime = "1";
599 assets.push_back(asset1);
600
601 Asset asset2;
602 asset2.name = "assetName";
603 asset2.assetId = "";
604 asset2.modifyTime = "3";
605 assets.push_back(asset2);
606
607 Asset asset3;
608 asset3.name = "assetName";
609 asset3.assetId = "";
610 asset3.modifyTime = "2";
611 assets.push_back(asset3);
612
613 /**
614 * @tc.steps: step2. Remove duplicate assets and check data
615 * @tc.expected: step2. E_OK
616 */
617 DBCommon::RemoveDuplicateAssetsData(assets);
618 ASSERT_EQ(assets.size(), 1u);
619 EXPECT_EQ(assets[0].modifyTime, "3");
620 }
621
622 /**
623 * @tc.name: CloudSyncQueue001
624 * @tc.desc: Verify sync task count decrease after sync finished.
625 * @tc.type: FUNC
626 * @tc.require:
627 * @tc.author: zhangqiquan
628 */
629 HWTEST_F(DistributedDBCloudDBProxyTest, CloudSyncQueue001, TestSize.Level2)
630 {
631 /**
632 * @tc.steps: step1. set cloud db to proxy and sleep 5s when download
633 * @tc.expected: step1. E_OK
634 */
635 auto iCloud = std::make_shared<MockICloudSyncStorageInterface>();
636 ASSERT_NE(iCloud, nullptr);
637 auto cloudSyncer = new(std::nothrow) VirtualCloudSyncer(StorageProxy::GetCloudDb(iCloud.get()));
638 ASSERT_NE(cloudSyncer, nullptr);
639 EXPECT_CALL(*iCloud, Rollback).WillRepeatedly(testing::Return(E_OK));
640 EXPECT_CALL(*iCloud, Commit).WillRepeatedly(testing::Return(E_OK));
641 EXPECT_CALL(*iCloud, StartTransaction).WillRepeatedly(testing::Return(E_OK));
642 cloudSyncer->SetCloudDB(virtualCloudDb_);
643 cloudSyncer->SetSyncAction(true, false);
__anon9cf3a58a0902() 644 cloudSyncer->SetDownloadFunc([cloudSyncer]() {
645 EXPECT_EQ(cloudSyncer->GetQueueCount(), 1u);
646 std::this_thread::sleep_for(std::chrono::seconds(2)); // sleep 2s
647 return E_OK;
648 });
649 /**
650 * @tc.steps: step2. call sync and wait sync finish
651 */
652 int callCount = 0;
653 EXPECT_EQ(Sync(cloudSyncer, callCount), OK);
654 RuntimeContext::GetInstance()->StopTaskPool();
655 EXPECT_EQ(callCount, 1);
656 RefObject::KillAndDecObjRef(cloudSyncer);
657 }
658
659 /**
660 * @tc.name: CloudSyncQueue002
661 * @tc.desc: Verify sync task abort after close.
662 * @tc.type: FUNC
663 * @tc.require:
664 * @tc.author: zhangqiquan
665 */
666 HWTEST_F(DistributedDBCloudDBProxyTest, CloudSyncQueue002, TestSize.Level2)
667 {
668 /**
669 * @tc.steps: step1. set cloud db to proxy and sleep 2s when download
670 * @tc.expected: step1. E_OK
671 */
672 auto iCloud = std::make_shared<MockICloudSyncStorageInterface>();
673 ASSERT_NE(iCloud, nullptr);
674 EXPECT_CALL(*iCloud, Rollback).WillRepeatedly(testing::Return(E_OK));
675 EXPECT_CALL(*iCloud, Commit).WillRepeatedly(testing::Return(E_OK));
676 EXPECT_CALL(*iCloud, StartTransaction).WillRepeatedly(testing::Return(E_OK));
677 auto cloudSyncer = new(std::nothrow) VirtualCloudSyncer(StorageProxy::GetCloudDb(iCloud.get()));
678 ASSERT_NE(cloudSyncer, nullptr);
679 cloudSyncer->SetCloudDB(virtualCloudDb_);
680 cloudSyncer->SetSyncAction(true, false);
681 std::atomic<bool> close = false;
__anon9cf3a58a0a02() 682 cloudSyncer->SetDownloadFunc([cloudSyncer, &close]() {
683 std::this_thread::sleep_for(std::chrono::seconds(2)); // sleep 2s
684 cloudSyncer->PauseCurrentTask();
685 EXPECT_TRUE(close);
686 return -E_TASK_PAUSED;
687 });
688 /**
689 * @tc.steps: step2. call sync and wait sync finish
690 */
691 EXPECT_EQ(cloudSyncer->Sync({ "cloud" }, SyncMode::SYNC_MODE_CLOUD_MERGE, { TABLE_NAME }, nullptr, 0), E_OK);
692 std::this_thread::sleep_for(std::chrono::seconds(1));
693 close = true;
694 cloudSyncer->Close();
695 RuntimeContext::GetInstance()->StopTaskPool();
696 RefObject::KillAndDecObjRef(cloudSyncer);
697 }
698
699 /**
700 * @tc.name: CloudSyncerTest001
701 * @tc.desc: Verify syncer notify by queue schedule.
702 * @tc.type: FUNC
703 * @tc.require:
704 * @tc.author: zhangqiquan
705 */
706 HWTEST_F(DistributedDBCloudDBProxyTest, CloudSyncerTest001, TestSize.Level2)
707 {
708 /**
709 * @tc.steps: step1. set cloud db to proxy
710 * @tc.expected: step1. E_OK
711 */
712 auto iCloud = std::make_shared<MockICloudSyncStorageInterface>();
713 EXPECT_CALL(*iCloud, StartTransaction).WillRepeatedly(testing::Return(E_OK));
714 EXPECT_CALL(*iCloud, Commit).WillRepeatedly(testing::Return(E_OK));
715 EXPECT_CALL(*iCloud, GetIdentify).WillRepeatedly(testing::Return("CloudSyncerTest001"));
716 auto cloudSyncer = new(std::nothrow) VirtualCloudSyncer(StorageProxy::GetCloudDb(iCloud.get()));
717 std::atomic<int> callCount = 0;
718 std::condition_variable cv;
__anon9cf3a58a0b02(const std::map<std::string, SyncProcess> &) 719 cloudSyncer->SetCurrentTaskInfo([&callCount, &cv](const std::map<std::string, SyncProcess> &) {
720 callCount++;
721 int before = callCount;
722 LOGD("on callback %d", before);
723 std::this_thread::sleep_for(std::chrono::seconds(1));
724 EXPECT_EQ(before, callCount);
725 cv.notify_all();
726 }, 1u);
727 const int notifyCount = 2;
728 for (int i = 0; i < notifyCount; ++i) {
729 cloudSyncer->Notify();
730 }
731 cloudSyncer->SetCurrentTaskInfo(nullptr, 0); // 0 is invalid task id
732 std::mutex processMutex;
733 std::unique_lock<std::mutex> uniqueLock(processMutex);
__anon9cf3a58a0c02() 734 cv.wait_for(uniqueLock, std::chrono::milliseconds(DBConstant::MIN_TIMEOUT), [&callCount]() {
735 return callCount == notifyCount;
736 });
737 cloudSyncer->Close();
738 RefObject::KillAndDecObjRef(cloudSyncer);
739 }
740
741 /**
742 * @tc.name: SameBatchTest001
743 * @tc.desc: Verify update cache in same batch.
744 * @tc.type: FUNC
745 * @tc.require:
746 * @tc.author: zhangqiquan
747 */
748 HWTEST_F(DistributedDBCloudDBProxyTest, SameBatchTest001, TestSize.Level0)
749 {
750 std::map<std::string, LogInfo> localLogInfoCache;
751 LogInfo cloudInfo;
752 LogInfo localInfo;
753 localInfo.hashKey = {'k'};
754 cloudInfo.cloudGid = "gid";
755 /**
756 * @tc.steps: step1. insert cloud into local
757 * @tc.expected: step1. local cache has gid
758 */
759 CloudSyncUtils::UpdateLocalCache(OpType::INSERT, cloudInfo, localInfo, localLogInfoCache);
760 std::string hashKey(localInfo.hashKey.begin(), localInfo.hashKey.end());
761 EXPECT_EQ(localLogInfoCache[hashKey].cloudGid, cloudInfo.cloudGid);
762 /**
763 * @tc.steps: step2. delete local
764 * @tc.expected: step2. local flag is delete
765 */
766 CloudSyncUtils::UpdateLocalCache(OpType::DELETE, cloudInfo, localInfo, localLogInfoCache);
767 EXPECT_EQ(localLogInfoCache[hashKey].flag, static_cast<uint64_t>(LogInfoFlag::FLAG_DELETE));
768 }
769
770 /**
771 * @tc.name: SameBatchTest002
772 * @tc.desc: Verify cal opType in same batch.
773 * @tc.type: FUNC
774 * @tc.require:
775 * @tc.author: zhangqiquan
776 */
777 HWTEST_F(DistributedDBCloudDBProxyTest, SameBatchTest002, TestSize.Level0)
778 {
779 /**
780 * @tc.steps: step1. prepare two data with same pk
781 */
782 ICloudSyncer::SyncParam param;
783 param.downloadData.opType.push_back(OpType::INSERT);
784 param.downloadData.opType.push_back(OpType::UPDATE);
785 const std::string pkField = "pk";
786 param.changedData.field.push_back(pkField);
787 VBucket oneRow;
788 oneRow[pkField] = static_cast<int64_t>(1); // 1 is pk
789 param.downloadData.data.push_back(oneRow);
790 param.downloadData.data.push_back(oneRow);
791 /**
792 * @tc.steps: step2. cal opType by utils
793 * @tc.expected: step2. all type should be INSERT
794 */
795 for (size_t i = 0; i < param.downloadData.data.size(); ++i) {
796 EXPECT_EQ(CloudSyncUtils::CalOpType(param, i), OpType::INSERT);
797 }
798 /**
799 * @tc.steps: step3. cal opType by utils
800 * @tc.expected: step3. should be UPDATE because diff pk
801 */
802 oneRow[pkField] = static_cast<int64_t>(2); // 2 is pk
803 param.downloadData.data.push_back(oneRow);
804 param.downloadData.opType.push_back(OpType::UPDATE);
805 // index start with zero
806 EXPECT_EQ(CloudSyncUtils::CalOpType(param, param.downloadData.data.size() - 1), OpType::UPDATE);
807 }
808
809 /**
810 * @tc.name: CloudDBProxyTest013
811 * @tc.desc: Verify CloudDBProxy interfaces.
812 * @tc.type: FUNC
813 * @tc.require: DTS2024073106613
814 * @tc.author: suyue
815 */
816 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest013, TestSize.Level0)
817 {
818 /**
819 * @tc.steps: step1. call CloudDBProxy interfaces when ICloudDb is nullptr.
820 * @tc.expected: step1. return -E_CLOUD_ERROR.
821 */
822 CloudDBProxy proxy;
823 int ret = proxy.UnLock();
824 EXPECT_EQ(ret, -E_CLOUD_ERROR);
825 ret = proxy.HeartBeat();
826 EXPECT_EQ(ret, -E_CLOUD_ERROR);
827 VBucket extend;
828 const std::string tableName = "test";
829 std::vector<VBucket> record;
830 ret = proxy.Query(tableName, extend, record);
831 EXPECT_EQ(ret, -E_CLOUD_ERROR);
832 Info info;
833 ret = proxy.BatchInsert(tableName, record, record, info);
834 EXPECT_EQ(ret, -E_CLOUD_ERROR);
835 ret = proxy.BatchUpdate(tableName, record, record, info);
836 EXPECT_EQ(ret, -E_CLOUD_ERROR);
837 ret = proxy.BatchDelete(tableName, record, record, info);
838 EXPECT_EQ(ret, -E_CLOUD_ERROR);
839 std::pair<int, uint64_t> res = proxy.Lock();
840 EXPECT_EQ(res.first, -E_CLOUD_ERROR);
841 std::pair<int, std::string> cursor = proxy.GetEmptyCursor(tableName);
842 EXPECT_EQ(cursor.first, -E_CLOUD_ERROR);
843
844 /**
845 * @tc.steps: step2. call CloudDBProxy interfaces when para is err.
846 * @tc.expected: step2. return fail.
847 */
848 std::pair<int, std::string> ver = proxy.GetCloudVersion("test");
849 EXPECT_EQ(ver.first, -E_NOT_SUPPORT);
850 std::vector<Asset> assets;
851 ret = proxy.RemoveLocalAssets(assets);
852 EXPECT_EQ(ret, -E_OK);
853 assets = {{}};
854 ret = proxy.RemoveLocalAssets(assets);
855 EXPECT_EQ(ret, -E_OK);
856 }
857
858 /**
859 * @tc.name: CloudSyncUtilsTest
860 * @tc.desc: Verify CloudSyncUtils interfaces
861 * @tc.type: FUNC
862 * @tc.require: DTS2024073106613
863 * @tc.author: suyue
864 */
865 HWTEST_F(DistributedDBCloudDBProxyTest, CloudSyncUtilsTest, TestSize.Level0)
866 {
867 /**
868 * @tc.steps: step1. Test type translation interfaces.
869 * @tc.expected: step1. success.
870 */
871 CloudSyncUtils utilsObj;
872 EXPECT_EQ(utilsObj.StatusToFlag(AssetStatus::INSERT), AssetOpType::INSERT);
873 EXPECT_EQ(utilsObj.StatusToFlag(AssetStatus::DELETE), AssetOpType::DELETE);
874 EXPECT_EQ(utilsObj.StatusToFlag(AssetStatus::UPDATE), AssetOpType::UPDATE);
875 EXPECT_EQ(utilsObj.StatusToFlag(AssetStatus::NORMAL), AssetOpType::NO_CHANGE);
876 EXPECT_EQ(utilsObj.StatusToFlag(AssetStatus::DOWNLOADING), AssetOpType::NO_CHANGE);
877 EXPECT_EQ(utilsObj.OpTypeToChangeType(OpType::ONLY_UPDATE_GID), ChangeType::OP_BUTT);
878
879 /**
880 * @tc.steps: step2. call CloudSyncUtils interfaces when para is err.
881 * @tc.expected: step2. return false.
882 */
883 const std::vector<DeviceID> devices = {"test"};
884 int mode = 10; // set metaMode to 10 not in enum class MetaMode
885 int ret = utilsObj.CheckParamValid(devices, static_cast<SyncMode>(mode));
886 EXPECT_EQ(ret, -E_INVALID_ARGS);
887 VBucket record;
888 const std::vector<std::string> pkColNames;
889 std::vector<Type> cloudPkVals = {{}};
890 ret = utilsObj.GetCloudPkVals(record, pkColNames, 0, cloudPkVals);
891 EXPECT_EQ(ret, -E_INVALID_ARGS);
892 Assets assets = {{}};
893 utilsObj.StatusToFlagForAssets(assets);
894 std::vector<Field> fields = {{"test", TYPE_INDEX<Assets>, true, true}};
895 utilsObj.StatusToFlagForAssetsInRecord(fields, record);
896 Timestamp timestamp;
897 CloudSyncData uploadData;
898 const int64_t count = 0;
899 ret = utilsObj.UpdateExtendTime(uploadData, count, 0, timestamp);
900 EXPECT_EQ(ret, -E_INTERNAL_ERROR);
901 CloudSyncBatch data;
902 data.assets = {{}};
903 ret = utilsObj.FillAssetIdToAssets(data, 0, CloudWaterType::UPDATE);
904 EXPECT_EQ(ret, -E_CLOUD_ERROR);
905
906 /**
907 * @tc.steps: step3. call IsChangeDataEmpty interface when para is different.
908 * @tc.expected: step3. success.
909 */
910 ChangedData changedData;
911 EXPECT_EQ(utilsObj.IsChangeDataEmpty(changedData), true);
912 changedData.primaryData[OP_INSERT] = {{}};
913 EXPECT_EQ(utilsObj.IsChangeDataEmpty(changedData), true);
914 changedData.primaryData[OP_UPDATE] = {{}};
915 EXPECT_EQ(utilsObj.IsChangeDataEmpty(changedData), true);
916 changedData.primaryData[OP_DELETE] = {{}};
917 EXPECT_EQ(utilsObj.IsChangeDataEmpty(changedData), false);
918 }
919 }