1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "query_sync_water_mark_helper.h"
17
18 #include <algorithm>
19 #include <version.h>
20 #include "platform_specific.h"
21 #include "parcel.h"
22 #include "db_errno.h"
23 #include "db_common.h"
24 #include "log_print.h"
25
26 namespace DistributedDB {
27 namespace {
28 const uint32_t MAX_STORE_ITEMS = 100000;
29 // WaterMark Version
30 constexpr uint32_t QUERY_WATERMARK_VERSION_CURRENT = SOFTWARE_VERSION_RELEASE_6_0;
31 constexpr uint32_t DELETE_WATERMARK_VERSION_CURRENT = SOFTWARE_VERSION_RELEASE_3_0;
32 }
33
QuerySyncWaterMarkHelper()34 QuerySyncWaterMarkHelper::QuerySyncWaterMarkHelper()
35 : storage_(nullptr)
36 {}
37
~QuerySyncWaterMarkHelper()38 QuerySyncWaterMarkHelper::~QuerySyncWaterMarkHelper()
39 {
40 storage_ = nullptr;
41 deviceIdToHashQuerySyncIdMap_.clear();
42 deleteSyncCache_.clear();
43 deviceIdToHashDeleteSyncIdMap_.clear();
44 }
45
GetMetadataFromDb(const std::vector<uint8_t> & key,std::vector<uint8_t> & outValue)46 int QuerySyncWaterMarkHelper::GetMetadataFromDb(const std::vector<uint8_t> &key, std::vector<uint8_t> &outValue)
47 {
48 if (storage_ == nullptr) {
49 return -E_INVALID_DB;
50 }
51 return storage_->GetMetaData(key, outValue);
52 }
53
SetMetadataToDb(const std::vector<uint8_t> & key,const std::vector<uint8_t> & inValue)54 int QuerySyncWaterMarkHelper::SetMetadataToDb(const std::vector<uint8_t> &key, const std::vector<uint8_t> &inValue)
55 {
56 if (storage_ == nullptr) {
57 return -E_INVALID_DB;
58 }
59 return storage_->PutMetaData(key, inValue, false);
60 }
61
DeleteMetaDataFromDB(const std::vector<Key> & keys) const62 int QuerySyncWaterMarkHelper::DeleteMetaDataFromDB(const std::vector<Key> &keys) const
63 {
64 if (storage_ == nullptr) {
65 return -E_INVALID_DB;
66 }
67 return storage_->DeleteMetaData(keys);
68 }
69
Initialize(ISyncInterface * storage)70 int QuerySyncWaterMarkHelper::Initialize(ISyncInterface *storage)
71 {
72 storage_ = storage;
73 return E_OK;
74 }
75
LoadDeleteSyncDataToCache(const Key & deleteWaterMarkKey)76 int QuerySyncWaterMarkHelper::LoadDeleteSyncDataToCache(const Key &deleteWaterMarkKey)
77 {
78 std::vector<uint8_t> value;
79 int errCode = GetMetadataFromDb(deleteWaterMarkKey, value);
80 if (errCode != E_OK) {
81 return errCode;
82 }
83 DeleteWaterMark deleteWaterMark;
84 std::string dbKey(deleteWaterMarkKey.begin(), deleteWaterMarkKey.end());
85 errCode = DeSerializeDeleteWaterMark(value, deleteWaterMark);
86 if (errCode != E_OK) {
87 return errCode;
88 }
89 std::lock_guard<std::mutex> autoLock(deleteSyncLock_);
90 deleteSyncCache_[dbKey] = deleteWaterMark;
91 return errCode;
92 }
93
GetQueryWaterMarkInCacheAndDb(const std::string & cacheKey,QueryWaterMark & queryWaterMark)94 int QuerySyncWaterMarkHelper::GetQueryWaterMarkInCacheAndDb(const std::string &cacheKey,
95 QueryWaterMark &queryWaterMark)
96 {
97 // first get from cache_
98 int errCode = querySyncCache_.Get(cacheKey, queryWaterMark);
99 bool addToCache = false;
100 if (errCode == -E_NOT_FOUND) {
101 // second get from db
102 errCode = GetQueryWaterMarkFromDB(cacheKey, queryWaterMark);
103 addToCache = true;
104 }
105 if (errCode == -E_NOT_FOUND) {
106 // third generate one and save to db
107 errCode = PutQueryWaterMarkToDB(cacheKey, queryWaterMark);
108 }
109 // something error return
110 if (errCode != E_OK) {
111 LOGE("[Meta]GetQueryWaterMark Fail code = %d", errCode);
112 return errCode;
113 }
114 // remember add to cache_
115 if (addToCache) {
116 querySyncCache_.Put(cacheKey, queryWaterMark);
117 }
118 return errCode;
119 }
120
GetQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,QueryWaterMark & queryWaterMark)121 int QuerySyncWaterMarkHelper::GetQueryWaterMark(const std::string &queryIdentify, const std::string &deviceId,
122 QueryWaterMark &queryWaterMark)
123 {
124 std::string cacheKey = GetHashQuerySyncDeviceId(deviceId, queryIdentify);
125 std::lock_guard<std::mutex> autoLock(queryWaterMarkLock_);
126 return GetQueryWaterMarkInCacheAndDb(cacheKey, queryWaterMark);
127 }
128
SetRecvQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,const WaterMark & waterMark)129 int QuerySyncWaterMarkHelper::SetRecvQueryWaterMark(const std::string &queryIdentify,
130 const std::string &deviceId, const WaterMark &waterMark)
131 {
132 std::string cacheKey = GetHashQuerySyncDeviceId(deviceId, queryIdentify);
133 std::lock_guard<std::mutex> autoLock(queryWaterMarkLock_);
134 return SetRecvQueryWaterMarkWithoutLock(cacheKey, waterMark);
135 }
136
SetLastQueryTime(const std::string & queryIdentify,const std::string & deviceId,const Timestamp & timestamp)137 int QuerySyncWaterMarkHelper::SetLastQueryTime(const std::string &queryIdentify,
138 const std::string &deviceId, const Timestamp ×tamp)
139 {
140 std::string cacheKey = GetHashQuerySyncDeviceId(deviceId, queryIdentify);
141 std::lock_guard<std::mutex> autoLock(queryWaterMarkLock_);
142 QueryWaterMark queryWaterMark;
143 int errCode = GetQueryWaterMarkInCacheAndDb(cacheKey, queryWaterMark);
144 if (errCode != E_OK) {
145 return errCode;
146 }
147 queryWaterMark.lastQueryTime = timestamp;
148 return UpdateCacheAndSave(cacheKey, queryWaterMark);
149 }
150
SetRecvQueryWaterMarkWithoutLock(const std::string & cacheKey,const WaterMark & waterMark)151 int QuerySyncWaterMarkHelper::SetRecvQueryWaterMarkWithoutLock(const std::string &cacheKey,
152 const WaterMark &waterMark)
153 {
154 QueryWaterMark queryWaterMark;
155 int errCode = GetQueryWaterMarkInCacheAndDb(cacheKey, queryWaterMark);
156 if (errCode != E_OK) {
157 return errCode;
158 }
159 queryWaterMark.recvWaterMark = waterMark;
160 return UpdateCacheAndSave(cacheKey, queryWaterMark);
161 }
162
SetSendQueryWaterMark(const std::string & queryIdentify,const std::string & deviceId,const WaterMark & waterMark)163 int QuerySyncWaterMarkHelper::SetSendQueryWaterMark(const std::string &queryIdentify,
164 const std::string &deviceId, const WaterMark &waterMark)
165 {
166 std::string cacheKey = GetHashQuerySyncDeviceId(deviceId, queryIdentify);
167 QueryWaterMark queryWaterMark;
168 std::lock_guard<std::mutex> autoLock(queryWaterMarkLock_);
169 int errCode = GetQueryWaterMarkInCacheAndDb(cacheKey, queryWaterMark);
170 if (errCode != E_OK) {
171 return errCode;
172 }
173 queryWaterMark.sendWaterMark = waterMark;
174 return UpdateCacheAndSave(cacheKey, queryWaterMark);
175 }
176
UpdateCacheAndSave(const std::string & cacheKey,QueryWaterMark & queryWaterMark)177 int QuerySyncWaterMarkHelper::UpdateCacheAndSave(const std::string &cacheKey,
178 QueryWaterMark &queryWaterMark)
179 {
180 // update lastUsedTime
181 int errCode = OS::GetCurrentSysTimeInMicrosecond(queryWaterMark.lastUsedTime);
182 if (errCode != E_OK) {
183 return errCode;
184 }
185 // save db first
186 errCode = SaveQueryWaterMarkToDB(cacheKey, queryWaterMark);
187 if (errCode != E_OK) {
188 return errCode;
189 }
190 querySyncCache_.Put(cacheKey, queryWaterMark);
191 return errCode;
192 }
193
PutQueryWaterMarkToDB(const DeviceID & dbKeyString,QueryWaterMark & queryWaterMark)194 int QuerySyncWaterMarkHelper::PutQueryWaterMarkToDB(const DeviceID &dbKeyString, QueryWaterMark &queryWaterMark)
195 {
196 int errCode = OS::GetCurrentSysTimeInMicrosecond(queryWaterMark.lastUsedTime);
197 if (errCode != E_OK) {
198 return errCode;
199 }
200 queryWaterMark.version = QUERY_WATERMARK_VERSION_CURRENT;
201 return SaveQueryWaterMarkToDB(dbKeyString, queryWaterMark);
202 }
203
SaveQueryWaterMarkToDB(const DeviceID & dbKeyString,const QueryWaterMark & queryWaterMark)204 int QuerySyncWaterMarkHelper::SaveQueryWaterMarkToDB(const DeviceID &dbKeyString, const QueryWaterMark &queryWaterMark)
205 {
206 // serialize value
207 Value dbValue;
208 int errCode = SerializeQueryWaterMark(queryWaterMark, dbValue);
209 if (errCode != E_OK) {
210 return errCode;
211 }
212 // serialize key
213 Key dbKey;
214 DBCommon::StringToVector(dbKeyString, dbKey);
215 // save
216 errCode = SetMetadataToDb(dbKey, dbValue);
217 if (errCode != E_OK) {
218 LOGE("QuerySyncWaterMarkHelper::SaveQueryWaterMarkToDB failed errCode:%d", errCode);
219 }
220 return errCode;
221 }
222
GetQueryWaterMarkFromDB(const DeviceID & dbKeyString,QueryWaterMark & queryWaterMark)223 int QuerySyncWaterMarkHelper::GetQueryWaterMarkFromDB(const DeviceID &dbKeyString, QueryWaterMark &queryWaterMark)
224 {
225 // serialize key
226 Key dbKey;
227 DBCommon::StringToVector(dbKeyString, dbKey);
228 // search in db
229 Value dbValue;
230 int errCode = GetMetadataFromDb(dbKey, dbValue);
231 if (errCode != E_OK) {
232 return errCode;
233 }
234 return DeSerializeQueryWaterMark(dbValue, queryWaterMark);
235 }
236
SerializeQueryWaterMark(const QueryWaterMark & queryWaterMark,Value & outValue)237 int QuerySyncWaterMarkHelper::SerializeQueryWaterMark(const QueryWaterMark &queryWaterMark, Value &outValue)
238 {
239 uint64_t length = CalculateQueryWaterMarkSize(queryWaterMark);
240 outValue.resize(length);
241 Parcel parcel(outValue.data(), outValue.size());
242 parcel.WriteUInt32(queryWaterMark.version);
243 parcel.EightByteAlign();
244 parcel.WriteUInt64(queryWaterMark.sendWaterMark);
245 parcel.WriteUInt64(queryWaterMark.recvWaterMark);
246 parcel.WriteUInt64(queryWaterMark.lastUsedTime);
247 parcel.WriteString(queryWaterMark.sql);
248 parcel.WriteUInt64(queryWaterMark.lastQueryTime);
249 if (parcel.IsError()) {
250 LOGE("[Meta] Parcel error when serialize queryWaterMark");
251 return -E_PARSE_FAIL;
252 }
253 return E_OK;
254 }
255
DeSerializeQueryWaterMark(const Value & dbQueryWaterMark,QueryWaterMark & queryWaterMark)256 int QuerySyncWaterMarkHelper::DeSerializeQueryWaterMark(const Value &dbQueryWaterMark, QueryWaterMark &queryWaterMark)
257 {
258 Parcel parcel(const_cast<uint8_t *>(dbQueryWaterMark.data()), dbQueryWaterMark.size());
259 parcel.ReadUInt32(queryWaterMark.version);
260 parcel.EightByteAlign();
261 parcel.ReadUInt64(queryWaterMark.sendWaterMark);
262 parcel.ReadUInt64(queryWaterMark.recvWaterMark);
263 parcel.ReadUInt64(queryWaterMark.lastUsedTime);
264 parcel.ReadString(queryWaterMark.sql);
265 if (queryWaterMark.version >= SOFTWARE_VERSION_RELEASE_6_0) {
266 parcel.ReadUInt64(queryWaterMark.lastQueryTime);
267 }
268 if (parcel.IsError()) {
269 LOGE("[Meta] Parcel error when deserialize queryWaterMark");
270 return -E_PARSE_FAIL;
271 }
272 return E_OK;
273 }
274
CalculateQueryWaterMarkSize(const QueryWaterMark & queryWaterMark)275 uint64_t QuerySyncWaterMarkHelper::CalculateQueryWaterMarkSize(const QueryWaterMark &queryWaterMark)
276 {
277 uint64_t length = Parcel::GetUInt32Len(); // version
278 length = Parcel::GetEightByteAlign(length);
279 length += Parcel::GetUInt64Len(); // sendWaterMark
280 length += Parcel::GetUInt64Len(); // recvWaterMark
281 length += Parcel::GetUInt64Len(); // lastUsedTime
282 length += Parcel::GetStringLen(queryWaterMark.sql);
283 length += Parcel::GetUInt64Len(); // lastQueryTime
284 return length;
285 }
286
// Returns the db/cache key for (deviceId, queryId):
//   QUERY_SYNC_PREFIX_KEY + TransferHashString(deviceId) + queryId
// Computed keys are remembered per device and per query id, so the hash is computed once per pair.
// Acquires queryWaterMarkLock_; callers must not already hold it.
DeviceID QuerySyncWaterMarkHelper::GetHashQuerySyncDeviceId(const DeviceID &deviceId, const DeviceID &queryId)
{
    std::lock_guard<std::mutex> guard(queryWaterMarkLock_);
    // operator[] creates the per-device map on first use, exactly as a lookup miss requires
    auto &keysOfDevice = deviceIdToHashQuerySyncIdMap_[deviceId];
    auto known = keysOfDevice.find(queryId);
    if (known != keysOfDevice.end()) {
        return known->second;
    }
    // do not modify this
    DeviceID hashQuerySyncId = DBConstant::QUERY_SYNC_PREFIX_KEY + DBCommon::TransferHashString(deviceId) + queryId;
    keysOfDevice[queryId] = hashQuerySyncId;
    return hashQuerySyncId;
}
300
GetDeleteSyncWaterMark(const std::string & deviceId,DeleteWaterMark & deleteWaterMark)301 int QuerySyncWaterMarkHelper::GetDeleteSyncWaterMark(const std::string &deviceId, DeleteWaterMark &deleteWaterMark)
302 {
303 std::string hashId = GetHashDeleteSyncDeviceId(deviceId);
304 // lock prevent different thread visit deleteSyncCache_
305 std::lock_guard<std::mutex> autoLock(deleteSyncLock_);
306 return GetDeleteWaterMarkFromCache(hashId, deleteWaterMark);
307 }
308
SetSendDeleteSyncWaterMark(const DeviceID & deviceId,const WaterMark & waterMark)309 int QuerySyncWaterMarkHelper::SetSendDeleteSyncWaterMark(const DeviceID &deviceId, const WaterMark &waterMark)
310 {
311 std::string hashId = GetHashDeleteSyncDeviceId(deviceId);
312 DeleteWaterMark deleteWaterMark;
313 // lock prevent different thread visit deleteSyncCache_
314 std::lock_guard<std::mutex> autoLock(deleteSyncLock_);
315 int errCode = GetDeleteWaterMarkFromCache(hashId, deleteWaterMark);
316 if (errCode != E_OK) {
317 return errCode;
318 }
319 deleteWaterMark.sendWaterMark = waterMark;
320 return UpdateDeleteSyncCacheAndSave(hashId, deleteWaterMark);
321 }
322
SetRecvDeleteSyncWaterMark(const DeviceID & deviceId,const WaterMark & waterMark,bool isNeedHash)323 int QuerySyncWaterMarkHelper::SetRecvDeleteSyncWaterMark(const DeviceID &deviceId, const WaterMark &waterMark,
324 bool isNeedHash)
325 {
326 std::string hashId = GetHashDeleteSyncDeviceId(deviceId, isNeedHash);
327 DeleteWaterMark deleteWaterMark;
328 // lock prevent different thread visit deleteSyncCache_
329 std::lock_guard<std::mutex> autoLock(deleteSyncLock_);
330 int errCode = GetDeleteWaterMarkFromCache(hashId, deleteWaterMark);
331 if (errCode != E_OK) {
332 return errCode;
333 }
334 deleteWaterMark.recvWaterMark = waterMark;
335 return UpdateDeleteSyncCacheAndSave(hashId, deleteWaterMark);
336 }
337
UpdateDeleteSyncCacheAndSave(const std::string & dbKey,const DeleteWaterMark & deleteWaterMark)338 int QuerySyncWaterMarkHelper::UpdateDeleteSyncCacheAndSave(const std::string &dbKey,
339 const DeleteWaterMark &deleteWaterMark)
340 {
341 // save db first
342 int errCode = SaveDeleteWaterMarkToDB(dbKey, deleteWaterMark);
343 if (errCode != E_OK) {
344 return errCode;
345 }
346 // modify cache
347 deleteSyncCache_[dbKey] = deleteWaterMark;
348 return errCode;
349 }
350
GetDeleteWaterMarkFromCache(const DeviceID & hashDeviceId,DeleteWaterMark & deleteWaterMark)351 int QuerySyncWaterMarkHelper::GetDeleteWaterMarkFromCache(const DeviceID &hashDeviceId,
352 DeleteWaterMark &deleteWaterMark)
353 {
354 // if not found
355 if (deleteSyncCache_.find(hashDeviceId) == deleteSyncCache_.end()) {
356 DeleteWaterMark waterMark;
357 waterMark.version = DELETE_WATERMARK_VERSION_CURRENT;
358 int errCode = GetDeleteWaterMarkFromDB(hashDeviceId, waterMark);
359 if (errCode == -E_NOT_FOUND) {
360 deleteWaterMark.sendWaterMark = 0;
361 deleteWaterMark.recvWaterMark = 0;
362 errCode = E_OK;
363 }
364 if (errCode != E_OK) {
365 LOGE("[Meta]GetDeleteWaterMark Fail code = %d", errCode);
366 return errCode;
367 }
368 deleteSyncCache_.insert(std::pair<DeviceID, DeleteWaterMark>(hashDeviceId, waterMark));
369 }
370 deleteWaterMark = deleteSyncCache_[hashDeviceId];
371 return E_OK;
372 }
373
GetDeleteWaterMarkFromDB(const DeviceID & hashDeviceId,DeleteWaterMark & deleteWaterMark)374 int QuerySyncWaterMarkHelper::GetDeleteWaterMarkFromDB(const DeviceID &hashDeviceId,
375 DeleteWaterMark &deleteWaterMark)
376 {
377 Key dbKey;
378 DBCommon::StringToVector(hashDeviceId, dbKey);
379 // search in db
380 Value dbValue;
381 int errCode = GetMetadataFromDb(dbKey, dbValue);
382 if (errCode != E_OK) {
383 return errCode;
384 }
385 // serialize value
386 return DeSerializeDeleteWaterMark(dbValue, deleteWaterMark);
387 }
388
SaveDeleteWaterMarkToDB(const DeviceID & hashDeviceId,const DeleteWaterMark & deleteWaterMark)389 int QuerySyncWaterMarkHelper::SaveDeleteWaterMarkToDB(const DeviceID &hashDeviceId,
390 const DeleteWaterMark &deleteWaterMark)
391 {
392 // serialize value
393 Value dbValue;
394 int errCode = SerializeDeleteWaterMark(deleteWaterMark, dbValue);
395 if (errCode != E_OK) {
396 return errCode;
397 }
398 Key dbKey;
399 DBCommon::StringToVector(hashDeviceId, dbKey);
400 // save
401 errCode = SetMetadataToDb(dbKey, dbValue);
402 if (errCode != E_OK) {
403 LOGE("QuerySyncWaterMarkHelper::SaveDeleteWaterMarkToDB failed errCode:%d", errCode);
404 }
405 return errCode;
406 }
407
// Returns the db/cache key of the delete water mark for deviceId:
//   isNeedHash == true  : DELETE_SYNC_PREFIX_KEY + TransferHashString(deviceId)
//   isNeedHash == false : DELETE_SYNC_PREFIX_KEY + deviceId
// Only the hashed form is memoized in deviceIdToHashDeleteSyncIdMap_. The map is keyed by deviceId
// alone, so storing both forms in it would let a call with one flag value return the key that was
// built for the other.
// The hashed path acquires deleteSyncLock_; callers must not already hold it.
DeviceID QuerySyncWaterMarkHelper::GetHashDeleteSyncDeviceId(const DeviceID &deviceId, bool isNeedHash)
{
    if (!isNeedHash) {
        // plain concatenation touches no shared state, so neither the lock nor the cache is needed
        return DBConstant::DELETE_SYNC_PREFIX_KEY + deviceId;
    }
    std::lock_guard<std::mutex> autoLock(deleteSyncLock_);
    auto cached = deviceIdToHashDeleteSyncIdMap_.find(deviceId);
    if (cached != deviceIdToHashDeleteSyncIdMap_.end()) {
        return cached->second;
    }
    DeviceID hashDeleteSyncId = DBConstant::DELETE_SYNC_PREFIX_KEY + DBCommon::TransferHashString(deviceId);
    deviceIdToHashDeleteSyncIdMap_.insert(std::pair<DeviceID, DeviceID>(deviceId, hashDeleteSyncId));
    return hashDeleteSyncId;
}
421
SerializeDeleteWaterMark(const DeleteWaterMark & deleteWaterMark,std::vector<uint8_t> & outValue)422 int QuerySyncWaterMarkHelper::SerializeDeleteWaterMark(const DeleteWaterMark &deleteWaterMark,
423 std::vector<uint8_t> &outValue)
424 {
425 uint64_t length = CalculateDeleteWaterMarkSize();
426 outValue.resize(length);
427 Parcel parcel(outValue.data(), outValue.size());
428 parcel.WriteUInt32(deleteWaterMark.version);
429 parcel.EightByteAlign();
430 parcel.WriteUInt64(deleteWaterMark.sendWaterMark);
431 parcel.WriteUInt64(deleteWaterMark.recvWaterMark);
432 if (parcel.IsError()) {
433 LOGE("[Meta] Parcel error when serialize deleteWaterMark.");
434 return -E_PARSE_FAIL;
435 }
436 return E_OK;
437 }
438
DeSerializeDeleteWaterMark(const std::vector<uint8_t> & inValue,DeleteWaterMark & deleteWaterMark)439 int QuerySyncWaterMarkHelper::DeSerializeDeleteWaterMark(const std::vector<uint8_t> &inValue,
440 DeleteWaterMark &deleteWaterMark)
441 {
442 Parcel parcel(const_cast<uint8_t *>(inValue.data()), inValue.size());
443 parcel.ReadUInt32(deleteWaterMark.version);
444 parcel.EightByteAlign();
445 parcel.ReadUInt64(deleteWaterMark.sendWaterMark);
446 parcel.ReadUInt64(deleteWaterMark.recvWaterMark);
447 if (parcel.IsError()) {
448 LOGE("[Meta] Parcel error when deserialize deleteWaterMark.");
449 return -E_PARSE_FAIL;
450 }
451 return E_OK;
452 }
453
CalculateDeleteWaterMarkSize()454 uint64_t QuerySyncWaterMarkHelper::CalculateDeleteWaterMarkSize()
455 {
456 uint64_t length = Parcel::GetUInt32Len(); // version
457 length = Parcel::GetEightByteAlign(length);
458 length += Parcel::GetUInt64Len(); // sendWaterMark
459 length += Parcel::GetUInt64Len(); // recvWaterMark
460 return length;
461 }
462
GetQuerySyncPrefixKey()463 std::string QuerySyncWaterMarkHelper::GetQuerySyncPrefixKey()
464 {
465 return DBConstant::QUERY_SYNC_PREFIX_KEY;
466 }
467
GetDeleteSyncPrefixKey()468 std::string QuerySyncWaterMarkHelper::GetDeleteSyncPrefixKey()
469 {
470 return DBConstant::DELETE_SYNC_PREFIX_KEY;
471 }
472
RemoveLeastUsedQuerySyncItems(const std::vector<Key> & querySyncIds)473 int QuerySyncWaterMarkHelper::RemoveLeastUsedQuerySyncItems(const std::vector<Key> &querySyncIds)
474 {
475 if (querySyncIds.size() < MAX_STORE_ITEMS) {
476 return E_OK;
477 }
478 std::vector<std::pair<std::string, Timestamp>> allItems;
479 std::map<std::string, std::vector<uint8_t>> idMap;
480 std::vector<std::vector<uint8_t>> waitToRemove;
481 for (const auto &id : querySyncIds) {
482 Value value;
483 int errCode = GetMetadataFromDb(id, value);
484 if (errCode != E_OK) {
485 waitToRemove.push_back(id);
486 continue; // may be this failure cause by wrong data
487 }
488 QueryWaterMark queryWaterMark;
489 std::string queryKey(id.begin(), id.end());
490 errCode = DeSerializeQueryWaterMark(value, queryWaterMark);
491 if (errCode != E_OK) {
492 waitToRemove.push_back(id);
493 continue; // may be this failure cause by wrong data
494 }
495 idMap.insert({queryKey, id});
496 allItems.emplace_back(queryKey, queryWaterMark.lastUsedTime);
497 }
498 // we only remove broken data below
499 // 1. common data size less then 10w
500 // 2. allItems.size() - MAX_STORE_ITEMS - waitToRemove.size() < 0
501 // so we only let allItems.size() < MAX_STORE_ITEMS + waitToRemove.size()
502 if (allItems.size() < MAX_STORE_ITEMS + waitToRemove.size()) {
503 // remove in db
504 return DeleteMetaDataFromDB(waitToRemove);
505 }
506 uint32_t removeCount = allItems.size() - MAX_STORE_ITEMS - waitToRemove.size();
507 // quick select the k_th least used
508 std::nth_element(allItems.begin(), allItems.begin() + removeCount, allItems.end(),
509 [](const std::pair<std::string, Timestamp> &w1, const std::pair<std::string, Timestamp> &w2) {
510 return w1.second < w2.second;
511 });
512 for (uint32_t i = 0; i < removeCount; ++i) {
513 waitToRemove.push_back(idMap[allItems[i].first]);
514 }
515 // remove in db
516 return DeleteMetaDataFromDB(waitToRemove);
517 }
518
ResetRecvQueryWaterMark(const DeviceID & deviceId,const std::string & tableName,bool isNeedHash)519 int QuerySyncWaterMarkHelper::ResetRecvQueryWaterMark(const DeviceID &deviceId, const std::string &tableName,
520 bool isNeedHash)
521 {
522 // lock prevent other thread modify queryWaterMark at this moment
523 std::lock_guard<std::mutex> autoLock(queryWaterMarkLock_);
524 std::string prefixKeyStr = DBConstant::QUERY_SYNC_PREFIX_KEY +
525 (isNeedHash ? DBCommon::TransferHashString(deviceId) : deviceId);
526 if (!tableName.empty()) {
527 std::string hashTableName = DBCommon::TransferHashString(tableName);
528 std::string hexTableName = DBCommon::TransferStringToHex(hashTableName);
529 prefixKeyStr += hexTableName;
530 }
531
532 // remove in db
533 Key prefixKey;
534 DBCommon::StringToVector(prefixKeyStr, prefixKey);
535 int errCode = storage_->DeleteMetaDataByPrefixKey(prefixKey);
536 if (errCode != E_OK) {
537 LOGE("[META]ResetRecvQueryWaterMark fail errCode:%d", errCode);
538 return errCode;
539 }
540 // clean cache
541 querySyncCache_.RemoveWithPrefixKey(prefixKeyStr);
542 return E_OK;
543 }
544 } // namespace DistributedDB