1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
#include <gtest/gtest.h>
#include <memory>
#include <thread>

#include "db_common.h"
#include "db_constant.h"
#include "distributeddb_data_generate_unit_test.h"
#include "distributeddb_tools_unit_test.h"
#include "generic_single_ver_kv_entry.h"
#include "kv_store_nb_delegate.h"
#include "kv_virtual_device.h"
#include "platform_specific.h"
#include "query.h"
#include "query_sync_object.h"
#include "single_ver_data_sync.h"
#include "single_ver_serialize_manager.h"
#include "sync_types.h"
#include "virtual_communicator.h"
#include "virtual_communicator_aggregator.h"
#include "virtual_single_ver_sync_db_Interface.h"
35
36 using namespace testing::ext;
37 using namespace DistributedDB;
38 using namespace DistributedDBUnitTest;
39 using namespace std;
40
namespace {
    // Root directory of the test data: handed to TestDirInit() in SetUpTestCase and used as KvStoreConfig::dataDir.
    string g_testDir;
    // Store ids opened by InitNormalDb() and InitSchemaDb() respectively.
    const string STORE_ID = "kv_store_sync_test";
    const string SCHEMA_STORE_ID = "kv_store_sync_schema_test";
    // Device id given to the virtual remote device created in SetUp().
    const std::string DEVICE_B = "deviceB";

    // Delegate managers used for the local side: one for the plain store, one for the schema store.
    KvStoreDelegateManager g_mgr(APP_ID, USER_ID);
    KvStoreDelegateManager g_schemaMgr(SCHEMA_APP_ID, USER_ID);
    KvStoreConfig g_config;
    DistributedDBToolsUnitTest g_tool;
    // Status and delegate of the stores; references to them are bound into the callbacks below
    // and they are checked right after GetKvStore() in InitNormalDb()/InitSchemaDb().
    DBStatus g_kvDelegateStatus = INVALID_ARGS;
    DBStatus g_schemaKvDelegateStatus = INVALID_ARGS;
    KvStoreNbDelegate* g_kvDelegatePtr = nullptr;
    KvStoreNbDelegate* g_schemaKvDelegatePtr = nullptr;
    // Virtual communicator installed into RuntimeContext in SetUpTestCase.
    VirtualCommunicatorAggregator* g_communicatorAggregator = nullptr;
    // Virtual remote device, created in SetUp() and deleted in TearDown().
    KvVirtualDevice *g_deviceB = nullptr;

    // Callbacks passed to GetKvStore(). The two placeholders are the arguments supplied by the caller;
    // NOTE(review): presumably these are (DBStatus, KvStoreNbDelegate*) and KvStoreNbDelegateCallback
    // stores them into the referenced globals - confirm against DistributedDBToolsUnitTest.
    auto g_kvDelegateCallback = bind(&DistributedDBToolsUnitTest::KvStoreNbDelegateCallback,
        placeholders::_1, placeholders::_2, std::ref(g_kvDelegateStatus), std::ref(g_kvDelegatePtr));
    auto g_schemaKvDelegateCallback = bind(&DistributedDBToolsUnitTest::KvStoreNbDelegateCallback,
        placeholders::_1, placeholders::_2, std::ref(g_schemaKvDelegateStatus), std::ref(g_schemaKvDelegatePtr));
    // Schema (JSON) assigned to Option::schema when the schema store is opened in InitSchemaDb().
    const string SCHEMA_STRING =
        "{\"SCHEMA_VERSION\":\"1.0\","
        "\"SCHEMA_MODE\":\"STRICT\","
        "\"SCHEMA_DEFINE\":{"
        "\"field_name1\":\"BOOL\","
        "\"field_name2\":\"BOOL\","
        "\"field_name3\":\"INTEGER, NOT NULL\","
        "\"field_name4\":\"LONG, DEFAULT 100\","
        "\"field_name5\":\"DOUBLE, NOT NULL, DEFAULT 3.14\","
        "\"field_name6\":\"STRING, NOT NULL, DEFAULT '3.1415'\","
        "\"field_name7\":\"LONG, DEFAULT 100\","
        "\"field_name8\":\"LONG, DEFAULT 100\","
        "\"field_name9\":\"LONG, DEFAULT 100\","
        "\"field_name10\":\"LONG, DEFAULT 100\""
        "},"
        "\"SCHEMA_INDEXES\":[\"$.field_name1\", \"$.field_name2\"]}";

    // JSON value using the field names of SCHEMA_STRING (field_name1 = true, field_name2 = false).
    const std::string SCHEMA_VALUE1 =
        "{\"field_name1\":true,"
        "\"field_name2\":false,"
        "\"field_name3\":10,"
        "\"field_name4\":20,"
        "\"field_name5\":3.14,"
        "\"field_name6\":\"3.1415\","
        "\"field_name7\":100,"
        "\"field_name8\":100,"
        "\"field_name9\":100,"
        "\"field_name10\":100}";

