1 /* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef TIME_SYNC_H 17 #define TIME_SYNC_H 18 19 #include "icommunicator.h" 20 #include "meta_data.h" 21 #include "sync_task_context.h" 22 #include "time_helper.h" 23 24 namespace DistributedDB { 25 class TimeSyncPacket { 26 public: 27 TimeSyncPacket(); 28 ~TimeSyncPacket(); 29 30 void SetSourceTimeBegin(Timestamp sourceTimeBegin); 31 32 Timestamp GetSourceTimeBegin() const; 33 34 void SetSourceTimeEnd(Timestamp sourceTimeEnd); 35 36 Timestamp GetSourceTimeEnd() const; 37 38 void SetTargetTimeBegin(Timestamp targetTimeBegin); 39 40 Timestamp GetTargetTimeBegin() const; 41 42 void SetTargetTimeEnd(Timestamp targetTimeEnd); 43 44 Timestamp GetTargetTimeEnd() const; 45 46 void SetVersion(uint32_t version); 47 48 uint32_t GetVersion() const; 49 50 void SetRequestLocalOffset(TimeOffset offset); 51 52 TimeOffset GetRequestLocalOffset() const; 53 54 void SetResponseLocalOffset(TimeOffset offset); 55 56 TimeOffset GetResponseLocalOffset() const; 57 58 static uint32_t CalculateLen(); 59 private: 60 Timestamp sourceTimeBegin_; // start point time on peer 61 Timestamp sourceTimeEnd_; // end point time on local 62 Timestamp targetTimeBegin_; // start point time on peer 63 Timestamp targetTimeEnd_; // end point time on peer 64 uint32_t version_; 65 TimeOffset requestLocalOffset_; // local system time offset in request device 66 TimeOffset responseLocalOffset_; // local system time offset in response device 67 }; 68 69 class TimeSync : public std::enable_shared_from_this<TimeSync> { 70 public: 71 TimeSync(); 72 virtual ~TimeSync(); 73 74 DISABLE_COPY_ASSIGN_MOVE(TimeSync); 75 76 static int RegisterTransformFunc(); 77 78 static uint32_t CalculateLen(const Message *inMsg); 79 80 static int Serialization(uint8_t *buffer, uint32_t length, const Message *inMsg); // register to communicator 81 82 static int DeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg); // register to communicator 83 84 int Initialize(ICommunicator *communicator, const std::shared_ptr<Metadata> &metadata, 85 const ISyncInterface *storage, const DeviceID &deviceId); 86 87 virtual int SyncStart(const CommErrHandler &handler = nullptr, uint32_t sessionId = 0); // send timesync request 88 89 int AckRecv(const Message *message, uint32_t targetSessionId = 0); 90 91 int RequestRecv(const Message *message); 92 93 // Get timeoffset from metadata 94 int GetTimeOffset(TimeOffset &outOffset, uint32_t timeout, uint32_t sessionId = 0); 95 96 bool IsNeedSync() const; 97 98 void SetOnline(bool isOnline); 99 100 void Close(); 101 102 TimeSyncPacket BuildAckPacket(const TimeSyncPacket &request); 103 104 void SetTimeSyncFinishIfNeed(); 105 106 void ClearTimeSyncFinish(); 107 108 int GenerateTimeOffsetIfNeed(TimeOffset systemOffset, TimeOffset senderLocalOffset); 109 110 bool IsRemoteLowVersion(uint32_t checkVersion); 111 112 // Used in send msg, as execution is asynchronous, should use this function to handle result. 113 static void CommErrHandlerFunc(int errCode, TimeSync *timeSync); 114 115 protected: 116 static const int MAX_RETRY_TIME = 1; 117 118 static std::pair<TimeOffset, TimeOffset> CalculateTimeOffset(const TimeSyncPacket &timeSyncInfo); 119 120 static bool IsPacketValid(const Message *inMsg, uint16_t messageType); 121 122 void Finalize(); 123 124 int SaveTimeOffset(const DeviceID &deviceID, TimeOffset timeOffset); 125 126 int SendPacket(const DeviceID &deviceId, const Message *message, const CommErrHandler &handler = nullptr); 127 128 int TimeSyncDriver(TimerId timerId); 129 130 void ResetTimer(); 131 132 bool IsClosed() const; 133 134 int SendMessageWithSendEnd(const Message *message, const CommErrHandler &handler); 135 136 Timestamp GetSourceBeginTime(Timestamp packetBeginTime, uint32_t sessionId); 137 138 void ReTimeSyncIfNeed(const TimeSyncPacket &ackPacket); 139 140 bool CheckReTimeSyncIfNeedWithLowVersion(TimeOffset timeOffsetIgnoreRtt); 141 142 bool CheckReTimeSyncIfNeedWithHighVersion(TimeOffset timeOffsetIgnoreRtt, const TimeSyncPacket &ackPacket); 143 144 int SaveOffsetWithAck(const TimeSyncPacket &ackPacket); 145 146 bool CheckSkipTimeSync(const DeviceTimeInfo &info); 147 148 void SetTimeSyncFinishInner(bool finish); 149 150 static TimeOffset CalculateRawTimeOffset(const TimeSyncPacket &timeSyncInfo, TimeOffset deltaTime); 151 152 ICommunicator *communicateHandle_; 153 std::shared_ptr<Metadata> metadata_; 154 std::unique_ptr<TimeHelper> timeHelper_; 155 DeviceID deviceId_; 156 int retryTime_; 157 TimerId driverTimerId_; 158 TimerAction driverCallback_; 159 bool isSynced_; 160 bool isAckReceived_; 161 std::condition_variable conditionVar_; 162 mutable std::mutex cvLock_; 163 NotificationChain::Listener *timeChangedListener_; 164 std::condition_variable timeDriverCond_; 165 std::mutex timeDriverLock_; 166 int timeDriverLockCount_; 167 bool isOnline_; 168 bool closed_; 169 std::mutex beginTimeMutex_; 170 std::map<uint32_t, Timestamp> sessionBeginTime_; 171 std::vector<uint8_t> dbId_; 172 static std::mutex timeSyncSetLock_; 173 static std::set<TimeSync *> timeSyncSet_; 174 }; 175 } // namespace DistributedDB 176 177 #endif // TIME_SYNC_H 178