1 /* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef SYNC_ENGINE_H 17 #define SYNC_ENGINE_H 18 19 #include <map> 20 #include <mutex> 21 #include <queue> 22 23 #include "communicator_proxy.h" 24 #include "device_manager.h" 25 #include "isync_engine.h" 26 #include "isync_task_context.h" 27 #include "remote_executor.h" 28 #include "subscribe_manager.h" 29 #include "task_pool.h" 30 31 namespace DistributedDB { 32 33 class SyncEngine : public ISyncEngine { 34 public: 35 SyncEngine(); 36 ~SyncEngine() override; 37 38 // Do some init things 39 int Initialize(ISyncInterface *syncInterface, const std::shared_ptr<Metadata> &metadata, 40 const InitCallbackParam &callbackParam) override; 41 42 // Do some things, when db close. 43 int Close() override; 44 45 // Alloc and Add sync SyncTarget 46 // return E_OK if operator success. 47 int AddSyncOperation(SyncOperation *operation) override; 48 49 // Clear the SyncTarget matched the syncId. 50 void RemoveSyncOperation(int syncId) override; 51 52 #ifndef OMIT_MULTI_VER 53 // notify other devices data has changed 54 void BroadCastDataChanged() const override; 55 #endif 56 57 // Get Online devices 58 void GetOnlineDevices(std::vector<std::string> &devices) const override; 59 60 // Register the device connect callback, this function must be called after Engine inited 61 void StartCommunicator() override; 62 63 // Get the queue cache memory size 64 int GetQueueCacheSize() const; 65 66 // Get the number of message which is discarded 67 unsigned int GetDiscardMsgNum() const; 68 69 // Get the maximum of executing message number 70 unsigned int GetMaxExecNum() const; 71 72 // Set the maximum of queue cache memory size 73 void SetMaxQueueCacheSize(int value); 74 75 std::string GetLabel() const override; 76 77 bool GetSyncRetry() const; 78 void SetSyncRetry(bool isRetry) override; 79 80 // Set an equal identifier for this database, After this called, send msg to the target will use this identifier 81 int SetEqualIdentifier(const std::string &identifier, const std::vector<std::string> &targets) override; 82 83 void SetEqualIdentifier() override; 84 85 void SetEqualIdentifierMap(const std::string &identifier, const std::vector<std::string> &targets) override; 86 87 void OfflineHandleByDevice(const std::string &deviceId, ISyncInterface *storage); 88 89 void GetLocalSubscribeQueries(const std::string &device, std::vector<QuerySyncObject> &subscribeQueries); 90 91 // subscribeQueries item is queryId 92 void GetRemoteSubscribeQueryIds(const std::string &device, std::vector<std::string> &subscribeQueryIds); 93 94 void GetRemoteSubscribeQueries(const std::string &device, std::vector<QuerySyncObject> &subscribeQueries); 95 96 void PutUnfinishedSubQueries(const std::string &device, const std::vector<QuerySyncObject> &subscribeQueries); 97 98 void GetAllUnFinishSubQueries(std::map<std::string, std::vector<QuerySyncObject>> &allSyncQueries); 99 100 // used by SingleVerSyncer when db online 101 int StartAutoSubscribeTimer(const ISyncInterface &syncInterface) override; 102 103 // used by SingleVerSyncer when remote/local db closed 104 void StopAutoSubscribeTimer() override; 105 106 int SubscribeLimitCheck(const std::vector<std::string> &devices, QuerySyncObject &query) const override; 107 108 bool IsEngineActive() const override; 109 110 void SchemaChange() override; 111 112 void Dump(int fd) override; 113 114 int RemoteQuery(const std::string &device, const RemoteCondition &condition, 115 uint64_t timeout, uint64_t connectionId, std::shared_ptr<ResultSet> &result) override; 116 117 void NotifyConnectionClosed(uint64_t connectionId) override; 118 119 void NotifyUserChange() override; 120 121 void AbortMachineIfNeed(uint32_t syncId) override; 122 123 void AddSubscribe(SyncGenericInterface *storage, 124 const std::map<std::string, std::vector<QuerySyncObject>> &subscribeQuery) override; 125 126 void TimeChange() override; 127 128 int32_t GetResponseTaskCount() override; 129 protected: 130 // Create a context 131 virtual ISyncTaskContext *CreateSyncTaskContext(const ISyncInterface &syncInterface) = 0; 132 133 // Find SyncTaskContext from the map 134 ISyncTaskContext *FindSyncTaskContext(const std::string &deviceId); 135 ISyncTaskContext *GetSyncTaskContextAndInc(const std::string &deviceId); 136 void GetQueryAutoSyncParam(const std::string &device, const QuerySyncObject &query, InternalSyncParma &outParam); 137 void GetSubscribeSyncParam(const std::string &device, const QuerySyncObject &query, InternalSyncParma &outParam); 138 139 void ClearSyncInterface(); 140 ISyncInterface *GetAndIncSyncInterface(); 141 void SetSyncInterface(ISyncInterface *syncInterface); 142 143 ISyncTaskContext *GetSyncTaskContext(const std::string &deviceId, int &errCode); 144 145 std::mutex storageMutex_; 146 ISyncInterface *syncInterface_; 147 // Used to store all send sync task infos (such as pull sync response, and push sync request) 148 std::map<std::string, ISyncTaskContext *> syncTaskContextMap_; 149 std::mutex contextMapLock_; 150 std::shared_ptr<SubscribeManager> subManager_; 151 std::function<void(const InternalSyncParma ¶m)> queryAutoSyncCallback_; 152 153 private: 154 155 // Init DeviceManager set callback and remoteExecutor 156 int InitInnerSource(const std::function<void(std::string)> &onRemoteDataChanged, 157 const std::function<void(std::string)> &offlineChanged, ISyncInterface *syncInterface); 158 159 // Init Comunicator, register callbacks 160 int InitComunicator(const ISyncInterface *syncInterface); 161 162 // Add the sync task info to the map. 163 int AddSyncOperForContext(const std::string &deviceId, SyncOperation *operation); 164 165 // Sync Request CallbackTask run at a sub thread. 166 void MessageReciveCallbackTask(ISyncTaskContext *context, const ICommunicator *communicator, Message *inMsg); 167 168 void RemoteDataChangedTask(ISyncTaskContext *context, const ICommunicator *communicator, Message *inMsg); 169 170 void ScheduleTaskOut(ISyncTaskContext *context, const ICommunicator *communicator); 171 172 // wrapper of MessageReciveCallbackTask 173 void MessageReciveCallback(const std::string &targetDev, Message *inMsg); 174 175 // Sync Request Callback 176 int MessageReciveCallbackInner(const std::string &targetDev, Message *inMsg); 177 178 // Exec the given SyncTarget. and callback onComplete. 179 int ExecSyncTask(ISyncTaskContext *context); 180 181 // Anti-DOS attack 182 void PutMsgIntoQueue(const std::string &targetDev, Message *inMsg, int msgSize); 183 184 // Get message size 185 int GetMsgSize(const Message *inMsg) const; 186 187 // Do not run MessageReceiveCallbackTask until msgQueue is empty 188 int DealMsgUtilQueueEmpty(); 189 190 // Handle message in order. 191 int ScheduleDealMsg(ISyncTaskContext *context, Message *inMsg); 192 193 ISyncTaskContext *GetContextForMsg(const std::string &targetDev, int &errCode); 194 195 ICommunicator *AllocCommunicator(const std::string &identifier, int &errCode); 196 197 void UnRegCommunicatorsCallback(); 198 199 void ReleaseCommunicators(); 200 201 bool IsSkipCalculateLen(const Message *inMsg); 202 203 void ClearInnerResource(); 204 205 void IncExecTaskCount(); 206 207 void DecExecTaskCount(); 208 209 RemoteExecutor *GetAndIncRemoteExector(); 210 211 void SetRemoteExector(RemoteExecutor *executor); 212 213 bool CheckDeviceIdValid(const std::string &checkDeviceId, const std::string &localDeviceId); 214 215 int GetLocalDeviceId(std::string &deviceId); 216 217 void WaitingExecTaskExist(); 218 219 int HandleRemoteExecutorMsg(const std::string &targetDev, Message *inMsg); 220 221 void AddQuerySubscribe(SyncGenericInterface *storage, const std::string &device, const QuerySyncObject &query); 222 223 ICommunicator *communicator_; 224 DeviceManager *deviceManager_; 225 std::function<void(const std::string &)> onRemoteDataChanged_; 226 std::function<void(const std::string &)> offlineChanged_; 227 std::shared_ptr<Metadata> metadata_; 228 std::deque<Message *> msgQueue_; 229 volatile uint32_t execTaskCount_; 230 std::string label_; 231 volatile bool isSyncRetry_; 232 std::mutex communicatorProxyLock_; 233 CommunicatorProxy *communicatorProxy_; 234 std::mutex equalCommunicatorsLock_; 235 std::map<std::string, ICommunicator *> equalCommunicators_; 236 237 static int queueCacheSize_; 238 static int maxQueueCacheSize_; 239 static unsigned int discardMsgNum_; 240 static const unsigned int MAX_EXEC_NUM = 7; // Set the maximum of threads as 6 < 7 241 static constexpr int DEFAULT_CACHE_SIZE = 160 * 1024 * 1024; // Initial the default cache size of queue as 160MB 242 static std::mutex queueLock_; 243 std::atomic<bool> isActive_; 244 245 // key: device value: equalIdentifier 246 std::map<std::string, std::string> equalIdentifierMap_; 247 std::mutex execTaskCountLock_; 248 std::condition_variable execTaskCv_; 249 250 std::mutex remoteExecutorLock_; 251 RemoteExecutor *remoteExecutor_; 252 }; 253 } // namespace DistributedDB 254 255 #endif // SYNC_ENGINE_H 256