1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifndef OMIT_MULTI_VER
16 #include <gtest/gtest.h>
17 #include <thread>
18 
19 #include "commit_history_sync.h"
20 #include "db_common.h"
21 #include "distributeddb_data_generate_unit_test.h"
22 #include "distributeddb_tools_unit_test.h"
23 #include "ikvdb_connection.h"
24 #include "kv_store_delegate.h"
25 #include "kv_virtual_device.h"
26 #include "kvdb_manager.h"
27 #include "kvdb_pragma.h"
28 #include "log_print.h"
29 #include "meta_data.h"
30 #include "multi_ver_data_sync.h"
31 #include "platform_specific.h"
32 #include "sync_types.h"
33 #include "time_sync.h"
34 #include "virtual_multi_ver_sync_db_interface.h"
35 
36 using namespace testing::ext;
37 using namespace DistributedDB;
38 using namespace DistributedDBUnitTest;
39 using namespace std;
40 
41 #ifndef LOW_LEVEL_MEM_DEV
42 namespace {
43     string g_testDir;
44     const string STORE_ID = "kv_store_sync_test";
45     const string STORE_ID_A = "kv_store_sync_test_a";
46     const string STORE_ID_B = "kv_store_sync_test_b";
47     const int WAIT_TIME_1 = 1000;
48     const int WAIT_TIME_2 = 2000;
49     const int WAIT_LONG_TIME = 10000;
50     const int WAIT_LIMIT_TIME = 30000;
51     const std::string DEVICE_B = "deviceB";
52     const std::string DEVICE_C = "deviceC";
53     const int LIMIT_KEY_SIZE = 1024;
54     constexpr int BIG_VALUE_SIZE = 1024 + 1; // > 1K
55     constexpr int LIMIT_VALUE_SIZE = 4 * 1024 * 1024; // 4M
56     KvStoreDelegateManager g_mgr("sync_test", "sync_test");
57     KvStoreConfig g_config;
58     KvStoreDelegate::Option g_option;
59 
60     // define the g_kvDelegateCallback, used to get some information when open a kv store.
61     DBStatus g_kvDelegateStatus = INVALID_ARGS;
62     KvStoreDelegate *g_kvDelegatePtr = nullptr;
63     MultiVerNaturalStoreConnection *g_connectionA;
64     MultiVerNaturalStoreConnection *g_connectionB;
65     VirtualCommunicatorAggregator* g_communicatorAggregator = nullptr;
66     KvVirtualDevice *g_deviceB = nullptr;
67     KvVirtualDevice *g_deviceC = nullptr;
68 
69     // the type of g_kvDelegateCallback is function<void(DBStatus, KvStoreDelegate*)>
70     auto g_kvDelegateCallback = bind(&DistributedDBToolsUnitTest::KvStoreDelegateCallback,
71         placeholders::_1, placeholders::_2, std::ref(g_kvDelegateStatus), std::ref(g_kvDelegatePtr));
72 
GetConnection(const std::string & dir,const std::string & storeId,int errCode)73     MultiVerNaturalStoreConnection *GetConnection(const std::string &dir, const std::string &storeId, int errCode)
74     {
75         KvDBProperties prop;
76         prop.SetStringProp(KvDBProperties::USER_ID, "sync_test");
77         prop.SetStringProp(KvDBProperties::APP_ID, "sync_test");
78         prop.SetStringProp(KvDBProperties::STORE_ID, storeId);
79         std::string identifier = DBCommon::TransferHashString("sync_test-sync_test-" + storeId);
80 
81         prop.SetStringProp(KvDBProperties::IDENTIFIER_DATA, identifier);
82         std::string identifierDir = DBCommon::TransferStringToHex(identifier);
83         prop.SetStringProp(KvDBProperties::IDENTIFIER_DIR, identifierDir);
84         prop.SetStringProp(KvDBProperties::DATA_DIR, dir);
85         prop.SetIntProp(KvDBProperties::DATABASE_TYPE, KvDBProperties::MULTI_VER_TYPE_SQLITE);
86         prop.SetBoolProp(KvDBProperties::CREATE_IF_NECESSARY, true);
87         errCode = E_OK;
88         auto conn = KvDBManager::GetDatabaseConnection(prop, errCode);
89         if (errCode != E_OK) {
90             LOGE("[DistributeddbMultiVerP2PSyncTes] db create failed path, err %d", errCode);
91             return nullptr;
92         }
93         return static_cast<MultiVerNaturalStoreConnection *>(conn);
94     }
95 
GetDataFromConnection(IKvDBConnection * conn,const Key & key,Value & value)96     int GetDataFromConnection(IKvDBConnection *conn, const Key &key, Value &value)
97     {
98         IKvDBSnapshot *snapshot = nullptr;
99         int errCode = conn->GetSnapshot(snapshot);
100         if (errCode != E_OK) {
101             return errCode;
102         }
103         errCode = snapshot->Get(key, value);
104         conn->ReleaseSnapshot(snapshot);
105         return errCode;
106     }
107 }
108 
109 class DistributedDBMultiVerP2PSyncTest : public testing::Test {
110 public:
111     static void SetUpTestCase(void);
112     static void TearDownTestCase(void);
113     void SetUp();
114     void TearDown();
115 };
116 
SetUpTestCase(void)117 void DistributedDBMultiVerP2PSyncTest::SetUpTestCase(void)
118 {
119     /**
120      * @tc.setup: Init datadir and Virtual Communicator.
121      */
122     DistributedDBToolsUnitTest::TestDirInit(g_testDir);
123     string dir = g_testDir + "/commitstore";
124     g_config.dataDir = dir;
125     DIR* dirTmp = opendir(dir.c_str());
126     if (dirTmp == nullptr) {
127         OS::MakeDBDirectory(dir);
128     } else {
129         closedir(dirTmp);
130     }
131     g_mgr.SetKvStoreConfig(g_config);
132     g_communicatorAggregator = new (std::nothrow) VirtualCommunicatorAggregator();
133     ASSERT_TRUE(g_communicatorAggregator != nullptr);
134     RuntimeContext::GetInstance()->SetCommunicatorAggregator(g_communicatorAggregator);
135 }
136 
TearDownTestCase(void)137 void DistributedDBMultiVerP2PSyncTest::TearDownTestCase(void)
138 {
139     /**
140      * @tc.teardown: Release virtual Communicator and clear data dir.
141      */
142     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
143         LOGE("rm test db files error!");
144     }
145 
146     RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
147     g_communicatorAggregator = nullptr;
148 }
149 
SetUp(void)150 void DistributedDBMultiVerP2PSyncTest::SetUp(void)
151 {
152     DistributedDBToolsUnitTest::PrintTestCaseInfo();
153     /**
154      * @tc.setup: create virtual device B and C
155      */
156     g_communicatorAggregator->Disable();
157     g_deviceB = new (std::nothrow) KvVirtualDevice(DEVICE_B);
158     ASSERT_TRUE(g_deviceB != nullptr);
159     VirtualMultiVerSyncDBInterface *syncInterfaceB = new (std::nothrow) VirtualMultiVerSyncDBInterface;
160     ASSERT_TRUE(syncInterfaceB != nullptr);
161     ASSERT_EQ(syncInterfaceB->Initialize(DEVICE_B), E_OK);
162     ASSERT_EQ(g_deviceB->Initialize(g_communicatorAggregator, syncInterfaceB), E_OK);
163 
164     g_deviceC = new (std::nothrow) KvVirtualDevice(DEVICE_C);
165     ASSERT_TRUE(g_deviceC != nullptr);
166     VirtualMultiVerSyncDBInterface *syncInterfaceC = new (std::nothrow) VirtualMultiVerSyncDBInterface;
167     ASSERT_TRUE(syncInterfaceC != nullptr);
168     ASSERT_EQ(syncInterfaceC->Initialize(DEVICE_C), E_OK);
169     ASSERT_EQ(g_deviceC->Initialize(g_communicatorAggregator, syncInterfaceC), E_OK);
170     g_communicatorAggregator->Enable();
171 
172     auto permissionCheckCallback = [] (const std::string &userId, const std::string &appId, const std::string &storeId,
173                                 const std::string &deviceId, uint8_t flag) -> bool {
174                                 return true;};
175     EXPECT_EQ(g_mgr.SetPermissionCheckCallback(permissionCheckCallback), OK);
176 }
177 
TearDown(void)178 void DistributedDBMultiVerP2PSyncTest::TearDown(void)
179 {
180     /**
181      * @tc.teardown: Release device A, B, C, connectionA and connectionB
182      */
183     if (g_kvDelegatePtr != nullptr) {
184         ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
185         g_kvDelegatePtr = nullptr;
186         DBStatus status = g_mgr.DeleteKvStore(STORE_ID);
187         LOGD("delete kv store status %d", status);
188         ASSERT_TRUE(status == OK);
189     }
190     if (g_deviceB != nullptr) {
191         delete g_deviceB;
192         g_deviceB = nullptr;
193     }
194     if (g_deviceC != nullptr) {
195         delete g_deviceC;
196         g_deviceC = nullptr;
197     }
198     if (g_connectionA != nullptr) {
199         g_connectionA->Close();
200         ASSERT_EQ(g_mgr.DeleteKvStore(STORE_ID_A), OK);
201         g_connectionA = nullptr;
202     }
203     if (g_connectionB != nullptr) {
204         g_connectionB->Close();
205         ASSERT_EQ(g_mgr.DeleteKvStore(STORE_ID_B), OK);
206         g_connectionB = nullptr;
207     }
208     PermissionCheckCallbackV2 nullCallback;
209     EXPECT_EQ(g_mgr.SetPermissionCheckCallback(nullCallback), OK);
210 }
211 
GetData(KvStoreDelegate * kvStore,const Key & key,Value & value)212 static DBStatus GetData(KvStoreDelegate *kvStore, const Key &key, Value &value)
213 {
214     KvStoreSnapshotDelegate *snapshotTmp = nullptr;
215     DBStatus statusTmp;
216     kvStore->GetKvStoreSnapshot(nullptr,
217         [&statusTmp, &snapshotTmp](DBStatus status, KvStoreSnapshotDelegate *snapshot) {
218         statusTmp = status;
219         snapshotTmp = snapshot;
220         });
221     if (statusTmp != E_OK) {
222         return statusTmp;
223     }
224     snapshotTmp->Get(key, [&statusTmp, &value](DBStatus status, const Value &outValue) {
225         statusTmp = status;
226         value = outValue;
227     });
228     if (statusTmp == OK) {
229         LOGD("[DistributeddbMultiVerP2PSyncTes] GetData key %c, value = %c", key[0], value[0]);
230     }
231     kvStore->ReleaseKvStoreSnapshot(snapshotTmp);
232     return statusTmp;
233 }
234 
235 /**
236  * @tc.name: Transaction Sync 001
237  * @tc.desc: Verify put transaction sync function.
238  * @tc.type: FUNC
239  * @tc.require: AR000BVRO4 AR000CQE0K
240  * @tc.author: xushaohua
241  */
242 HWTEST_F(DistributedDBMultiVerP2PSyncTest, TransactionSync001, TestSize.Level2)
243 {
244     /**
245      * @tc.steps: step1. open a KvStoreNbDelegate as deviceA
246      */
247     g_mgr.GetKvStore(STORE_ID, g_option, g_kvDelegateCallback);
248     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
249     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME_1));
250 
251     /**
252      * @tc.steps: step2. deviceB put {k1, v1}, {k2,v2} in a transaction
253      */
254     g_deviceB->StartTransaction();
255     ASSERT_EQ(g_deviceB->PutData(DistributedDBUnitTest::KEY_1, DistributedDBUnitTest::VALUE_1), E_OK);
256     ASSERT_EQ(g_deviceB->PutData(DistributedDBUnitTest::KEY_2, DistributedDBUnitTest::VALUE_2), E_OK);
257     g_deviceB->Commit();
258 
259     /**
260      * @tc.steps: step3. deviceB online and wait for sync
261      */
262     g_deviceB->Online();
263     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME_2));
264 
265     /**
266      * @tc.steps: step4. deviceC put {k3, v3}, {k4,v4} in a transaction
267      */
268     g_deviceC->StartTransaction();
269     ASSERT_EQ(g_deviceC->PutData(DistributedDBUnitTest::KEY_3, DistributedDBUnitTest::VALUE_3), E_OK);
270     ASSERT_EQ(g_deviceC->PutData(DistributedDBUnitTest::KEY_4, DistributedDBUnitTest::VALUE_4), E_OK);
271     g_deviceC->Commit();
272 
273     /**
274      * @tc.steps: step5. deviceC online for sync
275      */
276     g_deviceC->Online();
277 
278     /**
279      * @tc.steps: step6. deviceC offline
280      */
281     g_deviceC->Offline();
282     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME_2));
283 
284     /**
285      * @tc.expected: step6. deviceA have {k1, v1}, {k2, v2}, not have k3, k4
286      */
287     Value value;
288     EXPECT_EQ(GetData(g_kvDelegatePtr, DistributedDBUnitTest::KEY_1, value), E_OK);
289     EXPECT_EQ(value, DistributedDBUnitTest::VALUE_1);
290     EXPECT_EQ(GetData(g_kvDelegatePtr, DistributedDBUnitTest::KEY_2, value), E_OK);
291     EXPECT_EQ(value, DistributedDBUnitTest::VALUE_2);
292 
293     EXPECT_EQ(GetData(g_kvDelegatePtr, DistributedDBUnitTest::KEY_3, value), NOT_FOUND);
294     EXPECT_EQ(GetData(g_kvDelegatePtr, DistributedDBUnitTest::KEY_4, value), NOT_FOUND);
295 }
296 
297 /**
298  * @tc.name: Transaction Sync 002
299  * @tc.desc: Verify delete transaction sync function.
300  * @tc.type: FUNC
301  * @tc.require: AR000BVRO4 AR000CQE0K
302  * @tc.author: xushaohua
303  */
304 HWTEST_F(DistributedDBMultiVerP2PSyncTest, TransactionSync002, TestSize.Level2)
305 {
306     /**
307      * @tc.steps: step1. open a KvStoreNbDelegate as deviceA
308      */
309     g_mgr.GetKvStore(STORE_ID, g_option, g_kvDelegateCallback);
310     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
311     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME_1));
312 
313     /**
314      * @tc.steps: step2. deviceB put {k1, v1}, {k2,v2} in a transaction
315      */
316     g_deviceB->StartTransaction();
317     ASSERT_EQ(g_deviceB->PutData(DistributedDBUnitTest::KEY_1, DistributedDBUnitTest::VALUE_1), E_OK);
318     ASSERT_EQ(g_deviceB->PutData(DistributedDBUnitTest::KEY_2, DistributedDBUnitTest::VALUE_2), E_OK);
319     g_deviceB->Commit();
320 
321     /**
322      * @tc.steps: step3. deviceB online and wait for sync
323      */
324     g_deviceB->Online();
325     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME_2));
326 
327     /**
328      * @tc.steps: step4. deviceC put {k3, v3}, and delete k3 in a transaction
329      */
330     g_deviceC->StartTransaction();
331     ASSERT_EQ(g_deviceC->PutData(DistributedDBUnitTest::KEY_3, DistributedDBUnitTest::VALUE_3), E_OK);
332     ASSERT_EQ(g_deviceC->DeleteData(DistributedDBUnitTest::KEY_3), E_OK);
333     g_deviceC->Commit();
334 
335     /**
336      * @tc.steps: step5. deviceB online for sync
337      */
338     g_deviceC->Online();
339 
340     /**
341      * @tc.steps: step6. deviceC offline
342      */
343     g_deviceC->Offline();
344     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME_2));
345 
346     /**
347      * @tc.expected: step6. deviceA have {k1, v1}, {k2, v2}, not have k3, k4
348      */
349     Value value;
350     EXPECT_EQ(GetData(g_kvDelegatePtr, DistributedDBUnitTest::KEY_1, value), E_OK);
351     EXPECT_EQ(value, DistributedDBUnitTest::VALUE_1);
352     EXPECT_EQ(GetData(g_kvDelegatePtr, DistributedDBUnitTest::KEY_2, value), E_OK);
353     EXPECT_EQ(value, DistributedDBUnitTest::VALUE_2);
354     EXPECT_EQ(GetData(g_kvDelegatePtr, DistributedDBUnitTest::KEY_3, value), NOT_FOUND);
355     EXPECT_EQ(GetData(g_kvDelegatePtr, DistributedDBUnitTest::KEY_3, value), NOT_FOUND);
356 }
357 
358 /**
359  * @tc.name: Transaction Sync 003
360  * @tc.desc: Verify update transaction sync function.
361  * @tc.type: FUNC
362  * @tc.require: AR000BVRO4 AR000CQE0K
363  * @tc.author: xushaohua
364  */
365 HWTEST_F(DistributedDBMultiVerP2PSyncTest, TransactionSync003, TestSize.Level2)
366 {
367     /**
368      * @tc.steps: step1. open a KvStoreNbDelegate as deviceA
369      */
370     g_mgr.GetKvStore(STORE_ID, g_option, g_kvDelegateCallback);
371     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
372     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME_1));
373 
374     /**
375      * @tc.steps: step2. deviceB put {k1, v1}, {k2,v2} in a transaction
376      */
377     g_deviceB->StartTransaction();
378     ASSERT_EQ(g_deviceB->PutData(DistributedDBUnitTest::KEY_1, DistributedDBUnitTest::VALUE_1), E_OK);
379     ASSERT_EQ(g_deviceB->PutData(DistributedDBUnitTest::KEY_2, DistributedDBUnitTest::VALUE_2), E_OK);
380     g_deviceB->Commit();
381 
382     /**
383      * @tc.steps: step3. deviceB online and wait for sync
384      */
385     g_deviceB->Online();
386     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME_2));
387 
388     /**
389      * @tc.steps: step4. deviceC put {k3, v3}, and update {k3, v4} in a transaction
390      */
391     g_deviceC->StartTransaction();
392     ASSERT_EQ(g_deviceC->PutData(DistributedDBUnitTest::KEY_3, DistributedDBUnitTest::VALUE_3), E_OK);
393     ASSERT_EQ(g_deviceC->PutData(DistributedDBUnitTest::KEY_3, DistributedDBUnitTest::VALUE_4), E_OK);
394     g_deviceC->Commit();
395 
396     /**
397      * @tc.steps: step5. deviceB online for sync
398      */
399     g_deviceC->Online();
400 
401     /**
402      * @tc.steps: step6. deviceC offline
403      */
404     g_deviceC->Offline();
405     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME_2));
406 
407     /**
408      * @tc.expected: step6. deviceA have {k1, v1}, {k2, v2}, not have k3, k4
409      */
410     Value value;
411     EXPECT_EQ(GetData(g_kvDelegatePtr, DistributedDBUnitTest::KEY_1, value), E_OK);
412     EXPECT_EQ(value, DistributedDBUnitTest::VALUE_1);
413     EXPECT_EQ(GetData(g_kvDelegatePtr, DistributedDBUnitTest::KEY_2, value), E_OK);
414     EXPECT_EQ(value, DistributedDBUnitTest::VALUE_2);
415     EXPECT_EQ(GetData(g_kvDelegatePtr, DistributedDBUnitTest::KEY_3, value), NOT_FOUND);
416     EXPECT_EQ(GetData(g_kvDelegatePtr, DistributedDBUnitTest::KEY_3, value), NOT_FOUND);
417 }
418 
419 /**
420  * @tc.name: Metadata 001
421  * @tc.desc: Verify metadata add and update function
422  * @tc.type: FUNC
423  * @tc.require: AR000CQE0P AR000CQE0S
424  * @tc.author: xushaohua
425  */
426 HWTEST_F(DistributedDBMultiVerP2PSyncTest, Metadata001, TestSize.Level1)
427 {
428     /**
429      * @tc.steps: step1. Create a metadata and use VirtualMultiVerSyncDBInterface to init
430      * @tc.expected: step1. metadata init ok
431      */
432     Metadata metadata;
433     VirtualMultiVerSyncDBInterface *syncInterface = new (std::nothrow) VirtualMultiVerSyncDBInterface;
434     ASSERT_TRUE(syncInterface != nullptr);
435     EXPECT_EQ(syncInterface->Initialize("metadata_test"), E_OK);
436     EXPECT_EQ(metadata.Initialize(syncInterface), E_OK);
437 
438     /**
439      * @tc.steps: step2. call SaveTimeOffset to write t1.
440      * @tc.expected: step2. SaveTimeOffset return ok
441      */
442     const TimeOffset timeOffsetA = 1024;
443     EXPECT_EQ(metadata.SaveTimeOffset(DEVICE_B, timeOffsetA), E_OK);
444     TimeOffset timeOffsetB = 0;
445 
446     /**
447      * @tc.steps: step3. call GetTimeOffset to read t2.
448      * @tc.expected: step3. t1 == t2
449      */
450     metadata.GetTimeOffset(DEVICE_B, timeOffsetB);
451     EXPECT_EQ(timeOffsetA, timeOffsetB);
452 
453     /**
454      * @tc.steps: step4. call SaveTimeOffset to write t3. t3 != t1
455      * @tc.expected: step4. SaveTimeOffset return ok
456      */
457     const TimeOffset timeOffsetC = 2048;
458     EXPECT_EQ(metadata.SaveTimeOffset(DEVICE_B, timeOffsetC), E_OK);
459 
460     /**
461      * @tc.steps: step5. call GetTimeOffset to read t2.
462      * @tc.expected: step5. t4 == t3
463      */
464     TimeOffset timeOffsetD = 0;
465     metadata.GetTimeOffset(DEVICE_B, timeOffsetD);
466     EXPECT_EQ(timeOffsetC, timeOffsetD);
467     syncInterface->DeleteDatabase();
468     delete syncInterface;
469     syncInterface = nullptr;
470 }
471 
472 /**
473  * @tc.name: Isolation Sync 001
474  * @tc.desc: Verify add sync isolation between different kvstore.
475  * @tc.type: FUNC
476  * @tc.require: AR000BVDGP
477  * @tc.author: xushaohua
478  */
479 HWTEST_F(DistributedDBMultiVerP2PSyncTest, IsolationSync001, TestSize.Level2)
480 {
481     int errCode = 0;
482 
483     /**
484      * @tc.steps: step1. Get connectionA, connectionB from different kvstore,
485      *     connectionB not in g_communicatorAggregator
486      */
487     g_communicatorAggregator->Disable();
488     g_connectionB = GetConnection(g_config.dataDir, STORE_ID_B, errCode);
489     ASSERT_TRUE(g_connectionB != nullptr);
490     g_communicatorAggregator->Enable();
491     g_connectionA = GetConnection(g_config.dataDir, STORE_ID_A, errCode);
492     ASSERT_TRUE(g_connectionA != nullptr);
493 
494     /**
495      * @tc.steps: step2. deviceB put {k1, v1}
496      */
497     std::vector<std::string> devices;
498     devices.push_back(g_deviceB->GetDeviceId());
499     ASSERT_EQ(g_deviceB->PutData(DistributedDBUnitTest::KEY_1, DistributedDBUnitTest::VALUE_1), E_OK);
500 
501     /**
502      * @tc.steps: step3. connectionA pull from deviceB
503      * @tc.expected: step3. Pragma OK, connectionA have {k1, v1} , connectionB don't have k1.
504      */
505     PragmaSync pragmaData(devices, SYNC_MODE_PULL_ONLY, nullptr);
506     ASSERT_TRUE(g_connectionA->Pragma(PRAGMA_SYNC_DEVICES, &pragmaData) == E_OK);
507     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME_2));
508     Value value;
509     ASSERT_EQ(GetDataFromConnection(g_connectionA, DistributedDBUnitTest::KEY_1, value), E_OK);
510     EXPECT_EQ(value, DistributedDBUnitTest::VALUE_1);
511     EXPECT_EQ(GetDataFromConnection(g_connectionB, DistributedDBUnitTest::KEY_1, value), -E_NOT_FOUND);
512 }
513 
514 /**
515  * @tc.name: Isolation Sync 002
516  * @tc.desc: Verify update sync isolation between different kvstore.
517  * @tc.type: FUNC
518  * @tc.require: AR000BVDGP
519  * @tc.author: xushaohua
520  */
521 HWTEST_F(DistributedDBMultiVerP2PSyncTest, IsolationSync002, TestSize.Level2)
522 {
523     int errCode = 0;
524 
525     /**
526      * @tc.steps: step1. Get connectionA, connectionB from different kvstore,
527      *     connectionB not in g_communicatorAggregator
528      */
529     g_communicatorAggregator->Disable();
530     g_connectionB = GetConnection(g_config.dataDir, STORE_ID_B, errCode);
531     ASSERT_TRUE(g_connectionB != nullptr);
532     g_communicatorAggregator->Enable();
533     g_connectionA = GetConnection(g_config.dataDir, STORE_ID_A, errCode);
534     ASSERT_TRUE(g_connectionA != nullptr);
535 
536     /**
537      * @tc.steps: step2. deviceB put {k1, v1} and update {k1, v2}
538      */
539     std::vector<std::string> devices;
540     devices.push_back(g_deviceB->GetDeviceId());
541     ASSERT_EQ(g_deviceB->PutData(DistributedDBUnitTest::KEY_1, DistributedDBUnitTest::VALUE_1), E_OK);
542     ASSERT_EQ(g_deviceB->PutData(DistributedDBUnitTest::KEY_1, DistributedDBUnitTest::VALUE_2), E_OK);
543     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME_2));
544 
545     /**
546      * @tc.steps: step3. connectionA pull from deviceB
547      * @tc.expected: step3. Pragma OK, connectionA have {k1, v2} , connectionB don't have k1.
548      */
549     PragmaSync pragmaData(devices, SYNC_MODE_PULL_ONLY, nullptr);
550     ASSERT_TRUE(g_connectionA->Pragma(PRAGMA_SYNC_DEVICES, &pragmaData) == E_OK);
551     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME_2));
552 
553     Value value;
554     EXPECT_EQ(GetDataFromConnection(g_connectionA, DistributedDBUnitTest::KEY_1, value), E_OK);
555     EXPECT_EQ(value, DistributedDBUnitTest::VALUE_2);
556     EXPECT_EQ(GetDataFromConnection(g_connectionB, DistributedDBUnitTest::KEY_1, value), -E_NOT_FOUND);
557 }
558 
559 /**
560  * @tc.name: Isolation Sync 003
561  * @tc.desc: Verify delete sync isolation between different kvstore.
562  * @tc.type: FUNC
563  * @tc.require: AR000BVDGP
564  * @tc.author: xushaohua
565  */
566 HWTEST_F(DistributedDBMultiVerP2PSyncTest, IsolationSync003, TestSize.Level2)
567 {
568     int errCode = 0;
569 
570     /**
571      * @tc.steps: step1. Get connectionA, connectionB from different kvstore,
572      *     connectionB not in g_communicatorAggregator, connectionB put {k1,v1}
573      */
574     g_communicatorAggregator->Disable();
575     g_connectionB = GetConnection(g_config.dataDir, STORE_ID_B, errCode);
576     ASSERT_TRUE(g_connectionB != nullptr);
577     IOption option;
578     ASSERT_EQ(g_connectionB->Put(option, KEY_1, VALUE_1), E_OK);
579     g_communicatorAggregator->Enable();
580     g_connectionA = GetConnection(g_config.dataDir, STORE_ID_A, errCode);
581     ASSERT_TRUE(g_connectionA != nullptr);
582 
583     /**
584      * @tc.steps: step2. deviceB put {k1, v1} and delete k1
585      */
586     std::vector<std::string> devices;
587     devices.push_back(g_deviceB->GetDeviceId());
588     ASSERT_EQ(g_deviceB->PutData(DistributedDBUnitTest::KEY_1, DistributedDBUnitTest::VALUE_1), E_OK);
589     ASSERT_EQ(g_deviceB->DeleteData(DistributedDBUnitTest::KEY_1), E_OK);
590     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME_2));
591 
592     /**
593      * @tc.steps: step3. connectionA pull from deviceB
594      * @tc.expected: step3. Pragma OK, connectionA don't have k1, connectionB have {k1.v1}
595      */
596     LOGD("[DistributeddbMultiVerP2PSyncTes] start sync");
597     PragmaSync pragmaData(devices, SYNC_MODE_PULL_ONLY, nullptr);
598     ASSERT_TRUE(g_connectionA->Pragma(PRAGMA_SYNC_DEVICES, &pragmaData) == E_OK);
599     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME_2));
600 
601     Value value;
602     EXPECT_EQ(GetDataFromConnection(g_connectionA, DistributedDBUnitTest::KEY_1, value), -E_NOT_FOUND);
603     EXPECT_EQ(GetDataFromConnection(g_connectionB, DistributedDBUnitTest::KEY_1, value), E_OK);
604     EXPECT_EQ(value, DistributedDBUnitTest::VALUE_1);
605 }
606 
SetTimeSyncPacketField(TimeSyncPacket & inPacket,Timestamp sourceBegin,Timestamp sourceEnd,Timestamp targetBegin,Timestamp targetEnd,SyncId theId)607 static void SetTimeSyncPacketField(TimeSyncPacket &inPacket, Timestamp sourceBegin, Timestamp sourceEnd,
608     Timestamp targetBegin, Timestamp targetEnd, SyncId theId)
609 {
610     inPacket.SetSourceTimeBegin(sourceBegin);
611     inPacket.SetSourceTimeEnd(sourceEnd);
612     inPacket.SetTargetTimeBegin(targetBegin);
613     inPacket.SetTargetTimeEnd(targetEnd);
614 }
615 
IsTimeSyncPacketEqual(const TimeSyncPacket & inPacketA,const TimeSyncPacket & inPacketB)616 static bool IsTimeSyncPacketEqual(const TimeSyncPacket &inPacketA, const TimeSyncPacket &inPacketB)
617 {
618     bool equal = true;
619     equal = inPacketA.GetSourceTimeBegin() == inPacketB.GetSourceTimeBegin() ? equal : false;
620     equal = inPacketA.GetSourceTimeEnd() == inPacketB.GetSourceTimeEnd() ? equal : false;
621     equal = inPacketA.GetTargetTimeBegin() == inPacketB.GetTargetTimeBegin() ? equal : false;
622     equal = inPacketA.GetTargetTimeEnd() == inPacketB.GetTargetTimeEnd() ? equal : false;
623     return equal;
624 }
625 
626 /**
627  * @tc.name: Timesync Packet 001
628  * @tc.desc: Verify TimesyncPacket Serialization and DeSerialization
629  * @tc.type: FUNC
630  * @tc.require: AR000BVRNU AR000CQE0J
631  * @tc.author: xiaozhenjian
632  */
633 HWTEST_F(DistributedDBMultiVerP2PSyncTest, TimesyncPacket001, TestSize.Level1)
634 {
635     /**
636      * @tc.steps: step1. create TimeSyncPacket packetA aand packetB
637      */
638     TimeSyncPacket packetA;
639     TimeSyncPacket packetB;
640     SetTimeSyncPacketField(packetA, 1, 2, 3, 4, 5); // 1, 2, 3, 4, 5 is five field for time sync packet
641     SetTimeSyncPacketField(packetB, 5, 4, 3, 2, 1); // 1, 2, 3, 4, 5 is five field for time sync packet
642     Message oriMsgA;
643     Message oriMsgB;
644     oriMsgA.SetCopiedObject(packetA);
645     oriMsgA.SetMessageId(TIME_SYNC_MESSAGE);
646     oriMsgA.SetMessageType(TYPE_REQUEST);
647     oriMsgB.SetCopiedObject(packetB);
648     oriMsgB.SetMessageId(TIME_SYNC_MESSAGE);
649     oriMsgB.SetMessageType(TYPE_RESPONSE);
650 
651     /**
652      * @tc.steps: step2. Serialization packetA to bufferA
653      */
654     uint32_t lenA = TimeSync::CalculateLen(&oriMsgA);
655     vector<uint8_t> bufferA;
656     bufferA.resize(lenA);
657     int ret = TimeSync::Serialization(bufferA.data(), lenA, &oriMsgA);
658     ASSERT_EQ(ret, E_OK);
659 
660     /**
661      * @tc.steps: step3. Serialization packetB to bufferB
662      */
663     uint32_t lenB = TimeSync::CalculateLen(&oriMsgB);
664     vector<uint8_t> bufferB;
665     bufferB.resize(lenB);
666     ret = TimeSync::Serialization(bufferB.data(), lenB, &oriMsgB);
667     ASSERT_EQ(ret, E_OK);
668 
669     /**
670      * @tc.steps: step4. DeSerialization bufferA to outPktA
671      * @tc.expected: step4. packetA == outPktA
672      */
673     Message outMsgA;
674     outMsgA.SetMessageId(TIME_SYNC_MESSAGE);
675     outMsgA.SetMessageType(TYPE_REQUEST);
676     ret = TimeSync::DeSerialization(bufferA.data(), lenA, &outMsgA);
677     ASSERT_EQ(ret, E_OK);
678     const TimeSyncPacket *outPktA = outMsgA.GetObject<TimeSyncPacket>();
679     ASSERT_NE(outPktA, nullptr);
680     EXPECT_EQ(IsTimeSyncPacketEqual(packetA, *outPktA), true);
681 
682     /**
683      * @tc.steps: step5. DeSerialization bufferA to outPktA
684      * @tc.expected: step5. packetB == outPktB  outPktB != outPktA
685      */
686     Message outMsgB;
687     outMsgB.SetMessageId(TIME_SYNC_MESSAGE);
688     outMsgB.SetMessageType(TYPE_RESPONSE);
689     ret = TimeSync::DeSerialization(bufferB.data(), lenB, &outMsgB);
690     ASSERT_EQ(ret, E_OK);
691     const TimeSyncPacket *outPktB = outMsgB.GetObject<TimeSyncPacket>();
692     ASSERT_NE(outPktB, nullptr);
693     EXPECT_EQ(IsTimeSyncPacketEqual(packetB, *outPktB), true);
694     EXPECT_EQ(IsTimeSyncPacketEqual(*outPktA, *outPktB), false);
695 }
696 
MakeMultiVerCommitA()697 static MultiVerCommitNode MakeMultiVerCommitA()
698 {
699     MultiVerCommitNode outCommit;
700     outCommit.commitId = vector<uint8_t>(1, 11); // 1 is length, 11 is value
701     outCommit.leftParent = vector<uint8_t>(2, 22); // 2 is length, 22 is value
702     outCommit.rightParent = vector<uint8_t>(3, 33); // 3 is length, 33 is value
703     outCommit.timestamp = 444; // 444 is value
704     outCommit.version = 5555; // 5555 is value
705     outCommit.isLocal = 66666; // 66666 is value
706     outCommit.deviceInfo = "AAAAAA";
707     return outCommit;
708 }
709 
MakeMultiVerCommitB()710 static MultiVerCommitNode MakeMultiVerCommitB()
711 {
712     MultiVerCommitNode outCommit;
713     outCommit.commitId = vector<uint8_t>(9, 99); // 9 is length, 99 is value
714     outCommit.leftParent = vector<uint8_t>(8, 88); // 8 is length, 88 is value
715     outCommit.rightParent = vector<uint8_t>(7, 77); // 7 is length, 77 is value
716     outCommit.timestamp = 666; // 666 is value
717     outCommit.version = 5555; // 5555 is value
718     outCommit.isLocal = 44444; // 44444 is value
719     outCommit.deviceInfo = "BBBBBB";
720     return outCommit;
721 }
722 
MakeMultiVerCommitC()723 static MultiVerCommitNode MakeMultiVerCommitC()
724 {
725     MultiVerCommitNode outCommit;
726     outCommit.commitId = vector<uint8_t>(1, 99); // 1 is length, 99 is value
727     outCommit.leftParent = vector<uint8_t>(2, 88); // 2 is length, 88 is value
728     outCommit.rightParent = vector<uint8_t>(3, 77); // 3 is length, 77 is value
729     outCommit.timestamp = 466; // 466 is value
730     outCommit.version = 5555; // 5555 is value
731     outCommit.isLocal = 66444; // 66444 is value
732     outCommit.deviceInfo = "CCCCCC";
733     return outCommit;
734 }
735 
IsMultiVerCommitEqual(const MultiVerCommitNode & inCommitA,const MultiVerCommitNode & inCommitB)736 static bool IsMultiVerCommitEqual(const MultiVerCommitNode &inCommitA, const MultiVerCommitNode &inCommitB)
737 {
738     bool equal = true;
739     equal = inCommitA.commitId == inCommitB.commitId ? equal : false;
740     equal = inCommitA.leftParent == inCommitB.leftParent ? equal : false;
741     equal = inCommitA.rightParent == inCommitB.rightParent ? equal : false;
742     equal = inCommitA.timestamp == inCommitB.timestamp ? equal : false;
743     equal = inCommitA.version == inCommitB.version ? equal : false;
744     equal = inCommitA.isLocal == inCommitB.isLocal ? equal : false;
745     equal = inCommitA.deviceInfo == inCommitB.deviceInfo ? equal : false;
746     return equal;
747 }
748 
MakeCommitHistorySyncRequestPacketA(CommitHistorySyncRequestPacket & inPacket)749 static void MakeCommitHistorySyncRequestPacketA(CommitHistorySyncRequestPacket &inPacket)
750 {
751     std::map<std::string, MultiVerCommitNode> commitMap;
752     commitMap[string("A")] = MakeMultiVerCommitA();
753     commitMap[string("C")] = MakeMultiVerCommitC();
754     inPacket.SetCommitMap(commitMap);
755 }
756 
MakeCommitHistorySyncRequestPacketB(CommitHistorySyncRequestPacket & inPacket)757 static void MakeCommitHistorySyncRequestPacketB(CommitHistorySyncRequestPacket &inPacket)
758 {
759     std::map<std::string, MultiVerCommitNode> commitMap;
760     commitMap[string("B")] = MakeMultiVerCommitB();
761     commitMap[string("C")] = MakeMultiVerCommitC();
762     commitMap[string("BB")] = MakeMultiVerCommitB();
763     inPacket.SetCommitMap(commitMap);
764 }
765 
IsCommitHistorySyncRequestPacketEqual(const CommitHistorySyncRequestPacket & inPacketA,const CommitHistorySyncRequestPacket & inPacketB)766 static bool IsCommitHistorySyncRequestPacketEqual(const CommitHistorySyncRequestPacket &inPacketA,
767     const CommitHistorySyncRequestPacket &inPacketB)
768 {
769     std::map<std::string, MultiVerCommitNode> commitMapA;
770     std::map<std::string, MultiVerCommitNode> commitMapB;
771     inPacketA.GetCommitMap(commitMapA);
772     inPacketB.GetCommitMap(commitMapB);
773     for (const auto &entry : commitMapA) {
774         if (commitMapB.count(entry.first) == 0) {
775             return false;
776         }
777         if (!IsMultiVerCommitEqual(entry.second, commitMapB[entry.first])) {
778             return false;
779         }
780     }
781     for (const auto &entry : commitMapB) {
782         if (commitMapA.count(entry.first) == 0) {
783             return false;
784         }
785         if (!IsMultiVerCommitEqual(entry.second, commitMapA[entry.first])) {
786             return false;
787         }
788     }
789     return true;
790 }
791 
792 /**
793  * @tc.name: Commit History Sync Request Packet 001
794  * @tc.desc: Verify CommitHistorySyncRequestPacket Serialization and DeSerialization
795  * @tc.type: FUNC
796  * @tc.require: AR000BVRNU AR000CQE0J
797  * @tc.author: xiaozhenjian
798  */
799 HWTEST_F(DistributedDBMultiVerP2PSyncTest, CommitHistorySyncRequestPacket001, TestSize.Level1)
800 {
801     /**
802      * @tc.steps: step1. create CommitHistorySyncRequestPacket packetA aand packetB
803      */
804     CommitHistorySyncRequestPacket packetA;
805     CommitHistorySyncRequestPacket packetB;
806     MakeCommitHistorySyncRequestPacketA(packetA);
807     MakeCommitHistorySyncRequestPacketB(packetB);
808     Message oriMsgA;
809     Message oriMsgB;
810     oriMsgA.SetCopiedObject(packetA);
811     oriMsgA.SetMessageId(COMMIT_HISTORY_SYNC_MESSAGE);
812     oriMsgA.SetMessageType(TYPE_REQUEST);
813     oriMsgB.SetCopiedObject(packetB);
814     oriMsgB.SetMessageId(COMMIT_HISTORY_SYNC_MESSAGE);
815     oriMsgB.SetMessageType(TYPE_REQUEST);
816 
817     /**
818      * @tc.steps: step2. Serialization packetA to bufferA
819      */
820     uint32_t lenA = CommitHistorySync::CalculateLen(&oriMsgA);
821     vector<uint8_t> bufferA;
822     bufferA.resize(lenA);
823     int ret = CommitHistorySync::Serialization(bufferA.data(), lenA, &oriMsgA);
824     ASSERT_EQ(ret, E_OK);
825 
826     /**
827      * @tc.steps: step3. Serialization packetB to bufferB
828      */
829     uint32_t lenB = CommitHistorySync::CalculateLen(&oriMsgB);
830     vector<uint8_t> bufferB;
831     bufferB.resize(lenB);
832     ret = CommitHistorySync::Serialization(bufferB.data(), lenB, &oriMsgB);
833     ASSERT_EQ(ret, E_OK);
834 
835     /**
836      * @tc.steps: step4. DeSerialization bufferA to outPktA
837      * @tc.expected: step4. packetA == outPktA
838      */
839     Message outMsgA;
840     outMsgA.SetMessageId(COMMIT_HISTORY_SYNC_MESSAGE);
841     outMsgA.SetMessageType(TYPE_REQUEST);
842     ret = CommitHistorySync::DeSerialization(bufferA.data(), lenA, &outMsgA);
843     ASSERT_EQ(ret, E_OK);
844     const CommitHistorySyncRequestPacket *outPktA = outMsgA.GetObject<CommitHistorySyncRequestPacket>();
845     ASSERT_NE(outPktA, nullptr);
846     EXPECT_EQ(IsCommitHistorySyncRequestPacketEqual(packetA, *outPktA), true);
847 
848     /**
849      * @tc.steps: step5. DeSerialization bufferB to outPktB
850      * @tc.expected: step5. packetB == outPktB, outPktB != outPktA
851      */
852     Message outMsgB;
853     outMsgB.SetMessageId(COMMIT_HISTORY_SYNC_MESSAGE);
854     outMsgB.SetMessageType(TYPE_REQUEST);
855     ret = CommitHistorySync::DeSerialization(bufferB.data(), lenB, &outMsgB);
856     ASSERT_EQ(ret, E_OK);
857     const CommitHistorySyncRequestPacket *outPktB = outMsgB.GetObject<CommitHistorySyncRequestPacket>();
858     ASSERT_NE(outPktB, nullptr);
859     EXPECT_EQ(IsCommitHistorySyncRequestPacketEqual(packetB, *outPktB), true);
860     EXPECT_EQ(IsCommitHistorySyncRequestPacketEqual(*outPktA, *outPktB), false);
861 }
862 
MakeCommitHistorySyncAckPacketA(CommitHistorySyncAckPacket & inPacket)863 static void MakeCommitHistorySyncAckPacketA(CommitHistorySyncAckPacket &inPacket)
864 {
865     std::vector<MultiVerCommitNode> commitVec;
866     commitVec.push_back(MakeMultiVerCommitA());
867     commitVec.push_back(MakeMultiVerCommitC());
868     inPacket.SetData(commitVec);
869     inPacket.SetErrorCode(10086); // 10086 is errorcode
870 }
871 
MakeCommitHistorySyncAckPacketB(CommitHistorySyncAckPacket & inPacket)872 static void MakeCommitHistorySyncAckPacketB(CommitHistorySyncAckPacket &inPacket)
873 {
874     std::vector<MultiVerCommitNode> commitVec;
875     commitVec.push_back(MakeMultiVerCommitB());
876     commitVec.push_back(MakeMultiVerCommitC());
877     commitVec.push_back(MakeMultiVerCommitB());
878     inPacket.SetData(commitVec);
879     inPacket.SetErrorCode(10010); // 10010 is errorcode
880 }
881 
IsCommitHistorySyncAckPacketEqual(const CommitHistorySyncAckPacket & inPacketA,const CommitHistorySyncAckPacket & inPacketB)882 static bool IsCommitHistorySyncAckPacketEqual(const CommitHistorySyncAckPacket &inPacketA,
883     const CommitHistorySyncAckPacket &inPacketB)
884 {
885     int errCodeA;
886     int errCodeB;
887     std::vector<MultiVerCommitNode> commitVecA;
888     std::vector<MultiVerCommitNode> commitVecB;
889     inPacketA.GetData(commitVecA);
890     inPacketB.GetData(commitVecB);
891     inPacketA.GetErrorCode(errCodeA);
892     inPacketB.GetErrorCode(errCodeB);
893     if (errCodeA != errCodeB) {
894         return false;
895     }
896     if (commitVecA.size() != commitVecB.size()) {
897         return false;
898     }
899     int count = 0;
900     for (const auto &entry : commitVecA) {
901         if (!IsMultiVerCommitEqual(entry, commitVecB[count++])) {
902             return false;
903         }
904     }
905     return true;
906 }
907 
908 /**
909  * @tc.name: Commit History Sync Ack Packet 001
910  * @tc.desc: Verify CommitHistorySyncAckPacket Serialization and DeSerialization
911  * @tc.type: FUNC
912  * @tc.require: AR000BVRNU AR000CQE0J
913  * @tc.author: xiaozhenjian
914  */
915 HWTEST_F(DistributedDBMultiVerP2PSyncTest, CommitHistorySyncAckPacket001, TestSize.Level1)
916 {
917     /**
918      * @tc.steps: step1. create CommitHistorySyncAckPacket packetA aand packetB
919      */
920     CommitHistorySyncAckPacket packetA;
921     CommitHistorySyncAckPacket packetB;
922     MakeCommitHistorySyncAckPacketA(packetA);
923     MakeCommitHistorySyncAckPacketB(packetB);
924     Message oriMsgA;
925     Message oriMsgB;
926     oriMsgA.SetCopiedObject(packetA);
927     oriMsgA.SetMessageId(COMMIT_HISTORY_SYNC_MESSAGE);
928     oriMsgA.SetMessageType(TYPE_RESPONSE);
929     oriMsgB.SetCopiedObject(packetB);
930     oriMsgB.SetMessageId(COMMIT_HISTORY_SYNC_MESSAGE);
931     oriMsgB.SetMessageType(TYPE_RESPONSE);
932 
933     /**
934      * @tc.steps: step2. Serialization packetA to bufferA
935      */
936     uint32_t lenA = CommitHistorySync::CalculateLen(&oriMsgA);
937     vector<uint8_t> bufferA;
938     bufferA.resize(lenA);
939     int ret = CommitHistorySync::Serialization(bufferA.data(), lenA, &oriMsgA);
940     ASSERT_EQ(ret, E_OK);
941 
942     /**
943      * @tc.steps: step3. Serialization packetB to bufferB
944      */
945     uint32_t lenB = CommitHistorySync::CalculateLen(&oriMsgB);
946     vector<uint8_t> bufferB;
947     bufferB.resize(lenB);
948     ret = CommitHistorySync::Serialization(bufferB.data(), lenB, &oriMsgB);
949     ASSERT_EQ(ret, E_OK);
950 
951     /**
952      * @tc.steps: step4. DeSerialization bufferA to outPktA
953      * @tc.expected: step4. packetA == outPktA
954      */
955     Message outMsgA;
956     outMsgA.SetMessageId(COMMIT_HISTORY_SYNC_MESSAGE);
957     outMsgA.SetMessageType(TYPE_RESPONSE);
958     ret = CommitHistorySync::DeSerialization(bufferA.data(), lenA, &outMsgA);
959     ASSERT_EQ(ret, E_OK);
960     const CommitHistorySyncAckPacket *outPktA = outMsgA.GetObject<CommitHistorySyncAckPacket>();
961     ASSERT_NE(outPktA, nullptr);
962     EXPECT_EQ(IsCommitHistorySyncAckPacketEqual(packetA, *outPktA), true);
963 
964     /**
965      * @tc.steps: step5. DeSerialization bufferB to outPktB
966      * @tc.expected: step5. packetB == outPktB, outPktB!= outPktA
967      */
968     Message outMsgB;
969     outMsgB.SetMessageId(COMMIT_HISTORY_SYNC_MESSAGE);
970     outMsgB.SetMessageType(TYPE_RESPONSE);
971     ret = CommitHistorySync::DeSerialization(bufferB.data(), lenB, &outMsgB);
972     ASSERT_EQ(ret, E_OK);
973     const CommitHistorySyncAckPacket *outPktB = outMsgB.GetObject<CommitHistorySyncAckPacket>();
974     ASSERT_NE(outPktB, nullptr);
975     EXPECT_EQ(IsCommitHistorySyncAckPacketEqual(packetB, *outPktB), true);
976     EXPECT_EQ(IsCommitHistorySyncAckPacketEqual(*outPktA, *outPktB), false);
977 }
978 
IsMultiVerRequestPacketEqual(const MultiVerRequestPacket & inPacketA,const MultiVerRequestPacket & inPacketB)979 static bool IsMultiVerRequestPacketEqual(const MultiVerRequestPacket &inPacketA,
980     const MultiVerRequestPacket &inPacketB)
981 {
982     MultiVerCommitNode commitA;
983     MultiVerCommitNode commitB;
984     inPacketA.GetCommit(commitA);
985     inPacketB.GetCommit(commitB);
986     return IsMultiVerCommitEqual(commitA, commitB);
987 }
988 
989 /**
990  * @tc.name: MultiVerValueObject Request Packet 001
991  * @tc.desc: Verify MultiVerRequestPacket Serialization and DeSerialization
992  * @tc.type: FUNC
993  * @tc.require: AR000BVRNU AR000CQE0J
994  * @tc.author: xiaozhenjian
995  */
996 HWTEST_F(DistributedDBMultiVerP2PSyncTest, MultiVerRequestPacket001, TestSize.Level1)
997 {
998     /**
999      * @tc.steps: step1. create CommitHistorySyncAckPacket packetA aand packetB
1000      */
1001     MultiVerRequestPacket packetA;
1002     MultiVerRequestPacket packetB;
1003     MultiVerCommitNode commitA = MakeMultiVerCommitA();
1004     MultiVerCommitNode commitB = MakeMultiVerCommitB();
1005     packetA.SetCommit(commitA);
1006     packetB.SetCommit(commitB);
1007     Message oriMsgA;
1008     Message oriMsgB;
1009     oriMsgA.SetCopiedObject(packetA);
1010     oriMsgA.SetMessageId(MULTI_VER_DATA_SYNC_MESSAGE);
1011     oriMsgA.SetMessageType(TYPE_REQUEST);
1012     oriMsgB.SetCopiedObject(packetB);
1013     oriMsgB.SetMessageId(MULTI_VER_DATA_SYNC_MESSAGE);
1014     oriMsgB.SetMessageType(TYPE_REQUEST);
1015 
1016     /**
1017      * @tc.steps: step2. Serialization packetA to bufferA
1018      */
1019     uint32_t lenA = MultiVerDataSync::CalculateLen(&oriMsgA);
1020     vector<uint8_t> bufferA;
1021     bufferA.resize(lenA);
1022     int ret = MultiVerDataSync::Serialization(bufferA.data(), lenA, &oriMsgA);
1023     ASSERT_EQ(ret, E_OK);
1024 
1025     /**
1026      * @tc.steps: step3. Serialization packetB to bufferB
1027      */
1028     uint32_t lenB = MultiVerDataSync::CalculateLen(&oriMsgB);
1029     vector<uint8_t> bufferB;
1030     bufferB.resize(lenB);
1031     ret = MultiVerDataSync::Serialization(bufferB.data(), lenB, &oriMsgB);
1032     ASSERT_EQ(ret, E_OK);
1033 
1034     /**
1035      * @tc.steps: step4. DeSerialization bufferA to outPktA
1036      * @tc.expected: step4. packetA == outPktA
1037      */
1038     Message outMsgA;
1039     outMsgA.SetMessageId(MULTI_VER_DATA_SYNC_MESSAGE);
1040     outMsgA.SetMessageType(TYPE_REQUEST);
1041     ret = MultiVerDataSync::DeSerialization(bufferA.data(), lenA, &outMsgA);
1042     ASSERT_EQ(ret, E_OK);
1043     const MultiVerRequestPacket *outPktA = outMsgA.GetObject<MultiVerRequestPacket>();
1044     ASSERT_NE(outPktA, nullptr);
1045     EXPECT_EQ(IsMultiVerRequestPacketEqual(packetA, *outPktA), true);
1046 
1047     /**
1048      * @tc.steps: step5. DeSerialization bufferB to outPktB
1049      * @tc.expected: step5. packetB == outPktB, outPktB!= outPktA
1050      */
1051     Message outMsgB;
1052     outMsgB.SetMessageId(MULTI_VER_DATA_SYNC_MESSAGE);
1053     outMsgB.SetMessageType(TYPE_REQUEST);
1054     ret = MultiVerDataSync::DeSerialization(bufferB.data(), lenB, &outMsgB);
1055     ASSERT_EQ(ret, E_OK);
1056     const MultiVerRequestPacket *outPktB = outMsgB.GetObject<MultiVerRequestPacket>();
1057     ASSERT_NE(outPktB, nullptr);
1058     EXPECT_EQ(IsMultiVerRequestPacketEqual(packetB, *outPktB), true);
1059     EXPECT_EQ(IsMultiVerRequestPacketEqual(*outPktA, *outPktB), false);
1060 }
1061 
MakeMultiVerAckPacketA(MultiVerAckPacket & inPacket)1062 static void MakeMultiVerAckPacketA(MultiVerAckPacket &inPacket)
1063 {
1064     std::vector<std::vector<uint8_t>> entryVec;
1065     entryVec.push_back(vector<uint8_t>(111, 11)); // 111 is length, 11 is value
1066     entryVec.push_back(vector<uint8_t>(222, 22)); // 222 is length, 22 is value
1067     inPacket.SetData(entryVec);
1068     inPacket.SetErrorCode(333); // 333 is errorcode
1069 }
1070 
MakeMultiVerAckPacketB(MultiVerAckPacket & inPacket)1071 static void MakeMultiVerAckPacketB(MultiVerAckPacket &inPacket)
1072 {
1073     std::vector<std::vector<uint8_t>> entryVec;
1074     entryVec.push_back(vector<uint8_t>(999, 99)); // 999 is length, 99 is value
1075     entryVec.push_back(vector<uint8_t>(888, 88)); // 888 is length, 88 is value
1076     inPacket.SetData(entryVec);
1077     inPacket.SetErrorCode(777); // 777 is errorcode
1078 }
1079 
IsMultiVerAckPacketEqual(const MultiVerAckPacket & inPacketA,const MultiVerAckPacket & inPacketB)1080 static bool IsMultiVerAckPacketEqual(const MultiVerAckPacket &inPacketA, const MultiVerAckPacket &inPacketB)
1081 {
1082     int errCodeA;
1083     int errCodeB;
1084     std::vector<std::vector<uint8_t>> entryVecA;
1085     std::vector<std::vector<uint8_t>> entryVecB;
1086     inPacketA.GetData(entryVecA);
1087     inPacketB.GetData(entryVecB);
1088     inPacketA.GetErrorCode(errCodeA);
1089     inPacketB.GetErrorCode(errCodeB);
1090     if (errCodeA != errCodeB) {
1091         return false;
1092     }
1093     if (entryVecA != entryVecB) {
1094         return false;
1095     }
1096     return true;
1097 }
1098 
1099 /**
1100  * @tc.name: MultiVerValueObject Ack Packet 001
1101  * @tc.desc: Verify MultiVerAckPacket Serialization and DeSerialization
1102  * @tc.type: FUNC
1103  * @tc.require: AR000BVRNU AR000CQE0J
1104  * @tc.author: xiaozhenjian
1105  */
1106 HWTEST_F(DistributedDBMultiVerP2PSyncTest, MultiVerAckPacket001, TestSize.Level1)
1107 {
1108     /**
1109      * @tc.steps: step1. create MultiVerAckPacket packetA aand packetB
1110      */
1111     MultiVerAckPacket packetA;
1112     MultiVerAckPacket packetB;
1113     MakeMultiVerAckPacketA(packetA);
1114     MakeMultiVerAckPacketB(packetB);
1115     Message oriMsgA;
1116     Message oriMsgB;
1117     oriMsgA.SetCopiedObject(packetA);
1118     oriMsgA.SetMessageId(MULTI_VER_DATA_SYNC_MESSAGE);
1119     oriMsgA.SetMessageType(TYPE_RESPONSE);
1120     oriMsgB.SetCopiedObject(packetB);
1121     oriMsgB.SetMessageId(MULTI_VER_DATA_SYNC_MESSAGE);
1122     oriMsgB.SetMessageType(TYPE_RESPONSE);
1123 
1124     /**
1125      * @tc.steps: step2. Serialization packetA to bufferA
1126      */
1127     uint32_t lenA = MultiVerDataSync::CalculateLen(&oriMsgA);
1128     vector<uint8_t> bufferA;
1129     bufferA.resize(lenA);
1130     int ret = MultiVerDataSync::Serialization(bufferA.data(), lenA, &oriMsgA);
1131     ASSERT_EQ(ret, E_OK);
1132 
1133     /**
1134      * @tc.steps: step3. Serialization packetB to bufferB
1135      */
1136     uint32_t lenB = MultiVerDataSync::CalculateLen(&oriMsgB);
1137     vector<uint8_t> bufferB;
1138     bufferB.resize(lenB);
1139     ret = MultiVerDataSync::Serialization(bufferB.data(), lenB, &oriMsgB);
1140     ASSERT_EQ(ret, E_OK);
1141 
1142     /**
1143      * @tc.steps: step4. DeSerialization bufferA to outPktA
1144      * @tc.expected: step4. packetA == outPktA
1145      */
1146     Message outMsgA;
1147     outMsgA.SetMessageId(MULTI_VER_DATA_SYNC_MESSAGE);
1148     outMsgA.SetMessageType(TYPE_RESPONSE);
1149     ret = MultiVerDataSync::DeSerialization(bufferA.data(), lenA, &outMsgA);
1150     ASSERT_EQ(ret, E_OK);
1151     const MultiVerAckPacket *outPktA = outMsgA.GetObject<MultiVerAckPacket>();
1152     ASSERT_NE(outPktA, nullptr);
1153     EXPECT_EQ(IsMultiVerAckPacketEqual(packetA, *outPktA), true);
1154 
1155     /**
1156      * @tc.steps: step5. DeSerialization bufferB to outPktB
1157      * @tc.expected: step5. packetB == outPktB, outPktB!= outPktA
1158      */
1159     Message outMsgB;
1160     outMsgB.SetMessageId(MULTI_VER_DATA_SYNC_MESSAGE);
1161     outMsgB.SetMessageType(TYPE_RESPONSE);
1162     ret = MultiVerDataSync::DeSerialization(bufferB.data(), lenB, &outMsgB);
1163     ASSERT_EQ(ret, E_OK);
1164     const MultiVerAckPacket *outPktB = outMsgB.GetObject<MultiVerAckPacket>();
1165     ASSERT_NE(outPktB, nullptr);
1166     EXPECT_EQ(IsMultiVerAckPacketEqual(packetB, *outPktB), true);
1167     EXPECT_EQ(IsMultiVerAckPacketEqual(*outPktA, *outPktB), false);
1168 }
1169 
1170 /**
1171  * @tc.name: Simple Data Sync 001
1172  * @tc.desc: Verify normal simple data sync function.
1173  * @tc.type: FUNC
1174  * @tc.require: AR000BVDGR
1175  * @tc.author: xushaohua
1176  */
1177 HWTEST_F(DistributedDBMultiVerP2PSyncTest, SimpleDataSync001, TestSize.Level2)
1178 {
1179     /**
1180      * @tc.steps: step1. open a KvStoreNbDelegate as deviceA
1181      */
1182     g_mgr.GetKvStore(STORE_ID, g_option, g_kvDelegateCallback);
1183     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1184     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME_1));
1185 
1186     /**
1187      * @tc.steps: step2. deviceB put {k1, v1}
1188      */
1189     ASSERT_EQ(g_deviceB->PutData(DistributedDBUnitTest::KEY_1, DistributedDBUnitTest::VALUE_1), E_OK);
1190 
1191     /**
1192      * @tc.steps: step4. deviceB put {k2, v2}
1193      */
1194     ASSERT_EQ(g_deviceC->PutData(DistributedDBUnitTest::KEY_2, DistributedDBUnitTest::VALUE_2), E_OK);
1195 
1196     /**
1197      * @tc.steps: step5. enable communicator and set deviceB,C online
1198      */
1199     g_deviceB->Online();
1200     g_deviceC->Online();
1201 
1202     /**
1203      * @tc.steps: step6. wait for sync
1204      * @tc.expected: step6. deviceA has {k1, v2} {k2, v2}
1205      */
1206     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME_2));
1207     Value value;
1208     EXPECT_EQ(GetData(g_kvDelegatePtr, DistributedDBUnitTest::KEY_1, value), E_OK);
1209     EXPECT_EQ(value, DistributedDBUnitTest::VALUE_1);
1210     EXPECT_EQ(GetData(g_kvDelegatePtr, DistributedDBUnitTest::KEY_2, value), E_OK);
1211     EXPECT_EQ(value, DistributedDBUnitTest::VALUE_2);
1212 }
1213 
1214 /**
1215  * @tc.name: Big Data Sync 001
1216  * @tc.desc: Verify normal big data sync function.
1217  * @tc.type: FUNC
1218  * @tc.require: AR000BVDGR
1219  * @tc.author: xushaohua
1220  */
1221 HWTEST_F(DistributedDBMultiVerP2PSyncTest, BigDataSync001, TestSize.Level2)
1222 {
1223     /**
1224      * @tc.steps: step1. open a KvStoreNbDelegate as deviceA
1225      */
1226     g_mgr.GetKvStore(STORE_ID, g_option, g_kvDelegateCallback);
1227     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1228     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME_1));
1229 
1230     /**
1231      * @tc.steps: step2. deviceB put {k1, v1}, v1 size 1k
1232      */
1233     Value value1;
1234     DistributedDBToolsUnitTest::GetRandomKeyValue(value1, BIG_VALUE_SIZE); // 1k +1
1235     ASSERT_EQ(g_deviceB->PutData(DistributedDBUnitTest::KEY_1, value1), E_OK);
1236 
1237     /**
1238      * @tc.steps: step4. deviceC put {k2, v2}, v2 size 1k
1239      */
1240     Value value2;
1241     DistributedDBToolsUnitTest::GetRandomKeyValue(value2, BIG_VALUE_SIZE); // 1k +1
1242     ASSERT_EQ(g_deviceC->PutData(DistributedDBUnitTest::KEY_2, value2), E_OK);
1243 
1244     /**
1245      * @tc.steps: step5. set deviceB,C online
1246      */
1247     g_deviceB->Online();
1248     g_deviceC->Online();
1249 
1250     /**
1251      * @tc.steps: step5. wait 2s for sync
1252      * @tc.expected: step5. deviceA has {k1, v2} {k2, v2}
1253      */
1254     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME_2));
1255     Value value;
1256     EXPECT_EQ(GetData(g_kvDelegatePtr, DistributedDBUnitTest::KEY_1, value), E_OK);
1257     EXPECT_EQ(value, value1);
1258     EXPECT_EQ(GetData(g_kvDelegatePtr, DistributedDBUnitTest::KEY_2, value), E_OK);
1259     EXPECT_EQ(value, value2);
1260 }
1261 
1262 /**
1263  * @tc.name: Limit Data Sync 001
1264  * @tc.desc: Verify normal limit data sync function.
1265  * @tc.type: FUNC
1266  * @tc.require: AR000BVDGR
1267  * @tc.author: xushaohua
1268  */
1269 HWTEST_F(DistributedDBMultiVerP2PSyncTest, LimitDataSync001, TestSize.Level2)
1270 {
1271     /**
1272      * @tc.steps: step1. open a KvStoreNbDelegate as deviceA
1273      */
1274     g_mgr.GetKvStore(STORE_ID, g_option, g_kvDelegateCallback);
1275     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1276     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME_1));
1277     /**
1278      * @tc.steps: step2. deviceB put {k1, v1}, k1 size 1k, v1 size 4M
1279      */
1280     Key key1;
1281     Value value1;
1282     DistributedDBToolsUnitTest::GetRandomKeyValue(key1, LIMIT_KEY_SIZE);
1283     DistributedDBToolsUnitTest::GetRandomKeyValue(value1, LIMIT_VALUE_SIZE);
1284     ASSERT_EQ(g_deviceB->PutData(key1, value1), E_OK);
1285 
1286     /**
1287      * @tc.steps: step3. deviceC put {k2, v2}, k2 size 1k, v2 size 4M
1288      */
1289     Key key2;
1290     Value value2;
1291     DistributedDBToolsUnitTest::GetRandomKeyValue(key2, LIMIT_KEY_SIZE);
1292     DistributedDBToolsUnitTest::GetRandomKeyValue(value2, LIMIT_VALUE_SIZE);
1293     ASSERT_EQ(g_deviceC->PutData(key2, value2), E_OK);
1294 
1295     /**
1296      * @tc.steps: step4. set deviceB,C online
1297      */
1298     g_deviceB->Online();
1299     g_deviceC->Online();
1300 
1301     /**
1302      * @tc.steps: step5. wait 30 for sync
1303      * @tc.expected: step5. deviceA has {k1, v2} {k2, v2}
1304      */
1305     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_LIMIT_TIME));
1306     Value value;
1307     EXPECT_EQ(GetData(g_kvDelegatePtr, key1, value), E_OK);
1308     EXPECT_EQ(value, value1);
1309     EXPECT_EQ(GetData(g_kvDelegatePtr, key2, value), E_OK);
1310     EXPECT_EQ(value, value2);
1311 }
1312 
1313 /**
1314  * @tc.name: Multi Record 001
1315  * @tc.desc: Verify normal multi record sync function.
1316  * @tc.type: FUNC
1317  * @tc.require: AR000BVDGR
1318  * @tc.author: xushaohua
1319  */
1320 HWTEST_F(DistributedDBMultiVerP2PSyncTest, MultiRecord001, TestSize.Level2)
1321 {
1322     /**
1323      * @tc.steps: step1. open a KvStoreNbDelegate as deviceA
1324      */
1325     g_mgr.GetKvStore(STORE_ID, g_option, g_kvDelegateCallback);
1326     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1327     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME_1));
1328 
1329     /**
1330      * @tc.steps: step2. deviceB put {k1, v1}
1331      */
1332     ASSERT_EQ(g_deviceB->PutData(DistributedDBUnitTest::KEY_1, DistributedDBUnitTest::VALUE_1), E_OK);
1333 
1334     /**
1335      * @tc.steps: step4. deviceB put {k1, v2} v2 > 1K
1336      */
1337     Value value2;
1338     DistributedDBToolsUnitTest::GetRandomKeyValue(value2, BIG_VALUE_SIZE); // 1k +1
1339     ASSERT_EQ(g_deviceB->PutData(DistributedDBUnitTest::KEY_1, value2), E_OK);
1340 
1341     /**
1342      * @tc.steps: step4. deviceB put {k2, v3}
1343      */
1344     ASSERT_EQ(g_deviceB->PutData(DistributedDBUnitTest::KEY_2, DistributedDBUnitTest::VALUE_3), E_OK);
1345 
1346     /**
1347      * @tc.steps: step5. deviceB put {k3, v3} and delete k3
1348      */
1349     ASSERT_TRUE(g_deviceB->StartTransaction() == E_OK);
1350     ASSERT_EQ(g_deviceB->PutData(DistributedDBUnitTest::KEY_3, DistributedDBUnitTest::VALUE_3), E_OK);
1351     ASSERT_EQ(g_deviceB->DeleteData(DistributedDBUnitTest::KEY_3), E_OK);
1352     ASSERT_TRUE(g_deviceB->Commit() == E_OK);
1353 
1354     /**
1355      * @tc.steps: step6. deviceC put {k4, v4}
1356      */
1357     ASSERT_EQ(g_deviceC->PutData(DistributedDBUnitTest::KEY_4, DistributedDBUnitTest::VALUE_4), E_OK);
1358 
1359     /**
1360      * @tc.steps: step7. deviceB put {k4, v5} v2 > 1K
1361      */
1362     Value value5;
1363     DistributedDBToolsUnitTest::GetRandomKeyValue(value5, BIG_VALUE_SIZE); // 1k +1
1364     ASSERT_EQ(g_deviceC->PutData(DistributedDBUnitTest::KEY_4, value5), E_OK);
1365 
1366     /**
1367      * @tc.steps: step8. deviceB put {k5, v6}
1368      */
1369     ASSERT_EQ(g_deviceC->PutData(DistributedDBUnitTest::KEY_5, DistributedDBUnitTest::VALUE_6), E_OK);
1370 
1371     /**
1372      * @tc.steps: step9. deviceB put {k6, v6} and delete k6
1373      */
1374     ASSERT_TRUE(g_deviceC->StartTransaction() == E_OK);
1375     ASSERT_EQ(g_deviceC->PutData(DistributedDBUnitTest::KEY_6, DistributedDBUnitTest::VALUE_6), E_OK);
1376     ASSERT_EQ(g_deviceC->DeleteData(DistributedDBUnitTest::KEY_6), E_OK);
1377     ASSERT_TRUE(g_deviceC->Commit() == E_OK);
1378 
1379     /**
1380      * @tc.steps: step10. set deviceB,C online
1381      */
1382     g_deviceB->Online();
1383     g_deviceC->Online();
1384 
1385     /**
1386      * @tc.steps: step11. wait 5s for sync
1387      * @tc.expected: step11. deviceA has {k1, v2}, {k2, v3}, {k4, v5}, {k5, v6}
1388      */
1389     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_LONG_TIME));
1390     Value value;
1391     EXPECT_EQ(GetData(g_kvDelegatePtr, DistributedDBUnitTest::KEY_1, value), E_OK);
1392     EXPECT_EQ(value, value2);
1393     EXPECT_EQ(GetData(g_kvDelegatePtr, DistributedDBUnitTest::KEY_2, value), E_OK);
1394     EXPECT_EQ(value, DistributedDBUnitTest::VALUE_3);
1395     EXPECT_EQ(GetData(g_kvDelegatePtr, DistributedDBUnitTest::KEY_4, value), E_OK);
1396     EXPECT_EQ(value, value5);
1397     EXPECT_EQ(GetData(g_kvDelegatePtr, DistributedDBUnitTest::KEY_5, value), E_OK);
1398     EXPECT_EQ(value, DistributedDBUnitTest::VALUE_6);
1399 }
1400 
1401 /**
1402  * @tc.name: Net Disconnect Sync 001
1403  * @tc.desc: Test exception sync when net disconnected.
1404  * @tc.type: FUNC
1405  * @tc.require: AR000BVDGR
1406  * @tc.author: xushaohua
1407  */
1408 HWTEST_F(DistributedDBMultiVerP2PSyncTest, NetDisconnectSync001, TestSize.Level3)
1409 {
1410     /**
1411      * @tc.steps: step1. open a KvStoreNbDelegate as deviceA
1412      */
1413     g_mgr.GetKvStore(STORE_ID, g_option, g_kvDelegateCallback);
1414     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1415     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME_1));
1416 
1417     std::vector<std::string> devices;
1418     devices.push_back(g_deviceB->GetDeviceId());
1419     devices.push_back(g_deviceC->GetDeviceId());
1420 
1421     ASSERT_TRUE(g_deviceB->StartTransaction() == E_OK);
1422     /**
1423      * @tc.steps: step2. deviceB put {k1, v1}
1424      */
1425     ASSERT_EQ(g_deviceB->PutData(DistributedDBUnitTest::KEY_1, DistributedDBUnitTest::VALUE_1), E_OK);
1426 
1427     /**
1428      * @tc.steps: step4. deviceB put {k1, v2} v2 > 1K
1429      */
1430     Value value2;
1431     DistributedDBToolsUnitTest::GetRandomKeyValue(value2, 1024 + 1); // 1k +1
1432     ASSERT_EQ(g_deviceB->PutData(DistributedDBUnitTest::KEY_1, value2), E_OK);
1433 
1434     /**
1435      * @tc.steps: step4. deviceB put {k2, v3}
1436      */
1437     ASSERT_EQ(g_deviceB->PutData(DistributedDBUnitTest::KEY_2, DistributedDBUnitTest::VALUE_3), E_OK);
1438 
1439     /**
1440      * @tc.steps: step5. deviceB put {k3, v3} and delete k3
1441      */
1442     ASSERT_EQ(g_deviceB->PutData(DistributedDBUnitTest::KEY_3, DistributedDBUnitTest::VALUE_3), E_OK);
1443     ASSERT_EQ(g_deviceB->DeleteData(DistributedDBUnitTest::KEY_3), E_OK);
1444     ASSERT_TRUE(g_deviceB->Commit() == E_OK);
1445 
1446     /**
1447      * @tc.steps: step6. deviceB online and enable communicator
1448      */
1449     g_deviceB->Online();
1450 
1451     /**
1452      * @tc.steps: step7. disable communicator and wait 5s
1453      * @tc.expected: step7. deviceA has no key1, key2
1454      */
1455     g_communicatorAggregator->Disable();
1456     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_LONG_TIME + WAIT_LONG_TIME));
1457 
1458     Value value;
1459     EXPECT_EQ(GetData(g_kvDelegatePtr, DistributedDBUnitTest::KEY_1, value), NOT_FOUND);
1460     EXPECT_EQ(GetData(g_kvDelegatePtr, DistributedDBUnitTest::KEY_2, value), NOT_FOUND);
1461 
1462     ASSERT_TRUE(g_deviceC->StartTransaction() == E_OK);
1463     /**
1464      * @tc.steps: step8. deviceC put {k4, v4}
1465      */
1466     ASSERT_EQ(g_deviceC->PutData(DistributedDBUnitTest::KEY_4, DistributedDBUnitTest::VALUE_4), E_OK);
1467 
1468     /**
1469      * @tc.steps: step9. deviceB put {k4, v5} v2 > 1K
1470      */
1471     Value value5;
1472     DistributedDBToolsUnitTest::GetRandomKeyValue(value5, BIG_VALUE_SIZE); // 1k +1
1473     ASSERT_EQ(g_deviceC->PutData(DistributedDBUnitTest::KEY_4, value5), E_OK);
1474 
1475     /**
1476      * @tc.steps: step10. deviceB put {k5, v6}
1477      */
1478     ASSERT_TRUE(g_deviceC->PutData(DistributedDBUnitTest::KEY_5, DistributedDBUnitTest::VALUE_6) == E_OK);
1479 
1480     /**
1481      * @tc.steps: step11. deviceB put {k6, v6} and delete k6
1482      */
1483     ASSERT_TRUE(g_deviceC->PutData(DistributedDBUnitTest::KEY_6, DistributedDBUnitTest::VALUE_6) == E_OK);
1484     ASSERT_TRUE(g_deviceC->DeleteData(DistributedDBUnitTest::KEY_6) == E_OK);
1485     ASSERT_TRUE(g_deviceC->Commit() == E_OK);
1486 
1487     /**
1488      * @tc.steps: step12. deviceC online and enable communicator
1489      */
1490     g_communicatorAggregator->Enable();
1491     g_deviceC->Online();
1492 
1493     /**
1494      * @tc.steps: step13. wait 5s for sync
1495      * @tc.expected: step13. deviceA has {k4, v5}, {k5, v6}
1496      */
1497     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_LONG_TIME)); // wait 5s
1498     EXPECT_EQ(GetData(g_kvDelegatePtr, DistributedDBUnitTest::KEY_4, value), E_OK);
1499     EXPECT_EQ(value, value5);
1500     EXPECT_EQ(GetData(g_kvDelegatePtr, DistributedDBUnitTest::KEY_5, value), E_OK);
1501     EXPECT_EQ(value, DistributedDBUnitTest::VALUE_6);
1502 }
1503 
1504 /**
1505   * @tc.name: SyncQueue006
1506   * @tc.desc: multi version not support sync queue
1507   * @tc.type: FUNC
1508   * @tc.require: AR000D4876
1509   * @tc.author: wangchuanqing
1510   */
1511 HWTEST_F(DistributedDBMultiVerP2PSyncTest, SyncQueue006, TestSize.Level3)
1512 {
1513     /**
1514      * @tc.steps:step1. open a KvStoreNbDelegate as deviceA
1515      */
1516     g_mgr.GetKvStore(STORE_ID, g_option, g_kvDelegateCallback);
1517     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1518 
1519     /**
1520      * @tc.steps:step2. Set PragmaCmd to be GET_QUEUED_SYNC_SIZE
1521      * @tc.expected: step2. Expect return NOT_SUPPORT.
1522      */
1523     int param;
1524     PragmaData input = static_cast<PragmaData>(&param);
1525     EXPECT_EQ(g_kvDelegatePtr->Pragma(GET_QUEUED_SYNC_SIZE, input), NOT_SUPPORT);
1526     EXPECT_EQ(g_kvDelegatePtr->Pragma(SET_QUEUED_SYNC_LIMIT, input), NOT_SUPPORT);
1527     EXPECT_EQ(g_kvDelegatePtr->Pragma(GET_QUEUED_SYNC_LIMIT, input), NOT_SUPPORT);
1528 }
1529 
1530 /**
1531  * @tc.name: PermissionCheck001
1532  * @tc.desc: deviceA permission check not pass
1533  * @tc.type: FUNC
1534  * @tc.require: AR000D4876
1535  * @tc.author: xushaohua
1536  */
1537 HWTEST_F(DistributedDBMultiVerP2PSyncTest, PermissionCheck001, TestSize.Level2)
1538 {
1539     /**
1540      * @tc.steps: step1. SetPermissionCheckCallback
1541      * @tc.expected: step1. return OK.
1542      */
1543     auto permissionCheckCallback = [] (const std::string &userId, const std::string &appId, const std::string &storeId,
__anonc16bfaeb0502(const std::string &userId, const std::string &appId, const std::string &storeId, const std::string &deviceId, uint8_t flag) 1544                                         const std::string &deviceId, uint8_t flag) -> bool {
1545                                         if (flag & CHECK_FLAG_RECEIVE) {
1546                                             LOGD("in RunPermissionCheck callback func, check not pass, flag:%d", flag);
1547                                             return false;
1548                                         } else {
1549                                             LOGD("in RunPermissionCheck callback func, check pass, flag:%d", flag);
1550                                             return true;
1551                                         }
1552                                         };
1553     EXPECT_EQ(g_mgr.SetPermissionCheckCallback(permissionCheckCallback), OK);
1554 
1555     /**
1556      * @tc.steps: step2. open a KvStoreNbDelegate as deviceA
1557      */
1558     g_mgr.GetKvStore(STORE_ID, g_option, g_kvDelegateCallback);
1559     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1560     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME_1));
1561 
1562     /**
1563      * @tc.steps: step3. deviceB put {k1, v1}
1564      */
1565     ASSERT_EQ(g_deviceB->PutData(DistributedDBUnitTest::KEY_1, DistributedDBUnitTest::VALUE_1), E_OK);
1566 
1567     /**
1568      * @tc.steps: step4. deviceC put {k2, v2}
1569      */
1570     ASSERT_EQ(g_deviceC->PutData(DistributedDBUnitTest::KEY_2, DistributedDBUnitTest::VALUE_2), E_OK);
1571 
1572     /**
1573      * @tc.steps: step5. enable communicator and set deviceB,C online
1574      */
1575     g_deviceB->Online();
1576     g_deviceC->Online();
1577 
1578     /**
1579      * @tc.steps: step6. wait for sync
1580      * @tc.expected: step6. deviceA do not has {k1, v2} {k2, v2}
1581      */
1582     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME_2));
1583     Value value;
1584     EXPECT_EQ(GetData(g_kvDelegatePtr, DistributedDBUnitTest::KEY_1, value), NOT_FOUND);
1585     EXPECT_EQ(GetData(g_kvDelegatePtr, DistributedDBUnitTest::KEY_2, value), NOT_FOUND);
1586     PermissionCheckCallbackV2 nullCallback;
1587     EXPECT_EQ(g_mgr.SetPermissionCheckCallback(nullCallback), OK);
1588 }
1589 
1590 /**
1591  * @tc.name: PermissionCheck002
1592  * @tc.desc: deviceB deviceC permission check not pass
1593  * @tc.type: FUNC
1594  * @tc.require: AR000D4876
1595  * @tc.author: xushaohua
1596  */
1597 HWTEST_F(DistributedDBMultiVerP2PSyncTest, PermissionCheck002, TestSize.Level2)
1598 {
1599     /**
1600      * @tc.steps: step1. SetPermissionCheckCallback
1601      * @tc.expected: step1. return OK.
1602      */
1603     auto permissionCheckCallback = [] (const std::string &userId, const std::string &appId, const std::string &storeId,
__anonc16bfaeb0602(const std::string &userId, const std::string &appId, const std::string &storeId, const std::string &deviceId, uint8_t flag) 1604                                         const std::string &deviceId, uint8_t flag) -> bool {
1605                                         if (flag & CHECK_FLAG_SEND) {
1606                                             LOGD("in RunPermissionCheck callback func, check not pass, flag:%d", flag);
1607                                             return false;
1608                                         } else {
1609                                             LOGD("in RunPermissionCheck callback func, check pass, flag:%d", flag);
1610                                             return true;
1611                                         }
1612                                         };
1613     EXPECT_EQ(g_mgr.SetPermissionCheckCallback(permissionCheckCallback), OK);
1614 
1615     /**
1616      * @tc.steps: step2. open a KvStoreNbDelegate as deviceA
1617      */
1618     g_mgr.GetKvStore(STORE_ID, g_option, g_kvDelegateCallback);
1619     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1620     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME_1));
1621 
1622     /**
1623      * @tc.steps: step3. deviceB put {k1, v1}
1624      */
1625     ASSERT_EQ(g_deviceB->PutData(DistributedDBUnitTest::KEY_1, DistributedDBUnitTest::VALUE_1), E_OK);
1626 
1627     /**
1628      * @tc.steps: step4. deviceC put {k2, v2}
1629      */
1630     ASSERT_EQ(g_deviceC->PutData(DistributedDBUnitTest::KEY_2, DistributedDBUnitTest::VALUE_2), E_OK);
1631 
1632     /**
1633      * @tc.steps: step5. enable communicator and set deviceB,C online
1634      */
1635     g_deviceB->Online();
1636     g_deviceC->Online();
1637 
1638     /**
1639      * @tc.steps: step6. wait for sync
1640      * @tc.expected: step6. deviceA do not has {k1, v2} {k2, v2}
1641      */
1642     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME_2));
1643     Value value;
1644     EXPECT_EQ(GetData(g_kvDelegatePtr, DistributedDBUnitTest::KEY_1, value), NOT_FOUND);
1645     EXPECT_EQ(GetData(g_kvDelegatePtr, DistributedDBUnitTest::KEY_2, value), NOT_FOUND);
1646     PermissionCheckCallbackV2 nullCallback;
1647     EXPECT_EQ(g_mgr.SetPermissionCheckCallback(nullCallback), OK);
1648 }
1649 #endif
1650 #endif // OMIT_MULTI_VER