1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <gtest/gtest.h>
17 #include <thread>
18 
19 #include "db_common.h"
20 #include "db_constant.h"
21 #include "distributeddb_data_generate_unit_test.h"
22 #include "distributeddb_tools_unit_test.h"
23 #include "generic_single_ver_kv_entry.h"
24 #include "kv_store_nb_delegate.h"
25 #include "kv_virtual_device.h"
26 #include "platform_specific.h"
27 #include "query.h"
28 #include "query_sync_object.h"
29 #include "single_ver_data_sync.h"
30 #include "single_ver_serialize_manager.h"
31 #include "sync_types.h"
32 #include "virtual_communicator.h"
33 #include "virtual_communicator_aggregator.h"
34 #include "virtual_single_ver_sync_db_Interface.h"
35 
36 using namespace testing::ext;
37 using namespace DistributedDB;
38 using namespace DistributedDBUnitTest;
39 using namespace std;
40 
41 namespace {
42     string g_testDir;
43     const string STORE_ID = "kv_store_sync_test";
44     const string SCHEMA_STORE_ID = "kv_store_sync_schema_test";
45     const std::string DEVICE_B = "deviceB";
46 
47     KvStoreDelegateManager g_mgr(APP_ID, USER_ID);
48     KvStoreDelegateManager g_schemaMgr(SCHEMA_APP_ID, USER_ID);
49     KvStoreConfig g_config;
50     DistributedDBToolsUnitTest g_tool;
51     DBStatus g_kvDelegateStatus = INVALID_ARGS;
52     DBStatus g_schemaKvDelegateStatus = INVALID_ARGS;
53     KvStoreNbDelegate* g_kvDelegatePtr = nullptr;
54     KvStoreNbDelegate* g_schemaKvDelegatePtr = nullptr;
55     VirtualCommunicatorAggregator* g_communicatorAggregator = nullptr;
56     KvVirtualDevice *g_deviceB = nullptr;
57 
58     // the type of g_kvDelegateCallback is function<void(DBStatus, KvStoreDelegate*)>
59     auto g_kvDelegateCallback = bind(&DistributedDBToolsUnitTest::KvStoreNbDelegateCallback,
60         placeholders::_1, placeholders::_2, std::ref(g_kvDelegateStatus), std::ref(g_kvDelegatePtr));
61     auto g_schemaKvDelegateCallback = bind(&DistributedDBToolsUnitTest::KvStoreNbDelegateCallback,
62         placeholders::_1, placeholders::_2, std::ref(g_schemaKvDelegateStatus), std::ref(g_schemaKvDelegatePtr));
63     const string SCHEMA_STRING =
64     "{\"SCHEMA_VERSION\":\"1.0\","
65     "\"SCHEMA_MODE\":\"STRICT\","
66     "\"SCHEMA_DEFINE\":{"
67     "\"field_name1\":\"BOOL\","
68     "\"field_name2\":\"BOOL\","
69     "\"field_name3\":\"INTEGER, NOT NULL\","
70     "\"field_name4\":\"LONG, DEFAULT 100\","
71     "\"field_name5\":\"DOUBLE, NOT NULL, DEFAULT 3.14\","
72     "\"field_name6\":\"STRING, NOT NULL, DEFAULT '3.1415'\","
73     "\"field_name7\":\"LONG, DEFAULT 100\","
74     "\"field_name8\":\"LONG, DEFAULT 100\","
75     "\"field_name9\":\"LONG, DEFAULT 100\","
76     "\"field_name10\":\"LONG, DEFAULT 100\""
77     "},"
78     "\"SCHEMA_INDEXES\":[\"$.field_name1\", \"$.field_name2\"]}";
79 
80     const std::string SCHEMA_VALUE1 =
81     "{\"field_name1\":true,"
82     "\"field_name2\":false,"
83     "\"field_name3\":10,"
84     "\"field_name4\":20,"
85     "\"field_name5\":3.14,"
86     "\"field_name6\":\"3.1415\","
87     "\"field_name7\":100,"
88     "\"field_name8\":100,"
89     "\"field_name9\":100,"
90     "\"field_name10\":100}";
91 
92     const std::string SCHEMA_VALUE2 =
93     "{\"field_name1\":false,"
94     "\"field_name2\":true,"
95     "\"field_name3\":100,"
96     "\"field_name4\":200,"
97     "\"field_name5\":3.14,"
98     "\"field_name6\":\"3.1415\","
99     "\"field_name7\":100,"
100     "\"field_name8\":100,"
101     "\"field_name9\":100,"
102     "\"field_name10\":100}";
103 }
104 
105 class DistributedDBSingleVerP2PQuerySyncTest : public testing::Test {
106 public:
107     static void SetUpTestCase(void);
108     static void TearDownTestCase(void);
109     void SetUp();
110     void TearDown();
111 };
112 
SetUpTestCase(void)113 void DistributedDBSingleVerP2PQuerySyncTest::SetUpTestCase(void)
114 {
115     /**
116      * @tc.setup: Init datadir and Virtual Communicator.
117      */
118     DistributedDBToolsUnitTest::TestDirInit(g_testDir);
119     string dir = g_testDir + "/single_ver";
120     DIR* dirTmp = opendir(dir.c_str());
121     if (dirTmp == nullptr) {
122         OS::MakeDBDirectory(dir);
123     } else {
124         closedir(dirTmp);
125     }
126 
127     g_communicatorAggregator = new (std::nothrow) VirtualCommunicatorAggregator();
128     ASSERT_TRUE(g_communicatorAggregator != nullptr);
129     RuntimeContext::GetInstance()->SetCommunicatorAggregator(g_communicatorAggregator);
130 }
131 
TearDownTestCase(void)132 void DistributedDBSingleVerP2PQuerySyncTest::TearDownTestCase(void)
133 {
134     /**
135      * @tc.teardown: Release virtual Communicator and clear data dir.
136      */
137     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
138         LOGE("rm test db files error!");
139     }
140     RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
141 }
142 
SetUp(void)143 void DistributedDBSingleVerP2PQuerySyncTest::SetUp(void)
144 {
145     DistributedDBToolsUnitTest::PrintTestCaseInfo();
146     /**
147      * @tc.setup: create virtual device B and get a KvStoreNbDelegate as deviceA
148      */
149     g_deviceB = new (std::nothrow) KvVirtualDevice(DEVICE_B);
150     ASSERT_TRUE(g_deviceB != nullptr);
151     VirtualSingleVerSyncDBInterface *syncInterfaceB = new (std::nothrow) VirtualSingleVerSyncDBInterface();
152     ASSERT_TRUE(syncInterfaceB != nullptr);
153     ASSERT_EQ(g_deviceB->Initialize(g_communicatorAggregator, syncInterfaceB), E_OK);
154 }
155 
TearDown(void)156 void DistributedDBSingleVerP2PQuerySyncTest::TearDown(void)
157 {
158     /**
159      * @tc.teardown: Release device A, B
160      */
161     if (g_kvDelegatePtr != nullptr) {
162         ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
163         g_kvDelegatePtr = nullptr;
164         DBStatus status = g_mgr.DeleteKvStore(STORE_ID);
165         LOGD("delete kv store status %d", status);
166         ASSERT_TRUE(status == OK);
167     }
168     if (g_schemaKvDelegatePtr != nullptr) {
169         ASSERT_EQ(g_schemaMgr.CloseKvStore(g_schemaKvDelegatePtr), OK);
170         g_schemaKvDelegatePtr = nullptr;
171         DBStatus status = g_schemaMgr.DeleteKvStore(SCHEMA_STORE_ID);
172         LOGD("delete kv store status %d", status);
173         ASSERT_TRUE(status == OK);
174     }
175     if (g_deviceB != nullptr) {
176         delete g_deviceB;
177         g_deviceB = nullptr;
178     }
179     PermissionCheckCallbackV2 nullCallback;
180     EXPECT_EQ(g_mgr.SetPermissionCheckCallback(nullCallback), OK);
181 }
182 
InitNormalDb()183 void InitNormalDb()
184 {
185     g_config.dataDir = g_testDir;
186     g_mgr.SetKvStoreConfig(g_config);
187     KvStoreNbDelegate::Option option;
188     g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
189     ASSERT_TRUE(g_kvDelegateStatus == OK);
190     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
191 }
192 
InitSchemaDb()193 void InitSchemaDb()
194 {
195     g_config.dataDir = g_testDir;
196     g_schemaMgr.SetKvStoreConfig(g_config);
197     KvStoreNbDelegate::Option option;
198     option.schema = SCHEMA_STRING;
199     g_schemaMgr.GetKvStore(SCHEMA_STORE_ID, option, g_schemaKvDelegateCallback);
200     ASSERT_TRUE(g_schemaKvDelegateStatus == OK);
201     ASSERT_TRUE(g_schemaKvDelegatePtr != nullptr);
202 }
203 
204 /**
205  * @tc.name: Normal Sync 001
206  * @tc.desc: Test normal push sync for keyprefix data.
207  * @tc.type: FUNC
208  * @tc.require: AR000FN6G9
209  * @tc.author: xushaohua
210  */
211 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, NormalSync001, TestSize.Level1)
212 {
213     InitNormalDb();
214     DBStatus status = OK;
215     std::vector<std::string> devices;
216     devices.push_back(g_deviceB->GetDeviceId());
217 
218     /**
219      * @tc.steps: step1. deviceA put {k0, v0} - {k9, v9}
220      */
221     Key key = {'1'};
222     Value value = {'1'};
223     const int dataSize = 10;
224     for (int i = 0; i < dataSize; i++) {
225         key.push_back(i);
226         value.push_back(i);
227         status = g_kvDelegatePtr->Put(key, value);
228         ASSERT_TRUE(status == OK);
229         key.pop_back();
230         value.pop_back();
231     }
232     Key key2 = {'2'};
233     Value value2 = {'2'};
234     status = g_kvDelegatePtr->Put(key2, value2);
235     ASSERT_TRUE(status == OK);
236 
237     /**
238      * @tc.steps: step2. deviceA call query sync and wait
239      * @tc.expected: step2. sync should return OK.
240      */
241     Query query = Query::Select().PrefixKey(key);
242     std::map<std::string, DBStatus> result;
243     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query);
244     ASSERT_TRUE(status == OK);
245 
246     /**
247      * @tc.expected: step3. onComplete should be called, DeviceB have {k1,v1} - {k9, v9}
248      */
249     ASSERT_TRUE(result.size() == devices.size());
250     for (const auto &pair : result) {
251         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
252         EXPECT_TRUE(pair.second == OK);
253     }
254     VirtualDataItem item;
255     for (int i = 0; i < dataSize; i++) {
256         key.push_back(i);
257         value.push_back(i);
258         g_deviceB->GetData(key, item);
259         EXPECT_TRUE(item.value == value);
260         key.pop_back();
261         value.pop_back();
262     }
263     EXPECT_TRUE(g_deviceB->GetData(key2, item) != E_OK);
264 }
265 
266 /**
267  * @tc.name: Normal Sync 002
268  * @tc.desc: Test normal push sync for limit and offset.
269  * @tc.type: FUNC
270  * @tc.require: AR000FN6G9
271  * @tc.author: xushaohua
272  */
273 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, NormalSync002, TestSize.Level1)
274 {
275     InitNormalDb();
276     DBStatus status = OK;
277     std::vector<std::string> devices;
278     devices.push_back(g_deviceB->GetDeviceId());
279 
280     /**
281      * @tc.steps: step1. deviceA put {k0, v0} - {k9, v9}
282      */
283     Key key = {'1'};
284     Value value = {'1'};
285     const int dataSize = 10;
286     for (int i = 0; i < dataSize; i++) {
287         key.push_back(i);
288         value.push_back(i);
289         status = g_kvDelegatePtr->Put(key, value);
290         ASSERT_TRUE(status == OK);
291         key.pop_back();
292         value.pop_back();
293     }
294 
295     /**
296      * @tc.steps: step2. deviceA call sync and wait
297      * @tc.expected: step2. sync should return OK.
298      */
299     const int limit = 5;
300     const int offset = 4;
301     Query query = Query::Select().PrefixKey(key).Limit(limit, offset);
302     std::map<std::string, DBStatus> result;
303     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query);
304     ASSERT_TRUE(status == OK);
305 
306     /**
307      * @tc.expected: step3. onComplete should be called, DeviceB have {k4,v4} {k8, v8}
308      */
309     ASSERT_TRUE(result.size() == devices.size());
310     for (const auto &pair : result) {
311         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
312         EXPECT_TRUE(pair.second == OK);
313     }
314 
315     VirtualDataItem item;
316     for (int i = limit - 1; i < limit + offset; i++) {
317         key.push_back(i);
318         value.push_back(i);
319         g_deviceB->GetData(key, item);
320         EXPECT_TRUE(item.value == value);
321         key.pop_back();
322         value.pop_back();
323     }
324 }
325 
326 /**
327  * @tc.name: Normal Sync 001
328  * @tc.desc: Test normal push_and_pull sync for keyprefix data.
329  * @tc.type: FUNC
330  * @tc.require: AR000FN6G9
331  * @tc.author: zhuwentao
332  */
333 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, NormalSync003, TestSize.Level1)
334 {
335     InitNormalDb();
336     DBStatus status = OK;
337     std::vector<std::string> devices;
338     devices.push_back(g_deviceB->GetDeviceId());
339 
340     /**
341      * @tc.steps: step1. deviceA put {k, v}, {b, v}
342      */
343     Key key = {'1'};
344     Value value = {'1'};
345     const int dataSize = 10;
346     status = g_kvDelegatePtr->Put(key, value);
347     ASSERT_TRUE(status == OK);
348     Key key2 = {'2'};
349     Value value2 = {'2'};
350     status = g_kvDelegatePtr->Put(key2, value2);
351     ASSERT_TRUE(status == OK);
352 
353     /**
354      * @tc.steps: step2. deviceB put {b0, v0} - {b9, v9}, {c, v}
355      */
356     for (int i = 0; i < dataSize; i++) {
357         key2.push_back(i);
358         value2.push_back(i);
359         g_deviceB->PutData(key2, value2, 10 + i, 0);
360         key2.pop_back();
361         value2.pop_back();
362     }
363     Key key3 = {'3'};
364     Value value3 = {'3'};
365     g_deviceB->PutData(key3, value3, 20, 0);
366 
367     /**
368      * @tc.steps: step2. deviceA call query sync and wait
369      * @tc.expected: step2. sync should return OK.
370      */
371     Query query = Query::Select().PrefixKey(key2);
372     std::map<std::string, DBStatus> result;
373     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result, query);
374     ASSERT_TRUE(status == OK);
375 
376     /**
377      * @tc.expected: step3. onComplete should be called, DeviceA have {b0, v0} - {b9, v9}, DeviceB have {b, v}
378      */
379     ASSERT_TRUE(result.size() == devices.size());
380     for (const auto &pair : result) {
381         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
382         EXPECT_TRUE(pair.second == OK);
383     }
384     VirtualDataItem item;
385     Value tmpValue;
386     for (int i = 0; i < dataSize; i++) {
387         key2.push_back(i);
388         value2.push_back(i);
389         g_kvDelegatePtr->Get(key2, tmpValue);
390         EXPECT_TRUE(tmpValue == value2);
391         key2.pop_back();
392         value2.pop_back();
393     }
394     EXPECT_TRUE(g_deviceB->GetData(key, item) != E_OK);
395     EXPECT_TRUE(g_deviceB->GetData(key2, item) == E_OK);
396     g_kvDelegatePtr->Get(key3, tmpValue);
397     EXPECT_TRUE(tmpValue != value3);
398 }
399 
400 /**
401  * @tc.name: Normal Sync 001
402  * @tc.desc: Test normal pull sync for keyprefix data.
403  * @tc.type: FUNC
404  * @tc.require: AR000FN6G9
405  * @tc.author: zhuwentao
406  */
407 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, NormalSync004, TestSize.Level1)
408 {
409     InitNormalDb();
410     DBStatus status = OK;
411     std::vector<std::string> devices;
412     devices.push_back(g_deviceB->GetDeviceId());
413     /**
414      * @tc.steps: step1. deviceB put {k1, v1} - {k9, k9}, {b0, v0} - {b9, v9}
415      */
416     Key key = {'1'};
417     Value value = {'1'};
418     const int dataSize = 10;
419     Key key2 = {'2'};
420     Value value2 = {'2'};
421     vector<std::pair<Key, Value>> key1Vec;
422     vector<std::pair<Key, Value>> key2Vec;
423     for (int i = 0; i < dataSize; i++) {
424         Key tmpKey(key);
425         Value tmpValue(value);
426         tmpKey.push_back(i);
427         tmpValue.push_back(i);
428         key1Vec.push_back(pair<Key, Value> {tmpKey, tmpValue});
429     }
430     for (int i = 0; i < dataSize; i++) {
431         Key tmpKey(key2);
432         Value tmpValue(value2);
433         tmpKey.push_back(i);
434         tmpValue.push_back(i);
435         key2Vec.push_back(pair<Key, Value> {tmpKey, tmpValue});
436     }
437     for (int i = 0; i < dataSize; i++) {
438         g_deviceB->PutData(key2Vec[i].first, key2Vec[i].second, 20 + i, 0);
439         g_deviceB->PutData(key1Vec[i].first, key1Vec[i].second, 10 + i, 0);
440     }
441 
442     /**
443      * @tc.steps: step2. deviceA call query sync and wait
444      * @tc.expected: step2. sync should return OK.
445      */
446     Query query = Query::Select().PrefixKey(key2);
447     std::map<std::string, DBStatus> result;
448     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, query);
449     ASSERT_TRUE(status == OK);
450 
451     /**
452      * @tc.expected: step3. onComplete should be called, DeviceA have {b0, v0} - {b9, v9}
453      */
454     ASSERT_TRUE(result.size() == devices.size());
455     for (const auto &pair : result) {
456         EXPECT_TRUE(pair.second == OK);
457     }
458     VirtualDataItem item;
459     Value tmpValue;
460     for (int i = 0; i < dataSize; i++) {
461         g_kvDelegatePtr->Get(key2Vec[i].first, tmpValue);
462         EXPECT_TRUE(tmpValue == key2Vec[i].second);
463         g_kvDelegatePtr->Get(key1Vec[i].first, tmpValue);
464         EXPECT_TRUE(tmpValue != key1Vec[i].second);
465     }
466 }
467 
468 /**
469  * @tc.name: NormalSync005
470  * @tc.desc: Test normal push sync for inkeys query.
471  * @tc.type: FUNC
472  * @tc.require: AR000GOHO7
473  * @tc.author: lidongwei
474  */
475 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, NormalSync005, TestSize.Level1)
476 {
477     InitNormalDb();
478     std::vector<std::string> devices;
479     devices.push_back(g_deviceB->GetDeviceId());
480 
481     /**
482      * @tc.steps: step1. deviceA put K1-K5
483      */
484     ASSERT_EQ(g_kvDelegatePtr->PutBatch(
485         {{KEY_1, VALUE_1}, {KEY_2, VALUE_2}, {KEY_3, VALUE_3}, {KEY_4, VALUE_4}, {KEY_5, VALUE_5}}), OK);
486 
487     /**
488      * @tc.steps: step2. deviceA sync K2,K4 and wait
489      * @tc.expected: step2. sync should return OK.
490      */
491     Query query = Query::Select().InKeys({KEY_2, KEY_4});
492     std::map<std::string, DBStatus> result;
493     ASSERT_EQ(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query), OK);
494 
495     /**
496      * @tc.expected: step3. onComplete should be called.
497      */
498     ASSERT_EQ(result.size(), devices.size());
499     for (const auto &pair : result) {
500         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
501         EXPECT_EQ(pair.second, OK);
502     }
503 
504     /**
505      * @tc.steps: step4. deviceB have K2K4 and have no K1K3K5.
506      * @tc.expected: step4. sync should return OK.
507      */
508     VirtualDataItem item;
509     EXPECT_EQ(g_deviceB->GetData(KEY_2, item), E_OK);
510     EXPECT_EQ(item.value, VALUE_2);
511     EXPECT_EQ(g_deviceB->GetData(KEY_4, item), E_OK);
512     EXPECT_EQ(item.value, VALUE_4);
513     EXPECT_EQ(g_deviceB->GetData(KEY_1, item), -E_NOT_FOUND);
514     EXPECT_EQ(g_deviceB->GetData(KEY_3, item), -E_NOT_FOUND);
515     EXPECT_EQ(g_deviceB->GetData(KEY_5, item), -E_NOT_FOUND);
516 
517     /**
518      * @tc.steps: step5. deviceA sync with invalid inkeys query
519      * @tc.expected: step5. sync failed and the rc is right.
520      */
521     query = Query::Select().InKeys({});
522     result.clear();
523     ASSERT_EQ(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query), INVALID_ARGS);
524 
525     std::set<Key> keys;
526     for (uint8_t i = 0; i < DBConstant::MAX_BATCH_SIZE + 1; i++) {
527         Key key = { i };
528         keys.emplace(key);
529     }
530     query = Query::Select().InKeys(keys);
531     result.clear();
532     ASSERT_EQ(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query), OVER_MAX_LIMITS);
533 
534     query = Query::Select().InKeys({{}});
535     result.clear();
536     ASSERT_EQ(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query), INVALID_ARGS);
537 }
538 
539 /**
540  * @tc.name: NormalSync006
541  * @tc.desc: Test normal push sync with query by 32 devices;
542  * @tc.type: FUNC
543  * @tc.require:
544  * @tc.author: zhuwentao
545  */
546 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, NormalSync006, TestSize.Level1)
547 {
548     /**
549      * @tc.steps: step1. init db and 32 devices
550      */
551     InitNormalDb();
552     uint32_t syncDevCount = 32u;
553     std::vector<KvVirtualDevice *> virtualDeviceVec(syncDevCount, nullptr);
554     const std::string device = "deviceTmp_";
555     std::vector<std::string> devices;
556     bool isError = false;
557     for (uint32_t i = 0; i < syncDevCount; i++) {
558         std::string tmpDev = device + std::to_string(i);
559         virtualDeviceVec[i] = new (std::nothrow) KvVirtualDevice(tmpDev);
560         if (virtualDeviceVec[i] == nullptr) {
561             isError = true;
562             break;
563         }
564         VirtualSingleVerSyncDBInterface *tmpSyncInterface = new (std::nothrow) VirtualSingleVerSyncDBInterface();
565         if (tmpSyncInterface == nullptr) {
566             isError = true;
567             break;
568         }
569         ASSERT_EQ(virtualDeviceVec[i]->Initialize(g_communicatorAggregator, tmpSyncInterface), E_OK);
570         devices.push_back(virtualDeviceVec[i]->GetDeviceId());
571     }
572     if (isError) {
573         for (uint32_t i = 0; i < syncDevCount; i++) {
574             if (virtualDeviceVec[i] != nullptr) {
575                 delete virtualDeviceVec[i];
576                 virtualDeviceVec[i] = nullptr;
577             }
578         }
579         ASSERT_TRUE(false);
580     }
581     /**
582      * @tc.steps: step2. deviceA put {k0, v0}
583      */
584     Key key = {'1'};
585     Value value = {'1'};
586     ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
587     /**
588      * @tc.steps: step3. deviceA call query sync and wait
589      * @tc.expected: step3. sync should return OK.
590      */
591     Query query = Query::Select().PrefixKey(key);
592     std::map<std::string, DBStatus> result;
593     ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query) == OK);
594 
595     /**
596      * @tc.expected: step3. onComplete should be called, DeviceB have {k1,v1} - {k9, v9}
597      */
598     ASSERT_TRUE(result.size() == devices.size());
599     for (const auto &pair : result) {
600         EXPECT_TRUE(pair.second == OK);
601     }
602     VirtualDataItem item;
603     for (uint32_t i = 0; i < syncDevCount; i++) {
604         EXPECT_TRUE(virtualDeviceVec[i]->GetData(key, item) == E_OK);
605         EXPECT_EQ(item.value, value);
606         delete virtualDeviceVec[i];
607         virtualDeviceVec[i] = nullptr;
608     }
609 }
610 
611 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, QueryRequestPacketTest001, TestSize.Level1)
612 {
613     /**
614      * @tc.steps: step1. prepare a QuerySyncRequestPacket.
615      */
616     auto packet = new (std::nothrow) DataRequestPacket;
617     ASSERT_TRUE(packet != nullptr);
618     auto kvEntry = new (std::nothrow) GenericSingleVerKvEntry;
619     ASSERT_TRUE(kvEntry != nullptr);
620     kvEntry->SetTimestamp(1);
621     SyncEntry syncData {.entries = {kvEntry}};
622 #ifndef OMIT_ZLIB
623     ASSERT_TRUE(GenericSingleVerKvEntry::Compress(syncData.entries, syncData.compressedEntries,
624         {CompressAlgorithm::ZLIB, SOFTWARE_VERSION_CURRENT}) == E_OK);
625     packet->SetCompressAlgo(CompressAlgorithm::ZLIB);
626     packet->SetFlag(4); // set IS_COMPRESS_DATA flag true
627 #endif
628     packet->SetBasicInfo(-E_NOT_SUPPORT, SOFTWARE_VERSION_CURRENT, SyncModeType::QUERY_PUSH_PULL);
629     packet->SetData(syncData.entries);
630     packet->SetCompressData(syncData.compressedEntries);
631     packet->SetEndWaterMark(INT8_MAX);
632     packet->SetWaterMark(INT16_MAX, INT32_MAX, INT64_MAX);
633     QuerySyncObject syncQuery(Query::Select().PrefixKey({'2'}));
634     packet->SetQuery(syncQuery);
635     packet->SetQueryId(syncQuery.GetIdentify());
636     packet->SetReserved(std::vector<uint64_t> {INT8_MAX});
637 
638     /**
639      * @tc.steps: step2. put the QuerySyncRequestPacket into a message.
640      */
641     Message msg;
642     msg.SetExternalObject(packet);
643     msg.SetMessageId(QUERY_SYNC_MESSAGE);
644     msg.SetMessageType(TYPE_REQUEST);
645 
646     /**
647      * @tc.steps: step3. Serialization the message to a buffer.
648      */
649     int len = static_cast<int>(SingleVerSerializeManager::CalculateLen(&msg));
650     vector<uint8_t> buffer(len);
651     ASSERT_EQ(SingleVerSerializeManager::Serialization(buffer.data(), buffer.size(), &msg), E_OK);
652 
653     /**
654      * @tc.steps: step4. DeSerialization the buffer to a message.
655      */
656     Message outMsg(QUERY_SYNC_MESSAGE);
657     outMsg.SetMessageType(TYPE_REQUEST);
658     ASSERT_EQ(SingleVerSerializeManager::DeSerialization(buffer.data(), buffer.size(), &outMsg), E_OK);
659 
660     /**
661      * @tc.steps: step5. checkout the outMsg.
662      * @tc.expected: step5. outMsg equal the the in msg
663      */
664     auto outPacket = outMsg.GetObject<DataRequestPacket>();
665     EXPECT_EQ(outPacket->GetVersion(), SOFTWARE_VERSION_CURRENT);
666     EXPECT_EQ(outPacket->GetMode(), SyncModeType::QUERY_PUSH_PULL);
667     EXPECT_EQ(outPacket->GetEndWaterMark(), static_cast<uint64_t>(INT8_MAX));
668     EXPECT_EQ(outPacket->GetLocalWaterMark(), static_cast<uint64_t>(INT16_MAX));
669     EXPECT_EQ(outPacket->GetPeerWaterMark(), static_cast<uint64_t>(INT32_MAX));
670     EXPECT_EQ(outPacket->GetDeletedWaterMark(), static_cast<uint64_t>(INT64_MAX));
671 #ifndef OMIT_ZLIB
672     EXPECT_EQ(outPacket->GetFlag(), static_cast<uint32_t>(4)); // check IS_COMPRESS_DATA flag true
673 #endif
674     EXPECT_EQ(outPacket->GetQueryId(), syncQuery.GetIdentify());
675     EXPECT_EQ(outPacket->GetReserved(), std::vector<uint64_t> {INT8_MAX});
676     EXPECT_EQ(outPacket->GetSendCode(), -E_NOT_SUPPORT);
677     EXPECT_EQ(outPacket->GetData()[0]->GetTimestamp(), 1u);
678 }
679 
680 /**
681  * @tc.name: QueryRequestPacketTest002
682  * @tc.desc: Test exception branch of serialization.
683  * @tc.type: FUNC
684  * @tc.require:
685  * @tc.author: zhangshijie
686  */
687 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, SerializationManager001, TestSize.Level1)
688 {
689     /**
690      * @tc.steps: step1. call SingleVerSerializeManager::Serialization with buffer = nullptr or msg = nullptr
691      * @tc.expected:step1 return -E_MESSAGE_ID_ERROR
692      */
693     Message msg;
694     msg.SetMessageType(TYPE_INVALID);
695     vector<uint8_t> buffer(10); // 10 is test buffer len
696     EXPECT_EQ(SingleVerSerializeManager::Serialization(nullptr, buffer.size(), &msg), -E_MESSAGE_ID_ERROR);
697     EXPECT_EQ(SingleVerSerializeManager::Serialization(buffer.data(), buffer.size(), nullptr), -E_MESSAGE_ID_ERROR);
698 
699     /**
700      * @tc.steps: step2. call SingleVerSerializeManager::Serialization with invalid type message
701      * @tc.expected:step2 return -E_MESSAGE_ID_ERROR
702      */
703     EXPECT_EQ(SingleVerSerializeManager::Serialization(buffer.data(), buffer.size(), &msg), -E_MESSAGE_ID_ERROR);
704 
705     /**
706      * @tc.steps: step3. call SingleVerSerializeManager::DeSerialization with buffer = nullptr or msg = nullptr
707      * @tc.expected:step3 return -E_MESSAGE_ID_ERROR
708      */
709     EXPECT_EQ(SingleVerSerializeManager::DeSerialization(nullptr, buffer.size(), &msg), -E_MESSAGE_ID_ERROR);
710     EXPECT_EQ(SingleVerSerializeManager::DeSerialization(buffer.data(), buffer.size(), nullptr), -E_MESSAGE_ID_ERROR);
711 
712     /**
713      * @tc.steps: step4. call SingleVerSerializeManager::DeSerialization with invalid type message
714      * @tc.expected:step4 return -E_MESSAGE_ID_ERROR
715      */
716     EXPECT_EQ(SingleVerSerializeManager::DeSerialization(buffer.data(), buffer.size(), &msg), -E_MESSAGE_ID_ERROR);
717 }
718 
719 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, QueryAckPacketTest001, TestSize.Level1)
720 {
721     /**
722      * @tc.steps: step1. prepare a QuerySyncAckPacket.
723      */
724     DataAckPacket packet;
725     packet.SetVersion(SOFTWARE_VERSION_CURRENT);
726     packet.SetData(INT64_MAX);
727     packet.SetRecvCode(-E_NOT_SUPPORT);
728     std::vector<uint64_t> reserved = {INT8_MAX};
729     packet.SetReserved(reserved);
730 
731     /**
732      * @tc.steps: step2. put the QuerySyncAckPacket into a message.
733      */
734     Message msg;
735     msg.SetCopiedObject(packet);
736     msg.SetMessageId(QUERY_SYNC_MESSAGE);
737     msg.SetMessageType(TYPE_RESPONSE);
738 
739     /**
740      * @tc.steps: step3. Serialization the message to a buffer.
741      */
742     int len = static_cast<int>(SingleVerSerializeManager::CalculateLen(&msg));
743     LOGE("test leng = %d", len);
744     uint8_t *buffer = new (nothrow) uint8_t[len];
745     ASSERT_TRUE(buffer != nullptr);
746     int errCode = SingleVerSerializeManager::Serialization(buffer, len, &msg);
747     ASSERT_EQ(errCode, E_OK);
748 
749     /**
750      * @tc.steps: step4. DeSerialization the buffer to a message.
751      */
752     Message outMsg;
753     outMsg.SetMessageId(QUERY_SYNC_MESSAGE);
754     outMsg.SetMessageType(TYPE_RESPONSE);
755     errCode = SingleVerSerializeManager::DeSerialization(buffer, len, &outMsg);
756     ASSERT_EQ(errCode, E_OK);
757 
758     /**
759      * @tc.steps: step5. checkout the outMsg.
760      * @tc.expected: step5. outMsg equal the the in msg
761      */
762     auto outPacket = outMsg.GetObject<DataAckPacket>();
763     EXPECT_EQ(outPacket->GetVersion(), SOFTWARE_VERSION_CURRENT);
764     EXPECT_EQ(outPacket->GetData(), static_cast<uint64_t>(INT64_MAX));
765     std::vector<uint64_t> reserved2 = {INT8_MAX};
766     EXPECT_EQ(outPacket->GetReserved(), reserved2);
767     EXPECT_EQ(outPacket->GetRecvCode(), -E_NOT_SUPPORT);
768     delete[] buffer;
769 }
770 
771 /**
772  * @tc.name: GetQueryWaterMark 001
773  * @tc.desc: Test metaData save and get queryWaterMark.
774  * @tc.type: FUNC
775  * @tc.require: AR000FN6G9
776  * @tc.author: zhangqiquan
777  */
778 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, GetQueryWaterMark001, TestSize.Level1)
779 {
780     VirtualSingleVerSyncDBInterface storage;
781     Metadata meta;
782 
783     /**
784      * @tc.steps: step1. initialize meta with storage
785      * @tc.expected: step1. E_OK
786      */
787     int errCode = meta.Initialize(&storage);
788     ASSERT_EQ(errCode, E_OK);
789 
790     /**
791      * @tc.steps: step2. save receive and send watermark
792      * @tc.expected: step2. E_OK
793      */
794     WaterMark w1 = 1;
795     EXPECT_EQ(meta.SetRecvQueryWaterMark("Q1", "D1", w1), E_OK);
796     EXPECT_EQ(meta.SetSendQueryWaterMark("Q1", "D1", w1), E_OK);
797 
798     /**
799      * @tc.steps: step3. get receive and send watermark
800      * @tc.expected: step3. E_OK and get the latest value
801      */
802     WaterMark w = 0;
803     EXPECT_EQ(meta.GetRecvQueryWaterMark("Q1", "D1", w), E_OK);
804     EXPECT_EQ(w1, w);
805     EXPECT_EQ(meta.GetSendQueryWaterMark("Q1", "D1", w), E_OK);
806     EXPECT_EQ(w1, w);
807 
808     /**
809      * @tc.steps: step4. set peer and local watermark
810      * @tc.expected: step4. E_OK
811      */
812     WaterMark w2 = 2;
813     EXPECT_EQ(meta.SaveLocalWaterMark("D1", w2), E_OK);
814     EXPECT_EQ(meta.SavePeerWaterMark("D1", w2, true), E_OK);
815 
816     /**
817      * @tc.steps: step5. get receive and send watermark
818      * @tc.expected: step5. E_OK and get the w1
819      */
820     EXPECT_EQ(meta.GetRecvQueryWaterMark("Q1", "D1", w), E_OK);
821     EXPECT_EQ(w2, w);
822     EXPECT_EQ(meta.GetSendQueryWaterMark("Q1", "D1", w), E_OK);
823     EXPECT_EQ(w2, w);
824 
825     /**
826      * @tc.steps: step6. set peer and local watermark
827      * @tc.expected: step6. E_OK
828      */
829     WaterMark w3 = 3;
830     EXPECT_EQ(meta.SaveLocalWaterMark("D2", w3), E_OK);
831     EXPECT_EQ(meta.SavePeerWaterMark("D2", w3, true), E_OK);
832 
833     /**
834      * @tc.steps: step7. get receive and send watermark
835      * @tc.expected: step7. E_OK and get the w3
836      */
837     EXPECT_EQ(meta.GetRecvQueryWaterMark("Q2", "D2", w), E_OK);
838     EXPECT_EQ(w3, w);
839     EXPECT_EQ(meta.GetSendQueryWaterMark("Q2", "D2", w), E_OK);
840     EXPECT_EQ(w3, w);
841 
842     /**
843      * @tc.steps: step8. get not exit receive and send watermark
844      * @tc.expected: step8. E_OK and get the 0
845      */
846     EXPECT_EQ(meta.GetRecvQueryWaterMark("Q3", "D3", w), E_OK);
847     EXPECT_EQ(w, 0u);
848     EXPECT_EQ(meta.GetSendQueryWaterMark("Q3", "D3", w), E_OK);
849     EXPECT_EQ(w, 0u);
850 }
851 
852 /**
853  * @tc.name: GetQueryWaterMark 002
854  * @tc.desc: Test metaData save and get queryWaterMark after push or pull mode.
855  * @tc.type: FUNC
856  * @tc.require: AR000FN6G9
857  * @tc.author: zhangqiquan
858  */
859 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, GetQueryWaterMark002, TestSize.Level1)
860 {
861     VirtualSingleVerSyncDBInterface storage;
862     Metadata meta;
863 
864     /**
865      * @tc.steps: step1. initialize meta with storage
866      * @tc.expected: step1. E_OK
867      */
868     int errCode = meta.Initialize(&storage);
869     ASSERT_EQ(errCode, E_OK);
870 
871     /**
872      * @tc.steps: step2. set peer and local watermark
873      * @tc.expected: step2. E_OK
874      */
875     WaterMark w1 = 2;
876     EXPECT_EQ(meta.SaveLocalWaterMark("D1", w1), E_OK);
877     EXPECT_EQ(meta.SavePeerWaterMark("D1", w1, true), E_OK);
878 
879     /**
880      * @tc.steps: step2. save receive and send watermark
881      * @tc.expected: step2. E_OK
882      */
883     WaterMark w2 = 1;
884     EXPECT_EQ(meta.SetRecvQueryWaterMark("Q1", "D1", w2), E_OK);
885     EXPECT_EQ(meta.SetSendQueryWaterMark("Q1", "D1", w2), E_OK);
886 
887     /**
888      * @tc.steps: step3. get receive and send watermark
889      * @tc.expected: step3. E_OK and get the bigger value
890      */
891     WaterMark w = 0;
892     EXPECT_EQ(meta.GetRecvQueryWaterMark("Q1", "D1", w), E_OK);
893     EXPECT_EQ(w1, w);
894     EXPECT_EQ(meta.GetSendQueryWaterMark("Q1", "D1", w), E_OK);
895     EXPECT_EQ(w1, w);
896 }
897 
898 /**
899  * @tc.name: GetQueryWaterMark 003
900  * @tc.desc: check time offset after remove water mark
901  * @tc.type: FUNC
902  * @tc.require:
903  * @tc.author: lianhuix
904  */
905 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, GetQueryWaterMark003, TestSize.Level1)
906 {
907     VirtualSingleVerSyncDBInterface storage;
908     Metadata meta;
909 
910     int errCode = meta.Initialize(&storage);
911     ASSERT_EQ(errCode, E_OK);
912 
913     const std::string DEVICE_B = "DEVICE_B";
914     TimeOffset offset = 100; // 100: offset
915     meta.SaveTimeOffset(DEVICE_B, offset);
916 
917     WaterMark w1 = 2; // 2: watermark
918     meta.SavePeerWaterMark(DBCommon::TransferHashString(DEVICE_B), w1, false);
919 
920     TimeOffset offsetGot;
921     meta.GetTimeOffset(DEVICE_B, offsetGot);
922     EXPECT_EQ(offsetGot, offset);
923 }
924 
925 /**
926  * @tc.name: GetDeleteWaterMark001
927  * @tc.desc: Test metaData save and get deleteWaterMark.
928  * @tc.type: FUNC
929  * @tc.require: AR000FN6G9
930  * @tc.author: zhangqiquan
931  */
932 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, GetDeleteWaterMark001, TestSize.Level1)
933 {
934     VirtualSingleVerSyncDBInterface storage;
935     Metadata meta;
936 
937     /**
938      * @tc.steps: step1. initialize meta with storage
939      * @tc.expected: step1. E_OK
940      */
941     int errCode = meta.Initialize(&storage);
942     ASSERT_EQ(errCode, E_OK);
943 
944     /**
945      * @tc.steps: step2. set and get recv/send delete watermark
946      * @tc.expected: step2. set E_OK and get water mark is equal with last set
947      */
948     const std::string device = "DEVICE";
949     const WaterMark maxWaterMark = 1000u;
__anon6100a1770202() 950     std::thread recvThread([&meta, &device, &maxWaterMark]() {
951         for (WaterMark expectRecv = 0u; expectRecv < maxWaterMark; ++expectRecv) {
952             EXPECT_EQ(meta.SetRecvDeleteSyncWaterMark(device, expectRecv), E_OK);
953             WaterMark actualRecv = 0u;
954             EXPECT_EQ(meta.GetRecvDeleteSyncWaterMark(device, actualRecv), E_OK);
955             EXPECT_EQ(actualRecv, expectRecv);
956         }
957     });
__anon6100a1770302() 958     std::thread sendThread([&meta, &device, &maxWaterMark]() {
959         for (WaterMark expectSend = 0u; expectSend < maxWaterMark; ++expectSend) {
960             EXPECT_EQ(meta.SetSendDeleteSyncWaterMark(device, expectSend), E_OK);
961             WaterMark actualSend = 0u;
962             EXPECT_EQ(meta.GetSendDeleteSyncWaterMark(device, actualSend), E_OK);
963             EXPECT_EQ(actualSend, expectSend);
964         }
965     });
966     recvThread.join();
967     sendThread.join();
968 }
969 
970 /**
971  * @tc.name: ClearQueryWaterMark 001
972  * @tc.desc: Test metaData clear watermark function.
973  * @tc.type: FUNC
974  * @tc.require: AR000FN6G9
975  * @tc.author: zhangqiquan
976  */
977 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, ClearQueryWaterMark001, TestSize.Level1)
978 {
979     VirtualSingleVerSyncDBInterface storage;
980     Metadata meta;
981 
982     /**
983      * @tc.steps: step1. initialize meta with storage
984      * @tc.expected: step1. E_OK
985      */
986     int errCode = meta.Initialize(&storage);
987     ASSERT_EQ(errCode, E_OK);
988 
989     /**
990      * @tc.steps: step2. save receive watermark
991      * @tc.expected: step2. E_OK
992      */
993     WaterMark w1 = 1;
994     EXPECT_EQ(meta.SetRecvQueryWaterMark("Q1", "D1", w1), E_OK);
995 
996     /**
997      * @tc.steps: step3. erase peer watermark
998      * @tc.expected: step3. E_OK
999      */
1000     EXPECT_EQ(meta.EraseDeviceWaterMark("D1", true), E_OK);
1001 
1002     /**
1003      * @tc.steps: step4. get receive watermark
1004      * @tc.expected: step4. E_OK receive watermark is zero
1005      */
1006     WaterMark w2 = -1;
1007     EXPECT_EQ(meta.GetRecvQueryWaterMark("Q1", "D1", w2), E_OK);
1008     EXPECT_EQ(w2, 0u);
1009 
1010     /**
1011      * @tc.steps: step5. set peer watermark
1012      * @tc.expected: step5. E_OK
1013      */
1014     WaterMark w3 = 2;
1015     EXPECT_EQ(meta.SavePeerWaterMark("D1", w3, true), E_OK);
1016 
1017     /**
1018      * @tc.steps: step6. get receive watermark
1019      * @tc.expected: step6. E_OK receive watermark is peer watermark
1020      */
1021     WaterMark w4 = -1;
1022     EXPECT_EQ(meta.GetRecvQueryWaterMark("Q1", "D1", w4), E_OK);
1023     EXPECT_EQ(w4, w3);
1024 }
1025 
1026 /**
1027  * @tc.name: ClearQueryWaterMark 002
1028  * @tc.desc: Test metaData clear watermark function.
1029  * @tc.type: FUNC
1030  * @tc.require: AR000FN6G9
1031  * @tc.author: zhangqiquan
1032  */
1033 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, ClearQueryWaterMark002, TestSize.Level1)
1034 {
1035     VirtualSingleVerSyncDBInterface storage;
1036     Metadata meta;
1037 
1038     /**
1039      * @tc.steps: step1. initialize meta with storage
1040      * @tc.expected: step1. E_OK
1041      */
1042     int errCode = meta.Initialize(&storage);
1043     ASSERT_EQ(errCode, E_OK);
1044 
1045     /**
1046      * @tc.steps: step2. save receive watermark
1047      * @tc.expected: step2. E_OK
1048      */
1049     WaterMark w1 = 1;
1050     EXPECT_EQ(meta.SetRecvQueryWaterMark("Q1", "D1", w1), E_OK);
1051     EXPECT_EQ(meta.SetRecvQueryWaterMark("Q2", "D1", w1), E_OK);
1052     EXPECT_EQ(meta.SetRecvQueryWaterMark("Q1", "D2", w1), E_OK);
1053 
1054     /**
1055      * @tc.steps: step3. erase peer watermark, make sure data remove in db
1056      * @tc.expected: step3. E_OK
1057      */
1058     Metadata anotherMeta;
1059     ASSERT_EQ(anotherMeta.Initialize(&storage), E_OK);
1060     EXPECT_EQ(anotherMeta.EraseDeviceWaterMark("D1", true), E_OK);
1061 
1062     /**
1063      * @tc.steps: step4. get receive watermark
1064      * @tc.expected: step4. E_OK receive watermark is zero
1065      */
1066     WaterMark w2 = -1;
1067     EXPECT_EQ(anotherMeta.GetRecvQueryWaterMark("Q1", "D1", w2), E_OK);
1068     EXPECT_EQ(w2, 0u);
1069     w2 = -1;
1070     EXPECT_EQ(anotherMeta.GetRecvQueryWaterMark("Q2", "D1", w2), E_OK);
1071     EXPECT_EQ(w2, 0u);
1072     w2 = -1;
1073     EXPECT_EQ(anotherMeta.GetRecvQueryWaterMark("Q1", "D2", w2), E_OK);
1074     EXPECT_EQ(w2, w1);
1075 }
1076 
1077 /**
1078  * @tc.name: ClearQueryWaterMark 003
1079  * @tc.desc: Test metaData clear watermark busy.
1080  * @tc.type: FUNC
1081  * @tc.require: AR000FN6G9
1082  * @tc.author: zhangqiquan
1083  */
1084 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, ClearQueryWaterMark003, TestSize.Level1)
1085 {
1086     VirtualSingleVerSyncDBInterface storage;
1087     Metadata meta;
1088     /**
1089      * @tc.steps: step1. initialize meta with storage
1090      * @tc.expected: step1. E_OK
1091      */
1092     int errCode = meta.Initialize(&storage);
1093     ASSERT_EQ(errCode, E_OK);
1094     /**
1095      * @tc.steps: step2. set busy and erase water mark
1096      * @tc.expected: step2. -E_BUSY
1097      */
1098     storage.SetBusy(false, true);
1099     EXPECT_EQ(meta.EraseDeviceWaterMark("DEVICE_ID", true), -E_BUSY);
1100 }
1101 
1102 /**
1103  * @tc.name: GetQueryLastTimestamp001
1104  * @tc.desc: Test function of GetQueryLastTimestamp.
1105  * @tc.type: FUNC
1106  * @tc.require: AR000FN6G9
1107  * @tc.author: zhangshijie
1108  */
1109 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, GetQueryLastTimestamp001, TestSize.Level1)
1110 {
1111     /**
1112      * @tc.steps: step1. initialize meta with nullptr
1113      * @tc.expected: step1. return -E_INVALID_DB
1114      */
1115     Metadata meta;
1116     EXPECT_EQ(meta.Initialize(nullptr), -E_INVALID_DB);
1117 
1118     /**
1119      * @tc.steps: step2. initialize meta with storage
1120      * @tc.expected: step2. E_OK
1121      */
1122     VirtualSingleVerSyncDBInterface storage;
1123     int errCode = meta.Initialize(&storage);
1124     ASSERT_EQ(errCode, E_OK);
1125 
1126     /**
1127      * @tc.steps: step3. call GetQueryLastTimestamp with a non-exists device
1128      * @tc.expected: step3. return INT64_MAX
1129      */
1130     EXPECT_EQ(meta.GetQueryLastTimestamp("D1", "Q1"), static_cast<uint64_t>(INT64_MAX));
1131 
1132     /**
1133      * @tc.steps: step4. call GetQueryLastTimestamp with device D1 again
1134      * @tc.expected: step4. return 0
1135      */
1136     EXPECT_EQ(meta.GetQueryLastTimestamp("D1", "Q1"), 0u);
1137 
1138     /**
1139      * @tc.steps: step5. call GetQueryLastTimestamp with device D1 and Q2
1140      * @tc.expected: step5. return INT64_MAX
1141      */
1142     EXPECT_EQ(meta.GetQueryLastTimestamp("D1", "Q2"), static_cast<uint64_t>(INT64_MAX));
1143 }
1144 
1145 /**
1146  * @tc.name: MetaDataExceptionBranch001
1147  * @tc.desc: Test execption branch of meata data.
1148  * @tc.type: FUNC
1149  * @tc.require: AR000FN6G9
1150  * @tc.author: zhangshijie
1151  */
1152 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, MetaDataExceptionBranch001, TestSize.Level1)
1153 {
1154     /**
1155      * @tc.steps: step1. call GetRemoveDataMark with a device not in map
1156      * @tc.expected: step1. out value = 0
1157      */
1158     Metadata meta;
1159     uint64_t val = 99; // 99 is the initial value of outValue
1160     uint64_t outValue = val;
1161     meta.GetRemoveDataMark("D1", outValue);
1162     EXPECT_EQ(outValue, 0u);
1163 
1164     /**
1165      * @tc.steps: step2. reset outValue, call GetDbCreateTime with a device not in map
1166      * @tc.expected: step2. out value = 0
1167      */
1168     outValue = val;
1169     meta.GetDbCreateTime("D1", outValue);
1170     EXPECT_EQ(outValue, 0u);
1171 
1172     /**
1173      * @tc.steps: step3. call ResetMetaDataAfterRemoveData with a device not in map
1174      * @tc.expected: step3. return -E_NOT_FOUND
1175      */
1176     EXPECT_EQ(meta.ResetMetaDataAfterRemoveData("D1"), -E_NOT_FOUND);
1177 }
1178 
1179 /**
1180  * @tc.name: GetDeleteKeyWaterMark 001
1181  * @tc.desc: Test metaData save and get deleteWaterMark.
1182  * @tc.type: FUNC
1183  * @tc.require: AR000FN6G9
1184  * @tc.author: zhangqiquan
1185  */
1186 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, GetDeleteKeyWaterMark001, TestSize.Level1)
1187 {
1188     VirtualSingleVerSyncDBInterface storage;
1189     Metadata meta;
1190 
1191     /**
1192      * @tc.steps: step1. initialize meta with storage
1193      * @tc.expected: step1. E_OK
1194      */
1195     int errCode = meta.Initialize(&storage);
1196     ASSERT_EQ(errCode, E_OK);
1197 
1198     /**
1199      * @tc.steps: step2. save receive and send watermark
1200      * @tc.expected: step2. E_OK
1201      */
1202     WaterMark w1 = 1;
1203     EXPECT_EQ(meta.SetRecvDeleteSyncWaterMark("D1", w1), E_OK);
1204     EXPECT_EQ(meta.SetSendDeleteSyncWaterMark("D1", w1), E_OK);
1205 
1206     /**
1207      * @tc.steps: step3. get receive and send watermark
1208      * @tc.expected: step3. E_OK and get the latest value
1209      */
1210     WaterMark w = 0;
1211     EXPECT_EQ(meta.GetRecvDeleteSyncWaterMark("D1", w), E_OK);
1212     EXPECT_EQ(w1, w);
1213     EXPECT_EQ(meta.GetSendDeleteSyncWaterMark("D1", w), E_OK);
1214     EXPECT_EQ(w1, w);
1215 
1216     /**
1217      * @tc.steps: step4. set peer and local watermark
1218      * @tc.expected: step4. E_OK
1219      */
1220     WaterMark w2 = 2;
1221     EXPECT_EQ(meta.SaveLocalWaterMark("D1", w2), E_OK);
1222     EXPECT_EQ(meta.SavePeerWaterMark("D1", w2, true), E_OK);
1223 
1224     /**
1225      * @tc.steps: step5. get receive and send watermark
1226      * @tc.expected: step5. E_OK and get the w1
1227      */
1228     EXPECT_EQ(meta.GetRecvDeleteSyncWaterMark("D1", w), E_OK);
1229     EXPECT_EQ(w2, w);
1230     EXPECT_EQ(meta.GetSendDeleteSyncWaterMark("D1", w), E_OK);
1231     EXPECT_EQ(w2, w);
1232 
1233     /**
1234      * @tc.steps: step6. set peer and local watermark
1235      * @tc.expected: step6. E_OK
1236      */
1237     WaterMark w3 = 3;
1238     EXPECT_EQ(meta.SaveLocalWaterMark("D2", w3), E_OK);
1239     EXPECT_EQ(meta.SavePeerWaterMark("D2", w3, true), E_OK);
1240 
1241     /**
1242      * @tc.steps: step7. get receive and send watermark
1243      * @tc.expected: step7. E_OK and get the w3
1244      */
1245     EXPECT_EQ(meta.GetRecvDeleteSyncWaterMark("D2", w), E_OK);
1246     EXPECT_EQ(w3, w);
1247     EXPECT_EQ(meta.GetSendDeleteSyncWaterMark("D2", w), E_OK);
1248     EXPECT_EQ(w3, w);
1249 
1250     /**
1251      * @tc.steps: step8. get not exit receive and send watermark
1252      * @tc.expected: step8. E_OK and get the 0
1253      */
1254     EXPECT_EQ(meta.GetRecvDeleteSyncWaterMark("D3", w), E_OK);
1255     EXPECT_EQ(w, 0u);
1256     EXPECT_EQ(meta.GetSendDeleteSyncWaterMark("D3", w), E_OK);
1257     EXPECT_EQ(w, 0u);
1258 }
1259 
1260 /**
1261  * @tc.name: GetDeleteKeyWaterMark 002
1262  * @tc.desc: Test metaData save and get deleteWaterMark after push or pull mode.
1263  * @tc.type: FUNC
1264  * @tc.require: AR000FN6G9
1265  * @tc.author: zhangqiquan
1266  */
1267 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, GetDeleteKeyWaterMark002, TestSize.Level1)
1268 {
1269     VirtualSingleVerSyncDBInterface storage;
1270     Metadata meta;
1271 
1272     /**
1273      * @tc.steps: step1. initialize meta with storage
1274      * @tc.expected: step1. E_OK
1275      */
1276     int errCode = meta.Initialize(&storage);
1277     ASSERT_EQ(errCode, E_OK);
1278 
1279     /**
1280      * @tc.steps: step2. set peer and local watermark
1281      * @tc.expected: step2. E_OK
1282      */
1283     WaterMark w1 = 3;
1284     EXPECT_EQ(meta.SaveLocalWaterMark("D1", w1), E_OK);
1285     EXPECT_EQ(meta.SavePeerWaterMark("D1", w1, true), E_OK);
1286 
1287     /**
1288      * @tc.steps: step2. save receive and send watermark
1289      * @tc.expected: step2. E_OK
1290      */
1291     WaterMark w2 = 1;
1292     EXPECT_EQ(meta.SetRecvDeleteSyncWaterMark("D1", w2), E_OK);
1293     EXPECT_EQ(meta.SetSendDeleteSyncWaterMark("D1", w2), E_OK);
1294 
1295     /**
1296      * @tc.steps: step3. get receive and send watermark
1297      * @tc.expected: step3. E_OK and get the bigger value
1298      */
1299     WaterMark w = 0;
1300     EXPECT_EQ(meta.GetRecvDeleteSyncWaterMark("D1", w), E_OK);
1301     EXPECT_EQ(w1, w);
1302     EXPECT_EQ(meta.GetSendDeleteSyncWaterMark("D1", w), E_OK);
1303     EXPECT_EQ(w1, w);
1304 }
1305 
1306 /**
1307  * @tc.name: ClearDeleteKeyWaterMark 001
1308  * @tc.desc: Test metaData clear watermark function.
1309  * @tc.type: FUNC
1310  * @tc.require: AR000FN6G9
1311  * @tc.author: zhangqiquan
1312  */
1313 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, ClearDeleteKeyWaterMark001, TestSize.Level1)
1314 {
1315     VirtualSingleVerSyncDBInterface storage;
1316     Metadata meta;
1317 
1318     /**
1319      * @tc.steps: step1. initialize meta with storage
1320      * @tc.expected: step1. E_OK
1321      */
1322     int errCode = meta.Initialize(&storage);
1323     ASSERT_EQ(errCode, E_OK);
1324 
1325     /**
1326      * @tc.steps: step2. save receive watermark
1327      * @tc.expected: step2. E_OK
1328      */
1329     WaterMark w1 = 1;
1330     EXPECT_EQ(meta.SetRecvDeleteSyncWaterMark("D1", w1), E_OK);
1331 
1332     /**
1333      * @tc.steps: step3. erase peer watermark
1334      * @tc.expected: step3. E_OK
1335      */
1336     EXPECT_EQ(meta.EraseDeviceWaterMark("D1", true), E_OK);
1337 
1338     /**
1339      * @tc.steps: step4. get receive watermark
1340      * @tc.expected: step4. E_OK receive watermark is zero
1341      */
1342     WaterMark w2 = -1;
1343     EXPECT_EQ(meta.GetRecvDeleteSyncWaterMark("D1", w2), E_OK);
1344     EXPECT_EQ(w2, 0u);
1345 
1346     /**
1347      * @tc.steps: step5. set peer watermark
1348      * @tc.expected: step5. E_OK
1349      */
1350     WaterMark w3 = 2;
1351     EXPECT_EQ(meta.SavePeerWaterMark("D1", w3, true), E_OK);
1352 
1353     /**
1354      * @tc.steps: step6. get receive watermark
1355      * @tc.expected: step6. E_OK receive watermark is peer watermark
1356      */
1357     WaterMark w4 = -1;
1358     EXPECT_EQ(meta.GetRecvDeleteSyncWaterMark("D1", w4), E_OK);
1359     EXPECT_EQ(w4, w3);
1360 }
1361 
1362 /**
1363  * @tc.name: VerifyCacheAndDb 001
1364  * @tc.desc: Test metaData watermark cache and db are consistent and correct.
1365  * @tc.type: FUNC
1366  * @tc.require: AR000FN6G9
1367  * @tc.author: zhangqiquan
1368  */
1369 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, VerifyMetaDataQuerySync001, TestSize.Level1)
1370 {
1371     Metadata meta;
1372     VirtualSingleVerSyncDBInterface storage;
1373 
1374     /**
1375      * @tc.steps: step1. initialize meta with storage
1376      * @tc.expected: step1. E_OK
1377      */
1378     int errCode = meta.Initialize(&storage);
1379     ASSERT_EQ(errCode, E_OK);
1380 
1381     const std::string deviceId = "D1";
1382     const std::string queryId = "Q1";
1383 
1384     /**
1385     * @tc.steps: step2. save deleteSync watermark
1386     * @tc.expected: step2. E_OK
1387     */
1388     WaterMark deleteWaterMark = 1;
1389     EXPECT_EQ(meta.SetRecvDeleteSyncWaterMark(deviceId, deleteWaterMark), E_OK);
1390     EXPECT_EQ(meta.SetSendDeleteSyncWaterMark(deviceId, deleteWaterMark), E_OK);
1391 
1392     /**
1393     * @tc.steps: step3. save querySync watermark
1394     * @tc.expected: step2. E_OK
1395     */
1396     WaterMark queryWaterMark = 2;
1397     EXPECT_EQ(meta.SetRecvQueryWaterMark(queryId, deviceId, queryWaterMark), E_OK);
1398     EXPECT_EQ(meta.SetSendQueryWaterMark(queryId, deviceId, queryWaterMark), E_OK);
1399 
1400     /**
1401     * @tc.steps: step4. initialize meta with storage
1402     * @tc.expected: step4. E_OK
1403     */
1404     Metadata anotherMeta;
1405     ASSERT_EQ(anotherMeta.Initialize(&storage), E_OK);
1406 
1407     /**
1408     * @tc.steps: step5. verify delete sync data
1409     * @tc.expected: step5. E_OK and waterMark equal to deleteWaterMark
1410     */
1411     WaterMark waterMark;
1412     EXPECT_EQ(anotherMeta.GetRecvDeleteSyncWaterMark(deviceId, waterMark), E_OK);
1413     EXPECT_EQ(waterMark, deleteWaterMark);
1414     EXPECT_EQ(anotherMeta.GetSendDeleteSyncWaterMark(deviceId, waterMark), E_OK);
1415     EXPECT_EQ(waterMark, deleteWaterMark);
1416 
1417     /**
1418     * @tc.steps: step6. verify query sync data
1419     * @tc.expected: step6. E_OK and waterMark equal to queryWaterMark
1420     */
1421     EXPECT_EQ(anotherMeta.GetRecvQueryWaterMark(queryId, deviceId, waterMark), E_OK);
1422     EXPECT_EQ(waterMark, queryWaterMark);
1423     EXPECT_EQ(anotherMeta.GetSendQueryWaterMark(queryId, deviceId, waterMark), E_OK);
1424     EXPECT_EQ(waterMark, queryWaterMark);
1425 }
1426 
1427 /**
1428  * @tc.name: VerifyLruMap 001
1429  * @tc.desc: Test metaData watermark cache lru ability.
1430  * @tc.type: FUNC
1431  * @tc.require: AR000FN6G9
1432  * @tc.author: zhangqiquan
1433  */
1434 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, VerifyLruMap001, TestSize.Level1)
1435 {
1436     LruMap<std::string, QueryWaterMark> lruMap;
1437     const int maxCacheItems = 200;
1438 
1439     /**
1440     * @tc.steps: step1. fill items to LruMap
1441     * @tc.expected: step1. E_OK
1442     */
1443     const int startCount = 0;
1444     for (int i = startCount; i < maxCacheItems; i++) {
1445         std::string key = std::to_string(i);
1446         QueryWaterMark value;
1447         value.recvWaterMark = static_cast<uint64_t>(i + 1);
1448         EXPECT_EQ(lruMap.Put(key, value), E_OK);
1449     }
1450 
1451     /**
1452     * @tc.steps: step2. get the first item
1453     * @tc.expected: step2. E_OK first item will move to last
1454     */
1455     std::string firstItemKey = std::to_string(startCount);
1456     QueryWaterMark firstItemValue;
1457     EXPECT_EQ(lruMap.Get(firstItemKey, firstItemValue), E_OK);
1458     EXPECT_EQ(firstItemValue.recvWaterMark, 1u);
1459 
1460     /**
1461     * @tc.steps: step3. insert new items to LruMap
1462     * @tc.expected: step3. the second items was removed
1463     */
1464     std::string key = std::to_string(maxCacheItems);
1465     QueryWaterMark value;
1466     value.recvWaterMark = maxCacheItems;
1467     EXPECT_EQ(lruMap.Put(key, value), E_OK);
1468 
1469     /**
1470     * @tc.steps: step4. get the second item
1471     * @tc.expected: step4. E_NOT_FOUND it was removed by algorithm
1472     */
1473     std::string secondItemKey = std::to_string(startCount + 1);
1474     QueryWaterMark secondItemValue;
1475     EXPECT_EQ(lruMap.Get(secondItemKey, secondItemValue), -E_NOT_FOUND);
1476     EXPECT_EQ(secondItemValue.recvWaterMark, 0u);
1477 }
1478 
1479 /**
1480  * @tc.name: VerifyMetaDataInit 001
1481  * @tc.desc: Test metaData init correctly
1482  * @tc.type: FUNC
1483  * @tc.require: AR000FN6G9
1484  * @tc.author: zhangqiquan
1485  */
1486 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, VerifyMetaDataInit001, TestSize.Level1)
1487 {
1488     Metadata meta;
1489     VirtualSingleVerSyncDBInterface storage;
1490 
1491     /**
1492     * @tc.steps: step1. initialize meta with storage
1493     * @tc.expected: step1. E_OK
1494     */
1495     ASSERT_EQ(meta.Initialize(&storage), E_OK);
1496 
1497     DeviceID deviceA = "DeviceA";
1498     DeviceID deviceB = "DeviceA";
1499     WaterMark setWaterMark = 1;
1500 
1501     /**
1502     * @tc.steps: step2. meta save and get waterMark
1503     * @tc.expected: step2. expect get the same waterMark
1504     */
1505     EXPECT_EQ(meta.SaveLocalWaterMark(deviceA, setWaterMark), E_OK);
1506     EXPECT_EQ(meta.SaveLocalWaterMark(deviceB, setWaterMark), E_OK);
1507     WaterMark getWaterMark = 0;
1508     meta.GetLocalWaterMark(deviceA, getWaterMark);
1509     EXPECT_EQ(getWaterMark, setWaterMark);
1510     meta.GetLocalWaterMark(deviceB, getWaterMark);
1511     EXPECT_EQ(getWaterMark, setWaterMark);
1512 
1513 
1514     /**
1515     * @tc.steps: step3. init again
1516     * @tc.expected: step3. E_OK
1517     */
1518     Metadata anotherMeta;
1519     ASSERT_EQ(anotherMeta.Initialize(&storage), E_OK);
1520 
1521     /**
1522     * @tc.steps: step4. get waterMark again
1523     * @tc.expected: step4. expect get the same waterMark
1524     */
1525     anotherMeta.GetLocalWaterMark(deviceA, getWaterMark);
1526     EXPECT_EQ(getWaterMark, setWaterMark);
1527     anotherMeta.GetLocalWaterMark(deviceB, getWaterMark);
1528     EXPECT_EQ(getWaterMark, setWaterMark);
1529 }
1530 
1531 namespace {
InitVerifyStorageEnvironment(Metadata & meta,VirtualSingleVerSyncDBInterface & storage,const std::string & deviceId,const int & startCount,const uint32_t & maxStoreItems)1532 void InitVerifyStorageEnvironment(Metadata &meta, VirtualSingleVerSyncDBInterface &storage,
1533     const std::string &deviceId, const int &startCount, const uint32_t &maxStoreItems)
1534 {
1535     /**
1536     * @tc.steps: step1. initialize meta with storage
1537     * @tc.expected: step1. E_OK
1538     */
1539     ASSERT_EQ(meta.Initialize(&storage), E_OK);
1540 
1541     /**
1542     * @tc.steps: step2. fill items to metadata
1543     * @tc.expected: step2. E_OK
1544     */
1545     for (uint32_t i = startCount; i < maxStoreItems; i++) {
1546         std::string queryId = std::to_string(i);
1547         WaterMark recvWaterMark = i + 1;
1548         EXPECT_EQ(meta.SetRecvQueryWaterMark(queryId, deviceId, recvWaterMark), E_OK);
1549     }
1550 }
1551 }
1552 
1553 /**
1554  * @tc.name: VerifyManagerQuerySyncStorage 001
1555  * @tc.desc: Test metaData remove least used querySync storage items.
1556  * @tc.type: FUNC
1557  * @tc.require: AR000FN6G9
1558  * @tc.author: zhangqiquan
1559  */
1560 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, VerifyManagerQuerySyncStorage001, TestSize.Level3)
1561 {
1562     Metadata meta;
1563     VirtualSingleVerSyncDBInterface storage;
1564     const uint32_t maxStoreItems = 100000;
1565     const int startCount = 0;
1566     const std::string deviceId = "Device";
1567 
1568     InitVerifyStorageEnvironment(meta, storage, deviceId, startCount, maxStoreItems);
1569 
1570     /**
1571     * @tc.steps: step3. insert new items to metadata
1572     * @tc.expected: step3. E_OK
1573     */
1574     std::string newQueryId = std::to_string(maxStoreItems);
1575     WaterMark newWaterMark = maxStoreItems + 1;
1576     EXPECT_EQ(meta.SetRecvQueryWaterMark(newQueryId, deviceId, newWaterMark), E_OK);
1577 
1578     /**
1579     * @tc.steps: step4. touch the first item
1580     * @tc.expected: step4. E_OK update first item used time
1581     */
1582     std::string firstItemKey = std::to_string(startCount);
1583     WaterMark firstWaterMark = 11u;
1584     EXPECT_EQ(meta.SetRecvQueryWaterMark(firstItemKey, deviceId, firstWaterMark), E_OK);
1585 
1586     /**
1587     * @tc.steps: step5. initialize new meta with storage
1588     * @tc.expected: step5. the second item will be removed
1589     */
1590     Metadata newMeta;
1591     ASSERT_EQ(newMeta.Initialize(&storage), E_OK);
1592 
1593     /**
1594     * @tc.steps: step6. touch the first item
1595     * @tc.expected: step6. E_OK it still exist
1596     */
1597     WaterMark exceptWaterMark;
1598     EXPECT_EQ(newMeta.GetRecvQueryWaterMark(firstItemKey, deviceId, exceptWaterMark), E_OK);
1599     EXPECT_EQ(exceptWaterMark, firstWaterMark);
1600 
1601     /**
1602     * @tc.steps: step7. get the second item
1603     * @tc.expected: step7. NOT_FOUND secondWaterMark is zero
1604     */
1605     WaterMark secondWaterMark;
1606     std::string secondQueryId = std::to_string(startCount + 1);
1607     EXPECT_EQ(newMeta.GetRecvQueryWaterMark(secondQueryId, deviceId, secondWaterMark), E_OK);
1608     EXPECT_EQ(secondWaterMark, 0u);
1609 }
1610 
1611 /**
1612  * @tc.name: VerifyMetaDbCreateTime 001
1613  * @tc.desc: Test metaData get and set cbCreateTime.
1614  * @tc.type: FUNC
1615  * @tc.require: AR000FN6G9
1616  * @tc.author: zhuwentao
1617  */
1618 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, VerifyMetaDbCreateTime001, TestSize.Level1)
1619 {
1620     Metadata meta;
1621     VirtualSingleVerSyncDBInterface storage;
1622     /**
1623      * @tc.steps: step1. initialize meta with storage
1624      * @tc.expected: step1. E_OK
1625      */
1626     int errCode = meta.Initialize(&storage);
1627     ASSERT_EQ(errCode, E_OK);
1628     /**
1629      * @tc.steps: step2. set local and peer watermark and dbCreateTime
1630      * @tc.expected: step4. E_OK
1631      */
1632     WaterMark value = 2;
1633     EXPECT_EQ(meta.SaveLocalWaterMark("D1", value), E_OK);
1634     EXPECT_EQ(meta.SavePeerWaterMark("D1", value, true), E_OK);
1635     EXPECT_EQ(meta.SetDbCreateTime("D1", 10u, true), E_OK);
1636     /**
1637      * @tc.steps: step3. check peer and local watermark and dbCreateTime
1638      * @tc.expected: step4. E_OK
1639      */
1640     WaterMark curValue = 0;
1641     meta.GetLocalWaterMark("D1", curValue);
1642     EXPECT_EQ(value, curValue);
1643     meta.GetPeerWaterMark("D1", curValue);
1644     EXPECT_EQ(value, curValue);
1645     uint64_t curDbCreatTime = 0;
1646     meta.GetDbCreateTime("D1", curDbCreatTime);
1647     EXPECT_EQ(curDbCreatTime, 10u);
1648     /**
1649      * @tc.steps: step3. change dbCreateTime and check
1650      * @tc.expected: step4. E_OK
1651      */
1652     EXPECT_EQ(meta.SetDbCreateTime("D1", 20u, true), E_OK);
1653     uint64_t clearDeviceDataMark = INT_MAX;
1654     meta.GetRemoveDataMark("D1", clearDeviceDataMark);
1655     EXPECT_EQ(clearDeviceDataMark, 1u);
1656     EXPECT_EQ(meta.ResetMetaDataAfterRemoveData("D1"), E_OK);
1657     meta.GetRemoveDataMark("D1", clearDeviceDataMark);
1658     EXPECT_EQ(clearDeviceDataMark, 0u);
1659     meta.GetDbCreateTime("D1", curDbCreatTime);
1660     EXPECT_EQ(curDbCreatTime, 20u);
1661 }
1662 
1663 /**
1664  * @tc.name: VerifyManagerQuerySyncStorage 002
1665  * @tc.desc: Test metaData remove least used querySync storage items when exit wrong data.
1666  * @tc.type: FUNC
1667  * @tc.require: AR000FN6G9
1668  * @tc.author: zhangqiquan
1669  */
1670 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, VerifyManagerQuerySyncStorage002, TestSize.Level3)
1671 {
1672     Metadata meta;
1673     VirtualSingleVerSyncDBInterface storage;
1674     const uint32_t maxStoreItems = 100000;
1675     const int startCount = 0;
1676     const std::string deviceId = "Device";
1677 
1678     InitVerifyStorageEnvironment(meta, storage, deviceId, startCount, maxStoreItems);
1679 
1680     /**
1681     * @tc.steps: step3. insert a wrong Value
1682     * @tc.expected: step3. E_OK
1683     */
1684     std::string newQueryId = std::to_string(maxStoreItems);
1685     Key dbKey;
1686     DBCommon::StringToVector(QuerySyncWaterMarkHelper::GetQuerySyncPrefixKey()
1687         + DBCommon::TransferHashString(deviceId) + newQueryId, dbKey);
1688     Value wrongValue;
1689     EXPECT_EQ(storage.PutMetaData(dbKey, wrongValue, false), E_OK);
1690 
1691     /**
1692     * @tc.steps: step4. initialize new meta with storage
1693     * @tc.expected: step4. E_OK
1694     */
1695     Metadata newMeta;
1696     ASSERT_EQ(newMeta.Initialize(&storage), E_OK);
1697 
1698     /**
1699     * @tc.steps: step5. touch the first item
1700     * @tc.expected: step5. E_OK still exit
1701     */
1702     std::string firstItemKey = std::to_string(startCount);
1703     WaterMark exceptWaterMark;
1704     EXPECT_EQ(newMeta.GetRecvQueryWaterMark(firstItemKey, deviceId, exceptWaterMark), E_OK);
1705     EXPECT_EQ(exceptWaterMark, 1u);
1706 }
1707 
1708 /**
1709  * @tc.name: AllPredicateQuerySync001
1710  * @tc.desc: Test normal push sync for AllPredicate data.
1711  * @tc.type: FUNC
1712  * @tc.require: AR000FN6G9
1713  * @tc.author: zhuwentao
1714  */
1715 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, AllPredicateQuerySync001, TestSize.Level1)
1716 {
1717     /**
1718      * @tc.steps: step1. InitSchemaDb
1719      */
1720     InitSchemaDb();
1721     DBStatus status = OK;
1722     std::vector<std::string> devices;
1723     devices.push_back(g_deviceB->GetDeviceId());
1724 
1725     /**
1726      * @tc.steps: step2. deviceA put {key11, SCHEMA_VALUE1} - {key19, SCHEMA_VALUE1}
1727                          {key21, SCHEMA_VALUE2} - {key29, SCHEMA_VALUE2}
1728      */
1729     Value value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end());
1730     Value value2(SCHEMA_VALUE2.begin(), SCHEMA_VALUE2.end());
1731     Key key = {'1'};
1732     Key key2 = {'2'};
1733     const int dataSize = 4000;
1734     for (int i = 0; i < dataSize; i++) {
1735         key.push_back(i);
1736         key2.push_back(i);
1737         status = g_schemaKvDelegatePtr->Put(key, value);
1738         ASSERT_TRUE(status == OK);
1739         status = g_schemaKvDelegatePtr->Put(key2, value2);
1740         ASSERT_TRUE(status == OK);
1741         key.pop_back();
1742         key2.pop_back();
1743     }
1744     ASSERT_TRUE(status == OK);
1745 
1746     /**
1747      * @tc.steps: step3. deviceA call query sync and wait
1748      * @tc.expected: step3. sync should return OK.
1749      */
1750     Query query = Query::Select().EqualTo("$.field_name1", 1);
1751     std::map<std::string, DBStatus> result;
1752     status = g_tool.SyncTest(g_schemaKvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query);
1753     ASSERT_TRUE(status == OK);
1754 
1755     /**
1756      * @tc.expected: step4. onComplete should be called, DeviceB have {key11, SCHEMA_VALUE1} - {key19, SCHEMA_VALUE1}
1757      */
1758     ASSERT_TRUE(result.size() == devices.size());
1759     for (const auto &pair : result) {
1760         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1761         EXPECT_TRUE(pair.second == OK);
1762     }
1763     VirtualDataItem item;
1764     VirtualDataItem item2;
1765     for (int i = 0; i < dataSize; i++) {
1766         key.push_back(i);
1767         key2.push_back(i);
1768         g_deviceB->GetData(key, item);
1769         EXPECT_TRUE(g_deviceB->GetData(key2, item2) != E_OK);
1770         EXPECT_TRUE(item.value == value);
1771         key.pop_back();
1772         key2.pop_back();
1773     }
1774 }
1775 
1776 /**
1777  * @tc.name: AllPredicateQuerySync002
1778  * @tc.desc: Test wrong query param push sync for AllPredicate data.
1779  * @tc.type: FUNC
1780  * @tc.require: AR000FN6G9
1781  * @tc.author: zhuwentao
1782  */
1783 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, AllPredicateQuerySync002, TestSize.Level1)
1784 {
1785     /**
1786      * @tc.steps: step1. InitSchemaDb
1787      */
1788     InitSchemaDb();
1789     DBStatus status = OK;
1790     std::vector<std::string> devices;
1791     devices.push_back(g_deviceB->GetDeviceId());
1792 
1793     /**
1794      * @tc.steps: step2. deviceA call query sync and wait
1795      * @tc.expected: step2. sync should return INVALID_QUERY_FIELD
1796      */
1797     Query query = Query::Select().GreaterThan("field_name11", 10);
1798     std::map<std::string, DBStatus> result;
1799     status = g_tool.SyncTest(g_schemaKvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query);
1800     ASSERT_TRUE(status == INVALID_QUERY_FIELD);
1801     status = g_tool.SyncTest(g_schemaKvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result, query);
1802     ASSERT_TRUE(status == INVALID_QUERY_FIELD);
1803     status = g_tool.SyncTest(g_schemaKvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, query);
1804     ASSERT_TRUE(status == INVALID_QUERY_FIELD);
1805 }
1806 
1807 /**
1808  * @tc.name: AllPredicateQuerySync003
1809  * @tc.desc: Test normal push sync for AllPredicate data with limit
1810  * @tc.type: FUNC
1811  * @tc.require: AR000FN6G9
1812  * @tc.author: zhuwentao
1813  */
1814 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, AllPredicateQuerySync003, TestSize.Level1)
1815 {
1816     /**
1817      * @tc.steps: step1. InitSchemaDb
1818      */
1819     InitSchemaDb();
1820     DBStatus status = OK;
1821     std::vector<std::string> devices;
1822     devices.push_back(g_deviceB->GetDeviceId());
1823 
1824     /**
1825      * @tc.steps: step2. deviceA put {key1, SCHEMA_VALUE1} - {key9, SCHEMA_VALUE1}
1826      */
1827     Value value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end());
1828     Value value2(SCHEMA_VALUE2.begin(), SCHEMA_VALUE2.end());
1829     Key key = {'1'};
1830     Key key2 = {'2'};
1831     const int dataSize = 10;
1832     for (int i = 0; i < dataSize; i++) {
1833         key.push_back(i);
1834         key2.push_back(i);
1835         status = g_schemaKvDelegatePtr->Put(key, value);
1836         ASSERT_TRUE(status == OK);
1837         status = g_schemaKvDelegatePtr->Put(key2, value2);
1838         ASSERT_TRUE(status == OK);
1839         key.pop_back();
1840         key2.pop_back();
1841     }
1842     ASSERT_TRUE(status == OK);
1843 
1844     /**
1845      * @tc.steps: step3. deviceA call query sync with limit and wait
1846      * @tc.expected: step3. sync should return OK.
1847      */
1848     Query query = Query::Select().EqualTo("$.field_name1", 1).Limit(20, 0);
1849     std::map<std::string, DBStatus> result;
1850     status = g_tool.SyncTest(g_schemaKvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query);
1851     ASSERT_TRUE(status == OK);
1852 
1853     /**
1854      * @tc.expected: step4. onComplete should be called, DeviceB have {key1, SCHEMA_VALUE1} - {key9, SCHEMA_VALUE1}
1855      */
1856     ASSERT_TRUE(result.size() == devices.size());
1857     for (const auto &pair : result) {
1858         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1859         EXPECT_TRUE(pair.second == OK);
1860     }
1861     VirtualDataItem item;
1862     VirtualDataItem item2;
1863     for (int i = 0; i < dataSize; i++) {
1864         key.push_back(i);
1865         key2.push_back(i);
1866         g_deviceB->GetData(key, item);
1867         EXPECT_TRUE(g_deviceB->GetData(key2, item2) != E_OK);
1868         EXPECT_TRUE(item.value == value);
1869         key.pop_back();
1870         key2.pop_back();
1871     }
1872 }
1873 
1874 /**
1875  * @tc.name: AllPredicateQuerySync004
1876  * @tc.desc: Test normal pull sync for AllPredicate data.
1877  * @tc.type: FUNC
1878  * @tc.require: AR000FN6G9
1879  * @tc.author: zhuwentao
1880  */
1881 HWTEST_F(DistributedDBSingleVerP2PQuerySyncTest, AllPredicateQuerySync004, TestSize.Level1)
1882 {
1883     /**
1884      * @tc.steps: step1. InitSchemaDb
1885      */
1886     InitSchemaDb();
1887     DBStatus status = OK;
1888     std::vector<std::string> devices;
1889     devices.push_back(g_deviceB->GetDeviceId());
1890 
1891     /**
1892      * @tc.steps: step2. deviceB put {key11, SCHEMA_VALUE1} - {key19, SCHEMA_VALUE1}
1893      */
1894     Value value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end());
1895     Key key = {'1'};
1896     const int dataSize = 10;
1897     for (int i = 0; i < dataSize; i++) {
1898         key.push_back(i);
1899         g_deviceB->PutData(key, value, 10 + i, 0);
1900         ASSERT_TRUE(status == OK);
1901         key.pop_back();
1902     }
1903     ASSERT_TRUE(status == OK);
1904 
1905     /**
1906      * @tc.steps: step3. deviceA call query sync and wait
1907      * @tc.expected: step3. sync should return OK.
1908      */
1909     Query query = Query::Select().EqualTo("$.field_name1", 1);
1910     std::map<std::string, DBStatus> result;
1911     status = g_tool.SyncTest(g_schemaKvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, query);
1912     ASSERT_TRUE(status == OK);
1913 
1914     /**
1915      * @tc.expected: step4. onComplete should be called, DeviceA have {key11, SCHEMA_VALUE1} - {key19, SCHEMA_VALUE1}
1916      */
1917     ASSERT_TRUE(result.size() == devices.size());
1918     for (const auto &pair : result) {
1919         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1920         EXPECT_TRUE(pair.second == OK);
1921     }
1922     Value item;
1923     Value item2;
1924     for (int i = 0; i < dataSize; i++) {
1925         key.push_back(i);
1926         g_schemaKvDelegatePtr->Get(key, item);
1927         EXPECT_TRUE(item == value);
1928         key.pop_back();
1929     }
1930 }