1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef QUERY_SYNC_WATER_MARK_HELPER_H
17 #define QUERY_SYNC_WATER_MARK_HELPER_H
18 
19 #include <deque>
20 #include <map>
21 #include <mutex>
22 #include <string>
23 #include <vector>
24 #include "db_types.h"
25 #include "ikvdb_sync_interface.h"
26 #include "lru_map.h"
27 
28 namespace DistributedDB {
29 struct QueryWaterMark {
30     uint32_t version = 0; // start with 103
31     WaterMark sendWaterMark = 0;
32     WaterMark recvWaterMark = 0;
33     Timestamp lastUsedTime = 0; // use for delete data
34     std::string sql; // for analyze sql from logs
35     Timestamp lastQueryTime = 0; // use for miss query scene add in 106
36 };
37 
38 struct DeleteWaterMark {
39     uint32_t version = 0;
40     WaterMark sendWaterMark = 0;
41     WaterMark recvWaterMark = 0;
42 };
43 
44 class QuerySyncWaterMarkHelper {
45 public:
46     QuerySyncWaterMarkHelper();
47     ~QuerySyncWaterMarkHelper();
48 
49     DISABLE_COPY_ASSIGN_MOVE(QuerySyncWaterMarkHelper);
50 
51     int Initialize(ISyncInterface *storage);
52 
53     int GetQueryWaterMark(const std::string &queryIdentify, const std::string &deviceId,
54         QueryWaterMark &queryWaterMark);
55 
56     int SetSendQueryWaterMark(const std::string &queryIdentify,
57         const std::string &deviceId, const WaterMark &waterMark);
58 
59     int SetRecvQueryWaterMark(const std::string &queryIdentify,
60         const std::string &deviceId, const WaterMark &waterMark);
61 
62     int SetLastQueryTime(const std::string &queryIdentify,
63         const std::string &deviceId, const Timestamp &timestamp);
64 
65     int GetDeleteSyncWaterMark(const std::string &deviceId, DeleteWaterMark &deleteWaterMark);
66 
67     int SetSendDeleteSyncWaterMark(const std::string &deviceId, const WaterMark &waterMark);
68 
69     int SetRecvDeleteSyncWaterMark(const std::string &deviceId, const WaterMark &waterMark, bool isNeedHash);
70 
71     // this function will read deleteWaterMark from db by it's deleteWaterMarkKey
72     // and then serialize it and put to cache
73     int LoadDeleteSyncDataToCache(const Key &deleteWaterMarkKey);
74 
75     // this function will remove data in db
76     int RemoveLeastUsedQuerySyncItems(const std::vector<Key> &querySyncIds);
77 
78     // reset the waterMark to zero
79     int ResetRecvQueryWaterMark(const DeviceID &deviceId, const std::string &tableName, bool isNeedHash);
80 
81     static std::string GetQuerySyncPrefixKey();
82 
83     static std::string GetDeleteSyncPrefixKey();
84 
85 private:
86 
87     int GetMetadataFromDb(const std::vector<uint8_t> &key, std::vector<uint8_t> &outValue);
88 
89     int SetMetadataToDb(const std::vector<uint8_t> &key, const std::vector<uint8_t> &inValue);
90 
91     int DeleteMetaDataFromDB(const std::vector<Key> &keys) const;
92 
93     int SaveQueryWaterMarkToDB(const DeviceID &dbKeyString, const QueryWaterMark &queryWaterMark);
94 
95     int GetQueryWaterMarkFromDB(const DeviceID &dbKeyString, QueryWaterMark &queryWaterMark);
96 
97     int SetRecvQueryWaterMarkWithoutLock(const std::string &cacheKey,
98         const WaterMark &waterMark);
99 
100     // search the queryWaterMark from db or cache_
101     // and ensure it exit in cache_
102     int GetQueryWaterMarkInCacheAndDb(const std::string &cacheKey, QueryWaterMark &queryWaterMark);
103 
104     // only first create queryWaterMark will call this function
105     // it will create a queryWaterMark and save to db
106     int PutQueryWaterMarkToDB(const DeviceID &dbKeyString, QueryWaterMark &queryWaterMark);
107 
108     // get the querySync hashId in cache_ or generate one and then put it in to cache_
109     // the hashId is made up of "QUERY_SYNC_PREFIX_KEY" + hash(deviceId) + queryId
110     DeviceID GetHashQuerySyncDeviceId(const DeviceID &deviceId, const DeviceID &queryId);
111 
112     // put queryWaterMark to lru cache_ and then save to db
113     int UpdateCacheAndSave(const std::string &cacheKey, QueryWaterMark &queryWaterMark);
114 
115     // search the deleteWaterMark from db or cache_
116     // and ensure it exit in cache_
117     int GetDeleteWaterMarkFromCache(const DeviceID &hashDeviceId, DeleteWaterMark &deleteWaterMark);
118 
119     // get the deleteSync hashId in cache_ or generate one and then put it in to cache_
120     // the hashId is made up of "DELETE_SYNC_PREFIX_KEY" + hash(deviceId)
121     DeviceID GetHashDeleteSyncDeviceId(const DeviceID &deviceId, bool isNeedHash = true);
122 
123     int SaveDeleteWaterMarkToDB(const DeviceID &hashDeviceId, const DeleteWaterMark &deleteWaterMark);
124 
125     int GetDeleteWaterMarkFromDB(const DeviceID &hashDeviceId, DeleteWaterMark &deleteWaterMark);
126 
127     // put queryWaterMark to lru cache_ and then save to db
128     int UpdateDeleteSyncCacheAndSave(const std::string &dbKey, const DeleteWaterMark &deleteWaterMark);
129 
130     static int SerializeQueryWaterMark(const QueryWaterMark &queryWaterMark, std::vector<uint8_t> &outValue);
131 
132     static int DeSerializeQueryWaterMark(const std::vector<uint8_t> &dbQueryWaterMark, QueryWaterMark &queryWaterMark);
133 
134     static uint64_t CalculateQueryWaterMarkSize(const QueryWaterMark &queryWaterMark);
135 
136     static int SerializeDeleteWaterMark(const DeleteWaterMark &deleteWaterMark, std::vector<uint8_t> &outValue);
137 
138     static int DeSerializeDeleteWaterMark(const std::vector<uint8_t> &inValue, DeleteWaterMark &deleteWaterMark);
139 
140     static uint64_t CalculateDeleteWaterMarkSize();
141 
142     // store or visit queryWaterMark should add a lock
143     // because it will change the eliminationChain
144     // and the queryWaterMark use a LRU Map to store in ram
145     std::mutex queryWaterMarkLock_;
146     LruMap<std::string, QueryWaterMark> querySyncCache_;
147     std::map<DeviceID, std::map<std::string, std::string>> deviceIdToHashQuerySyncIdMap_;
148 
149     // also store deleteKeyWaterMark should add a lock
150     std::mutex deleteSyncLock_;
151     std::map<std::string, DeleteWaterMark> deleteSyncCache_;
152     std::map<DeviceID, std::string> deviceIdToHashDeleteSyncIdMap_;
153 
154     ISyncInterface *storage_;
155 };
156 } // namespace DistributedDB
157 #endif
158