1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef SINGLE_VER_SUBSCRIBE_MANAGER_H
17 #define SINGLE_VER_SUBSCRIBE_MANAGER_H
18 
19 #include <map>
20 #include <shared_mutex>
21 #include "query_sync_object.h"
22 
23 namespace DistributedDB {
24 enum class SubscribeStatus {
25     NOT_ACTIVE = 0,
26     ACTIVE = 1,
27 };
28 
29 using SubscribeMap = std::map<std::string, std::map<std::string, SubscribeStatus>>;
30 using SubscribedTotalMap = std::map<std::string, std::pair<QuerySyncObject, int>>;
31 
32 class SubscribeManager {
33 public:
34     SubscribeManager() = default;
35     ~SubscribeManager() = default;
36 
37     DISABLE_COPY_ASSIGN_MOVE(SubscribeManager);
38 
39     // clear remoteSubscribeMap_[device] list when remote db is closed or dev offline.
40     void ClearRemoteSubscribeQuery(const std::string &device);
41 
42     // clear localSubscribeMap_[device] list when device is offline.
43     void ClearLocalSubscribeQuery(const std::string &device);
44 
45     void ClearAllRemoteQuery();
46 
47     // add query when receive subscribe command
48     int ReserveRemoteSubscribeQuery(const std::string &device, const QuerySyncObject &query);
49 
50     // active query to ACTIVE when send ack ok
51     int ActiveRemoteSubscribeQuery(const std::string &device, const QuerySyncObject &query);
52 
53     // reserve query when user call SubscribeRemoteQuery, status set to NOT_ACTIVE
54     int ReserveLocalSubscribeQuery(const std::string &device, const QuerySyncObject &query);
55 
56     // active query to ACTIVE when receive ack ok
57     int ActiveLocalSubscribeQuery(const std::string &device, const QuerySyncObject &query);
58 
59     // delete local subscribe query when recv wrong errCode, only not_active status allowed to del
60     void DeleteLocalSubscribeQuery(const std::string &device, const QuerySyncObject &query);
61 
62     // delete remote subscribe query when send msg failed, only not_active status allowed to del
63     void DeleteRemoteSubscribeQuery(const std::string &device, const QuerySyncObject &query);
64 
65     // put subscribe queries into unfinished map when remote db online
66     void PutLocalUnFinishedSubQueries(const std::string &device, const std::vector<QuerySyncObject> &subscribeQueries);
67 
68     // get all device unFinished subscribe queries which triggered by auto subscribe and need retry subscribe
69     void GetAllUnFinishSubQueries(std::map<std::string, std::vector<QuerySyncObject>> &allSyncQueries) const;
70 
71     // remove query when receive unsubscribe command
72     void RemoveRemoteSubscribeQuery(const std::string &device, const QuerySyncObject &query);
73 
74     // remove query when user call UnSubscribeRemoteQuery
75     void RemoveLocalSubscribeQuery(const std::string &device, const QuerySyncObject &query);
76 
77     // get device active subscribeQueries from localSubscribeMap_
78     void GetLocalSubscribeQueries(const std::string &device, std::vector<QuerySyncObject> &subscribeQueries) const;
79 
80     // get device remote queryId from remoteSubscribedMap_ while data change
81     void GetRemoteSubscribeQueryIds(const std::string &device, std::vector<std::string> &subscribeQueryIds) const;
82     // get device remote subscribeQueries from remoteSubscribedMap_ while data change
83     void GetRemoteSubscribeQueries(const std::string &device, std::vector<QuerySyncObject> &subscribeQueries) const;
84 
85     bool IsLastRemoteContainSubscribe(const std::string &device, const std::string &queryId) const;
86 
87     int LocalSubscribeLimitCheck(const std::vector<std::string> &devices, QuerySyncObject &query) const;
88 
89     bool IsQueryExistSubscribe(const std::string &queryId) const;
90 private:
91     void ClearSubscribeQuery(const std::string &device, SubscribeMap &subscribeMap,
92         SubscribedTotalMap &subscribedTotalMap);
93 
94     int ReserveSubscribeQuery(const std::string &device, const QuerySyncObject &query, SubscribeMap &subscribeMap,
95         SubscribedTotalMap &subscribedTotalMap);
96 
97     int ActiveSubscribeQuery(const std::string &device, const std::string &queryId, SubscribeMap &subscribeMap,
98         SubscribedTotalMap &subscribedTotalMap);
99 
100     void DeleteSubscribeQuery(const std::string &device, const std::string &queryId, SubscribeMap &subscribeMap,
101         SubscribedTotalMap &subscribedTotalMap);
102 
103     void RemoveSubscribeQuery(const std::string &device, const std::string &queryId, SubscribeMap &subscribeMap,
104         SubscribedTotalMap &subscribedTotalMap);
105 
106     void GetSubscribeQueries(const std::string &device, const SubscribeMap &subscribeMap,
107         const SubscribedTotalMap &subscribedTotalMap, std::vector<QuerySyncObject> &subscribeQueries) const;
108 
109     mutable std::shared_mutex localSubscribeMapLock_;
110     // subscribe sponsor, key: device, value: pair<queryId, status> map
111     // status 0: active, 1: not active
112     SubscribeMap localSubscribeMap_;
113 
114     // used retry subscribe in db open scene, key: device value: set<queryId>
115     std::map<std::string, std::set<std::string>> unFinishedLocalAutoSubMap_;
116 
117     // subscribe sponsor total query info, key:queryId, value:<QuerySyncObject, user_num>
118     // while use_num is 0, delete item from the map
119     SubscribedTotalMap localSubscribeTotalMap_;
120 
121     mutable std::shared_mutex remoteSubscribedMapLock_;
122     // subscribed, key: device, value: pair<queryId, status> map
123     // status 0: active, 1: not active
124     SubscribeMap remoteSubscribedMap_;
125 
126     // subscribed total query info, key:queryId, value:<QuerySyncObject, user_num>
127     // while use_num is 0, delete item from the map
128     SubscribedTotalMap remoteSubscribedTotalMap_;
129 };
130 } // namespace DistributedDB
131 
132 #endif // SINGLE_VER_SUBSCRIBE_MANAGER_H