1 /* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef SYNC_TASK_CONTEXT_H 17 #define SYNC_TASK_CONTEXT_H 18 19 #include <list> 20 #include <mutex> 21 22 #include "icommunicator.h" 23 #include "ikvdb_sync_interface.h" 24 #include "isync_task_context.h" 25 #include "meta_data.h" 26 #include "runtime_context.h" 27 #include "semaphore_utils.h" 28 #include "sync_operation.h" 29 #include "sync_target.h" 30 #include "time_helper.h" 31 32 namespace DistributedDB { 33 enum SyncDirectionFlag { 34 SEND = 0, 35 RECEIVE = 1, 36 }; 37 struct TaskParam { 38 uint32_t timeout = 0; 39 }; 40 class ISyncStateMachine; 41 42 class SyncTaskContext : public ISyncTaskContext { 43 public: 44 SyncTaskContext(); 45 46 // Add a sync task target to the queue 47 int AddSyncTarget(ISyncTarget *target) override; 48 49 // Set the status of this task 50 void SetOperationStatus(int status) override; 51 52 // Clear context data 53 void Clear() override; 54 55 // remove a sync target by syncId 56 int RemoveSyncOperation(int syncId) override; 57 58 // If the requestTargetQueue is empty 59 bool IsTargetQueueEmpty() const override; 60 61 // Get the status of this task 62 int GetOperationStatus() const override; 63 64 // Set the mode of this task 65 void SetMode(int mode) override; 66 67 // Get the mode of this task 68 int GetMode() const override; 69 70 // Move to next target to sync 71 void MoveToNextTarget() override; 72 73 int GetNextTarget() override; 74 75 // Get the current task syncId 76 uint32_t GetSyncId() const override; 77 78 // Get the current task deviceId. 79 std::string GetDeviceId() const override; 80 81 // Set the sync task queue exec status 82 void SetTaskExecStatus(int status) override; 83 84 // Get the sync task queue exec status 85 int GetTaskExecStatus() const override; 86 87 // Return if now is doing auto sync 88 bool IsAutoSync() const override; 89 90 // Set a Timer used for timeout 91 int StartTimer() override; 92 93 // delete timer 94 void StopTimer() override; 95 96 // modify timer 97 int ModifyTimer(int milliSeconds) override; 98 99 // Set a RetryTime for the sync task 100 void SetRetryTime(int retryTime) override; 101 102 // Get a RetryTime for the sync task 103 int GetRetryTime() const override; 104 105 // Set Retry status for the sync task 106 void SetRetryStatus(int isNeedRetry) override; 107 108 // Get Retry status for the sync task 109 int GetRetryStatus() const override; 110 111 TimerId GetTimerId() const override; 112 113 // Inc the current message sequenceId 114 void IncSequenceId() override; 115 116 // Get the current initiactive sync session id 117 uint32_t GetRequestSessionId() const override; 118 119 // Get the current message sequence id 120 uint32_t GetSequenceId() const override; 121 122 void ReSetSequenceId() override; 123 124 void IncPacketId(); 125 126 uint64_t GetPacketId() const; 127 128 // Get the current watch timeout time 129 int GetTimeoutTime() const override; 130 131 void SetTimeoutCallback(const TimerAction &timeOutCallback) override; 132 133 // Start the sync state machine 134 int StartStateMachine() override; 135 136 // Set the timeoffset with the remote device 137 void SetTimeOffset(TimeOffset offset) override; 138 139 // Get the timeoffset with the remote device 140 TimeOffset GetTimeOffset() const override; 141 142 // Used for sync message callback 143 int ReceiveMessageCallback(Message *inMsg) override; 144 145 // used to register a callback, called when new SyncTarget added 146 void RegOnSyncTask(const std::function<int(void)> &callback) override; 147 148 // When schedule a new task, should call this function to inc usedcount 149 int IncUsedCount() override; 150 151 // When schedule task exit, should call this function to dec usedcount 152 void SafeExit() override; 153 154 // Get current local time from TimeHelper 155 Timestamp GetCurrentLocalTime() const override; 156 157 // Set the remount software version num 158 void SetRemoteSoftwareVersion(uint32_t version) override; 159 160 // Get the remount software version num 161 uint32_t GetRemoteSoftwareVersion() const override; 162 163 // Get the remount software version id, when called GetRemoteSoftwareVersion this id will be increase. 164 // Used to check if the version num is is overdue 165 uint64_t GetRemoteSoftwareVersionId() const override; 166 167 // Judge if the communicator is normal 168 bool IsCommNormal() const override; 169 170 void ClearSyncOperation() override; 171 172 // If ability sync request set version, need call this function. 173 // Should be called with ObjLock 174 virtual void Abort(int status); 175 176 // Used in send msg, as execution is asynchronous, should use this function to handle result. 177 static void CommErrHandlerFunc(int errCode, ISyncTaskContext *context, int32_t sessionId, bool isDirectEnd = true); 178 179 int GetTaskErrCode() const override; 180 181 void SetTaskErrCode(int errCode) override; 182 183 bool IsSyncTaskNeedRetry() const override; 184 185 void SetSyncRetry(bool isRetry) override; 186 187 int GetSyncRetryTimes() const override; 188 189 int GetSyncRetryTimeout(int retryTime) const override; 190 191 void ClearAllSyncTask() override; 192 193 bool IsAutoLiftWaterMark() const override; 194 195 void IncNegotiationCount() override; 196 197 // check if need trigger query auto sync and get query from inMsg 198 bool IsNeedTriggerQueryAutoSync(Message *inMsg, QuerySyncObject &query) override; 199 200 bool IsAutoSubscribe() const override; 201 202 bool IsCurrentSyncTaskCanBeSkipped() const override; 203 204 virtual void ResetLastPushTaskStatus(); 205 206 void SchemaChange() override; 207 208 void Dump(int fd) override; 209 210 void AbortMachineIfNeed(uint32_t syncId) override; 211 212 bool IsSchemaCompatible() const override; 213 214 void SetDbAbility(DbAbility &remoteDbAbility) override; 215 216 void TimeChange() override; 217 218 int32_t GetResponseTaskCount() override; 219 220 int GetCommErrCode() const; 221 222 void SetCommFailErrCode(int errCode); 223 protected: 224 const static int KILL_WAIT_SECONDS = INT32_MAX; 225 226 ~SyncTaskContext() override; 227 228 virtual int TimeOut(TimerId id); 229 230 virtual void CopyTargetData(const ISyncTarget *target, const TaskParam &taskParam); 231 232 void CommErrHandlerFuncInner(int errCode, uint32_t sessionId, bool isDirectEnd = true); 233 234 void KillWait(); 235 236 void ClearSyncTarget(); 237 238 void CancelCurrentSyncRetryIfNeed(int newTargetMode, uint32_t syncId); 239 240 virtual void SaveLastPushTaskExecStatus(int finalStatus); 241 242 int RunPermissionCheck(uint8_t flag) const; 243 244 SyncOperation *GetAndIncSyncOperation() const; 245 246 uint32_t GenerateRequestSessionId(); 247 248 static uint8_t GetPermissionCheckFlag(bool isAutoSync, int syncMode); 249 250 void SetErrCodeWhenWaitTimeOut(int errCode); 251 252 mutable std::mutex targetQueueLock_; 253 std::list<ISyncTarget *> requestTargetQueue_; 254 std::list<ISyncTarget *> responseTargetQueue_; 255 SyncOperation *syncOperation_; 256 mutable std::mutex operationLock_; 257 volatile uint32_t syncId_; 258 volatile int mode_; 259 volatile bool isAutoSync_; 260 volatile int status_; 261 volatile int taskExecStatus_; 262 std::string deviceId_; 263 std::string syncActionName_; 264 ISyncInterface *syncInterface_; 265 ICommunicator *communicator_; 266 ISyncStateMachine *stateMachine_; 267 TimeOffset timeOffset_ = 0; 268 volatile int retryTime_ = 0; 269 volatile int isNeedRetry_ = SyncTaskContext::NO_NEED_RETRY; 270 volatile uint32_t requestSessionId_ = 0; 271 volatile uint32_t lastRequestSessionId_ = 0; 272 volatile uint32_t sequenceId_ = 1; 273 std::function<int(void)> onSyncTaskAdd_; 274 275 // for safe exit 276 std::condition_variable safeKill_; 277 volatile int usedCount_ = 0; 278 279 // for timeout callback 280 std::mutex timerLock_; 281 TimerId timerId_ = 0; 282 int timeout_ = 1000; // 1000ms 283 TimerAction timeOutCallback_; 284 std::unique_ptr<TimeHelper> timeHelper_; 285 286 // for version sync 287 mutable std::mutex remoteSoftwareVersionLock_; 288 volatile uint32_t remoteSoftwareVersion_; 289 volatile uint64_t remoteSoftwareVersionId_; // Check if the remoteSoftwareVersion_ is is overdue 290 291 volatile bool isCommNormal_; 292 volatile int taskErrCode_; 293 std::atomic<int> commErrCode_; 294 volatile uint64_t packetId_ = 0; // used for assignment to reSendMap_.ReSendInfo.packetId in 103 version or above 295 volatile bool syncTaskRetryStatus_; 296 volatile bool isSyncRetry_; 297 volatile uint32_t negotiationCount_; 298 volatile bool isAutoSubscribe_; 299 300 // For global ISyncTaskContext Set, used by CommErrCallback. 301 static std::mutex synTaskContextSetLock_; 302 static std::set<ISyncTaskContext *> synTaskContextSet_; 303 }; 304 } // namespace DistributedDB 305 306 #endif // SYNC_TASK_CONTEXT_H 307