1 /* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef SINGLE_VER_SUBSCRIBE_MANAGER_H 17 #define SINGLE_VER_SUBSCRIBE_MANAGER_H 18 19 #include <map> 20 #include <shared_mutex> 21 #include "query_sync_object.h" 22 23 namespace DistributedDB { 24 enum class SubscribeStatus { 25 NOT_ACTIVE = 0, 26 ACTIVE = 1, 27 }; 28 29 using SubscribeMap = std::map<std::string, std::map<std::string, SubscribeStatus>>; 30 using SubscribedTotalMap = std::map<std::string, std::pair<QuerySyncObject, int>>; 31 32 class SubscribeManager { 33 public: 34 SubscribeManager() = default; 35 ~SubscribeManager() = default; 36 37 DISABLE_COPY_ASSIGN_MOVE(SubscribeManager); 38 39 // clear remoteSubscribeMap_[device] list when remote db is closed or dev offline. 40 void ClearRemoteSubscribeQuery(const std::string &device); 41 42 // clear localSubscribeMap_[device] list when device is offline. 43 void ClearLocalSubscribeQuery(const std::string &device); 44 45 void ClearAllRemoteQuery(); 46 47 // add query when receive subscribe command 48 int ReserveRemoteSubscribeQuery(const std::string &device, const QuerySyncObject &query); 49 50 // active query to ACTIVE when send ack ok 51 int ActiveRemoteSubscribeQuery(const std::string &device, const QuerySyncObject &query); 52 53 // reserve query when user call SubscribeRemoteQuery, status set to NOT_ACTIVE 54 int ReserveLocalSubscribeQuery(const std::string &device, const QuerySyncObject &query); 55 56 // active query to ACTIVE when receive ack ok 57 int ActiveLocalSubscribeQuery(const std::string &device, const QuerySyncObject &query); 58 59 // delete local subscribe query when recv wrong errCode, only not_active status allowed to del 60 void DeleteLocalSubscribeQuery(const std::string &device, const QuerySyncObject &query); 61 62 // delete remote subscribe query when send msg failed, only not_active status allowed to del 63 void DeleteRemoteSubscribeQuery(const std::string &device, const QuerySyncObject &query); 64 65 // put subscribe queries into unfinished map when remote db online 66 void PutLocalUnFinishedSubQueries(const std::string &device, const std::vector<QuerySyncObject> &subscribeQueries); 67 68 // get all device unFinished subscribe queries which triggered by auto subscribe and need retry subscribe 69 void GetAllUnFinishSubQueries(std::map<std::string, std::vector<QuerySyncObject>> &allSyncQueries) const; 70 71 // remove query when receive unsubscribe command 72 void RemoveRemoteSubscribeQuery(const std::string &device, const QuerySyncObject &query); 73 74 // remove query when user call UnSubscribeRemoteQuery 75 void RemoveLocalSubscribeQuery(const std::string &device, const QuerySyncObject &query); 76 77 // get device active subscribeQueries from localSubscribeMap_ 78 void GetLocalSubscribeQueries(const std::string &device, std::vector<QuerySyncObject> &subscribeQueries) const; 79 80 // get device remote queryId from remoteSubscribedMap_ while data change 81 void GetRemoteSubscribeQueryIds(const std::string &device, std::vector<std::string> &subscribeQueryIds) const; 82 // get device remote subscribeQueries from remoteSubscribedMap_ while data change 83 void GetRemoteSubscribeQueries(const std::string &device, std::vector<QuerySyncObject> &subscribeQueries) const; 84 85 bool IsLastRemoteContainSubscribe(const std::string &device, const std::string &queryId) const; 86 87 int LocalSubscribeLimitCheck(const std::vector<std::string> &devices, QuerySyncObject &query) const; 88 89 bool IsQueryExistSubscribe(const std::string &queryId) const; 90 private: 91 void ClearSubscribeQuery(const std::string &device, SubscribeMap &subscribeMap, 92 SubscribedTotalMap &subscribedTotalMap); 93 94 int ReserveSubscribeQuery(const std::string &device, const QuerySyncObject &query, SubscribeMap &subscribeMap, 95 SubscribedTotalMap &subscribedTotalMap); 96 97 int ActiveSubscribeQuery(const std::string &device, const std::string &queryId, SubscribeMap &subscribeMap, 98 SubscribedTotalMap &subscribedTotalMap); 99 100 void DeleteSubscribeQuery(const std::string &device, const std::string &queryId, SubscribeMap &subscribeMap, 101 SubscribedTotalMap &subscribedTotalMap); 102 103 void RemoveSubscribeQuery(const std::string &device, const std::string &queryId, SubscribeMap &subscribeMap, 104 SubscribedTotalMap &subscribedTotalMap); 105 106 void GetSubscribeQueries(const std::string &device, const SubscribeMap &subscribeMap, 107 const SubscribedTotalMap &subscribedTotalMap, std::vector<QuerySyncObject> &subscribeQueries) const; 108 109 mutable std::shared_mutex localSubscribeMapLock_; 110 // subscribe sponsor, key: device, value: pair<queryId, status> map 111 // status 0: active, 1: not active 112 SubscribeMap localSubscribeMap_; 113 114 // used retry subscribe in db open scene, key: device value: set<queryId> 115 std::map<std::string, std::set<std::string>> unFinishedLocalAutoSubMap_; 116 117 // subscribe sponsor total query info, key:queryId, value:<QuerySyncObject, user_num> 118 // while use_num is 0, delete item from the map 119 SubscribedTotalMap localSubscribeTotalMap_; 120 121 mutable std::shared_mutex remoteSubscribedMapLock_; 122 // subscribed, key: device, value: pair<queryId, status> map 123 // status 0: active, 1: not active 124 SubscribeMap remoteSubscribedMap_; 125 126 // subscribed total query info, key:queryId, value:<QuerySyncObject, user_num> 127 // while use_num is 0, delete item from the map 128 SubscribedTotalMap remoteSubscribedTotalMap_; 129 }; 130 } // namespace DistributedDB 131 132 #endif // SINGLE_VER_SUBSCRIBE_MANAGER_H