1  /*
2   * Copyright (c) 2021 Huawei Device Co., Ltd.
3   * Licensed under the Apache License, Version 2.0 (the "License");
4   * you may not use this file except in compliance with the License.
5   * You may obtain a copy of the License at
6   *
7   *     http://www.apache.org/licenses/LICENSE-2.0
8   *
9   * Unless required by applicable law or agreed to in writing, software
10   * distributed under the License is distributed on an "AS IS" BASIS,
11   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12   * See the License for the specific language governing permissions and
13   * limitations under the License.
14   */
15  
16  #include "time_sync.h"
17  
18  #include "parcel.h"
19  #include "log_print.h"
20  #include "sync_types.h"
21  #include "message_transform.h"
22  #include "version.h"
23  #include "isync_task_context.h"
24  
25  namespace DistributedDB {
26  std::mutex TimeSync::timeSyncSetLock_;
27  std::set<TimeSync *> TimeSync::timeSyncSet_;
namespace {
    // Period handed to RuntimeContext::SetTimer for the periodic time sync driver.
    constexpr uint64_t TIME_SYNC_INTERVAL = 24 * 60 * 60 * 1000; // 24h
    // Divisor used to halve the round trip and to average the two offset estimates.
    constexpr int TRIP_DIV_HALF = 2;
    constexpr int64_t MAX_TIME_OFFSET_NOISE = 1 * 1000 * 10000; // 1 second in 100-nanosecond units
    constexpr int64_t MAX_TIME_RTT_NOISE = 1 * 1000 * 10000; // 1 second in 100-nanosecond units
    // The product is 18'000'000'000, which does not fit in 32 bits. Start the expression with a
    // 64-bit operand so it is not evaluated (and wrapped) in unsigned int arithmetic.
    constexpr uint64_t RTT_NOISE_CHECK_INTERVAL = 30ULL * 60 * 1000 * 10000; // 30 minute in 100-nanosecond units
}
35  
36  // Class TimeSyncPacket
TimeSyncPacket()37  TimeSyncPacket::TimeSyncPacket()
38      : sourceTimeBegin_(0),
39        sourceTimeEnd_(0),
40        targetTimeBegin_(0),
41        targetTimeEnd_(0),
42        version_(TIME_SYNC_VERSION_V1),
43        requestLocalOffset_(0),
44        responseLocalOffset_(0)
45  {
46  }
47  
~TimeSyncPacket()48  TimeSyncPacket::~TimeSyncPacket()
49  {
50  }
51  
SetSourceTimeBegin(Timestamp sourceTimeBegin)52  void TimeSyncPacket::SetSourceTimeBegin(Timestamp sourceTimeBegin)
53  {
54      sourceTimeBegin_ = sourceTimeBegin;
55  }
56  
GetSourceTimeBegin() const57  Timestamp TimeSyncPacket::GetSourceTimeBegin() const
58  {
59      return sourceTimeBegin_;
60  }
61  
SetSourceTimeEnd(Timestamp sourceTimeEnd)62  void TimeSyncPacket::SetSourceTimeEnd(Timestamp sourceTimeEnd)
63  {
64      sourceTimeEnd_ = sourceTimeEnd;
65  }
66  
GetSourceTimeEnd() const67  Timestamp TimeSyncPacket::GetSourceTimeEnd() const
68  {
69      return sourceTimeEnd_;
70  }
71  
SetTargetTimeBegin(Timestamp targetTimeBegin)72  void TimeSyncPacket::SetTargetTimeBegin(Timestamp targetTimeBegin)
73  {
74      targetTimeBegin_ = targetTimeBegin;
75  }
76  
GetTargetTimeBegin() const77  Timestamp TimeSyncPacket::GetTargetTimeBegin() const
78  {
79      return targetTimeBegin_;
80  }
81  
SetTargetTimeEnd(Timestamp targetTimeEnd)82  void TimeSyncPacket::SetTargetTimeEnd(Timestamp targetTimeEnd)
83  {
84      targetTimeEnd_ = targetTimeEnd;
85  }
86  
GetTargetTimeEnd() const87  Timestamp TimeSyncPacket::GetTargetTimeEnd() const
88  {
89      return targetTimeEnd_;
90  }
91  
SetVersion(uint32_t version)92  void TimeSyncPacket::SetVersion(uint32_t version)
93  {
94      version_ = version;
95  }
96  
GetVersion() const97  uint32_t TimeSyncPacket::GetVersion() const
98  {
99      return version_;
100  }
101  
GetRequestLocalOffset() const102  TimeOffset TimeSyncPacket::GetRequestLocalOffset() const
103  {
104      return requestLocalOffset_;
105  }
106  
SetRequestLocalOffset(TimeOffset offset)107  void TimeSyncPacket::SetRequestLocalOffset(TimeOffset offset)
108  {
109      requestLocalOffset_ = offset;
110  }
111  
GetResponseLocalOffset() const112  TimeOffset TimeSyncPacket::GetResponseLocalOffset() const
113  {
114      return responseLocalOffset_;
115  }
116  
SetResponseLocalOffset(TimeOffset offset)117  void TimeSyncPacket::SetResponseLocalOffset(TimeOffset offset)
118  {
119      responseLocalOffset_ = offset;
120  }
121  
CalculateLen()122  uint32_t TimeSyncPacket::CalculateLen()
123  {
124      uint32_t len = Parcel::GetUInt32Len(); // version_
125      len += Parcel::GetUInt64Len(); // sourceTimeBegin_
126      len += Parcel::GetUInt64Len(); // sourceTimeEnd_
127      len += Parcel::GetUInt64Len(); // targetTimeBegin_
128      len += Parcel::GetUInt64Len(); // targetTimeEnd_
129      len = Parcel::GetEightByteAlign(len);
130      len += Parcel::GetInt64Len(); // requestLocalOffset_
131      len += Parcel::GetInt64Len(); // responseLocalOffset_
132      return len;
133  }
134  
135  // Class TimeSync
TimeSync()136  TimeSync::TimeSync()
137      : communicateHandle_(nullptr),
138        metadata_(nullptr),
139        timeHelper_(nullptr),
140        retryTime_(0),
141        driverTimerId_(0),
142        isSynced_(false),
143        isAckReceived_(false),
144        timeChangedListener_(nullptr),
145        timeDriverLockCount_(0),
146        isOnline_(true),
147        closed_(false)
148  {
149  }
150  
~TimeSync()151  TimeSync::~TimeSync()
152  {
153      Finalize();
154      driverTimerId_ = 0;
155  
156      if (timeChangedListener_ != nullptr) {
157          timeChangedListener_->Drop(true);
158          timeChangedListener_ = nullptr;
159      }
160      timeHelper_ = nullptr;
161      communicateHandle_ = nullptr;
162      metadata_ = nullptr;
163  
164      std::lock_guard<std::mutex> lock(timeSyncSetLock_);
165      timeSyncSet_.erase(this);
166  }
167  
RegisterTransformFunc()168  int TimeSync::RegisterTransformFunc()
169  {
170      TransformFunc func;
171      func.computeFunc = [](const Message *inMsg) { return CalculateLen(inMsg); };
172      func.serializeFunc = [](uint8_t *buffer, uint32_t length, const Message *inMsg) {
173          return Serialization(buffer, length, inMsg);
174      };
175      func.deserializeFunc = [](const uint8_t *buffer, uint32_t length, Message *inMsg) {
176          return DeSerialization(buffer, length, inMsg);
177      };
178      return MessageTransform::RegTransformFunction(TIME_SYNC_MESSAGE, func);
179  }
180  
Initialize(ICommunicator * communicator,const std::shared_ptr<Metadata> & metadata,const ISyncInterface * storage,const DeviceID & deviceId)181  int TimeSync::Initialize(ICommunicator *communicator, const std::shared_ptr<Metadata> &metadata,
182      const ISyncInterface *storage, const DeviceID &deviceId)
183  {
184      if ((communicator == nullptr) || (storage == nullptr) || (metadata == nullptr)) {
185          return -E_INVALID_ARGS;
186      }
187      {
188          std::lock_guard<std::mutex> lock(timeSyncSetLock_);
189          timeSyncSet_.insert(this);
190      }
191      communicateHandle_ = communicator;
192      metadata_ = metadata;
193      deviceId_ = deviceId;
194      timeHelper_ = std::make_unique<TimeHelper>();
195  
196      int errCode = timeHelper_->Initialize(storage, metadata_);
197      if (errCode != E_OK) {
198          timeHelper_ = nullptr;
199          LOGE("[TimeSync] timeHelper Init failed, err %d.", errCode);
200          return errCode;
201      }
202      dbId_ = storage->GetIdentifier();
203      driverCallback_ = [this](TimerId timerId) { return TimeSyncDriver(timerId); };
204      errCode = RuntimeContext::GetInstance()->SetTimer(TIME_SYNC_INTERVAL, driverCallback_, nullptr, driverTimerId_);
205      if (errCode != E_OK) {
206          return errCode;
207      }
208      isSynced_ = metadata_->IsTimeSyncFinish(deviceId_);
209      return errCode;
210  }
211  
Finalize()212  void TimeSync::Finalize()
213  {
214      // Stop the timer
215      LOGD("[TimeSync] Finalize enter!");
216      RuntimeContext *runtimeContext = RuntimeContext::GetInstance();
217      TimerId timerId;
218      {
219          std::unique_lock<std::mutex> lock(timeDriverLock_);
220          timerId = driverTimerId_;
221      }
222      runtimeContext->RemoveTimer(timerId, true);
223      std::unique_lock<std::mutex> lock(timeDriverLock_);
224      timeDriverCond_.wait(lock, [this]() { return this->timeDriverLockCount_ == 0; });
225      LOGD("[TimeSync] Finalized!");
226  }
227  
SyncStart(const CommErrHandler & handler,uint32_t sessionId)228  int TimeSync::SyncStart(const CommErrHandler &handler,  uint32_t sessionId)
229  {
230      isOnline_ = true;
231      TimeSyncPacket packet;
232      Timestamp startTime = timeHelper_->GetTime();
233      packet.SetSourceTimeBegin(startTime);
234      TimeOffset timeOffset = metadata_->GetLocalTimeOffset();
235      packet.SetRequestLocalOffset(timeOffset);
236      // send timeSync request
237      LOGD("[TimeSync] startTime = %" PRIu64 ", offset = % " PRId64 " , dev = %s{private}", startTime, timeOffset,
238          deviceId_.c_str());
239  
240      Message *message = new (std::nothrow) Message(TIME_SYNC_MESSAGE);
241      if (message == nullptr) {
242          return -E_OUT_OF_MEMORY;
243      }
244      message->SetSessionId(sessionId);
245      message->SetMessageType(TYPE_REQUEST);
246      message->SetPriority(Priority::NORMAL);
247      int errCode = message->SetCopiedObject<>(packet);
248      if (errCode != E_OK) {
249          delete message;
250          message = nullptr;
251          return errCode;
252      }
253      errCode = SendMessageWithSendEnd(message, handler);
254      if (errCode != E_OK) {
255          delete message;
256          message = nullptr;
257      }
258      return errCode;
259  }
260  
CalculateLen(const Message * inMsg)261  uint32_t TimeSync::CalculateLen(const Message *inMsg)
262  {
263      if (!(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
264          return 0;
265      }
266  
267      const TimeSyncPacket *packet = const_cast<TimeSyncPacket *>(inMsg->GetObject<TimeSyncPacket>());
268      if (packet == nullptr) {
269          return 0;
270      }
271  
272      return TimeSyncPacket::CalculateLen();
273  }
274  
Serialization(uint8_t * buffer,uint32_t length,const Message * inMsg)275  int TimeSync::Serialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
276  {
277      if ((buffer == nullptr) || !(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
278          return -E_INVALID_ARGS;
279      }
280      const TimeSyncPacket *packet = inMsg->GetObject<TimeSyncPacket>();
281      if ((packet == nullptr) || (length != TimeSyncPacket::CalculateLen())) {
282          return -E_INVALID_ARGS;
283      }
284  
285      Parcel parcel(buffer, length);
286      Timestamp srcBegin = packet->GetSourceTimeBegin();
287      Timestamp srcEnd = packet->GetSourceTimeEnd();
288      Timestamp targetBegin = packet->GetTargetTimeBegin();
289      Timestamp targetEnd = packet->GetTargetTimeEnd();
290  
291      int errCode = parcel.WriteUInt32(TIME_SYNC_VERSION_V1);
292      if (errCode != E_OK) {
293          return -E_SECUREC_ERROR;
294      }
295      errCode = parcel.WriteUInt64(srcBegin);
296      if (errCode != E_OK) {
297          return -E_SECUREC_ERROR;
298      }
299      errCode = parcel.WriteUInt64(srcEnd);
300      if (errCode != E_OK) {
301          return -E_SECUREC_ERROR;
302      }
303      errCode = parcel.WriteUInt64(targetBegin);
304      if (errCode != E_OK) {
305          return -E_SECUREC_ERROR;
306      }
307      errCode = parcel.WriteUInt64(targetEnd);
308      if (errCode != E_OK) {
309          return -E_SECUREC_ERROR;
310      }
311      parcel.EightByteAlign();
312      parcel.WriteInt64(packet->GetRequestLocalOffset());
313      parcel.WriteInt64(packet->GetResponseLocalOffset());
314      if (parcel.IsError()) {
315          return -E_PARSE_FAIL;
316      }
317      return errCode;
318  }
319  
DeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)320  int TimeSync::DeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
321  {
322      if ((buffer == nullptr) || !(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
323          return -E_INVALID_ARGS;
324      }
325      TimeSyncPacket packet;
326      Parcel parcel(const_cast<uint8_t *>(buffer), length);
327      Timestamp srcBegin;
328      Timestamp srcEnd;
329      Timestamp targetBegin;
330      Timestamp targetEnd;
331  
332      uint32_t version = 0;
333      parcel.ReadUInt32(version);
334      if (parcel.IsError()) {
335          return -E_INVALID_ARGS;
336      }
337      if (version > TIME_SYNC_VERSION_V1) {
338          packet.SetVersion(version);
339          return inMsg->SetCopiedObject<>(packet);
340      }
341      parcel.ReadUInt64(srcBegin);
342      parcel.ReadUInt64(srcEnd);
343      parcel.ReadUInt64(targetBegin);
344      parcel.ReadUInt64(targetEnd);
345      if (parcel.IsError()) {
346          return -E_INVALID_ARGS;
347      }
348      packet.SetSourceTimeBegin(srcBegin);
349      packet.SetSourceTimeEnd(srcEnd);
350      packet.SetTargetTimeBegin(targetBegin);
351      packet.SetTargetTimeEnd(targetEnd);
352      parcel.EightByteAlign();
353      if (parcel.IsContinueRead()) {
354          TimeOffset requestLocalOffset;
355          TimeOffset responseLocalOffset;
356          parcel.ReadInt64(requestLocalOffset);
357          parcel.ReadInt64(responseLocalOffset);
358          if (parcel.IsError()) {
359              LOGE("[TimeSync] Parse packet failed, message type %u", inMsg->GetMessageType());
360              return -E_PARSE_FAIL;
361          }
362          packet.SetRequestLocalOffset(requestLocalOffset);
363          packet.SetResponseLocalOffset(responseLocalOffset);
364      }
365  
366      return inMsg->SetCopiedObject<>(packet);
367  }
368  
AckRecv(const Message * message,uint32_t targetSessionId)369  int TimeSync::AckRecv(const Message *message, uint32_t targetSessionId)
370  {
371      // only check when sessionId is not 0, because old version timesync sessionId is 0.
372      if (message != nullptr && message->GetSessionId() != 0 &&
373          message->GetErrorNo() == E_FEEDBACK_COMMUNICATOR_NOT_FOUND && message->GetSessionId() == targetSessionId) {
374          LOGE("[AbilitySync][AckMsgCheck] Remote db is closed");
375          return -E_FEEDBACK_COMMUNICATOR_NOT_FOUND;
376      }
377      if (!IsPacketValid(message, TYPE_RESPONSE)) {
378          return -E_INVALID_ARGS;
379      }
380      const TimeSyncPacket *packet = message->GetObject<TimeSyncPacket>();
381      if (packet == nullptr) {
382          LOGE("[TimeSync] AckRecv packet is null");
383          return -E_INVALID_ARGS;
384      }
385  
386      TimeSyncPacket packetData = TimeSyncPacket(*packet);
387      Timestamp sourceTimeEnd = timeHelper_->GetTime();
388      packetData.SetSourceTimeBegin(GetSourceBeginTime(packetData.GetSourceTimeBegin(), targetSessionId));
389      packetData.SetSourceTimeEnd(sourceTimeEnd);
390      if (packetData.GetSourceTimeBegin() > packetData.GetSourceTimeEnd() ||
391          packetData.GetTargetTimeBegin() > packetData.GetTargetTimeEnd() ||
392          packetData.GetSourceTimeEnd() > TimeHelper::MAX_VALID_TIME ||
393          packetData.GetTargetTimeEnd() > TimeHelper::MAX_VALID_TIME) {
394          LOGD("[TimeSync][AckRecv] Time valid check failed.");
395          return -E_INVALID_TIME;
396      }
397      int errCode = SaveOffsetWithAck(packetData);
398      {
399          std::lock_guard<std::mutex> lock(cvLock_);
400          isAckReceived_ = true;
401      }
402      conditionVar_.notify_all();
403      ResetTimer();
404      return errCode;
405  }
406  
RequestRecv(const Message * message)407  int TimeSync::RequestRecv(const Message *message)
408  {
409      if (!IsPacketValid(message, TYPE_REQUEST)) {
410          return -E_INVALID_ARGS;
411      }
412  
413      const TimeSyncPacket *packet = message->GetObject<TimeSyncPacket>();
414      if (packet == nullptr) {
415          return -E_INVALID_ARGS;
416      }
417  
418      // build timeSync ack packet
419      TimeSyncPacket ackPacket = BuildAckPacket(*packet);
420      if (ackPacket.GetSourceTimeBegin() > TimeHelper::MAX_VALID_TIME) {
421          LOGD("[TimeSync][RequestRecv] Time valid check failed.");
422          return -E_INVALID_TIME;
423      }
424      ReTimeSyncIfNeed(ackPacket);
425  
426      Message *ackMessage = new (std::nothrow) Message(TIME_SYNC_MESSAGE);
427      if (ackMessage == nullptr) {
428          return -E_OUT_OF_MEMORY;
429      }
430      ackMessage->SetSessionId(message->GetSessionId());
431      ackMessage->SetPriority(Priority::NORMAL);
432      ackMessage->SetMessageType(TYPE_RESPONSE);
433      ackMessage->SetTarget(deviceId_);
434      int errCode = ackMessage->SetCopiedObject<>(ackPacket);
435      if (errCode != E_OK) {
436          delete ackMessage;
437          ackMessage = nullptr;
438          return errCode;
439      }
440  
441      errCode = SendPacket(deviceId_, ackMessage);
442      if (errCode != E_OK) {
443          delete ackMessage;
444          ackMessage = nullptr;
445      }
446      return errCode;
447  }
448  
SaveTimeOffset(const DeviceID & deviceID,TimeOffset timeOffset)449  int TimeSync::SaveTimeOffset(const DeviceID &deviceID, TimeOffset timeOffset)
450  {
451      return metadata_->SaveTimeOffset(deviceID, timeOffset);
452  }
453  
CalculateTimeOffset(const TimeSyncPacket & timeSyncInfo)454  std::pair<TimeOffset, TimeOffset> TimeSync::CalculateTimeOffset(const TimeSyncPacket &timeSyncInfo)
455  {
456      TimeOffset roundTrip = static_cast<TimeOffset>((timeSyncInfo.GetSourceTimeEnd() -
457          timeSyncInfo.GetSourceTimeBegin()) - (timeSyncInfo.GetTargetTimeEnd() - timeSyncInfo.GetTargetTimeBegin()));
458      TimeOffset offset1 = static_cast<TimeOffset>(timeSyncInfo.GetTargetTimeBegin() -
459          timeSyncInfo.GetSourceTimeBegin() - (roundTrip / TRIP_DIV_HALF));
460      TimeOffset offset2 = static_cast<TimeOffset>(timeSyncInfo.GetTargetTimeEnd() + (roundTrip / TRIP_DIV_HALF) -
461          timeSyncInfo.GetSourceTimeEnd());
462      TimeOffset offset = (offset1 / TRIP_DIV_HALF) + (offset2 / TRIP_DIV_HALF);
463      LOGD("TimeSync::CalculateTimeOffset roundTrip= %" PRId64 ", offset1 = %" PRId64 ", offset2 = %" PRId64
464          ", offset = %" PRId64, roundTrip, offset1, offset2, offset);
465      return {offset, roundTrip};
466  }
467  
IsPacketValid(const Message * inMsg,uint16_t messageType)468  bool TimeSync::IsPacketValid(const Message *inMsg, uint16_t messageType)
469  {
470      if (inMsg == nullptr) {
471          return false;
472      }
473      if (inMsg->GetMessageId() != TIME_SYNC_MESSAGE) {
474          LOGD("message Id = %d", inMsg->GetMessageId());
475          return false;
476      }
477      if (messageType != inMsg->GetMessageType()) {
478          LOGD("input Type = %" PRIu16 ", inMsg type = %" PRIu16, messageType, inMsg->GetMessageType());
479          return false;
480      }
481      return true;
482  }
483  
SendPacket(const DeviceID & deviceId,const Message * message,const CommErrHandler & handler)484  int TimeSync::SendPacket(const DeviceID &deviceId, const Message *message, const CommErrHandler &handler)
485  {
486      SendConfig conf;
487      timeHelper_->SetSendConfig(deviceId, false, SEND_TIME_OUT, conf);
488      int errCode = communicateHandle_->SendMessage(deviceId, message, conf, handler);
489      if (errCode != E_OK) {
490          LOGE("[TimeSync] SendPacket failed, err %d", errCode);
491      }
492      return errCode;
493  }
494  
TimeSyncDriver(TimerId timerId)495  int TimeSync::TimeSyncDriver(TimerId timerId)
496  {
497      if (timerId != driverTimerId_) {
498          return -E_INTERNAL_ERROR;
499      }
500      if (!isOnline_) {
501          return E_OK;
502      }
503      std::lock_guard<std::mutex> lock(timeDriverLock_);
504      int errCode = RuntimeContext::GetInstance()->ScheduleTask([this]() {
505          CommErrHandler handler = [this](int ret, bool isDirectEnd) { CommErrHandlerFunc(ret, this); };
506          (void)this->SyncStart(handler);
507          std::lock_guard<std::mutex> innerLock(this->timeDriverLock_);
508          this->timeDriverLockCount_--;
509          this->timeDriverCond_.notify_all();
510      });
511      if (errCode != E_OK) {
512          LOGE("[TimeSync][TimerSyncDriver] ScheduleTask failed err %d", errCode);
513          return errCode;
514      }
515      timeDriverLockCount_++;
516      return E_OK;
517  }
518  
GetTimeOffset(TimeOffset & outOffset,uint32_t timeout,uint32_t sessionId)519  int TimeSync::GetTimeOffset(TimeOffset &outOffset, uint32_t timeout, uint32_t sessionId)
520  {
521      if (!isSynced_) {
522          {
523              std::lock_guard<std::mutex> lock(cvLock_);
524              isAckReceived_ = false;
525          }
526          CommErrHandler handler = [this](int ret, bool isDirectEnd) { CommErrHandlerFunc(ret, this); };
527          int errCode = SyncStart(handler, sessionId);
528          LOGD("TimeSync::GetTimeOffset start, current time = %" PRIu64 ", errCode = %d, timeout = %" PRIu32 " ms",
529              TimeHelper::GetSysCurrentTime(), errCode, timeout);
530          std::unique_lock<std::mutex> lock(cvLock_);
531          if (errCode != E_OK || !conditionVar_.wait_for(lock, std::chrono::milliseconds(timeout), [this]() {
532              return this->isAckReceived_ || this->closed_;
533              })) {
534              LOGD("TimeSync::GetTimeOffset, retryTime_ = %d", retryTime_);
535              retryTime_++;
536              if (retryTime_ < MAX_RETRY_TIME) {
537                  lock.unlock();
538                  LOGI("TimeSync::GetTimeOffset timeout, try again");
539                  return GetTimeOffset(outOffset, timeout);
540              }
541              retryTime_ = 0;
542              return -E_TIMEOUT;
543          }
544      }
545      if (IsClosed()) {
546          return -E_BUSY;
547      }
548      retryTime_ = 0;
549      metadata_->GetTimeOffset(deviceId_, outOffset);
550      return E_OK;
551  }
552  
IsNeedSync() const553  bool TimeSync::IsNeedSync() const
554  {
555      return !isSynced_;
556  }
557  
SetOnline(bool isOnline)558  void TimeSync::SetOnline(bool isOnline)
559  {
560      isOnline_ = isOnline;
561  }
562  
CommErrHandlerFunc(int errCode,TimeSync * timeSync)563  void TimeSync::CommErrHandlerFunc(int errCode, TimeSync *timeSync)
564  {
565      LOGD("[TimeSync][CommErrHandle] errCode:%d", errCode);
566      std::lock_guard<std::mutex> lock(timeSyncSetLock_);
567      if (timeSyncSet_.count(timeSync) == 0) {
568          LOGI("[TimeSync][CommErrHandle] timeSync has been killed");
569          return;
570      }
571      if (timeSync == nullptr) {
572          LOGI("[TimeSync][CommErrHandle] timeSync is nullptr");
573          return;
574      }
575      if (errCode != E_OK) {
576          timeSync->SetOnline(false);
577      } else {
578          timeSync->SetOnline(true);
579      }
580  }
581  
ResetTimer()582  void TimeSync::ResetTimer()
583  {
584      TimerId timerId;
585      {
586          std::lock_guard<std::mutex> lock(timeDriverLock_);
587          timerId = driverTimerId_;
588          driverTimerId_ = 0u;
589      }
590      if (timerId == 0u) {
591          return;
592      }
593      RuntimeContext::GetInstance()->RemoveTimer(timerId, true);
594      int errCode = RuntimeContext::GetInstance()->SetTimer(
595          TIME_SYNC_INTERVAL, driverCallback_, nullptr, timerId);
596      if (errCode != E_OK) {
597          LOGW("[TimeSync] Reset TimeSync timer failed err :%d", errCode);
598      } else {
599          std::lock_guard<std::mutex> lock(timeDriverLock_);
600          driverTimerId_ = timerId;
601      }
602  }
603  
Close()604  void TimeSync::Close()
605  {
606      Finalize();
607      {
608          std::lock_guard<std::mutex> lock(cvLock_);
609          closed_ = true;
610      }
611      conditionVar_.notify_all();
612  }
613  
IsClosed() const614  bool TimeSync::IsClosed() const
615  {
616      std::lock_guard<std::mutex> lock(cvLock_);
617      return closed_ ;
618  }
619  
SendMessageWithSendEnd(const Message * message,const CommErrHandler & handler)620  int TimeSync::SendMessageWithSendEnd(const Message *message, const CommErrHandler &handler)
621  {
622      std::shared_ptr<TimeSync> timeSyncPtr = shared_from_this();
623      auto sessionId = message->GetSessionId();
624      return SendPacket(deviceId_, message, [handler, timeSyncPtr, sessionId, this](int errCode, bool isDirectEnd) {
625          if (closed_) {
626              LOGW("[TimeSync] DB closed, ignore send end! dev=%.3s", deviceId_.c_str());
627              return;
628          }
629          {
630              std::lock_guard<std::mutex> autoLock(beginTimeMutex_);
631              sessionBeginTime_.clear();
632              sessionBeginTime_[sessionId] = timeHelper_->GetTime();
633          }
634          if (handler != nullptr) {
635              handler(errCode, isDirectEnd);
636          }
637      });
638  }
639  
GetSourceBeginTime(Timestamp packetBeginTime,uint32_t sessionId)640  Timestamp TimeSync::GetSourceBeginTime(Timestamp packetBeginTime, uint32_t sessionId)
641  {
642      std::lock_guard<std::mutex> autoLock(beginTimeMutex_);
643      if (sessionBeginTime_.find(sessionId) == sessionBeginTime_.end()) {
644          LOGW("[TimeSync] Current cache not exist packet send time");
645          return packetBeginTime;
646      }
647      auto sendTime = sessionBeginTime_[sessionId];
648      LOGD("[TimeSync] Use packet send time %" PRIu64 " rather than %" PRIu64, sendTime, packetBeginTime);
649      return sendTime;
650  }
651  
// Builds the response for a request: copies the request, stamps target begin/end with the
// local time, keeps the requester's local offset and adds this side's local offset from
// metadata.
TimeSyncPacket TimeSync::BuildAckPacket(const TimeSyncPacket &request)
{
    TimeSyncPacket ackPacket(request);
    const Timestamp targetBegin = timeHelper_->GetTime();
    ackPacket.SetTargetTimeBegin(targetBegin);
    const Timestamp targetEnd = timeHelper_->GetTime();
    ackPacket.SetTargetTimeEnd(targetEnd);

    const TimeOffset requestOffset = request.GetRequestLocalOffset();
    const TimeOffset responseOffset = metadata_->GetLocalTimeOffset();
    ackPacket.SetRequestLocalOffset(requestOffset);
    ackPacket.SetResponseLocalOffset(responseOffset);
    LOGD("TimeSync::RequestRecv, dev = %s{private}, sTimeEnd = %" PRIu64 ", tTimeEnd = %" PRIu64 ", sbegin = %" PRIu64
        ", tbegin = %" PRIu64 ", request offset = %" PRId64 ", response offset = %" PRId64, deviceId_.c_str(),
        ackPacket.GetSourceTimeEnd(), ackPacket.GetTargetTimeEnd(), ackPacket.GetSourceTimeBegin(),
        ackPacket.GetTargetTimeBegin(), requestOffset, responseOffset);
    return ackPacket;
}
669  
IsRemoteLowVersion(uint32_t checkVersion)670  bool TimeSync::IsRemoteLowVersion(uint32_t checkVersion)
671  {
672      uint16_t version = 0;
673      int errCode = communicateHandle_->GetRemoteCommunicatorVersion(deviceId_, version);
674      return errCode == -E_NOT_FOUND || (version < checkVersion - SOFTWARE_VERSION_EARLIEST);
675  }
676  
ReTimeSyncIfNeed(const TimeSyncPacket & ackPacket)677  void TimeSync::ReTimeSyncIfNeed(const TimeSyncPacket &ackPacket)
678  {
679      TimeOffset timeOffsetIgnoreRtt =
680          static_cast<TimeOffset>(ackPacket.GetSourceTimeBegin() - ackPacket.GetTargetTimeBegin());
681      bool reTimeSync = false;
682      if (IsRemoteLowVersion(SOFTWARE_VERSION_RELEASE_9_0)) {
683          reTimeSync = CheckReTimeSyncIfNeedWithLowVersion(timeOffsetIgnoreRtt);
684      } else {
685          reTimeSync = CheckReTimeSyncIfNeedWithHighVersion(timeOffsetIgnoreRtt, ackPacket);
686      }
687  
688      if ((std::abs(timeOffsetIgnoreRtt) >= INT64_MAX / 2) || reTimeSync) { // 2 is half of INT64_MAX
689          LOGI("[TimeSync][RequestRecv] timeOffSet invalid, should do time sync");
690          SetTimeSyncFinishInner(false);
691          RuntimeContext::GetInstance()->ClearDeviceTimeInfo(deviceId_);
692      }
693  
694      // reset time change by time sync
695      int errCode = metadata_->SetTimeChangeMark(deviceId_, false);
696      if (errCode != E_OK) {
697          LOGW("[TimeSync] Mark dev %.3s no time change failed %d", deviceId_.c_str(), errCode);
698      }
699  }
700  
CheckReTimeSyncIfNeedWithLowVersion(TimeOffset timeOffsetIgnoreRtt)701  bool TimeSync::CheckReTimeSyncIfNeedWithLowVersion(TimeOffset timeOffsetIgnoreRtt)
702  {
703      TimeOffset metadataTimeOffset;
704      metadata_->GetTimeOffset(deviceId_, metadataTimeOffset);
705      return (std::abs(metadataTimeOffset) >= INT64_MAX / 2) || // 2 is half of INT64_MAX
706          (std::abs(metadataTimeOffset - timeOffsetIgnoreRtt) > MAX_TIME_OFFSET_NOISE);
707  }
708  
CheckReTimeSyncIfNeedWithHighVersion(TimeOffset timeOffsetIgnoreRtt,const TimeSyncPacket & ackPacket)709  bool TimeSync::CheckReTimeSyncIfNeedWithHighVersion(TimeOffset timeOffsetIgnoreRtt, const TimeSyncPacket &ackPacket)
710  {
711      TimeOffset rawTimeOffset = timeOffsetIgnoreRtt - ackPacket.GetRequestLocalOffset() +
712          ackPacket.GetResponseLocalOffset();
713      auto [errCode, info] = RuntimeContext::GetInstance()->GetDeviceTimeInfo(deviceId_);
714      return errCode == -E_NOT_FOUND || (std::abs(info.systemTimeOffset - rawTimeOffset) > MAX_TIME_OFFSET_NOISE);
715  }
716  
SaveOffsetWithAck(const TimeSyncPacket & ackPacket)717  int TimeSync::SaveOffsetWithAck(const TimeSyncPacket &ackPacket)
718  {
719      // calculate timeoffset of two devices
720      auto [offset, rtt] = CalculateTimeOffset(ackPacket);
721      TimeOffset rawOffset = CalculateRawTimeOffset(ackPacket, offset);
722      LOGD("TimeSync::AckRecv, dev = %s{private}, sEnd = %" PRIu64 ", tEnd = %" PRIu64 ", sBegin = %" PRIu64
723          ", tBegin = %" PRIu64 ", offset = %" PRId64 ", rawOffset = %" PRId64 ", requestLocalOffset = %" PRId64
724          ", responseLocalOffset = %" PRId64,
725          deviceId_.c_str(),
726          ackPacket.GetSourceTimeEnd(),
727          ackPacket.GetTargetTimeEnd(),
728          ackPacket.GetSourceTimeBegin(),
729          ackPacket.GetTargetTimeBegin(),
730          offset,
731          rawOffset,
732          ackPacket.GetRequestLocalOffset(),
733          ackPacket.GetResponseLocalOffset());
734  
735      // save timeoffset into metadata, maybe a block action
736      int errCode = SaveTimeOffset(deviceId_, offset);
737      if (errCode != E_OK) {
738          return errCode;
739      }
740      errCode = metadata_->SetSystemTimeOffset(deviceId_, rawOffset);
741      if (errCode != E_OK) {
742          return errCode;
743      }
744      DeviceTimeInfo info;
745      info.systemTimeOffset = rawOffset;
746      info.recordTime = timeHelper_->GetSysCurrentTime();
747      info.rtt = rtt;
748      RuntimeContext::GetInstance()->SetDeviceTimeInfo(deviceId_, info);
749      SetTimeSyncFinishInner(true);
750      // save finish next time after save failed
751      return E_OK;
752  }
753  
CalculateRawTimeOffset(const TimeSyncPacket & timeSyncInfo,TimeOffset deltaTime)754  TimeOffset TimeSync::CalculateRawTimeOffset(const TimeSyncPacket &timeSyncInfo, TimeOffset deltaTime)
755  {
756      // deltaTime = (t1' + response - t1 - request + t2' + response - t2 - request)/2
757      // rawTimeOffset =  request - response + (t1' - t1 + t2' - t2)/2
758      // rawTimeOffset = deltaTime + requestLocalOffset - responseLocalOffset
759      return deltaTime + timeSyncInfo.GetRequestLocalOffset() - timeSyncInfo.GetResponseLocalOffset();
760  }
761  
CheckSkipTimeSync(const DeviceTimeInfo & info)762  bool TimeSync::CheckSkipTimeSync(const DeviceTimeInfo &info)
763  {
764      uint64_t currentRawTime = timeHelper_->GetSysCurrentTime();
765      if (currentRawTime < info.recordTime) {
766          LOGW("[TimeSync] current time %" PRIu64 " less than record time %" PRIu64, currentRawTime, info.recordTime);
767          return false;
768      }
769      uint64_t interval = timeHelper_->GetSysCurrentTime() - info.recordTime;
770      if (info.rtt < MAX_TIME_RTT_NOISE) {
771          return true;
772      }
773      if (interval > RTT_NOISE_CHECK_INTERVAL) {
774          LOGI("[TimeSync] rtt %" PRId64 " is greater than noise should re time sync, interval is %" PRIu64, info.rtt,
775              interval);
776          return false;
777      }
778  #ifdef DE_DEBUG_ENV
779  #ifdef TEST_RTT_NOISE_CHECK_INTERVAL
780      if (interval > TEST_RTT_NOISE_CHECK_INTERVAL) {
781          LOGI("[TimeSync][TEST] rtt %" PRId64 " is greater than noise should re time sync, interval is %" PRIu64,
782              info.rtt, interval);
783          return false;
784      }
785  #endif
786  #endif
787      return true;
788  }
789  
SetTimeSyncFinishIfNeed()790  void TimeSync::SetTimeSyncFinishIfNeed()
791  {
792      auto [errCode, info] = RuntimeContext::GetInstance()->GetDeviceTimeInfo(deviceId_);
793      if (errCode != E_OK) {
794          return;
795      }
796      int64_t systemTimeOffset = metadata_->GetSystemTimeOffset(deviceId_);
797      LOGD("[TimeSync] Check db offset %" PRId64 " cache offset %" PRId64, systemTimeOffset, info.systemTimeOffset);
798      if (!CheckSkipTimeSync(info) || (IsNeedSync() &&
799          std::abs(systemTimeOffset - info.systemTimeOffset) >= MAX_TIME_OFFSET_NOISE)) {
800          SetTimeSyncFinishInner(false);
801          return;
802      }
803      if (IsNeedSync()) {
804          SetTimeSyncFinishInner(true);
805      }
806      if (systemTimeOffset != info.systemTimeOffset) {
807          errCode = metadata_->SetSystemTimeOffset(deviceId_, info.systemTimeOffset);
808          if (errCode != E_OK) {
809              return;
810          }
811      }
812      LOGI("[TimeSync] Mark time sync finish success");
813  }
814  
ClearTimeSyncFinish()815  void TimeSync::ClearTimeSyncFinish()
816  {
817      RuntimeContext::GetInstance()->ClearDeviceTimeInfo(deviceId_);
818      SetTimeSyncFinishInner(false);
819  }
820  
GenerateTimeOffsetIfNeed(TimeOffset systemOffset,TimeOffset senderLocalOffset)821  int TimeSync::GenerateTimeOffsetIfNeed(TimeOffset systemOffset, TimeOffset senderLocalOffset)
822  {
823      if (IsRemoteLowVersion(SOFTWARE_VERSION_RELEASE_9_0)) {
824          return E_OK;
825      }
826      auto [errCode, info] = RuntimeContext::GetInstance()->GetDeviceTimeInfo(deviceId_);
827      bool syncFinish = !IsNeedSync();
828      bool timeChange = metadata_->IsTimeChange(deviceId_);
829      // avoid local time change but remote record time sync finish
830      // should return re time sync, after receive time sync request, reset time change mark
831      // we think offset is ok when local time sync to remote
832      if ((timeChange && !syncFinish) ||
833          (errCode == E_OK && (std::abs(info.systemTimeOffset + systemOffset) > MAX_TIME_OFFSET_NOISE))) {
834          LOGI("[TimeSync] time offset is invalid should do time sync again! packet %" PRId64 " cache %" PRId64
835              " time change %d sync finish %d", -systemOffset, info.systemTimeOffset, static_cast<int>(timeChange),
836              static_cast<int>(syncFinish));
837          ClearTimeSyncFinish();
838          RuntimeContext::GetInstance()->ClearDeviceTimeInfo(deviceId_);
839          return -E_NEED_TIME_SYNC;
840      }
841      // Sender's systemOffset = Sender's deltaTime + requestLocalOffset - responseLocalOffset
842      // Sender's deltaTime = Sender's systemOffset - requestLocalOffset + responseLocalOffset
843      // Receiver's deltaTime = -Sender's deltaTime = -Sender's systemOffset + requestLocalOffset - responseLocalOffset
844      TimeOffset offset = -systemOffset + senderLocalOffset - metadata_->GetLocalTimeOffset();
845      errCode = metadata_->SetSystemTimeOffset(deviceId_, -systemOffset);
846      if (errCode != E_OK) {
847          return errCode;
848      }
849      errCode = metadata_->SaveTimeOffset(deviceId_, offset);
850      if (errCode != E_OK) {
851          return errCode;
852      }
853      SetTimeSyncFinishInner(true);
854      info.systemTimeOffset = -systemOffset;
855      info.recordTime = timeHelper_->GetSysCurrentTime();
856      RuntimeContext::GetInstance()->SetDeviceTimeInfo(deviceId_, info);
857      return E_OK;
858  }
859  
SetTimeSyncFinishInner(bool finish)860  void TimeSync::SetTimeSyncFinishInner(bool finish)
861  {
862      isSynced_ = finish;
863      if (IsRemoteLowVersion(SOFTWARE_VERSION_RELEASE_9_0)) {
864          return;
865      }
866      int errCode = metadata_->SetTimeSyncFinishMark(deviceId_, finish);
867      if (errCode != E_OK) {
868          LOGW("[TimeSync] Set %.3s time sync finish %d mark failed %d", deviceId_.c_str(), static_cast<int>(finish),
869              errCode);
870      }
871  }
872  } // namespace DistributedDB