    // Second JSON value with field_name1/field_name2 swapped and different field_name3/field_name4.
    const std::string SCHEMA_VALUE2 =
        "{\"field_name1\":false,"
        "\"field_name2\":true,"
        "\"field_name3\":100,"
        "\"field_name4\":200,"
        "\"field_name5\":3.14,"
        "\"field_name6\":\"3.1415\","
        "\"field_name7\":100,"
        "\"field_name8\":100,"
        "\"field_name9\":100,"
        "\"field_name10\":100}";
}
104
105 class DistributedDBSingleVerP2PQuerySyncTest : public testing::Test {
106 public:
107 static void SetUpTestCase(void);
108 static void TearDownTestCase(void);
109 void SetUp();
110 void TearDown();
111 };
112
SetUpTestCase(void)113 void DistributedDBSingleVerP2PQuerySyncTest::SetUpTestCase(void)
114 {
115 /**
116 * @tc.setup: Init datadir and Virtual Communicator.
117 */
118 DistributedDBToolsUnitTest::TestDirInit(g_testDir);
119 string dir = g_testDir + "/single_ver";
120 DIR* dirTmp = opendir(dir.c_str());
121 if (dirTmp == nullptr) {
122 OS::MakeDBDirectory(dir);
123 } else {
124 closedir(dirTmp);
125 }
126
127 g_communicatorAggregator = new (std::nothrow) VirtualCommunicatorAggregator();
128 ASSERT_TRUE(g_communicatorAggregator != nullptr);
129 RuntimeContext::GetInstance()->SetCommunicatorAggregator(g_communicatorAggregator);
130 }
131
TearDownTestCase(void)132 void DistributedDBSingleVerP2PQuerySyncTest::TearDownTestCase(void)
133 {
134 /**
135 * @tc.teardown: Release virtual Communicator and clear data dir.
136 */
137 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
138 LOGE("rm test db files error!");
139 }
140 RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
141 }
142
SetUp(void)143 void DistributedDBSingleVerP2PQuerySyncTest::SetUp(void)
144 {
145 DistributedDBToolsUnitTest::PrintTestCaseInfo();
146 /**
147 * @tc.setup: create virtual device B and get a KvStoreNbDelegate as deviceA
148 */
149 g_deviceB = new (std::nothrow) KvVirtualDevice(DEVICE_B);
150 ASSERT_TRUE(g_deviceB != nullptr);
151 VirtualSingleVerSyncDBInterface *syncInterfaceB = new (std::nothrow) VirtualSingleVerSyncDBInterface();
152 ASSERT_TRUE(syncInterfaceB != nullptr);
153 ASSERT_EQ(g_deviceB->Initialize(g_communicatorAggregator, syncInterfaceB), E_OK);
154 }
155
TearDown(void)156 void DistributedDBSingleVerP2PQuerySyncTest::TearDown(void)
157 {
158 /**
159 * @tc.teardown: Release device A, B
160 */
161 if (g_kvDelegatePtr != nullptr) {
162 ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
163 g_kvDelegatePtr = nullptr;
164 DBStatus status = g_mgr.DeleteKvStore(STORE_ID);
165 LOGD("delete kv store status %d", status);
166 ASSERT_TRUE(status == OK);
167 }
168 if (g_schemaKvDelegatePtr != nullptr) {
169 ASSERT_EQ(g_schemaMgr.CloseKvStore(g_schemaKvDelegatePtr), OK);
170 g_schemaKvDelegatePtr = nullptr;
171 DBStatus status = g_schemaMgr.DeleteKvStore(SCHEMA_STORE_ID);
172 LOGD("delete kv store status %d", status);
173 ASSERT_TRUE(status == OK);
174 }
175 if (g_deviceB != nullptr) {
176 delete g_deviceB;
177 g_deviceB = nullptr;
178 }
179 PermissionCheckCallbackV2 nullCallback;
180 EXPECT_EQ(g_mgr.SetPermissionCheckCallback(nullCallback), OK);
181 }
182
InitNormalDb()183 void InitNormalDb()
184 {
185 g_config.dataDir = g_testDir;
186 g_mgr.SetKvStoreConfig(g_config);
187 KvStoreNbDelegate::Option option;
188 g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
189 ASSERT_TRUE(g_kvDelegateStatus == OK);
190 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
191 }
192
InitSchemaDb()193 void InitSchemaDb()
194 {
195 g_config.dataDir = g_testDir;
196 g_schemaMgr.SetKvStoreConfig(g_config);
197 KvStoreNbDelegate::Option option;
198 option.schema = SCHEMA_STRING;
199 g_schemaMgr.GetKvStore(SCHEMA_STORE_ID, option, g_schemaKvDelegateCallback);
200 ASSERT_TRUE(g_schemaKvDelegateStatus == OK);
201 ASSERT_TRUE(g_schemaKvDelegatePtr != nullptr);
202 }
203
204 /**
205 * @tc.name: Normal Sync 001
206 * @tc.desc: Test normal push sync for keyprefix data.
207 * @tc.type: FUNC
208 * @tc.require: AR000FN6G9
209 * @tc.author: xushaohua
210 */
211 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, NormalSync001, TestSize.Level1)
212 {
213 InitNormalDb();
214 DBStatus status = OK;
215 std::vector<std::string> devices;
216 devices.push_back(g_deviceB->GetDeviceId());
217
218 /**
219 * @tc.steps: step1. deviceA put {k0, v0} - {k9, v9}
220 */
221 Key key = {'1'};
222 Value value = {'1'};
223 const int dataSize = 10;
224 for (int i = 0; i < dataSize; i++) {
225 key.push_back(i);
226 value.push_back(i);
227 status = g_kvDelegatePtr->Put(key, value);
228 ASSERT_TRUE(status == OK);
229 key.pop_back();
230 value.pop_back();
231 }
232 Key key2 = {'2'};
233 Value value2 = {'2'};
234 status = g_kvDelegatePtr->Put(key2, value2);
235 ASSERT_TRUE(status == OK);
236
237 /**
238 * @tc.steps: step2. deviceA call query sync and wait
239 * @tc.expected: step2. sync should return OK.
240 */
241 Query query = Query::Select().PrefixKey(key);
242 std::map<std::string, DBStatus> result;
243 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query);
244 ASSERT_TRUE(status == OK);
245
246 /**
247 * @tc.expected: step3. onComplete should be called, DeviceB have {k1,v1} - {k9, v9}
248 */
249 ASSERT_TRUE(result.size() == devices.size());
250 for (const auto &pair : result) {
251 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
252 EXPECT_TRUE(pair.second == OK);
253 }
254 VirtualDataItem item;
255 for (int i = 0; i < dataSize; i++) {
256 key.push_back(i);
257 value.push_back(i);
258 g_deviceB->GetData(key, item);
259 EXPECT_TRUE(item.value == value);
260 key.pop_back();
261 value.pop_back();
262 }
263 EXPECT_TRUE(g_deviceB->GetData(key2, item) != E_OK);
264 }
265
266 /**
267 * @tc.name: Normal Sync 002
268 * @tc.desc: Test normal push sync for limit and offset.
269 * @tc.type: FUNC
270 * @tc.require: AR000FN6G9
271 * @tc.author: xushaohua
272 */
273 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, NormalSync002, TestSize.Level1)
274 {
275 InitNormalDb();
276 DBStatus status = OK;
277 std::vector<std::string> devices;
278 devices.push_back(g_deviceB->GetDeviceId());
279
280 /**
281 * @tc.steps: step1. deviceA put {k0, v0} - {k9, v9}
282 */
283 Key key = {'1'};
284 Value value = {'1'};
285 const int dataSize = 10;
286 for (int i = 0; i < dataSize; i++) {
287 key.push_back(i);
288 value.push_back(i);
289 status = g_kvDelegatePtr->Put(key, value);
290 ASSERT_TRUE(status == OK);
291 key.pop_back();
292 value.pop_back();
293 }
294
295 /**
296 * @tc.steps: step2. deviceA call sync and wait
297 * @tc.expected: step2. sync should return OK.
298 */
299 const int limit = 5;
300 const int offset = 4;
301 Query query = Query::Select().PrefixKey(key).Limit(limit, offset);
302 std::map<std::string, DBStatus> result;
303 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query);
304 ASSERT_TRUE(status == OK);
305
306 /**
307 * @tc.expected: step3. onComplete should be called, DeviceB have {k4,v4} {k8, v8}
308 */
309 ASSERT_TRUE(result.size() == devices.size());
310 for (const auto &pair : result) {
311 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
312 EXPECT_TRUE(pair.second == OK);
313 }
314
315 VirtualDataItem item;
316 for (int i = limit - 1; i < limit + offset; i++) {
317 key.push_back(i);
318 value.push_back(i);
319 g_deviceB->GetData(key, item);
320 EXPECT_TRUE(item.value == value);
321 key.pop_back();
322 value.pop_back();
323 }
324 }
325
326 /**
327 * @tc.name: Normal Sync 001
328 * @tc.desc: Test normal push_and_pull sync for keyprefix data.
329 * @tc.type: FUNC
330 * @tc.require: AR000FN6G9
331 * @tc.author: zhuwentao
332 */
333 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, NormalSync003, TestSize.Level1)
334 {
335 InitNormalDb();
336 DBStatus status = OK;
337 std::vector<std::string> devices;
338 devices.push_back(g_deviceB->GetDeviceId());
339
340 /**
341 * @tc.steps: step1. deviceA put {k, v}, {b, v}
342 */
343 Key key = {'1'};
344 Value value = {'1'};
345 const int dataSize = 10;
346 status = g_kvDelegatePtr->Put(key, value);
347 ASSERT_TRUE(status == OK);
348 Key key2 = {'2'};
349 Value value2 = {'2'};
350 status = g_kvDelegatePtr->Put(key2, value2);
351 ASSERT_TRUE(status == OK);
352
353 /**
354 * @tc.steps: step2. deviceB put {b0, v0} - {b9, v9}, {c, v}
355 */
356 for (int i = 0; i < dataSize; i++) {
357 key2.push_back(i);
358 value2.push_back(i);
359 g_deviceB->PutData(key2, value2, 10 + i, 0);
360 key2.pop_back();
361 value2.pop_back();
362 }
363 Key key3 = {'3'};
364 Value value3 = {'3'};
365 g_deviceB->PutData(key3, value3, 20, 0);
366
367 /**
368 * @tc.steps: step2. deviceA call query sync and wait
369 * @tc.expected: step2. sync should return OK.
370 */
371 Query query = Query::Select().PrefixKey(key2);
372 std::map<std::string, DBStatus> result;
373 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result, query);
374 ASSERT_TRUE(status == OK);
375
376 /**
377 * @tc.expected: step3. onComplete should be called, DeviceA have {b0, v0} - {b9, v9}, DeviceB have {b, v}
378 */
379 ASSERT_TRUE(result.size() == devices.size());
380 for (const auto &pair : result) {
381 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
382 EXPECT_TRUE(pair.second == OK);
383 }
384 VirtualDataItem item;
385 Value tmpValue;
386 for (int i = 0; i < dataSize; i++) {
387 key2.push_back(i);
388 value2.push_back(i);
389 g_kvDelegatePtr->Get(key2, tmpValue);
390 EXPECT_TRUE(tmpValue == value2);
391 key2.pop_back();
392 value2.pop_back();
393 }
394 EXPECT_TRUE(g_deviceB->GetData(key, item) != E_OK);
395 EXPECT_TRUE(g_deviceB->GetData(key2, item) == E_OK);
396 g_kvDelegatePtr->Get(key3, tmpValue);
397 EXPECT_TRUE(tmpValue != value3);
398 }
399
400 /**
401 * @tc.name: Normal Sync 001
402 * @tc.desc: Test normal pull sync for keyprefix data.
403 * @tc.type: FUNC
404 * @tc.require: AR000FN6G9
405 * @tc.author: zhuwentao
406 */
407 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, NormalSync004, TestSize.Level1)
408 {
409 InitNormalDb();
410 DBStatus status = OK;
411 std::vector<std::string> devices;
412 devices.push_back(g_deviceB->GetDeviceId());
413 /**
414 * @tc.steps: step1. deviceB put {k1, v1} - {k9, k9}, {b0, v0} - {b9, v9}
415 */
416 Key key = {'1'};
417 Value value = {'1'};
418 const int dataSize = 10;
419 Key key2 = {'2'};
420 Value value2 = {'2'};
421 vector<std::pair<Key, Value>> key1Vec;
422 vector<std::pair<Key, Value>> key2Vec;
423 for (int i = 0; i < dataSize; i++) {
424 Key tmpKey(key);
425 Value tmpValue(value);
426 tmpKey.push_back(i);
427 tmpValue.push_back(i);
428 key1Vec.push_back(pair<Key, Value> {tmpKey, tmpValue});
429 }
430 for (int i = 0; i < dataSize; i++) {
431 Key tmpKey(key2);
432 Value tmpValue(value2);
433 tmpKey.push_back(i);
434 tmpValue.push_back(i);
435 key2Vec.push_back(pair<Key, Value> {tmpKey, tmpValue});
436 }
437 for (int i = 0; i < dataSize; i++) {
438 g_deviceB->PutData(key2Vec[i].first, key2Vec[i].second, 20 + i, 0);
439 g_deviceB->PutData(key1Vec[i].first, key1Vec[i].second, 10 + i, 0);
440 }
441
442 /**
443 * @tc.steps: step2. deviceA call query sync and wait
444 * @tc.expected: step2. sync should return OK.
445 */
446 Query query = Query::Select().PrefixKey(key2);
447 std::map<std::string, DBStatus> result;
448 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, query);
449 ASSERT_TRUE(status == OK);
450
451 /**
452 * @tc.expected: step3. onComplete should be called, DeviceA have {b0, v0} - {b9, v9}
453 */
454 ASSERT_TRUE(result.size() == devices.size());
455 for (const auto &pair : result) {
456 EXPECT_TRUE(pair.second == OK);
457 }
458 VirtualDataItem item;
459 Value tmpValue;
460 for (int i = 0; i < dataSize; i++) {
461 g_kvDelegatePtr->Get(key2Vec[i].first, tmpValue);
462 EXPECT_TRUE(tmpValue == key2Vec[i].second);
463 g_kvDelegatePtr->Get(key1Vec[i].first, tmpValue);
464 EXPECT_TRUE(tmpValue != key1Vec[i].second);
465 }
466 }
467
468 /**
469 * @tc.name: NormalSync005
470 * @tc.desc: Test normal push sync for inkeys query.
471 * @tc.type: FUNC
472 * @tc.require: AR000GOHO7
473 * @tc.author: lidongwei
474 */
475 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, NormalSync005, TestSize.Level1)
476 {
477 InitNormalDb();
478 std::vector<std::string> devices;
479 devices.push_back(g_deviceB->GetDeviceId());
480
481 /**
482 * @tc.steps: step1. deviceA put K1-K5
483 */
484 ASSERT_EQ(g_kvDelegatePtr->PutBatch(
485 {{KEY_1, VALUE_1}, {KEY_2, VALUE_2}, {KEY_3, VALUE_3}, {KEY_4, VALUE_4}, {KEY_5, VALUE_5}}), OK);
486
487 /**
488 * @tc.steps: step2. deviceA sync K2,K4 and wait
489 * @tc.expected: step2. sync should return OK.
490 */
491 Query query = Query::Select().InKeys({KEY_2, KEY_4});
492 std::map<std::string, DBStatus> result;
493 ASSERT_EQ(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query), OK);
494
495 /**
496 * @tc.expected: step3. onComplete should be called.
497 */
498 ASSERT_EQ(result.size(), devices.size());
499 for (const auto &pair : result) {
500 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
501 EXPECT_EQ(pair.second, OK);
502 }
503
504 /**
505 * @tc.steps: step4. deviceB have K2K4 and have no K1K3K5.
506 * @tc.expected: step4. sync should return OK.
507 */
508 VirtualDataItem item;
509 EXPECT_EQ(g_deviceB->GetData(KEY_2, item), E_OK);
510 EXPECT_EQ(item.value, VALUE_2);
511 EXPECT_EQ(g_deviceB->GetData(KEY_4, item), E_OK);
512 EXPECT_EQ(item.value, VALUE_4);
513 EXPECT_EQ(g_deviceB->GetData(KEY_1, item), -E_NOT_FOUND);
514 EXPECT_EQ(g_deviceB->GetData(KEY_3, item), -E_NOT_FOUND);
515 EXPECT_EQ(g_deviceB->GetData(KEY_5, item), -E_NOT_FOUND);
516
517 /**
518 * @tc.steps: step5. deviceA sync with invalid inkeys query
519 * @tc.expected: step5. sync failed and the rc is right.
520 */
521 query = Query::Select().InKeys({});
522 result.clear();
523 ASSERT_EQ(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query), INVALID_ARGS);
524
525 std::set<Key> keys;
526 for (uint8_t i = 0; i < DBConstant::MAX_BATCH_SIZE + 1; i++) {
527 Key key = { i };
528 keys.emplace(key);
529 }
530 query = Query::Select().InKeys(keys);
531 result.clear();
532 ASSERT_EQ(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query), OVER_MAX_LIMITS);
533
534 query = Query::Select().InKeys({{}});
535 result.clear();
536 ASSERT_EQ(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query), INVALID_ARGS);
537 }
538
539 /**
540 * @tc.name: NormalSync006
541 * @tc.desc: Test normal push sync with query by 32 devices;
542 * @tc.type: FUNC
543 * @tc.require:
544 * @tc.author: zhuwentao
545 */
546 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, NormalSync006, TestSize.Level1)
547 {
548 /**
549 * @tc.steps: step1. init db and 32 devices
550 */
551 InitNormalDb();
552 uint32_t syncDevCount = 32u;
553 std::vector<KvVirtualDevice *> virtualDeviceVec(syncDevCount, nullptr);
554 const std::string device = "deviceTmp_";
555 std::vector<std::string> devices;
556 bool isError = false;
557 for (uint32_t i = 0; i < syncDevCount; i++) {
558 std::string tmpDev = device + std::to_string(i);
559 virtualDeviceVec[i] = new (std::nothrow) KvVirtualDevice(tmpDev);
560 if (virtualDeviceVec[i] == nullptr) {
561 isError = true;
562 break;
563 }
564 VirtualSingleVerSyncDBInterface *tmpSyncInterface = new (std::nothrow) VirtualSingleVerSyncDBInterface();
565 if (tmpSyncInterface == nullptr) {
566 isError = true;
567 break;
568 }
569 ASSERT_EQ(virtualDeviceVec[i]->Initialize(g_communicatorAggregator, tmpSyncInterface), E_OK);
570 devices.push_back(virtualDeviceVec[i]->GetDeviceId());
571 }
572 if (isError) {
573 for (uint32_t i = 0; i < syncDevCount; i++) {
574 if (virtualDeviceVec[i] != nullptr) {
575 delete virtualDeviceVec[i];
576 virtualDeviceVec[i] = nullptr;
577 }
578 }
579 ASSERT_TRUE(false);
580 }
581 /**
582 * @tc.steps: step2. deviceA put {k0, v0}
583 */
584 Key key = {'1'};
585 Value value = {'1'};
586 ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
587 /**
588 * @tc.steps: step3. deviceA call query sync and wait
589 * @tc.expected: step3. sync should return OK.
590 */
591 Query query = Query::Select().PrefixKey(key);
592 std::map<std::string, DBStatus> result;
593 ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query) == OK);
594
595 /**
596 * @tc.expected: step3. onComplete should be called, DeviceB have {k1,v1} - {k9, v9}
597 */
598 ASSERT_TRUE(result.size() == devices.size());
599 for (const auto &pair : result) {
600 EXPECT_TRUE(pair.second == OK);
601 }
602 VirtualDataItem item;
603 for (uint32_t i = 0; i < syncDevCount; i++) {
604 EXPECT_TRUE(virtualDeviceVec[i]->GetData(key, item) == E_OK);
605 EXPECT_EQ(item.value, value);
606 delete virtualDeviceVec[i];
607 virtualDeviceVec[i] = nullptr;
608 }
609 }
610
611 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, QueryRequestPacketTest001, TestSize.Level1)
612 {
613 /**
614 * @tc.steps: step1. prepare a QuerySyncRequestPacket.
615 */
616 auto packet = new (std::nothrow) DataRequestPacket;
617 ASSERT_TRUE(packet != nullptr);
618 auto kvEntry = new (std::nothrow) GenericSingleVerKvEntry;
619 ASSERT_TRUE(kvEntry != nullptr);
620 kvEntry->SetTimestamp(1);
621 SyncEntry syncData {.entries = {kvEntry}};
622 #ifndef OMIT_ZLIB
623 ASSERT_TRUE(GenericSingleVerKvEntry::Compress(syncData.entries, syncData.compressedEntries,
624 {CompressAlgorithm::ZLIB, SOFTWARE_VERSION_CURRENT}) == E_OK);
625 packet->SetCompressAlgo(CompressAlgorithm::ZLIB);
626 packet->SetFlag(4); // set IS_COMPRESS_DATA flag true
627 #endif
628 packet->SetBasicInfo(-E_NOT_SUPPORT, SOFTWARE_VERSION_CURRENT, SyncModeType::QUERY_PUSH_PULL);
629 packet->SetData(syncData.entries);
630 packet->SetCompressData(syncData.compressedEntries);
631 packet->SetEndWaterMark(INT8_MAX);
632 packet->SetWaterMark(INT16_MAX, INT32_MAX, INT64_MAX);
633 QuerySyncObject syncQuery(Query::Select().PrefixKey({'2'}));
634 packet->SetQuery(syncQuery);
635 packet->SetQueryId(syncQuery.GetIdentify());
636 packet->SetReserved(std::vector<uint64_t> {INT8_MAX});
637
638 /**
639 * @tc.steps: step2. put the QuerySyncRequestPacket into a message.
640 */
641 Message msg;
642 msg.SetExternalObject(packet);
643 msg.SetMessageId(QUERY_SYNC_MESSAGE);
644 msg.SetMessageType(TYPE_REQUEST);
645
646 /**
647 * @tc.steps: step3. Serialization the message to a buffer.
648 */
649 int len = static_cast<int>(SingleVerSerializeManager::CalculateLen(&msg));
650 vector<uint8_t> buffer(len);
651 ASSERT_EQ(SingleVerSerializeManager::Serialization(buffer.data(), buffer.size(), &msg), E_OK);
652
653 /**
654 * @tc.steps: step4. DeSerialization the buffer to a message.
655 */
656 Message outMsg(QUERY_SYNC_MESSAGE);
657 outMsg.SetMessageType(TYPE_REQUEST);
658 ASSERT_EQ(SingleVerSerializeManager::DeSerialization(buffer.data(), buffer.size(), &outMsg), E_OK);
659
660 /**
661 * @tc.steps: step5. checkout the outMsg.
662 * @tc.expected: step5. outMsg equal the the in msg
663 */
664 auto outPacket = outMsg.GetObject<DataRequestPacket>();
665 EXPECT_EQ(outPacket->GetVersion(), SOFTWARE_VERSION_CURRENT);
666 EXPECT_EQ(outPacket->GetMode(), SyncModeType::QUERY_PUSH_PULL);
667 EXPECT_EQ(outPacket->GetEndWaterMark(), static_cast<uint64_t>(INT8_MAX));
668 EXPECT_EQ(outPacket->GetLocalWaterMark(), static_cast<uint64_t>(INT16_MAX));
669 EXPECT_EQ(outPacket->GetPeerWaterMark(), static_cast<uint64_t>(INT32_MAX));
670 EXPECT_EQ(outPacket->GetDeletedWaterMark(), static_cast<uint64_t>(INT64_MAX));
671 #ifndef OMIT_ZLIB
672 EXPECT_EQ(outPacket->GetFlag(), static_cast<uint32_t>(4)); // check IS_COMPRESS_DATA flag true
673 #endif
674 EXPECT_EQ(outPacket->GetQueryId(), syncQuery.GetIdentify());
675 EXPECT_EQ(outPacket->GetReserved(), std::vector<uint64_t> {INT8_MAX});
676 EXPECT_EQ(outPacket->GetSendCode(), -E_NOT_SUPPORT);
677 EXPECT_EQ(outPacket->GetData()[0]->GetTimestamp(), 1u);
678 }
679
680 /**
681 * @tc.name: QueryRequestPacketTest002
682 * @tc.desc: Test exception branch of serialization.
683 * @tc.type: FUNC
684 * @tc.require:
685 * @tc.author: zhangshijie
686 */
687 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, SerializationManager001, TestSize.Level1)
688 {
689 /**
690 * @tc.steps: step1. call SingleVerSerializeManager::Serialization with buffer = nullptr or msg = nullptr
691 * @tc.expected:step1 return -E_MESSAGE_ID_ERROR
692 */
693 Message msg;
694 msg.SetMessageType(TYPE_INVALID);
695 vector<uint8_t> buffer(10); // 10 is test buffer len
696 EXPECT_EQ(SingleVerSerializeManager::Serialization(nullptr, buffer.size(), &msg), -E_MESSAGE_ID_ERROR);
697 EXPECT_EQ(SingleVerSerializeManager::Serialization(buffer.data(), buffer.size(), nullptr), -E_MESSAGE_ID_ERROR);
698
699 /**
700 * @tc.steps: step2. call SingleVerSerializeManager::Serialization with invalid type message
701 * @tc.expected:step2 return -E_MESSAGE_ID_ERROR
702 */
703 EXPECT_EQ(SingleVerSerializeManager::Serialization(buffer.data(), buffer.size(), &msg), -E_MESSAGE_ID_ERROR);
704
705 /**
706 * @tc.steps: step3. call SingleVerSerializeManager::DeSerialization with buffer = nullptr or msg = nullptr
707 * @tc.expected:step3 return -E_MESSAGE_ID_ERROR
708 */
709 EXPECT_EQ(SingleVerSerializeManager::DeSerialization(nullptr, buffer.size(), &msg), -E_MESSAGE_ID_ERROR);
710 EXPECT_EQ(SingleVerSerializeManager::DeSerialization(buffer.data(), buffer.size(), nullptr), -E_MESSAGE_ID_ERROR);
711
712 /**
713 * @tc.steps: step4. call SingleVerSerializeManager::DeSerialization with invalid type message
714 * @tc.expected:step4 return -E_MESSAGE_ID_ERROR
715 */
716 EXPECT_EQ(SingleVerSerializeManager::DeSerialization(buffer.data(), buffer.size(), &msg), -E_MESSAGE_ID_ERROR);
717 }
718
719 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, QueryAckPacketTest001, TestSize.Level1)
720 {
721 /**
722 * @tc.steps: step1. prepare a QuerySyncAckPacket.
723 */
724 DataAckPacket packet;
725 packet.SetVersion(SOFTWARE_VERSION_CURRENT);
726 packet.SetData(INT64_MAX);
727 packet.SetRecvCode(-E_NOT_SUPPORT);
728 std::vector<uint64_t> reserved = {INT8_MAX};
729 packet.SetReserved(reserved);
730
731 /**
732 * @tc.steps: step2. put the QuerySyncAckPacket into a message.
733 */
734 Message msg;
735 msg.SetCopiedObject(packet);
736 msg.SetMessageId(QUERY_SYNC_MESSAGE);
737 msg.SetMessageType(TYPE_RESPONSE);
738
739 /**
740 * @tc.steps: step3. Serialization the message to a buffer.
741 */
742 int len = static_cast<int>(SingleVerSerializeManager::CalculateLen(&msg));
743 LOGE("test leng = %d", len);
744 uint8_t *buffer = new (nothrow) uint8_t[len];
745 ASSERT_TRUE(buffer != nullptr);
746 int errCode = SingleVerSerializeManager::Serialization(buffer, len, &msg);
747 ASSERT_EQ(errCode, E_OK);
748
749 /**
750 * @tc.steps: step4. DeSerialization the buffer to a message.
751 */
752 Message outMsg;
753 outMsg.SetMessageId(QUERY_SYNC_MESSAGE);
754 outMsg.SetMessageType(TYPE_RESPONSE);
755 errCode = SingleVerSerializeManager::DeSerialization(buffer, len, &outMsg);
756 ASSERT_EQ(errCode, E_OK);
757
758 /**
759 * @tc.steps: step5. checkout the outMsg.
760 * @tc.expected: step5. outMsg equal the the in msg
761 */
762 auto outPacket = outMsg.GetObject<DataAckPacket>();
763 EXPECT_EQ(outPacket->GetVersion(), SOFTWARE_VERSION_CURRENT);
764 EXPECT_EQ(outPacket->GetData(), static_cast<uint64_t>(INT64_MAX));
765 std::vector<uint64_t> reserved2 = {INT8_MAX};
766 EXPECT_EQ(outPacket->GetReserved(), reserved2);
767 EXPECT_EQ(outPacket->GetRecvCode(), -E_NOT_SUPPORT);
768 delete[] buffer;
769 }
770
771 /**
772 * @tc.name: GetQueryWaterMark 001
773 * @tc.desc: Test metaData save and get queryWaterMark.
774 * @tc.type: FUNC
775 * @tc.require: AR000FN6G9
776 * @tc.author: zhangqiquan
777 */
778 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, GetQueryWaterMark001, TestSize.Level1)
779 {
780 VirtualSingleVerSyncDBInterface storage;
781 Metadata meta;
782
783 /**
784 * @tc.steps: step1. initialize meta with storage
785 * @tc.expected: step1. E_OK
786 */
787 int errCode = meta.Initialize(&storage);
788 ASSERT_EQ(errCode, E_OK);
789
790 /**
791 * @tc.steps: step2. save receive and send watermark
792 * @tc.expected: step2. E_OK
793 */
794 WaterMark w1 = 1;
795 EXPECT_EQ(meta.SetRecvQueryWaterMark("Q1", "D1", w1), E_OK);
796 EXPECT_EQ(meta.SetSendQueryWaterMark("Q1", "D1", w1), E_OK);
797
798 /**
799 * @tc.steps: step3. get receive and send watermark
800 * @tc.expected: step3. E_OK and get the latest value
801 */
802 WaterMark w = 0;
803 EXPECT_EQ(meta.GetRecvQueryWaterMark("Q1", "D1", w), E_OK);
804 EXPECT_EQ(w1, w);
805 EXPECT_EQ(meta.GetSendQueryWaterMark("Q1", "D1", w), E_OK);
806 EXPECT_EQ(w1, w);
807
808 /**
809 * @tc.steps: step4. set peer and local watermark
810 * @tc.expected: step4. E_OK
811 */
812 WaterMark w2 = 2;
813 EXPECT_EQ(meta.SaveLocalWaterMark("D1", w2), E_OK);
814 EXPECT_EQ(meta.SavePeerWaterMark("D1", w2, true), E_OK);
815
816 /**
817 * @tc.steps: step5. get receive and send watermark
818 * @tc.expected: step5. E_OK and get the w1
819 */
820 EXPECT_EQ(meta.GetRecvQueryWaterMark("Q1", "D1", w), E_OK);
821 EXPECT_EQ(w2, w);
822 EXPECT_EQ(meta.GetSendQueryWaterMark("Q1", "D1", w), E_OK);
823 EXPECT_EQ(w2, w);
824
825 /**
826 * @tc.steps: step6. set peer and local watermark
827 * @tc.expected: step6. E_OK
828 */
829 WaterMark w3 = 3;
830 EXPECT_EQ(meta.SaveLocalWaterMark("D2", w3), E_OK);
831 EXPECT_EQ(meta.SavePeerWaterMark("D2", w3, true), E_OK);
832
833 /**
834 * @tc.steps: step7. get receive and send watermark
835 * @tc.expected: step7. E_OK and get the w3
836 */
837 EXPECT_EQ(meta.GetRecvQueryWaterMark("Q2", "D2", w), E_OK);
838 EXPECT_EQ(w3, w);
839 EXPECT_EQ(meta.GetSendQueryWaterMark("Q2", "D2", w), E_OK);
840 EXPECT_EQ(w3, w);
841
842 /**
843 * @tc.steps: step8. get not exit receive and send watermark
844 * @tc.expected: step8. E_OK and get the 0
845 */
846 EXPECT_EQ(meta.GetRecvQueryWaterMark("Q3", "D3", w), E_OK);
847 EXPECT_EQ(w, 0u);
848 EXPECT_EQ(meta.GetSendQueryWaterMark("Q3", "D3", w), E_OK);
849 EXPECT_EQ(w, 0u);
850 }
851
852 /**
853 * @tc.name: GetQueryWaterMark 002
854 * @tc.desc: Test metaData save and get queryWaterMark after push or pull mode.
855 * @tc.type: FUNC
856 * @tc.require: AR000FN6G9
857 * @tc.author: zhangqiquan
858 */
859 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, GetQueryWaterMark002, TestSize.Level1)
860 {
861 VirtualSingleVerSyncDBInterface storage;
862 Metadata meta;
863
864 /**
865 * @tc.steps: step1. initialize meta with storage
866 * @tc.expected: step1. E_OK
867 */
868 int errCode = meta.Initialize(&storage);
869 ASSERT_EQ(errCode, E_OK);
870
871 /**
872 * @tc.steps: step2. set peer and local watermark
873 * @tc.expected: step2. E_OK
874 */
875 WaterMark w1 = 2;
876 EXPECT_EQ(meta.SaveLocalWaterMark("D1", w1), E_OK);
877 EXPECT_EQ(meta.SavePeerWaterMark("D1", w1, true), E_OK);
878
879 /**
880 * @tc.steps: step2. save receive and send watermark
881 * @tc.expected: step2. E_OK
882 */
883 WaterMark w2 = 1;
884 EXPECT_EQ(meta.SetRecvQueryWaterMark("Q1", "D1", w2), E_OK);
885 EXPECT_EQ(meta.SetSendQueryWaterMark("Q1", "D1", w2), E_OK);
886
887 /**
888 * @tc.steps: step3. get receive and send watermark
889 * @tc.expected: step3. E_OK and get the bigger value
890 */
891 WaterMark w = 0;
892 EXPECT_EQ(meta.GetRecvQueryWaterMark("Q1", "D1", w), E_OK);
893 EXPECT_EQ(w1, w);
894 EXPECT_EQ(meta.GetSendQueryWaterMark("Q1", "D1", w), E_OK);
895 EXPECT_EQ(w1, w);
896 }
897
898 /**
899 * @tc.name: GetQueryWaterMark 003
900 * @tc.desc: check time offset after remove water mark
901 * @tc.type: FUNC
902 * @tc.require:
903 * @tc.author: lianhuix
904 */
905 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, GetQueryWaterMark003, TestSize.Level1)
906 {
907 VirtualSingleVerSyncDBInterface storage;
908 Metadata meta;
909
910 int errCode = meta.Initialize(&storage);
911 ASSERT_EQ(errCode, E_OK);
912
913 const std::string DEVICE_B = "DEVICE_B";
914 TimeOffset offset = 100; // 100: offset
915 meta.SaveTimeOffset(DEVICE_B, offset);
916
917 WaterMark w1 = 2; // 2: watermark
918 meta.SavePeerWaterMark(DBCommon::TransferHashString(DEVICE_B), w1, false);
919
920 TimeOffset offsetGot;
921 meta.GetTimeOffset(DEVICE_B, offsetGot);
922 EXPECT_EQ(offsetGot, offset);
923 }
924
925 /**
926 * @tc.name: GetDeleteWaterMark001
927 * @tc.desc: Test metaData save and get deleteWaterMark.
928 * @tc.type: FUNC
929 * @tc.require: AR000FN6G9
930 * @tc.author: zhangqiquan
931 */
932 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, GetDeleteWaterMark001, TestSize.Level1)
933 {
934 VirtualSingleVerSyncDBInterface storage;
935 Metadata meta;
936
937 /**
938 * @tc.steps: step1. initialize meta with storage
939 * @tc.expected: step1. E_OK
940 */
941 int errCode = meta.Initialize(&storage);
942 ASSERT_EQ(errCode, E_OK);
943
944 /**
945 * @tc.steps: step2. set and get recv/send delete watermark
946 * @tc.expected: step2. set E_OK and get water mark is equal with last set
947 */
948 const std::string device = "DEVICE";
949 const WaterMark maxWaterMark = 1000u;
__anon6100a1770202() 950 std::thread recvThread([&meta, &device, &maxWaterMark]() {
951 for (WaterMark expectRecv = 0u; expectRecv < maxWaterMark; ++expectRecv) {
952 EXPECT_EQ(meta.SetRecvDeleteSyncWaterMark(device, expectRecv), E_OK);
953 WaterMark actualRecv = 0u;
954 EXPECT_EQ(meta.GetRecvDeleteSyncWaterMark(device, actualRecv), E_OK);
955 EXPECT_EQ(actualRecv, expectRecv);
956 }
957 });
__anon6100a1770302() 958 std::thread sendThread([&meta, &device, &maxWaterMark]() {
959 for (WaterMark expectSend = 0u; expectSend < maxWaterMark; ++expectSend) {
960 EXPECT_EQ(meta.SetSendDeleteSyncWaterMark(device, expectSend), E_OK);
961 WaterMark actualSend = 0u;
962 EXPECT_EQ(meta.GetSendDeleteSyncWaterMark(device, actualSend), E_OK);
963 EXPECT_EQ(actualSend, expectSend);
964 }
965 });
966 recvThread.join();
967 sendThread.join();
968 }
969
970 /**
971 * @tc.name: ClearQueryWaterMark 001
972 * @tc.desc: Test metaData clear watermark function.
973 * @tc.type: FUNC
974 * @tc.require: AR000FN6G9
975 * @tc.author: zhangqiquan
976 */
977 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, ClearQueryWaterMark001, TestSize.Level1)
978 {
979 VirtualSingleVerSyncDBInterface storage;
980 Metadata meta;
981
982 /**
983 * @tc.steps: step1. initialize meta with storage
984 * @tc.expected: step1. E_OK
985 */
986 int errCode = meta.Initialize(&storage);
987 ASSERT_EQ(errCode, E_OK);
988
989 /**
990 * @tc.steps: step2. save receive watermark
991 * @tc.expected: step2. E_OK
992 */
993 WaterMark w1 = 1;
994 EXPECT_EQ(meta.SetRecvQueryWaterMark("Q1", "D1", w1), E_OK);
995
996 /**
997 * @tc.steps: step3. erase peer watermark
998 * @tc.expected: step3. E_OK
999 */
1000 EXPECT_EQ(meta.EraseDeviceWaterMark("D1", true), E_OK);
1001
1002 /**
1003 * @tc.steps: step4. get receive watermark
1004 * @tc.expected: step4. E_OK receive watermark is zero
1005 */
1006 WaterMark w2 = -1;
1007 EXPECT_EQ(meta.GetRecvQueryWaterMark("Q1", "D1", w2), E_OK);
1008 EXPECT_EQ(w2, 0u);
1009
1010 /**
1011 * @tc.steps: step5. set peer watermark
1012 * @tc.expected: step5. E_OK
1013 */
1014 WaterMark w3 = 2;
1015 EXPECT_EQ(meta.SavePeerWaterMark("D1", w3, true), E_OK);
1016
1017 /**
1018 * @tc.steps: step6. get receive watermark
1019 * @tc.expected: step6. E_OK receive watermark is peer watermark
1020 */
1021 WaterMark w4 = -1;
1022 EXPECT_EQ(meta.GetRecvQueryWaterMark("Q1", "D1", w4), E_OK);
1023 EXPECT_EQ(w4, w3);
1024 }
1025
1026 /**
1027 * @tc.name: ClearQueryWaterMark 002
1028 * @tc.desc: Test metaData clear watermark function.
1029 * @tc.type: FUNC
1030 * @tc.require: AR000FN6G9
1031 * @tc.author: zhangqiquan
1032 */
1033 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, ClearQueryWaterMark002, TestSize.Level1)
1034 {
1035 VirtualSingleVerSyncDBInterface storage;
1036 Metadata meta;
1037
1038 /**
1039 * @tc.steps: step1. initialize meta with storage
1040 * @tc.expected: step1. E_OK
1041 */
1042 int errCode = meta.Initialize(&storage);
1043 ASSERT_EQ(errCode, E_OK);
1044
1045 /**
1046 * @tc.steps: step2. save receive watermark
1047 * @tc.expected: step2. E_OK
1048 */
1049 WaterMark w1 = 1;
1050 EXPECT_EQ(meta.SetRecvQueryWaterMark("Q1", "D1", w1), E_OK);
1051 EXPECT_EQ(meta.SetRecvQueryWaterMark("Q2", "D1", w1), E_OK);
1052 EXPECT_EQ(meta.SetRecvQueryWaterMark("Q1", "D2", w1), E_OK);
1053
1054 /**
1055 * @tc.steps: step3. erase peer watermark, make sure data remove in db
1056 * @tc.expected: step3. E_OK
1057 */
1058 Metadata anotherMeta;
1059 ASSERT_EQ(anotherMeta.Initialize(&storage), E_OK);
1060 EXPECT_EQ(anotherMeta.EraseDeviceWaterMark("D1", true), E_OK);
1061
1062 /**
1063 * @tc.steps: step4. get receive watermark
1064 * @tc.expected: step4. E_OK receive watermark is zero
1065 */
1066 WaterMark w2 = -1;
1067 EXPECT_EQ(anotherMeta.GetRecvQueryWaterMark("Q1", "D1", w2), E_OK);
1068 EXPECT_EQ(w2, 0u);
1069 w2 = -1;
1070 EXPECT_EQ(anotherMeta.GetRecvQueryWaterMark("Q2", "D1", w2), E_OK);
1071 EXPECT_EQ(w2, 0u);
1072 w2 = -1;
1073 EXPECT_EQ(anotherMeta.GetRecvQueryWaterMark("Q1", "D2", w2), E_OK);
1074 EXPECT_EQ(w2, w1);
1075 }
1076
1077 /**
1078 * @tc.name: ClearQueryWaterMark 003
1079 * @tc.desc: Test metaData clear watermark busy.
1080 * @tc.type: FUNC
1081 * @tc.require: AR000FN6G9
1082 * @tc.author: zhangqiquan
1083 */
1084 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, ClearQueryWaterMark003, TestSize.Level1)
1085 {
1086 VirtualSingleVerSyncDBInterface storage;
1087 Metadata meta;
1088 /**
1089 * @tc.steps: step1. initialize meta with storage
1090 * @tc.expected: step1. E_OK
1091 */
1092 int errCode = meta.Initialize(&storage);
1093 ASSERT_EQ(errCode, E_OK);
1094 /**
1095 * @tc.steps: step2. set busy and erase water mark
1096 * @tc.expected: step2. -E_BUSY
1097 */
1098 storage.SetBusy(false, true);
1099 EXPECT_EQ(meta.EraseDeviceWaterMark("DEVICE_ID", true), -E_BUSY);
1100 }
1101
1102 /**
1103 * @tc.name: GetQueryLastTimestamp001
1104 * @tc.desc: Test function of GetQueryLastTimestamp.
1105 * @tc.type: FUNC
1106 * @tc.require: AR000FN6G9
1107 * @tc.author: zhangshijie
1108 */
1109 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, GetQueryLastTimestamp001, TestSize.Level1)
1110 {
1111 /**
1112 * @tc.steps: step1. initialize meta with nullptr
1113 * @tc.expected: step1. return -E_INVALID_DB
1114 */
1115 Metadata meta;
1116 EXPECT_EQ(meta.Initialize(nullptr), -E_INVALID_DB);
1117
1118 /**
1119 * @tc.steps: step2. initialize meta with storage
1120 * @tc.expected: step2. E_OK
1121 */
1122 VirtualSingleVerSyncDBInterface storage;
1123 int errCode = meta.Initialize(&storage);
1124 ASSERT_EQ(errCode, E_OK);
1125
1126 /**
1127 * @tc.steps: step3. call GetQueryLastTimestamp with a non-exists device
1128 * @tc.expected: step3. return INT64_MAX
1129 */
1130 EXPECT_EQ(meta.GetQueryLastTimestamp("D1", "Q1"), static_cast<uint64_t>(INT64_MAX));
1131
1132 /**
1133 * @tc.steps: step4. call GetQueryLastTimestamp with device D1 again
1134 * @tc.expected: step4. return 0
1135 */
1136 EXPECT_EQ(meta.GetQueryLastTimestamp("D1", "Q1"), 0u);
1137
1138 /**
1139 * @tc.steps: step5. call GetQueryLastTimestamp with device D1 and Q2
1140 * @tc.expected: step5. return INT64_MAX
1141 */
1142 EXPECT_EQ(meta.GetQueryLastTimestamp("D1", "Q2"), static_cast<uint64_t>(INT64_MAX));
1143 }
1144
1145 /**
1146 * @tc.name: MetaDataExceptionBranch001
 * @tc.desc: Test exception branch of metadata.
1148 * @tc.type: FUNC
1149 * @tc.require: AR000FN6G9
1150 * @tc.author: zhangshijie
1151 */
1152 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, MetaDataExceptionBranch001, TestSize.Level1)
1153 {
1154 /**
1155 * @tc.steps: step1. call GetRemoveDataMark with a device not in map
1156 * @tc.expected: step1. out value = 0
1157 */
1158 Metadata meta;
1159 uint64_t val = 99; // 99 is the initial value of outValue
1160 uint64_t outValue = val;
1161 meta.GetRemoveDataMark("D1", outValue);
1162 EXPECT_EQ(outValue, 0u);
1163
1164 /**
1165 * @tc.steps: step2. reset outValue, call GetDbCreateTime with a device not in map
1166 * @tc.expected: step2. out value = 0
1167 */
1168 outValue = val;
1169 meta.GetDbCreateTime("D1", outValue);
1170 EXPECT_EQ(outValue, 0u);
1171
1172 /**
1173 * @tc.steps: step3. call ResetMetaDataAfterRemoveData with a device not in map
1174 * @tc.expected: step3. return -E_NOT_FOUND
1175 */
1176 EXPECT_EQ(meta.ResetMetaDataAfterRemoveData("D1"), -E_NOT_FOUND);
1177 }
1178
1179 /**
1180 * @tc.name: GetDeleteKeyWaterMark 001
1181 * @tc.desc: Test metaData save and get deleteWaterMark.
1182 * @tc.type: FUNC
1183 * @tc.require: AR000FN6G9
1184 * @tc.author: zhangqiquan
1185 */
1186 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, GetDeleteKeyWaterMark001, TestSize.Level1)
1187 {
1188 VirtualSingleVerSyncDBInterface storage;
1189 Metadata meta;
1190
1191 /**
1192 * @tc.steps: step1. initialize meta with storage
1193 * @tc.expected: step1. E_OK
1194 */
1195 int errCode = meta.Initialize(&storage);
1196 ASSERT_EQ(errCode, E_OK);
1197
1198 /**
1199 * @tc.steps: step2. save receive and send watermark
1200 * @tc.expected: step2. E_OK
1201 */
1202 WaterMark w1 = 1;
1203 EXPECT_EQ(meta.SetRecvDeleteSyncWaterMark("D1", w1), E_OK);
1204 EXPECT_EQ(meta.SetSendDeleteSyncWaterMark("D1", w1), E_OK);
1205
1206 /**
1207 * @tc.steps: step3. get receive and send watermark
1208 * @tc.expected: step3. E_OK and get the latest value
1209 */
1210 WaterMark w = 0;
1211 EXPECT_EQ(meta.GetRecvDeleteSyncWaterMark("D1", w), E_OK);
1212 EXPECT_EQ(w1, w);
1213 EXPECT_EQ(meta.GetSendDeleteSyncWaterMark("D1", w), E_OK);
1214 EXPECT_EQ(w1, w);
1215
1216 /**
1217 * @tc.steps: step4. set peer and local watermark
1218 * @tc.expected: step4. E_OK
1219 */
1220 WaterMark w2 = 2;
1221 EXPECT_EQ(meta.SaveLocalWaterMark("D1", w2), E_OK);
1222 EXPECT_EQ(meta.SavePeerWaterMark("D1", w2, true), E_OK);
1223
1224 /**
1225 * @tc.steps: step5. get receive and send watermark
1226 * @tc.expected: step5. E_OK and get the w1
1227 */
1228 EXPECT_EQ(meta.GetRecvDeleteSyncWaterMark("D1", w), E_OK);
1229 EXPECT_EQ(w2, w);
1230 EXPECT_EQ(meta.GetSendDeleteSyncWaterMark("D1", w), E_OK);
1231 EXPECT_EQ(w2, w);
1232
1233 /**
1234 * @tc.steps: step6. set peer and local watermark
1235 * @tc.expected: step6. E_OK
1236 */
1237 WaterMark w3 = 3;
1238 EXPECT_EQ(meta.SaveLocalWaterMark("D2", w3), E_OK);
1239 EXPECT_EQ(meta.SavePeerWaterMark("D2", w3, true), E_OK);
1240
1241 /**
1242 * @tc.steps: step7. get receive and send watermark
1243 * @tc.expected: step7. E_OK and get the w3
1244 */
1245 EXPECT_EQ(meta.GetRecvDeleteSyncWaterMark("D2", w), E_OK);
1246 EXPECT_EQ(w3, w);
1247 EXPECT_EQ(meta.GetSendDeleteSyncWaterMark("D2", w), E_OK);
1248 EXPECT_EQ(w3, w);
1249
1250 /**
1251 * @tc.steps: step8. get not exit receive and send watermark
1252 * @tc.expected: step8. E_OK and get the 0
1253 */
1254 EXPECT_EQ(meta.GetRecvDeleteSyncWaterMark("D3", w), E_OK);
1255 EXPECT_EQ(w, 0u);
1256 EXPECT_EQ(meta.GetSendDeleteSyncWaterMark("D3", w), E_OK);
1257 EXPECT_EQ(w, 0u);
1258 }
1259
1260 /**
1261 * @tc.name: GetDeleteKeyWaterMark 002
1262 * @tc.desc: Test metaData save and get deleteWaterMark after push or pull mode.
1263 * @tc.type: FUNC
1264 * @tc.require: AR000FN6G9
1265 * @tc.author: zhangqiquan
1266 */
1267 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, GetDeleteKeyWaterMark002, TestSize.Level1)
1268 {
1269 VirtualSingleVerSyncDBInterface storage;
1270 Metadata meta;
1271
1272 /**
1273 * @tc.steps: step1. initialize meta with storage
1274 * @tc.expected: step1. E_OK
1275 */
1276 int errCode = meta.Initialize(&storage);
1277 ASSERT_EQ(errCode, E_OK);
1278
1279 /**
1280 * @tc.steps: step2. set peer and local watermark
1281 * @tc.expected: step2. E_OK
1282 */
1283 WaterMark w1 = 3;
1284 EXPECT_EQ(meta.SaveLocalWaterMark("D1", w1), E_OK);
1285 EXPECT_EQ(meta.SavePeerWaterMark("D1", w1, true), E_OK);
1286
1287 /**
1288 * @tc.steps: step2. save receive and send watermark
1289 * @tc.expected: step2. E_OK
1290 */
1291 WaterMark w2 = 1;
1292 EXPECT_EQ(meta.SetRecvDeleteSyncWaterMark("D1", w2), E_OK);
1293 EXPECT_EQ(meta.SetSendDeleteSyncWaterMark("D1", w2), E_OK);
1294
1295 /**
1296 * @tc.steps: step3. get receive and send watermark
1297 * @tc.expected: step3. E_OK and get the bigger value
1298 */
1299 WaterMark w = 0;
1300 EXPECT_EQ(meta.GetRecvDeleteSyncWaterMark("D1", w), E_OK);
1301 EXPECT_EQ(w1, w);
1302 EXPECT_EQ(meta.GetSendDeleteSyncWaterMark("D1", w), E_OK);
1303 EXPECT_EQ(w1, w);
1304 }
1305
1306 /**
1307 * @tc.name: ClearDeleteKeyWaterMark 001
1308 * @tc.desc: Test metaData clear watermark function.
1309 * @tc.type: FUNC
1310 * @tc.require: AR000FN6G9
1311 * @tc.author: zhangqiquan
1312 */
1313 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, ClearDeleteKeyWaterMark001, TestSize.Level1)
1314 {
1315 VirtualSingleVerSyncDBInterface storage;
1316 Metadata meta;
1317
1318 /**
1319 * @tc.steps: step1. initialize meta with storage
1320 * @tc.expected: step1. E_OK
1321 */
1322 int errCode = meta.Initialize(&storage);
1323 ASSERT_EQ(errCode, E_OK);
1324
1325 /**
1326 * @tc.steps: step2. save receive watermark
1327 * @tc.expected: step2. E_OK
1328 */
1329 WaterMark w1 = 1;
1330 EXPECT_EQ(meta.SetRecvDeleteSyncWaterMark("D1", w1), E_OK);
1331
1332 /**
1333 * @tc.steps: step3. erase peer watermark
1334 * @tc.expected: step3. E_OK
1335 */
1336 EXPECT_EQ(meta.EraseDeviceWaterMark("D1", true), E_OK);
1337
1338 /**
1339 * @tc.steps: step4. get receive watermark
1340 * @tc.expected: step4. E_OK receive watermark is zero
1341 */
1342 WaterMark w2 = -1;
1343 EXPECT_EQ(meta.GetRecvDeleteSyncWaterMark("D1", w2), E_OK);
1344 EXPECT_EQ(w2, 0u);
1345
1346 /**
1347 * @tc.steps: step5. set peer watermark
1348 * @tc.expected: step5. E_OK
1349 */
1350 WaterMark w3 = 2;
1351 EXPECT_EQ(meta.SavePeerWaterMark("D1", w3, true), E_OK);
1352
1353 /**
1354 * @tc.steps: step6. get receive watermark
1355 * @tc.expected: step6. E_OK receive watermark is peer watermark
1356 */
1357 WaterMark w4 = -1;
1358 EXPECT_EQ(meta.GetRecvDeleteSyncWaterMark("D1", w4), E_OK);
1359 EXPECT_EQ(w4, w3);
1360 }
1361
1362 /**
 * @tc.name: VerifyMetaDataQuerySync 001
1364 * @tc.desc: Test metaData watermark cache and db are consistent and correct.
1365 * @tc.type: FUNC
1366 * @tc.require: AR000FN6G9
1367 * @tc.author: zhangqiquan
1368 */
1369 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, VerifyMetaDataQuerySync001, TestSize.Level1)
1370 {
1371 Metadata meta;
1372 VirtualSingleVerSyncDBInterface storage;
1373
1374 /**
1375 * @tc.steps: step1. initialize meta with storage
1376 * @tc.expected: step1. E_OK
1377 */
1378 int errCode = meta.Initialize(&storage);
1379 ASSERT_EQ(errCode, E_OK);
1380
1381 const std::string deviceId = "D1";
1382 const std::string queryId = "Q1";
1383
1384 /**
1385 * @tc.steps: step2. save deleteSync watermark
1386 * @tc.expected: step2. E_OK
1387 */
1388 WaterMark deleteWaterMark = 1;
1389 EXPECT_EQ(meta.SetRecvDeleteSyncWaterMark(deviceId, deleteWaterMark), E_OK);
1390 EXPECT_EQ(meta.SetSendDeleteSyncWaterMark(deviceId, deleteWaterMark), E_OK);
1391
1392 /**
1393 * @tc.steps: step3. save querySync watermark
1394 * @tc.expected: step2. E_OK
1395 */
1396 WaterMark queryWaterMark = 2;
1397 EXPECT_EQ(meta.SetRecvQueryWaterMark(queryId, deviceId, queryWaterMark), E_OK);
1398 EXPECT_EQ(meta.SetSendQueryWaterMark(queryId, deviceId, queryWaterMark), E_OK);
1399
1400 /**
1401 * @tc.steps: step4. initialize meta with storage
1402 * @tc.expected: step4. E_OK
1403 */
1404 Metadata anotherMeta;
1405 ASSERT_EQ(anotherMeta.Initialize(&storage), E_OK);
1406
1407 /**
1408 * @tc.steps: step5. verify delete sync data
1409 * @tc.expected: step5. E_OK and waterMark equal to deleteWaterMark
1410 */
1411 WaterMark waterMark;
1412 EXPECT_EQ(anotherMeta.GetRecvDeleteSyncWaterMark(deviceId, waterMark), E_OK);
1413 EXPECT_EQ(waterMark, deleteWaterMark);
1414 EXPECT_EQ(anotherMeta.GetSendDeleteSyncWaterMark(deviceId, waterMark), E_OK);
1415 EXPECT_EQ(waterMark, deleteWaterMark);
1416
1417 /**
1418 * @tc.steps: step6. verify query sync data
1419 * @tc.expected: step6. E_OK and waterMark equal to queryWaterMark
1420 */
1421 EXPECT_EQ(anotherMeta.GetRecvQueryWaterMark(queryId, deviceId, waterMark), E_OK);
1422 EXPECT_EQ(waterMark, queryWaterMark);
1423 EXPECT_EQ(anotherMeta.GetSendQueryWaterMark(queryId, deviceId, waterMark), E_OK);
1424 EXPECT_EQ(waterMark, queryWaterMark);
1425 }
1426
1427 /**
1428 * @tc.name: VerifyLruMap 001
1429 * @tc.desc: Test metaData watermark cache lru ability.
1430 * @tc.type: FUNC
1431 * @tc.require: AR000FN6G9
1432 * @tc.author: zhangqiquan
1433 */
1434 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, VerifyLruMap001, TestSize.Level1)
1435 {
1436 LruMap<std::string, QueryWaterMark> lruMap;
1437 const int maxCacheItems = 200;
1438
1439 /**
1440 * @tc.steps: step1. fill items to LruMap
1441 * @tc.expected: step1. E_OK
1442 */
1443 const int startCount = 0;
1444 for (int i = startCount; i < maxCacheItems; i++) {
1445 std::string key = std::to_string(i);
1446 QueryWaterMark value;
1447 value.recvWaterMark = static_cast<uint64_t>(i + 1);
1448 EXPECT_EQ(lruMap.Put(key, value), E_OK);
1449 }
1450
1451 /**
1452 * @tc.steps: step2. get the first item
1453 * @tc.expected: step2. E_OK first item will move to last
1454 */
1455 std::string firstItemKey = std::to_string(startCount);
1456 QueryWaterMark firstItemValue;
1457 EXPECT_EQ(lruMap.Get(firstItemKey, firstItemValue), E_OK);
1458 EXPECT_EQ(firstItemValue.recvWaterMark, 1u);
1459
1460 /**
1461 * @tc.steps: step3. insert new items to LruMap
1462 * @tc.expected: step3. the second items was removed
1463 */
1464 std::string key = std::to_string(maxCacheItems);
1465 QueryWaterMark value;
1466 value.recvWaterMark = maxCacheItems;
1467 EXPECT_EQ(lruMap.Put(key, value), E_OK);
1468
1469 /**
1470 * @tc.steps: step4. get the second item
1471 * @tc.expected: step4. E_NOT_FOUND it was removed by algorithm
1472 */
1473 std::string secondItemKey = std::to_string(startCount + 1);
1474 QueryWaterMark secondItemValue;
1475 EXPECT_EQ(lruMap.Get(secondItemKey, secondItemValue), -E_NOT_FOUND);
1476 EXPECT_EQ(secondItemValue.recvWaterMark, 0u);
1477 }
1478
1479 /**
1480 * @tc.name: VerifyMetaDataInit 001
1481 * @tc.desc: Test metaData init correctly
1482 * @tc.type: FUNC
1483 * @tc.require: AR000FN6G9
1484 * @tc.author: zhangqiquan
1485 */
1486 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, VerifyMetaDataInit001, TestSize.Level1)
1487 {
1488 Metadata meta;
1489 VirtualSingleVerSyncDBInterface storage;
1490
1491 /**
1492 * @tc.steps: step1. initialize meta with storage
1493 * @tc.expected: step1. E_OK
1494 */
1495 ASSERT_EQ(meta.Initialize(&storage), E_OK);
1496
1497 DeviceID deviceA = "DeviceA";
1498 DeviceID deviceB = "DeviceA";
1499 WaterMark setWaterMark = 1;
1500
1501 /**
1502 * @tc.steps: step2. meta save and get waterMark
1503 * @tc.expected: step2. expect get the same waterMark
1504 */
1505 EXPECT_EQ(meta.SaveLocalWaterMark(deviceA, setWaterMark), E_OK);
1506 EXPECT_EQ(meta.SaveLocalWaterMark(deviceB, setWaterMark), E_OK);
1507 WaterMark getWaterMark = 0;
1508 meta.GetLocalWaterMark(deviceA, getWaterMark);
1509 EXPECT_EQ(getWaterMark, setWaterMark);
1510 meta.GetLocalWaterMark(deviceB, getWaterMark);
1511 EXPECT_EQ(getWaterMark, setWaterMark);
1512
1513
1514 /**
1515 * @tc.steps: step3. init again
1516 * @tc.expected: step3. E_OK
1517 */
1518 Metadata anotherMeta;
1519 ASSERT_EQ(anotherMeta.Initialize(&storage), E_OK);
1520
1521 /**
1522 * @tc.steps: step4. get waterMark again
1523 * @tc.expected: step4. expect get the same waterMark
1524 */
1525 anotherMeta.GetLocalWaterMark(deviceA, getWaterMark);
1526 EXPECT_EQ(getWaterMark, setWaterMark);
1527 anotherMeta.GetLocalWaterMark(deviceB, getWaterMark);
1528 EXPECT_EQ(getWaterMark, setWaterMark);
1529 }
1530
1531 namespace {
InitVerifyStorageEnvironment(Metadata & meta,VirtualSingleVerSyncDBInterface & storage,const std::string & deviceId,const int & startCount,const uint32_t & maxStoreItems)1532 void InitVerifyStorageEnvironment(Metadata &meta, VirtualSingleVerSyncDBInterface &storage,
1533 const std::string &deviceId, const int &startCount, const uint32_t &maxStoreItems)
1534 {
1535 /**
1536 * @tc.steps: step1. initialize meta with storage
1537 * @tc.expected: step1. E_OK
1538 */
1539 ASSERT_EQ(meta.Initialize(&storage), E_OK);
1540
1541 /**
1542 * @tc.steps: step2. fill items to metadata
1543 * @tc.expected: step2. E_OK
1544 */
1545 for (uint32_t i = startCount; i < maxStoreItems; i++) {
1546 std::string queryId = std::to_string(i);
1547 WaterMark recvWaterMark = i + 1;
1548 EXPECT_EQ(meta.SetRecvQueryWaterMark(queryId, deviceId, recvWaterMark), E_OK);
1549 }
1550 }
1551 }
1552
1553 /**
1554 * @tc.name: VerifyManagerQuerySyncStorage 001
1555 * @tc.desc: Test metaData remove least used querySync storage items.
1556 * @tc.type: FUNC
1557 * @tc.require: AR000FN6G9
1558 * @tc.author: zhangqiquan
1559 */
1560 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, VerifyManagerQuerySyncStorage001, TestSize.Level3)
1561 {
1562 Metadata meta;
1563 VirtualSingleVerSyncDBInterface storage;
1564 const uint32_t maxStoreItems = 100000;
1565 const int startCount = 0;
1566 const std::string deviceId = "Device";
1567
1568 InitVerifyStorageEnvironment(meta, storage, deviceId, startCount, maxStoreItems);
1569
1570 /**
1571 * @tc.steps: step3. insert new items to metadata
1572 * @tc.expected: step3. E_OK
1573 */
1574 std::string newQueryId = std::to_string(maxStoreItems);
1575 WaterMark newWaterMark = maxStoreItems + 1;
1576 EXPECT_EQ(meta.SetRecvQueryWaterMark(newQueryId, deviceId, newWaterMark), E_OK);
1577
1578 /**
1579 * @tc.steps: step4. touch the first item
1580 * @tc.expected: step4. E_OK update first item used time
1581 */
1582 std::string firstItemKey = std::to_string(startCount);
1583 WaterMark firstWaterMark = 11u;
1584 EXPECT_EQ(meta.SetRecvQueryWaterMark(firstItemKey, deviceId, firstWaterMark), E_OK);
1585
1586 /**
1587 * @tc.steps: step5. initialize new meta with storage
1588 * @tc.expected: step5. the second item will be removed
1589 */
1590 Metadata newMeta;
1591 ASSERT_EQ(newMeta.Initialize(&storage), E_OK);
1592
1593 /**
1594 * @tc.steps: step6. touch the first item
1595 * @tc.expected: step6. E_OK it still exist
1596 */
1597 WaterMark exceptWaterMark;
1598 EXPECT_EQ(newMeta.GetRecvQueryWaterMark(firstItemKey, deviceId, exceptWaterMark), E_OK);
1599 EXPECT_EQ(exceptWaterMark, firstWaterMark);
1600
1601 /**
1602 * @tc.steps: step7. get the second item
1603 * @tc.expected: step7. NOT_FOUND secondWaterMark is zero
1604 */
1605 WaterMark secondWaterMark;
1606 std::string secondQueryId = std::to_string(startCount + 1);
1607 EXPECT_EQ(newMeta.GetRecvQueryWaterMark(secondQueryId, deviceId, secondWaterMark), E_OK);
1608 EXPECT_EQ(secondWaterMark, 0u);
1609 }
1610
1611 /**
1612 * @tc.name: VerifyMetaDbCreateTime 001
 * @tc.desc: Test metaData get and set dbCreateTime.
1614 * @tc.type: FUNC
1615 * @tc.require: AR000FN6G9
1616 * @tc.author: zhuwentao
1617 */
1618 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, VerifyMetaDbCreateTime001, TestSize.Level1)
1619 {
1620 Metadata meta;
1621 VirtualSingleVerSyncDBInterface storage;
1622 /**
1623 * @tc.steps: step1. initialize meta with storage
1624 * @tc.expected: step1. E_OK
1625 */
1626 int errCode = meta.Initialize(&storage);
1627 ASSERT_EQ(errCode, E_OK);
1628 /**
1629 * @tc.steps: step2. set local and peer watermark and dbCreateTime
1630 * @tc.expected: step4. E_OK
1631 */
1632 WaterMark value = 2;
1633 EXPECT_EQ(meta.SaveLocalWaterMark("D1", value), E_OK);
1634 EXPECT_EQ(meta.SavePeerWaterMark("D1", value, true), E_OK);
1635 EXPECT_EQ(meta.SetDbCreateTime("D1", 10u, true), E_OK);
1636 /**
1637 * @tc.steps: step3. check peer and local watermark and dbCreateTime
1638 * @tc.expected: step4. E_OK
1639 */
1640 WaterMark curValue = 0;
1641 meta.GetLocalWaterMark("D1", curValue);
1642 EXPECT_EQ(value, curValue);
1643 meta.GetPeerWaterMark("D1", curValue);
1644 EXPECT_EQ(value, curValue);
1645 uint64_t curDbCreatTime = 0;
1646 meta.GetDbCreateTime("D1", curDbCreatTime);
1647 EXPECT_EQ(curDbCreatTime, 10u);
1648 /**
1649 * @tc.steps: step3. change dbCreateTime and check
1650 * @tc.expected: step4. E_OK
1651 */
1652 EXPECT_EQ(meta.SetDbCreateTime("D1", 20u, true), E_OK);
1653 uint64_t clearDeviceDataMark = INT_MAX;
1654 meta.GetRemoveDataMark("D1", clearDeviceDataMark);
1655 EXPECT_EQ(clearDeviceDataMark, 1u);
1656 EXPECT_EQ(meta.ResetMetaDataAfterRemoveData("D1"), E_OK);
1657 meta.GetRemoveDataMark("D1", clearDeviceDataMark);
1658 EXPECT_EQ(clearDeviceDataMark, 0u);
1659 meta.GetDbCreateTime("D1", curDbCreatTime);
1660 EXPECT_EQ(curDbCreatTime, 20u);
1661 }
1662
1663 /**
1664 * @tc.name: VerifyManagerQuerySyncStorage 002
 * @tc.desc: Test metaData remove least used querySync storage items when wrong data exists.
1666 * @tc.type: FUNC
1667 * @tc.require: AR000FN6G9
1668 * @tc.author: zhangqiquan
1669 */
1670 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, VerifyManagerQuerySyncStorage002, TestSize.Level3)
1671 {
1672 Metadata meta;
1673 VirtualSingleVerSyncDBInterface storage;
1674 const uint32_t maxStoreItems = 100000;
1675 const int startCount = 0;
1676 const std::string deviceId = "Device";
1677
1678 InitVerifyStorageEnvironment(meta, storage, deviceId, startCount, maxStoreItems);
1679
1680 /**
1681 * @tc.steps: step3. insert a wrong Value
1682 * @tc.expected: step3. E_OK
1683 */
1684 std::string newQueryId = std::to_string(maxStoreItems);
1685 Key dbKey;
1686 DBCommon::StringToVector(QuerySyncWaterMarkHelper::GetQuerySyncPrefixKey()
1687 + DBCommon::TransferHashString(deviceId) + newQueryId, dbKey);
1688 Value wrongValue;
1689 EXPECT_EQ(storage.PutMetaData(dbKey, wrongValue, false), E_OK);
1690
1691 /**
1692 * @tc.steps: step4. initialize new meta with storage
1693 * @tc.expected: step4. E_OK
1694 */
1695 Metadata newMeta;
1696 ASSERT_EQ(newMeta.Initialize(&storage), E_OK);
1697
1698 /**
1699 * @tc.steps: step5. touch the first item
1700 * @tc.expected: step5. E_OK still exit
1701 */
1702 std::string firstItemKey = std::to_string(startCount);
1703 WaterMark exceptWaterMark;
1704 EXPECT_EQ(newMeta.GetRecvQueryWaterMark(firstItemKey, deviceId, exceptWaterMark), E_OK);
1705 EXPECT_EQ(exceptWaterMark, 1u);
1706 }
1707
1708 /**
1709 * @tc.name: AllPredicateQuerySync001
1710 * @tc.desc: Test normal push sync for AllPredicate data.
1711 * @tc.type: FUNC
1712 * @tc.require: AR000FN6G9
1713 * @tc.author: zhuwentao
1714 */
1715 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, AllPredicateQuerySync001, TestSize.Level1)
1716 {
1717 /**
1718 * @tc.steps: step1. InitSchemaDb
1719 */
1720 InitSchemaDb();
1721 DBStatus status = OK;
1722 std::vector<std::string> devices;
1723 devices.push_back(g_deviceB->GetDeviceId());
1724
1725 /**
1726 * @tc.steps: step2. deviceA put {key11, SCHEMA_VALUE1} - {key19, SCHEMA_VALUE1}
1727 {key21, SCHEMA_VALUE2} - {key29, SCHEMA_VALUE2}
1728 */
1729 Value value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end());
1730 Value value2(SCHEMA_VALUE2.begin(), SCHEMA_VALUE2.end());
1731 Key key = {'1'};
1732 Key key2 = {'2'};
1733 const int dataSize = 4000;
1734 for (int i = 0; i < dataSize; i++) {
1735 key.push_back(i);
1736 key2.push_back(i);
1737 status = g_schemaKvDelegatePtr->Put(key, value);
1738 ASSERT_TRUE(status == OK);
1739 status = g_schemaKvDelegatePtr->Put(key2, value2);
1740 ASSERT_TRUE(status == OK);
1741 key.pop_back();
1742 key2.pop_back();
1743 }
1744 ASSERT_TRUE(status == OK);
1745
1746 /**
1747 * @tc.steps: step3. deviceA call query sync and wait
1748 * @tc.expected: step3. sync should return OK.
1749 */
1750 Query query = Query::Select().EqualTo("$.field_name1", 1);
1751 std::map<std::string, DBStatus> result;
1752 status = g_tool.SyncTest(g_schemaKvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query);
1753 ASSERT_TRUE(status == OK);
1754
1755 /**
1756 * @tc.expected: step4. onComplete should be called, DeviceB have {key11, SCHEMA_VALUE1} - {key19, SCHEMA_VALUE1}
1757 */
1758 ASSERT_TRUE(result.size() == devices.size());
1759 for (const auto &pair : result) {
1760 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1761 EXPECT_TRUE(pair.second == OK);
1762 }
1763 VirtualDataItem item;
1764 VirtualDataItem item2;
1765 for (int i = 0; i < dataSize; i++) {
1766 key.push_back(i);
1767 key2.push_back(i);
1768 g_deviceB->GetData(key, item);
1769 EXPECT_TRUE(g_deviceB->GetData(key2, item2) != E_OK);
1770 EXPECT_TRUE(item.value == value);
1771 key.pop_back();
1772 key2.pop_back();
1773 }
1774 }
1775
1776 /**
1777 * @tc.name: AllPredicateQuerySync002
1778 * @tc.desc: Test wrong query param push sync for AllPredicate data.
1779 * @tc.type: FUNC
1780 * @tc.require: AR000FN6G9
1781 * @tc.author: zhuwentao
1782 */
1783 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, AllPredicateQuerySync002, TestSize.Level1)
1784 {
1785 /**
1786 * @tc.steps: step1. InitSchemaDb
1787 */
1788 InitSchemaDb();
1789 DBStatus status = OK;
1790 std::vector<std::string> devices;
1791 devices.push_back(g_deviceB->GetDeviceId());
1792
1793 /**
1794 * @tc.steps: step2. deviceA call query sync and wait
1795 * @tc.expected: step2. sync should return INVALID_QUERY_FIELD
1796 */
1797 Query query = Query::Select().GreaterThan("field_name11", 10);
1798 std::map<std::string, DBStatus> result;
1799 status = g_tool.SyncTest(g_schemaKvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query);
1800 ASSERT_TRUE(status == INVALID_QUERY_FIELD);
1801 status = g_tool.SyncTest(g_schemaKvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result, query);
1802 ASSERT_TRUE(status == INVALID_QUERY_FIELD);
1803 status = g_tool.SyncTest(g_schemaKvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, query);
1804 ASSERT_TRUE(status == INVALID_QUERY_FIELD);
1805 }
1806
1807 /**
1808 * @tc.name: AllPredicateQuerySync003
1809 * @tc.desc: Test normal push sync for AllPredicate data with limit
1810 * @tc.type: FUNC
1811 * @tc.require: AR000FN6G9
1812 * @tc.author: zhuwentao
1813 */
1814 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, AllPredicateQuerySync003, TestSize.Level1)
1815 {
1816 /**
1817 * @tc.steps: step1. InitSchemaDb
1818 */
1819 InitSchemaDb();
1820 DBStatus status = OK;
1821 std::vector<std::string> devices;
1822 devices.push_back(g_deviceB->GetDeviceId());
1823
1824 /**
1825 * @tc.steps: step2. deviceA put {key1, SCHEMA_VALUE1} - {key9, SCHEMA_VALUE1}
1826 */
1827 Value value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end());
1828 Value value2(SCHEMA_VALUE2.begin(), SCHEMA_VALUE2.end());
1829 Key key = {'1'};
1830 Key key2 = {'2'};
1831 const int dataSize = 10;
1832 for (int i = 0; i < dataSize; i++) {
1833 key.push_back(i);
1834 key2.push_back(i);
1835 status = g_schemaKvDelegatePtr->Put(key, value);
1836 ASSERT_TRUE(status == OK);
1837 status = g_schemaKvDelegatePtr->Put(key2, value2);
1838 ASSERT_TRUE(status == OK);
1839 key.pop_back();
1840 key2.pop_back();
1841 }
1842 ASSERT_TRUE(status == OK);
1843
1844 /**
1845 * @tc.steps: step3. deviceA call query sync with limit and wait
1846 * @tc.expected: step3. sync should return OK.
1847 */
1848 Query query = Query::Select().EqualTo("$.field_name1", 1).Limit(20, 0);
1849 std::map<std::string, DBStatus> result;
1850 status = g_tool.SyncTest(g_schemaKvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query);
1851 ASSERT_TRUE(status == OK);
1852
1853 /**
1854 * @tc.expected: step4. onComplete should be called, DeviceB have {key1, SCHEMA_VALUE1} - {key9, SCHEMA_VALUE1}
1855 */
1856 ASSERT_TRUE(result.size() == devices.size());
1857 for (const auto &pair : result) {
1858 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1859 EXPECT_TRUE(pair.second == OK);
1860 }
1861 VirtualDataItem item;
1862 VirtualDataItem item2;
1863 for (int i = 0; i < dataSize; i++) {
1864 key.push_back(i);
1865 key2.push_back(i);
1866 g_deviceB->GetData(key, item);
1867 EXPECT_TRUE(g_deviceB->GetData(key2, item2) != E_OK);
1868 EXPECT_TRUE(item.value == value);
1869 key.pop_back();
1870 key2.pop_back();
1871 }
1872 }
1873
1874 /**
1875 * @tc.name: AllPredicateQuerySync004
1876 * @tc.desc: Test normal pull sync for AllPredicate data.
1877 * @tc.type: FUNC
1878 * @tc.require: AR000FN6G9
1879 * @tc.author: zhuwentao
1880 */
1881 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, AllPredicateQuerySync004, TestSize.Level1)
1882 {
1883 /**
1884 * @tc.steps: step1. InitSchemaDb
1885 */
1886 InitSchemaDb();
1887 DBStatus status = OK;
1888 std::vector<std::string> devices;
1889 devices.push_back(g_deviceB->GetDeviceId());
1890
1891 /**
1892 * @tc.steps: step2. deviceB put {key11, SCHEMA_VALUE1} - {key19, SCHEMA_VALUE1}
1893 */
1894 Value value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end());
1895 Key key = {'1'};
1896 const int dataSize = 10;
1897 for (int i = 0; i < dataSize; i++) {
1898 key.push_back(i);
1899 g_deviceB->PutData(key, value, 10 + i, 0);
1900 ASSERT_TRUE(status == OK);
1901 key.pop_back();
1902 }
1903 ASSERT_TRUE(status == OK);
1904
1905 /**
1906 * @tc.steps: step3. deviceA call query sync and wait
1907 * @tc.expected: step3. sync should return OK.
1908 */
1909 Query query = Query::Select().EqualTo("$.field_name1", 1);
1910 std::map<std::string, DBStatus> result;
1911 status = g_tool.SyncTest(g_schemaKvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, query);
1912 ASSERT_TRUE(status == OK);
1913
1914 /**
1915 * @tc.expected: step4. onComplete should be called, DeviceA have {key11, SCHEMA_VALUE1} - {key19, SCHEMA_VALUE1}
1916 */
1917 ASSERT_TRUE(result.size() == devices.size());
1918 for (const auto &pair : result) {
1919 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1920 EXPECT_TRUE(pair.second == OK);
1921 }
1922 Value item;
1923 Value item2;
1924 for (int i = 0; i < dataSize; i++) {
1925 key.push_back(i);
1926 g_schemaKvDelegatePtr->Get(key, item);
1927 EXPECT_TRUE(item == value);
1928 key.pop_back();
1929 }
1930 }