1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include <gtest/gtest.h>
16 
17 #include <utility>
18 #include "cloud/cloud_db_constant.h"
19 #include "cloud/cloud_db_data_utils.h"
20 #include "cloud/cloud_db_proxy.h"
21 #include "cloud/cloud_db_types.h"
22 #include "cloud/cloud_sync_utils.h"
23 #include "distributeddb_tools_unit_test.h"
24 #include "kv_store_errno.h"
25 #include "mock_icloud_sync_storage_interface.h"
26 #include "virtual_cloud_db.h"
27 #include "virtual_cloud_syncer.h"
28 #include "virtual_communicator_aggregator.h"
29 
30 using namespace std;
31 using namespace testing::ext;
32 using namespace DistributedDB;
33 
34 namespace {
35 constexpr const char *TABLE_NAME = "Table";
GetFields()36 std::vector<Field> GetFields()
37 {
38     return {
39         {
40             .colName = "col1",
41             .type = TYPE_INDEX<int64_t>,
42             .primary = true,
43             .nullable = false
44         },
45         {
46             .colName = "col2",
47             .type = TYPE_INDEX<std::string>,
48             .primary = false
49         },
50         {
51             .colName = "col3",
52             .type = TYPE_INDEX<Bytes>,
53             .primary = false
54         }
55     };
56 }
57 
ModifyRecords(std::vector<VBucket> & expectRecord)58 void ModifyRecords(std::vector<VBucket> &expectRecord)
59 {
60     std::vector<VBucket> tempRecord;
61     for (const auto &record: expectRecord) {
62         VBucket bucket;
63         for (auto &[field, val] : record) {
64             LOGD("modify field %s", field.c_str());
65             if (val.index() == TYPE_INDEX<int64_t>) {
66                 int64_t v = std::get<int64_t>(val);
67                 bucket.insert({ field, static_cast<int64_t>(v + 1) });
68             } else {
69                 bucket.insert({ field, val });
70             }
71         }
72         tempRecord.push_back(bucket);
73     }
74     expectRecord = tempRecord;
75 }
76 
Sync(CloudSyncer * cloudSyncer,int & callCount)77 DBStatus Sync(CloudSyncer *cloudSyncer, int &callCount)
78 {
79     std::mutex processMutex;
80     std::condition_variable cv;
81     SyncProcess syncProcess;
82     const auto callback = [&callCount, &syncProcess, &processMutex, &cv](
83         const std::map<std::string, SyncProcess> &process) {
84         {
85             std::lock_guard<std::mutex> autoLock(processMutex);
86             syncProcess = process.begin()->second;
87             if (!process.empty()) {
88                 syncProcess = process.begin()->second;
89             } else {
90                 SyncProcess tmpProcess;
91                 syncProcess = tmpProcess;
92             }
93             callCount++;
94         }
95         cv.notify_all();
96     };
97     EXPECT_EQ(cloudSyncer->Sync({ "cloud" }, SyncMode::SYNC_MODE_CLOUD_MERGE, { TABLE_NAME }, callback, 0), E_OK);
98     {
99         LOGI("begin to wait sync");
100         std::unique_lock<std::mutex> uniqueLock(processMutex);
101         cv.wait(uniqueLock, [&syncProcess]() {
102             return syncProcess.process == ProcessStatus::FINISHED;
103         });
104         LOGI("end to wait sync");
105     }
106     return syncProcess.errCode;
107 }
108 
109 class DistributedDBCloudDBProxyTest : public testing::Test {
110 public:
111     static void SetUpTestCase();
112     static void TearDownTestCase();
113     void SetUp() override;
114     void TearDown() override;
115 
116 protected:
117     std::shared_ptr<VirtualCloudDb> virtualCloudDb_ = nullptr;
118     VirtualCommunicatorAggregator *communicatorAggregator_ = nullptr;
119 };
120 
SetUpTestCase()121 void DistributedDBCloudDBProxyTest::SetUpTestCase()
122 {
123 }
124 
TearDownTestCase()125 void DistributedDBCloudDBProxyTest::TearDownTestCase()
126 {
127 }
128 
SetUp()129 void DistributedDBCloudDBProxyTest::SetUp()
130 {
131     DistributedDBUnitTest::DistributedDBToolsUnitTest::PrintTestCaseInfo();
132     virtualCloudDb_ = std::make_shared<VirtualCloudDb>();
133     communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
134     ASSERT_TRUE(communicatorAggregator_ != nullptr);
135     RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
136 }
137 
TearDown()138 void DistributedDBCloudDBProxyTest::TearDown()
139 {
140     virtualCloudDb_ = nullptr;
141     RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
142     communicatorAggregator_ = nullptr;
143     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
144 }
145 
146 /**
147  * @tc.name: CloudDBProxyTest001
148  * @tc.desc: Verify cloud db init and close function.
149  * @tc.type: FUNC
150  * @tc.require:
151  * @tc.author: zhangqiquan
152  */
153 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest001, TestSize.Level0)
154 {
155     /**
156      * @tc.steps: step1. set cloud db to proxy
157      * @tc.expected: step1. E_OK
158      */
159     CloudDBProxy proxy;
160     proxy.SetCloudDB(virtualCloudDb_);
161     /**
162      * @tc.steps: step2. proxy close cloud db with cloud error
163      * @tc.expected: step2. -E_CLOUD_ERROR
164      */
165     virtualCloudDb_->SetCloudError(true);
166     EXPECT_EQ(proxy.Close(), -E_CLOUD_ERROR);
167     /**
168      * @tc.steps: step3. proxy close cloud db again
169      * @tc.expected: step3. E_OK because cloud db has been set nullptr
170      */
171     EXPECT_EQ(proxy.Close(), E_OK);
172     virtualCloudDb_->SetCloudError(false);
173     EXPECT_EQ(proxy.Close(), E_OK);
174 }
175 
176 /**
177  * @tc.name: CloudDBProxyTest002
178  * @tc.desc: Verify cloud db insert function.
179  * @tc.type: FUNC
180  * @tc.require:
181  * @tc.author: zhangqiquan
182  */
183 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest002, TestSize.Level0)
184 {
185     /**
186      * @tc.steps: step1. set cloud db to proxy
187      * @tc.expected: step1. E_OK
188      */
189     CloudDBProxy proxy;
190     proxy.SetCloudDB(virtualCloudDb_);
191     /**
192      * @tc.steps: step2. insert data to cloud db
193      * @tc.expected: step2. OK
194      */
195     TableSchema schema = {
196         .name = TABLE_NAME,
197         .sharedTableName = "",
198         .fields = GetFields()
199     };
200     std::vector<VBucket> expectRecords = CloudDBDataUtils::GenerateRecords(10, schema); // generate 10 records
201     std::vector<VBucket> expectExtends = CloudDBDataUtils::GenerateExtends(10); // generate 10 extends
202     Info uploadInfo;
203     std::vector<VBucket> insert = expectRecords;
204     EXPECT_EQ(proxy.BatchInsert(TABLE_NAME, insert, expectExtends, uploadInfo), E_OK);
205 
206     VBucket extend;
207     extend[CloudDbConstant::CURSOR_FIELD] = std::string("");
208     std::vector<VBucket> actualRecords;
209     EXPECT_EQ(proxy.Query(TABLE_NAME, extend, actualRecords), -E_QUERY_END);
210     /**
211      * @tc.steps: step3. proxy query data
212      * @tc.expected: step3. data is equal to expect
213      */
214     ASSERT_EQ(actualRecords.size(), expectRecords.size());
215     for (size_t i = 0; i < actualRecords.size(); ++i) {
216         for (const auto &field: schema.fields) {
217             Type expect = expectRecords[i][field.colName];
218             Type actual = actualRecords[i][field.colName];
219             EXPECT_EQ(expect.index(), actual.index());
220         }
221     }
222     /**
223      * @tc.steps: step4. proxy close cloud db
224      * @tc.expected: step4. E_OK
225      */
226     EXPECT_EQ(proxy.Close(), E_OK);
227 }
228 
229 /**
230  * @tc.name: CloudDBProxyTest003
231  * @tc.desc: Verify cloud db update function.
232  * @tc.type: FUNC
233  * @tc.require:
234  * @tc.author: zhangqiquan
235  */
236 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest003, TestSize.Level0)
237 {
238     TableSchema schema = {
239         .name = TABLE_NAME,
240         .sharedTableName = "",
241         .fields = GetFields()
242     };
243     /**
244      * @tc.steps: step1. set cloud db to proxy
245      * @tc.expected: step1. E_OK
246      */
247     CloudDBProxy proxy;
248     proxy.SetCloudDB(virtualCloudDb_);
249     /**
250      * @tc.steps: step2. insert data to cloud db
251      * @tc.expected: step2. OK
252      */
253     std::vector<VBucket> expectRecords = CloudDBDataUtils::GenerateRecords(10, schema); // generate 10 records
254     std::vector<VBucket> expectExtends = CloudDBDataUtils::GenerateExtends(10); // generate 10 extends
255     Info uploadInfo;
256     std::vector<VBucket> insert = expectRecords;
257     EXPECT_EQ(proxy.BatchInsert(TABLE_NAME, insert, expectExtends, uploadInfo), E_OK);
258     /**
259      * @tc.steps: step3. update data to cloud db
260      * @tc.expected: step3. E_OK
261      */
262     ModifyRecords(expectRecords);
263     std::vector<VBucket> update = expectRecords;
264     EXPECT_EQ(proxy.BatchUpdate(TABLE_NAME, update, expectExtends, uploadInfo), E_OK);
265     /**
266      * @tc.steps: step3. proxy close cloud db
267      * @tc.expected: step3. E_OK
268      */
269     VBucket extend;
270     extend[CloudDbConstant::CURSOR_FIELD] = std::string("");
271     std::vector<VBucket> actualRecords;
272     EXPECT_EQ(proxy.Query(TABLE_NAME, extend, actualRecords), -E_QUERY_END);
273     ASSERT_EQ(actualRecords.size(), expectRecords.size());
274     for (size_t i = 0; i < actualRecords.size(); ++i) {
275         for (const auto &field: schema.fields) {
276             Type expect = expectRecords[i][field.colName];
277             Type actual = actualRecords[i][field.colName];
278             EXPECT_EQ(expect.index(), actual.index());
279         }
280     }
281     /**
282      * @tc.steps: step4. proxy close cloud db
283      * @tc.expected: step4. E_OK
284      */
285     EXPECT_EQ(proxy.Close(), E_OK);
286 }
287 
288 /**
289  * @tc.name: CloudDBProxyTest005
290  * @tc.desc: Verify sync failed after cloud error.
291  * @tc.type: FUNC
292  * @tc.require:
293  * @tc.author: zhangqiquan
294  */
295 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest005, TestSize.Level0)
296 {
297     /**
298      * @tc.steps: step1. set cloud db to proxy and sleep 5s when download
299      * @tc.expected: step1. E_OK
300      */
301     auto iCloud = std::make_shared<MockICloudSyncStorageInterface>();
302     auto cloudSyncer = new(std::nothrow) VirtualCloudSyncer(StorageProxy::GetCloudDb(iCloud.get()));
303     EXPECT_CALL(*iCloud, StartTransaction).WillRepeatedly(testing::Return(E_OK));
304     EXPECT_CALL(*iCloud, Commit).WillRepeatedly(testing::Return(E_OK));
305     ASSERT_NE(cloudSyncer, nullptr);
306     cloudSyncer->SetCloudDB(virtualCloudDb_);
307     cloudSyncer->SetSyncAction(true, false);
308     virtualCloudDb_->SetCloudError(true);
309     /**
310      * @tc.steps: step2. call sync and wait sync finish
311      * @tc.expected: step2. CLOUD_ERROR by lock error
312      */
313     int callCount = 0;
314     EXPECT_EQ(Sync(cloudSyncer, callCount), CLOUD_ERROR);
315     /**
316      * @tc.steps: step3. get cloud lock status and heartbeat count
317      * @tc.expected: step3. cloud is unlock and no heartbeat
318      */
319     EXPECT_FALSE(virtualCloudDb_->GetLockStatus());
320     EXPECT_GE(virtualCloudDb_->GetHeartbeatCount(), 0);
321     virtualCloudDb_->ClearHeartbeatCount();
322     cloudSyncer->Close();
323     RefObject::KillAndDecObjRef(cloudSyncer);
324 }
325 
326 /**
327  * @tc.name: CloudDBProxyTest008
328  * @tc.desc: Verify cloud db heartbeat with diff status.
329  * @tc.type: FUNC
330  * @tc.require:
331  * @tc.author: zhangqiquan
332  */
333 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest008, TestSize.Level0)
334 {
335     /**
336      * @tc.steps: step1. set cloud db to proxy
337      * @tc.expected: step1. E_OK
338      */
339     CloudDBProxy proxy;
340     proxy.SetCloudDB(virtualCloudDb_);
341     /**
342      * @tc.steps: step2. proxy heartbeat with diff status
343      */
344     virtualCloudDb_->SetActionStatus(CLOUD_NETWORK_ERROR);
345     int errCode = proxy.HeartBeat();
346     EXPECT_EQ(errCode, -E_CLOUD_NETWORK_ERROR);
347     EXPECT_EQ(TransferDBErrno(errCode), CLOUD_NETWORK_ERROR);
348 
349     virtualCloudDb_->SetActionStatus(CLOUD_SYNC_UNSET);
350     errCode = proxy.HeartBeat();
351     EXPECT_EQ(errCode, -E_CLOUD_SYNC_UNSET);
352     EXPECT_EQ(TransferDBErrno(errCode), CLOUD_SYNC_UNSET);
353 
354     virtualCloudDb_->SetActionStatus(CLOUD_FULL_RECORDS);
355     errCode = proxy.HeartBeat();
356     EXPECT_EQ(errCode, -E_CLOUD_FULL_RECORDS);
357     EXPECT_EQ(TransferDBErrno(errCode), CLOUD_FULL_RECORDS);
358 
359     virtualCloudDb_->SetActionStatus(CLOUD_LOCK_ERROR);
360     errCode = proxy.HeartBeat();
361     EXPECT_EQ(errCode, -E_CLOUD_LOCK_ERROR);
362     EXPECT_EQ(TransferDBErrno(errCode), CLOUD_LOCK_ERROR);
363 
364     virtualCloudDb_->SetActionStatus(DB_ERROR);
365     errCode = proxy.HeartBeat();
366     EXPECT_EQ(errCode, -E_CLOUD_ERROR);
367     EXPECT_EQ(TransferDBErrno(errCode), CLOUD_ERROR);
368 
369     /**
370      * @tc.steps: step3. proxy close cloud db
371      * @tc.expected: step3. E_OK
372      */
373     EXPECT_EQ(proxy.Close(), E_OK);
374 }
375 
376 /**
377  * @tc.name: CloudDBProxyTest009
378  * @tc.desc: Verify cloud db closed and current task exit .
379  * @tc.type: FUNC
380  * @tc.require:
381  * @tc.author: zhangqiquan
382  */
383 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest009, TestSize.Level3)
384 {
385     /**
386      * @tc.steps: step1. set cloud db to proxy and sleep 5s when download
387      * @tc.expected: step1. E_OK
388      */
389     auto iCloud = std::make_shared<MockICloudSyncStorageInterface>();
390     ASSERT_NE(iCloud, nullptr);
391     EXPECT_CALL(*iCloud, Commit).WillRepeatedly(testing::Return(E_OK));
392     EXPECT_CALL(*iCloud, StartTransaction).WillRepeatedly(testing::Return(E_OK));
393     EXPECT_CALL(*iCloud, Rollback).WillRepeatedly(testing::Return(E_OK));
394     auto cloudSyncer = new(std::nothrow) VirtualCloudSyncer(StorageProxy::GetCloudDb(iCloud.get()));
395     ASSERT_NE(cloudSyncer, nullptr);
396     cloudSyncer->SetCloudDB(virtualCloudDb_);
397     cloudSyncer->SetSyncAction(true, false);
__anon9cf3a58a0402() 398     cloudSyncer->SetDownloadFunc([]() {
399         std::this_thread::sleep_for(std::chrono::seconds(5)); // sleep 5s
400         return -E_CLOUD_ERROR;
401     });
402     /**
403      * @tc.steps: step2. call sync and wait sync finish
404      * @tc.expected: step2. E_OK
405      */
406     std::mutex processMutex;
407     bool finished = false;
408     std::condition_variable cv;
409     LOGI("[CloudDBProxyTest009] Call cloud sync");
__anon9cf3a58a0502(const std::map<std::string, SyncProcess> &process) 410     const auto callback = [&finished, &processMutex, &cv](const std::map<std::string, SyncProcess> &process) {
411         {
412             std::lock_guard<std::mutex> autoLock(processMutex);
413             for (const auto &item: process) {
414                 if (item.second.process == DistributedDB::FINISHED) {
415                     finished = true;
416                     EXPECT_EQ(item.second.errCode, DB_CLOSED);
417                 }
418             }
419         }
420         cv.notify_all();
421     };
422     EXPECT_EQ(cloudSyncer->Sync({ "cloud" }, SyncMode::SYNC_MODE_CLOUD_MERGE, { TABLE_NAME }, callback, 0), E_OK);
423     std::this_thread::sleep_for(std::chrono::seconds(1));
424     cloudSyncer->Close();
425     {
426         LOGI("[CloudDBProxyTest009] begin to wait sync");
427         std::unique_lock<std::mutex> uniqueLock(processMutex);
__anon9cf3a58a0602() 428         cv.wait_for(uniqueLock, std::chrono::milliseconds(DBConstant::MIN_TIMEOUT), [&finished]() {
429             return finished;
430         });
431         LOGI("[CloudDBProxyTest009] end to wait sync");
432     }
433     RefObject::KillAndDecObjRef(cloudSyncer);
434 }
435 
436 /**
437  * @tc.name: CloudDBProxyTest010
438  * @tc.desc: Verify cloud db lock with diff status.
439  * @tc.type: FUNC
440  * @tc.require:
441  * @tc.author: zhangqiquan
442  */
443 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest010, TestSize.Level0)
444 {
445     /**
446      * @tc.steps: step1. set cloud db to proxy
447      * @tc.expected: step1. E_OK
448      */
449     CloudDBProxy proxy;
450     proxy.SetCloudDB(virtualCloudDb_);
451     /**
452      * @tc.steps: step2. proxy lock with diff status
453      */
454     virtualCloudDb_->SetActionStatus(CLOUD_NETWORK_ERROR);
455     auto ret = proxy.Lock();
456     EXPECT_EQ(ret.first, -E_CLOUD_NETWORK_ERROR);
457     EXPECT_EQ(TransferDBErrno(ret.first), CLOUD_NETWORK_ERROR);
458 
459     virtualCloudDb_->SetActionStatus(CLOUD_LOCK_ERROR);
460     ret = proxy.Lock();
461     EXPECT_EQ(ret.first, -E_CLOUD_LOCK_ERROR);
462     EXPECT_EQ(TransferDBErrno(ret.first), CLOUD_LOCK_ERROR);
463     /**
464      * @tc.steps: step3. proxy close cloud db
465      * @tc.expected: step3. E_OK
466      */
467     EXPECT_EQ(proxy.Close(), E_OK);
468 }
469 
470 /**
471  * @tc.name: CloudDBProxyTest008
472  * @tc.desc: Verify cloud db heartbeat with diff status.
473  * @tc.type: FUNC
474  * @tc.require:
475  * @tc.author: zhangqiquan
476  */
477 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest011, TestSize.Level2)
478 {
479     /**
480      * @tc.steps: step1. set cloud db to proxy
481      * @tc.expected: step1. E_OK
482      */
483     CloudDBProxy proxy;
484     proxy.SetCloudDB(virtualCloudDb_);
485     virtualCloudDb_->SetHeartbeatBlockTime(100); // block 100 ms
486     std::mutex waitMutex;
487     std::condition_variable waitCv;
488     const int scheduleCount = 12;
489     int currentCount = 0;
490     for (int i = 0; i < scheduleCount; ++i) {
__anon9cf3a58a0702() 491         RuntimeContext::GetInstance()->ScheduleTask([&proxy, &waitMutex, &waitCv, &currentCount]() {
492             proxy.HeartBeat();
493             {
494                 std::lock_guard<std::mutex> autoLock(waitMutex);
495                 currentCount++;
496                 LOGI("[CloudDBProxyTest011] CurrentCount %d", currentCount);
497             }
498             waitCv.notify_all();
499         });
500     }
501     LOGI("[CloudDBProxyTest011] Begin wait all task finish");
502     std::unique_lock<std::mutex> uniqueLock(waitMutex);
__anon9cf3a58a0802() 503     waitCv.wait_for(uniqueLock, std::chrono::milliseconds(DBConstant::MAX_TIMEOUT), [&currentCount, scheduleCount]() {
504         return currentCount >= scheduleCount;
505     });
506     LOGI("[CloudDBProxyTest011] End wait all task finish");
507     EXPECT_EQ(currentCount, scheduleCount);
508 }
509 
510 /**
511  * @tc.name: CloudDBProxyTest012
512  * @tc.desc: Asset data deduplication.
513  * @tc.type: FUNC
514  * @tc.require:
515  * @tc.author: tankaisheng
516  */
517 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest012, TestSize.Level2)
518 {
519     /**
520      * @tc.steps: step1. set cloud db to proxy
521      * @tc.expected: step1. E_OK
522      */
523     Assets assets;
524     Asset asset1;
525     asset1.name = "assetName1";
526     asset1.assetId = "";
527     asset1.modifyTime = "20240730";
528     assets.push_back(asset1);
529 
530     Asset asset2;
531     asset2.name = "assetName1";
532     asset2.assetId = "1";
533     asset2.modifyTime = "20240730";
534     assets.push_back(asset2);
535 
536     Asset asset3;
537     asset3.name = "assetName2";
538     asset3.assetId = "2";
539     asset3.modifyTime = "20240730";
540     assets.push_back(asset3);
541 
542     Asset asset4;
543     asset4.name = "assetName2";
544     asset4.assetId = "3";
545     asset4.modifyTime = "20240731";
546     assets.push_back(asset4);
547 
548     Asset asset5;
549     asset5.name = "assetName3";
550     asset5.assetId = "4";
551     asset5.modifyTime = "20240730";
552     assets.push_back(asset5);
553 
554     Asset asset6;
555     asset6.name = "assetName3";
556     asset6.assetId = "5";
557     asset6.modifyTime = "20240730";
558     assets.push_back(asset6);
559 
560     Asset asset7;
561     asset7.name = "assetName1";
562     asset7.assetId = "6";
563     asset7.modifyTime = "20240731";
564     assets.push_back(asset7);
565 
566     DBCommon::RemoveDuplicateAssetsData(assets);
567 
568     /**
569      * @tc.steps: step2. check data
570      * @tc.expected: step2. E_OK
571      */
572     std::string assetNameArr[] = {"assetName2", "assetName3", "assetName1"};
573     std::string assetIdArr[] = {"3", "5", "6"};
574     EXPECT_EQ(assets.size(), 3u);
575     for (std::vector<DistributedDB::Asset>::size_type i = 0; i < assets.size(); ++i) {
576         EXPECT_EQ(assets.at(i).name, assetNameArr[i]);
577         EXPECT_EQ(assets.at(i).assetId, assetIdArr[i]);
578     }
579 }
580 
581 /**
582  * @tc.name: CloudDBProxyTest014
583  * @tc.desc: Test asset deduplication with empty assetId.
584  * @tc.type: FUNC
585  * @tc.require:
586  * @tc.author: liaoyonghuang
587  */
588 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest014, TestSize.Level0)
589 {
590     /**
591      * @tc.steps: step1. set cloud db to proxy
592      * @tc.expected: step1. E_OK
593      */
594     Assets assets;
595     Asset asset1;
596     asset1.name = "assetName";
597     asset1.assetId = "";
598     asset1.modifyTime = "1";
599     assets.push_back(asset1);
600 
601     Asset asset2;
602     asset2.name = "assetName";
603     asset2.assetId = "";
604     asset2.modifyTime = "3";
605     assets.push_back(asset2);
606 
607     Asset asset3;
608     asset3.name = "assetName";
609     asset3.assetId = "";
610     asset3.modifyTime = "2";
611     assets.push_back(asset3);
612 
613     /**
614      * @tc.steps: step2. Remove duplicate assets and check data
615      * @tc.expected: step2. E_OK
616      */
617     DBCommon::RemoveDuplicateAssetsData(assets);
618     ASSERT_EQ(assets.size(), 1u);
619     EXPECT_EQ(assets[0].modifyTime, "3");
620 }
621 
622 /**
623  * @tc.name: CloudSyncQueue001
624  * @tc.desc: Verify sync task count decrease after sync finished.
625  * @tc.type: FUNC
626  * @tc.require:
627  * @tc.author: zhangqiquan
628  */
629 HWTEST_F(DistributedDBCloudDBProxyTest, CloudSyncQueue001, TestSize.Level2)
630 {
631     /**
632      * @tc.steps: step1. set cloud db to proxy and sleep 5s when download
633      * @tc.expected: step1. E_OK
634      */
635     auto iCloud = std::make_shared<MockICloudSyncStorageInterface>();
636     ASSERT_NE(iCloud, nullptr);
637     auto cloudSyncer = new(std::nothrow) VirtualCloudSyncer(StorageProxy::GetCloudDb(iCloud.get()));
638     ASSERT_NE(cloudSyncer, nullptr);
639     EXPECT_CALL(*iCloud, Rollback).WillRepeatedly(testing::Return(E_OK));
640     EXPECT_CALL(*iCloud, Commit).WillRepeatedly(testing::Return(E_OK));
641     EXPECT_CALL(*iCloud, StartTransaction).WillRepeatedly(testing::Return(E_OK));
642     cloudSyncer->SetCloudDB(virtualCloudDb_);
643     cloudSyncer->SetSyncAction(true, false);
__anon9cf3a58a0902() 644     cloudSyncer->SetDownloadFunc([cloudSyncer]() {
645         EXPECT_EQ(cloudSyncer->GetQueueCount(), 1u);
646         std::this_thread::sleep_for(std::chrono::seconds(2)); // sleep 2s
647         return E_OK;
648     });
649     /**
650      * @tc.steps: step2. call sync and wait sync finish
651      */
652     int callCount = 0;
653     EXPECT_EQ(Sync(cloudSyncer, callCount), OK);
654     RuntimeContext::GetInstance()->StopTaskPool();
655     EXPECT_EQ(callCount, 1);
656     RefObject::KillAndDecObjRef(cloudSyncer);
657 }
658 
659 /**
660  * @tc.name: CloudSyncQueue002
661  * @tc.desc: Verify sync task abort after close.
662  * @tc.type: FUNC
663  * @tc.require:
664  * @tc.author: zhangqiquan
665  */
666 HWTEST_F(DistributedDBCloudDBProxyTest, CloudSyncQueue002, TestSize.Level2)
667 {
668     /**
669      * @tc.steps: step1. set cloud db to proxy and sleep 2s when download
670      * @tc.expected: step1. E_OK
671      */
672     auto iCloud = std::make_shared<MockICloudSyncStorageInterface>();
673     ASSERT_NE(iCloud, nullptr);
674     EXPECT_CALL(*iCloud, Rollback).WillRepeatedly(testing::Return(E_OK));
675     EXPECT_CALL(*iCloud, Commit).WillRepeatedly(testing::Return(E_OK));
676     EXPECT_CALL(*iCloud, StartTransaction).WillRepeatedly(testing::Return(E_OK));
677     auto cloudSyncer = new(std::nothrow) VirtualCloudSyncer(StorageProxy::GetCloudDb(iCloud.get()));
678     ASSERT_NE(cloudSyncer, nullptr);
679     cloudSyncer->SetCloudDB(virtualCloudDb_);
680     cloudSyncer->SetSyncAction(true, false);
681     std::atomic<bool> close = false;
__anon9cf3a58a0a02() 682     cloudSyncer->SetDownloadFunc([cloudSyncer, &close]() {
683         std::this_thread::sleep_for(std::chrono::seconds(2)); // sleep 2s
684         cloudSyncer->PauseCurrentTask();
685         EXPECT_TRUE(close);
686         return -E_TASK_PAUSED;
687     });
688     /**
689      * @tc.steps: step2. call sync and wait sync finish
690      */
691     EXPECT_EQ(cloudSyncer->Sync({ "cloud" }, SyncMode::SYNC_MODE_CLOUD_MERGE, { TABLE_NAME }, nullptr, 0), E_OK);
692     std::this_thread::sleep_for(std::chrono::seconds(1));
693     close = true;
694     cloudSyncer->Close();
695     RuntimeContext::GetInstance()->StopTaskPool();
696     RefObject::KillAndDecObjRef(cloudSyncer);
697 }
698 
699 /**
700  * @tc.name: CloudSyncerTest001
701  * @tc.desc: Verify syncer notify by queue schedule.
702  * @tc.type: FUNC
703  * @tc.require:
704  * @tc.author: zhangqiquan
705  */
706 HWTEST_F(DistributedDBCloudDBProxyTest, CloudSyncerTest001, TestSize.Level2)
707 {
708     /**
709      * @tc.steps: step1. set cloud db to proxy
710      * @tc.expected: step1. E_OK
711      */
712     auto iCloud = std::make_shared<MockICloudSyncStorageInterface>();
713     EXPECT_CALL(*iCloud, StartTransaction).WillRepeatedly(testing::Return(E_OK));
714     EXPECT_CALL(*iCloud, Commit).WillRepeatedly(testing::Return(E_OK));
715     EXPECT_CALL(*iCloud, GetIdentify).WillRepeatedly(testing::Return("CloudSyncerTest001"));
716     auto cloudSyncer = new(std::nothrow) VirtualCloudSyncer(StorageProxy::GetCloudDb(iCloud.get()));
717     std::atomic<int> callCount = 0;
718     std::condition_variable cv;
__anon9cf3a58a0b02(const std::map<std::string, SyncProcess> &) 719     cloudSyncer->SetCurrentTaskInfo([&callCount, &cv](const std::map<std::string, SyncProcess> &) {
720         callCount++;
721         int before = callCount;
722         LOGD("on callback %d", before);
723         std::this_thread::sleep_for(std::chrono::seconds(1));
724         EXPECT_EQ(before, callCount);
725         cv.notify_all();
726     }, 1u);
727     const int notifyCount = 2;
728     for (int i = 0; i < notifyCount; ++i) {
729         cloudSyncer->Notify();
730     }
731     cloudSyncer->SetCurrentTaskInfo(nullptr, 0); // 0 is invalid task id
732     std::mutex processMutex;
733     std::unique_lock<std::mutex> uniqueLock(processMutex);
__anon9cf3a58a0c02() 734     cv.wait_for(uniqueLock, std::chrono::milliseconds(DBConstant::MIN_TIMEOUT), [&callCount]() {
735         return callCount == notifyCount;
736     });
737     cloudSyncer->Close();
738     RefObject::KillAndDecObjRef(cloudSyncer);
739 }
740 
741 /**
742  * @tc.name: SameBatchTest001
743  * @tc.desc: Verify update cache in same batch.
744  * @tc.type: FUNC
745  * @tc.require:
746  * @tc.author: zhangqiquan
747  */
748 HWTEST_F(DistributedDBCloudDBProxyTest, SameBatchTest001, TestSize.Level0)
749 {
750     std::map<std::string, LogInfo> localLogInfoCache;
751     LogInfo cloudInfo;
752     LogInfo localInfo;
753     localInfo.hashKey = {'k'};
754     cloudInfo.cloudGid = "gid";
755     /**
756      * @tc.steps: step1. insert cloud into local
757      * @tc.expected: step1. local cache has gid
758      */
759     CloudSyncUtils::UpdateLocalCache(OpType::INSERT, cloudInfo, localInfo, localLogInfoCache);
760     std::string hashKey(localInfo.hashKey.begin(), localInfo.hashKey.end());
761     EXPECT_EQ(localLogInfoCache[hashKey].cloudGid, cloudInfo.cloudGid);
762     /**
763      * @tc.steps: step2. delete local
764      * @tc.expected: step2. local flag is delete
765      */
766     CloudSyncUtils::UpdateLocalCache(OpType::DELETE, cloudInfo, localInfo, localLogInfoCache);
767     EXPECT_EQ(localLogInfoCache[hashKey].flag, static_cast<uint64_t>(LogInfoFlag::FLAG_DELETE));
768 }
769 
770 /**
771  * @tc.name: SameBatchTest002
772  * @tc.desc: Verify cal opType in same batch.
773  * @tc.type: FUNC
774  * @tc.require:
775  * @tc.author: zhangqiquan
776  */
777 HWTEST_F(DistributedDBCloudDBProxyTest, SameBatchTest002, TestSize.Level0)
778 {
779     /**
780      * @tc.steps: step1. prepare two data with same pk
781      */
782     ICloudSyncer::SyncParam param;
783     param.downloadData.opType.push_back(OpType::INSERT);
784     param.downloadData.opType.push_back(OpType::UPDATE);
785     const std::string pkField = "pk";
786     param.changedData.field.push_back(pkField);
787     VBucket oneRow;
788     oneRow[pkField] = static_cast<int64_t>(1); // 1 is pk
789     param.downloadData.data.push_back(oneRow);
790     param.downloadData.data.push_back(oneRow);
791     /**
792      * @tc.steps: step2. cal opType by utils
793      * @tc.expected: step2. all type should be INSERT
794      */
795     for (size_t i = 0; i < param.downloadData.data.size(); ++i) {
796         EXPECT_EQ(CloudSyncUtils::CalOpType(param, i), OpType::INSERT);
797     }
798     /**
799      * @tc.steps: step3. cal opType by utils
800      * @tc.expected: step3. should be UPDATE because diff pk
801      */
802     oneRow[pkField] = static_cast<int64_t>(2); // 2 is pk
803     param.downloadData.data.push_back(oneRow);
804     param.downloadData.opType.push_back(OpType::UPDATE);
805     // index start with zero
806     EXPECT_EQ(CloudSyncUtils::CalOpType(param, param.downloadData.data.size() - 1), OpType::UPDATE);
807 }
808 
809 /**
810  * @tc.name: CloudDBProxyTest013
811  * @tc.desc: Verify CloudDBProxy interfaces.
812  * @tc.type: FUNC
813  * @tc.require: DTS2024073106613
814  * @tc.author: suyue
815  */
816 HWTEST_F(DistributedDBCloudDBProxyTest, CloudDBProxyTest013, TestSize.Level0)
817 {
818     /**
819      * @tc.steps: step1. call CloudDBProxy interfaces when ICloudDb is nullptr.
820      * @tc.expected: step1. return -E_CLOUD_ERROR.
821      */
822     CloudDBProxy proxy;
823     int ret = proxy.UnLock();
824     EXPECT_EQ(ret, -E_CLOUD_ERROR);
825     ret = proxy.HeartBeat();
826     EXPECT_EQ(ret, -E_CLOUD_ERROR);
827     VBucket extend;
828     const std::string tableName = "test";
829     std::vector<VBucket> record;
830     ret = proxy.Query(tableName, extend, record);
831     EXPECT_EQ(ret, -E_CLOUD_ERROR);
832     Info info;
833     ret = proxy.BatchInsert(tableName, record, record, info);
834     EXPECT_EQ(ret, -E_CLOUD_ERROR);
835     ret = proxy.BatchUpdate(tableName, record, record, info);
836     EXPECT_EQ(ret, -E_CLOUD_ERROR);
837     ret = proxy.BatchDelete(tableName, record, record, info);
838     EXPECT_EQ(ret, -E_CLOUD_ERROR);
839     std::pair<int, uint64_t> res = proxy.Lock();
840     EXPECT_EQ(res.first, -E_CLOUD_ERROR);
841     std::pair<int, std::string> cursor = proxy.GetEmptyCursor(tableName);
842     EXPECT_EQ(cursor.first, -E_CLOUD_ERROR);
843 
844     /**
845      * @tc.steps: step2. call CloudDBProxy interfaces when para is err.
846      * @tc.expected: step2. return fail.
847      */
848     std::pair<int, std::string> ver = proxy.GetCloudVersion("test");
849     EXPECT_EQ(ver.first, -E_NOT_SUPPORT);
850     std::vector<Asset> assets;
851     ret = proxy.RemoveLocalAssets(assets);
852     EXPECT_EQ(ret, -E_OK);
853     assets = {{}};
854     ret = proxy.RemoveLocalAssets(assets);
855     EXPECT_EQ(ret, -E_OK);
856 }
857 
858 /**
859  * @tc.name: CloudSyncUtilsTest
860  * @tc.desc: Verify CloudSyncUtils interfaces
861  * @tc.type: FUNC
862  * @tc.require: DTS2024073106613
863  * @tc.author: suyue
864  */
865 HWTEST_F(DistributedDBCloudDBProxyTest, CloudSyncUtilsTest, TestSize.Level0)
866 {
867     /**
868      * @tc.steps: step1. Test type translation interfaces.
869      * @tc.expected: step1. success.
870      */
871     CloudSyncUtils utilsObj;
872     EXPECT_EQ(utilsObj.StatusToFlag(AssetStatus::INSERT), AssetOpType::INSERT);
873     EXPECT_EQ(utilsObj.StatusToFlag(AssetStatus::DELETE), AssetOpType::DELETE);
874     EXPECT_EQ(utilsObj.StatusToFlag(AssetStatus::UPDATE), AssetOpType::UPDATE);
875     EXPECT_EQ(utilsObj.StatusToFlag(AssetStatus::NORMAL), AssetOpType::NO_CHANGE);
876     EXPECT_EQ(utilsObj.StatusToFlag(AssetStatus::DOWNLOADING), AssetOpType::NO_CHANGE);
877     EXPECT_EQ(utilsObj.OpTypeToChangeType(OpType::ONLY_UPDATE_GID), ChangeType::OP_BUTT);
878 
879     /**
880      * @tc.steps: step2. call CloudSyncUtils interfaces when para is err.
881      * @tc.expected: step2. return false.
882      */
883     const std::vector<DeviceID> devices = {"test"};
884     int mode = 10; // set metaMode to 10 not in enum class MetaMode
885     int ret = utilsObj.CheckParamValid(devices, static_cast<SyncMode>(mode));
886     EXPECT_EQ(ret, -E_INVALID_ARGS);
887     VBucket record;
888     const std::vector<std::string> pkColNames;
889     std::vector<Type> cloudPkVals = {{}};
890     ret = utilsObj.GetCloudPkVals(record, pkColNames, 0, cloudPkVals);
891     EXPECT_EQ(ret, -E_INVALID_ARGS);
892     Assets assets = {{}};
893     utilsObj.StatusToFlagForAssets(assets);
894     std::vector<Field> fields = {{"test", TYPE_INDEX<Assets>, true, true}};
895     utilsObj.StatusToFlagForAssetsInRecord(fields, record);
896     Timestamp timestamp;
897     CloudSyncData uploadData;
898     const int64_t count = 0;
899     ret = utilsObj.UpdateExtendTime(uploadData, count, 0, timestamp);
900     EXPECT_EQ(ret, -E_INTERNAL_ERROR);
901     CloudSyncBatch data;
902     data.assets = {{}};
903     ret = utilsObj.FillAssetIdToAssets(data, 0, CloudWaterType::UPDATE);
904     EXPECT_EQ(ret, -E_CLOUD_ERROR);
905 
906     /**
907      * @tc.steps: step3. call IsChangeDataEmpty interface when para is different.
908      * @tc.expected: step3. success.
909      */
910     ChangedData changedData;
911     EXPECT_EQ(utilsObj.IsChangeDataEmpty(changedData), true);
912     changedData.primaryData[OP_INSERT] = {{}};
913     EXPECT_EQ(utilsObj.IsChangeDataEmpty(changedData), true);
914     changedData.primaryData[OP_UPDATE] = {{}};
915     EXPECT_EQ(utilsObj.IsChangeDataEmpty(changedData), true);
916     changedData.primaryData[OP_DELETE] = {{}};
917     EXPECT_EQ(utilsObj.IsChangeDataEmpty(changedData), false);
918 }
919 }