1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef SYNC_TASK_CONTEXT_H
17 #define SYNC_TASK_CONTEXT_H
18 
#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <list>
#include <memory>
#include <mutex>
#include <set>
#include <string>

#include "icommunicator.h"
#include "ikvdb_sync_interface.h"
#include "isync_task_context.h"
#include "meta_data.h"
#include "runtime_context.h"
#include "semaphore_utils.h"
#include "sync_operation.h"
#include "sync_target.h"
#include "time_helper.h"
31 
32 namespace DistributedDB {
// Flag that tells the two sync directions apart.
// The enumerators rely on implicit numbering, which starts at 0.
enum SyncDirectionFlag {
    SEND,    // == 0
    RECEIVE, // == 1
};
// Parameter bundle for a sync task; it currently carries only a timeout.
struct TaskParam {
    uint32_t timeout{0}; // defaults to 0 when not set explicitly
};
40 class ISyncStateMachine;
41 
// ISyncTaskContext implementation that exposes protected state and virtual hooks
// (Abort, TimeOut, CopyTargetData, SaveLastPushTaskExecStatus, ...) to derived classes.
// It declares the request/response target queues, the sync operation attached to the context,
// the timeout timer state and the bookkeeping kept for the remote device (time offset,
// software version, communicator status).
// The destructor is protected, so code outside the class hierarchy cannot delete an instance directly.
class SyncTaskContext : public ISyncTaskContext {
public:
    SyncTaskContext();

    // Add a sync task target to the queue
    int AddSyncTarget(ISyncTarget *target) override;

    // Set the status of this task
    void SetOperationStatus(int status) override;

    // Clear context data
    void Clear() override;

    // Remove a sync target by syncId
    int RemoveSyncOperation(int syncId) override;

    // Check whether the requestTargetQueue is empty
    bool IsTargetQueueEmpty() const override;

    // Get the status of this task
    int GetOperationStatus() const override;

    // Set the mode of this task
    void SetMode(int mode) override;

    // Get the mode of this task
    int GetMode() const override;

    // Move to next target to sync
    void MoveToNextTarget() override;

    // NOTE(review): contract is not documented here; the int result is presumably an error code - confirm.
    int GetNextTarget() override;

    // Get the current task syncId
    uint32_t GetSyncId() const override;

    // Get the current task deviceId.
    std::string GetDeviceId() const override;

    // Set the sync task queue exec status
    void SetTaskExecStatus(int status) override;

    // Get the sync task queue exec status
    int GetTaskExecStatus() const override;

    // Return whether an auto sync is being done now
    bool IsAutoSync() const override;

    // Set a Timer used for timeout
    int StartTimer() override;

    // Delete the timer
    void StopTimer() override;

    // Modify the timer
    int ModifyTimer(int milliSeconds) override;

    // Set a RetryTime for the sync task
    void SetRetryTime(int retryTime) override;

    // Get a RetryTime for the sync task
    int GetRetryTime() const override;

    // Set Retry status for the sync task
    void SetRetryStatus(int isNeedRetry) override;

    // Get Retry status for the sync task
    int GetRetryStatus() const override;

    TimerId GetTimerId() const override;

    // Increase the current message sequenceId
    void IncSequenceId() override;

    // Get the current request (initiative) sync session id
    uint32_t GetRequestSessionId() const override;

    // Get the current message sequence id
    uint32_t GetSequenceId() const override;

    void ReSetSequenceId() override;

    // Not part of the ISyncTaskContext overrides (no override specifier); see packetId_ below.
    void IncPacketId();

    uint64_t GetPacketId() const;

    // Get the current watch timeout time
    int GetTimeoutTime() const override;

    void SetTimeoutCallback(const TimerAction &timeOutCallback) override;

    // Start the sync state machine
    int StartStateMachine() override;

    // Set the time offset with the remote device
    void SetTimeOffset(TimeOffset offset) override;

    // Get the time offset with the remote device
    TimeOffset GetTimeOffset() const override;

    // Used for sync message callback
    int ReceiveMessageCallback(Message *inMsg) override;

    // Used to register a callback, called when a new SyncTarget is added
    void RegOnSyncTask(const std::function<int(void)> &callback) override;

    // When scheduling a new task, call this function to increase the used count
    int IncUsedCount() override;

    // When a scheduled task exits, call this function to decrease the used count
    void SafeExit() override;

    // Get current local time from TimeHelper
    Timestamp GetCurrentLocalTime() const override;

    // Set the remote software version num
    void SetRemoteSoftwareVersion(uint32_t version) override;

    // Get the remote software version num
    uint32_t GetRemoteSoftwareVersion() const override;

    // Get the remote software version id, when called GetRemoteSoftwareVersion this id will be increased.
    // Used to check if the version num is overdue
    uint64_t GetRemoteSoftwareVersionId() const override;

    // Judge if the communicator is normal
    bool IsCommNormal() const override;

    void ClearSyncOperation() override;

    // If ability sync request set version, need call this function.
    // Should be called with ObjLock
    virtual void Abort(int status);

    // Used in send msg, as execution is asynchronous, should use this function to handle result.
    static void CommErrHandlerFunc(int errCode, ISyncTaskContext *context, int32_t sessionId, bool isDirectEnd = true);

    int GetTaskErrCode() const override;

    void SetTaskErrCode(int errCode) override;

    bool IsSyncTaskNeedRetry() const override;

    void SetSyncRetry(bool isRetry) override;

    int GetSyncRetryTimes() const override;

    int GetSyncRetryTimeout(int retryTime) const override;

    void ClearAllSyncTask() override;

    bool IsAutoLiftWaterMark() const override;

    void IncNegotiationCount() override;

    // Check if a query auto sync needs to be triggered and get the query from inMsg
    bool IsNeedTriggerQueryAutoSync(Message *inMsg, QuerySyncObject &query) override;

    bool IsAutoSubscribe() const override;

    bool IsCurrentSyncTaskCanBeSkipped() const override;

    virtual void ResetLastPushTaskStatus();

    void SchemaChange() override;

    void Dump(int fd) override;

    void AbortMachineIfNeed(uint32_t syncId) override;

    bool IsSchemaCompatible() const override;

    void SetDbAbility(DbAbility &remoteDbAbility) override;

    void TimeChange() override;

    int32_t GetResponseTaskCount() override;

    // Not part of the ISyncTaskContext overrides (no override specifier); see commErrCode_ below.
    int GetCommErrCode() const;

    void SetCommFailErrCode(int errCode);
protected:
    // Initialised to INT32_MAX.
    const static int KILL_WAIT_SECONDS = INT32_MAX;

    // Protected: instances cannot be deleted directly from outside the class hierarchy.
    ~SyncTaskContext() override;

    virtual int TimeOut(TimerId id);

    virtual void CopyTargetData(const ISyncTarget *target, const TaskParam &taskParam);

    // Non-static counterpart of CommErrHandlerFunc; note that sessionId is unsigned here
    // while the static function takes an int32_t.
    void CommErrHandlerFuncInner(int errCode, uint32_t sessionId, bool isDirectEnd = true);

    void KillWait();

    void ClearSyncTarget();

    void CancelCurrentSyncRetryIfNeed(int newTargetMode, uint32_t syncId);

    virtual void SaveLastPushTaskExecStatus(int finalStatus);

    int RunPermissionCheck(uint8_t flag) const;

    SyncOperation *GetAndIncSyncOperation() const;

    uint32_t GenerateRequestSessionId();

    static uint8_t GetPermissionCheckFlag(bool isAutoSync, int syncMode);

    void SetErrCodeWhenWaitTimeOut(int errCode);

    // NOTE(review): presumably guards the two target queues below - confirm against the implementation.
    mutable std::mutex targetQueueLock_;
    std::list<ISyncTarget *> requestTargetQueue_;
    std::list<ISyncTarget *> responseTargetQueue_;
    SyncOperation *syncOperation_;
    // NOTE(review): presumably guards syncOperation_ - confirm against the implementation.
    mutable std::mutex operationLock_;
    // NOTE(review): the volatile qualifiers below give neither atomicity nor memory ordering in C++.
    // If these fields are accessed from several threads without a lock, std::atomic (as already
    // used for commErrCode_) would be the appropriate tool - confirm before changing.
    // The members without an in-class initializer must be initialised by the constructor.
    volatile uint32_t syncId_;
    volatile int mode_;
    volatile bool isAutoSync_;
    volatile int status_;
    volatile int taskExecStatus_;
    std::string deviceId_;
    std::string syncActionName_;
    ISyncInterface *syncInterface_;
    ICommunicator *communicator_;
    ISyncStateMachine *stateMachine_;
    TimeOffset timeOffset_ = 0;
    volatile int retryTime_ = 0;
    volatile int isNeedRetry_ = SyncTaskContext::NO_NEED_RETRY;
    volatile uint32_t requestSessionId_ = 0;
    volatile uint32_t lastRequestSessionId_ = 0;
    volatile uint32_t sequenceId_ = 1;
    std::function<int(void)> onSyncTaskAdd_;

    // for safe exit
    std::condition_variable safeKill_;
    volatile int usedCount_ = 0;

    // for timeout callback
    std::mutex timerLock_;
    TimerId timerId_ = 0;
    int timeout_ = 1000; // 1000ms
    TimerAction timeOutCallback_;
    std::unique_ptr<TimeHelper> timeHelper_;

    // for version sync
    mutable std::mutex remoteSoftwareVersionLock_;
    volatile uint32_t remoteSoftwareVersion_;
    volatile uint64_t remoteSoftwareVersionId_; // Check if the remoteSoftwareVersion_ is overdue

    volatile bool isCommNormal_;
    volatile int taskErrCode_;
    std::atomic<int> commErrCode_;
    volatile uint64_t packetId_ = 0; // used for assignment to reSendMap_.ReSendInfo.packetId in 103 version or above
    volatile bool syncTaskRetryStatus_;
    volatile bool isSyncRetry_;
    volatile uint32_t negotiationCount_;
    volatile bool isAutoSubscribe_;

    // For global ISyncTaskContext Set, used by CommErrCallback.
    static std::mutex synTaskContextSetLock_;
    static std::set<ISyncTaskContext *> synTaskContextSet_;
};
304 } // namespace DistributedDB
305 
306 #endif // SYNC_TASK_CONTEXT_H
307