1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef GENRIC_SYNCER_H
17 #define GENRIC_SYNCER_H
18 
19 #include <functional>
20 #include <mutex>
21 #include <map>
22 
23 #include "isyncer.h"
24 #include "isync_engine.h"
25 #include "meta_data.h"
26 #include "sync_operation.h"
27 #include "time_helper.h"
28 
29 namespace DistributedDB {
30 class GenericSyncer : public virtual ISyncer {
31 using DataChangedFunc = std::function<void(const std::string &device)>;
32 
33 public:
34     GenericSyncer();
35     ~GenericSyncer() override;
36 
37     // Init the Syncer modules
38     int Initialize(ISyncInterface *syncInterface, bool isNeedActive) override;
39 
40     // Close
41     int Close(bool isClosedOperation) override;
42 
43     // Sync function.
44     // param devices: The device id list.
45     // param mode: Sync mode, see SyncMode.
46     // param onComplete: The syncer finish callback. set by caller
47     // param onFinalize: will be callback when this Sync Operation finalized.
48     // return a Sync id. It will return a positive value if failed,
49     int Sync(const std::vector<std::string> &devices, int mode,
50         const std::function<void(const std::map<std::string, int> &)> &onComplete,
51         const std::function<void(void)> &onFinalize, bool wait) override;
52 
53     // Sync function. use SyncParma to reduce parameter.
54     int Sync(const SyncParma &param);
55 
56     int Sync(const SyncParma &param, uint64_t connectionId) override;
57 
58     // Remove the operation, with the given syncId, used to clean resource if sync finished or failed.
59     int RemoveSyncOperation(int syncId) override;
60 
61     int StopSync(uint64_t connectionId) override;
62 
63     // Get The current virtual timestamp
64     uint64_t GetTimestamp() override;
65 
66     // Get manual sync queue size
67     int GetQueuedSyncSize(int *queuedSyncSize) const override;
68 
69     // Set manual sync queue limit
70     int SetQueuedSyncLimit(const int *queuedSyncLimit) override;
71 
72     // Get manual sync queue limit
73     int GetQueuedSyncLimit(int *queuedSyncLimit) const override;
74 
75     // Disable add new manual sync, for rekey
76     int DisableManualSync(void) override;
77 
78     // Enable add new manual sync, for rekey
79     int EnableManualSync(void) override;
80 
81     // Get local deviceId, is hashed
82     int GetLocalIdentity(std::string &outTarget) const override;
83 
84     // Set Manual Sync retry config
85     int SetSyncRetry(bool isRetry) override;
86 
87     // Set an equal identifier for this database, After this called, send msg to the target will use this identifier
88     int SetEqualIdentifier(const std::string &identifier, const std::vector<std::string> &targets) override;
89 
90     // Inner function, Used for subscribe sync
91     int Sync(const InternalSyncParma &param);
92 
93     // Remote data changed callback
94     virtual void RemoteDataChanged(const std::string &device) = 0;
95 
96     virtual void RemoteDeviceOffline(const std::string &device) = 0;
97 
98     void Dump(int fd) override;
99 
100     SyncerBasicInfo DumpSyncerBasicInfo() override;
101 
102     int RemoteQuery(const std::string &device, const RemoteCondition &condition,
103         uint64_t timeout, uint64_t connectionId, std::shared_ptr<ResultSet> &result) override;
104 
105     int GetSyncDataSize(const std::string &device, size_t &size) const override;
106 
107     int GetHashDeviceId(const std::string &clientId, std::string &hashDevId) const override;
108 
109     int GetWatermarkInfo(const std::string &device, WatermarkInfo &info) override;
110 
111     int UpgradeSchemaVerInMeta() override;
112 
113     void ResetSyncStatus() override;
114 
115     int64_t GetLocalTimeOffset() override;
116 
117     int32_t GetTaskCount() override;
118 protected:
119 
120     // trigger query auto sync or auto subscribe
121     // trigger auto subscribe only when subscribe task is failed triggered by remote db opened
122     // it won't be triggered again when subscribe task success
123     virtual void QueryAutoSync(const InternalSyncParma &param);
124 
125     // Create a sync engine, if has memory error, will return nullptr.
126     virtual ISyncEngine *CreateSyncEngine() = 0;
127 
128     virtual int PrepareSync(const SyncParma &param, uint32_t syncId, uint64_t connectionId);
129 
130     // Add a Sync Operation, after call this function, the operation will be start
131     virtual void AddSyncOperation(ISyncEngine *engine, SyncOperation *operation);
132 
133     // Used to set to the SyncOperation Onkill
134     virtual void SyncOperationKillCallbackInner(int syncId);
135 
136     // Used to set to the SyncOperation Onkill
137     void SyncOperationKillCallback(int syncId);
138 
139     // Init the metadata
140     int InitMetaData(ISyncInterface *syncInterface);
141 
142     // Init the TimeHelper
143     int InitTimeHelper(ISyncInterface *syncInterface);
144 
145     // Init the Sync engine
146     virtual int InitSyncEngine(ISyncInterface *syncInterface);
147 
148     int CheckSyncActive(ISyncInterface *syncInterface, bool isNeedActive);
149 
150     // Used to general a sync id, maybe it is currentSyncId++;
151     // The return value is sync id.
152     uint32_t GenerateSyncId();
153 
154     // Check if the mode arg is valid
155     bool IsValidMode(int mode) const;
156 
157     virtual int SyncConditionCheck(const SyncParma &param, const ISyncEngine *engine, ISyncInterface *storage) const;
158 
159     // Check if the devices arg is valid
160     bool IsValidDevices(const std::vector<std::string> &devices) const;
161 
162     // Used Clear all SyncOperations.
163     // isClosedOperation is false while userChanged
164     void ClearSyncOperations(bool isClosedOperation);
165 
166     void ClearInnerResource(bool isClosedOperation);
167 
168     void TriggerSyncFinished(SyncOperation *operation);
169 
170     // Callback when the special sync finished.
171     void OnSyncFinished(int syncId);
172 
173     bool IsManualSync(int inMode) const;
174 
175     virtual int AddQueuedManualSyncSize(int mode, bool wait);
176 
177     bool IsQueuedManualSyncFull(int mode, bool wait) const;
178 
179     void SubQueuedSyncSize(void);
180 
181     void GetOnlineDevices(std::vector<std::string> &devices) const;
182 
183     std::string GetSyncDevicesStr(const std::vector<std::string> &devices) const;
184 
185     void InitSyncOperation(SyncOperation *operation, const SyncParma &param);
186 
187     int StatusCheck() const;
188 
189     int SyncPreCheck(const SyncParma &param) const;
190 
191     int BuildSyncEngine();
192 
193     int InitTimeChangedListener();
194 
195     void ReleaseInnerResource();
196 
197     void RecordTimeChangeOffset(void *changedOffset);
198 
199     int CloseInner(bool isClosedOperation);
200 
201     int InitStorageResource(ISyncInterface *syncInterface);
202 
203     void ResetTimeSyncMarkByTimeChange(std::shared_ptr<Metadata> &metadata, ISyncInterface &storage);
204 
205     static int SyncModuleInit();
206 
207     static int SyncResourceInit();
208 
209     static bool IsNeedActive(ISyncInterface *syncInterface);
210 
211     static const int MIN_VALID_SYNC_ID;
212     static std::mutex moduleInitLock_;
213 
214     // Used to general the next sync id.
215     static int currentSyncId_;
216     static std::mutex syncIdLock_;
217     // For sync in progress.
218     std::map<uint64_t, std::list<int>> connectionIdMap_;
219     std::map<int, uint64_t> syncIdMap_;
220 
221     ISyncEngine *syncEngine_;
222     ISyncInterface *syncInterface_;
223     std::shared_ptr<TimeHelper> timeHelper_;
224     std::shared_ptr<Metadata> metadata_;
225     bool initialized_;
226     std::mutex operationMapLock_;
227     std::map<int, SyncOperation *> syncOperationMap_;
228     int queuedManualSyncSize_;
229     int queuedManualSyncLimit_;
230     bool manualSyncEnable_;
231     bool closing_;
232     mutable std::mutex queuedManualSyncLock_;
233     mutable std::mutex syncerLock_;
234     std::string label_;
235     std::mutex engineMutex_;
236     bool engineFinalize_;
237     std::condition_variable engineFinalizeCv_;
238 
239     std::mutex timeChangeListenerMutex_;
240     bool timeChangeListenerFinalize_;
241     std::condition_variable timeChangeCv_;
242     NotificationChain::Listener *timeChangedListener_;
243 };
244 } // namespace DistributedDB
245 
246 #endif  // GENRIC_SYNCER_H
247