1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "query_sync_water_mark_helper.h"
17 
18 #include <algorithm>
19 #include <version.h>
20 #include "platform_specific.h"
21 #include "parcel.h"
22 #include "db_errno.h"
23 #include "db_common.h"
24 #include "log_print.h"
25 
26 namespace DistributedDB {
27 namespace {
28     const uint32_t MAX_STORE_ITEMS = 100000;
29     // WaterMark Version
30     constexpr uint32_t QUERY_WATERMARK_VERSION_CURRENT = SOFTWARE_VERSION_RELEASE_6_0;
31     constexpr uint32_t DELETE_WATERMARK_VERSION_CURRENT = SOFTWARE_VERSION_RELEASE_3_0;
32 }
33 
QuerySyncWaterMarkHelper()34 QuerySyncWaterMarkHelper::QuerySyncWaterMarkHelper()
35     : storage_(nullptr)
36 {}
37 
~QuerySyncWaterMarkHelper()38 QuerySyncWaterMarkHelper::~QuerySyncWaterMarkHelper()
39 {
40     storage_ = nullptr;
41     deviceIdToHashQuerySyncIdMap_.clear();
42     deleteSyncCache_.clear();
43     deviceIdToHashDeleteSyncIdMap_.clear();
44 }
45 
GetMetadataFromDb(const std::vector<uint8_t> & key,std::vector<uint8_t> & outValue)46 int QuerySyncWaterMarkHelper::GetMetadataFromDb(const std::vector<uint8_t> &key, std::vector<uint8_t> &outValue)
47 {
48     if (storage_ == nullptr) {
49         return -E_INVALID_DB;
50     }
51     return storage_->GetMetaData(key, outValue);
52 }
53 
SetMetadataToDb(const std::vector<uint8_t> & key,const std::vector<uint8_t> & inValue)54 int QuerySyncWaterMarkHelper::SetMetadataToDb(const std::vector<uint8_t> &key, const std::vector<uint8_t> &inValue)
55 {
56     if (storage_ == nullptr) {
57         return -E_INVALID_DB;
58     }
59     return storage_->PutMetaData(key, inValue, false);
60 }
61 
DeleteMetaDataFromDB(const std::vector<Key> & keys) const62 int QuerySyncWaterMarkHelper::DeleteMetaDataFromDB(const std::vector<Key> &keys) const
63 {
64     if (storage_ == nullptr) {
65         return -E_INVALID_DB;
66     }
67     return storage_->DeleteMetaData(keys);
68 }
69 
Initialize(ISyncInterface * storage)70 int QuerySyncWaterMarkHelper::Initialize(ISyncInterface *storage)
71 {
72     storage_ = storage;
73     return E_OK;
74 }
75 
LoadDeleteSyncDataToCache(const Key & deleteWaterMarkKey)76 int QuerySyncWaterMarkHelper::LoadDeleteSyncDataToCache(const Key &deleteWaterMarkKey)
77 {
78     std::vector<uint8_t> value;
79     int errCode = GetMetadataFromDb(deleteWaterMarkKey, value);
80     if (errCode != E_OK) {
81         return errCode;
82     }
83     DeleteWaterMark deleteWaterMark;
84     std::string dbKey(deleteWaterMarkKey.begin(), deleteWaterMarkKey.end());
85     errCode = DeSerializeDeleteWaterMark(value, deleteWaterMark);
86     if (errCode != E_OK) {
87         return errCode;
88     }
89     std::lock_guard<std::mutex> autoLock(deleteSyncLock_);
90     deleteSyncCache_[dbKey] = deleteWaterMark;
91     return errCode;
92 }
93 
GetQueryWaterMarkInCacheAndDb(const std::string & cacheKey,QueryWaterMark & queryWaterMark)94 int QuerySyncWaterMarkHelper::GetQueryWaterMarkInCacheAndDb(const std::string &cacheKey,
95     QueryWaterMark &queryWaterMark)
96 {
97     // first get from cache_
98     int errCode = querySyncCache_.Get(cacheKey, queryWaterMark);
99     bool addToCache = false;
100     if (errCode == -E_NOT_FOUND) {
101         // second get from db
102         errCode = GetQueryWaterMarkFromDB(cacheKey, queryWaterMark);
103         addToCache = true;
104     }
105     if (errCode == -E_NOT_FOUND) {
106         // third generate one and save to db
107         errCode = PutQueryWaterMarkToDB(cacheKey, queryWaterMark);
108     }
109     // something error return
110     if (errCode != E_OK) {
111         LOGE("[Meta]GetQueryWaterMark Fail code = %d", errCode);
112         return errCode;
113     }
114     // remember add to cache_
115     if (addToCache) {
116         querySyncCache_.Put(cacheKey, queryWaterMark);
117     }
118     return errCode;
119 }
120 
GetQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,QueryWaterMark & queryWaterMark)121 int QuerySyncWaterMarkHelper::GetQueryWaterMark(const std::string &queryIdentify, const std::string &deviceId,
122     QueryWaterMark &queryWaterMark)
123 {
124     std::string cacheKey = GetHashQuerySyncDeviceId(deviceId, queryIdentify);
125     std::lock_guard<std::mutex> autoLock(queryWaterMarkLock_);
126     return GetQueryWaterMarkInCacheAndDb(cacheKey, queryWaterMark);
127 }
128 
SetRecvQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,const WaterMark & waterMark)129 int QuerySyncWaterMarkHelper::SetRecvQueryWaterMark(const std::string &queryIdentify,
130     const std::string &deviceId, const WaterMark &waterMark)
131 {
132     std::string cacheKey = GetHashQuerySyncDeviceId(deviceId, queryIdentify);
133     std::lock_guard<std::mutex> autoLock(queryWaterMarkLock_);
134     return SetRecvQueryWaterMarkWithoutLock(cacheKey, waterMark);
135 }
136 
SetLastQueryTime(const std::string & queryIdentify,const std::string & deviceId,const Timestamp & timestamp)137 int QuerySyncWaterMarkHelper::SetLastQueryTime(const std::string &queryIdentify,
138     const std::string &deviceId, const Timestamp &timestamp)
139 {
140     std::string cacheKey = GetHashQuerySyncDeviceId(deviceId, queryIdentify);
141     std::lock_guard<std::mutex> autoLock(queryWaterMarkLock_);
142     QueryWaterMark queryWaterMark;
143     int errCode = GetQueryWaterMarkInCacheAndDb(cacheKey, queryWaterMark);
144     if (errCode != E_OK) {
145         return errCode;
146     }
147     queryWaterMark.lastQueryTime = timestamp;
148     return UpdateCacheAndSave(cacheKey, queryWaterMark);
149 }
150 
SetRecvQueryWaterMarkWithoutLock(const std::string & cacheKey,const WaterMark & waterMark)151 int QuerySyncWaterMarkHelper::SetRecvQueryWaterMarkWithoutLock(const std::string &cacheKey,
152     const WaterMark &waterMark)
153 {
154     QueryWaterMark queryWaterMark;
155     int errCode = GetQueryWaterMarkInCacheAndDb(cacheKey, queryWaterMark);
156     if (errCode != E_OK) {
157         return errCode;
158     }
159     queryWaterMark.recvWaterMark = waterMark;
160     return UpdateCacheAndSave(cacheKey, queryWaterMark);
161 }
162 
SetSendQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,const WaterMark & waterMark)163 int QuerySyncWaterMarkHelper::SetSendQueryWaterMark(const std::string &queryIdentify,
164     const std::string &deviceId, const WaterMark &waterMark)
165 {
166     std::string cacheKey = GetHashQuerySyncDeviceId(deviceId, queryIdentify);
167     QueryWaterMark queryWaterMark;
168     std::lock_guard<std::mutex> autoLock(queryWaterMarkLock_);
169     int errCode = GetQueryWaterMarkInCacheAndDb(cacheKey, queryWaterMark);
170     if (errCode != E_OK) {
171         return errCode;
172     }
173     queryWaterMark.sendWaterMark = waterMark;
174     return UpdateCacheAndSave(cacheKey, queryWaterMark);
175 }
176 
UpdateCacheAndSave(const std::string & cacheKey,QueryWaterMark & queryWaterMark)177 int QuerySyncWaterMarkHelper::UpdateCacheAndSave(const std::string &cacheKey,
178     QueryWaterMark &queryWaterMark)
179 {
180     // update lastUsedTime
181     int errCode = OS::GetCurrentSysTimeInMicrosecond(queryWaterMark.lastUsedTime);
182     if (errCode != E_OK) {
183         return errCode;
184     }
185     // save db first
186     errCode = SaveQueryWaterMarkToDB(cacheKey, queryWaterMark);
187     if (errCode != E_OK) {
188         return errCode;
189     }
190     querySyncCache_.Put(cacheKey, queryWaterMark);
191     return errCode;
192 }
193 
PutQueryWaterMarkToDB(const DeviceID & dbKeyString,QueryWaterMark & queryWaterMark)194 int QuerySyncWaterMarkHelper::PutQueryWaterMarkToDB(const DeviceID &dbKeyString, QueryWaterMark &queryWaterMark)
195 {
196     int errCode = OS::GetCurrentSysTimeInMicrosecond(queryWaterMark.lastUsedTime);
197     if (errCode != E_OK) {
198         return errCode;
199     }
200     queryWaterMark.version = QUERY_WATERMARK_VERSION_CURRENT;
201     return SaveQueryWaterMarkToDB(dbKeyString, queryWaterMark);
202 }
203 
SaveQueryWaterMarkToDB(const DeviceID & dbKeyString,const QueryWaterMark & queryWaterMark)204 int QuerySyncWaterMarkHelper::SaveQueryWaterMarkToDB(const DeviceID &dbKeyString, const QueryWaterMark &queryWaterMark)
205 {
206     // serialize value
207     Value dbValue;
208     int errCode = SerializeQueryWaterMark(queryWaterMark, dbValue);
209     if (errCode != E_OK) {
210         return errCode;
211     }
212     // serialize key
213     Key dbKey;
214     DBCommon::StringToVector(dbKeyString, dbKey);
215     // save
216     errCode = SetMetadataToDb(dbKey, dbValue);
217     if (errCode != E_OK) {
218         LOGE("QuerySyncWaterMarkHelper::SaveQueryWaterMarkToDB failed errCode:%d", errCode);
219     }
220     return errCode;
221 }
222 
GetQueryWaterMarkFromDB(const DeviceID & dbKeyString,QueryWaterMark & queryWaterMark)223 int QuerySyncWaterMarkHelper::GetQueryWaterMarkFromDB(const DeviceID &dbKeyString, QueryWaterMark &queryWaterMark)
224 {
225     // serialize key
226     Key dbKey;
227     DBCommon::StringToVector(dbKeyString, dbKey);
228     // search in db
229     Value dbValue;
230     int errCode = GetMetadataFromDb(dbKey, dbValue);
231     if (errCode != E_OK) {
232         return errCode;
233     }
234     return DeSerializeQueryWaterMark(dbValue, queryWaterMark);
235 }
236 
SerializeQueryWaterMark(const QueryWaterMark & queryWaterMark,Value & outValue)237 int QuerySyncWaterMarkHelper::SerializeQueryWaterMark(const QueryWaterMark &queryWaterMark, Value &outValue)
238 {
239     uint64_t length = CalculateQueryWaterMarkSize(queryWaterMark);
240     outValue.resize(length);
241     Parcel parcel(outValue.data(), outValue.size());
242     parcel.WriteUInt32(queryWaterMark.version);
243     parcel.EightByteAlign();
244     parcel.WriteUInt64(queryWaterMark.sendWaterMark);
245     parcel.WriteUInt64(queryWaterMark.recvWaterMark);
246     parcel.WriteUInt64(queryWaterMark.lastUsedTime);
247     parcel.WriteString(queryWaterMark.sql);
248     parcel.WriteUInt64(queryWaterMark.lastQueryTime);
249     if (parcel.IsError()) {
250         LOGE("[Meta] Parcel error when serialize queryWaterMark");
251         return -E_PARSE_FAIL;
252     }
253     return E_OK;
254 }
255 
DeSerializeQueryWaterMark(const Value & dbQueryWaterMark,QueryWaterMark & queryWaterMark)256 int QuerySyncWaterMarkHelper::DeSerializeQueryWaterMark(const Value &dbQueryWaterMark, QueryWaterMark &queryWaterMark)
257 {
258     Parcel parcel(const_cast<uint8_t *>(dbQueryWaterMark.data()), dbQueryWaterMark.size());
259     parcel.ReadUInt32(queryWaterMark.version);
260     parcel.EightByteAlign();
261     parcel.ReadUInt64(queryWaterMark.sendWaterMark);
262     parcel.ReadUInt64(queryWaterMark.recvWaterMark);
263     parcel.ReadUInt64(queryWaterMark.lastUsedTime);
264     parcel.ReadString(queryWaterMark.sql);
265     if (queryWaterMark.version >= SOFTWARE_VERSION_RELEASE_6_0) {
266         parcel.ReadUInt64(queryWaterMark.lastQueryTime);
267     }
268     if (parcel.IsError()) {
269         LOGE("[Meta] Parcel error when deserialize queryWaterMark");
270         return -E_PARSE_FAIL;
271     }
272     return E_OK;
273 }
274 
CalculateQueryWaterMarkSize(const QueryWaterMark & queryWaterMark)275 uint64_t QuerySyncWaterMarkHelper::CalculateQueryWaterMarkSize(const QueryWaterMark &queryWaterMark)
276 {
277     uint64_t length = Parcel::GetUInt32Len(); // version
278     length = Parcel::GetEightByteAlign(length);
279     length += Parcel::GetUInt64Len(); // sendWaterMark
280     length += Parcel::GetUInt64Len(); // recvWaterMark
281     length += Parcel::GetUInt64Len(); // lastUsedTime
282     length += Parcel::GetStringLen(queryWaterMark.sql);
283     length += Parcel::GetUInt64Len(); // lastQueryTime
284     return length;
285 }
286 
GetHashQuerySyncDeviceId(const DeviceID & deviceId,const DeviceID & queryId)287 DeviceID QuerySyncWaterMarkHelper::GetHashQuerySyncDeviceId(const DeviceID &deviceId, const DeviceID &queryId)
288 {
289     std::lock_guard<std::mutex> autoLock(queryWaterMarkLock_);
290     DeviceID hashQuerySyncId;
291     if (deviceIdToHashQuerySyncIdMap_[deviceId].count(queryId) == 0) {
292         // do not modify this
293         hashQuerySyncId = DBConstant::QUERY_SYNC_PREFIX_KEY + DBCommon::TransferHashString(deviceId) + queryId;
294         deviceIdToHashQuerySyncIdMap_[deviceId][queryId] = hashQuerySyncId;
295     } else {
296         hashQuerySyncId = deviceIdToHashQuerySyncIdMap_[deviceId][queryId];
297     }
298     return hashQuerySyncId;
299 }
300 
GetDeleteSyncWaterMark(const std::string & deviceId,DeleteWaterMark & deleteWaterMark)301 int QuerySyncWaterMarkHelper::GetDeleteSyncWaterMark(const std::string &deviceId, DeleteWaterMark &deleteWaterMark)
302 {
303     std::string hashId = GetHashDeleteSyncDeviceId(deviceId);
304     // lock prevent different thread visit deleteSyncCache_
305     std::lock_guard<std::mutex> autoLock(deleteSyncLock_);
306     return GetDeleteWaterMarkFromCache(hashId, deleteWaterMark);
307 }
308 
SetSendDeleteSyncWaterMark(const DeviceID & deviceId,const WaterMark & waterMark)309 int QuerySyncWaterMarkHelper::SetSendDeleteSyncWaterMark(const DeviceID &deviceId, const WaterMark &waterMark)
310 {
311     std::string hashId = GetHashDeleteSyncDeviceId(deviceId);
312     DeleteWaterMark deleteWaterMark;
313     // lock prevent different thread visit deleteSyncCache_
314     std::lock_guard<std::mutex> autoLock(deleteSyncLock_);
315     int errCode = GetDeleteWaterMarkFromCache(hashId, deleteWaterMark);
316     if (errCode != E_OK) {
317         return errCode;
318     }
319     deleteWaterMark.sendWaterMark = waterMark;
320     return UpdateDeleteSyncCacheAndSave(hashId, deleteWaterMark);
321 }
322 
SetRecvDeleteSyncWaterMark(const DeviceID & deviceId,const WaterMark & waterMark,bool isNeedHash)323 int QuerySyncWaterMarkHelper::SetRecvDeleteSyncWaterMark(const DeviceID &deviceId, const WaterMark &waterMark,
324     bool isNeedHash)
325 {
326     std::string hashId = GetHashDeleteSyncDeviceId(deviceId, isNeedHash);
327     DeleteWaterMark deleteWaterMark;
328     // lock prevent different thread visit deleteSyncCache_
329     std::lock_guard<std::mutex> autoLock(deleteSyncLock_);
330     int errCode = GetDeleteWaterMarkFromCache(hashId, deleteWaterMark);
331     if (errCode != E_OK) {
332         return errCode;
333     }
334     deleteWaterMark.recvWaterMark = waterMark;
335     return UpdateDeleteSyncCacheAndSave(hashId, deleteWaterMark);
336 }
337 
UpdateDeleteSyncCacheAndSave(const std::string & dbKey,const DeleteWaterMark & deleteWaterMark)338 int QuerySyncWaterMarkHelper::UpdateDeleteSyncCacheAndSave(const std::string &dbKey,
339     const DeleteWaterMark &deleteWaterMark)
340 {
341     // save db first
342     int errCode = SaveDeleteWaterMarkToDB(dbKey, deleteWaterMark);
343     if (errCode != E_OK) {
344         return errCode;
345     }
346     // modify cache
347     deleteSyncCache_[dbKey] = deleteWaterMark;
348     return errCode;
349 }
350 
GetDeleteWaterMarkFromCache(const DeviceID & hashDeviceId,DeleteWaterMark & deleteWaterMark)351 int QuerySyncWaterMarkHelper::GetDeleteWaterMarkFromCache(const DeviceID &hashDeviceId,
352     DeleteWaterMark &deleteWaterMark)
353 {
354     // if not found
355     if (deleteSyncCache_.find(hashDeviceId) == deleteSyncCache_.end()) {
356         DeleteWaterMark waterMark;
357         waterMark.version = DELETE_WATERMARK_VERSION_CURRENT;
358         int errCode = GetDeleteWaterMarkFromDB(hashDeviceId, waterMark);
359         if (errCode == -E_NOT_FOUND) {
360             deleteWaterMark.sendWaterMark = 0;
361             deleteWaterMark.recvWaterMark = 0;
362             errCode = E_OK;
363         }
364         if (errCode != E_OK) {
365             LOGE("[Meta]GetDeleteWaterMark Fail code = %d", errCode);
366             return errCode;
367         }
368         deleteSyncCache_.insert(std::pair<DeviceID, DeleteWaterMark>(hashDeviceId, waterMark));
369     }
370     deleteWaterMark = deleteSyncCache_[hashDeviceId];
371     return E_OK;
372 }
373 
GetDeleteWaterMarkFromDB(const DeviceID & hashDeviceId,DeleteWaterMark & deleteWaterMark)374 int QuerySyncWaterMarkHelper::GetDeleteWaterMarkFromDB(const DeviceID &hashDeviceId,
375     DeleteWaterMark &deleteWaterMark)
376 {
377     Key dbKey;
378     DBCommon::StringToVector(hashDeviceId, dbKey);
379     // search in db
380     Value dbValue;
381     int errCode = GetMetadataFromDb(dbKey, dbValue);
382     if (errCode != E_OK) {
383         return errCode;
384     }
385     // serialize value
386     return DeSerializeDeleteWaterMark(dbValue, deleteWaterMark);
387 }
388 
SaveDeleteWaterMarkToDB(const DeviceID & hashDeviceId,const DeleteWaterMark & deleteWaterMark)389 int QuerySyncWaterMarkHelper::SaveDeleteWaterMarkToDB(const DeviceID &hashDeviceId,
390     const DeleteWaterMark &deleteWaterMark)
391 {
392     // serialize value
393     Value dbValue;
394     int errCode = SerializeDeleteWaterMark(deleteWaterMark, dbValue);
395     if (errCode != E_OK) {
396         return errCode;
397     }
398     Key dbKey;
399     DBCommon::StringToVector(hashDeviceId, dbKey);
400     // save
401     errCode = SetMetadataToDb(dbKey, dbValue);
402     if (errCode != E_OK) {
403         LOGE("QuerySyncWaterMarkHelper::SaveDeleteWaterMarkToDB failed errCode:%d", errCode);
404     }
405     return errCode;
406 }
407 
GetHashDeleteSyncDeviceId(const DeviceID & deviceId,bool isNeedHash)408 DeviceID QuerySyncWaterMarkHelper::GetHashDeleteSyncDeviceId(const DeviceID &deviceId, bool isNeedHash)
409 {
410     DeviceID hashDeleteSyncId;
411     std::lock_guard<std::mutex> autoLock(deleteSyncLock_);
412     if (deviceIdToHashDeleteSyncIdMap_.count(deviceId) == 0) {
413         hashDeleteSyncId = DBConstant::DELETE_SYNC_PREFIX_KEY +
414             (isNeedHash ? DBCommon::TransferHashString(deviceId) : deviceId);
415         deviceIdToHashDeleteSyncIdMap_.insert(std::pair<DeviceID, DeviceID>(deviceId, hashDeleteSyncId));
416     } else {
417         hashDeleteSyncId = deviceIdToHashDeleteSyncIdMap_[deviceId];
418     }
419     return hashDeleteSyncId;
420 }
421 
SerializeDeleteWaterMark(const DeleteWaterMark & deleteWaterMark,std::vector<uint8_t> & outValue)422 int QuerySyncWaterMarkHelper::SerializeDeleteWaterMark(const DeleteWaterMark &deleteWaterMark,
423     std::vector<uint8_t> &outValue)
424 {
425     uint64_t length = CalculateDeleteWaterMarkSize();
426     outValue.resize(length);
427     Parcel parcel(outValue.data(), outValue.size());
428     parcel.WriteUInt32(deleteWaterMark.version);
429     parcel.EightByteAlign();
430     parcel.WriteUInt64(deleteWaterMark.sendWaterMark);
431     parcel.WriteUInt64(deleteWaterMark.recvWaterMark);
432     if (parcel.IsError()) {
433         LOGE("[Meta] Parcel error when serialize deleteWaterMark.");
434         return -E_PARSE_FAIL;
435     }
436     return E_OK;
437 }
438 
DeSerializeDeleteWaterMark(const std::vector<uint8_t> & inValue,DeleteWaterMark & deleteWaterMark)439 int QuerySyncWaterMarkHelper::DeSerializeDeleteWaterMark(const std::vector<uint8_t> &inValue,
440     DeleteWaterMark &deleteWaterMark)
441 {
442     Parcel parcel(const_cast<uint8_t *>(inValue.data()), inValue.size());
443     parcel.ReadUInt32(deleteWaterMark.version);
444     parcel.EightByteAlign();
445     parcel.ReadUInt64(deleteWaterMark.sendWaterMark);
446     parcel.ReadUInt64(deleteWaterMark.recvWaterMark);
447     if (parcel.IsError()) {
448         LOGE("[Meta] Parcel error when deserialize deleteWaterMark.");
449         return -E_PARSE_FAIL;
450     }
451     return E_OK;
452 }
453 
CalculateDeleteWaterMarkSize()454 uint64_t QuerySyncWaterMarkHelper::CalculateDeleteWaterMarkSize()
455 {
456     uint64_t length = Parcel::GetUInt32Len(); // version
457     length = Parcel::GetEightByteAlign(length);
458     length += Parcel::GetUInt64Len(); // sendWaterMark
459     length += Parcel::GetUInt64Len(); // recvWaterMark
460     return length;
461 }
462 
GetQuerySyncPrefixKey()463 std::string QuerySyncWaterMarkHelper::GetQuerySyncPrefixKey()
464 {
465     return DBConstant::QUERY_SYNC_PREFIX_KEY;
466 }
467 
GetDeleteSyncPrefixKey()468 std::string QuerySyncWaterMarkHelper::GetDeleteSyncPrefixKey()
469 {
470     return DBConstant::DELETE_SYNC_PREFIX_KEY;
471 }
472 
RemoveLeastUsedQuerySyncItems(const std::vector<Key> & querySyncIds)473 int QuerySyncWaterMarkHelper::RemoveLeastUsedQuerySyncItems(const std::vector<Key> &querySyncIds)
474 {
475     if (querySyncIds.size() < MAX_STORE_ITEMS) {
476         return E_OK;
477     }
478     std::vector<std::pair<std::string, Timestamp>> allItems;
479     std::map<std::string, std::vector<uint8_t>> idMap;
480     std::vector<std::vector<uint8_t>> waitToRemove;
481     for (const auto &id : querySyncIds) {
482         Value value;
483         int errCode = GetMetadataFromDb(id, value);
484         if (errCode != E_OK) {
485             waitToRemove.push_back(id);
486             continue; // may be this failure cause by wrong data
487         }
488         QueryWaterMark queryWaterMark;
489         std::string queryKey(id.begin(), id.end());
490         errCode = DeSerializeQueryWaterMark(value, queryWaterMark);
491         if (errCode != E_OK) {
492             waitToRemove.push_back(id);
493             continue; // may be this failure cause by wrong data
494         }
495         idMap.insert({queryKey, id});
496         allItems.emplace_back(queryKey, queryWaterMark.lastUsedTime);
497     }
498     // we only remove broken data below
499     // 1. common data size less then 10w
500     // 2. allItems.size() - MAX_STORE_ITEMS - waitToRemove.size() < 0
501     // so we only let allItems.size() < MAX_STORE_ITEMS + waitToRemove.size()
502     if (allItems.size() < MAX_STORE_ITEMS + waitToRemove.size()) {
503         // remove in db
504         return DeleteMetaDataFromDB(waitToRemove);
505     }
506     uint32_t removeCount = allItems.size() - MAX_STORE_ITEMS - waitToRemove.size();
507     // quick select the k_th least used
508     std::nth_element(allItems.begin(), allItems.begin() + removeCount, allItems.end(),
509         [](const std::pair<std::string, Timestamp> &w1, const std::pair<std::string, Timestamp> &w2) {
510             return w1.second < w2.second;
511         });
512     for (uint32_t i = 0; i < removeCount; ++i) {
513         waitToRemove.push_back(idMap[allItems[i].first]);
514     }
515     // remove in db
516     return DeleteMetaDataFromDB(waitToRemove);
517 }
518 
ResetRecvQueryWaterMark(const DeviceID & deviceId,const std::string & tableName,bool isNeedHash)519 int QuerySyncWaterMarkHelper::ResetRecvQueryWaterMark(const DeviceID &deviceId, const std::string &tableName,
520     bool isNeedHash)
521 {
522     // lock prevent other thread modify queryWaterMark at this moment
523     std::lock_guard<std::mutex> autoLock(queryWaterMarkLock_);
524     std::string prefixKeyStr = DBConstant::QUERY_SYNC_PREFIX_KEY +
525         (isNeedHash ? DBCommon::TransferHashString(deviceId) : deviceId);
526     if (!tableName.empty()) {
527         std::string hashTableName = DBCommon::TransferHashString(tableName);
528         std::string hexTableName = DBCommon::TransferStringToHex(hashTableName);
529         prefixKeyStr += hexTableName;
530     }
531 
532     // remove in db
533     Key prefixKey;
534     DBCommon::StringToVector(prefixKeyStr, prefixKey);
535     int errCode = storage_->DeleteMetaDataByPrefixKey(prefixKey);
536     if (errCode != E_OK) {
537         LOGE("[META]ResetRecvQueryWaterMark fail errCode:%d", errCode);
538         return errCode;
539     }
540     // clean cache
541     querySyncCache_.RemoveWithPrefixKey(prefixKeyStr);
542     return E_OK;
543 }
544 }  // namespace DistributedDB