1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef TIME_SYNC_H
17 #define TIME_SYNC_H
18 
19 #include "icommunicator.h"
20 #include "meta_data.h"
21 #include "sync_task_context.h"
22 #include "time_helper.h"
23 
24 namespace DistributedDB {
25 class TimeSyncPacket {
26 public:
27     TimeSyncPacket();
28     ~TimeSyncPacket();
29 
30     void SetSourceTimeBegin(Timestamp sourceTimeBegin);
31 
32     Timestamp GetSourceTimeBegin() const;
33 
34     void SetSourceTimeEnd(Timestamp sourceTimeEnd);
35 
36     Timestamp GetSourceTimeEnd() const;
37 
38     void SetTargetTimeBegin(Timestamp targetTimeBegin);
39 
40     Timestamp GetTargetTimeBegin() const;
41 
42     void SetTargetTimeEnd(Timestamp targetTimeEnd);
43 
44     Timestamp GetTargetTimeEnd() const;
45 
46     void SetVersion(uint32_t version);
47 
48     uint32_t GetVersion() const;
49 
50     void SetRequestLocalOffset(TimeOffset offset);
51 
52     TimeOffset GetRequestLocalOffset() const;
53 
54     void SetResponseLocalOffset(TimeOffset offset);
55 
56     TimeOffset GetResponseLocalOffset() const;
57 
58     static uint32_t CalculateLen();
59 private:
60     Timestamp sourceTimeBegin_;  // start point time on peer
61     Timestamp sourceTimeEnd_;    // end point time on local
62     Timestamp targetTimeBegin_;  // start point time on peer
63     Timestamp targetTimeEnd_;    // end point time on peer
64     uint32_t version_;
65     TimeOffset requestLocalOffset_; // local system time offset in request device
66     TimeOffset responseLocalOffset_; // local system time offset in response device
67 };
68 
69 class TimeSync : public std::enable_shared_from_this<TimeSync> {
70 public:
71     TimeSync();
72     virtual ~TimeSync();
73 
74     DISABLE_COPY_ASSIGN_MOVE(TimeSync);
75 
76     static int RegisterTransformFunc();
77 
78     static uint32_t CalculateLen(const Message *inMsg);
79 
80     static int Serialization(uint8_t *buffer, uint32_t length, const Message *inMsg); // register to communicator
81 
82     static int DeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg); // register to communicator
83 
84     int Initialize(ICommunicator *communicator, const std::shared_ptr<Metadata> &metadata,
85         const ISyncInterface *storage, const DeviceID &deviceId);
86 
87     virtual int SyncStart(const CommErrHandler &handler = nullptr, uint32_t sessionId = 0); // send timesync request
88 
89     int AckRecv(const Message *message, uint32_t targetSessionId = 0);
90 
91     int RequestRecv(const Message *message);
92 
93     // Get timeoffset from metadata
94     int GetTimeOffset(TimeOffset &outOffset, uint32_t timeout, uint32_t sessionId = 0);
95 
96     bool IsNeedSync() const;
97 
98     void SetOnline(bool isOnline);
99 
100     void Close();
101 
102     TimeSyncPacket BuildAckPacket(const TimeSyncPacket &request);
103 
104     void SetTimeSyncFinishIfNeed();
105 
106     void ClearTimeSyncFinish();
107 
108     int GenerateTimeOffsetIfNeed(TimeOffset systemOffset, TimeOffset senderLocalOffset);
109 
110     bool IsRemoteLowVersion(uint32_t checkVersion);
111 
112     // Used in send msg, as execution is asynchronous, should use this function to handle result.
113     static void CommErrHandlerFunc(int errCode, TimeSync *timeSync);
114 
115 protected:
116     static const int MAX_RETRY_TIME = 1;
117 
118     static std::pair<TimeOffset, TimeOffset> CalculateTimeOffset(const TimeSyncPacket &timeSyncInfo);
119 
120     static bool IsPacketValid(const Message *inMsg, uint16_t messageType);
121 
122     void Finalize();
123 
124     int SaveTimeOffset(const DeviceID &deviceID, TimeOffset timeOffset);
125 
126     int SendPacket(const DeviceID &deviceId, const Message *message, const CommErrHandler &handler = nullptr);
127 
128     int TimeSyncDriver(TimerId timerId);
129 
130     void ResetTimer();
131 
132     bool IsClosed() const;
133 
134     int SendMessageWithSendEnd(const Message *message, const CommErrHandler &handler);
135 
136     Timestamp GetSourceBeginTime(Timestamp packetBeginTime, uint32_t sessionId);
137 
138     void ReTimeSyncIfNeed(const TimeSyncPacket &ackPacket);
139 
140     bool CheckReTimeSyncIfNeedWithLowVersion(TimeOffset timeOffsetIgnoreRtt);
141 
142     bool CheckReTimeSyncIfNeedWithHighVersion(TimeOffset timeOffsetIgnoreRtt, const TimeSyncPacket &ackPacket);
143 
144     int SaveOffsetWithAck(const TimeSyncPacket &ackPacket);
145 
146     bool CheckSkipTimeSync(const DeviceTimeInfo &info);
147 
148     void SetTimeSyncFinishInner(bool finish);
149 
150     static TimeOffset CalculateRawTimeOffset(const TimeSyncPacket &timeSyncInfo, TimeOffset deltaTime);
151 
152     ICommunicator *communicateHandle_;
153     std::shared_ptr<Metadata> metadata_;
154     std::unique_ptr<TimeHelper> timeHelper_;
155     DeviceID deviceId_;
156     int retryTime_;
157     TimerId driverTimerId_;
158     TimerAction driverCallback_;
159     bool isSynced_;
160     bool isAckReceived_;
161     std::condition_variable conditionVar_;
162     mutable std::mutex cvLock_;
163     NotificationChain::Listener *timeChangedListener_;
164     std::condition_variable timeDriverCond_;
165     std::mutex timeDriverLock_;
166     int timeDriverLockCount_;
167     bool isOnline_;
168     bool closed_;
169     std::mutex beginTimeMutex_;
170     std::map<uint32_t, Timestamp> sessionBeginTime_;
171     std::vector<uint8_t> dbId_;
172     static std::mutex timeSyncSetLock_;
173     static std::set<TimeSync *> timeSyncSet_;
174 };
175 } // namespace DistributedDB
176 
177 #endif // TIME_SYNC_H
178