1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "subscribe_manager.h"
16 
17 #include <mutex>
18 #include "db_common.h"
19 #include "sync_types.h"
20 
21 namespace DistributedDB {
ClearRemoteSubscribeQuery(const std::string & device)22 void SubscribeManager::ClearRemoteSubscribeQuery(const std::string &device)
23 {
24     std::unique_lock<std::shared_mutex> lockGuard(remoteSubscribedMapLock_);
25     ClearSubscribeQuery(device, remoteSubscribedMap_, remoteSubscribedTotalMap_);
26 }
27 
ClearAllRemoteQuery()28 void SubscribeManager::ClearAllRemoteQuery()
29 {
30     std::unique_lock<std::shared_mutex> lockGuard(remoteSubscribedMapLock_);
31     remoteSubscribedMap_.clear();
32     remoteSubscribedTotalMap_.clear();
33 }
34 
ClearLocalSubscribeQuery(const std::string & device)35 void SubscribeManager::ClearLocalSubscribeQuery(const std::string &device)
36 {
37     std::unique_lock<std::shared_mutex> lockGuard(localSubscribeMapLock_);
38     unFinishedLocalAutoSubMap_.erase(device);
39     ClearSubscribeQuery(device, localSubscribeMap_, localSubscribeTotalMap_);
40 }
41 
ReserveRemoteSubscribeQuery(const std::string & device,const QuerySyncObject & query)42 int SubscribeManager::ReserveRemoteSubscribeQuery(const std::string &device, const QuerySyncObject &query)
43 {
44     std::unique_lock<std::shared_mutex> lockGuard(remoteSubscribedMapLock_);
45     int errCode = ReserveSubscribeQuery(device, query, remoteSubscribedMap_, remoteSubscribedTotalMap_);
46     LOGI("[SubscribeManager] dev=%s,queryId=%s remote reserve err=%d", STR_MASK(device), STR_MASK(query.GetIdentify()),
47         errCode);
48     return errCode;
49 }
50 
ActiveRemoteSubscribeQuery(const std::string & device,const QuerySyncObject & query)51 int SubscribeManager::ActiveRemoteSubscribeQuery(const std::string &device, const QuerySyncObject &query)
52 {
53     std::unique_lock<std::shared_mutex> lockGuard(remoteSubscribedMapLock_);
54     std::string queryId = query.GetIdentify();
55     int errCode = ActiveSubscribeQuery(device, queryId, remoteSubscribedMap_, remoteSubscribedTotalMap_);
56     LOGI("[SubscribeManager] dev=%s,queryId=%s remote active err=%d", STR_MASK(device), STR_MASK(queryId), errCode);
57     return errCode;
58 }
59 
ReserveLocalSubscribeQuery(const std::string & device,const QuerySyncObject & query)60 int SubscribeManager::ReserveLocalSubscribeQuery(const std::string &device, const QuerySyncObject &query)
61 {
62     std::unique_lock<std::shared_mutex> lockGuard(localSubscribeMapLock_);
63     int errCode = ReserveSubscribeQuery(device, query, localSubscribeMap_, localSubscribeTotalMap_);
64     LOGI("[SubscribeManager] dev=%s,queryId=%s local reserve err=%d", STR_MASK(device), STR_MASK(query.GetIdentify()),
65         errCode);
66     return errCode;
67 }
68 
ActiveLocalSubscribeQuery(const std::string & device,const QuerySyncObject & query)69 int SubscribeManager::ActiveLocalSubscribeQuery(const std::string &device, const QuerySyncObject &query)
70 {
71     std::unique_lock<std::shared_mutex> lockGuard(localSubscribeMapLock_);
72     std::string queryId = query.GetIdentify();
73     int errCode = ActiveSubscribeQuery(device, queryId, localSubscribeMap_, localSubscribeTotalMap_);
74     LOGI("[SubscribeManager] dev=%s,queryId=%s local active err=%d", STR_MASK(device), STR_MASK(queryId), errCode);
75     if (errCode != E_OK) {
76         return errCode;
77     }
78     if (unFinishedLocalAutoSubMap_.find(device) != unFinishedLocalAutoSubMap_.end() &&
79         unFinishedLocalAutoSubMap_[device].find(queryId) != unFinishedLocalAutoSubMap_[device].end()) {
80         unFinishedLocalAutoSubMap_[device].erase(queryId);
81     }
82     return errCode;
83 }
84 
DeleteLocalSubscribeQuery(const std::string & device,const QuerySyncObject & query)85 void SubscribeManager::DeleteLocalSubscribeQuery(const std::string &device, const QuerySyncObject &query)
86 {
87     std::unique_lock<std::shared_mutex> lockGuard(localSubscribeMapLock_);
88     std::string queryId = query.GetIdentify();
89     DeleteSubscribeQuery(device, queryId, localSubscribeMap_, localSubscribeTotalMap_);
90 }
91 
DeleteRemoteSubscribeQuery(const std::string & device,const QuerySyncObject & query)92 void SubscribeManager::DeleteRemoteSubscribeQuery(const std::string &device, const QuerySyncObject &query)
93 {
94     std::unique_lock<std::shared_mutex> lockGuard(remoteSubscribedMapLock_);
95     std::string queryId = query.GetIdentify();
96     DeleteSubscribeQuery(device, queryId, remoteSubscribedMap_, remoteSubscribedTotalMap_);
97 }
98 
PutLocalUnFinishedSubQueries(const std::string & device,const std::vector<QuerySyncObject> & subscribeQueries)99 void SubscribeManager::PutLocalUnFinishedSubQueries(const std::string &device,
100     const std::vector<QuerySyncObject> &subscribeQueries)
101 {
102     LOGI("[SubscribeManager] put local unfinished subscribe queries, nums=%zu", subscribeQueries.size());
103     std::unique_lock<std::shared_mutex> lockGuard(localSubscribeMapLock_);
104     if (subscribeQueries.empty()) {
105         unFinishedLocalAutoSubMap_.erase(device);
106         return;
107     }
108     unFinishedLocalAutoSubMap_[device].clear();
109     auto iter = unFinishedLocalAutoSubMap_.find(device);
110     for (const auto &query : subscribeQueries) {
111         iter->second.insert(query.GetIdentify());
112     }
113 }
114 
GetAllUnFinishSubQueries(std::map<std::string,std::vector<QuerySyncObject>> & allSyncQueries) const115 void SubscribeManager::GetAllUnFinishSubQueries(
116     std::map<std::string, std::vector<QuerySyncObject>> &allSyncQueries) const
117 {
118     std::shared_lock<std::shared_mutex> lock(localSubscribeMapLock_);
119     for (auto &item : unFinishedLocalAutoSubMap_) {
120         if (item.second.empty()) {
121             continue;
122         }
123         allSyncQueries[item.first] = {};
124         auto iter = allSyncQueries.find(item.first);
125         for (const auto &queryId : item.second) {
126             auto iterTmp = localSubscribeTotalMap_.find(queryId);
127             if (iterTmp == localSubscribeTotalMap_.end()) {
128                 LOGI("[SubscribeManager] queryId=%s not in localTotalMap", STR_MASK(queryId));
129                 continue;
130             }
131             iter->second.push_back(iterTmp->second.first);
132         }
133     }
134 }
135 
RemoveRemoteSubscribeQuery(const std::string & device,const QuerySyncObject & query)136 void SubscribeManager::RemoveRemoteSubscribeQuery(const std::string &device, const QuerySyncObject &query)
137 {
138     std::unique_lock<std::shared_mutex> lockGuard(remoteSubscribedMapLock_);
139     std::string queryId = query.GetIdentify();
140     RemoveSubscribeQuery(device, queryId, remoteSubscribedMap_, remoteSubscribedTotalMap_);
141 }
142 
RemoveLocalSubscribeQuery(const std::string & device,const QuerySyncObject & query)143 void SubscribeManager::RemoveLocalSubscribeQuery(const std::string &device, const QuerySyncObject &query)
144 {
145     std::unique_lock<std::shared_mutex> lockGuard(localSubscribeMapLock_);
146     std::string queryId = query.GetIdentify();
147     RemoveSubscribeQuery(device, queryId, localSubscribeMap_, localSubscribeTotalMap_);
148     if (unFinishedLocalAutoSubMap_.find(device) != unFinishedLocalAutoSubMap_.end() &&
149         unFinishedLocalAutoSubMap_[device].find(queryId) != unFinishedLocalAutoSubMap_[device].end()) {
150         unFinishedLocalAutoSubMap_[device].erase(queryId);
151         LOGI("[SubscribeManager] dev=%s,queryId=%s delete from UnFinishedMap", STR_MASK(device), STR_MASK(queryId));
152         if (unFinishedLocalAutoSubMap_[device].empty()) {
153             LOGI("[SubscribeManager] dev=%s delete from unFinish map", STR_MASK(device));
154             unFinishedLocalAutoSubMap_.erase(device);
155         }
156     }
157 }
158 
GetLocalSubscribeQueries(const std::string & device,std::vector<QuerySyncObject> & subscribeQueries) const159 void SubscribeManager::GetLocalSubscribeQueries(const std::string &device,
160     std::vector<QuerySyncObject> &subscribeQueries) const
161 {
162     std::shared_lock<std::shared_mutex> lock(localSubscribeMapLock_);
163     GetSubscribeQueries(device, localSubscribeMap_, localSubscribeTotalMap_, subscribeQueries);
164 }
165 
GetRemoteSubscribeQueries(const std::string & device,std::vector<QuerySyncObject> & subscribeQueries) const166 void SubscribeManager::GetRemoteSubscribeQueries(const std::string &device,
167     std::vector<QuerySyncObject> &subscribeQueries) const
168 {
169     std::shared_lock<std::shared_mutex> lockGuard(remoteSubscribedMapLock_);
170     GetSubscribeQueries(device, remoteSubscribedMap_, remoteSubscribedTotalMap_, subscribeQueries);
171 }
172 
IsLastRemoteContainSubscribe(const std::string & device,const std::string & queryId) const173 bool SubscribeManager::IsLastRemoteContainSubscribe(const std::string &device, const std::string &queryId) const
174 {
175     std::shared_lock<std::shared_mutex> lockGuard(remoteSubscribedMapLock_);
176     if (remoteSubscribedMap_.find(device) == remoteSubscribedMap_.end()) {
177         return false;
178     }
179     auto iter = remoteSubscribedTotalMap_.find(queryId);
180     if (iter == remoteSubscribedTotalMap_.end()) {
181         return false;
182     }
183     return iter->second.second == 1;
184 }
185 
GetRemoteSubscribeQueryIds(const std::string & device,std::vector<std::string> & subscribeQueryIds) const186 void SubscribeManager::GetRemoteSubscribeQueryIds(const std::string &device,
187     std::vector<std::string> &subscribeQueryIds) const
188 {
189     std::shared_lock<std::shared_mutex> lockGuard(remoteSubscribedMapLock_);
190     auto iter = remoteSubscribedMap_.find(device);
191     if (iter == remoteSubscribedMap_.end()) {
192         LOGI("[SubscribeManager] dev=%s not in remoteSubscribedMap", STR_MASK(device));
193         return;
194     }
195     for (const auto &queryInfo : iter->second) {
196         if (remoteSubscribedTotalMap_.find(queryInfo.first) == remoteSubscribedTotalMap_.end()) {
197             LOGE("[SubscribeManager] queryId=%s not in RemoteTotalMap", STR_MASK(queryInfo.first));
198             continue;
199         }
200         subscribeQueryIds.push_back(queryInfo.first);
201     }
202 }
203 
LocalSubscribeLimitCheck(const std::vector<std::string> & devices,QuerySyncObject & query) const204 int SubscribeManager::LocalSubscribeLimitCheck(const std::vector<std::string> &devices, QuerySyncObject &query) const
205 {
206     std::shared_lock<std::shared_mutex> lock(localSubscribeMapLock_);
207     size_t devNum = localSubscribeMap_.size();
208     for (const auto &device : devices) {
209         if (localSubscribeMap_.find(device) != localSubscribeMap_.end()) {
210             continue;
211         }
212         devNum++;
213         if (devNum > MAX_DEVICES_NUM) {
214             LOGE("[SubscribeManager] local subscribe devices is over limit");
215             return -E_MAX_LIMITS;
216         }
217     }
218     std::string queryId = query.GetIdentify();
219     auto allIter = localSubscribeTotalMap_.find(queryId);
220     if (allIter == localSubscribeTotalMap_.end() && localSubscribeTotalMap_.size() >= MAX_SUBSCRIBE_NUM_PER_DB) {
221         LOGE("[SubscribeManager] all local subscribe sums is over limit");
222         return -E_MAX_LIMITS;
223     }
224     return E_OK;
225 }
226 
IsQueryExistSubscribe(const std::string & queryId) const227 bool SubscribeManager::IsQueryExistSubscribe(const std::string &queryId) const
228 {
229     std::shared_lock<std::shared_mutex> lockGuard(remoteSubscribedMapLock_);
230     return remoteSubscribedTotalMap_.find(queryId) != remoteSubscribedTotalMap_.end();
231 }
232 
ClearSubscribeQuery(const std::string & device,SubscribeMap & subscribeMap,SubscribedTotalMap & subscribedTotalMap)233 void SubscribeManager::ClearSubscribeQuery(const std::string &device, SubscribeMap &subscribeMap,
234     SubscribedTotalMap &subscribedTotalMap)
235 {
236     if (subscribeMap.find(device) == subscribeMap.end()) {
237         LOGI("[SubscribeManager] dev=%s not in SubscribedMap", STR_MASK(device));
238         return;
239     }
240     for (const auto &queryInfo : subscribeMap[device]) {
241         if (subscribedTotalMap.find(queryInfo.first) != subscribedTotalMap.end()) {
242             if (subscribedTotalMap[queryInfo.first].second > 0) {
243                 subscribedTotalMap[queryInfo.first].second--;
244             }
245             if (subscribedTotalMap[queryInfo.first].second == 0) {
246                 LOGI("[SubscribeManager] queryId=%s delete from TotalMap", STR_MASK(queryInfo.first));
247                 subscribedTotalMap.erase(queryInfo.first);
248             }
249         }
250     }
251     subscribeMap.erase(device);
252     LOGI("[SubscribeManager] clear dev=%s remote subscribe queies finished", STR_MASK(device));
253 }
254 
ReserveSubscribeQuery(const std::string & device,const QuerySyncObject & query,SubscribeMap & subscribeMap,SubscribedTotalMap & subscribedTotalMap)255 int SubscribeManager::ReserveSubscribeQuery(const std::string &device, const QuerySyncObject &query,
256     SubscribeMap &subscribeMap, SubscribedTotalMap &subscribedTotalMap)
257 {
258     std::string queryId = query.GetIdentify();
259     auto iter = subscribeMap.find(device);
260     auto allIter = subscribedTotalMap.find(queryId);
261     // limit check
262     if (allIter == subscribedTotalMap.end() && subscribedTotalMap.size() >= MAX_SUBSCRIBE_NUM_PER_DB) {
263         LOGE("[SubscribeManager] all subscribe sums is over limit");
264         return -E_MAX_LIMITS;
265     }
266     if (iter == subscribeMap.end() && subscribeMap.size() >= MAX_DEVICES_NUM) {
267         LOGE("[SubscribeManager] subscribe devices is over limit");
268         return -E_MAX_LIMITS;
269     }
270     if (iter != subscribeMap.end() && iter->second.find(queryId) == iter->second.end() &&
271         iter->second.size() >= MAX_SUBSCRIBE_NUM_PER_DEV) {
272         LOGE("[SubscribeManager] subscribe sums is over limit");
273         return -E_MAX_LIMITS;
274     }
275     if (iter != subscribeMap.end() && iter->second.find(queryId) != iter->second.end() &&
276         iter->second[queryId] == SubscribeStatus::ACTIVE) {
277         LOGE("[SubscribeManager] dev=%s,queryId=%s already active in map", STR_MASK(device), STR_MASK(queryId));
278         return E_OK;
279     }
280 
281     if (iter == subscribeMap.end()) {
282         subscribeMap[device] = std::map<std::string, SubscribeStatus> {};
283     }
284     bool isNeedInc = false;
285     if (subscribeMap[device].find(queryId) == subscribeMap[device].end()) {
286         subscribeMap[device][queryId] = SubscribeStatus::NOT_ACTIVE;
287         isNeedInc = true;
288     }
289     if (allIter == subscribedTotalMap.end()) {
290         subscribedTotalMap[queryId] = {query, 1};
291     } else if (isNeedInc) {
292         subscribedTotalMap[queryId].second++;
293     }
294     return E_OK;
295 }
296 
ActiveSubscribeQuery(const std::string & device,const std::string & queryId,SubscribeMap & subscribeMap,SubscribedTotalMap & subscribedTotalMap)297 int SubscribeManager::ActiveSubscribeQuery(const std::string &device, const std::string &queryId,
298     SubscribeMap &subscribeMap, SubscribedTotalMap &subscribedTotalMap)
299 {
300     if (subscribedTotalMap.find(queryId) == subscribedTotalMap.end()) {
301         LOGE("[SubscribeManager] can not find queryId=%s in SubscribeTotalMap", STR_MASK(queryId));
302         return -E_INTERNAL_ERROR;
303     }
304     if (subscribeMap.find(device) == subscribeMap.end()) {
305         LOGE("[SubscribeManager] can not find dev=%s in localSubscribeMap", STR_MASK(device));
306         return -E_INTERNAL_ERROR;
307     }
308     if (subscribeMap[device].find(queryId) == subscribeMap[device].end()) {
309         LOGE("[SubscribeManager] can not find dev=%s,queryId=%s in map", STR_MASK(device), STR_MASK(queryId));
310         return -E_INTERNAL_ERROR;
311     }
312     subscribeMap[device][queryId] = SubscribeStatus::ACTIVE;
313     return E_OK;
314 }
315 
DeleteSubscribeQuery(const std::string & device,const std::string & queryId,SubscribeMap & subscribeMap,SubscribedTotalMap & subscribedTotalMap)316 void SubscribeManager::DeleteSubscribeQuery(const std::string &device, const std::string &queryId,
317     SubscribeMap &subscribeMap, SubscribedTotalMap &subscribedTotalMap)
318 {
319     if (subscribeMap.find(device) == subscribeMap.end()) {
320         LOGE("[SubscribeManager] can not find dev=%s in map", STR_MASK(device));
321         return;
322     }
323     if (subscribeMap[device].find(queryId) == subscribeMap[device].end()) {
324         LOGE("[SubscribeManager] can not find dev=%s,queryId=%s in map", STR_MASK(device), STR_MASK(queryId));
325         return;
326     }
327     SubscribeStatus queryStatus = subscribeMap[device][queryId];
328     // not permit to delete the query when something wrong this time,because it is subscribed successfully last time
329     if (queryStatus == SubscribeStatus::ACTIVE) {
330         LOGE("[SubscribeManager] dev=%s,queryId=%s is active, no need to del", STR_MASK(device), STR_MASK(queryId));
331         return;
332     }
333     subscribeMap[device].erase(queryId);
334     auto iter = subscribedTotalMap.find(queryId);
335     if (iter == subscribedTotalMap.end()) {
336         LOGE("[SubscribeManager] can not find queryId=%s in SubscribeTotalMap", STR_MASK(queryId));
337         return;
338     }
339     iter->second.second--;
340     if (iter->second.second <= 0) {
341         LOGI("[SubscribeManager] del queryId=%s from SubscribeTotalMap", STR_MASK(queryId));
342         subscribedTotalMap.erase(queryId);
343     }
344     LOGI("[SubscribeManager] dev=%s,queryId=%s remove from SubscribeMap success", STR_MASK(device), STR_MASK(queryId));
345 }
346 
RemoveSubscribeQuery(const std::string & device,const std::string & queryId,SubscribeMap & subscribeMap,SubscribedTotalMap & subscribedTotalMap)347 void SubscribeManager::RemoveSubscribeQuery(const std::string &device, const std::string &queryId,
348     SubscribeMap &subscribeMap, SubscribedTotalMap &subscribedTotalMap)
349 {
350     auto iter = subscribeMap.find(device);
351     if (iter == subscribeMap.end()) {
352         LOGE("[SubscribeManager] dev=%s not in SubscribedMap", STR_MASK(device));
353         return;
354     }
355     if (iter->second.find(queryId) == subscribeMap[device].end()) {
356         LOGI("[SubscribeManager] dev=%s,queryId=%s not in SubscribedMap", STR_MASK(device), STR_MASK(queryId));
357         return;
358     }
359     iter->second.erase(queryId);
360     auto allIter = subscribedTotalMap.find(queryId);
361     if (allIter == subscribedTotalMap.end()) {
362         LOGI("[SubscribeManager] queryId=%s not in TotalMap", STR_MASK(queryId));
363         return;
364     }
365     allIter->second.second--;
366     if (allIter->second.second <= 0) {
367         subscribedTotalMap.erase(queryId);
368         LOGI("[SubscribeManager] queryId=%s delete from TotalMap", STR_MASK(queryId));
369     }
370     LOGI("[SubscribeManager] dev=%s,queryId=%s remove from SubscribedMap success", STR_MASK(device), STR_MASK(queryId));
371 }
372 
GetSubscribeQueries(const std::string & device,const SubscribeMap & subscribeMap,const SubscribedTotalMap & subscribedTotalMap,std::vector<QuerySyncObject> & subscribeQueries) const373 void SubscribeManager::GetSubscribeQueries(const std::string &device, const SubscribeMap &subscribeMap,
374     const SubscribedTotalMap &subscribedTotalMap, std::vector<QuerySyncObject> &subscribeQueries) const
375 {
376     auto iter = subscribeMap.find(device);
377     if (iter == subscribeMap.end()) {
378         LOGD("[SubscribeManager] dev=%s not in localSubscribeMap", STR_MASK(device));
379         return;
380     }
381     for (const auto &queryInfo : iter->second) {
382         auto iterTmp = subscribedTotalMap.find(queryInfo.first);
383         if (iterTmp == subscribedTotalMap.end()) {
384             LOGE("[SubscribeManager] queryId=%s not in localTotalMap", STR_MASK(queryInfo.first));
385             continue;
386         }
387         subscribeQueries.push_back(iterTmp->second.first);
388     }
389 }
390 } // namespace DistributedDB