1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "time_sync.h"
17 
18 #include "parcel.h"
19 #include "log_print.h"
20 #include "sync_types.h"
21 #include "message_transform.h"
22 #include "version.h"
23 #include "isync_task_context.h"
24 
25 namespace DistributedDB {
26 std::mutex TimeSync::timeSyncSetLock_;
27 std::set<TimeSync *> TimeSync::timeSyncSet_;
28 namespace {
29     constexpr uint64_t TIME_SYNC_INTERVAL = 24 * 60 * 60 * 1000; // 24h
30     constexpr int TRIP_DIV_HALF = 2;
31     constexpr int64_t MAX_TIME_OFFSET_NOISE = 1 * 1000 * 10000; // 1 second in 100-nanosecond units
32     constexpr int64_t MAX_TIME_RTT_NOISE = 1 * 1000 * 10000; // 1 second in 100-nanosecond units
33     constexpr uint64_t RTT_NOISE_CHECK_INTERVAL = 30 * 60 * 1000 * 10000u; // 30 minute in 100-nanosecond units
34 }
35 
36 // Class TimeSyncPacket
TimeSyncPacket()37 TimeSyncPacket::TimeSyncPacket()
38     : sourceTimeBegin_(0),
39       sourceTimeEnd_(0),
40       targetTimeBegin_(0),
41       targetTimeEnd_(0),
42       version_(TIME_SYNC_VERSION_V1),
43       requestLocalOffset_(0),
44       responseLocalOffset_(0)
45 {
46 }
47 
~TimeSyncPacket()48 TimeSyncPacket::~TimeSyncPacket()
49 {
50 }
51 
SetSourceTimeBegin(Timestamp sourceTimeBegin)52 void TimeSyncPacket::SetSourceTimeBegin(Timestamp sourceTimeBegin)
53 {
54     sourceTimeBegin_ = sourceTimeBegin;
55 }
56 
GetSourceTimeBegin() const57 Timestamp TimeSyncPacket::GetSourceTimeBegin() const
58 {
59     return sourceTimeBegin_;
60 }
61 
SetSourceTimeEnd(Timestamp sourceTimeEnd)62 void TimeSyncPacket::SetSourceTimeEnd(Timestamp sourceTimeEnd)
63 {
64     sourceTimeEnd_ = sourceTimeEnd;
65 }
66 
GetSourceTimeEnd() const67 Timestamp TimeSyncPacket::GetSourceTimeEnd() const
68 {
69     return sourceTimeEnd_;
70 }
71 
SetTargetTimeBegin(Timestamp targetTimeBegin)72 void TimeSyncPacket::SetTargetTimeBegin(Timestamp targetTimeBegin)
73 {
74     targetTimeBegin_ = targetTimeBegin;
75 }
76 
GetTargetTimeBegin() const77 Timestamp TimeSyncPacket::GetTargetTimeBegin() const
78 {
79     return targetTimeBegin_;
80 }
81 
SetTargetTimeEnd(Timestamp targetTimeEnd)82 void TimeSyncPacket::SetTargetTimeEnd(Timestamp targetTimeEnd)
83 {
84     targetTimeEnd_ = targetTimeEnd;
85 }
86 
GetTargetTimeEnd() const87 Timestamp TimeSyncPacket::GetTargetTimeEnd() const
88 {
89     return targetTimeEnd_;
90 }
91 
SetVersion(uint32_t version)92 void TimeSyncPacket::SetVersion(uint32_t version)
93 {
94     version_ = version;
95 }
96 
GetVersion() const97 uint32_t TimeSyncPacket::GetVersion() const
98 {
99     return version_;
100 }
101 
GetRequestLocalOffset() const102 TimeOffset TimeSyncPacket::GetRequestLocalOffset() const
103 {
104     return requestLocalOffset_;
105 }
106 
SetRequestLocalOffset(TimeOffset offset)107 void TimeSyncPacket::SetRequestLocalOffset(TimeOffset offset)
108 {
109     requestLocalOffset_ = offset;
110 }
111 
GetResponseLocalOffset() const112 TimeOffset TimeSyncPacket::GetResponseLocalOffset() const
113 {
114     return responseLocalOffset_;
115 }
116 
SetResponseLocalOffset(TimeOffset offset)117 void TimeSyncPacket::SetResponseLocalOffset(TimeOffset offset)
118 {
119     responseLocalOffset_ = offset;
120 }
121 
CalculateLen()122 uint32_t TimeSyncPacket::CalculateLen()
123 {
124     uint32_t len = Parcel::GetUInt32Len(); // version_
125     len += Parcel::GetUInt64Len(); // sourceTimeBegin_
126     len += Parcel::GetUInt64Len(); // sourceTimeEnd_
127     len += Parcel::GetUInt64Len(); // targetTimeBegin_
128     len += Parcel::GetUInt64Len(); // targetTimeEnd_
129     len = Parcel::GetEightByteAlign(len);
130     len += Parcel::GetInt64Len(); // requestLocalOffset_
131     len += Parcel::GetInt64Len(); // responseLocalOffset_
132     return len;
133 }
134 
135 // Class TimeSync
TimeSync()136 TimeSync::TimeSync()
137     : communicateHandle_(nullptr),
138       metadata_(nullptr),
139       timeHelper_(nullptr),
140       retryTime_(0),
141       driverTimerId_(0),
142       isSynced_(false),
143       isAckReceived_(false),
144       timeChangedListener_(nullptr),
145       timeDriverLockCount_(0),
146       isOnline_(true),
147       closed_(false)
148 {
149 }
150 
~TimeSync()151 TimeSync::~TimeSync()
152 {
153     Finalize();
154     driverTimerId_ = 0;
155 
156     if (timeChangedListener_ != nullptr) {
157         timeChangedListener_->Drop(true);
158         timeChangedListener_ = nullptr;
159     }
160     timeHelper_ = nullptr;
161     communicateHandle_ = nullptr;
162     metadata_ = nullptr;
163 
164     std::lock_guard<std::mutex> lock(timeSyncSetLock_);
165     timeSyncSet_.erase(this);
166 }
167 
RegisterTransformFunc()168 int TimeSync::RegisterTransformFunc()
169 {
170     TransformFunc func;
171     func.computeFunc = [](const Message *inMsg) { return CalculateLen(inMsg); };
172     func.serializeFunc = [](uint8_t *buffer, uint32_t length, const Message *inMsg) {
173         return Serialization(buffer, length, inMsg);
174     };
175     func.deserializeFunc = [](const uint8_t *buffer, uint32_t length, Message *inMsg) {
176         return DeSerialization(buffer, length, inMsg);
177     };
178     return MessageTransform::RegTransformFunction(TIME_SYNC_MESSAGE, func);
179 }
180 
Initialize(ICommunicator * communicator,const std::shared_ptr<Metadata> & metadata,const ISyncInterface * storage,const DeviceID & deviceId)181 int TimeSync::Initialize(ICommunicator *communicator, const std::shared_ptr<Metadata> &metadata,
182     const ISyncInterface *storage, const DeviceID &deviceId)
183 {
184     if ((communicator == nullptr) || (storage == nullptr) || (metadata == nullptr)) {
185         return -E_INVALID_ARGS;
186     }
187     {
188         std::lock_guard<std::mutex> lock(timeSyncSetLock_);
189         timeSyncSet_.insert(this);
190     }
191     communicateHandle_ = communicator;
192     metadata_ = metadata;
193     deviceId_ = deviceId;
194     timeHelper_ = std::make_unique<TimeHelper>();
195 
196     int errCode = timeHelper_->Initialize(storage, metadata_);
197     if (errCode != E_OK) {
198         timeHelper_ = nullptr;
199         LOGE("[TimeSync] timeHelper Init failed, err %d.", errCode);
200         return errCode;
201     }
202     dbId_ = storage->GetIdentifier();
203     driverCallback_ = [this](TimerId timerId) { return TimeSyncDriver(timerId); };
204     errCode = RuntimeContext::GetInstance()->SetTimer(TIME_SYNC_INTERVAL, driverCallback_, nullptr, driverTimerId_);
205     if (errCode != E_OK) {
206         return errCode;
207     }
208     isSynced_ = metadata_->IsTimeSyncFinish(deviceId_);
209     return errCode;
210 }
211 
Finalize()212 void TimeSync::Finalize()
213 {
214     // Stop the timer
215     LOGD("[TimeSync] Finalize enter!");
216     RuntimeContext *runtimeContext = RuntimeContext::GetInstance();
217     TimerId timerId;
218     {
219         std::unique_lock<std::mutex> lock(timeDriverLock_);
220         timerId = driverTimerId_;
221     }
222     runtimeContext->RemoveTimer(timerId, true);
223     std::unique_lock<std::mutex> lock(timeDriverLock_);
224     timeDriverCond_.wait(lock, [this]() { return this->timeDriverLockCount_ == 0; });
225     LOGD("[TimeSync] Finalized!");
226 }
227 
SyncStart(const CommErrHandler & handler,uint32_t sessionId)228 int TimeSync::SyncStart(const CommErrHandler &handler,  uint32_t sessionId)
229 {
230     isOnline_ = true;
231     TimeSyncPacket packet;
232     Timestamp startTime = timeHelper_->GetTime();
233     packet.SetSourceTimeBegin(startTime);
234     TimeOffset timeOffset = metadata_->GetLocalTimeOffset();
235     packet.SetRequestLocalOffset(timeOffset);
236     // send timeSync request
237     LOGD("[TimeSync] startTime = %" PRIu64 ", offset = % " PRId64 " , dev = %s{private}", startTime, timeOffset,
238         deviceId_.c_str());
239 
240     Message *message = new (std::nothrow) Message(TIME_SYNC_MESSAGE);
241     if (message == nullptr) {
242         return -E_OUT_OF_MEMORY;
243     }
244     message->SetSessionId(sessionId);
245     message->SetMessageType(TYPE_REQUEST);
246     message->SetPriority(Priority::NORMAL);
247     int errCode = message->SetCopiedObject<>(packet);
248     if (errCode != E_OK) {
249         delete message;
250         message = nullptr;
251         return errCode;
252     }
253     errCode = SendMessageWithSendEnd(message, handler);
254     if (errCode != E_OK) {
255         delete message;
256         message = nullptr;
257     }
258     return errCode;
259 }
260 
CalculateLen(const Message * inMsg)261 uint32_t TimeSync::CalculateLen(const Message *inMsg)
262 {
263     if (!(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
264         return 0;
265     }
266 
267     const TimeSyncPacket *packet = const_cast<TimeSyncPacket *>(inMsg->GetObject<TimeSyncPacket>());
268     if (packet == nullptr) {
269         return 0;
270     }
271 
272     return TimeSyncPacket::CalculateLen();
273 }
274 
Serialization(uint8_t * buffer,uint32_t length,const Message * inMsg)275 int TimeSync::Serialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
276 {
277     if ((buffer == nullptr) || !(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
278         return -E_INVALID_ARGS;
279     }
280     const TimeSyncPacket *packet = inMsg->GetObject<TimeSyncPacket>();
281     if ((packet == nullptr) || (length != TimeSyncPacket::CalculateLen())) {
282         return -E_INVALID_ARGS;
283     }
284 
285     Parcel parcel(buffer, length);
286     Timestamp srcBegin = packet->GetSourceTimeBegin();
287     Timestamp srcEnd = packet->GetSourceTimeEnd();
288     Timestamp targetBegin = packet->GetTargetTimeBegin();
289     Timestamp targetEnd = packet->GetTargetTimeEnd();
290 
291     int errCode = parcel.WriteUInt32(TIME_SYNC_VERSION_V1);
292     if (errCode != E_OK) {
293         return -E_SECUREC_ERROR;
294     }
295     errCode = parcel.WriteUInt64(srcBegin);
296     if (errCode != E_OK) {
297         return -E_SECUREC_ERROR;
298     }
299     errCode = parcel.WriteUInt64(srcEnd);
300     if (errCode != E_OK) {
301         return -E_SECUREC_ERROR;
302     }
303     errCode = parcel.WriteUInt64(targetBegin);
304     if (errCode != E_OK) {
305         return -E_SECUREC_ERROR;
306     }
307     errCode = parcel.WriteUInt64(targetEnd);
308     if (errCode != E_OK) {
309         return -E_SECUREC_ERROR;
310     }
311     parcel.EightByteAlign();
312     parcel.WriteInt64(packet->GetRequestLocalOffset());
313     parcel.WriteInt64(packet->GetResponseLocalOffset());
314     if (parcel.IsError()) {
315         return -E_PARSE_FAIL;
316     }
317     return errCode;
318 }
319 
DeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)320 int TimeSync::DeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
321 {
322     if ((buffer == nullptr) || !(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
323         return -E_INVALID_ARGS;
324     }
325     TimeSyncPacket packet;
326     Parcel parcel(const_cast<uint8_t *>(buffer), length);
327     Timestamp srcBegin;
328     Timestamp srcEnd;
329     Timestamp targetBegin;
330     Timestamp targetEnd;
331 
332     uint32_t version = 0;
333     parcel.ReadUInt32(version);
334     if (parcel.IsError()) {
335         return -E_INVALID_ARGS;
336     }
337     if (version > TIME_SYNC_VERSION_V1) {
338         packet.SetVersion(version);
339         return inMsg->SetCopiedObject<>(packet);
340     }
341     parcel.ReadUInt64(srcBegin);
342     parcel.ReadUInt64(srcEnd);
343     parcel.ReadUInt64(targetBegin);
344     parcel.ReadUInt64(targetEnd);
345     if (parcel.IsError()) {
346         return -E_INVALID_ARGS;
347     }
348     packet.SetSourceTimeBegin(srcBegin);
349     packet.SetSourceTimeEnd(srcEnd);
350     packet.SetTargetTimeBegin(targetBegin);
351     packet.SetTargetTimeEnd(targetEnd);
352     parcel.EightByteAlign();
353     if (parcel.IsContinueRead()) {
354         TimeOffset requestLocalOffset;
355         TimeOffset responseLocalOffset;
356         parcel.ReadInt64(requestLocalOffset);
357         parcel.ReadInt64(responseLocalOffset);
358         if (parcel.IsError()) {
359             LOGE("[TimeSync] Parse packet failed, message type %u", inMsg->GetMessageType());
360             return -E_PARSE_FAIL;
361         }
362         packet.SetRequestLocalOffset(requestLocalOffset);
363         packet.SetResponseLocalOffset(responseLocalOffset);
364     }
365 
366     return inMsg->SetCopiedObject<>(packet);
367 }
368 
AckRecv(const Message * message,uint32_t targetSessionId)369 int TimeSync::AckRecv(const Message *message, uint32_t targetSessionId)
370 {
371     // only check when sessionId is not 0, because old version timesync sessionId is 0.
372     if (message != nullptr && message->GetSessionId() != 0 &&
373         message->GetErrorNo() == E_FEEDBACK_COMMUNICATOR_NOT_FOUND && message->GetSessionId() == targetSessionId) {
374         LOGE("[AbilitySync][AckMsgCheck] Remote db is closed");
375         return -E_FEEDBACK_COMMUNICATOR_NOT_FOUND;
376     }
377     if (!IsPacketValid(message, TYPE_RESPONSE)) {
378         return -E_INVALID_ARGS;
379     }
380     const TimeSyncPacket *packet = message->GetObject<TimeSyncPacket>();
381     if (packet == nullptr) {
382         LOGE("[TimeSync] AckRecv packet is null");
383         return -E_INVALID_ARGS;
384     }
385 
386     TimeSyncPacket packetData = TimeSyncPacket(*packet);
387     Timestamp sourceTimeEnd = timeHelper_->GetTime();
388     packetData.SetSourceTimeBegin(GetSourceBeginTime(packetData.GetSourceTimeBegin(), targetSessionId));
389     packetData.SetSourceTimeEnd(sourceTimeEnd);
390     if (packetData.GetSourceTimeBegin() > packetData.GetSourceTimeEnd() ||
391         packetData.GetTargetTimeBegin() > packetData.GetTargetTimeEnd() ||
392         packetData.GetSourceTimeEnd() > TimeHelper::MAX_VALID_TIME ||
393         packetData.GetTargetTimeEnd() > TimeHelper::MAX_VALID_TIME) {
394         LOGD("[TimeSync][AckRecv] Time valid check failed.");
395         return -E_INVALID_TIME;
396     }
397     int errCode = SaveOffsetWithAck(packetData);
398     {
399         std::lock_guard<std::mutex> lock(cvLock_);
400         isAckReceived_ = true;
401     }
402     conditionVar_.notify_all();
403     ResetTimer();
404     return errCode;
405 }
406 
RequestRecv(const Message * message)407 int TimeSync::RequestRecv(const Message *message)
408 {
409     if (!IsPacketValid(message, TYPE_REQUEST)) {
410         return -E_INVALID_ARGS;
411     }
412 
413     const TimeSyncPacket *packet = message->GetObject<TimeSyncPacket>();
414     if (packet == nullptr) {
415         return -E_INVALID_ARGS;
416     }
417 
418     // build timeSync ack packet
419     TimeSyncPacket ackPacket = BuildAckPacket(*packet);
420     if (ackPacket.GetSourceTimeBegin() > TimeHelper::MAX_VALID_TIME) {
421         LOGD("[TimeSync][RequestRecv] Time valid check failed.");
422         return -E_INVALID_TIME;
423     }
424     ReTimeSyncIfNeed(ackPacket);
425 
426     Message *ackMessage = new (std::nothrow) Message(TIME_SYNC_MESSAGE);
427     if (ackMessage == nullptr) {
428         return -E_OUT_OF_MEMORY;
429     }
430     ackMessage->SetSessionId(message->GetSessionId());
431     ackMessage->SetPriority(Priority::NORMAL);
432     ackMessage->SetMessageType(TYPE_RESPONSE);
433     ackMessage->SetTarget(deviceId_);
434     int errCode = ackMessage->SetCopiedObject<>(ackPacket);
435     if (errCode != E_OK) {
436         delete ackMessage;
437         ackMessage = nullptr;
438         return errCode;
439     }
440 
441     errCode = SendPacket(deviceId_, ackMessage);
442     if (errCode != E_OK) {
443         delete ackMessage;
444         ackMessage = nullptr;
445     }
446     return errCode;
447 }
448 
SaveTimeOffset(const DeviceID & deviceID,TimeOffset timeOffset)449 int TimeSync::SaveTimeOffset(const DeviceID &deviceID, TimeOffset timeOffset)
450 {
451     return metadata_->SaveTimeOffset(deviceID, timeOffset);
452 }
453 
CalculateTimeOffset(const TimeSyncPacket & timeSyncInfo)454 std::pair<TimeOffset, TimeOffset> TimeSync::CalculateTimeOffset(const TimeSyncPacket &timeSyncInfo)
455 {
456     TimeOffset roundTrip = static_cast<TimeOffset>((timeSyncInfo.GetSourceTimeEnd() -
457         timeSyncInfo.GetSourceTimeBegin()) - (timeSyncInfo.GetTargetTimeEnd() - timeSyncInfo.GetTargetTimeBegin()));
458     TimeOffset offset1 = static_cast<TimeOffset>(timeSyncInfo.GetTargetTimeBegin() -
459         timeSyncInfo.GetSourceTimeBegin() - (roundTrip / TRIP_DIV_HALF));
460     TimeOffset offset2 = static_cast<TimeOffset>(timeSyncInfo.GetTargetTimeEnd() + (roundTrip / TRIP_DIV_HALF) -
461         timeSyncInfo.GetSourceTimeEnd());
462     TimeOffset offset = (offset1 / TRIP_DIV_HALF) + (offset2 / TRIP_DIV_HALF);
463     LOGD("TimeSync::CalculateTimeOffset roundTrip= %" PRId64 ", offset1 = %" PRId64 ", offset2 = %" PRId64
464         ", offset = %" PRId64, roundTrip, offset1, offset2, offset);
465     return {offset, roundTrip};
466 }
467 
IsPacketValid(const Message * inMsg,uint16_t messageType)468 bool TimeSync::IsPacketValid(const Message *inMsg, uint16_t messageType)
469 {
470     if (inMsg == nullptr) {
471         return false;
472     }
473     if (inMsg->GetMessageId() != TIME_SYNC_MESSAGE) {
474         LOGD("message Id = %d", inMsg->GetMessageId());
475         return false;
476     }
477     if (messageType != inMsg->GetMessageType()) {
478         LOGD("input Type = %" PRIu16 ", inMsg type = %" PRIu16, messageType, inMsg->GetMessageType());
479         return false;
480     }
481     return true;
482 }
483 
SendPacket(const DeviceID & deviceId,const Message * message,const CommErrHandler & handler)484 int TimeSync::SendPacket(const DeviceID &deviceId, const Message *message, const CommErrHandler &handler)
485 {
486     SendConfig conf;
487     timeHelper_->SetSendConfig(deviceId, false, SEND_TIME_OUT, conf);
488     int errCode = communicateHandle_->SendMessage(deviceId, message, conf, handler);
489     if (errCode != E_OK) {
490         LOGE("[TimeSync] SendPacket failed, err %d", errCode);
491     }
492     return errCode;
493 }
494 
TimeSyncDriver(TimerId timerId)495 int TimeSync::TimeSyncDriver(TimerId timerId)
496 {
497     if (timerId != driverTimerId_) {
498         return -E_INTERNAL_ERROR;
499     }
500     if (!isOnline_) {
501         return E_OK;
502     }
503     std::lock_guard<std::mutex> lock(timeDriverLock_);
504     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this]() {
505         CommErrHandler handler = [this](int ret, bool isDirectEnd) { CommErrHandlerFunc(ret, this); };
506         (void)this->SyncStart(handler);
507         std::lock_guard<std::mutex> innerLock(this->timeDriverLock_);
508         this->timeDriverLockCount_--;
509         this->timeDriverCond_.notify_all();
510     });
511     if (errCode != E_OK) {
512         LOGE("[TimeSync][TimerSyncDriver] ScheduleTask failed err %d", errCode);
513         return errCode;
514     }
515     timeDriverLockCount_++;
516     return E_OK;
517 }
518 
GetTimeOffset(TimeOffset & outOffset,uint32_t timeout,uint32_t sessionId)519 int TimeSync::GetTimeOffset(TimeOffset &outOffset, uint32_t timeout, uint32_t sessionId)
520 {
521     if (!isSynced_) {
522         {
523             std::lock_guard<std::mutex> lock(cvLock_);
524             isAckReceived_ = false;
525         }
526         CommErrHandler handler = [this](int ret, bool isDirectEnd) { CommErrHandlerFunc(ret, this); };
527         int errCode = SyncStart(handler, sessionId);
528         LOGD("TimeSync::GetTimeOffset start, current time = %" PRIu64 ", errCode = %d, timeout = %" PRIu32 " ms",
529             TimeHelper::GetSysCurrentTime(), errCode, timeout);
530         std::unique_lock<std::mutex> lock(cvLock_);
531         if (errCode != E_OK || !conditionVar_.wait_for(lock, std::chrono::milliseconds(timeout), [this]() {
532             return this->isAckReceived_ || this->closed_;
533             })) {
534             LOGD("TimeSync::GetTimeOffset, retryTime_ = %d", retryTime_);
535             retryTime_++;
536             if (retryTime_ < MAX_RETRY_TIME) {
537                 lock.unlock();
538                 LOGI("TimeSync::GetTimeOffset timeout, try again");
539                 return GetTimeOffset(outOffset, timeout);
540             }
541             retryTime_ = 0;
542             return -E_TIMEOUT;
543         }
544     }
545     if (IsClosed()) {
546         return -E_BUSY;
547     }
548     retryTime_ = 0;
549     metadata_->GetTimeOffset(deviceId_, outOffset);
550     return E_OK;
551 }
552 
IsNeedSync() const553 bool TimeSync::IsNeedSync() const
554 {
555     return !isSynced_;
556 }
557 
SetOnline(bool isOnline)558 void TimeSync::SetOnline(bool isOnline)
559 {
560     isOnline_ = isOnline;
561 }
562 
CommErrHandlerFunc(int errCode,TimeSync * timeSync)563 void TimeSync::CommErrHandlerFunc(int errCode, TimeSync *timeSync)
564 {
565     LOGD("[TimeSync][CommErrHandle] errCode:%d", errCode);
566     std::lock_guard<std::mutex> lock(timeSyncSetLock_);
567     if (timeSyncSet_.count(timeSync) == 0) {
568         LOGI("[TimeSync][CommErrHandle] timeSync has been killed");
569         return;
570     }
571     if (timeSync == nullptr) {
572         LOGI("[TimeSync][CommErrHandle] timeSync is nullptr");
573         return;
574     }
575     if (errCode != E_OK) {
576         timeSync->SetOnline(false);
577     } else {
578         timeSync->SetOnline(true);
579     }
580 }
581 
ResetTimer()582 void TimeSync::ResetTimer()
583 {
584     TimerId timerId;
585     {
586         std::lock_guard<std::mutex> lock(timeDriverLock_);
587         timerId = driverTimerId_;
588         driverTimerId_ = 0u;
589     }
590     if (timerId == 0u) {
591         return;
592     }
593     RuntimeContext::GetInstance()->RemoveTimer(timerId, true);
594     int errCode = RuntimeContext::GetInstance()->SetTimer(
595         TIME_SYNC_INTERVAL, driverCallback_, nullptr, timerId);
596     if (errCode != E_OK) {
597         LOGW("[TimeSync] Reset TimeSync timer failed err :%d", errCode);
598     } else {
599         std::lock_guard<std::mutex> lock(timeDriverLock_);
600         driverTimerId_ = timerId;
601     }
602 }
603 
Close()604 void TimeSync::Close()
605 {
606     Finalize();
607     {
608         std::lock_guard<std::mutex> lock(cvLock_);
609         closed_ = true;
610     }
611     conditionVar_.notify_all();
612 }
613 
IsClosed() const614 bool TimeSync::IsClosed() const
615 {
616     std::lock_guard<std::mutex> lock(cvLock_);
617     return closed_ ;
618 }
619 
SendMessageWithSendEnd(const Message * message,const CommErrHandler & handler)620 int TimeSync::SendMessageWithSendEnd(const Message *message, const CommErrHandler &handler)
621 {
622     std::shared_ptr<TimeSync> timeSyncPtr = shared_from_this();
623     auto sessionId = message->GetSessionId();
624     return SendPacket(deviceId_, message, [handler, timeSyncPtr, sessionId, this](int errCode, bool isDirectEnd) {
625         if (closed_) {
626             LOGW("[TimeSync] DB closed, ignore send end! dev=%.3s", deviceId_.c_str());
627             return;
628         }
629         {
630             std::lock_guard<std::mutex> autoLock(beginTimeMutex_);
631             sessionBeginTime_.clear();
632             sessionBeginTime_[sessionId] = timeHelper_->GetTime();
633         }
634         if (handler != nullptr) {
635             handler(errCode, isDirectEnd);
636         }
637     });
638 }
639 
GetSourceBeginTime(Timestamp packetBeginTime,uint32_t sessionId)640 Timestamp TimeSync::GetSourceBeginTime(Timestamp packetBeginTime, uint32_t sessionId)
641 {
642     std::lock_guard<std::mutex> autoLock(beginTimeMutex_);
643     if (sessionBeginTime_.find(sessionId) == sessionBeginTime_.end()) {
644         LOGW("[TimeSync] Current cache not exist packet send time");
645         return packetBeginTime;
646     }
647     auto sendTime = sessionBeginTime_[sessionId];
648     LOGD("[TimeSync] Use packet send time %" PRIu64 " rather than %" PRIu64, sendTime, packetBeginTime);
649     return sendTime;
650 }
651 
BuildAckPacket(const TimeSyncPacket & request)652 TimeSyncPacket TimeSync::BuildAckPacket(const TimeSyncPacket &request)
653 {
654     TimeSyncPacket ackPacket = TimeSyncPacket(request);
655     Timestamp targetTimeBegin = timeHelper_->GetTime();
656     ackPacket.SetTargetTimeBegin(targetTimeBegin);
657     Timestamp targetTimeEnd = timeHelper_->GetTime();
658     ackPacket.SetTargetTimeEnd(targetTimeEnd);
659     TimeOffset requestOffset = request.GetRequestLocalOffset();
660     TimeOffset responseOffset = metadata_->GetLocalTimeOffset();
661     ackPacket.SetRequestLocalOffset(requestOffset);
662     ackPacket.SetResponseLocalOffset(responseOffset);
663     LOGD("TimeSync::RequestRecv, dev = %s{private}, sTimeEnd = %" PRIu64 ", tTimeEnd = %" PRIu64 ", sbegin = %" PRIu64
664         ", tbegin = %" PRIu64 ", request offset = %" PRId64 ", response offset = %" PRId64, deviceId_.c_str(),
665         ackPacket.GetSourceTimeEnd(), ackPacket.GetTargetTimeEnd(), ackPacket.GetSourceTimeBegin(),
666         ackPacket.GetTargetTimeBegin(), requestOffset, responseOffset);
667     return ackPacket;
668 }
669 
IsRemoteLowVersion(uint32_t checkVersion)670 bool TimeSync::IsRemoteLowVersion(uint32_t checkVersion)
671 {
672     uint16_t version = 0;
673     int errCode = communicateHandle_->GetRemoteCommunicatorVersion(deviceId_, version);
674     return errCode == -E_NOT_FOUND || (version < checkVersion - SOFTWARE_VERSION_EARLIEST);
675 }
676 
ReTimeSyncIfNeed(const TimeSyncPacket & ackPacket)677 void TimeSync::ReTimeSyncIfNeed(const TimeSyncPacket &ackPacket)
678 {
679     TimeOffset timeOffsetIgnoreRtt =
680         static_cast<TimeOffset>(ackPacket.GetSourceTimeBegin() - ackPacket.GetTargetTimeBegin());
681     bool reTimeSync = false;
682     if (IsRemoteLowVersion(SOFTWARE_VERSION_RELEASE_9_0)) {
683         reTimeSync = CheckReTimeSyncIfNeedWithLowVersion(timeOffsetIgnoreRtt);
684     } else {
685         reTimeSync = CheckReTimeSyncIfNeedWithHighVersion(timeOffsetIgnoreRtt, ackPacket);
686     }
687 
688     if ((std::abs(timeOffsetIgnoreRtt) >= INT64_MAX / 2) || reTimeSync) { // 2 is half of INT64_MAX
689         LOGI("[TimeSync][RequestRecv] timeOffSet invalid, should do time sync");
690         SetTimeSyncFinishInner(false);
691         RuntimeContext::GetInstance()->ClearDeviceTimeInfo(deviceId_);
692     }
693 
694     // reset time change by time sync
695     int errCode = metadata_->SetTimeChangeMark(deviceId_, false);
696     if (errCode != E_OK) {
697         LOGW("[TimeSync] Mark dev %.3s no time change failed %d", deviceId_.c_str(), errCode);
698     }
699 }
700 
CheckReTimeSyncIfNeedWithLowVersion(TimeOffset timeOffsetIgnoreRtt)701 bool TimeSync::CheckReTimeSyncIfNeedWithLowVersion(TimeOffset timeOffsetIgnoreRtt)
702 {
703     TimeOffset metadataTimeOffset;
704     metadata_->GetTimeOffset(deviceId_, metadataTimeOffset);
705     return (std::abs(metadataTimeOffset) >= INT64_MAX / 2) || // 2 is half of INT64_MAX
706         (std::abs(metadataTimeOffset - timeOffsetIgnoreRtt) > MAX_TIME_OFFSET_NOISE);
707 }
708 
CheckReTimeSyncIfNeedWithHighVersion(TimeOffset timeOffsetIgnoreRtt,const TimeSyncPacket & ackPacket)709 bool TimeSync::CheckReTimeSyncIfNeedWithHighVersion(TimeOffset timeOffsetIgnoreRtt, const TimeSyncPacket &ackPacket)
710 {
711     TimeOffset rawTimeOffset = timeOffsetIgnoreRtt - ackPacket.GetRequestLocalOffset() +
712         ackPacket.GetResponseLocalOffset();
713     auto [errCode, info] = RuntimeContext::GetInstance()->GetDeviceTimeInfo(deviceId_);
714     return errCode == -E_NOT_FOUND || (std::abs(info.systemTimeOffset - rawTimeOffset) > MAX_TIME_OFFSET_NOISE);
715 }
716 
SaveOffsetWithAck(const TimeSyncPacket & ackPacket)717 int TimeSync::SaveOffsetWithAck(const TimeSyncPacket &ackPacket)
718 {
719     // calculate timeoffset of two devices
720     auto [offset, rtt] = CalculateTimeOffset(ackPacket);
721     TimeOffset rawOffset = CalculateRawTimeOffset(ackPacket, offset);
722     LOGD("TimeSync::AckRecv, dev = %s{private}, sEnd = %" PRIu64 ", tEnd = %" PRIu64 ", sBegin = %" PRIu64
723         ", tBegin = %" PRIu64 ", offset = %" PRId64 ", rawOffset = %" PRId64 ", requestLocalOffset = %" PRId64
724         ", responseLocalOffset = %" PRId64,
725         deviceId_.c_str(),
726         ackPacket.GetSourceTimeEnd(),
727         ackPacket.GetTargetTimeEnd(),
728         ackPacket.GetSourceTimeBegin(),
729         ackPacket.GetTargetTimeBegin(),
730         offset,
731         rawOffset,
732         ackPacket.GetRequestLocalOffset(),
733         ackPacket.GetResponseLocalOffset());
734 
735     // save timeoffset into metadata, maybe a block action
736     int errCode = SaveTimeOffset(deviceId_, offset);
737     if (errCode != E_OK) {
738         return errCode;
739     }
740     errCode = metadata_->SetSystemTimeOffset(deviceId_, rawOffset);
741     if (errCode != E_OK) {
742         return errCode;
743     }
744     DeviceTimeInfo info;
745     info.systemTimeOffset = rawOffset;
746     info.recordTime = timeHelper_->GetSysCurrentTime();
747     info.rtt = rtt;
748     RuntimeContext::GetInstance()->SetDeviceTimeInfo(deviceId_, info);
749     SetTimeSyncFinishInner(true);
750     // save finish next time after save failed
751     return E_OK;
752 }
753 
CalculateRawTimeOffset(const TimeSyncPacket & timeSyncInfo,TimeOffset deltaTime)754 TimeOffset TimeSync::CalculateRawTimeOffset(const TimeSyncPacket &timeSyncInfo, TimeOffset deltaTime)
755 {
756     // deltaTime = (t1' + response - t1 - request + t2' + response - t2 - request)/2
757     // rawTimeOffset =  request - response + (t1' - t1 + t2' - t2)/2
758     // rawTimeOffset = deltaTime + requestLocalOffset - responseLocalOffset
759     return deltaTime + timeSyncInfo.GetRequestLocalOffset() - timeSyncInfo.GetResponseLocalOffset();
760 }
761 
CheckSkipTimeSync(const DeviceTimeInfo & info)762 bool TimeSync::CheckSkipTimeSync(const DeviceTimeInfo &info)
763 {
764     uint64_t currentRawTime = timeHelper_->GetSysCurrentTime();
765     if (currentRawTime < info.recordTime) {
766         LOGW("[TimeSync] current time %" PRIu64 " less than record time %" PRIu64, currentRawTime, info.recordTime);
767         return false;
768     }
769     uint64_t interval = timeHelper_->GetSysCurrentTime() - info.recordTime;
770     if (info.rtt < MAX_TIME_RTT_NOISE) {
771         return true;
772     }
773     if (interval > RTT_NOISE_CHECK_INTERVAL) {
774         LOGI("[TimeSync] rtt %" PRId64 " is greater than noise should re time sync, interval is %" PRIu64, info.rtt,
775             interval);
776         return false;
777     }
778 #ifdef DE_DEBUG_ENV
779 #ifdef TEST_RTT_NOISE_CHECK_INTERVAL
780     if (interval > TEST_RTT_NOISE_CHECK_INTERVAL) {
781         LOGI("[TimeSync][TEST] rtt %" PRId64 " is greater than noise should re time sync, interval is %" PRIu64,
782             info.rtt, interval);
783         return false;
784     }
785 #endif
786 #endif
787     return true;
788 }
789 
SetTimeSyncFinishIfNeed()790 void TimeSync::SetTimeSyncFinishIfNeed()
791 {
792     auto [errCode, info] = RuntimeContext::GetInstance()->GetDeviceTimeInfo(deviceId_);
793     if (errCode != E_OK) {
794         return;
795     }
796     int64_t systemTimeOffset = metadata_->GetSystemTimeOffset(deviceId_);
797     LOGD("[TimeSync] Check db offset %" PRId64 " cache offset %" PRId64, systemTimeOffset, info.systemTimeOffset);
798     if (!CheckSkipTimeSync(info) || (IsNeedSync() &&
799         std::abs(systemTimeOffset - info.systemTimeOffset) >= MAX_TIME_OFFSET_NOISE)) {
800         SetTimeSyncFinishInner(false);
801         return;
802     }
803     if (IsNeedSync()) {
804         SetTimeSyncFinishInner(true);
805     }
806     if (systemTimeOffset != info.systemTimeOffset) {
807         errCode = metadata_->SetSystemTimeOffset(deviceId_, info.systemTimeOffset);
808         if (errCode != E_OK) {
809             return;
810         }
811     }
812     LOGI("[TimeSync] Mark time sync finish success");
813 }
814 
ClearTimeSyncFinish()815 void TimeSync::ClearTimeSyncFinish()
816 {
817     RuntimeContext::GetInstance()->ClearDeviceTimeInfo(deviceId_);
818     SetTimeSyncFinishInner(false);
819 }
820 
GenerateTimeOffsetIfNeed(TimeOffset systemOffset,TimeOffset senderLocalOffset)821 int TimeSync::GenerateTimeOffsetIfNeed(TimeOffset systemOffset, TimeOffset senderLocalOffset)
822 {
823     if (IsRemoteLowVersion(SOFTWARE_VERSION_RELEASE_9_0)) {
824         return E_OK;
825     }
826     auto [errCode, info] = RuntimeContext::GetInstance()->GetDeviceTimeInfo(deviceId_);
827     bool syncFinish = !IsNeedSync();
828     bool timeChange = metadata_->IsTimeChange(deviceId_);
829     // avoid local time change but remote record time sync finish
830     // should return re time sync, after receive time sync request, reset time change mark
831     // we think offset is ok when local time sync to remote
832     if ((timeChange && !syncFinish) ||
833         (errCode == E_OK && (std::abs(info.systemTimeOffset + systemOffset) > MAX_TIME_OFFSET_NOISE))) {
834         LOGI("[TimeSync] time offset is invalid should do time sync again! packet %" PRId64 " cache %" PRId64
835             " time change %d sync finish %d", -systemOffset, info.systemTimeOffset, static_cast<int>(timeChange),
836             static_cast<int>(syncFinish));
837         ClearTimeSyncFinish();
838         RuntimeContext::GetInstance()->ClearDeviceTimeInfo(deviceId_);
839         return -E_NEED_TIME_SYNC;
840     }
841     // Sender's systemOffset = Sender's deltaTime + requestLocalOffset - responseLocalOffset
842     // Sender's deltaTime = Sender's systemOffset - requestLocalOffset + responseLocalOffset
843     // Receiver's deltaTime = -Sender's deltaTime = -Sender's systemOffset + requestLocalOffset - responseLocalOffset
844     TimeOffset offset = -systemOffset + senderLocalOffset - metadata_->GetLocalTimeOffset();
845     errCode = metadata_->SetSystemTimeOffset(deviceId_, -systemOffset);
846     if (errCode != E_OK) {
847         return errCode;
848     }
849     errCode = metadata_->SaveTimeOffset(deviceId_, offset);
850     if (errCode != E_OK) {
851         return errCode;
852     }
853     SetTimeSyncFinishInner(true);
854     info.systemTimeOffset = -systemOffset;
855     info.recordTime = timeHelper_->GetSysCurrentTime();
856     RuntimeContext::GetInstance()->SetDeviceTimeInfo(deviceId_, info);
857     return E_OK;
858 }
859 
SetTimeSyncFinishInner(bool finish)860 void TimeSync::SetTimeSyncFinishInner(bool finish)
861 {
862     isSynced_ = finish;
863     if (IsRemoteLowVersion(SOFTWARE_VERSION_RELEASE_9_0)) {
864         return;
865     }
866     int errCode = metadata_->SetTimeSyncFinishMark(deviceId_, finish);
867     if (errCode != E_OK) {
868         LOGW("[TimeSync] Set %.3s time sync finish %d mark failed %d", deviceId_.c_str(), static_cast<int>(finish),
869             errCode);
870     }
871 }
872 } // namespace DistributedDB