1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "subscribe_manager.h"
16
17 #include <mutex>
18 #include "db_common.h"
19 #include "sync_types.h"
20
21 namespace DistributedDB {
ClearRemoteSubscribeQuery(const std::string & device)22 void SubscribeManager::ClearRemoteSubscribeQuery(const std::string &device)
23 {
24 std::unique_lock<std::shared_mutex> lockGuard(remoteSubscribedMapLock_);
25 ClearSubscribeQuery(device, remoteSubscribedMap_, remoteSubscribedTotalMap_);
26 }
27
ClearAllRemoteQuery()28 void SubscribeManager::ClearAllRemoteQuery()
29 {
30 std::unique_lock<std::shared_mutex> lockGuard(remoteSubscribedMapLock_);
31 remoteSubscribedMap_.clear();
32 remoteSubscribedTotalMap_.clear();
33 }
34
ClearLocalSubscribeQuery(const std::string & device)35 void SubscribeManager::ClearLocalSubscribeQuery(const std::string &device)
36 {
37 std::unique_lock<std::shared_mutex> lockGuard(localSubscribeMapLock_);
38 unFinishedLocalAutoSubMap_.erase(device);
39 ClearSubscribeQuery(device, localSubscribeMap_, localSubscribeTotalMap_);
40 }
41
ReserveRemoteSubscribeQuery(const std::string & device,const QuerySyncObject & query)42 int SubscribeManager::ReserveRemoteSubscribeQuery(const std::string &device, const QuerySyncObject &query)
43 {
44 std::unique_lock<std::shared_mutex> lockGuard(remoteSubscribedMapLock_);
45 int errCode = ReserveSubscribeQuery(device, query, remoteSubscribedMap_, remoteSubscribedTotalMap_);
46 LOGI("[SubscribeManager] dev=%s,queryId=%s remote reserve err=%d", STR_MASK(device), STR_MASK(query.GetIdentify()),
47 errCode);
48 return errCode;
49 }
50
ActiveRemoteSubscribeQuery(const std::string & device,const QuerySyncObject & query)51 int SubscribeManager::ActiveRemoteSubscribeQuery(const std::string &device, const QuerySyncObject &query)
52 {
53 std::unique_lock<std::shared_mutex> lockGuard(remoteSubscribedMapLock_);
54 std::string queryId = query.GetIdentify();
55 int errCode = ActiveSubscribeQuery(device, queryId, remoteSubscribedMap_, remoteSubscribedTotalMap_);
56 LOGI("[SubscribeManager] dev=%s,queryId=%s remote active err=%d", STR_MASK(device), STR_MASK(queryId), errCode);
57 return errCode;
58 }
59
ReserveLocalSubscribeQuery(const std::string & device,const QuerySyncObject & query)60 int SubscribeManager::ReserveLocalSubscribeQuery(const std::string &device, const QuerySyncObject &query)
61 {
62 std::unique_lock<std::shared_mutex> lockGuard(localSubscribeMapLock_);
63 int errCode = ReserveSubscribeQuery(device, query, localSubscribeMap_, localSubscribeTotalMap_);
64 LOGI("[SubscribeManager] dev=%s,queryId=%s local reserve err=%d", STR_MASK(device), STR_MASK(query.GetIdentify()),
65 errCode);
66 return errCode;
67 }
68
ActiveLocalSubscribeQuery(const std::string & device,const QuerySyncObject & query)69 int SubscribeManager::ActiveLocalSubscribeQuery(const std::string &device, const QuerySyncObject &query)
70 {
71 std::unique_lock<std::shared_mutex> lockGuard(localSubscribeMapLock_);
72 std::string queryId = query.GetIdentify();
73 int errCode = ActiveSubscribeQuery(device, queryId, localSubscribeMap_, localSubscribeTotalMap_);
74 LOGI("[SubscribeManager] dev=%s,queryId=%s local active err=%d", STR_MASK(device), STR_MASK(queryId), errCode);
75 if (errCode != E_OK) {
76 return errCode;
77 }
78 if (unFinishedLocalAutoSubMap_.find(device) != unFinishedLocalAutoSubMap_.end() &&
79 unFinishedLocalAutoSubMap_[device].find(queryId) != unFinishedLocalAutoSubMap_[device].end()) {
80 unFinishedLocalAutoSubMap_[device].erase(queryId);
81 }
82 return errCode;
83 }
84
DeleteLocalSubscribeQuery(const std::string & device,const QuerySyncObject & query)85 void SubscribeManager::DeleteLocalSubscribeQuery(const std::string &device, const QuerySyncObject &query)
86 {
87 std::unique_lock<std::shared_mutex> lockGuard(localSubscribeMapLock_);
88 std::string queryId = query.GetIdentify();
89 DeleteSubscribeQuery(device, queryId, localSubscribeMap_, localSubscribeTotalMap_);
90 }
91
DeleteRemoteSubscribeQuery(const std::string & device,const QuerySyncObject & query)92 void SubscribeManager::DeleteRemoteSubscribeQuery(const std::string &device, const QuerySyncObject &query)
93 {
94 std::unique_lock<std::shared_mutex> lockGuard(remoteSubscribedMapLock_);
95 std::string queryId = query.GetIdentify();
96 DeleteSubscribeQuery(device, queryId, remoteSubscribedMap_, remoteSubscribedTotalMap_);
97 }
98
PutLocalUnFinishedSubQueries(const std::string & device,const std::vector<QuerySyncObject> & subscribeQueries)99 void SubscribeManager::PutLocalUnFinishedSubQueries(const std::string &device,
100 const std::vector<QuerySyncObject> &subscribeQueries)
101 {
102 LOGI("[SubscribeManager] put local unfinished subscribe queries, nums=%zu", subscribeQueries.size());
103 std::unique_lock<std::shared_mutex> lockGuard(localSubscribeMapLock_);
104 if (subscribeQueries.empty()) {
105 unFinishedLocalAutoSubMap_.erase(device);
106 return;
107 }
108 unFinishedLocalAutoSubMap_[device].clear();
109 auto iter = unFinishedLocalAutoSubMap_.find(device);
110 for (const auto &query : subscribeQueries) {
111 iter->second.insert(query.GetIdentify());
112 }
113 }
114
GetAllUnFinishSubQueries(std::map<std::string,std::vector<QuerySyncObject>> & allSyncQueries) const115 void SubscribeManager::GetAllUnFinishSubQueries(
116 std::map<std::string, std::vector<QuerySyncObject>> &allSyncQueries) const
117 {
118 std::shared_lock<std::shared_mutex> lock(localSubscribeMapLock_);
119 for (auto &item : unFinishedLocalAutoSubMap_) {
120 if (item.second.empty()) {
121 continue;
122 }
123 allSyncQueries[item.first] = {};
124 auto iter = allSyncQueries.find(item.first);
125 for (const auto &queryId : item.second) {
126 auto iterTmp = localSubscribeTotalMap_.find(queryId);
127 if (iterTmp == localSubscribeTotalMap_.end()) {
128 LOGI("[SubscribeManager] queryId=%s not in localTotalMap", STR_MASK(queryId));
129 continue;
130 }
131 iter->second.push_back(iterTmp->second.first);
132 }
133 }
134 }
135
RemoveRemoteSubscribeQuery(const std::string & device,const QuerySyncObject & query)136 void SubscribeManager::RemoveRemoteSubscribeQuery(const std::string &device, const QuerySyncObject &query)
137 {
138 std::unique_lock<std::shared_mutex> lockGuard(remoteSubscribedMapLock_);
139 std::string queryId = query.GetIdentify();
140 RemoveSubscribeQuery(device, queryId, remoteSubscribedMap_, remoteSubscribedTotalMap_);
141 }
142
RemoveLocalSubscribeQuery(const std::string & device,const QuerySyncObject & query)143 void SubscribeManager::RemoveLocalSubscribeQuery(const std::string &device, const QuerySyncObject &query)
144 {
145 std::unique_lock<std::shared_mutex> lockGuard(localSubscribeMapLock_);
146 std::string queryId = query.GetIdentify();
147 RemoveSubscribeQuery(device, queryId, localSubscribeMap_, localSubscribeTotalMap_);
148 if (unFinishedLocalAutoSubMap_.find(device) != unFinishedLocalAutoSubMap_.end() &&
149 unFinishedLocalAutoSubMap_[device].find(queryId) != unFinishedLocalAutoSubMap_[device].end()) {
150 unFinishedLocalAutoSubMap_[device].erase(queryId);
151 LOGI("[SubscribeManager] dev=%s,queryId=%s delete from UnFinishedMap", STR_MASK(device), STR_MASK(queryId));
152 if (unFinishedLocalAutoSubMap_[device].empty()) {
153 LOGI("[SubscribeManager] dev=%s delete from unFinish map", STR_MASK(device));
154 unFinishedLocalAutoSubMap_.erase(device);
155 }
156 }
157 }
158
GetLocalSubscribeQueries(const std::string & device,std::vector<QuerySyncObject> & subscribeQueries) const159 void SubscribeManager::GetLocalSubscribeQueries(const std::string &device,
160 std::vector<QuerySyncObject> &subscribeQueries) const
161 {
162 std::shared_lock<std::shared_mutex> lock(localSubscribeMapLock_);
163 GetSubscribeQueries(device, localSubscribeMap_, localSubscribeTotalMap_, subscribeQueries);
164 }
165
GetRemoteSubscribeQueries(const std::string & device,std::vector<QuerySyncObject> & subscribeQueries) const166 void SubscribeManager::GetRemoteSubscribeQueries(const std::string &device,
167 std::vector<QuerySyncObject> &subscribeQueries) const
168 {
169 std::shared_lock<std::shared_mutex> lockGuard(remoteSubscribedMapLock_);
170 GetSubscribeQueries(device, remoteSubscribedMap_, remoteSubscribedTotalMap_, subscribeQueries);
171 }
172
IsLastRemoteContainSubscribe(const std::string & device,const std::string & queryId) const173 bool SubscribeManager::IsLastRemoteContainSubscribe(const std::string &device, const std::string &queryId) const
174 {
175 std::shared_lock<std::shared_mutex> lockGuard(remoteSubscribedMapLock_);
176 if (remoteSubscribedMap_.find(device) == remoteSubscribedMap_.end()) {
177 return false;
178 }
179 auto iter = remoteSubscribedTotalMap_.find(queryId);
180 if (iter == remoteSubscribedTotalMap_.end()) {
181 return false;
182 }
183 return iter->second.second == 1;
184 }
185
GetRemoteSubscribeQueryIds(const std::string & device,std::vector<std::string> & subscribeQueryIds) const186 void SubscribeManager::GetRemoteSubscribeQueryIds(const std::string &device,
187 std::vector<std::string> &subscribeQueryIds) const
188 {
189 std::shared_lock<std::shared_mutex> lockGuard(remoteSubscribedMapLock_);
190 auto iter = remoteSubscribedMap_.find(device);
191 if (iter == remoteSubscribedMap_.end()) {
192 LOGI("[SubscribeManager] dev=%s not in remoteSubscribedMap", STR_MASK(device));
193 return;
194 }
195 for (const auto &queryInfo : iter->second) {
196 if (remoteSubscribedTotalMap_.find(queryInfo.first) == remoteSubscribedTotalMap_.end()) {
197 LOGE("[SubscribeManager] queryId=%s not in RemoteTotalMap", STR_MASK(queryInfo.first));
198 continue;
199 }
200 subscribeQueryIds.push_back(queryInfo.first);
201 }
202 }
203
LocalSubscribeLimitCheck(const std::vector<std::string> & devices,QuerySyncObject & query) const204 int SubscribeManager::LocalSubscribeLimitCheck(const std::vector<std::string> &devices, QuerySyncObject &query) const
205 {
206 std::shared_lock<std::shared_mutex> lock(localSubscribeMapLock_);
207 size_t devNum = localSubscribeMap_.size();
208 for (const auto &device : devices) {
209 if (localSubscribeMap_.find(device) != localSubscribeMap_.end()) {
210 continue;
211 }
212 devNum++;
213 if (devNum > MAX_DEVICES_NUM) {
214 LOGE("[SubscribeManager] local subscribe devices is over limit");
215 return -E_MAX_LIMITS;
216 }
217 }
218 std::string queryId = query.GetIdentify();
219 auto allIter = localSubscribeTotalMap_.find(queryId);
220 if (allIter == localSubscribeTotalMap_.end() && localSubscribeTotalMap_.size() >= MAX_SUBSCRIBE_NUM_PER_DB) {
221 LOGE("[SubscribeManager] all local subscribe sums is over limit");
222 return -E_MAX_LIMITS;
223 }
224 return E_OK;
225 }
226
IsQueryExistSubscribe(const std::string & queryId) const227 bool SubscribeManager::IsQueryExistSubscribe(const std::string &queryId) const
228 {
229 std::shared_lock<std::shared_mutex> lockGuard(remoteSubscribedMapLock_);
230 return remoteSubscribedTotalMap_.find(queryId) != remoteSubscribedTotalMap_.end();
231 }
232
ClearSubscribeQuery(const std::string & device,SubscribeMap & subscribeMap,SubscribedTotalMap & subscribedTotalMap)233 void SubscribeManager::ClearSubscribeQuery(const std::string &device, SubscribeMap &subscribeMap,
234 SubscribedTotalMap &subscribedTotalMap)
235 {
236 if (subscribeMap.find(device) == subscribeMap.end()) {
237 LOGI("[SubscribeManager] dev=%s not in SubscribedMap", STR_MASK(device));
238 return;
239 }
240 for (const auto &queryInfo : subscribeMap[device]) {
241 if (subscribedTotalMap.find(queryInfo.first) != subscribedTotalMap.end()) {
242 if (subscribedTotalMap[queryInfo.first].second > 0) {
243 subscribedTotalMap[queryInfo.first].second--;
244 }
245 if (subscribedTotalMap[queryInfo.first].second == 0) {
246 LOGI("[SubscribeManager] queryId=%s delete from TotalMap", STR_MASK(queryInfo.first));
247 subscribedTotalMap.erase(queryInfo.first);
248 }
249 }
250 }
251 subscribeMap.erase(device);
252 LOGI("[SubscribeManager] clear dev=%s remote subscribe queies finished", STR_MASK(device));
253 }
254
ReserveSubscribeQuery(const std::string & device,const QuerySyncObject & query,SubscribeMap & subscribeMap,SubscribedTotalMap & subscribedTotalMap)255 int SubscribeManager::ReserveSubscribeQuery(const std::string &device, const QuerySyncObject &query,
256 SubscribeMap &subscribeMap, SubscribedTotalMap &subscribedTotalMap)
257 {
258 std::string queryId = query.GetIdentify();
259 auto iter = subscribeMap.find(device);
260 auto allIter = subscribedTotalMap.find(queryId);
261 // limit check
262 if (allIter == subscribedTotalMap.end() && subscribedTotalMap.size() >= MAX_SUBSCRIBE_NUM_PER_DB) {
263 LOGE("[SubscribeManager] all subscribe sums is over limit");
264 return -E_MAX_LIMITS;
265 }
266 if (iter == subscribeMap.end() && subscribeMap.size() >= MAX_DEVICES_NUM) {
267 LOGE("[SubscribeManager] subscribe devices is over limit");
268 return -E_MAX_LIMITS;
269 }
270 if (iter != subscribeMap.end() && iter->second.find(queryId) == iter->second.end() &&
271 iter->second.size() >= MAX_SUBSCRIBE_NUM_PER_DEV) {
272 LOGE("[SubscribeManager] subscribe sums is over limit");
273 return -E_MAX_LIMITS;
274 }
275 if (iter != subscribeMap.end() && iter->second.find(queryId) != iter->second.end() &&
276 iter->second[queryId] == SubscribeStatus::ACTIVE) {
277 LOGE("[SubscribeManager] dev=%s,queryId=%s already active in map", STR_MASK(device), STR_MASK(queryId));
278 return E_OK;
279 }
280
281 if (iter == subscribeMap.end()) {
282 subscribeMap[device] = std::map<std::string, SubscribeStatus> {};
283 }
284 bool isNeedInc = false;
285 if (subscribeMap[device].find(queryId) == subscribeMap[device].end()) {
286 subscribeMap[device][queryId] = SubscribeStatus::NOT_ACTIVE;
287 isNeedInc = true;
288 }
289 if (allIter == subscribedTotalMap.end()) {
290 subscribedTotalMap[queryId] = {query, 1};
291 } else if (isNeedInc) {
292 subscribedTotalMap[queryId].second++;
293 }
294 return E_OK;
295 }
296
ActiveSubscribeQuery(const std::string & device,const std::string & queryId,SubscribeMap & subscribeMap,SubscribedTotalMap & subscribedTotalMap)297 int SubscribeManager::ActiveSubscribeQuery(const std::string &device, const std::string &queryId,
298 SubscribeMap &subscribeMap, SubscribedTotalMap &subscribedTotalMap)
299 {
300 if (subscribedTotalMap.find(queryId) == subscribedTotalMap.end()) {
301 LOGE("[SubscribeManager] can not find queryId=%s in SubscribeTotalMap", STR_MASK(queryId));
302 return -E_INTERNAL_ERROR;
303 }
304 if (subscribeMap.find(device) == subscribeMap.end()) {
305 LOGE("[SubscribeManager] can not find dev=%s in localSubscribeMap", STR_MASK(device));
306 return -E_INTERNAL_ERROR;
307 }
308 if (subscribeMap[device].find(queryId) == subscribeMap[device].end()) {
309 LOGE("[SubscribeManager] can not find dev=%s,queryId=%s in map", STR_MASK(device), STR_MASK(queryId));
310 return -E_INTERNAL_ERROR;
311 }
312 subscribeMap[device][queryId] = SubscribeStatus::ACTIVE;
313 return E_OK;
314 }
315
DeleteSubscribeQuery(const std::string & device,const std::string & queryId,SubscribeMap & subscribeMap,SubscribedTotalMap & subscribedTotalMap)316 void SubscribeManager::DeleteSubscribeQuery(const std::string &device, const std::string &queryId,
317 SubscribeMap &subscribeMap, SubscribedTotalMap &subscribedTotalMap)
318 {
319 if (subscribeMap.find(device) == subscribeMap.end()) {
320 LOGE("[SubscribeManager] can not find dev=%s in map", STR_MASK(device));
321 return;
322 }
323 if (subscribeMap[device].find(queryId) == subscribeMap[device].end()) {
324 LOGE("[SubscribeManager] can not find dev=%s,queryId=%s in map", STR_MASK(device), STR_MASK(queryId));
325 return;
326 }
327 SubscribeStatus queryStatus = subscribeMap[device][queryId];
328 // not permit to delete the query when something wrong this time,because it is subscribed successfully last time
329 if (queryStatus == SubscribeStatus::ACTIVE) {
330 LOGE("[SubscribeManager] dev=%s,queryId=%s is active, no need to del", STR_MASK(device), STR_MASK(queryId));
331 return;
332 }
333 subscribeMap[device].erase(queryId);
334 auto iter = subscribedTotalMap.find(queryId);
335 if (iter == subscribedTotalMap.end()) {
336 LOGE("[SubscribeManager] can not find queryId=%s in SubscribeTotalMap", STR_MASK(queryId));
337 return;
338 }
339 iter->second.second--;
340 if (iter->second.second <= 0) {
341 LOGI("[SubscribeManager] del queryId=%s from SubscribeTotalMap", STR_MASK(queryId));
342 subscribedTotalMap.erase(queryId);
343 }
344 LOGI("[SubscribeManager] dev=%s,queryId=%s remove from SubscribeMap success", STR_MASK(device), STR_MASK(queryId));
345 }
346
RemoveSubscribeQuery(const std::string & device,const std::string & queryId,SubscribeMap & subscribeMap,SubscribedTotalMap & subscribedTotalMap)347 void SubscribeManager::RemoveSubscribeQuery(const std::string &device, const std::string &queryId,
348 SubscribeMap &subscribeMap, SubscribedTotalMap &subscribedTotalMap)
349 {
350 auto iter = subscribeMap.find(device);
351 if (iter == subscribeMap.end()) {
352 LOGE("[SubscribeManager] dev=%s not in SubscribedMap", STR_MASK(device));
353 return;
354 }
355 if (iter->second.find(queryId) == subscribeMap[device].end()) {
356 LOGI("[SubscribeManager] dev=%s,queryId=%s not in SubscribedMap", STR_MASK(device), STR_MASK(queryId));
357 return;
358 }
359 iter->second.erase(queryId);
360 auto allIter = subscribedTotalMap.find(queryId);
361 if (allIter == subscribedTotalMap.end()) {
362 LOGI("[SubscribeManager] queryId=%s not in TotalMap", STR_MASK(queryId));
363 return;
364 }
365 allIter->second.second--;
366 if (allIter->second.second <= 0) {
367 subscribedTotalMap.erase(queryId);
368 LOGI("[SubscribeManager] queryId=%s delete from TotalMap", STR_MASK(queryId));
369 }
370 LOGI("[SubscribeManager] dev=%s,queryId=%s remove from SubscribedMap success", STR_MASK(device), STR_MASK(queryId));
371 }
372
GetSubscribeQueries(const std::string & device,const SubscribeMap & subscribeMap,const SubscribedTotalMap & subscribedTotalMap,std::vector<QuerySyncObject> & subscribeQueries) const373 void SubscribeManager::GetSubscribeQueries(const std::string &device, const SubscribeMap &subscribeMap,
374 const SubscribedTotalMap &subscribedTotalMap, std::vector<QuerySyncObject> &subscribeQueries) const
375 {
376 auto iter = subscribeMap.find(device);
377 if (iter == subscribeMap.end()) {
378 LOGD("[SubscribeManager] dev=%s not in localSubscribeMap", STR_MASK(device));
379 return;
380 }
381 for (const auto &queryInfo : iter->second) {
382 auto iterTmp = subscribedTotalMap.find(queryInfo.first);
383 if (iterTmp == subscribedTotalMap.end()) {
384 LOGE("[SubscribeManager] queryId=%s not in localTotalMap", STR_MASK(queryInfo.first));
385 continue;
386 }
387 subscribeQueries.push_back(iterTmp->second.first);
388 }
389 }
390 } // namespace DistributedDB