1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "time_sync.h"
17
18 #include "parcel.h"
19 #include "log_print.h"
20 #include "sync_types.h"
21 #include "message_transform.h"
22 #include "version.h"
23 #include "isync_task_context.h"
24
25 namespace DistributedDB {
26 std::mutex TimeSync::timeSyncSetLock_;
27 std::set<TimeSync *> TimeSync::timeSyncSet_;
28 namespace {
29 constexpr uint64_t TIME_SYNC_INTERVAL = 24 * 60 * 60 * 1000; // 24h
30 constexpr int TRIP_DIV_HALF = 2;
31 constexpr int64_t MAX_TIME_OFFSET_NOISE = 1 * 1000 * 10000; // 1 second in 100-nanosecond units
32 constexpr int64_t MAX_TIME_RTT_NOISE = 1 * 1000 * 10000; // 1 second in 100-nanosecond units
33 constexpr uint64_t RTT_NOISE_CHECK_INTERVAL = 30 * 60 * 1000 * 10000u; // 30 minute in 100-nanosecond units
34 }
35
36 // Class TimeSyncPacket
TimeSyncPacket()37 TimeSyncPacket::TimeSyncPacket()
38 : sourceTimeBegin_(0),
39 sourceTimeEnd_(0),
40 targetTimeBegin_(0),
41 targetTimeEnd_(0),
42 version_(TIME_SYNC_VERSION_V1),
43 requestLocalOffset_(0),
44 responseLocalOffset_(0)
45 {
46 }
47
~TimeSyncPacket()48 TimeSyncPacket::~TimeSyncPacket()
49 {
50 }
51
SetSourceTimeBegin(Timestamp sourceTimeBegin)52 void TimeSyncPacket::SetSourceTimeBegin(Timestamp sourceTimeBegin)
53 {
54 sourceTimeBegin_ = sourceTimeBegin;
55 }
56
GetSourceTimeBegin() const57 Timestamp TimeSyncPacket::GetSourceTimeBegin() const
58 {
59 return sourceTimeBegin_;
60 }
61
SetSourceTimeEnd(Timestamp sourceTimeEnd)62 void TimeSyncPacket::SetSourceTimeEnd(Timestamp sourceTimeEnd)
63 {
64 sourceTimeEnd_ = sourceTimeEnd;
65 }
66
GetSourceTimeEnd() const67 Timestamp TimeSyncPacket::GetSourceTimeEnd() const
68 {
69 return sourceTimeEnd_;
70 }
71
SetTargetTimeBegin(Timestamp targetTimeBegin)72 void TimeSyncPacket::SetTargetTimeBegin(Timestamp targetTimeBegin)
73 {
74 targetTimeBegin_ = targetTimeBegin;
75 }
76
GetTargetTimeBegin() const77 Timestamp TimeSyncPacket::GetTargetTimeBegin() const
78 {
79 return targetTimeBegin_;
80 }
81
SetTargetTimeEnd(Timestamp targetTimeEnd)82 void TimeSyncPacket::SetTargetTimeEnd(Timestamp targetTimeEnd)
83 {
84 targetTimeEnd_ = targetTimeEnd;
85 }
86
GetTargetTimeEnd() const87 Timestamp TimeSyncPacket::GetTargetTimeEnd() const
88 {
89 return targetTimeEnd_;
90 }
91
SetVersion(uint32_t version)92 void TimeSyncPacket::SetVersion(uint32_t version)
93 {
94 version_ = version;
95 }
96
GetVersion() const97 uint32_t TimeSyncPacket::GetVersion() const
98 {
99 return version_;
100 }
101
GetRequestLocalOffset() const102 TimeOffset TimeSyncPacket::GetRequestLocalOffset() const
103 {
104 return requestLocalOffset_;
105 }
106
SetRequestLocalOffset(TimeOffset offset)107 void TimeSyncPacket::SetRequestLocalOffset(TimeOffset offset)
108 {
109 requestLocalOffset_ = offset;
110 }
111
GetResponseLocalOffset() const112 TimeOffset TimeSyncPacket::GetResponseLocalOffset() const
113 {
114 return responseLocalOffset_;
115 }
116
SetResponseLocalOffset(TimeOffset offset)117 void TimeSyncPacket::SetResponseLocalOffset(TimeOffset offset)
118 {
119 responseLocalOffset_ = offset;
120 }
121
CalculateLen()122 uint32_t TimeSyncPacket::CalculateLen()
123 {
124 uint32_t len = Parcel::GetUInt32Len(); // version_
125 len += Parcel::GetUInt64Len(); // sourceTimeBegin_
126 len += Parcel::GetUInt64Len(); // sourceTimeEnd_
127 len += Parcel::GetUInt64Len(); // targetTimeBegin_
128 len += Parcel::GetUInt64Len(); // targetTimeEnd_
129 len = Parcel::GetEightByteAlign(len);
130 len += Parcel::GetInt64Len(); // requestLocalOffset_
131 len += Parcel::GetInt64Len(); // responseLocalOffset_
132 return len;
133 }
134
135 // Class TimeSync
TimeSync()136 TimeSync::TimeSync()
137 : communicateHandle_(nullptr),
138 metadata_(nullptr),
139 timeHelper_(nullptr),
140 retryTime_(0),
141 driverTimerId_(0),
142 isSynced_(false),
143 isAckReceived_(false),
144 timeChangedListener_(nullptr),
145 timeDriverLockCount_(0),
146 isOnline_(true),
147 closed_(false)
148 {
149 }
150
~TimeSync()151 TimeSync::~TimeSync()
152 {
153 Finalize();
154 driverTimerId_ = 0;
155
156 if (timeChangedListener_ != nullptr) {
157 timeChangedListener_->Drop(true);
158 timeChangedListener_ = nullptr;
159 }
160 timeHelper_ = nullptr;
161 communicateHandle_ = nullptr;
162 metadata_ = nullptr;
163
164 std::lock_guard<std::mutex> lock(timeSyncSetLock_);
165 timeSyncSet_.erase(this);
166 }
167
RegisterTransformFunc()168 int TimeSync::RegisterTransformFunc()
169 {
170 TransformFunc func;
171 func.computeFunc = [](const Message *inMsg) { return CalculateLen(inMsg); };
172 func.serializeFunc = [](uint8_t *buffer, uint32_t length, const Message *inMsg) {
173 return Serialization(buffer, length, inMsg);
174 };
175 func.deserializeFunc = [](const uint8_t *buffer, uint32_t length, Message *inMsg) {
176 return DeSerialization(buffer, length, inMsg);
177 };
178 return MessageTransform::RegTransformFunction(TIME_SYNC_MESSAGE, func);
179 }
180
Initialize(ICommunicator * communicator,const std::shared_ptr<Metadata> & metadata,const ISyncInterface * storage,const DeviceID & deviceId)181 int TimeSync::Initialize(ICommunicator *communicator, const std::shared_ptr<Metadata> &metadata,
182 const ISyncInterface *storage, const DeviceID &deviceId)
183 {
184 if ((communicator == nullptr) || (storage == nullptr) || (metadata == nullptr)) {
185 return -E_INVALID_ARGS;
186 }
187 {
188 std::lock_guard<std::mutex> lock(timeSyncSetLock_);
189 timeSyncSet_.insert(this);
190 }
191 communicateHandle_ = communicator;
192 metadata_ = metadata;
193 deviceId_ = deviceId;
194 timeHelper_ = std::make_unique<TimeHelper>();
195
196 int errCode = timeHelper_->Initialize(storage, metadata_);
197 if (errCode != E_OK) {
198 timeHelper_ = nullptr;
199 LOGE("[TimeSync] timeHelper Init failed, err %d.", errCode);
200 return errCode;
201 }
202 dbId_ = storage->GetIdentifier();
203 driverCallback_ = [this](TimerId timerId) { return TimeSyncDriver(timerId); };
204 errCode = RuntimeContext::GetInstance()->SetTimer(TIME_SYNC_INTERVAL, driverCallback_, nullptr, driverTimerId_);
205 if (errCode != E_OK) {
206 return errCode;
207 }
208 isSynced_ = metadata_->IsTimeSyncFinish(deviceId_);
209 return errCode;
210 }
211
Finalize()212 void TimeSync::Finalize()
213 {
214 // Stop the timer
215 LOGD("[TimeSync] Finalize enter!");
216 RuntimeContext *runtimeContext = RuntimeContext::GetInstance();
217 TimerId timerId;
218 {
219 std::unique_lock<std::mutex> lock(timeDriverLock_);
220 timerId = driverTimerId_;
221 }
222 runtimeContext->RemoveTimer(timerId, true);
223 std::unique_lock<std::mutex> lock(timeDriverLock_);
224 timeDriverCond_.wait(lock, [this]() { return this->timeDriverLockCount_ == 0; });
225 LOGD("[TimeSync] Finalized!");
226 }
227
SyncStart(const CommErrHandler & handler,uint32_t sessionId)228 int TimeSync::SyncStart(const CommErrHandler &handler, uint32_t sessionId)
229 {
230 isOnline_ = true;
231 TimeSyncPacket packet;
232 Timestamp startTime = timeHelper_->GetTime();
233 packet.SetSourceTimeBegin(startTime);
234 TimeOffset timeOffset = metadata_->GetLocalTimeOffset();
235 packet.SetRequestLocalOffset(timeOffset);
236 // send timeSync request
237 LOGD("[TimeSync] startTime = %" PRIu64 ", offset = % " PRId64 " , dev = %s{private}", startTime, timeOffset,
238 deviceId_.c_str());
239
240 Message *message = new (std::nothrow) Message(TIME_SYNC_MESSAGE);
241 if (message == nullptr) {
242 return -E_OUT_OF_MEMORY;
243 }
244 message->SetSessionId(sessionId);
245 message->SetMessageType(TYPE_REQUEST);
246 message->SetPriority(Priority::NORMAL);
247 int errCode = message->SetCopiedObject<>(packet);
248 if (errCode != E_OK) {
249 delete message;
250 message = nullptr;
251 return errCode;
252 }
253 errCode = SendMessageWithSendEnd(message, handler);
254 if (errCode != E_OK) {
255 delete message;
256 message = nullptr;
257 }
258 return errCode;
259 }
260
CalculateLen(const Message * inMsg)261 uint32_t TimeSync::CalculateLen(const Message *inMsg)
262 {
263 if (!(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
264 return 0;
265 }
266
267 const TimeSyncPacket *packet = const_cast<TimeSyncPacket *>(inMsg->GetObject<TimeSyncPacket>());
268 if (packet == nullptr) {
269 return 0;
270 }
271
272 return TimeSyncPacket::CalculateLen();
273 }
274
Serialization(uint8_t * buffer,uint32_t length,const Message * inMsg)275 int TimeSync::Serialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
276 {
277 if ((buffer == nullptr) || !(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
278 return -E_INVALID_ARGS;
279 }
280 const TimeSyncPacket *packet = inMsg->GetObject<TimeSyncPacket>();
281 if ((packet == nullptr) || (length != TimeSyncPacket::CalculateLen())) {
282 return -E_INVALID_ARGS;
283 }
284
285 Parcel parcel(buffer, length);
286 Timestamp srcBegin = packet->GetSourceTimeBegin();
287 Timestamp srcEnd = packet->GetSourceTimeEnd();
288 Timestamp targetBegin = packet->GetTargetTimeBegin();
289 Timestamp targetEnd = packet->GetTargetTimeEnd();
290
291 int errCode = parcel.WriteUInt32(TIME_SYNC_VERSION_V1);
292 if (errCode != E_OK) {
293 return -E_SECUREC_ERROR;
294 }
295 errCode = parcel.WriteUInt64(srcBegin);
296 if (errCode != E_OK) {
297 return -E_SECUREC_ERROR;
298 }
299 errCode = parcel.WriteUInt64(srcEnd);
300 if (errCode != E_OK) {
301 return -E_SECUREC_ERROR;
302 }
303 errCode = parcel.WriteUInt64(targetBegin);
304 if (errCode != E_OK) {
305 return -E_SECUREC_ERROR;
306 }
307 errCode = parcel.WriteUInt64(targetEnd);
308 if (errCode != E_OK) {
309 return -E_SECUREC_ERROR;
310 }
311 parcel.EightByteAlign();
312 parcel.WriteInt64(packet->GetRequestLocalOffset());
313 parcel.WriteInt64(packet->GetResponseLocalOffset());
314 if (parcel.IsError()) {
315 return -E_PARSE_FAIL;
316 }
317 return errCode;
318 }
319
DeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)320 int TimeSync::DeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
321 {
322 if ((buffer == nullptr) || !(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
323 return -E_INVALID_ARGS;
324 }
325 TimeSyncPacket packet;
326 Parcel parcel(const_cast<uint8_t *>(buffer), length);
327 Timestamp srcBegin;
328 Timestamp srcEnd;
329 Timestamp targetBegin;
330 Timestamp targetEnd;
331
332 uint32_t version = 0;
333 parcel.ReadUInt32(version);
334 if (parcel.IsError()) {
335 return -E_INVALID_ARGS;
336 }
337 if (version > TIME_SYNC_VERSION_V1) {
338 packet.SetVersion(version);
339 return inMsg->SetCopiedObject<>(packet);
340 }
341 parcel.ReadUInt64(srcBegin);
342 parcel.ReadUInt64(srcEnd);
343 parcel.ReadUInt64(targetBegin);
344 parcel.ReadUInt64(targetEnd);
345 if (parcel.IsError()) {
346 return -E_INVALID_ARGS;
347 }
348 packet.SetSourceTimeBegin(srcBegin);
349 packet.SetSourceTimeEnd(srcEnd);
350 packet.SetTargetTimeBegin(targetBegin);
351 packet.SetTargetTimeEnd(targetEnd);
352 parcel.EightByteAlign();
353 if (parcel.IsContinueRead()) {
354 TimeOffset requestLocalOffset;
355 TimeOffset responseLocalOffset;
356 parcel.ReadInt64(requestLocalOffset);
357 parcel.ReadInt64(responseLocalOffset);
358 if (parcel.IsError()) {
359 LOGE("[TimeSync] Parse packet failed, message type %u", inMsg->GetMessageType());
360 return -E_PARSE_FAIL;
361 }
362 packet.SetRequestLocalOffset(requestLocalOffset);
363 packet.SetResponseLocalOffset(responseLocalOffset);
364 }
365
366 return inMsg->SetCopiedObject<>(packet);
367 }
368
AckRecv(const Message * message,uint32_t targetSessionId)369 int TimeSync::AckRecv(const Message *message, uint32_t targetSessionId)
370 {
371 // only check when sessionId is not 0, because old version timesync sessionId is 0.
372 if (message != nullptr && message->GetSessionId() != 0 &&
373 message->GetErrorNo() == E_FEEDBACK_COMMUNICATOR_NOT_FOUND && message->GetSessionId() == targetSessionId) {
374 LOGE("[AbilitySync][AckMsgCheck] Remote db is closed");
375 return -E_FEEDBACK_COMMUNICATOR_NOT_FOUND;
376 }
377 if (!IsPacketValid(message, TYPE_RESPONSE)) {
378 return -E_INVALID_ARGS;
379 }
380 const TimeSyncPacket *packet = message->GetObject<TimeSyncPacket>();
381 if (packet == nullptr) {
382 LOGE("[TimeSync] AckRecv packet is null");
383 return -E_INVALID_ARGS;
384 }
385
386 TimeSyncPacket packetData = TimeSyncPacket(*packet);
387 Timestamp sourceTimeEnd = timeHelper_->GetTime();
388 packetData.SetSourceTimeBegin(GetSourceBeginTime(packetData.GetSourceTimeBegin(), targetSessionId));
389 packetData.SetSourceTimeEnd(sourceTimeEnd);
390 if (packetData.GetSourceTimeBegin() > packetData.GetSourceTimeEnd() ||
391 packetData.GetTargetTimeBegin() > packetData.GetTargetTimeEnd() ||
392 packetData.GetSourceTimeEnd() > TimeHelper::MAX_VALID_TIME ||
393 packetData.GetTargetTimeEnd() > TimeHelper::MAX_VALID_TIME) {
394 LOGD("[TimeSync][AckRecv] Time valid check failed.");
395 return -E_INVALID_TIME;
396 }
397 int errCode = SaveOffsetWithAck(packetData);
398 {
399 std::lock_guard<std::mutex> lock(cvLock_);
400 isAckReceived_ = true;
401 }
402 conditionVar_.notify_all();
403 ResetTimer();
404 return errCode;
405 }
406
RequestRecv(const Message * message)407 int TimeSync::RequestRecv(const Message *message)
408 {
409 if (!IsPacketValid(message, TYPE_REQUEST)) {
410 return -E_INVALID_ARGS;
411 }
412
413 const TimeSyncPacket *packet = message->GetObject<TimeSyncPacket>();
414 if (packet == nullptr) {
415 return -E_INVALID_ARGS;
416 }
417
418 // build timeSync ack packet
419 TimeSyncPacket ackPacket = BuildAckPacket(*packet);
420 if (ackPacket.GetSourceTimeBegin() > TimeHelper::MAX_VALID_TIME) {
421 LOGD("[TimeSync][RequestRecv] Time valid check failed.");
422 return -E_INVALID_TIME;
423 }
424 ReTimeSyncIfNeed(ackPacket);
425
426 Message *ackMessage = new (std::nothrow) Message(TIME_SYNC_MESSAGE);
427 if (ackMessage == nullptr) {
428 return -E_OUT_OF_MEMORY;
429 }
430 ackMessage->SetSessionId(message->GetSessionId());
431 ackMessage->SetPriority(Priority::NORMAL);
432 ackMessage->SetMessageType(TYPE_RESPONSE);
433 ackMessage->SetTarget(deviceId_);
434 int errCode = ackMessage->SetCopiedObject<>(ackPacket);
435 if (errCode != E_OK) {
436 delete ackMessage;
437 ackMessage = nullptr;
438 return errCode;
439 }
440
441 errCode = SendPacket(deviceId_, ackMessage);
442 if (errCode != E_OK) {
443 delete ackMessage;
444 ackMessage = nullptr;
445 }
446 return errCode;
447 }
448
SaveTimeOffset(const DeviceID & deviceID,TimeOffset timeOffset)449 int TimeSync::SaveTimeOffset(const DeviceID &deviceID, TimeOffset timeOffset)
450 {
451 return metadata_->SaveTimeOffset(deviceID, timeOffset);
452 }
453
CalculateTimeOffset(const TimeSyncPacket & timeSyncInfo)454 std::pair<TimeOffset, TimeOffset> TimeSync::CalculateTimeOffset(const TimeSyncPacket &timeSyncInfo)
455 {
456 TimeOffset roundTrip = static_cast<TimeOffset>((timeSyncInfo.GetSourceTimeEnd() -
457 timeSyncInfo.GetSourceTimeBegin()) - (timeSyncInfo.GetTargetTimeEnd() - timeSyncInfo.GetTargetTimeBegin()));
458 TimeOffset offset1 = static_cast<TimeOffset>(timeSyncInfo.GetTargetTimeBegin() -
459 timeSyncInfo.GetSourceTimeBegin() - (roundTrip / TRIP_DIV_HALF));
460 TimeOffset offset2 = static_cast<TimeOffset>(timeSyncInfo.GetTargetTimeEnd() + (roundTrip / TRIP_DIV_HALF) -
461 timeSyncInfo.GetSourceTimeEnd());
462 TimeOffset offset = (offset1 / TRIP_DIV_HALF) + (offset2 / TRIP_DIV_HALF);
463 LOGD("TimeSync::CalculateTimeOffset roundTrip= %" PRId64 ", offset1 = %" PRId64 ", offset2 = %" PRId64
464 ", offset = %" PRId64, roundTrip, offset1, offset2, offset);
465 return {offset, roundTrip};
466 }
467
IsPacketValid(const Message * inMsg,uint16_t messageType)468 bool TimeSync::IsPacketValid(const Message *inMsg, uint16_t messageType)
469 {
470 if (inMsg == nullptr) {
471 return false;
472 }
473 if (inMsg->GetMessageId() != TIME_SYNC_MESSAGE) {
474 LOGD("message Id = %d", inMsg->GetMessageId());
475 return false;
476 }
477 if (messageType != inMsg->GetMessageType()) {
478 LOGD("input Type = %" PRIu16 ", inMsg type = %" PRIu16, messageType, inMsg->GetMessageType());
479 return false;
480 }
481 return true;
482 }
483
SendPacket(const DeviceID & deviceId,const Message * message,const CommErrHandler & handler)484 int TimeSync::SendPacket(const DeviceID &deviceId, const Message *message, const CommErrHandler &handler)
485 {
486 SendConfig conf;
487 timeHelper_->SetSendConfig(deviceId, false, SEND_TIME_OUT, conf);
488 int errCode = communicateHandle_->SendMessage(deviceId, message, conf, handler);
489 if (errCode != E_OK) {
490 LOGE("[TimeSync] SendPacket failed, err %d", errCode);
491 }
492 return errCode;
493 }
494
TimeSyncDriver(TimerId timerId)495 int TimeSync::TimeSyncDriver(TimerId timerId)
496 {
497 if (timerId != driverTimerId_) {
498 return -E_INTERNAL_ERROR;
499 }
500 if (!isOnline_) {
501 return E_OK;
502 }
503 std::lock_guard<std::mutex> lock(timeDriverLock_);
504 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this]() {
505 CommErrHandler handler = [this](int ret, bool isDirectEnd) { CommErrHandlerFunc(ret, this); };
506 (void)this->SyncStart(handler);
507 std::lock_guard<std::mutex> innerLock(this->timeDriverLock_);
508 this->timeDriverLockCount_--;
509 this->timeDriverCond_.notify_all();
510 });
511 if (errCode != E_OK) {
512 LOGE("[TimeSync][TimerSyncDriver] ScheduleTask failed err %d", errCode);
513 return errCode;
514 }
515 timeDriverLockCount_++;
516 return E_OK;
517 }
518
GetTimeOffset(TimeOffset & outOffset,uint32_t timeout,uint32_t sessionId)519 int TimeSync::GetTimeOffset(TimeOffset &outOffset, uint32_t timeout, uint32_t sessionId)
520 {
521 if (!isSynced_) {
522 {
523 std::lock_guard<std::mutex> lock(cvLock_);
524 isAckReceived_ = false;
525 }
526 CommErrHandler handler = [this](int ret, bool isDirectEnd) { CommErrHandlerFunc(ret, this); };
527 int errCode = SyncStart(handler, sessionId);
528 LOGD("TimeSync::GetTimeOffset start, current time = %" PRIu64 ", errCode = %d, timeout = %" PRIu32 " ms",
529 TimeHelper::GetSysCurrentTime(), errCode, timeout);
530 std::unique_lock<std::mutex> lock(cvLock_);
531 if (errCode != E_OK || !conditionVar_.wait_for(lock, std::chrono::milliseconds(timeout), [this]() {
532 return this->isAckReceived_ || this->closed_;
533 })) {
534 LOGD("TimeSync::GetTimeOffset, retryTime_ = %d", retryTime_);
535 retryTime_++;
536 if (retryTime_ < MAX_RETRY_TIME) {
537 lock.unlock();
538 LOGI("TimeSync::GetTimeOffset timeout, try again");
539 return GetTimeOffset(outOffset, timeout);
540 }
541 retryTime_ = 0;
542 return -E_TIMEOUT;
543 }
544 }
545 if (IsClosed()) {
546 return -E_BUSY;
547 }
548 retryTime_ = 0;
549 metadata_->GetTimeOffset(deviceId_, outOffset);
550 return E_OK;
551 }
552
IsNeedSync() const553 bool TimeSync::IsNeedSync() const
554 {
555 return !isSynced_;
556 }
557
SetOnline(bool isOnline)558 void TimeSync::SetOnline(bool isOnline)
559 {
560 isOnline_ = isOnline;
561 }
562
CommErrHandlerFunc(int errCode,TimeSync * timeSync)563 void TimeSync::CommErrHandlerFunc(int errCode, TimeSync *timeSync)
564 {
565 LOGD("[TimeSync][CommErrHandle] errCode:%d", errCode);
566 std::lock_guard<std::mutex> lock(timeSyncSetLock_);
567 if (timeSyncSet_.count(timeSync) == 0) {
568 LOGI("[TimeSync][CommErrHandle] timeSync has been killed");
569 return;
570 }
571 if (timeSync == nullptr) {
572 LOGI("[TimeSync][CommErrHandle] timeSync is nullptr");
573 return;
574 }
575 if (errCode != E_OK) {
576 timeSync->SetOnline(false);
577 } else {
578 timeSync->SetOnline(true);
579 }
580 }
581
ResetTimer()582 void TimeSync::ResetTimer()
583 {
584 TimerId timerId;
585 {
586 std::lock_guard<std::mutex> lock(timeDriverLock_);
587 timerId = driverTimerId_;
588 driverTimerId_ = 0u;
589 }
590 if (timerId == 0u) {
591 return;
592 }
593 RuntimeContext::GetInstance()->RemoveTimer(timerId, true);
594 int errCode = RuntimeContext::GetInstance()->SetTimer(
595 TIME_SYNC_INTERVAL, driverCallback_, nullptr, timerId);
596 if (errCode != E_OK) {
597 LOGW("[TimeSync] Reset TimeSync timer failed err :%d", errCode);
598 } else {
599 std::lock_guard<std::mutex> lock(timeDriverLock_);
600 driverTimerId_ = timerId;
601 }
602 }
603
Close()604 void TimeSync::Close()
605 {
606 Finalize();
607 {
608 std::lock_guard<std::mutex> lock(cvLock_);
609 closed_ = true;
610 }
611 conditionVar_.notify_all();
612 }
613
IsClosed() const614 bool TimeSync::IsClosed() const
615 {
616 std::lock_guard<std::mutex> lock(cvLock_);
617 return closed_ ;
618 }
619
SendMessageWithSendEnd(const Message * message,const CommErrHandler & handler)620 int TimeSync::SendMessageWithSendEnd(const Message *message, const CommErrHandler &handler)
621 {
622 std::shared_ptr<TimeSync> timeSyncPtr = shared_from_this();
623 auto sessionId = message->GetSessionId();
624 return SendPacket(deviceId_, message, [handler, timeSyncPtr, sessionId, this](int errCode, bool isDirectEnd) {
625 if (closed_) {
626 LOGW("[TimeSync] DB closed, ignore send end! dev=%.3s", deviceId_.c_str());
627 return;
628 }
629 {
630 std::lock_guard<std::mutex> autoLock(beginTimeMutex_);
631 sessionBeginTime_.clear();
632 sessionBeginTime_[sessionId] = timeHelper_->GetTime();
633 }
634 if (handler != nullptr) {
635 handler(errCode, isDirectEnd);
636 }
637 });
638 }
639
GetSourceBeginTime(Timestamp packetBeginTime,uint32_t sessionId)640 Timestamp TimeSync::GetSourceBeginTime(Timestamp packetBeginTime, uint32_t sessionId)
641 {
642 std::lock_guard<std::mutex> autoLock(beginTimeMutex_);
643 if (sessionBeginTime_.find(sessionId) == sessionBeginTime_.end()) {
644 LOGW("[TimeSync] Current cache not exist packet send time");
645 return packetBeginTime;
646 }
647 auto sendTime = sessionBeginTime_[sessionId];
648 LOGD("[TimeSync] Use packet send time %" PRIu64 " rather than %" PRIu64, sendTime, packetBeginTime);
649 return sendTime;
650 }
651
BuildAckPacket(const TimeSyncPacket & request)652 TimeSyncPacket TimeSync::BuildAckPacket(const TimeSyncPacket &request)
653 {
654 TimeSyncPacket ackPacket = TimeSyncPacket(request);
655 Timestamp targetTimeBegin = timeHelper_->GetTime();
656 ackPacket.SetTargetTimeBegin(targetTimeBegin);
657 Timestamp targetTimeEnd = timeHelper_->GetTime();
658 ackPacket.SetTargetTimeEnd(targetTimeEnd);
659 TimeOffset requestOffset = request.GetRequestLocalOffset();
660 TimeOffset responseOffset = metadata_->GetLocalTimeOffset();
661 ackPacket.SetRequestLocalOffset(requestOffset);
662 ackPacket.SetResponseLocalOffset(responseOffset);
663 LOGD("TimeSync::RequestRecv, dev = %s{private}, sTimeEnd = %" PRIu64 ", tTimeEnd = %" PRIu64 ", sbegin = %" PRIu64
664 ", tbegin = %" PRIu64 ", request offset = %" PRId64 ", response offset = %" PRId64, deviceId_.c_str(),
665 ackPacket.GetSourceTimeEnd(), ackPacket.GetTargetTimeEnd(), ackPacket.GetSourceTimeBegin(),
666 ackPacket.GetTargetTimeBegin(), requestOffset, responseOffset);
667 return ackPacket;
668 }
669
IsRemoteLowVersion(uint32_t checkVersion)670 bool TimeSync::IsRemoteLowVersion(uint32_t checkVersion)
671 {
672 uint16_t version = 0;
673 int errCode = communicateHandle_->GetRemoteCommunicatorVersion(deviceId_, version);
674 return errCode == -E_NOT_FOUND || (version < checkVersion - SOFTWARE_VERSION_EARLIEST);
675 }
676
ReTimeSyncIfNeed(const TimeSyncPacket & ackPacket)677 void TimeSync::ReTimeSyncIfNeed(const TimeSyncPacket &ackPacket)
678 {
679 TimeOffset timeOffsetIgnoreRtt =
680 static_cast<TimeOffset>(ackPacket.GetSourceTimeBegin() - ackPacket.GetTargetTimeBegin());
681 bool reTimeSync = false;
682 if (IsRemoteLowVersion(SOFTWARE_VERSION_RELEASE_9_0)) {
683 reTimeSync = CheckReTimeSyncIfNeedWithLowVersion(timeOffsetIgnoreRtt);
684 } else {
685 reTimeSync = CheckReTimeSyncIfNeedWithHighVersion(timeOffsetIgnoreRtt, ackPacket);
686 }
687
688 if ((std::abs(timeOffsetIgnoreRtt) >= INT64_MAX / 2) || reTimeSync) { // 2 is half of INT64_MAX
689 LOGI("[TimeSync][RequestRecv] timeOffSet invalid, should do time sync");
690 SetTimeSyncFinishInner(false);
691 RuntimeContext::GetInstance()->ClearDeviceTimeInfo(deviceId_);
692 }
693
694 // reset time change by time sync
695 int errCode = metadata_->SetTimeChangeMark(deviceId_, false);
696 if (errCode != E_OK) {
697 LOGW("[TimeSync] Mark dev %.3s no time change failed %d", deviceId_.c_str(), errCode);
698 }
699 }
700
CheckReTimeSyncIfNeedWithLowVersion(TimeOffset timeOffsetIgnoreRtt)701 bool TimeSync::CheckReTimeSyncIfNeedWithLowVersion(TimeOffset timeOffsetIgnoreRtt)
702 {
703 TimeOffset metadataTimeOffset;
704 metadata_->GetTimeOffset(deviceId_, metadataTimeOffset);
705 return (std::abs(metadataTimeOffset) >= INT64_MAX / 2) || // 2 is half of INT64_MAX
706 (std::abs(metadataTimeOffset - timeOffsetIgnoreRtt) > MAX_TIME_OFFSET_NOISE);
707 }
708
CheckReTimeSyncIfNeedWithHighVersion(TimeOffset timeOffsetIgnoreRtt,const TimeSyncPacket & ackPacket)709 bool TimeSync::CheckReTimeSyncIfNeedWithHighVersion(TimeOffset timeOffsetIgnoreRtt, const TimeSyncPacket &ackPacket)
710 {
711 TimeOffset rawTimeOffset = timeOffsetIgnoreRtt - ackPacket.GetRequestLocalOffset() +
712 ackPacket.GetResponseLocalOffset();
713 auto [errCode, info] = RuntimeContext::GetInstance()->GetDeviceTimeInfo(deviceId_);
714 return errCode == -E_NOT_FOUND || (std::abs(info.systemTimeOffset - rawTimeOffset) > MAX_TIME_OFFSET_NOISE);
715 }
716
SaveOffsetWithAck(const TimeSyncPacket & ackPacket)717 int TimeSync::SaveOffsetWithAck(const TimeSyncPacket &ackPacket)
718 {
719 // calculate timeoffset of two devices
720 auto [offset, rtt] = CalculateTimeOffset(ackPacket);
721 TimeOffset rawOffset = CalculateRawTimeOffset(ackPacket, offset);
722 LOGD("TimeSync::AckRecv, dev = %s{private}, sEnd = %" PRIu64 ", tEnd = %" PRIu64 ", sBegin = %" PRIu64
723 ", tBegin = %" PRIu64 ", offset = %" PRId64 ", rawOffset = %" PRId64 ", requestLocalOffset = %" PRId64
724 ", responseLocalOffset = %" PRId64,
725 deviceId_.c_str(),
726 ackPacket.GetSourceTimeEnd(),
727 ackPacket.GetTargetTimeEnd(),
728 ackPacket.GetSourceTimeBegin(),
729 ackPacket.GetTargetTimeBegin(),
730 offset,
731 rawOffset,
732 ackPacket.GetRequestLocalOffset(),
733 ackPacket.GetResponseLocalOffset());
734
735 // save timeoffset into metadata, maybe a block action
736 int errCode = SaveTimeOffset(deviceId_, offset);
737 if (errCode != E_OK) {
738 return errCode;
739 }
740 errCode = metadata_->SetSystemTimeOffset(deviceId_, rawOffset);
741 if (errCode != E_OK) {
742 return errCode;
743 }
744 DeviceTimeInfo info;
745 info.systemTimeOffset = rawOffset;
746 info.recordTime = timeHelper_->GetSysCurrentTime();
747 info.rtt = rtt;
748 RuntimeContext::GetInstance()->SetDeviceTimeInfo(deviceId_, info);
749 SetTimeSyncFinishInner(true);
750 // save finish next time after save failed
751 return E_OK;
752 }
753
CalculateRawTimeOffset(const TimeSyncPacket & timeSyncInfo,TimeOffset deltaTime)754 TimeOffset TimeSync::CalculateRawTimeOffset(const TimeSyncPacket &timeSyncInfo, TimeOffset deltaTime)
755 {
756 // deltaTime = (t1' + response - t1 - request + t2' + response - t2 - request)/2
757 // rawTimeOffset = request - response + (t1' - t1 + t2' - t2)/2
758 // rawTimeOffset = deltaTime + requestLocalOffset - responseLocalOffset
759 return deltaTime + timeSyncInfo.GetRequestLocalOffset() - timeSyncInfo.GetResponseLocalOffset();
760 }
761
CheckSkipTimeSync(const DeviceTimeInfo & info)762 bool TimeSync::CheckSkipTimeSync(const DeviceTimeInfo &info)
763 {
764 uint64_t currentRawTime = timeHelper_->GetSysCurrentTime();
765 if (currentRawTime < info.recordTime) {
766 LOGW("[TimeSync] current time %" PRIu64 " less than record time %" PRIu64, currentRawTime, info.recordTime);
767 return false;
768 }
769 uint64_t interval = timeHelper_->GetSysCurrentTime() - info.recordTime;
770 if (info.rtt < MAX_TIME_RTT_NOISE) {
771 return true;
772 }
773 if (interval > RTT_NOISE_CHECK_INTERVAL) {
774 LOGI("[TimeSync] rtt %" PRId64 " is greater than noise should re time sync, interval is %" PRIu64, info.rtt,
775 interval);
776 return false;
777 }
778 #ifdef DE_DEBUG_ENV
779 #ifdef TEST_RTT_NOISE_CHECK_INTERVAL
780 if (interval > TEST_RTT_NOISE_CHECK_INTERVAL) {
781 LOGI("[TimeSync][TEST] rtt %" PRId64 " is greater than noise should re time sync, interval is %" PRIu64,
782 info.rtt, interval);
783 return false;
784 }
785 #endif
786 #endif
787 return true;
788 }
789
SetTimeSyncFinishIfNeed()790 void TimeSync::SetTimeSyncFinishIfNeed()
791 {
792 auto [errCode, info] = RuntimeContext::GetInstance()->GetDeviceTimeInfo(deviceId_);
793 if (errCode != E_OK) {
794 return;
795 }
796 int64_t systemTimeOffset = metadata_->GetSystemTimeOffset(deviceId_);
797 LOGD("[TimeSync] Check db offset %" PRId64 " cache offset %" PRId64, systemTimeOffset, info.systemTimeOffset);
798 if (!CheckSkipTimeSync(info) || (IsNeedSync() &&
799 std::abs(systemTimeOffset - info.systemTimeOffset) >= MAX_TIME_OFFSET_NOISE)) {
800 SetTimeSyncFinishInner(false);
801 return;
802 }
803 if (IsNeedSync()) {
804 SetTimeSyncFinishInner(true);
805 }
806 if (systemTimeOffset != info.systemTimeOffset) {
807 errCode = metadata_->SetSystemTimeOffset(deviceId_, info.systemTimeOffset);
808 if (errCode != E_OK) {
809 return;
810 }
811 }
812 LOGI("[TimeSync] Mark time sync finish success");
813 }
814
ClearTimeSyncFinish()815 void TimeSync::ClearTimeSyncFinish()
816 {
817 RuntimeContext::GetInstance()->ClearDeviceTimeInfo(deviceId_);
818 SetTimeSyncFinishInner(false);
819 }
820
GenerateTimeOffsetIfNeed(TimeOffset systemOffset,TimeOffset senderLocalOffset)821 int TimeSync::GenerateTimeOffsetIfNeed(TimeOffset systemOffset, TimeOffset senderLocalOffset)
822 {
823 if (IsRemoteLowVersion(SOFTWARE_VERSION_RELEASE_9_0)) {
824 return E_OK;
825 }
826 auto [errCode, info] = RuntimeContext::GetInstance()->GetDeviceTimeInfo(deviceId_);
827 bool syncFinish = !IsNeedSync();
828 bool timeChange = metadata_->IsTimeChange(deviceId_);
829 // avoid local time change but remote record time sync finish
830 // should return re time sync, after receive time sync request, reset time change mark
831 // we think offset is ok when local time sync to remote
832 if ((timeChange && !syncFinish) ||
833 (errCode == E_OK && (std::abs(info.systemTimeOffset + systemOffset) > MAX_TIME_OFFSET_NOISE))) {
834 LOGI("[TimeSync] time offset is invalid should do time sync again! packet %" PRId64 " cache %" PRId64
835 " time change %d sync finish %d", -systemOffset, info.systemTimeOffset, static_cast<int>(timeChange),
836 static_cast<int>(syncFinish));
837 ClearTimeSyncFinish();
838 RuntimeContext::GetInstance()->ClearDeviceTimeInfo(deviceId_);
839 return -E_NEED_TIME_SYNC;
840 }
841 // Sender's systemOffset = Sender's deltaTime + requestLocalOffset - responseLocalOffset
842 // Sender's deltaTime = Sender's systemOffset - requestLocalOffset + responseLocalOffset
843 // Receiver's deltaTime = -Sender's deltaTime = -Sender's systemOffset + requestLocalOffset - responseLocalOffset
844 TimeOffset offset = -systemOffset + senderLocalOffset - metadata_->GetLocalTimeOffset();
845 errCode = metadata_->SetSystemTimeOffset(deviceId_, -systemOffset);
846 if (errCode != E_OK) {
847 return errCode;
848 }
849 errCode = metadata_->SaveTimeOffset(deviceId_, offset);
850 if (errCode != E_OK) {
851 return errCode;
852 }
853 SetTimeSyncFinishInner(true);
854 info.systemTimeOffset = -systemOffset;
855 info.recordTime = timeHelper_->GetSysCurrentTime();
856 RuntimeContext::GetInstance()->SetDeviceTimeInfo(deviceId_, info);
857 return E_OK;
858 }
859
SetTimeSyncFinishInner(bool finish)860 void TimeSync::SetTimeSyncFinishInner(bool finish)
861 {
862 isSynced_ = finish;
863 if (IsRemoteLowVersion(SOFTWARE_VERSION_RELEASE_9_0)) {
864 return;
865 }
866 int errCode = metadata_->SetTimeSyncFinishMark(deviceId_, finish);
867 if (errCode != E_OK) {
868 LOGW("[TimeSync] Set %.3s time sync finish %d mark failed %d", deviceId_.c_str(), static_cast<int>(finish),
869 errCode);
870 }
871 }
872 } // namespace DistributedDB