1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #ifndef OMIT_MULTI_VER
16 #include <gtest/gtest.h>
17 #include <thread>
18
19 #include "commit_history_sync.h"
20 #include "db_common.h"
21 #include "distributeddb_data_generate_unit_test.h"
22 #include "distributeddb_tools_unit_test.h"
23 #include "ikvdb_connection.h"
24 #include "kv_store_delegate.h"
25 #include "kv_virtual_device.h"
26 #include "kvdb_manager.h"
27 #include "kvdb_pragma.h"
28 #include "log_print.h"
29 #include "meta_data.h"
30 #include "multi_ver_data_sync.h"
31 #include "platform_specific.h"
32 #include "sync_types.h"
33 #include "time_sync.h"
34 #include "virtual_multi_ver_sync_db_interface.h"
35
36 using namespace testing::ext;
37 using namespace DistributedDB;
38 using namespace DistributedDBUnitTest;
39 using namespace std;
40
41 #ifndef LOW_LEVEL_MEM_DEV
42 namespace {
43 string g_testDir;
44 const string STORE_ID = "kv_store_sync_test";
45 const string STORE_ID_A = "kv_store_sync_test_a";
46 const string STORE_ID_B = "kv_store_sync_test_b";
47 const int WAIT_TIME_1 = 1000;
48 const int WAIT_TIME_2 = 2000;
49 const int WAIT_LONG_TIME = 10000;
50 const int WAIT_LIMIT_TIME = 30000;
51 const std::string DEVICE_B = "deviceB";
52 const std::string DEVICE_C = "deviceC";
53 const int LIMIT_KEY_SIZE = 1024;
54 constexpr int BIG_VALUE_SIZE = 1024 + 1; // > 1K
55 constexpr int LIMIT_VALUE_SIZE = 4 * 1024 * 1024; // 4M
56 KvStoreDelegateManager g_mgr("sync_test", "sync_test");
57 KvStoreConfig g_config;
58 KvStoreDelegate::Option g_option;
59
60 // define the g_kvDelegateCallback, used to get some information when open a kv store.
61 DBStatus g_kvDelegateStatus = INVALID_ARGS;
62 KvStoreDelegate *g_kvDelegatePtr = nullptr;
63 MultiVerNaturalStoreConnection *g_connectionA;
64 MultiVerNaturalStoreConnection *g_connectionB;
65 VirtualCommunicatorAggregator* g_communicatorAggregator = nullptr;
66 KvVirtualDevice *g_deviceB = nullptr;
67 KvVirtualDevice *g_deviceC = nullptr;
68
69 // the type of g_kvDelegateCallback is function<void(DBStatus, KvStoreDelegate*)>
70 auto g_kvDelegateCallback = bind(&DistributedDBToolsUnitTest::KvStoreDelegateCallback,
71 placeholders::_1, placeholders::_2, std::ref(g_kvDelegateStatus), std::ref(g_kvDelegatePtr));
72
GetConnection(const std::string & dir,const std::string & storeId,int errCode)73 MultiVerNaturalStoreConnection *GetConnection(const std::string &dir, const std::string &storeId, int errCode)
74 {
75 KvDBProperties prop;
76 prop.SetStringProp(KvDBProperties::USER_ID, "sync_test");
77 prop.SetStringProp(KvDBProperties::APP_ID, "sync_test");
78 prop.SetStringProp(KvDBProperties::STORE_ID, storeId);
79 std::string identifier = DBCommon::TransferHashString("sync_test-sync_test-" + storeId);
80
81 prop.SetStringProp(KvDBProperties::IDENTIFIER_DATA, identifier);
82 std::string identifierDir = DBCommon::TransferStringToHex(identifier);
83 prop.SetStringProp(KvDBProperties::IDENTIFIER_DIR, identifierDir);
84 prop.SetStringProp(KvDBProperties::DATA_DIR, dir);
85 prop.SetIntProp(KvDBProperties::DATABASE_TYPE, KvDBProperties::MULTI_VER_TYPE_SQLITE);
86 prop.SetBoolProp(KvDBProperties::CREATE_IF_NECESSARY, true);
87 errCode = E_OK;
88 auto conn = KvDBManager::GetDatabaseConnection(prop, errCode);
89 if (errCode != E_OK) {
90 LOGE("[DistributeddbMultiVerP2PSyncTes] db create failed path, err %d", errCode);
91 return nullptr;
92 }
93 return static_cast<MultiVerNaturalStoreConnection *>(conn);
94 }
95
GetDataFromConnection(IKvDBConnection * conn,const Key & key,Value & value)96 int GetDataFromConnection(IKvDBConnection *conn, const Key &key, Value &value)
97 {
98 IKvDBSnapshot *snapshot = nullptr;
99 int errCode = conn->GetSnapshot(snapshot);
100 if (errCode != E_OK) {
101 return errCode;
102 }
103 errCode = snapshot->Get(key, value);
104 conn->ReleaseSnapshot(snapshot);
105 return errCode;
106 }
107 }
108
109 class DistributedDBMultiVerP2PSyncTest : public testing::Test {
110 public:
111 static void SetUpTestCase(void);
112 static void TearDownTestCase(void);
113 void SetUp();
114 void TearDown();
115 };
116
SetUpTestCase(void)117 void DistributedDBMultiVerP2PSyncTest::SetUpTestCase(void)
118 {
119 /**
120 * @tc.setup: Init datadir and Virtual Communicator.
121 */
122 DistributedDBToolsUnitTest::TestDirInit(g_testDir);
123 string dir = g_testDir + "/commitstore";
124 g_config.dataDir = dir;
125 DIR* dirTmp = opendir(dir.c_str());
126 if (dirTmp == nullptr) {
127 OS::MakeDBDirectory(dir);
128 } else {
129 closedir(dirTmp);
130 }
131 g_mgr.SetKvStoreConfig(g_config);
132 g_communicatorAggregator = new (std::nothrow) VirtualCommunicatorAggregator();
133 ASSERT_TRUE(g_communicatorAggregator != nullptr);
134 RuntimeContext::GetInstance()->SetCommunicatorAggregator(g_communicatorAggregator);
135 }
136
TearDownTestCase(void)137 void DistributedDBMultiVerP2PSyncTest::TearDownTestCase(void)
138 {
139 /**
140 * @tc.teardown: Release virtual Communicator and clear data dir.
141 */
142 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
143 LOGE("rm test db files error!");
144 }
145
146 RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
147 g_communicatorAggregator = nullptr;
148 }
149
SetUp(void)150 void DistributedDBMultiVerP2PSyncTest::SetUp(void)
151 {
152 DistributedDBToolsUnitTest::PrintTestCaseInfo();
153 /**
154 * @tc.setup: create virtual device B and C
155 */
156 g_communicatorAggregator->Disable();
157 g_deviceB = new (std::nothrow) KvVirtualDevice(DEVICE_B);
158 ASSERT_TRUE(g_deviceB != nullptr);
159 VirtualMultiVerSyncDBInterface *syncInterfaceB = new (std::nothrow) VirtualMultiVerSyncDBInterface;
160 ASSERT_TRUE(syncInterfaceB != nullptr);
161 ASSERT_EQ(syncInterfaceB->Initialize(DEVICE_B), E_OK);
162 ASSERT_EQ(g_deviceB->Initialize(g_communicatorAggregator, syncInterfaceB), E_OK);
163
164 g_deviceC = new (std::nothrow) KvVirtualDevice(DEVICE_C);
165 ASSERT_TRUE(g_deviceC != nullptr);
166 VirtualMultiVerSyncDBInterface *syncInterfaceC = new (std::nothrow) VirtualMultiVerSyncDBInterface;
167 ASSERT_TRUE(syncInterfaceC != nullptr);
168 ASSERT_EQ(syncInterfaceC->Initialize(DEVICE_C), E_OK);
169 ASSERT_EQ(g_deviceC->Initialize(g_communicatorAggregator, syncInterfaceC), E_OK);
170 g_communicatorAggregator->Enable();
171
172 auto permissionCheckCallback = [] (const std::string &userId, const std::string &appId, const std::string &storeId,
173 const std::string &deviceId, uint8_t flag) -> bool {
174 return true;};
175 EXPECT_EQ(g_mgr.SetPermissionCheckCallback(permissionCheckCallback), OK);
176 }
177
TearDown(void)178 void DistributedDBMultiVerP2PSyncTest::TearDown(void)
179 {
180 /**
181 * @tc.teardown: Release device A, B, C, connectionA and connectionB
182 */
183 if (g_kvDelegatePtr != nullptr) {
184 ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
185 g_kvDelegatePtr = nullptr;
186 DBStatus status = g_mgr.DeleteKvStore(STORE_ID);
187 LOGD("delete kv store status %d", status);
188 ASSERT_TRUE(status == OK);
189 }
190 if (g_deviceB != nullptr) {
191 delete g_deviceB;
192 g_deviceB = nullptr;
193 }
194 if (g_deviceC != nullptr) {
195 delete g_deviceC;
196 g_deviceC = nullptr;
197 }
198 if (g_connectionA != nullptr) {
199 g_connectionA->Close();
200 ASSERT_EQ(g_mgr.DeleteKvStore(STORE_ID_A), OK);
201 g_connectionA = nullptr;
202 }
203 if (g_connectionB != nullptr) {
204 g_connectionB->Close();
205 ASSERT_EQ(g_mgr.DeleteKvStore(STORE_ID_B), OK);
206 g_connectionB = nullptr;
207 }
208 PermissionCheckCallbackV2 nullCallback;
209 EXPECT_EQ(g_mgr.SetPermissionCheckCallback(nullCallback), OK);
210 }
211
GetData(KvStoreDelegate * kvStore,const Key & key,Value & value)212 static DBStatus GetData(KvStoreDelegate *kvStore, const Key &key, Value &value)
213 {
214 KvStoreSnapshotDelegate *snapshotTmp = nullptr;
215 DBStatus statusTmp;
216 kvStore->GetKvStoreSnapshot(nullptr,
217 [&statusTmp, &snapshotTmp](DBStatus status, KvStoreSnapshotDelegate *snapshot) {
218 statusTmp = status;
219 snapshotTmp = snapshot;
220 });
221 if (statusTmp != E_OK) {
222 return statusTmp;
223 }
224 snapshotTmp->Get(key, [&statusTmp, &value](DBStatus status, const Value &outValue) {
225 statusTmp = status;
226 value = outValue;
227 });
228 if (statusTmp == OK) {
229 LOGD("[DistributeddbMultiVerP2PSyncTes] GetData key %c, value = %c", key[0], value[0]);
230 }
231 kvStore->ReleaseKvStoreSnapshot(snapshotTmp);
232 return statusTmp;
233 }
234
235 /**
236 * @tc.name: Transaction Sync 001
237 * @tc.desc: Verify put transaction sync function.
238 * @tc.type: FUNC
239 * @tc.require: AR000BVRO4 AR000CQE0K
240 * @tc.author: xushaohua
241 */
242 HWTEST_F(DistributedDBMultiVerP2PSyncTest, TransactionSync001, TestSize.Level2)
243 {
244 /**
245 * @tc.steps: step1. open a KvStoreNbDelegate as deviceA
246 */
247 g_mgr.GetKvStore(STORE_ID, g_option, g_kvDelegateCallback);
248 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
249 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME_1));
250
251 /**
252 * @tc.steps: step2. deviceB put {k1, v1}, {k2,v2} in a transaction
253 */
254 g_deviceB->StartTransaction();
255 ASSERT_EQ(g_deviceB->PutData(DistributedDBUnitTest::KEY_1, DistributedDBUnitTest::VALUE_1), E_OK);
256 ASSERT_EQ(g_deviceB->PutData(DistributedDBUnitTest::KEY_2, DistributedDBUnitTest::VALUE_2), E_OK);
257 g_deviceB->Commit();
258
259 /**
260 * @tc.steps: step3. deviceB online and wait for sync
261 */
262 g_deviceB->Online();
263 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME_2));
264
265 /**
266 * @tc.steps: step4. deviceC put {k3, v3}, {k4,v4} in a transaction
267 */
268 g_deviceC->StartTransaction();
269 ASSERT_EQ(g_deviceC->PutData(DistributedDBUnitTest::KEY_3, DistributedDBUnitTest::VALUE_3), E_OK);
270 ASSERT_EQ(g_deviceC->PutData(DistributedDBUnitTest::KEY_4, DistributedDBUnitTest::VALUE_4), E_OK);
271 g_deviceC->Commit();
272
273 /**
274 * @tc.steps: step5. deviceC online for sync
275 */
276 g_deviceC->Online();
277
278 /**
279 * @tc.steps: step6. deviceC offline
280 */
281 g_deviceC->Offline();
282 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME_2));
283
284 /**
285 * @tc.expected: step6. deviceA have {k1, v1}, {k2, v2}, not have k3, k4
286 */
287 Value value;
288 EXPECT_EQ(GetData(g_kvDelegatePtr, DistributedDBUnitTest::KEY_1, value), E_OK);
289 EXPECT_EQ(value, DistributedDBUnitTest::VALUE_1);
290 EXPECT_EQ(GetData(g_kvDelegatePtr, DistributedDBUnitTest::KEY_2, value), E_OK);
291 EXPECT_EQ(value, DistributedDBUnitTest::VALUE_2);
292
293 EXPECT_EQ(GetData(g_kvDelegatePtr, DistributedDBUnitTest::KEY_3, value), NOT_FOUND);
294 EXPECT_EQ(GetData(g_kvDelegatePtr, DistributedDBUnitTest::KEY_4, value), NOT_FOUND);
295 }
296
297 /**
298 * @tc.name: Transaction Sync 002
299 * @tc.desc: Verify delete transaction sync function.
300 * @tc.type: FUNC
301 * @tc.require: AR000BVRO4 AR000CQE0K
302 * @tc.author: xushaohua
303 */
304 HWTEST_F(DistributedDBMultiVerP2PSyncTest, TransactionSync002, TestSize.Level2)
305 {
306 /**
307 * @tc.steps: step1. open a KvStoreNbDelegate as deviceA
308 */
309 g_mgr.GetKvStore(STORE_ID, g_option, g_kvDelegateCallback);
310 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
311 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME_1));
312
313 /**
314 * @tc.steps: step2. deviceB put {k1, v1}, {k2,v2} in a transaction
315 */
316 g_deviceB->StartTransaction();
317 ASSERT_EQ(g_deviceB->PutData(DistributedDBUnitTest::KEY_1, DistributedDBUnitTest::VALUE_1), E_OK);
318 ASSERT_EQ(g_deviceB->PutData(DistributedDBUnitTest::KEY_2, DistributedDBUnitTest::VALUE_2), E_OK);
319 g_deviceB->Commit();
320
321 /**
322 * @tc.steps: step3. deviceB online and wait for sync
323 */
324 g_deviceB->Online();
325 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME_2));
326
327 /**
328 * @tc.steps: step4. deviceC put {k3, v3}, and delete k3 in a transaction
329 */
330 g_deviceC->StartTransaction();
331 ASSERT_EQ(g_deviceC->PutData(DistributedDBUnitTest::KEY_3, DistributedDBUnitTest::VALUE_3), E_OK);
332 ASSERT_EQ(g_deviceC->DeleteData(DistributedDBUnitTest::KEY_3), E_OK);
333 g_deviceC->Commit();
334
335 /**
336 * @tc.steps: step5. deviceB online for sync
337 */
338 g_deviceC->Online();
339
340 /**
341 * @tc.steps: step6. deviceC offline
342 */
343 g_deviceC->Offline();
344 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME_2));
345
346 /**
347 * @tc.expected: step6. deviceA have {k1, v1}, {k2, v2}, not have k3, k4
348 */
349 Value value;
350 EXPECT_EQ(GetData(g_kvDelegatePtr, DistributedDBUnitTest::KEY_1, value), E_OK);
351 EXPECT_EQ(value, DistributedDBUnitTest::VALUE_1);
352 EXPECT_EQ(GetData(g_kvDelegatePtr, DistributedDBUnitTest::KEY_2, value), E_OK);
353 EXPECT_EQ(value, DistributedDBUnitTest::VALUE_2);
354 EXPECT_EQ(GetData(g_kvDelegatePtr, DistributedDBUnitTest::KEY_3, value), NOT_FOUND);
355 EXPECT_EQ(GetData(g_kvDelegatePtr, DistributedDBUnitTest::KEY_3, value), NOT_FOUND);
356 }
357
358 /**
359 * @tc.name: Transaction Sync 003
360 * @tc.desc: Verify update transaction sync function.
361 * @tc.type: FUNC
362 * @tc.require: AR000BVRO4 AR000CQE0K
363 * @tc.author: xushaohua
364 */
365 HWTEST_F(DistributedDBMultiVerP2PSyncTest, TransactionSync003, TestSize.Level2)
366 {
367 /**
368 * @tc.steps: step1. open a KvStoreNbDelegate as deviceA
369 */
370 g_mgr.GetKvStore(STORE_ID, g_option, g_kvDelegateCallback);
371 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
372 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME_1));
373
374 /**
375 * @tc.steps: step2. deviceB put {k1, v1}, {k2,v2} in a transaction
376 */
377 g_deviceB->StartTransaction();
378 ASSERT_EQ(g_deviceB->PutData(DistributedDBUnitTest::KEY_1, DistributedDBUnitTest::VALUE_1), E_OK);
379 ASSERT_EQ(g_deviceB->PutData(DistributedDBUnitTest::KEY_2, DistributedDBUnitTest::VALUE_2), E_OK);
380 g_deviceB->Commit();
381
382 /**
383 * @tc.steps: step3. deviceB online and wait for sync
384 */
385 g_deviceB->Online();
386 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME_2));
387
388 /**
389 * @tc.steps: step4. deviceC put {k3, v3}, and update {k3, v4} in a transaction
390 */
391 g_deviceC->StartTransaction();
392 ASSERT_EQ(g_deviceC->PutData(DistributedDBUnitTest::KEY_3, DistributedDBUnitTest::VALUE_3), E_OK);
393 ASSERT_EQ(g_deviceC->PutData(DistributedDBUnitTest::KEY_3, DistributedDBUnitTest::VALUE_4), E_OK);
394 g_deviceC->Commit();
395
396 /**
397 * @tc.steps: step5. deviceB online for sync
398 */
399 g_deviceC->Online();
400
401 /**
402 * @tc.steps: step6. deviceC offline
403 */
404 g_deviceC->Offline();
405 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME_2));
406
407 /**
408 * @tc.expected: step6. deviceA have {k1, v1}, {k2, v2}, not have k3, k4
409 */
410 Value value;
411 EXPECT_EQ(GetData(g_kvDelegatePtr, DistributedDBUnitTest::KEY_1, value), E_OK);
412 EXPECT_EQ(value, DistributedDBUnitTest::VALUE_1);
413 EXPECT_EQ(GetData(g_kvDelegatePtr, DistributedDBUnitTest::KEY_2, value), E_OK);
414 EXPECT_EQ(value, DistributedDBUnitTest::VALUE_2);
415 EXPECT_EQ(GetData(g_kvDelegatePtr, DistributedDBUnitTest::KEY_3, value), NOT_FOUND);
416 EXPECT_EQ(GetData(g_kvDelegatePtr, DistributedDBUnitTest::KEY_3, value), NOT_FOUND);
417 }
418
419 /**
420 * @tc.name: Metadata 001
421 * @tc.desc: Verify metadata add and update function
422 * @tc.type: FUNC
423 * @tc.require: AR000CQE0P AR000CQE0S
424 * @tc.author: xushaohua
425 */
426 HWTEST_F(DistributedDBMultiVerP2PSyncTest, Metadata001, TestSize.Level1)
427 {
428 /**
429 * @tc.steps: step1. Create a metadata and use VirtualMultiVerSyncDBInterface to init
430 * @tc.expected: step1. metadata init ok
431 */
432 Metadata metadata;
433 VirtualMultiVerSyncDBInterface *syncInterface = new (std::nothrow) VirtualMultiVerSyncDBInterface;
434 ASSERT_TRUE(syncInterface != nullptr);
435 EXPECT_EQ(syncInterface->Initialize("metadata_test"), E_OK);
436 EXPECT_EQ(metadata.Initialize(syncInterface), E_OK);
437
438 /**
439 * @tc.steps: step2. call SaveTimeOffset to write t1.
440 * @tc.expected: step2. SaveTimeOffset return ok
441 */
442 const TimeOffset timeOffsetA = 1024;
443 EXPECT_EQ(metadata.SaveTimeOffset(DEVICE_B, timeOffsetA), E_OK);
444 TimeOffset timeOffsetB = 0;
445
446 /**
447 * @tc.steps: step3. call GetTimeOffset to read t2.
448 * @tc.expected: step3. t1 == t2
449 */
450 metadata.GetTimeOffset(DEVICE_B, timeOffsetB);
451 EXPECT_EQ(timeOffsetA, timeOffsetB);
452
453 /**
454 * @tc.steps: step4. call SaveTimeOffset to write t3. t3 != t1
455 * @tc.expected: step4. SaveTimeOffset return ok
456 */
457 const TimeOffset timeOffsetC = 2048;
458 EXPECT_EQ(metadata.SaveTimeOffset(DEVICE_B, timeOffsetC), E_OK);
459
460 /**
461 * @tc.steps: step5. call GetTimeOffset to read t2.
462 * @tc.expected: step5. t4 == t3
463 */
464 TimeOffset timeOffsetD = 0;
465 metadata.GetTimeOffset(DEVICE_B, timeOffsetD);
466 EXPECT_EQ(timeOffsetC, timeOffsetD);
467 syncInterface->DeleteDatabase();
468 delete syncInterface;
469 syncInterface = nullptr;
470 }
471
472 /**
473 * @tc.name: Isolation Sync 001
474 * @tc.desc: Verify add sync isolation between different kvstore.
475 * @tc.type: FUNC
476 * @tc.require: AR000BVDGP
477 * @tc.author: xushaohua
478 */
479 HWTEST_F(DistributedDBMultiVerP2PSyncTest, IsolationSync001, TestSize.Level2)
480 {
481 int errCode = 0;
482
483 /**
484 * @tc.steps: step1. Get connectionA, connectionB from different kvstore,
485 * connectionB not in g_communicatorAggregator
486 */
487 g_communicatorAggregator->Disable();
488 g_connectionB = GetConnection(g_config.dataDir, STORE_ID_B, errCode);
489 ASSERT_TRUE(g_connectionB != nullptr);
490 g_communicatorAggregator->Enable();
491 g_connectionA = GetConnection(g_config.dataDir, STORE_ID_A, errCode);
492 ASSERT_TRUE(g_connectionA != nullptr);
493
494 /**
495 * @tc.steps: step2. deviceB put {k1, v1}
496 */
497 std::vector<std::string> devices;
498 devices.push_back(g_deviceB->GetDeviceId());
499 ASSERT_EQ(g_deviceB->PutData(DistributedDBUnitTest::KEY_1, DistributedDBUnitTest::VALUE_1), E_OK);
500
501 /**
502 * @tc.steps: step3. connectionA pull from deviceB
503 * @tc.expected: step3. Pragma OK, connectionA have {k1, v1} , connectionB don't have k1.
504 */
505 PragmaSync pragmaData(devices, SYNC_MODE_PULL_ONLY, nullptr);
506 ASSERT_TRUE(g_connectionA->Pragma(PRAGMA_SYNC_DEVICES, &pragmaData) == E_OK);
507 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME_2));
508 Value value;
509 ASSERT_EQ(GetDataFromConnection(g_connectionA, DistributedDBUnitTest::KEY_1, value), E_OK);
510 EXPECT_EQ(value, DistributedDBUnitTest::VALUE_1);
511 EXPECT_EQ(GetDataFromConnection(g_connectionB, DistributedDBUnitTest::KEY_1, value), -E_NOT_FOUND);
512 }
513
514 /**
515 * @tc.name: Isolation Sync 002
516 * @tc.desc: Verify update sync isolation between different kvstore.
517 * @tc.type: FUNC
518 * @tc.require: AR000BVDGP
519 * @tc.author: xushaohua
520 */
521 HWTEST_F(DistributedDBMultiVerP2PSyncTest, IsolationSync002, TestSize.Level2)
522 {
523 int errCode = 0;
524
525 /**
526 * @tc.steps: step1. Get connectionA, connectionB from different kvstore,
527 * connectionB not in g_communicatorAggregator
528 */
529 g_communicatorAggregator->Disable();
530 g_connectionB = GetConnection(g_config.dataDir, STORE_ID_B, errCode);
531 ASSERT_TRUE(g_connectionB != nullptr);
532 g_communicatorAggregator->Enable();
533 g_connectionA = GetConnection(g_config.dataDir, STORE_ID_A, errCode);
534 ASSERT_TRUE(g_connectionA != nullptr);
535
536 /**
537 * @tc.steps: step2. deviceB put {k1, v1} and update {k1, v2}
538 */
539 std::vector<std::string> devices;
540 devices.push_back(g_deviceB->GetDeviceId());
541 ASSERT_EQ(g_deviceB->PutData(DistributedDBUnitTest::KEY_1, DistributedDBUnitTest::VALUE_1), E_OK);
542 ASSERT_EQ(g_deviceB->PutData(DistributedDBUnitTest::KEY_1, DistributedDBUnitTest::VALUE_2), E_OK);
543 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME_2));
544
545 /**
546 * @tc.steps: step3. connectionA pull from deviceB
547 * @tc.expected: step3. Pragma OK, connectionA have {k1, v2} , connectionB don't have k1.
548 */
549 PragmaSync pragmaData(devices, SYNC_MODE_PULL_ONLY, nullptr);
550 ASSERT_TRUE(g_connectionA->Pragma(PRAGMA_SYNC_DEVICES, &pragmaData) == E_OK);
551 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME_2));
552
553 Value value;
554 EXPECT_EQ(GetDataFromConnection(g_connectionA, DistributedDBUnitTest::KEY_1, value), E_OK);
555 EXPECT_EQ(value, DistributedDBUnitTest::VALUE_2);
556 EXPECT_EQ(GetDataFromConnection(g_connectionB, DistributedDBUnitTest::KEY_1, value), -E_NOT_FOUND);
557 }
558
559 /**
560 * @tc.name: Isolation Sync 003
561 * @tc.desc: Verify delete sync isolation between different kvstore.
562 * @tc.type: FUNC
563 * @tc.require: AR000BVDGP
564 * @tc.author: xushaohua
565 */
566 HWTEST_F(DistributedDBMultiVerP2PSyncTest, IsolationSync003, TestSize.Level2)
567 {
568 int errCode = 0;
569
570 /**
571 * @tc.steps: step1. Get connectionA, connectionB from different kvstore,
572 * connectionB not in g_communicatorAggregator, connectionB put {k1,v1}
573 */
574 g_communicatorAggregator->Disable();
575 g_connectionB = GetConnection(g_config.dataDir, STORE_ID_B, errCode);
576 ASSERT_TRUE(g_connectionB != nullptr);
577 IOption option;
578 ASSERT_EQ(g_connectionB->Put(option, KEY_1, VALUE_1), E_OK);
579 g_communicatorAggregator->Enable();
580 g_connectionA = GetConnection(g_config.dataDir, STORE_ID_A, errCode);
581 ASSERT_TRUE(g_connectionA != nullptr);
582
583 /**
584 * @tc.steps: step2. deviceB put {k1, v1} and delete k1
585 */
586 std::vector<std::string> devices;
587 devices.push_back(g_deviceB->GetDeviceId());
588 ASSERT_EQ(g_deviceB->PutData(DistributedDBUnitTest::KEY_1, DistributedDBUnitTest::VALUE_1), E_OK);
589 ASSERT_EQ(g_deviceB->DeleteData(DistributedDBUnitTest::KEY_1), E_OK);
590 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME_2));
591
592 /**
593 * @tc.steps: step3. connectionA pull from deviceB
594 * @tc.expected: step3. Pragma OK, connectionA don't have k1, connectionB have {k1.v1}
595 */
596 LOGD("[DistributeddbMultiVerP2PSyncTes] start sync");
597 PragmaSync pragmaData(devices, SYNC_MODE_PULL_ONLY, nullptr);
598 ASSERT_TRUE(g_connectionA->Pragma(PRAGMA_SYNC_DEVICES, &pragmaData) == E_OK);
599 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME_2));
600
601 Value value;
602 EXPECT_EQ(GetDataFromConnection(g_connectionA, DistributedDBUnitTest::KEY_1, value), -E_NOT_FOUND);
603 EXPECT_EQ(GetDataFromConnection(g_connectionB, DistributedDBUnitTest::KEY_1, value), E_OK);
604 EXPECT_EQ(value, DistributedDBUnitTest::VALUE_1);
605 }
606
SetTimeSyncPacketField(TimeSyncPacket & inPacket,Timestamp sourceBegin,Timestamp sourceEnd,Timestamp targetBegin,Timestamp targetEnd,SyncId theId)607 static void SetTimeSyncPacketField(TimeSyncPacket &inPacket, Timestamp sourceBegin, Timestamp sourceEnd,
608 Timestamp targetBegin, Timestamp targetEnd, SyncId theId)
609 {
610 inPacket.SetSourceTimeBegin(sourceBegin);
611 inPacket.SetSourceTimeEnd(sourceEnd);
612 inPacket.SetTargetTimeBegin(targetBegin);
613 inPacket.SetTargetTimeEnd(targetEnd);
614 }
615
IsTimeSyncPacketEqual(const TimeSyncPacket & inPacketA,const TimeSyncPacket & inPacketB)616 static bool IsTimeSyncPacketEqual(const TimeSyncPacket &inPacketA, const TimeSyncPacket &inPacketB)
617 {
618 bool equal = true;
619 equal = inPacketA.GetSourceTimeBegin() == inPacketB.GetSourceTimeBegin() ? equal : false;
620 equal = inPacketA.GetSourceTimeEnd() == inPacketB.GetSourceTimeEnd() ? equal : false;
621 equal = inPacketA.GetTargetTimeBegin() == inPacketB.GetTargetTimeBegin() ? equal : false;
622 equal = inPacketA.GetTargetTimeEnd() == inPacketB.GetTargetTimeEnd() ? equal : false;
623 return equal;
624 }
625
626 /**
627 * @tc.name: Timesync Packet 001
628 * @tc.desc: Verify TimesyncPacket Serialization and DeSerialization
629 * @tc.type: FUNC
630 * @tc.require: AR000BVRNU AR000CQE0J
631 * @tc.author: xiaozhenjian
632 */
633 HWTEST_F(DistributedDBMultiVerP2PSyncTest, TimesyncPacket001, TestSize.Level1)
634 {
635 /**
636 * @tc.steps: step1. create TimeSyncPacket packetA aand packetB
637 */
638 TimeSyncPacket packetA;
639 TimeSyncPacket packetB;
640 SetTimeSyncPacketField(packetA, 1, 2, 3, 4, 5); // 1, 2, 3, 4, 5 is five field for time sync packet
641 SetTimeSyncPacketField(packetB, 5, 4, 3, 2, 1); // 1, 2, 3, 4, 5 is five field for time sync packet
642 Message oriMsgA;
643 Message oriMsgB;
644 oriMsgA.SetCopiedObject(packetA);
645 oriMsgA.SetMessageId(TIME_SYNC_MESSAGE);
646 oriMsgA.SetMessageType(TYPE_REQUEST);
647 oriMsgB.SetCopiedObject(packetB);
648 oriMsgB.SetMessageId(TIME_SYNC_MESSAGE);
649 oriMsgB.SetMessageType(TYPE_RESPONSE);
650
651 /**
652 * @tc.steps: step2. Serialization packetA to bufferA
653 */
654 uint32_t lenA = TimeSync::CalculateLen(&oriMsgA);
655 vector<uint8_t> bufferA;
656 bufferA.resize(lenA);
657 int ret = TimeSync::Serialization(bufferA.data(), lenA, &oriMsgA);
658 ASSERT_EQ(ret, E_OK);
659
660 /**
661 * @tc.steps: step3. Serialization packetB to bufferB
662 */
663 uint32_t lenB = TimeSync::CalculateLen(&oriMsgB);
664 vector<uint8_t> bufferB;
665 bufferB.resize(lenB);
666 ret = TimeSync::Serialization(bufferB.data(), lenB, &oriMsgB);
667 ASSERT_EQ(ret, E_OK);
668
669 /**
670 * @tc.steps: step4. DeSerialization bufferA to outPktA
671 * @tc.expected: step4. packetA == outPktA
672 */
673 Message outMsgA;
674 outMsgA.SetMessageId(TIME_SYNC_MESSAGE);
675 outMsgA.SetMessageType(TYPE_REQUEST);
676 ret = TimeSync::DeSerialization(bufferA.data(), lenA, &outMsgA);
677 ASSERT_EQ(ret, E_OK);
678 const TimeSyncPacket *outPktA = outMsgA.GetObject<TimeSyncPacket>();
679 ASSERT_NE(outPktA, nullptr);
680 EXPECT_EQ(IsTimeSyncPacketEqual(packetA, *outPktA), true);
681
682 /**
683 * @tc.steps: step5. DeSerialization bufferA to outPktA
684 * @tc.expected: step5. packetB == outPktB outPktB != outPktA
685 */
686 Message outMsgB;
687 outMsgB.SetMessageId(TIME_SYNC_MESSAGE);
688 outMsgB.SetMessageType(TYPE_RESPONSE);
689 ret = TimeSync::DeSerialization(bufferB.data(), lenB, &outMsgB);
690 ASSERT_EQ(ret, E_OK);
691 const TimeSyncPacket *outPktB = outMsgB.GetObject<TimeSyncPacket>();
692 ASSERT_NE(outPktB, nullptr);
693 EXPECT_EQ(IsTimeSyncPacketEqual(packetB, *outPktB), true);
694 EXPECT_EQ(IsTimeSyncPacketEqual(*outPktA, *outPktB), false);
695 }
696
MakeMultiVerCommitA()697 static MultiVerCommitNode MakeMultiVerCommitA()
698 {
699 MultiVerCommitNode outCommit;
700 outCommit.commitId = vector<uint8_t>(1, 11); // 1 is length, 11 is value
701 outCommit.leftParent = vector<uint8_t>(2, 22); // 2 is length, 22 is value
702 outCommit.rightParent = vector<uint8_t>(3, 33); // 3 is length, 33 is value
703 outCommit.timestamp = 444; // 444 is value
704 outCommit.version = 5555; // 5555 is value
705 outCommit.isLocal = 66666; // 66666 is value
706 outCommit.deviceInfo = "AAAAAA";
707 return outCommit;
708 }
709
MakeMultiVerCommitB()710 static MultiVerCommitNode MakeMultiVerCommitB()
711 {
712 MultiVerCommitNode outCommit;
713 outCommit.commitId = vector<uint8_t>(9, 99); // 9 is length, 99 is value
714 outCommit.leftParent = vector<uint8_t>(8, 88); // 8 is length, 88 is value
715 outCommit.rightParent = vector<uint8_t>(7, 77); // 7 is length, 77 is value
716 outCommit.timestamp = 666; // 666 is value
717 outCommit.version = 5555; // 5555 is value
718 outCommit.isLocal = 44444; // 44444 is value
719 outCommit.deviceInfo = "BBBBBB";
720 return outCommit;
721 }
722
MakeMultiVerCommitC()723 static MultiVerCommitNode MakeMultiVerCommitC()
724 {
725 MultiVerCommitNode outCommit;
726 outCommit.commitId = vector<uint8_t>(1, 99); // 1 is length, 99 is value
727 outCommit.leftParent = vector<uint8_t>(2, 88); // 2 is length, 88 is value
728 outCommit.rightParent = vector<uint8_t>(3, 77); // 3 is length, 77 is value
729 outCommit.timestamp = 466; // 466 is value
730 outCommit.version = 5555; // 5555 is value
731 outCommit.isLocal = 66444; // 66444 is value
732 outCommit.deviceInfo = "CCCCCC";
733 return outCommit;
734 }
735
IsMultiVerCommitEqual(const MultiVerCommitNode & inCommitA,const MultiVerCommitNode & inCommitB)736 static bool IsMultiVerCommitEqual(const MultiVerCommitNode &inCommitA, const MultiVerCommitNode &inCommitB)
737 {
738 bool equal = true;
739 equal = inCommitA.commitId == inCommitB.commitId ? equal : false;
740 equal = inCommitA.leftParent == inCommitB.leftParent ? equal : false;
741 equal = inCommitA.rightParent == inCommitB.rightParent ? equal : false;
742 equal = inCommitA.timestamp == inCommitB.timestamp ? equal : false;
743 equal = inCommitA.version == inCommitB.version ? equal : false;
744 equal = inCommitA.isLocal == inCommitB.isLocal ? equal : false;
745 equal = inCommitA.deviceInfo == inCommitB.deviceInfo ? equal : false;
746 return equal;
747 }
748
MakeCommitHistorySyncRequestPacketA(CommitHistorySyncRequestPacket & inPacket)749 static void MakeCommitHistorySyncRequestPacketA(CommitHistorySyncRequestPacket &inPacket)
750 {
751 std::map<std::string, MultiVerCommitNode> commitMap;
752 commitMap[string("A")] = MakeMultiVerCommitA();
753 commitMap[string("C")] = MakeMultiVerCommitC();
754 inPacket.SetCommitMap(commitMap);
755 }
756
MakeCommitHistorySyncRequestPacketB(CommitHistorySyncRequestPacket & inPacket)757 static void MakeCommitHistorySyncRequestPacketB(CommitHistorySyncRequestPacket &inPacket)
758 {
759 std::map<std::string, MultiVerCommitNode> commitMap;
760 commitMap[string("B")] = MakeMultiVerCommitB();
761 commitMap[string("C")] = MakeMultiVerCommitC();
762 commitMap[string("BB")] = MakeMultiVerCommitB();
763 inPacket.SetCommitMap(commitMap);
764 }
765
IsCommitHistorySyncRequestPacketEqual(const CommitHistorySyncRequestPacket & inPacketA,const CommitHistorySyncRequestPacket & inPacketB)766 static bool IsCommitHistorySyncRequestPacketEqual(const CommitHistorySyncRequestPacket &inPacketA,
767 const CommitHistorySyncRequestPacket &inPacketB)
768 {
769 std::map<std::string, MultiVerCommitNode> commitMapA;
770 std::map<std::string, MultiVerCommitNode> commitMapB;
771 inPacketA.GetCommitMap(commitMapA);
772 inPacketB.GetCommitMap(commitMapB);
773 for (const auto &entry : commitMapA) {
774 if (commitMapB.count(entry.first) == 0) {
775 return false;
776 }
777 if (!IsMultiVerCommitEqual(entry.second, commitMapB[entry.first])) {
778 return false;
779 }
780 }
781 for (const auto &entry : commitMapB) {
782 if (commitMapA.count(entry.first) == 0) {
783 return false;
784 }
785 if (!IsMultiVerCommitEqual(entry.second, commitMapA[entry.first])) {
786 return false;
787 }
788 }
789 return true;
790 }
791
792 /**
793 * @tc.name: Commit History Sync Request Packet 001
794 * @tc.desc: Verify CommitHistorySyncRequestPacket Serialization and DeSerialization
795 * @tc.type: FUNC
796 * @tc.require: AR000BVRNU AR000CQE0J
797 * @tc.author: xiaozhenjian
798 */
799 HWTEST_F(DistributedDBMultiVerP2PSyncTest, CommitHistorySyncRequestPacket001, TestSize.Level1)
800 {
801 /**
802 * @tc.steps: step1. create CommitHistorySyncRequestPacket packetA aand packetB
803 */
804 CommitHistorySyncRequestPacket packetA;
805 CommitHistorySyncRequestPacket packetB;
806 MakeCommitHistorySyncRequestPacketA(packetA);
807 MakeCommitHistorySyncRequestPacketB(packetB);
808 Message oriMsgA;
809 Message oriMsgB;
810 oriMsgA.SetCopiedObject(packetA);
811 oriMsgA.SetMessageId(COMMIT_HISTORY_SYNC_MESSAGE);
812 oriMsgA.SetMessageType(TYPE_REQUEST);
813 oriMsgB.SetCopiedObject(packetB);
814 oriMsgB.SetMessageId(COMMIT_HISTORY_SYNC_MESSAGE);
815 oriMsgB.SetMessageType(TYPE_REQUEST);
816
817 /**
818 * @tc.steps: step2. Serialization packetA to bufferA
819 */
820 uint32_t lenA = CommitHistorySync::CalculateLen(&oriMsgA);
821 vector<uint8_t> bufferA;
822 bufferA.resize(lenA);
823 int ret = CommitHistorySync::Serialization(bufferA.data(), lenA, &oriMsgA);
824 ASSERT_EQ(ret, E_OK);
825
826 /**
827 * @tc.steps: step3. Serialization packetB to bufferB
828 */
829 uint32_t lenB = CommitHistorySync::CalculateLen(&oriMsgB);
830 vector<uint8_t> bufferB;
831 bufferB.resize(lenB);
832 ret = CommitHistorySync::Serialization(bufferB.data(), lenB, &oriMsgB);
833 ASSERT_EQ(ret, E_OK);
834
835 /**
836 * @tc.steps: step4. DeSerialization bufferA to outPktA
837 * @tc.expected: step4. packetA == outPktA
838 */
839 Message outMsgA;
840 outMsgA.SetMessageId(COMMIT_HISTORY_SYNC_MESSAGE);
841 outMsgA.SetMessageType(TYPE_REQUEST);
842 ret = CommitHistorySync::DeSerialization(bufferA.data(), lenA, &outMsgA);
843 ASSERT_EQ(ret, E_OK);
844 const CommitHistorySyncRequestPacket *outPktA = outMsgA.GetObject<CommitHistorySyncRequestPacket>();
845 ASSERT_NE(outPktA, nullptr);
846 EXPECT_EQ(IsCommitHistorySyncRequestPacketEqual(packetA, *outPktA), true);
847
848 /**
849 * @tc.steps: step5. DeSerialization bufferB to outPktB
850 * @tc.expected: step5. packetB == outPktB, outPktB != outPktA
851 */
852 Message outMsgB;
853 outMsgB.SetMessageId(COMMIT_HISTORY_SYNC_MESSAGE);
854 outMsgB.SetMessageType(TYPE_REQUEST);
855 ret = CommitHistorySync::DeSerialization(bufferB.data(), lenB, &outMsgB);
856 ASSERT_EQ(ret, E_OK);
857 const CommitHistorySyncRequestPacket *outPktB = outMsgB.GetObject<CommitHistorySyncRequestPacket>();
858 ASSERT_NE(outPktB, nullptr);
859 EXPECT_EQ(IsCommitHistorySyncRequestPacketEqual(packetB, *outPktB), true);
860 EXPECT_EQ(IsCommitHistorySyncRequestPacketEqual(*outPktA, *outPktB), false);
861 }
862
MakeCommitHistorySyncAckPacketA(CommitHistorySyncAckPacket & inPacket)863 static void MakeCommitHistorySyncAckPacketA(CommitHistorySyncAckPacket &inPacket)
864 {
865 std::vector<MultiVerCommitNode> commitVec;
866 commitVec.push_back(MakeMultiVerCommitA());
867 commitVec.push_back(MakeMultiVerCommitC());
868 inPacket.SetData(commitVec);
869 inPacket.SetErrorCode(10086); // 10086 is errorcode
870 }
871
MakeCommitHistorySyncAckPacketB(CommitHistorySyncAckPacket & inPacket)872 static void MakeCommitHistorySyncAckPacketB(CommitHistorySyncAckPacket &inPacket)
873 {
874 std::vector<MultiVerCommitNode> commitVec;
875 commitVec.push_back(MakeMultiVerCommitB());
876 commitVec.push_back(MakeMultiVerCommitC());
877 commitVec.push_back(MakeMultiVerCommitB());
878 inPacket.SetData(commitVec);
879 inPacket.SetErrorCode(10010); // 10010 is errorcode
880 }
881
IsCommitHistorySyncAckPacketEqual(const CommitHistorySyncAckPacket & inPacketA,const CommitHistorySyncAckPacket & inPacketB)882 static bool IsCommitHistorySyncAckPacketEqual(const CommitHistorySyncAckPacket &inPacketA,
883 const CommitHistorySyncAckPacket &inPacketB)
884 {
885 int errCodeA;
886 int errCodeB;
887 std::vector<MultiVerCommitNode> commitVecA;
888 std::vector<MultiVerCommitNode> commitVecB;
889 inPacketA.GetData(commitVecA);
890 inPacketB.GetData(commitVecB);
891 inPacketA.GetErrorCode(errCodeA);
892 inPacketB.GetErrorCode(errCodeB);
893 if (errCodeA != errCodeB) {
894 return false;
895 }
896 if (commitVecA.size() != commitVecB.size()) {
897 return false;
898 }
899 int count = 0;
900 for (const auto &entry : commitVecA) {
901 if (!IsMultiVerCommitEqual(entry, commitVecB[count++])) {
902 return false;
903 }
904 }
905 return true;
906 }
907
908 /**
909 * @tc.name: Commit History Sync Ack Packet 001
910 * @tc.desc: Verify CommitHistorySyncAckPacket Serialization and DeSerialization
911 * @tc.type: FUNC
912 * @tc.require: AR000BVRNU AR000CQE0J
913 * @tc.author: xiaozhenjian
914 */
915 HWTEST_F(DistributedDBMultiVerP2PSyncTest, CommitHistorySyncAckPacket001, TestSize.Level1)
916 {
917 /**
918 * @tc.steps: step1. create CommitHistorySyncAckPacket packetA aand packetB
919 */
920 CommitHistorySyncAckPacket packetA;
921 CommitHistorySyncAckPacket packetB;
922 MakeCommitHistorySyncAckPacketA(packetA);
923 MakeCommitHistorySyncAckPacketB(packetB);
924 Message oriMsgA;
925 Message oriMsgB;
926 oriMsgA.SetCopiedObject(packetA);
927 oriMsgA.SetMessageId(COMMIT_HISTORY_SYNC_MESSAGE);
928 oriMsgA.SetMessageType(TYPE_RESPONSE);
929 oriMsgB.SetCopiedObject(packetB);
930 oriMsgB.SetMessageId(COMMIT_HISTORY_SYNC_MESSAGE);
931 oriMsgB.SetMessageType(TYPE_RESPONSE);
932
933 /**
934 * @tc.steps: step2. Serialization packetA to bufferA
935 */
936 uint32_t lenA = CommitHistorySync::CalculateLen(&oriMsgA);
937 vector<uint8_t> bufferA;
938 bufferA.resize(lenA);
939 int ret = CommitHistorySync::Serialization(bufferA.data(), lenA, &oriMsgA);
940 ASSERT_EQ(ret, E_OK);
941
942 /**
943 * @tc.steps: step3. Serialization packetB to bufferB
944 */
945 uint32_t lenB = CommitHistorySync::CalculateLen(&oriMsgB);
946 vector<uint8_t> bufferB;
947 bufferB.resize(lenB);
948 ret = CommitHistorySync::Serialization(bufferB.data(), lenB, &oriMsgB);
949 ASSERT_EQ(ret, E_OK);
950
951 /**
952 * @tc.steps: step4. DeSerialization bufferA to outPktA
953 * @tc.expected: step4. packetA == outPktA
954 */
955 Message outMsgA;
956 outMsgA.SetMessageId(COMMIT_HISTORY_SYNC_MESSAGE);
957 outMsgA.SetMessageType(TYPE_RESPONSE);
958 ret = CommitHistorySync::DeSerialization(bufferA.data(), lenA, &outMsgA);
959 ASSERT_EQ(ret, E_OK);
960 const CommitHistorySyncAckPacket *outPktA = outMsgA.GetObject<CommitHistorySyncAckPacket>();
961 ASSERT_NE(outPktA, nullptr);
962 EXPECT_EQ(IsCommitHistorySyncAckPacketEqual(packetA, *outPktA), true);
963
964 /**
965 * @tc.steps: step5. DeSerialization bufferB to outPktB
966 * @tc.expected: step5. packetB == outPktB, outPktB!= outPktA
967 */
968 Message outMsgB;
969 outMsgB.SetMessageId(COMMIT_HISTORY_SYNC_MESSAGE);
970 outMsgB.SetMessageType(TYPE_RESPONSE);
971 ret = CommitHistorySync::DeSerialization(bufferB.data(), lenB, &outMsgB);
972 ASSERT_EQ(ret, E_OK);
973 const CommitHistorySyncAckPacket *outPktB = outMsgB.GetObject<CommitHistorySyncAckPacket>();
974 ASSERT_NE(outPktB, nullptr);
975 EXPECT_EQ(IsCommitHistorySyncAckPacketEqual(packetB, *outPktB), true);
976 EXPECT_EQ(IsCommitHistorySyncAckPacketEqual(*outPktA, *outPktB), false);
977 }
978
IsMultiVerRequestPacketEqual(const MultiVerRequestPacket & inPacketA,const MultiVerRequestPacket & inPacketB)979 static bool IsMultiVerRequestPacketEqual(const MultiVerRequestPacket &inPacketA,
980 const MultiVerRequestPacket &inPacketB)
981 {
982 MultiVerCommitNode commitA;
983 MultiVerCommitNode commitB;
984 inPacketA.GetCommit(commitA);
985 inPacketB.GetCommit(commitB);
986 return IsMultiVerCommitEqual(commitA, commitB);
987 }
988
989 /**
990 * @tc.name: MultiVerValueObject Request Packet 001
991 * @tc.desc: Verify MultiVerRequestPacket Serialization and DeSerialization
992 * @tc.type: FUNC
993 * @tc.require: AR000BVRNU AR000CQE0J
994 * @tc.author: xiaozhenjian
995 */
996 HWTEST_F(DistributedDBMultiVerP2PSyncTest, MultiVerRequestPacket001, TestSize.Level1)
997 {
998 /**
999 * @tc.steps: step1. create CommitHistorySyncAckPacket packetA aand packetB
1000 */
1001 MultiVerRequestPacket packetA;
1002 MultiVerRequestPacket packetB;
1003 MultiVerCommitNode commitA = MakeMultiVerCommitA();
1004 MultiVerCommitNode commitB = MakeMultiVerCommitB();
1005 packetA.SetCommit(commitA);
1006 packetB.SetCommit(commitB);
1007 Message oriMsgA;
1008 Message oriMsgB;
1009 oriMsgA.SetCopiedObject(packetA);
1010 oriMsgA.SetMessageId(MULTI_VER_DATA_SYNC_MESSAGE);
1011 oriMsgA.SetMessageType(TYPE_REQUEST);
1012 oriMsgB.SetCopiedObject(packetB);
1013 oriMsgB.SetMessageId(MULTI_VER_DATA_SYNC_MESSAGE);
1014 oriMsgB.SetMessageType(TYPE_REQUEST);
1015
1016 /**
1017 * @tc.steps: step2. Serialization packetA to bufferA
1018 */
1019 uint32_t lenA = MultiVerDataSync::CalculateLen(&oriMsgA);
1020 vector<uint8_t> bufferA;
1021 bufferA.resize(lenA);
1022 int ret = MultiVerDataSync::Serialization(bufferA.data(), lenA, &oriMsgA);
1023 ASSERT_EQ(ret, E_OK);
1024
1025 /**
1026 * @tc.steps: step3. Serialization packetB to bufferB
1027 */
1028 uint32_t lenB = MultiVerDataSync::CalculateLen(&oriMsgB);
1029 vector<uint8_t> bufferB;
1030 bufferB.resize(lenB);
1031 ret = MultiVerDataSync::Serialization(bufferB.data(), lenB, &oriMsgB);
1032 ASSERT_EQ(ret, E_OK);
1033
1034 /**
1035 * @tc.steps: step4. DeSerialization bufferA to outPktA
1036 * @tc.expected: step4. packetA == outPktA
1037 */
1038 Message outMsgA;
1039 outMsgA.SetMessageId(MULTI_VER_DATA_SYNC_MESSAGE);
1040 outMsgA.SetMessageType(TYPE_REQUEST);
1041 ret = MultiVerDataSync::DeSerialization(bufferA.data(), lenA, &outMsgA);
1042 ASSERT_EQ(ret, E_OK);
1043 const MultiVerRequestPacket *outPktA = outMsgA.GetObject<MultiVerRequestPacket>();
1044 ASSERT_NE(outPktA, nullptr);
1045 EXPECT_EQ(IsMultiVerRequestPacketEqual(packetA, *outPktA), true);
1046
1047 /**
1048 * @tc.steps: step5. DeSerialization bufferB to outPktB
1049 * @tc.expected: step5. packetB == outPktB, outPktB!= outPktA
1050 */
1051 Message outMsgB;
1052 outMsgB.SetMessageId(MULTI_VER_DATA_SYNC_MESSAGE);
1053 outMsgB.SetMessageType(TYPE_REQUEST);
1054 ret = MultiVerDataSync::DeSerialization(bufferB.data(), lenB, &outMsgB);
1055 ASSERT_EQ(ret, E_OK);
1056 const MultiVerRequestPacket *outPktB = outMsgB.GetObject<MultiVerRequestPacket>();
1057 ASSERT_NE(outPktB, nullptr);
1058 EXPECT_EQ(IsMultiVerRequestPacketEqual(packetB, *outPktB), true);
1059 EXPECT_EQ(IsMultiVerRequestPacketEqual(*outPktA, *outPktB), false);
1060 }
1061
MakeMultiVerAckPacketA(MultiVerAckPacket & inPacket)1062 static void MakeMultiVerAckPacketA(MultiVerAckPacket &inPacket)
1063 {
1064 std::vector<std::vector<uint8_t>> entryVec;
1065 entryVec.push_back(vector<uint8_t>(111, 11)); // 111 is length, 11 is value
1066 entryVec.push_back(vector<uint8_t>(222, 22)); // 222 is length, 22 is value
1067 inPacket.SetData(entryVec);
1068 inPacket.SetErrorCode(333); // 333 is errorcode
1069 }
1070
MakeMultiVerAckPacketB(MultiVerAckPacket & inPacket)1071 static void MakeMultiVerAckPacketB(MultiVerAckPacket &inPacket)
1072 {
1073 std::vector<std::vector<uint8_t>> entryVec;
1074 entryVec.push_back(vector<uint8_t>(999, 99)); // 999 is length, 99 is value
1075 entryVec.push_back(vector<uint8_t>(888, 88)); // 888 is length, 88 is value
1076 inPacket.SetData(entryVec);
1077 inPacket.SetErrorCode(777); // 777 is errorcode
1078 }
1079
IsMultiVerAckPacketEqual(const MultiVerAckPacket & inPacketA,const MultiVerAckPacket & inPacketB)1080 static bool IsMultiVerAckPacketEqual(const MultiVerAckPacket &inPacketA, const MultiVerAckPacket &inPacketB)
1081 {
1082 int errCodeA;
1083 int errCodeB;
1084 std::vector<std::vector<uint8_t>> entryVecA;
1085 std::vector<std::vector<uint8_t>> entryVecB;
1086 inPacketA.GetData(entryVecA);
1087 inPacketB.GetData(entryVecB);
1088 inPacketA.GetErrorCode(errCodeA);
1089 inPacketB.GetErrorCode(errCodeB);
1090 if (errCodeA != errCodeB) {
1091 return false;
1092 }
1093 if (entryVecA != entryVecB) {
1094 return false;
1095 }
1096 return true;
1097 }
1098
1099 /**
1100 * @tc.name: MultiVerValueObject Ack Packet 001
1101 * @tc.desc: Verify MultiVerAckPacket Serialization and DeSerialization
1102 * @tc.type: FUNC
1103 * @tc.require: AR000BVRNU AR000CQE0J
1104 * @tc.author: xiaozhenjian
1105 */
1106 HWTEST_F(DistributedDBMultiVerP2PSyncTest, MultiVerAckPacket001, TestSize.Level1)
1107 {
1108 /**
1109 * @tc.steps: step1. create MultiVerAckPacket packetA aand packetB
1110 */
1111 MultiVerAckPacket packetA;
1112 MultiVerAckPacket packetB;
1113 MakeMultiVerAckPacketA(packetA);
1114 MakeMultiVerAckPacketB(packetB);
1115 Message oriMsgA;
1116 Message oriMsgB;
1117 oriMsgA.SetCopiedObject(packetA);
1118 oriMsgA.SetMessageId(MULTI_VER_DATA_SYNC_MESSAGE);
1119 oriMsgA.SetMessageType(TYPE_RESPONSE);
1120 oriMsgB.SetCopiedObject(packetB);
1121 oriMsgB.SetMessageId(MULTI_VER_DATA_SYNC_MESSAGE);
1122 oriMsgB.SetMessageType(TYPE_RESPONSE);
1123
1124 /**
1125 * @tc.steps: step2. Serialization packetA to bufferA
1126 */
1127 uint32_t lenA = MultiVerDataSync::CalculateLen(&oriMsgA);
1128 vector<uint8_t> bufferA;
1129 bufferA.resize(lenA);
1130 int ret = MultiVerDataSync::Serialization(bufferA.data(), lenA, &oriMsgA);
1131 ASSERT_EQ(ret, E_OK);
1132
1133 /**
1134 * @tc.steps: step3. Serialization packetB to bufferB
1135 */
1136 uint32_t lenB = MultiVerDataSync::CalculateLen(&oriMsgB);
1137 vector<uint8_t> bufferB;
1138 bufferB.resize(lenB);
1139 ret = MultiVerDataSync::Serialization(bufferB.data(), lenB, &oriMsgB);
1140 ASSERT_EQ(ret, E_OK);
1141
1142 /**
1143 * @tc.steps: step4. DeSerialization bufferA to outPktA
1144 * @tc.expected: step4. packetA == outPktA
1145 */
1146 Message outMsgA;
1147 outMsgA.SetMessageId(MULTI_VER_DATA_SYNC_MESSAGE);
1148 outMsgA.SetMessageType(TYPE_RESPONSE);
1149 ret = MultiVerDataSync::DeSerialization(bufferA.data(), lenA, &outMsgA);
1150 ASSERT_EQ(ret, E_OK);
1151 const MultiVerAckPacket *outPktA = outMsgA.GetObject<MultiVerAckPacket>();
1152 ASSERT_NE(outPktA, nullptr);
1153 EXPECT_EQ(IsMultiVerAckPacketEqual(packetA, *outPktA), true);
1154
1155 /**
1156 * @tc.steps: step5. DeSerialization bufferB to outPktB
1157 * @tc.expected: step5. packetB == outPktB, outPktB!= outPktA
1158 */
1159 Message outMsgB;
1160 outMsgB.SetMessageId(MULTI_VER_DATA_SYNC_MESSAGE);
1161 outMsgB.SetMessageType(TYPE_RESPONSE);
1162 ret = MultiVerDataSync::DeSerialization(bufferB.data(), lenB, &outMsgB);
1163 ASSERT_EQ(ret, E_OK);
1164 const MultiVerAckPacket *outPktB = outMsgB.GetObject<MultiVerAckPacket>();
1165 ASSERT_NE(outPktB, nullptr);
1166 EXPECT_EQ(IsMultiVerAckPacketEqual(packetB, *outPktB), true);
1167 EXPECT_EQ(IsMultiVerAckPacketEqual(*outPktA, *outPktB), false);
1168 }
1169
1170 /**
1171 * @tc.name: Simple Data Sync 001
1172 * @tc.desc: Verify normal simple data sync function.
1173 * @tc.type: FUNC
1174 * @tc.require: AR000BVDGR
1175 * @tc.author: xushaohua
1176 */
1177 HWTEST_F(DistributedDBMultiVerP2PSyncTest, SimpleDataSync001, TestSize.Level2)
1178 {
1179 /**
1180 * @tc.steps: step1. open a KvStoreNbDelegate as deviceA
1181 */
1182 g_mgr.GetKvStore(STORE_ID, g_option, g_kvDelegateCallback);
1183 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1184 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME_1));
1185
1186 /**
1187 * @tc.steps: step2. deviceB put {k1, v1}
1188 */
1189 ASSERT_EQ(g_deviceB->PutData(DistributedDBUnitTest::KEY_1, DistributedDBUnitTest::VALUE_1), E_OK);
1190
1191 /**
1192 * @tc.steps: step4. deviceB put {k2, v2}
1193 */
1194 ASSERT_EQ(g_deviceC->PutData(DistributedDBUnitTest::KEY_2, DistributedDBUnitTest::VALUE_2), E_OK);
1195
1196 /**
1197 * @tc.steps: step5. enable communicator and set deviceB,C online
1198 */
1199 g_deviceB->Online();
1200 g_deviceC->Online();
1201
1202 /**
1203 * @tc.steps: step6. wait for sync
1204 * @tc.expected: step6. deviceA has {k1, v2} {k2, v2}
1205 */
1206 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME_2));
1207 Value value;
1208 EXPECT_EQ(GetData(g_kvDelegatePtr, DistributedDBUnitTest::KEY_1, value), E_OK);
1209 EXPECT_EQ(value, DistributedDBUnitTest::VALUE_1);
1210 EXPECT_EQ(GetData(g_kvDelegatePtr, DistributedDBUnitTest::KEY_2, value), E_OK);
1211 EXPECT_EQ(value, DistributedDBUnitTest::VALUE_2);
1212 }
1213
1214 /**
1215 * @tc.name: Big Data Sync 001
1216 * @tc.desc: Verify normal big data sync function.
1217 * @tc.type: FUNC
1218 * @tc.require: AR000BVDGR
1219 * @tc.author: xushaohua
1220 */
1221 HWTEST_F(DistributedDBMultiVerP2PSyncTest, BigDataSync001, TestSize.Level2)
1222 {
1223 /**
1224 * @tc.steps: step1. open a KvStoreNbDelegate as deviceA
1225 */
1226 g_mgr.GetKvStore(STORE_ID, g_option, g_kvDelegateCallback);
1227 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1228 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME_1));
1229
1230 /**
1231 * @tc.steps: step2. deviceB put {k1, v1}, v1 size 1k
1232 */
1233 Value value1;
1234 DistributedDBToolsUnitTest::GetRandomKeyValue(value1, BIG_VALUE_SIZE); // 1k +1
1235 ASSERT_EQ(g_deviceB->PutData(DistributedDBUnitTest::KEY_1, value1), E_OK);
1236
1237 /**
1238 * @tc.steps: step4. deviceC put {k2, v2}, v2 size 1k
1239 */
1240 Value value2;
1241 DistributedDBToolsUnitTest::GetRandomKeyValue(value2, BIG_VALUE_SIZE); // 1k +1
1242 ASSERT_EQ(g_deviceC->PutData(DistributedDBUnitTest::KEY_2, value2), E_OK);
1243
1244 /**
1245 * @tc.steps: step5. set deviceB,C online
1246 */
1247 g_deviceB->Online();
1248 g_deviceC->Online();
1249
1250 /**
1251 * @tc.steps: step5. wait 2s for sync
1252 * @tc.expected: step5. deviceA has {k1, v2} {k2, v2}
1253 */
1254 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME_2));
1255 Value value;
1256 EXPECT_EQ(GetData(g_kvDelegatePtr, DistributedDBUnitTest::KEY_1, value), E_OK);
1257 EXPECT_EQ(value, value1);
1258 EXPECT_EQ(GetData(g_kvDelegatePtr, DistributedDBUnitTest::KEY_2, value), E_OK);
1259 EXPECT_EQ(value, value2);
1260 }
1261
1262 /**
1263 * @tc.name: Limit Data Sync 001
1264 * @tc.desc: Verify normal limit data sync function.
1265 * @tc.type: FUNC
1266 * @tc.require: AR000BVDGR
1267 * @tc.author: xushaohua
1268 */
1269 HWTEST_F(DistributedDBMultiVerP2PSyncTest, LimitDataSync001, TestSize.Level2)
1270 {
1271 /**
1272 * @tc.steps: step1. open a KvStoreNbDelegate as deviceA
1273 */
1274 g_mgr.GetKvStore(STORE_ID, g_option, g_kvDelegateCallback);
1275 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1276 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME_1));
1277 /**
1278 * @tc.steps: step2. deviceB put {k1, v1}, k1 size 1k, v1 size 4M
1279 */
1280 Key key1;
1281 Value value1;
1282 DistributedDBToolsUnitTest::GetRandomKeyValue(key1, LIMIT_KEY_SIZE);
1283 DistributedDBToolsUnitTest::GetRandomKeyValue(value1, LIMIT_VALUE_SIZE);
1284 ASSERT_EQ(g_deviceB->PutData(key1, value1), E_OK);
1285
1286 /**
1287 * @tc.steps: step3. deviceC put {k2, v2}, k2 size 1k, v2 size 4M
1288 */
1289 Key key2;
1290 Value value2;
1291 DistributedDBToolsUnitTest::GetRandomKeyValue(key2, LIMIT_KEY_SIZE);
1292 DistributedDBToolsUnitTest::GetRandomKeyValue(value2, LIMIT_VALUE_SIZE);
1293 ASSERT_EQ(g_deviceC->PutData(key2, value2), E_OK);
1294
1295 /**
1296 * @tc.steps: step4. set deviceB,C online
1297 */
1298 g_deviceB->Online();
1299 g_deviceC->Online();
1300
1301 /**
1302 * @tc.steps: step5. wait 30 for sync
1303 * @tc.expected: step5. deviceA has {k1, v2} {k2, v2}
1304 */
1305 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_LIMIT_TIME));
1306 Value value;
1307 EXPECT_EQ(GetData(g_kvDelegatePtr, key1, value), E_OK);
1308 EXPECT_EQ(value, value1);
1309 EXPECT_EQ(GetData(g_kvDelegatePtr, key2, value), E_OK);
1310 EXPECT_EQ(value, value2);
1311 }
1312
1313 /**
1314 * @tc.name: Multi Record 001
1315 * @tc.desc: Verify normal multi record sync function.
1316 * @tc.type: FUNC
1317 * @tc.require: AR000BVDGR
1318 * @tc.author: xushaohua
1319 */
1320 HWTEST_F(DistributedDBMultiVerP2PSyncTest, MultiRecord001, TestSize.Level2)
1321 {
1322 /**
1323 * @tc.steps: step1. open a KvStoreNbDelegate as deviceA
1324 */
1325 g_mgr.GetKvStore(STORE_ID, g_option, g_kvDelegateCallback);
1326 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1327 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME_1));
1328
1329 /**
1330 * @tc.steps: step2. deviceB put {k1, v1}
1331 */
1332 ASSERT_EQ(g_deviceB->PutData(DistributedDBUnitTest::KEY_1, DistributedDBUnitTest::VALUE_1), E_OK);
1333
1334 /**
1335 * @tc.steps: step4. deviceB put {k1, v2} v2 > 1K
1336 */
1337 Value value2;
1338 DistributedDBToolsUnitTest::GetRandomKeyValue(value2, BIG_VALUE_SIZE); // 1k +1
1339 ASSERT_EQ(g_deviceB->PutData(DistributedDBUnitTest::KEY_1, value2), E_OK);
1340
1341 /**
1342 * @tc.steps: step4. deviceB put {k2, v3}
1343 */
1344 ASSERT_EQ(g_deviceB->PutData(DistributedDBUnitTest::KEY_2, DistributedDBUnitTest::VALUE_3), E_OK);
1345
1346 /**
1347 * @tc.steps: step5. deviceB put {k3, v3} and delete k3
1348 */
1349 ASSERT_TRUE(g_deviceB->StartTransaction() == E_OK);
1350 ASSERT_EQ(g_deviceB->PutData(DistributedDBUnitTest::KEY_3, DistributedDBUnitTest::VALUE_3), E_OK);
1351 ASSERT_EQ(g_deviceB->DeleteData(DistributedDBUnitTest::KEY_3), E_OK);
1352 ASSERT_TRUE(g_deviceB->Commit() == E_OK);
1353
1354 /**
1355 * @tc.steps: step6. deviceC put {k4, v4}
1356 */
1357 ASSERT_EQ(g_deviceC->PutData(DistributedDBUnitTest::KEY_4, DistributedDBUnitTest::VALUE_4), E_OK);
1358
1359 /**
1360 * @tc.steps: step7. deviceB put {k4, v5} v2 > 1K
1361 */
1362 Value value5;
1363 DistributedDBToolsUnitTest::GetRandomKeyValue(value5, BIG_VALUE_SIZE); // 1k +1
1364 ASSERT_EQ(g_deviceC->PutData(DistributedDBUnitTest::KEY_4, value5), E_OK);
1365
1366 /**
1367 * @tc.steps: step8. deviceB put {k5, v6}
1368 */
1369 ASSERT_EQ(g_deviceC->PutData(DistributedDBUnitTest::KEY_5, DistributedDBUnitTest::VALUE_6), E_OK);
1370
1371 /**
1372 * @tc.steps: step9. deviceB put {k6, v6} and delete k6
1373 */
1374 ASSERT_TRUE(g_deviceC->StartTransaction() == E_OK);
1375 ASSERT_EQ(g_deviceC->PutData(DistributedDBUnitTest::KEY_6, DistributedDBUnitTest::VALUE_6), E_OK);
1376 ASSERT_EQ(g_deviceC->DeleteData(DistributedDBUnitTest::KEY_6), E_OK);
1377 ASSERT_TRUE(g_deviceC->Commit() == E_OK);
1378
1379 /**
1380 * @tc.steps: step10. set deviceB,C online
1381 */
1382 g_deviceB->Online();
1383 g_deviceC->Online();
1384
1385 /**
1386 * @tc.steps: step11. wait 5s for sync
1387 * @tc.expected: step11. deviceA has {k1, v2}, {k2, v3}, {k4, v5}, {k5, v6}
1388 */
1389 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_LONG_TIME));
1390 Value value;
1391 EXPECT_EQ(GetData(g_kvDelegatePtr, DistributedDBUnitTest::KEY_1, value), E_OK);
1392 EXPECT_EQ(value, value2);
1393 EXPECT_EQ(GetData(g_kvDelegatePtr, DistributedDBUnitTest::KEY_2, value), E_OK);
1394 EXPECT_EQ(value, DistributedDBUnitTest::VALUE_3);
1395 EXPECT_EQ(GetData(g_kvDelegatePtr, DistributedDBUnitTest::KEY_4, value), E_OK);
1396 EXPECT_EQ(value, value5);
1397 EXPECT_EQ(GetData(g_kvDelegatePtr, DistributedDBUnitTest::KEY_5, value), E_OK);
1398 EXPECT_EQ(value, DistributedDBUnitTest::VALUE_6);
1399 }
1400
1401 /**
1402 * @tc.name: Net Disconnect Sync 001
1403 * @tc.desc: Test exception sync when net disconnected.
1404 * @tc.type: FUNC
1405 * @tc.require: AR000BVDGR
1406 * @tc.author: xushaohua
1407 */
1408 HWTEST_F(DistributedDBMultiVerP2PSyncTest, NetDisconnectSync001, TestSize.Level3)
1409 {
1410 /**
1411 * @tc.steps: step1. open a KvStoreNbDelegate as deviceA
1412 */
1413 g_mgr.GetKvStore(STORE_ID, g_option, g_kvDelegateCallback);
1414 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1415 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME_1));
1416
1417 std::vector<std::string> devices;
1418 devices.push_back(g_deviceB->GetDeviceId());
1419 devices.push_back(g_deviceC->GetDeviceId());
1420
1421 ASSERT_TRUE(g_deviceB->StartTransaction() == E_OK);
1422 /**
1423 * @tc.steps: step2. deviceB put {k1, v1}
1424 */
1425 ASSERT_EQ(g_deviceB->PutData(DistributedDBUnitTest::KEY_1, DistributedDBUnitTest::VALUE_1), E_OK);
1426
1427 /**
1428 * @tc.steps: step4. deviceB put {k1, v2} v2 > 1K
1429 */
1430 Value value2;
1431 DistributedDBToolsUnitTest::GetRandomKeyValue(value2, 1024 + 1); // 1k +1
1432 ASSERT_EQ(g_deviceB->PutData(DistributedDBUnitTest::KEY_1, value2), E_OK);
1433
1434 /**
1435 * @tc.steps: step4. deviceB put {k2, v3}
1436 */
1437 ASSERT_EQ(g_deviceB->PutData(DistributedDBUnitTest::KEY_2, DistributedDBUnitTest::VALUE_3), E_OK);
1438
1439 /**
1440 * @tc.steps: step5. deviceB put {k3, v3} and delete k3
1441 */
1442 ASSERT_EQ(g_deviceB->PutData(DistributedDBUnitTest::KEY_3, DistributedDBUnitTest::VALUE_3), E_OK);
1443 ASSERT_EQ(g_deviceB->DeleteData(DistributedDBUnitTest::KEY_3), E_OK);
1444 ASSERT_TRUE(g_deviceB->Commit() == E_OK);
1445
1446 /**
1447 * @tc.steps: step6. deviceB online and enable communicator
1448 */
1449 g_deviceB->Online();
1450
1451 /**
1452 * @tc.steps: step7. disable communicator and wait 5s
1453 * @tc.expected: step7. deviceA has no key1, key2
1454 */
1455 g_communicatorAggregator->Disable();
1456 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_LONG_TIME + WAIT_LONG_TIME));
1457
1458 Value value;
1459 EXPECT_EQ(GetData(g_kvDelegatePtr, DistributedDBUnitTest::KEY_1, value), NOT_FOUND);
1460 EXPECT_EQ(GetData(g_kvDelegatePtr, DistributedDBUnitTest::KEY_2, value), NOT_FOUND);
1461
1462 ASSERT_TRUE(g_deviceC->StartTransaction() == E_OK);
1463 /**
1464 * @tc.steps: step8. deviceC put {k4, v4}
1465 */
1466 ASSERT_EQ(g_deviceC->PutData(DistributedDBUnitTest::KEY_4, DistributedDBUnitTest::VALUE_4), E_OK);
1467
1468 /**
1469 * @tc.steps: step9. deviceB put {k4, v5} v2 > 1K
1470 */
1471 Value value5;
1472 DistributedDBToolsUnitTest::GetRandomKeyValue(value5, BIG_VALUE_SIZE); // 1k +1
1473 ASSERT_EQ(g_deviceC->PutData(DistributedDBUnitTest::KEY_4, value5), E_OK);
1474
1475 /**
1476 * @tc.steps: step10. deviceB put {k5, v6}
1477 */
1478 ASSERT_TRUE(g_deviceC->PutData(DistributedDBUnitTest::KEY_5, DistributedDBUnitTest::VALUE_6) == E_OK);
1479
1480 /**
1481 * @tc.steps: step11. deviceB put {k6, v6} and delete k6
1482 */
1483 ASSERT_TRUE(g_deviceC->PutData(DistributedDBUnitTest::KEY_6, DistributedDBUnitTest::VALUE_6) == E_OK);
1484 ASSERT_TRUE(g_deviceC->DeleteData(DistributedDBUnitTest::KEY_6) == E_OK);
1485 ASSERT_TRUE(g_deviceC->Commit() == E_OK);
1486
1487 /**
1488 * @tc.steps: step12. deviceC online and enable communicator
1489 */
1490 g_communicatorAggregator->Enable();
1491 g_deviceC->Online();
1492
1493 /**
1494 * @tc.steps: step13. wait 5s for sync
1495 * @tc.expected: step13. deviceA has {k4, v5}, {k5, v6}
1496 */
1497 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_LONG_TIME)); // wait 5s
1498 EXPECT_EQ(GetData(g_kvDelegatePtr, DistributedDBUnitTest::KEY_4, value), E_OK);
1499 EXPECT_EQ(value, value5);
1500 EXPECT_EQ(GetData(g_kvDelegatePtr, DistributedDBUnitTest::KEY_5, value), E_OK);
1501 EXPECT_EQ(value, DistributedDBUnitTest::VALUE_6);
1502 }
1503
1504 /**
1505 * @tc.name: SyncQueue006
1506 * @tc.desc: multi version not support sync queue
1507 * @tc.type: FUNC
1508 * @tc.require: AR000D4876
1509 * @tc.author: wangchuanqing
1510 */
1511 HWTEST_F(DistributedDBMultiVerP2PSyncTest, SyncQueue006, TestSize.Level3)
1512 {
1513 /**
1514 * @tc.steps:step1. open a KvStoreNbDelegate as deviceA
1515 */
1516 g_mgr.GetKvStore(STORE_ID, g_option, g_kvDelegateCallback);
1517 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1518
1519 /**
1520 * @tc.steps:step2. Set PragmaCmd to be GET_QUEUED_SYNC_SIZE
1521 * @tc.expected: step2. Expect return NOT_SUPPORT.
1522 */
1523 int param;
1524 PragmaData input = static_cast<PragmaData>(¶m);
1525 EXPECT_EQ(g_kvDelegatePtr->Pragma(GET_QUEUED_SYNC_SIZE, input), NOT_SUPPORT);
1526 EXPECT_EQ(g_kvDelegatePtr->Pragma(SET_QUEUED_SYNC_LIMIT, input), NOT_SUPPORT);
1527 EXPECT_EQ(g_kvDelegatePtr->Pragma(GET_QUEUED_SYNC_LIMIT, input), NOT_SUPPORT);
1528 }
1529
1530 /**
1531 * @tc.name: PermissionCheck001
1532 * @tc.desc: deviceA permission check not pass
1533 * @tc.type: FUNC
1534 * @tc.require: AR000D4876
1535 * @tc.author: xushaohua
1536 */
1537 HWTEST_F(DistributedDBMultiVerP2PSyncTest, PermissionCheck001, TestSize.Level2)
1538 {
1539 /**
1540 * @tc.steps: step1. SetPermissionCheckCallback
1541 * @tc.expected: step1. return OK.
1542 */
1543 auto permissionCheckCallback = [] (const std::string &userId, const std::string &appId, const std::string &storeId,
__anonc16bfaeb0502(const std::string &userId, const std::string &appId, const std::string &storeId, const std::string &deviceId, uint8_t flag) 1544 const std::string &deviceId, uint8_t flag) -> bool {
1545 if (flag & CHECK_FLAG_RECEIVE) {
1546 LOGD("in RunPermissionCheck callback func, check not pass, flag:%d", flag);
1547 return false;
1548 } else {
1549 LOGD("in RunPermissionCheck callback func, check pass, flag:%d", flag);
1550 return true;
1551 }
1552 };
1553 EXPECT_EQ(g_mgr.SetPermissionCheckCallback(permissionCheckCallback), OK);
1554
1555 /**
1556 * @tc.steps: step2. open a KvStoreNbDelegate as deviceA
1557 */
1558 g_mgr.GetKvStore(STORE_ID, g_option, g_kvDelegateCallback);
1559 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1560 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME_1));
1561
1562 /**
1563 * @tc.steps: step3. deviceB put {k1, v1}
1564 */
1565 ASSERT_EQ(g_deviceB->PutData(DistributedDBUnitTest::KEY_1, DistributedDBUnitTest::VALUE_1), E_OK);
1566
1567 /**
1568 * @tc.steps: step4. deviceC put {k2, v2}
1569 */
1570 ASSERT_EQ(g_deviceC->PutData(DistributedDBUnitTest::KEY_2, DistributedDBUnitTest::VALUE_2), E_OK);
1571
1572 /**
1573 * @tc.steps: step5. enable communicator and set deviceB,C online
1574 */
1575 g_deviceB->Online();
1576 g_deviceC->Online();
1577
1578 /**
1579 * @tc.steps: step6. wait for sync
1580 * @tc.expected: step6. deviceA do not has {k1, v2} {k2, v2}
1581 */
1582 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME_2));
1583 Value value;
1584 EXPECT_EQ(GetData(g_kvDelegatePtr, DistributedDBUnitTest::KEY_1, value), NOT_FOUND);
1585 EXPECT_EQ(GetData(g_kvDelegatePtr, DistributedDBUnitTest::KEY_2, value), NOT_FOUND);
1586 PermissionCheckCallbackV2 nullCallback;
1587 EXPECT_EQ(g_mgr.SetPermissionCheckCallback(nullCallback), OK);
1588 }
1589
1590 /**
1591 * @tc.name: PermissionCheck002
1592 * @tc.desc: deviceB deviceC permission check not pass
1593 * @tc.type: FUNC
1594 * @tc.require: AR000D4876
1595 * @tc.author: xushaohua
1596 */
1597 HWTEST_F(DistributedDBMultiVerP2PSyncTest, PermissionCheck002, TestSize.Level2)
1598 {
1599 /**
1600 * @tc.steps: step1. SetPermissionCheckCallback
1601 * @tc.expected: step1. return OK.
1602 */
1603 auto permissionCheckCallback = [] (const std::string &userId, const std::string &appId, const std::string &storeId,
__anonc16bfaeb0602(const std::string &userId, const std::string &appId, const std::string &storeId, const std::string &deviceId, uint8_t flag) 1604 const std::string &deviceId, uint8_t flag) -> bool {
1605 if (flag & CHECK_FLAG_SEND) {
1606 LOGD("in RunPermissionCheck callback func, check not pass, flag:%d", flag);
1607 return false;
1608 } else {
1609 LOGD("in RunPermissionCheck callback func, check pass, flag:%d", flag);
1610 return true;
1611 }
1612 };
1613 EXPECT_EQ(g_mgr.SetPermissionCheckCallback(permissionCheckCallback), OK);
1614
1615 /**
1616 * @tc.steps: step2. open a KvStoreNbDelegate as deviceA
1617 */
1618 g_mgr.GetKvStore(STORE_ID, g_option, g_kvDelegateCallback);
1619 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1620 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME_1));
1621
1622 /**
1623 * @tc.steps: step3. deviceB put {k1, v1}
1624 */
1625 ASSERT_EQ(g_deviceB->PutData(DistributedDBUnitTest::KEY_1, DistributedDBUnitTest::VALUE_1), E_OK);
1626
1627 /**
1628 * @tc.steps: step4. deviceC put {k2, v2}
1629 */
1630 ASSERT_EQ(g_deviceC->PutData(DistributedDBUnitTest::KEY_2, DistributedDBUnitTest::VALUE_2), E_OK);
1631
1632 /**
1633 * @tc.steps: step5. enable communicator and set deviceB,C online
1634 */
1635 g_deviceB->Online();
1636 g_deviceC->Online();
1637
1638 /**
1639 * @tc.steps: step6. wait for sync
1640 * @tc.expected: step6. deviceA do not has {k1, v2} {k2, v2}
1641 */
1642 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME_2));
1643 Value value;
1644 EXPECT_EQ(GetData(g_kvDelegatePtr, DistributedDBUnitTest::KEY_1, value), NOT_FOUND);
1645 EXPECT_EQ(GetData(g_kvDelegatePtr, DistributedDBUnitTest::KEY_2, value), NOT_FOUND);
1646 PermissionCheckCallbackV2 nullCallback;
1647 EXPECT_EQ(g_mgr.SetPermissionCheckCallback(nullCallback), OK);
1648 }
1649 #endif
1650 #endif // OMIT_MULTI_VER