1 /* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #include "time_sync.h" 17 18 #include "parcel.h" 19 #include "log_print.h" 20 #include "sync_types.h" 21 #include "message_transform.h" 22 #include "version.h" 23 #include "isync_task_context.h" 24 25 namespace DistributedDB { 26 std::mutex TimeSync::timeSyncSetLock_; 27 std::set<TimeSync *> TimeSync::timeSyncSet_; 28 namespace { 29 constexpr uint64_t TIME_SYNC_INTERVAL = 24 * 60 * 60 * 1000; // 24h 30 constexpr int TRIP_DIV_HALF = 2; 31 constexpr int64_t MAX_TIME_OFFSET_NOISE = 1 * 1000 * 10000; // 1 second in 100-nanosecond units 32 constexpr int64_t MAX_TIME_RTT_NOISE = 1 * 1000 * 10000; // 1 second in 100-nanosecond units 33 constexpr uint64_t RTT_NOISE_CHECK_INTERVAL = 30 * 60 * 1000 * 10000u; // 30 minute in 100-nanosecond units 34 } 35 36 // Class TimeSyncPacket TimeSyncPacket()37 TimeSyncPacket::TimeSyncPacket() 38 : sourceTimeBegin_(0), 39 sourceTimeEnd_(0), 40 targetTimeBegin_(0), 41 targetTimeEnd_(0), 42 version_(TIME_SYNC_VERSION_V1), 43 requestLocalOffset_(0), 44 responseLocalOffset_(0) 45 { 46 } 47 ~TimeSyncPacket()48 TimeSyncPacket::~TimeSyncPacket() 49 { 50 } 51 SetSourceTimeBegin(Timestamp sourceTimeBegin)52 void TimeSyncPacket::SetSourceTimeBegin(Timestamp sourceTimeBegin) 53 { 54 sourceTimeBegin_ = sourceTimeBegin; 55 } 56 GetSourceTimeBegin() const57 Timestamp TimeSyncPacket::GetSourceTimeBegin() const 58 { 59 return sourceTimeBegin_; 60 } 61 SetSourceTimeEnd(Timestamp sourceTimeEnd)62 void TimeSyncPacket::SetSourceTimeEnd(Timestamp sourceTimeEnd) 63 { 64 sourceTimeEnd_ = sourceTimeEnd; 65 } 66 GetSourceTimeEnd() const67 Timestamp TimeSyncPacket::GetSourceTimeEnd() const 68 { 69 return sourceTimeEnd_; 70 } 71 SetTargetTimeBegin(Timestamp targetTimeBegin)72 void TimeSyncPacket::SetTargetTimeBegin(Timestamp targetTimeBegin) 73 { 74 targetTimeBegin_ = targetTimeBegin; 75 } 76 GetTargetTimeBegin() const77 Timestamp TimeSyncPacket::GetTargetTimeBegin() const 78 { 79 return targetTimeBegin_; 80 } 81 SetTargetTimeEnd(Timestamp targetTimeEnd)82 void TimeSyncPacket::SetTargetTimeEnd(Timestamp targetTimeEnd) 83 { 84 targetTimeEnd_ = targetTimeEnd; 85 } 86 GetTargetTimeEnd() const87 Timestamp TimeSyncPacket::GetTargetTimeEnd() const 88 { 89 return targetTimeEnd_; 90 } 91 SetVersion(uint32_t version)92 void TimeSyncPacket::SetVersion(uint32_t version) 93 { 94 version_ = version; 95 } 96 GetVersion() const97 uint32_t TimeSyncPacket::GetVersion() const 98 { 99 return version_; 100 } 101 GetRequestLocalOffset() const102 TimeOffset TimeSyncPacket::GetRequestLocalOffset() const 103 { 104 return requestLocalOffset_; 105 } 106 SetRequestLocalOffset(TimeOffset offset)107 void TimeSyncPacket::SetRequestLocalOffset(TimeOffset offset) 108 { 109 requestLocalOffset_ = offset; 110 } 111 GetResponseLocalOffset() const112 TimeOffset TimeSyncPacket::GetResponseLocalOffset() const 113 { 114 return responseLocalOffset_; 115 } 116 SetResponseLocalOffset(TimeOffset offset)117 void TimeSyncPacket::SetResponseLocalOffset(TimeOffset offset) 118 { 119 responseLocalOffset_ = offset; 120 } 121 CalculateLen()122 uint32_t TimeSyncPacket::CalculateLen() 123 { 124 uint32_t len = Parcel::GetUInt32Len(); // version_ 125 len += Parcel::GetUInt64Len(); // sourceTimeBegin_ 126 len += Parcel::GetUInt64Len(); // sourceTimeEnd_ 127 len += Parcel::GetUInt64Len(); // targetTimeBegin_ 128 len += Parcel::GetUInt64Len(); // targetTimeEnd_ 129 len = Parcel::GetEightByteAlign(len); 130 len += Parcel::GetInt64Len(); // requestLocalOffset_ 131 len += Parcel::GetInt64Len(); // responseLocalOffset_ 132 return len; 133 } 134 135 // Class TimeSync TimeSync()136 TimeSync::TimeSync() 137 : communicateHandle_(nullptr), 138 metadata_(nullptr), 139 timeHelper_(nullptr), 140 retryTime_(0), 141 driverTimerId_(0), 142 isSynced_(false), 143 isAckReceived_(false), 144 timeChangedListener_(nullptr), 145 timeDriverLockCount_(0), 146 isOnline_(true), 147 closed_(false) 148 { 149 } 150 ~TimeSync()151 TimeSync::~TimeSync() 152 { 153 Finalize(); 154 driverTimerId_ = 0; 155 156 if (timeChangedListener_ != nullptr) { 157 timeChangedListener_->Drop(true); 158 timeChangedListener_ = nullptr; 159 } 160 timeHelper_ = nullptr; 161 communicateHandle_ = nullptr; 162 metadata_ = nullptr; 163 164 std::lock_guard<std::mutex> lock(timeSyncSetLock_); 165 timeSyncSet_.erase(this); 166 } 167 RegisterTransformFunc()168 int TimeSync::RegisterTransformFunc() 169 { 170 TransformFunc func; 171 func.computeFunc = [](const Message *inMsg) { return CalculateLen(inMsg); }; 172 func.serializeFunc = [](uint8_t *buffer, uint32_t length, const Message *inMsg) { 173 return Serialization(buffer, length, inMsg); 174 }; 175 func.deserializeFunc = [](const uint8_t *buffer, uint32_t length, Message *inMsg) { 176 return DeSerialization(buffer, length, inMsg); 177 }; 178 return MessageTransform::RegTransformFunction(TIME_SYNC_MESSAGE, func); 179 } 180 Initialize(ICommunicator * communicator,const std::shared_ptr<Metadata> & metadata,const ISyncInterface * storage,const DeviceID & deviceId)181 int TimeSync::Initialize(ICommunicator *communicator, const std::shared_ptr<Metadata> &metadata, 182 const ISyncInterface *storage, const DeviceID &deviceId) 183 { 184 if ((communicator == nullptr) || (storage == nullptr) || (metadata == nullptr)) { 185 return -E_INVALID_ARGS; 186 } 187 { 188 std::lock_guard<std::mutex> lock(timeSyncSetLock_); 189 timeSyncSet_.insert(this); 190 } 191 communicateHandle_ = communicator; 192 metadata_ = metadata; 193 deviceId_ = deviceId; 194 timeHelper_ = std::make_unique<TimeHelper>(); 195 196 int errCode = timeHelper_->Initialize(storage, metadata_); 197 if (errCode != E_OK) { 198 timeHelper_ = nullptr; 199 LOGE("[TimeSync] timeHelper Init failed, err %d.", errCode); 200 return errCode; 201 } 202 dbId_ = storage->GetIdentifier(); 203 driverCallback_ = [this](TimerId timerId) { return TimeSyncDriver(timerId); }; 204 errCode = RuntimeContext::GetInstance()->SetTimer(TIME_SYNC_INTERVAL, driverCallback_, nullptr, driverTimerId_); 205 if (errCode != E_OK) { 206 return errCode; 207 } 208 isSynced_ = metadata_->IsTimeSyncFinish(deviceId_); 209 return errCode; 210 } 211 Finalize()212 void TimeSync::Finalize() 213 { 214 // Stop the timer 215 LOGD("[TimeSync] Finalize enter!"); 216 RuntimeContext *runtimeContext = RuntimeContext::GetInstance(); 217 TimerId timerId; 218 { 219 std::unique_lock<std::mutex> lock(timeDriverLock_); 220 timerId = driverTimerId_; 221 } 222 runtimeContext->RemoveTimer(timerId, true); 223 std::unique_lock<std::mutex> lock(timeDriverLock_); 224 timeDriverCond_.wait(lock, [this]() { return this->timeDriverLockCount_ == 0; }); 225 LOGD("[TimeSync] Finalized!"); 226 } 227 SyncStart(const CommErrHandler & handler,uint32_t sessionId)228 int TimeSync::SyncStart(const CommErrHandler &handler, uint32_t sessionId) 229 { 230 isOnline_ = true; 231 TimeSyncPacket packet; 232 Timestamp startTime = timeHelper_->GetTime(); 233 packet.SetSourceTimeBegin(startTime); 234 TimeOffset timeOffset = metadata_->GetLocalTimeOffset(); 235 packet.SetRequestLocalOffset(timeOffset); 236 // send timeSync request 237 LOGD("[TimeSync] startTime = %" PRIu64 ", offset = % " PRId64 " , dev = %s{private}", startTime, timeOffset, 238 deviceId_.c_str()); 239 240 Message *message = new (std::nothrow) Message(TIME_SYNC_MESSAGE); 241 if (message == nullptr) { 242 return -E_OUT_OF_MEMORY; 243 } 244 message->SetSessionId(sessionId); 245 message->SetMessageType(TYPE_REQUEST); 246 message->SetPriority(Priority::NORMAL); 247 int errCode = message->SetCopiedObject<>(packet); 248 if (errCode != E_OK) { 249 delete message; 250 message = nullptr; 251 return errCode; 252 } 253 errCode = SendMessageWithSendEnd(message, handler); 254 if (errCode != E_OK) { 255 delete message; 256 message = nullptr; 257 } 258 return errCode; 259 } 260 CalculateLen(const Message * inMsg)261 uint32_t TimeSync::CalculateLen(const Message *inMsg) 262 { 263 if (!(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) { 264 return 0; 265 } 266 267 const TimeSyncPacket *packet = const_cast<TimeSyncPacket *>(inMsg->GetObject<TimeSyncPacket>()); 268 if (packet == nullptr) { 269 return 0; 270 } 271 272 return TimeSyncPacket::CalculateLen(); 273 } 274 Serialization(uint8_t * buffer,uint32_t length,const Message * inMsg)275 int TimeSync::Serialization(uint8_t *buffer, uint32_t length, const Message *inMsg) 276 { 277 if ((buffer == nullptr) || !(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) { 278 return -E_INVALID_ARGS; 279 } 280 const TimeSyncPacket *packet = inMsg->GetObject<TimeSyncPacket>(); 281 if ((packet == nullptr) || (length != TimeSyncPacket::CalculateLen())) { 282 return -E_INVALID_ARGS; 283 } 284 285 Parcel parcel(buffer, length); 286 Timestamp srcBegin = packet->GetSourceTimeBegin(); 287 Timestamp srcEnd = packet->GetSourceTimeEnd(); 288 Timestamp targetBegin = packet->GetTargetTimeBegin(); 289 Timestamp targetEnd = packet->GetTargetTimeEnd(); 290 291 int errCode = parcel.WriteUInt32(TIME_SYNC_VERSION_V1); 292 if (errCode != E_OK) { 293 return -E_SECUREC_ERROR; 294 } 295 errCode = parcel.WriteUInt64(srcBegin); 296 if (errCode != E_OK) { 297 return -E_SECUREC_ERROR; 298 } 299 errCode = parcel.WriteUInt64(srcEnd); 300 if (errCode != E_OK) { 301 return -E_SECUREC_ERROR; 302 } 303 errCode = parcel.WriteUInt64(targetBegin); 304 if (errCode != E_OK) { 305 return -E_SECUREC_ERROR; 306 } 307 errCode = parcel.WriteUInt64(targetEnd); 308 if (errCode != E_OK) { 309 return -E_SECUREC_ERROR; 310 } 311 parcel.EightByteAlign(); 312 parcel.WriteInt64(packet->GetRequestLocalOffset()); 313 parcel.WriteInt64(packet->GetResponseLocalOffset()); 314 if (parcel.IsError()) { 315 return -E_PARSE_FAIL; 316 } 317 return errCode; 318 } 319 DeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)320 int TimeSync::DeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg) 321 { 322 if ((buffer == nullptr) || !(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) { 323 return -E_INVALID_ARGS; 324 } 325 TimeSyncPacket packet; 326 Parcel parcel(const_cast<uint8_t *>(buffer), length); 327 Timestamp srcBegin; 328 Timestamp srcEnd; 329 Timestamp targetBegin; 330 Timestamp targetEnd; 331 332 uint32_t version = 0; 333 parcel.ReadUInt32(version); 334 if (parcel.IsError()) { 335 return -E_INVALID_ARGS; 336 } 337 if (version > TIME_SYNC_VERSION_V1) { 338 packet.SetVersion(version); 339 return inMsg->SetCopiedObject<>(packet); 340 } 341 parcel.ReadUInt64(srcBegin); 342 parcel.ReadUInt64(srcEnd); 343 parcel.ReadUInt64(targetBegin); 344 parcel.ReadUInt64(targetEnd); 345 if (parcel.IsError()) { 346 return -E_INVALID_ARGS; 347 } 348 packet.SetSourceTimeBegin(srcBegin); 349 packet.SetSourceTimeEnd(srcEnd); 350 packet.SetTargetTimeBegin(targetBegin); 351 packet.SetTargetTimeEnd(targetEnd); 352 parcel.EightByteAlign(); 353 if (parcel.IsContinueRead()) { 354 TimeOffset requestLocalOffset; 355 TimeOffset responseLocalOffset; 356 parcel.ReadInt64(requestLocalOffset); 357 parcel.ReadInt64(responseLocalOffset); 358 if (parcel.IsError()) { 359 LOGE("[TimeSync] Parse packet failed, message type %u", inMsg->GetMessageType()); 360 return -E_PARSE_FAIL; 361 } 362 packet.SetRequestLocalOffset(requestLocalOffset); 363 packet.SetResponseLocalOffset(responseLocalOffset); 364 } 365 366 return inMsg->SetCopiedObject<>(packet); 367 } 368 AckRecv(const Message * message,uint32_t targetSessionId)369 int TimeSync::AckRecv(const Message *message, uint32_t targetSessionId) 370 { 371 // only check when sessionId is not 0, because old version timesync sessionId is 0. 372 if (message != nullptr && message->GetSessionId() != 0 && 373 message->GetErrorNo() == E_FEEDBACK_COMMUNICATOR_NOT_FOUND && message->GetSessionId() == targetSessionId) { 374 LOGE("[AbilitySync][AckMsgCheck] Remote db is closed"); 375 return -E_FEEDBACK_COMMUNICATOR_NOT_FOUND; 376 } 377 if (!IsPacketValid(message, TYPE_RESPONSE)) { 378 return -E_INVALID_ARGS; 379 } 380 const TimeSyncPacket *packet = message->GetObject<TimeSyncPacket>(); 381 if (packet == nullptr) { 382 LOGE("[TimeSync] AckRecv packet is null"); 383 return -E_INVALID_ARGS; 384 } 385 386 TimeSyncPacket packetData = TimeSyncPacket(*packet); 387 Timestamp sourceTimeEnd = timeHelper_->GetTime(); 388 packetData.SetSourceTimeBegin(GetSourceBeginTime(packetData.GetSourceTimeBegin(), targetSessionId)); 389 packetData.SetSourceTimeEnd(sourceTimeEnd); 390 if (packetData.GetSourceTimeBegin() > packetData.GetSourceTimeEnd() || 391 packetData.GetTargetTimeBegin() > packetData.GetTargetTimeEnd() || 392 packetData.GetSourceTimeEnd() > TimeHelper::MAX_VALID_TIME || 393 packetData.GetTargetTimeEnd() > TimeHelper::MAX_VALID_TIME) { 394 LOGD("[TimeSync][AckRecv] Time valid check failed."); 395 return -E_INVALID_TIME; 396 } 397 int errCode = SaveOffsetWithAck(packetData); 398 { 399 std::lock_guard<std::mutex> lock(cvLock_); 400 isAckReceived_ = true; 401 } 402 conditionVar_.notify_all(); 403 ResetTimer(); 404 return errCode; 405 } 406 RequestRecv(const Message * message)407 int TimeSync::RequestRecv(const Message *message) 408 { 409 if (!IsPacketValid(message, TYPE_REQUEST)) { 410 return -E_INVALID_ARGS; 411 } 412 413 const TimeSyncPacket *packet = message->GetObject<TimeSyncPacket>(); 414 if (packet == nullptr) { 415 return -E_INVALID_ARGS; 416 } 417 418 // build timeSync ack packet 419 TimeSyncPacket ackPacket = BuildAckPacket(*packet); 420 if (ackPacket.GetSourceTimeBegin() > TimeHelper::MAX_VALID_TIME) { 421 LOGD("[TimeSync][RequestRecv] Time valid check failed."); 422 return -E_INVALID_TIME; 423 } 424 ReTimeSyncIfNeed(ackPacket); 425 426 Message *ackMessage = new (std::nothrow) Message(TIME_SYNC_MESSAGE); 427 if (ackMessage == nullptr) { 428 return -E_OUT_OF_MEMORY; 429 } 430 ackMessage->SetSessionId(message->GetSessionId()); 431 ackMessage->SetPriority(Priority::NORMAL); 432 ackMessage->SetMessageType(TYPE_RESPONSE); 433 ackMessage->SetTarget(deviceId_); 434 int errCode = ackMessage->SetCopiedObject<>(ackPacket); 435 if (errCode != E_OK) { 436 delete ackMessage; 437 ackMessage = nullptr; 438 return errCode; 439 } 440 441 errCode = SendPacket(deviceId_, ackMessage); 442 if (errCode != E_OK) { 443 delete ackMessage; 444 ackMessage = nullptr; 445 } 446 return errCode; 447 } 448 SaveTimeOffset(const DeviceID & deviceID,TimeOffset timeOffset)449 int TimeSync::SaveTimeOffset(const DeviceID &deviceID, TimeOffset timeOffset) 450 { 451 return metadata_->SaveTimeOffset(deviceID, timeOffset); 452 } 453 CalculateTimeOffset(const TimeSyncPacket & timeSyncInfo)454 std::pair<TimeOffset, TimeOffset> TimeSync::CalculateTimeOffset(const TimeSyncPacket &timeSyncInfo) 455 { 456 TimeOffset roundTrip = static_cast<TimeOffset>((timeSyncInfo.GetSourceTimeEnd() - 457 timeSyncInfo.GetSourceTimeBegin()) - (timeSyncInfo.GetTargetTimeEnd() - timeSyncInfo.GetTargetTimeBegin())); 458 TimeOffset offset1 = static_cast<TimeOffset>(timeSyncInfo.GetTargetTimeBegin() - 459 timeSyncInfo.GetSourceTimeBegin() - (roundTrip / TRIP_DIV_HALF)); 460 TimeOffset offset2 = static_cast<TimeOffset>(timeSyncInfo.GetTargetTimeEnd() + (roundTrip / TRIP_DIV_HALF) - 461 timeSyncInfo.GetSourceTimeEnd()); 462 TimeOffset offset = (offset1 / TRIP_DIV_HALF) + (offset2 / TRIP_DIV_HALF); 463 LOGD("TimeSync::CalculateTimeOffset roundTrip= %" PRId64 ", offset1 = %" PRId64 ", offset2 = %" PRId64 464 ", offset = %" PRId64, roundTrip, offset1, offset2, offset); 465 return {offset, roundTrip}; 466 } 467 IsPacketValid(const Message * inMsg,uint16_t messageType)468 bool TimeSync::IsPacketValid(const Message *inMsg, uint16_t messageType) 469 { 470 if (inMsg == nullptr) { 471 return false; 472 } 473 if (inMsg->GetMessageId() != TIME_SYNC_MESSAGE) { 474 LOGD("message Id = %d", inMsg->GetMessageId()); 475 return false; 476 } 477 if (messageType != inMsg->GetMessageType()) { 478 LOGD("input Type = %" PRIu16 ", inMsg type = %" PRIu16, messageType, inMsg->GetMessageType()); 479 return false; 480 } 481 return true; 482 } 483 SendPacket(const DeviceID & deviceId,const Message * message,const CommErrHandler & handler)484 int TimeSync::SendPacket(const DeviceID &deviceId, const Message *message, const CommErrHandler &handler) 485 { 486 SendConfig conf; 487 timeHelper_->SetSendConfig(deviceId, false, SEND_TIME_OUT, conf); 488 int errCode = communicateHandle_->SendMessage(deviceId, message, conf, handler); 489 if (errCode != E_OK) { 490 LOGE("[TimeSync] SendPacket failed, err %d", errCode); 491 } 492 return errCode; 493 } 494 TimeSyncDriver(TimerId timerId)495 int TimeSync::TimeSyncDriver(TimerId timerId) 496 { 497 if (timerId != driverTimerId_) { 498 return -E_INTERNAL_ERROR; 499 } 500 if (!isOnline_) { 501 return E_OK; 502 } 503 std::lock_guard<std::mutex> lock(timeDriverLock_); 504 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this]() { 505 CommErrHandler handler = [this](int ret, bool isDirectEnd) { CommErrHandlerFunc(ret, this); }; 506 (void)this->SyncStart(handler); 507 std::lock_guard<std::mutex> innerLock(this->timeDriverLock_); 508 this->timeDriverLockCount_--; 509 this->timeDriverCond_.notify_all(); 510 }); 511 if (errCode != E_OK) { 512 LOGE("[TimeSync][TimerSyncDriver] ScheduleTask failed err %d", errCode); 513 return errCode; 514 } 515 timeDriverLockCount_++; 516 return E_OK; 517 } 518 GetTimeOffset(TimeOffset & outOffset,uint32_t timeout,uint32_t sessionId)519 int TimeSync::GetTimeOffset(TimeOffset &outOffset, uint32_t timeout, uint32_t sessionId) 520 { 521 if (!isSynced_) { 522 { 523 std::lock_guard<std::mutex> lock(cvLock_); 524 isAckReceived_ = false; 525 } 526 CommErrHandler handler = [this](int ret, bool isDirectEnd) { CommErrHandlerFunc(ret, this); }; 527 int errCode = SyncStart(handler, sessionId); 528 LOGD("TimeSync::GetTimeOffset start, current time = %" PRIu64 ", errCode = %d, timeout = %" PRIu32 " ms", 529 TimeHelper::GetSysCurrentTime(), errCode, timeout); 530 std::unique_lock<std::mutex> lock(cvLock_); 531 if (errCode != E_OK || !conditionVar_.wait_for(lock, std::chrono::milliseconds(timeout), [this]() { 532 return this->isAckReceived_ || this->closed_; 533 })) { 534 LOGD("TimeSync::GetTimeOffset, retryTime_ = %d", retryTime_); 535 retryTime_++; 536 if (retryTime_ < MAX_RETRY_TIME) { 537 lock.unlock(); 538 LOGI("TimeSync::GetTimeOffset timeout, try again"); 539 return GetTimeOffset(outOffset, timeout); 540 } 541 retryTime_ = 0; 542 return -E_TIMEOUT; 543 } 544 } 545 if (IsClosed()) { 546 return -E_BUSY; 547 } 548 retryTime_ = 0; 549 metadata_->GetTimeOffset(deviceId_, outOffset); 550 return E_OK; 551 } 552 IsNeedSync() const553 bool TimeSync::IsNeedSync() const 554 { 555 return !isSynced_; 556 } 557 SetOnline(bool isOnline)558 void TimeSync::SetOnline(bool isOnline) 559 { 560 isOnline_ = isOnline; 561 } 562 CommErrHandlerFunc(int errCode,TimeSync * timeSync)563 void TimeSync::CommErrHandlerFunc(int errCode, TimeSync *timeSync) 564 { 565 LOGD("[TimeSync][CommErrHandle] errCode:%d", errCode); 566 std::lock_guard<std::mutex> lock(timeSyncSetLock_); 567 if (timeSyncSet_.count(timeSync) == 0) { 568 LOGI("[TimeSync][CommErrHandle] timeSync has been killed"); 569 return; 570 } 571 if (timeSync == nullptr) { 572 LOGI("[TimeSync][CommErrHandle] timeSync is nullptr"); 573 return; 574 } 575 if (errCode != E_OK) { 576 timeSync->SetOnline(false); 577 } else { 578 timeSync->SetOnline(true); 579 } 580 } 581 ResetTimer()582 void TimeSync::ResetTimer() 583 { 584 TimerId timerId; 585 { 586 std::lock_guard<std::mutex> lock(timeDriverLock_); 587 timerId = driverTimerId_; 588 driverTimerId_ = 0u; 589 } 590 if (timerId == 0u) { 591 return; 592 } 593 RuntimeContext::GetInstance()->RemoveTimer(timerId, true); 594 int errCode = RuntimeContext::GetInstance()->SetTimer( 595 TIME_SYNC_INTERVAL, driverCallback_, nullptr, timerId); 596 if (errCode != E_OK) { 597 LOGW("[TimeSync] Reset TimeSync timer failed err :%d", errCode); 598 } else { 599 std::lock_guard<std::mutex> lock(timeDriverLock_); 600 driverTimerId_ = timerId; 601 } 602 } 603 Close()604 void TimeSync::Close() 605 { 606 Finalize(); 607 { 608 std::lock_guard<std::mutex> lock(cvLock_); 609 closed_ = true; 610 } 611 conditionVar_.notify_all(); 612 } 613 IsClosed() const614 bool TimeSync::IsClosed() const 615 { 616 std::lock_guard<std::mutex> lock(cvLock_); 617 return closed_ ; 618 } 619 SendMessageWithSendEnd(const Message * message,const CommErrHandler & handler)620 int TimeSync::SendMessageWithSendEnd(const Message *message, const CommErrHandler &handler) 621 { 622 std::shared_ptr<TimeSync> timeSyncPtr = shared_from_this(); 623 auto sessionId = message->GetSessionId(); 624 return SendPacket(deviceId_, message, [handler, timeSyncPtr, sessionId, this](int errCode, bool isDirectEnd) { 625 if (closed_) { 626 LOGW("[TimeSync] DB closed, ignore send end! dev=%.3s", deviceId_.c_str()); 627 return; 628 } 629 { 630 std::lock_guard<std::mutex> autoLock(beginTimeMutex_); 631 sessionBeginTime_.clear(); 632 sessionBeginTime_[sessionId] = timeHelper_->GetTime(); 633 } 634 if (handler != nullptr) { 635 handler(errCode, isDirectEnd); 636 } 637 }); 638 } 639 GetSourceBeginTime(Timestamp packetBeginTime,uint32_t sessionId)640 Timestamp TimeSync::GetSourceBeginTime(Timestamp packetBeginTime, uint32_t sessionId) 641 { 642 std::lock_guard<std::mutex> autoLock(beginTimeMutex_); 643 if (sessionBeginTime_.find(sessionId) == sessionBeginTime_.end()) { 644 LOGW("[TimeSync] Current cache not exist packet send time"); 645 return packetBeginTime; 646 } 647 auto sendTime = sessionBeginTime_[sessionId]; 648 LOGD("[TimeSync] Use packet send time %" PRIu64 " rather than %" PRIu64, sendTime, packetBeginTime); 649 return sendTime; 650 } 651 BuildAckPacket(const TimeSyncPacket & request)652 TimeSyncPacket TimeSync::BuildAckPacket(const TimeSyncPacket &request) 653 { 654 TimeSyncPacket ackPacket = TimeSyncPacket(request); 655 Timestamp targetTimeBegin = timeHelper_->GetTime(); 656 ackPacket.SetTargetTimeBegin(targetTimeBegin); 657 Timestamp targetTimeEnd = timeHelper_->GetTime(); 658 ackPacket.SetTargetTimeEnd(targetTimeEnd); 659 TimeOffset requestOffset = request.GetRequestLocalOffset(); 660 TimeOffset responseOffset = metadata_->GetLocalTimeOffset(); 661 ackPacket.SetRequestLocalOffset(requestOffset); 662 ackPacket.SetResponseLocalOffset(responseOffset); 663 LOGD("TimeSync::RequestRecv, dev = %s{private}, sTimeEnd = %" PRIu64 ", tTimeEnd = %" PRIu64 ", sbegin = %" PRIu64 664 ", tbegin = %" PRIu64 ", request offset = %" PRId64 ", response offset = %" PRId64, deviceId_.c_str(), 665 ackPacket.GetSourceTimeEnd(), ackPacket.GetTargetTimeEnd(), ackPacket.GetSourceTimeBegin(), 666 ackPacket.GetTargetTimeBegin(), requestOffset, responseOffset); 667 return ackPacket; 668 } 669 IsRemoteLowVersion(uint32_t checkVersion)670 bool TimeSync::IsRemoteLowVersion(uint32_t checkVersion) 671 { 672 uint16_t version = 0; 673 int errCode = communicateHandle_->GetRemoteCommunicatorVersion(deviceId_, version); 674 return errCode == -E_NOT_FOUND || (version < checkVersion - SOFTWARE_VERSION_EARLIEST); 675 } 676 ReTimeSyncIfNeed(const TimeSyncPacket & ackPacket)677 void TimeSync::ReTimeSyncIfNeed(const TimeSyncPacket &ackPacket) 678 { 679 TimeOffset timeOffsetIgnoreRtt = 680 static_cast<TimeOffset>(ackPacket.GetSourceTimeBegin() - ackPacket.GetTargetTimeBegin()); 681 bool reTimeSync = false; 682 if (IsRemoteLowVersion(SOFTWARE_VERSION_RELEASE_9_0)) { 683 reTimeSync = CheckReTimeSyncIfNeedWithLowVersion(timeOffsetIgnoreRtt); 684 } else { 685 reTimeSync = CheckReTimeSyncIfNeedWithHighVersion(timeOffsetIgnoreRtt, ackPacket); 686 } 687 688 if ((std::abs(timeOffsetIgnoreRtt) >= INT64_MAX / 2) || reTimeSync) { // 2 is half of INT64_MAX 689 LOGI("[TimeSync][RequestRecv] timeOffSet invalid, should do time sync"); 690 SetTimeSyncFinishInner(false); 691 RuntimeContext::GetInstance()->ClearDeviceTimeInfo(deviceId_); 692 } 693 694 // reset time change by time sync 695 int errCode = metadata_->SetTimeChangeMark(deviceId_, false); 696 if (errCode != E_OK) { 697 LOGW("[TimeSync] Mark dev %.3s no time change failed %d", deviceId_.c_str(), errCode); 698 } 699 } 700 CheckReTimeSyncIfNeedWithLowVersion(TimeOffset timeOffsetIgnoreRtt)701 bool TimeSync::CheckReTimeSyncIfNeedWithLowVersion(TimeOffset timeOffsetIgnoreRtt) 702 { 703 TimeOffset metadataTimeOffset; 704 metadata_->GetTimeOffset(deviceId_, metadataTimeOffset); 705 return (std::abs(metadataTimeOffset) >= INT64_MAX / 2) || // 2 is half of INT64_MAX 706 (std::abs(metadataTimeOffset - timeOffsetIgnoreRtt) > MAX_TIME_OFFSET_NOISE); 707 } 708 CheckReTimeSyncIfNeedWithHighVersion(TimeOffset timeOffsetIgnoreRtt,const TimeSyncPacket & ackPacket)709 bool TimeSync::CheckReTimeSyncIfNeedWithHighVersion(TimeOffset timeOffsetIgnoreRtt, const TimeSyncPacket &ackPacket) 710 { 711 TimeOffset rawTimeOffset = timeOffsetIgnoreRtt - ackPacket.GetRequestLocalOffset() + 712 ackPacket.GetResponseLocalOffset(); 713 auto [errCode, info] = RuntimeContext::GetInstance()->GetDeviceTimeInfo(deviceId_); 714 return errCode == -E_NOT_FOUND || (std::abs(info.systemTimeOffset - rawTimeOffset) > MAX_TIME_OFFSET_NOISE); 715 } 716 SaveOffsetWithAck(const TimeSyncPacket & ackPacket)717 int TimeSync::SaveOffsetWithAck(const TimeSyncPacket &ackPacket) 718 { 719 // calculate timeoffset of two devices 720 auto [offset, rtt] = CalculateTimeOffset(ackPacket); 721 TimeOffset rawOffset = CalculateRawTimeOffset(ackPacket, offset); 722 LOGD("TimeSync::AckRecv, dev = %s{private}, sEnd = %" PRIu64 ", tEnd = %" PRIu64 ", sBegin = %" PRIu64 723 ", tBegin = %" PRIu64 ", offset = %" PRId64 ", rawOffset = %" PRId64 ", requestLocalOffset = %" PRId64 724 ", responseLocalOffset = %" PRId64, 725 deviceId_.c_str(), 726 ackPacket.GetSourceTimeEnd(), 727 ackPacket.GetTargetTimeEnd(), 728 ackPacket.GetSourceTimeBegin(), 729 ackPacket.GetTargetTimeBegin(), 730 offset, 731 rawOffset, 732 ackPacket.GetRequestLocalOffset(), 733 ackPacket.GetResponseLocalOffset()); 734 735 // save timeoffset into metadata, maybe a block action 736 int errCode = SaveTimeOffset(deviceId_, offset); 737 if (errCode != E_OK) { 738 return errCode; 739 } 740 errCode = metadata_->SetSystemTimeOffset(deviceId_, rawOffset); 741 if (errCode != E_OK) { 742 return errCode; 743 } 744 DeviceTimeInfo info; 745 info.systemTimeOffset = rawOffset; 746 info.recordTime = timeHelper_->GetSysCurrentTime(); 747 info.rtt = rtt; 748 RuntimeContext::GetInstance()->SetDeviceTimeInfo(deviceId_, info); 749 SetTimeSyncFinishInner(true); 750 // save finish next time after save failed 751 return E_OK; 752 } 753 CalculateRawTimeOffset(const TimeSyncPacket & timeSyncInfo,TimeOffset deltaTime)754 TimeOffset TimeSync::CalculateRawTimeOffset(const TimeSyncPacket &timeSyncInfo, TimeOffset deltaTime) 755 { 756 // deltaTime = (t1' + response - t1 - request + t2' + response - t2 - request)/2 757 // rawTimeOffset = request - response + (t1' - t1 + t2' - t2)/2 758 // rawTimeOffset = deltaTime + requestLocalOffset - responseLocalOffset 759 return deltaTime + timeSyncInfo.GetRequestLocalOffset() - timeSyncInfo.GetResponseLocalOffset(); 760 } 761 CheckSkipTimeSync(const DeviceTimeInfo & info)762 bool TimeSync::CheckSkipTimeSync(const DeviceTimeInfo &info) 763 { 764 uint64_t currentRawTime = timeHelper_->GetSysCurrentTime(); 765 if (currentRawTime < info.recordTime) { 766 LOGW("[TimeSync] current time %" PRIu64 " less than record time %" PRIu64, currentRawTime, info.recordTime); 767 return false; 768 } 769 uint64_t interval = timeHelper_->GetSysCurrentTime() - info.recordTime; 770 if (info.rtt < MAX_TIME_RTT_NOISE) { 771 return true; 772 } 773 if (interval > RTT_NOISE_CHECK_INTERVAL) { 774 LOGI("[TimeSync] rtt %" PRId64 " is greater than noise should re time sync, interval is %" PRIu64, info.rtt, 775 interval); 776 return false; 777 } 778 #ifdef DE_DEBUG_ENV 779 #ifdef TEST_RTT_NOISE_CHECK_INTERVAL 780 if (interval > TEST_RTT_NOISE_CHECK_INTERVAL) { 781 LOGI("[TimeSync][TEST] rtt %" PRId64 " is greater than noise should re time sync, interval is %" PRIu64, 782 info.rtt, interval); 783 return false; 784 } 785 #endif 786 #endif 787 return true; 788 } 789 SetTimeSyncFinishIfNeed()790 void TimeSync::SetTimeSyncFinishIfNeed() 791 { 792 auto [errCode, info] = RuntimeContext::GetInstance()->GetDeviceTimeInfo(deviceId_); 793 if (errCode != E_OK) { 794 return; 795 } 796 int64_t systemTimeOffset = metadata_->GetSystemTimeOffset(deviceId_); 797 LOGD("[TimeSync] Check db offset %" PRId64 " cache offset %" PRId64, systemTimeOffset, info.systemTimeOffset); 798 if (!CheckSkipTimeSync(info) || (IsNeedSync() && 799 std::abs(systemTimeOffset - info.systemTimeOffset) >= MAX_TIME_OFFSET_NOISE)) { 800 SetTimeSyncFinishInner(false); 801 return; 802 } 803 if (IsNeedSync()) { 804 SetTimeSyncFinishInner(true); 805 } 806 if (systemTimeOffset != info.systemTimeOffset) { 807 errCode = metadata_->SetSystemTimeOffset(deviceId_, info.systemTimeOffset); 808 if (errCode != E_OK) { 809 return; 810 } 811 } 812 LOGI("[TimeSync] Mark time sync finish success"); 813 } 814 ClearTimeSyncFinish()815 void TimeSync::ClearTimeSyncFinish() 816 { 817 RuntimeContext::GetInstance()->ClearDeviceTimeInfo(deviceId_); 818 SetTimeSyncFinishInner(false); 819 } 820 GenerateTimeOffsetIfNeed(TimeOffset systemOffset,TimeOffset senderLocalOffset)821 int TimeSync::GenerateTimeOffsetIfNeed(TimeOffset systemOffset, TimeOffset senderLocalOffset) 822 { 823 if (IsRemoteLowVersion(SOFTWARE_VERSION_RELEASE_9_0)) { 824 return E_OK; 825 } 826 auto [errCode, info] = RuntimeContext::GetInstance()->GetDeviceTimeInfo(deviceId_); 827 bool syncFinish = !IsNeedSync(); 828 bool timeChange = metadata_->IsTimeChange(deviceId_); 829 // avoid local time change but remote record time sync finish 830 // should return re time sync, after receive time sync request, reset time change mark 831 // we think offset is ok when local time sync to remote 832 if ((timeChange && !syncFinish) || 833 (errCode == E_OK && (std::abs(info.systemTimeOffset + systemOffset) > MAX_TIME_OFFSET_NOISE))) { 834 LOGI("[TimeSync] time offset is invalid should do time sync again! packet %" PRId64 " cache %" PRId64 835 " time change %d sync finish %d", -systemOffset, info.systemTimeOffset, static_cast<int>(timeChange), 836 static_cast<int>(syncFinish)); 837 ClearTimeSyncFinish(); 838 RuntimeContext::GetInstance()->ClearDeviceTimeInfo(deviceId_); 839 return -E_NEED_TIME_SYNC; 840 } 841 // Sender's systemOffset = Sender's deltaTime + requestLocalOffset - responseLocalOffset 842 // Sender's deltaTime = Sender's systemOffset - requestLocalOffset + responseLocalOffset 843 // Receiver's deltaTime = -Sender's deltaTime = -Sender's systemOffset + requestLocalOffset - responseLocalOffset 844 TimeOffset offset = -systemOffset + senderLocalOffset - metadata_->GetLocalTimeOffset(); 845 errCode = metadata_->SetSystemTimeOffset(deviceId_, -systemOffset); 846 if (errCode != E_OK) { 847 return errCode; 848 } 849 errCode = metadata_->SaveTimeOffset(deviceId_, offset); 850 if (errCode != E_OK) { 851 return errCode; 852 } 853 SetTimeSyncFinishInner(true); 854 info.systemTimeOffset = -systemOffset; 855 info.recordTime = timeHelper_->GetSysCurrentTime(); 856 RuntimeContext::GetInstance()->SetDeviceTimeInfo(deviceId_, info); 857 return E_OK; 858 } 859 SetTimeSyncFinishInner(bool finish)860 void TimeSync::SetTimeSyncFinishInner(bool finish) 861 { 862 isSynced_ = finish; 863 if (IsRemoteLowVersion(SOFTWARE_VERSION_RELEASE_9_0)) { 864 return; 865 } 866 int errCode = metadata_->SetTimeSyncFinishMark(deviceId_, finish); 867 if (errCode != E_OK) { 868 LOGW("[TimeSync] Set %.3s time sync finish %d mark failed %d", deviceId_.c_str(), static_cast<int>(finish), 869 errCode); 870 } 871 } 872 } // namespace DistributedDB