1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef SYNC_ENGINE_H
17 #define SYNC_ENGINE_H
18 
19 #include <map>
20 #include <mutex>
21 #include <queue>
22 
23 #include "communicator_proxy.h"
24 #include "device_manager.h"
25 #include "isync_engine.h"
26 #include "isync_task_context.h"
27 #include "remote_executor.h"
28 #include "subscribe_manager.h"
29 #include "task_pool.h"
30 
31 namespace DistributedDB {
32 
33 class SyncEngine : public ISyncEngine { // Abstract: subclasses must implement CreateSyncTaskContext().
34 public:
35     SyncEngine();
36     ~SyncEngine() override;
37 
38     // Initialize the engine with the storage interface, the metadata and the init callback parameters.
39     int Initialize(ISyncInterface *syncInterface, const std::shared_ptr<Metadata> &metadata,
40         const InitCallbackParam &callbackParam) override;
41 
42     // Called when the db is closed; does the engine's close-time work.
43     int Close() override;
44 
45     // Allocate and add the SyncTarget for the given operation.
46     // Returns E_OK if the operation succeeds.
47     int AddSyncOperation(SyncOperation *operation) override;
48 
49     // Clear the SyncTarget that matches syncId.
50     void RemoveSyncOperation(int syncId) override;
51 
52 #ifndef OMIT_MULTI_VER
53     // Notify other devices that data has changed (only declared when OMIT_MULTI_VER is not defined).
54     void BroadCastDataChanged() const override;
55 #endif
56 
57     // Get the online devices; the result is written to 'devices'.
58     void GetOnlineDevices(std::vector<std::string> &devices) const override;
59 
60     // Register the device connect callback. Must be called after the engine has been initialized.
61     void StartCommunicator() override;
62 
63     // Get the memory size of the queue cache.
64     int GetQueueCacheSize() const;
65 
66     // Get the number of messages that have been discarded.
67     unsigned int GetDiscardMsgNum() const;
68 
69     // Get the maximum number of executing messages.
70     unsigned int GetMaxExecNum() const;
71 
72     // Set the maximum memory size of the queue cache.
73     void SetMaxQueueCacheSize(int value);
74 
75     std::string GetLabel() const override;
76     // Getter/setter for the sync-retry flag (presumably backed by isSyncRetry_ - confirm in the implementation).
77     bool GetSyncRetry() const;
78     void SetSyncRetry(bool isRetry) override;
79 
80     // Set an equal identifier for this database. After this is called, messages sent to the targets use it.
81     int SetEqualIdentifier(const std::string &identifier, const std::vector<std::string> &targets) override;
82 
83     void SetEqualIdentifier() override;
84 
85     void SetEqualIdentifierMap(const std::string &identifier, const std::vector<std::string> &targets) override;
86 
87     void OfflineHandleByDevice(const std::string &deviceId, ISyncInterface *storage);
88 
89     void GetLocalSubscribeQueries(const std::string &device, std::vector<QuerySyncObject> &subscribeQueries);
90 
91     // Each item written to subscribeQueryIds is a queryId.
92     void GetRemoteSubscribeQueryIds(const std::string &device, std::vector<std::string> &subscribeQueryIds);
93 
94     void GetRemoteSubscribeQueries(const std::string &device, std::vector<QuerySyncObject> &subscribeQueries);
95 
96     void PutUnfinishedSubQueries(const std::string &device, const std::vector<QuerySyncObject> &subscribeQueries);
97 
98     void GetAllUnFinishSubQueries(std::map<std::string, std::vector<QuerySyncObject>> &allSyncQueries);
99 
100     // Used by SingleVerSyncer when the db comes online.
101     int StartAutoSubscribeTimer(const ISyncInterface &syncInterface) override;
102 
103     // Used by SingleVerSyncer when the remote/local db is closed.
104     void StopAutoSubscribeTimer() override;
105 
106     int SubscribeLimitCheck(const std::vector<std::string> &devices, QuerySyncObject &query) const override;
107 
108     bool IsEngineActive() const override;
109 
110     void SchemaChange() override;
111 
112     void Dump(int fd) override;
113 
114     int RemoteQuery(const std::string &device, const RemoteCondition &condition,
115         uint64_t timeout, uint64_t connectionId, std::shared_ptr<ResultSet> &result) override;
116 
117     void NotifyConnectionClosed(uint64_t connectionId) override;
118 
119     void NotifyUserChange() override;
120 
121     void AbortMachineIfNeed(uint32_t syncId) override;
122 
123     void AddSubscribe(SyncGenericInterface *storage,
124         const std::map<std::string, std::vector<QuerySyncObject>> &subscribeQuery) override;
125 
126     void TimeChange() override;
127 
128     int32_t GetResponseTaskCount() override;
129 protected:
130     // Create a sync task context. Pure virtual: every concrete engine must provide its own implementation.
131     virtual ISyncTaskContext *CreateSyncTaskContext(const ISyncInterface &syncInterface) = 0;
132 
133     // Find the SyncTaskContext of deviceId from the map.
134     ISyncTaskContext *FindSyncTaskContext(const std::string &deviceId);
135     ISyncTaskContext *GetSyncTaskContextAndInc(const std::string &deviceId);
136     void GetQueryAutoSyncParam(const std::string &device, const QuerySyncObject &query, InternalSyncParma &outParam);
137     void GetSubscribeSyncParam(const std::string &device, const QuerySyncObject &query, InternalSyncParma &outParam);
138 
139     void ClearSyncInterface();
140     ISyncInterface *GetAndIncSyncInterface();
141     void SetSyncInterface(ISyncInterface *syncInterface);
142 
143     ISyncTaskContext *GetSyncTaskContext(const std::string &deviceId, int &errCode);
144 
145     std::mutex storageMutex_; // presumably guards syncInterface_ - confirm in the implementation
146     ISyncInterface *syncInterface_;
147     // Used to store all send sync task infos (such as pull sync responses and push sync requests).
148     std::map<std::string, ISyncTaskContext *> syncTaskContextMap_;
149     std::mutex contextMapLock_; // presumably guards syncTaskContextMap_ - confirm in the implementation
150     std::shared_ptr<SubscribeManager> subManager_;
151     std::function<void(const InternalSyncParma &param)> queryAutoSyncCallback_;
152 
153 private:
154 
155     // Init the DeviceManager, set the callbacks and init the remoteExecutor.
156     int InitInnerSource(const std::function<void(std::string)> &onRemoteDataChanged,
157         const std::function<void(std::string)> &offlineChanged, ISyncInterface *syncInterface);
158 
159     // Init the communicator and register its callbacks.
160     int InitComunicator(const ISyncInterface *syncInterface);
161 
162     // Add the sync task info to the map.
163     int AddSyncOperForContext(const std::string &deviceId, SyncOperation *operation);
164 
165     // Sync request callback task; runs on a sub thread.
166     void MessageReciveCallbackTask(ISyncTaskContext *context, const ICommunicator *communicator, Message *inMsg);
167 
168     void RemoteDataChangedTask(ISyncTaskContext *context, const ICommunicator *communicator, Message *inMsg);
169 
170     void ScheduleTaskOut(ISyncTaskContext *context, const ICommunicator *communicator);
171 
172     // Wrapper of MessageReciveCallbackTask.
173     void MessageReciveCallback(const std::string &targetDev, Message *inMsg);
174 
175     // Sync request callback.
176     int MessageReciveCallbackInner(const std::string &targetDev, Message *inMsg);
177 
178     // Execute the given SyncTarget and call back onComplete.
179     int ExecSyncTask(ISyncTaskContext *context);
180 
181     // Put the message into the queue (anti-DoS protection).
182     void PutMsgIntoQueue(const std::string &targetDev, Message *inMsg, int msgSize);
183 
184     // Get the size of the message.
185     int GetMsgSize(const Message *inMsg) const;
186 
187     // Do not run MessageReciveCallbackTask until msgQueue_ is empty.
188     int DealMsgUtilQueueEmpty();
189 
190     // Handle messages in order.
191     int ScheduleDealMsg(ISyncTaskContext *context, Message *inMsg);
192 
193     ISyncTaskContext *GetContextForMsg(const std::string &targetDev, int &errCode);
194 
195     ICommunicator *AllocCommunicator(const std::string &identifier, int &errCode);
196 
197     void UnRegCommunicatorsCallback();
198 
199     void ReleaseCommunicators();
200 
201     bool IsSkipCalculateLen(const Message *inMsg);
202 
203     void ClearInnerResource();
204 
205     void IncExecTaskCount();
206 
207     void DecExecTaskCount();
208 
209     RemoteExecutor *GetAndIncRemoteExector();
210 
211     void SetRemoteExector(RemoteExecutor *executor);
212 
213     bool CheckDeviceIdValid(const std::string &checkDeviceId, const std::string &localDeviceId);
214 
215     int GetLocalDeviceId(std::string &deviceId);
216 
217     void WaitingExecTaskExist();
218 
219     int HandleRemoteExecutorMsg(const std::string &targetDev, Message *inMsg);
220 
221     void AddQuerySubscribe(SyncGenericInterface *storage, const std::string &device, const QuerySyncObject &query);
222 
223     ICommunicator *communicator_;
224     DeviceManager *deviceManager_;
225     std::function<void(const std::string &)> onRemoteDataChanged_;
226     std::function<void(const std::string &)> offlineChanged_;
227     std::shared_ptr<Metadata> metadata_;
228     std::deque<Message *> msgQueue_;
229     volatile uint32_t execTaskCount_; // NOTE(review): volatile gives no atomicity; presumably guarded by execTaskCountLock_ - confirm
230     std::string label_;
231     volatile bool isSyncRetry_; // NOTE(review): volatile is not a synchronization tool; consider std::atomic<bool> like isActive_
232     std::mutex communicatorProxyLock_;
233     CommunicatorProxy *communicatorProxy_;
234     std::mutex equalCommunicatorsLock_;
235     std::map<std::string, ICommunicator *> equalCommunicators_;
236     // The static members below are shared by all SyncEngine instances.
237     static int queueCacheSize_;
238     static int maxQueueCacheSize_;
239     static unsigned int discardMsgNum_;
240     static const unsigned int MAX_EXEC_NUM = 7; // Set the maximum of threads as 6 (i.e. strictly less than 7)
241     static constexpr int DEFAULT_CACHE_SIZE = 160 * 1024 * 1024; // Default cache size of the queue: 160MB
242     static std::mutex queueLock_;
243     std::atomic<bool> isActive_; // NOTE(review): <atomic> is not included directly by this header
244 
245     // key: device, value: equalIdentifier
246     std::map<std::string, std::string> equalIdentifierMap_;
247     std::mutex execTaskCountLock_;
248     std::condition_variable execTaskCv_; // NOTE(review): <condition_variable> is not included directly by this header
249 
250     std::mutex remoteExecutorLock_;
251     RemoteExecutor *remoteExecutor_;
252 };
253 } // namespace DistributedDB
254 
255 #endif // SYNC_ENGINE_H
256