1 /* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef GENRIC_SYNCER_H 17 #define GENRIC_SYNCER_H 18 19 #include <functional> 20 #include <mutex> 21 #include <map> 22 23 #include "isyncer.h" 24 #include "isync_engine.h" 25 #include "meta_data.h" 26 #include "sync_operation.h" 27 #include "time_helper.h" 28 29 namespace DistributedDB { 30 class GenericSyncer : public virtual ISyncer { 31 using DataChangedFunc = std::function<void(const std::string &device)>; 32 33 public: 34 GenericSyncer(); 35 ~GenericSyncer() override; 36 37 // Init the Syncer modules 38 int Initialize(ISyncInterface *syncInterface, bool isNeedActive) override; 39 40 // Close 41 int Close(bool isClosedOperation) override; 42 43 // Sync function. 44 // param devices: The device id list. 45 // param mode: Sync mode, see SyncMode. 46 // param onComplete: The syncer finish callback. set by caller 47 // param onFinalize: will be callback when this Sync Operation finalized. 48 // return a Sync id. It will return a positive value if failed, 49 int Sync(const std::vector<std::string> &devices, int mode, 50 const std::function<void(const std::map<std::string, int> &)> &onComplete, 51 const std::function<void(void)> &onFinalize, bool wait) override; 52 53 // Sync function. use SyncParma to reduce parameter. 54 int Sync(const SyncParma ¶m); 55 56 int Sync(const SyncParma ¶m, uint64_t connectionId) override; 57 58 // Remove the operation, with the given syncId, used to clean resource if sync finished or failed. 59 int RemoveSyncOperation(int syncId) override; 60 61 int StopSync(uint64_t connectionId) override; 62 63 // Get The current virtual timestamp 64 uint64_t GetTimestamp() override; 65 66 // Get manual sync queue size 67 int GetQueuedSyncSize(int *queuedSyncSize) const override; 68 69 // Set manual sync queue limit 70 int SetQueuedSyncLimit(const int *queuedSyncLimit) override; 71 72 // Get manual sync queue limit 73 int GetQueuedSyncLimit(int *queuedSyncLimit) const override; 74 75 // Disable add new manual sync, for rekey 76 int DisableManualSync(void) override; 77 78 // Enable add new manual sync, for rekey 79 int EnableManualSync(void) override; 80 81 // Get local deviceId, is hashed 82 int GetLocalIdentity(std::string &outTarget) const override; 83 84 // Set Manual Sync retry config 85 int SetSyncRetry(bool isRetry) override; 86 87 // Set an equal identifier for this database, After this called, send msg to the target will use this identifier 88 int SetEqualIdentifier(const std::string &identifier, const std::vector<std::string> &targets) override; 89 90 // Inner function, Used for subscribe sync 91 int Sync(const InternalSyncParma ¶m); 92 93 // Remote data changed callback 94 virtual void RemoteDataChanged(const std::string &device) = 0; 95 96 virtual void RemoteDeviceOffline(const std::string &device) = 0; 97 98 void Dump(int fd) override; 99 100 SyncerBasicInfo DumpSyncerBasicInfo() override; 101 102 int RemoteQuery(const std::string &device, const RemoteCondition &condition, 103 uint64_t timeout, uint64_t connectionId, std::shared_ptr<ResultSet> &result) override; 104 105 int GetSyncDataSize(const std::string &device, size_t &size) const override; 106 107 int GetHashDeviceId(const std::string &clientId, std::string &hashDevId) const override; 108 109 int GetWatermarkInfo(const std::string &device, WatermarkInfo &info) override; 110 111 int UpgradeSchemaVerInMeta() override; 112 113 void ResetSyncStatus() override; 114 115 int64_t GetLocalTimeOffset() override; 116 117 int32_t GetTaskCount() override; 118 protected: 119 120 // trigger query auto sync or auto subscribe 121 // trigger auto subscribe only when subscribe task is failed triggered by remote db opened 122 // it won't be triggered again when subscribe task success 123 virtual void QueryAutoSync(const InternalSyncParma ¶m); 124 125 // Create a sync engine, if has memory error, will return nullptr. 126 virtual ISyncEngine *CreateSyncEngine() = 0; 127 128 virtual int PrepareSync(const SyncParma ¶m, uint32_t syncId, uint64_t connectionId); 129 130 // Add a Sync Operation, after call this function, the operation will be start 131 virtual void AddSyncOperation(ISyncEngine *engine, SyncOperation *operation); 132 133 // Used to set to the SyncOperation Onkill 134 virtual void SyncOperationKillCallbackInner(int syncId); 135 136 // Used to set to the SyncOperation Onkill 137 void SyncOperationKillCallback(int syncId); 138 139 // Init the metadata 140 int InitMetaData(ISyncInterface *syncInterface); 141 142 // Init the TimeHelper 143 int InitTimeHelper(ISyncInterface *syncInterface); 144 145 // Init the Sync engine 146 virtual int InitSyncEngine(ISyncInterface *syncInterface); 147 148 int CheckSyncActive(ISyncInterface *syncInterface, bool isNeedActive); 149 150 // Used to general a sync id, maybe it is currentSyncId++; 151 // The return value is sync id. 152 uint32_t GenerateSyncId(); 153 154 // Check if the mode arg is valid 155 bool IsValidMode(int mode) const; 156 157 virtual int SyncConditionCheck(const SyncParma ¶m, const ISyncEngine *engine, ISyncInterface *storage) const; 158 159 // Check if the devices arg is valid 160 bool IsValidDevices(const std::vector<std::string> &devices) const; 161 162 // Used Clear all SyncOperations. 163 // isClosedOperation is false while userChanged 164 void ClearSyncOperations(bool isClosedOperation); 165 166 void ClearInnerResource(bool isClosedOperation); 167 168 void TriggerSyncFinished(SyncOperation *operation); 169 170 // Callback when the special sync finished. 171 void OnSyncFinished(int syncId); 172 173 bool IsManualSync(int inMode) const; 174 175 virtual int AddQueuedManualSyncSize(int mode, bool wait); 176 177 bool IsQueuedManualSyncFull(int mode, bool wait) const; 178 179 void SubQueuedSyncSize(void); 180 181 void GetOnlineDevices(std::vector<std::string> &devices) const; 182 183 std::string GetSyncDevicesStr(const std::vector<std::string> &devices) const; 184 185 void InitSyncOperation(SyncOperation *operation, const SyncParma ¶m); 186 187 int StatusCheck() const; 188 189 int SyncPreCheck(const SyncParma ¶m) const; 190 191 int BuildSyncEngine(); 192 193 int InitTimeChangedListener(); 194 195 void ReleaseInnerResource(); 196 197 void RecordTimeChangeOffset(void *changedOffset); 198 199 int CloseInner(bool isClosedOperation); 200 201 int InitStorageResource(ISyncInterface *syncInterface); 202 203 void ResetTimeSyncMarkByTimeChange(std::shared_ptr<Metadata> &metadata, ISyncInterface &storage); 204 205 static int SyncModuleInit(); 206 207 static int SyncResourceInit(); 208 209 static bool IsNeedActive(ISyncInterface *syncInterface); 210 211 static const int MIN_VALID_SYNC_ID; 212 static std::mutex moduleInitLock_; 213 214 // Used to general the next sync id. 215 static int currentSyncId_; 216 static std::mutex syncIdLock_; 217 // For sync in progress. 218 std::map<uint64_t, std::list<int>> connectionIdMap_; 219 std::map<int, uint64_t> syncIdMap_; 220 221 ISyncEngine *syncEngine_; 222 ISyncInterface *syncInterface_; 223 std::shared_ptr<TimeHelper> timeHelper_; 224 std::shared_ptr<Metadata> metadata_; 225 bool initialized_; 226 std::mutex operationMapLock_; 227 std::map<int, SyncOperation *> syncOperationMap_; 228 int queuedManualSyncSize_; 229 int queuedManualSyncLimit_; 230 bool manualSyncEnable_; 231 bool closing_; 232 mutable std::mutex queuedManualSyncLock_; 233 mutable std::mutex syncerLock_; 234 std::string label_; 235 std::mutex engineMutex_; 236 bool engineFinalize_; 237 std::condition_variable engineFinalizeCv_; 238 239 std::mutex timeChangeListenerMutex_; 240 bool timeChangeListenerFinalize_; 241 std::condition_variable timeChangeCv_; 242 NotificationChain::Listener *timeChangedListener_; 243 }; 244 } // namespace DistributedDB 245 246 #endif // GENRIC_SYNCER_H 247