1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "single_ver_data_sync.h"
17
18 #include "db_common.h"
19 #include "db_types.h"
20 #include "generic_single_ver_kv_entry.h"
21 #include "intercepted_data_impl.h"
22 #include "log_print.h"
23 #include "message_transform.h"
24 #include "performance_analysis.h"
25 #include "single_ver_data_sync_utils.h"
26 #include "single_ver_sync_state_machine.h"
27 #include "subscribe_manager.h"
28 #ifdef RELATIONAL_STORE
29 #include "relational_db_sync_interface.h"
30 #endif
31
32 namespace DistributedDB {
SingleVerDataSync()33 SingleVerDataSync::SingleVerDataSync()
34 : mtuSize_(0),
35 storage_(nullptr),
36 communicateHandle_(nullptr),
37 metadata_(nullptr)
38 {
39 }
40
~SingleVerDataSync()41 SingleVerDataSync::~SingleVerDataSync()
42 {
43 storage_ = nullptr;
44 communicateHandle_ = nullptr;
45 metadata_ = nullptr;
46 }
47
Initialize(ISyncInterface * inStorage,ICommunicator * inCommunicateHandle,const std::shared_ptr<Metadata> & inMetadata,const std::string & deviceId)48 int SingleVerDataSync::Initialize(ISyncInterface *inStorage, ICommunicator *inCommunicateHandle,
49 const std::shared_ptr<Metadata> &inMetadata, const std::string &deviceId)
50 {
51 if ((inStorage == nullptr) || (inCommunicateHandle == nullptr) || (inMetadata == nullptr)) {
52 return -E_INVALID_ARGS;
53 }
54 storage_ = static_cast<SyncGenericInterface *>(inStorage);
55 communicateHandle_ = inCommunicateHandle;
56 metadata_ = inMetadata;
57 mtuSize_ = DBConstant::MIN_MTU_SIZE; // default size is 1K, it will update when need sync data.
58 std::vector<uint8_t> label = inStorage->GetIdentifier();
59 label.resize(3); // only show 3 Bytes enough
60 label_ = DBCommon::VectorToHexString(label);
61 deviceId_ = deviceId;
62 msgSchedule_.Initialize(label_, deviceId_);
63 return E_OK;
64 }
65
SyncStart(int mode,SingleVerSyncTaskContext * context)66 int SingleVerDataSync::SyncStart(int mode, SingleVerSyncTaskContext *context)
67 {
68 std::lock_guard<std::mutex> lock(lock_);
69 int errCode = CheckPermitSendData(mode, context);
70 if (errCode != E_OK) {
71 return errCode;
72 }
73 if (sessionId_ != 0) { // auto sync timeout resend
74 return ReSendData(context);
75 }
76 ResetSyncStatus(mode, context);
77 LOGI("[DataSync] SendStart,mode=%d,label=%s,device=%s", mode_, label_.c_str(), STR_MASK(deviceId_));
78 int tmpMode = SyncOperation::TransferSyncMode(mode);
79 if (tmpMode == SyncModeType::PUSH) {
80 errCode = PushStart(context);
81 } else if (tmpMode == SyncModeType::PUSH_AND_PULL) {
82 errCode = PushPullStart(context);
83 } else if (tmpMode == SyncModeType::PULL) {
84 errCode = PullRequestStart(context);
85 } else {
86 errCode = PullResponseStart(context);
87 }
88 if (context->IsSkipTimeoutError(errCode)) {
89 // if E_TIMEOUT occurred, means send message pressure is high, put into resend map and wait for resend.
90 // just return to avoid higher pressure for send.
91 return E_OK;
92 }
93 if (errCode != E_OK) {
94 LOGE("[DataSync] SendStart errCode=%d", errCode);
95 return errCode;
96 }
97 if (tmpMode == SyncModeType::PUSH_AND_PULL && context->GetTaskErrCode() == -E_EKEYREVOKED) {
98 LOGE("wait for recv finished for push and pull mode");
99 return -E_EKEYREVOKED;
100 }
101 return InnerSyncStart(context);
102 }
103
InnerSyncStart(SingleVerSyncTaskContext * context)104 int SingleVerDataSync::InnerSyncStart(SingleVerSyncTaskContext *context)
105 {
106 int errCode = CheckPermitSendData(mode_, context);
107 if (errCode != E_OK) {
108 return errCode;
109 }
110 while (true) {
111 if (windowSize_ <= 0 || isAllDataHasSent_) {
112 LOGD("[DataSync] InnerDataSync winSize=%d,isAllSent=%d,label=%s,device=%s", windowSize_, isAllDataHasSent_,
113 label_.c_str(), STR_MASK(deviceId_));
114 return E_OK;
115 }
116 int mode = SyncOperation::TransferSyncMode(mode_);
117 if (mode == SyncModeType::PULL) {
118 LOGE("[DataSync] unexpected error");
119 return -E_INVALID_ARGS;
120 }
121 context->IncSequenceId();
122 if (mode == SyncModeType::PUSH || mode == SyncModeType::PUSH_AND_PULL) {
123 errCode = PushStart(context);
124 } else {
125 errCode = PullResponseStart(context);
126 }
127 if ((mode == SyncModeType::PUSH_AND_PULL) && errCode == -E_EKEYREVOKED) {
128 LOGE("[DataSync] wait for recv finished,label=%s,device=%s", label_.c_str(), STR_MASK(deviceId_));
129 isAllDataHasSent_ = true;
130 return -E_EKEYREVOKED;
131 }
132 if (context->IsSkipTimeoutError(errCode)) {
133 // if E_TIMEOUT occurred, means send message pressure is high, put into resend map and wait for resend.
134 // just return to avoid higher pressure for send.
135 return E_OK;
136 }
137 if (errCode != E_OK) {
138 LOGE("[DataSync] InnerSend errCode=%d", errCode);
139 return errCode;
140 }
141 }
142 return E_OK;
143 }
144
InnerClearSyncStatus()145 void SingleVerDataSync::InnerClearSyncStatus()
146 {
147 sessionId_ = 0;
148 reSendMap_.clear();
149 windowSize_ = 0;
150 maxSequenceIdHasSent_ = 0;
151 isAllDataHasSent_ = false;
152 }
153
TryContinueSync(SingleVerSyncTaskContext * context,const Message * message)154 int SingleVerDataSync::TryContinueSync(SingleVerSyncTaskContext *context, const Message *message)
155 {
156 if (message == nullptr) {
157 LOGE("[DataSync] AckRecv message nullptr");
158 return -E_INVALID_ARGS;
159 }
160 const DataAckPacket *packet = message->GetObject<DataAckPacket>();
161 if (packet == nullptr) {
162 return -E_INVALID_ARGS;
163 }
164 uint64_t packetId = packet->GetPacketId(); // above 102 version data request reserve[0] store packetId value
165 uint32_t sessionId = message->GetSessionId();
166 uint32_t sequenceId = message->GetSequenceId();
167
168 std::lock_guard<std::mutex> lock(lock_);
169 LOGI("[DataSync] recv ack seqId=%" PRIu32 ",packetId=%" PRIu64 ",winSize=%d,label=%s,dev=%s", sequenceId, packetId,
170 windowSize_, label_.c_str(), STR_MASK(deviceId_));
171 if (sessionId != sessionId_) {
172 LOGI("[DataSync] ignore ack,sessionId is different");
173 return E_OK;
174 }
175 Timestamp lastQueryTime = 0;
176 if (reSendMap_.count(sequenceId) != 0) {
177 lastQueryTime = reSendMap_[sequenceId].end;
178 reSendMap_.erase(sequenceId);
179 windowSize_++;
180 } else {
181 LOGI("[DataSync] ack seqId not in map");
182 return E_OK;
183 }
184 if (context->IsQuerySync() && storage_->GetInterfaceType() == ISyncInterface::SYNC_RELATION) {
185 Timestamp dbLastQueryTime = 0;
186 int errCode = metadata_->GetLastQueryTime(context->GetQuerySyncId(), context->GetDeviceId(), dbLastQueryTime);
187 if (errCode == E_OK && dbLastQueryTime < lastQueryTime) {
188 errCode = metadata_->SetLastQueryTime(context->GetQuerySyncId(), context->GetDeviceId(), lastQueryTime);
189 }
190 if (errCode != E_OK) {
191 return errCode;
192 }
193 }
194 if (!isAllDataHasSent_) {
195 return InnerSyncStart(context);
196 } else if (reSendMap_.empty()) {
197 context->SetOperationStatus(SyncOperation::OP_SEND_FINISHED);
198 InnerClearSyncStatus();
199 return -E_FINISHED;
200 }
201 return E_OK;
202 }
203
ClearSyncStatus()204 void SingleVerDataSync::ClearSyncStatus()
205 {
206 std::lock_guard<std::mutex> lock(lock_);
207 InnerClearSyncStatus();
208 }
209
ReSendData(SingleVerSyncTaskContext * context)210 int SingleVerDataSync::ReSendData(SingleVerSyncTaskContext *context)
211 {
212 if (reSendMap_.empty()) {
213 LOGI("[DataSync] ReSend map empty");
214 return -E_INTERNAL_ERROR;
215 }
216 uint32_t sequenceId = reSendMap_.begin()->first;
217 ReSendInfo reSendInfo = reSendMap_.begin()->second;
218 LOGI("[DataSync] ReSend mode=%d,start=%" PRIu64 ",end=%" PRIu64 ",delStart=%" PRIu64 ",delEnd=%" PRIu64 ","
219 "seqId=%" PRIu32 ",packetId=%" PRIu64 ",windowsize=%d,label=%s,deviceId=%s", mode_, reSendInfo.start,
220 reSendInfo.end, reSendInfo.deleteBeginTime, reSendInfo.deleteEndTime, sequenceId, reSendInfo.packetId,
221 windowSize_, label_.c_str(), STR_MASK(deviceId_));
222 DataSyncReSendInfo dataReSendInfo = {sessionId_, sequenceId, reSendInfo.start, reSendInfo.end,
223 reSendInfo.deleteBeginTime, reSendInfo.deleteEndTime, reSendInfo.packetId};
224 return ReSend(context, dataReSendInfo);
225 }
226
GetLocalDeviceName()227 std::string SingleVerDataSync::GetLocalDeviceName()
228 {
229 std::string deviceInfo;
230 if (communicateHandle_ != nullptr) {
231 communicateHandle_->GetLocalIdentity(deviceInfo);
232 }
233 return deviceInfo;
234 }
235
Send(SingleVerSyncTaskContext * context,const Message * message,const CommErrHandler & handler,uint32_t packetLen)236 int SingleVerDataSync::Send(SingleVerSyncTaskContext *context, const Message *message, const CommErrHandler &handler,
237 uint32_t packetLen)
238 {
239 bool startFeedDogRet = false;
240 if (packetLen > mtuSize_ && mtuSize_ > NOTIFY_MIN_MTU_SIZE) {
241 uint32_t time = static_cast<uint32_t>(static_cast<uint64_t>(packetLen) *
242 static_cast<uint64_t>(context->GetTimeoutTime()) / mtuSize_); // no overflow
243 startFeedDogRet = context->StartFeedDogForSync(time, SyncDirectionFlag::SEND);
244 }
245 SendConfig sendConfig;
246 SetSendConfigParam(storage_->GetDbProperties(), context->GetDeviceId(), false, SEND_TIME_OUT, sendConfig);
247 int errCode = communicateHandle_->SendMessage(context->GetDeviceId(), message, sendConfig, handler);
248 if (errCode != E_OK) {
249 LOGE("[DataSync][Send] send message failed, errCode=%d", errCode);
250 if (startFeedDogRet) {
251 context->StopFeedDogForSync(SyncDirectionFlag::SEND);
252 }
253 }
254 return errCode;
255 }
256
GetDataWithPerformanceRecord(SingleVerSyncTaskContext * context,std::vector<SendDataItem> & outData,size_t packetSize)257 int SingleVerDataSync::GetDataWithPerformanceRecord(SingleVerSyncTaskContext *context,
258 std::vector<SendDataItem> &outData, size_t packetSize)
259 {
260 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
261 if (performance != nullptr) {
262 performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_READ_DATA);
263 }
264 // start a watch dog before get data
265 // it will send ack util get data finished
266 context->StartFeedDogForGetData(context->GetResponseSessionId());
267 int errCode = GetData(context, packetSize, outData);
268 context->StopFeedDogForGetData();
269 if (performance != nullptr) {
270 performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_READ_DATA);
271 }
272 if (!outData.empty()) {
273 SingleVerDataSyncUtils::RecordClientId(*context, *storage_, metadata_);
274 }
275 return errCode;
276 }
277
GetData(SingleVerSyncTaskContext * context,size_t packetSize,std::vector<SendDataItem> & outData)278 int SingleVerDataSync::GetData(SingleVerSyncTaskContext *context, size_t packetSize, std::vector<SendDataItem> &outData)
279 {
280 int errCode;
281 UpdateMtuSize();
282 if (context->GetRetryStatus() == SyncTaskContext::NEED_RETRY) {
283 context->SetRetryStatus(SyncTaskContext::NO_NEED_RETRY);
284 LOGI("[DataSync][GetData] resend data");
285 errCode = GetUnsyncData(context, outData, packetSize);
286 } else {
287 ContinueToken token;
288 context->GetContinueToken(token);
289 if (token == nullptr) {
290 errCode = GetUnsyncData(context, outData, packetSize);
291 } else {
292 LOGD("[DataSync][GetData] get data from token");
293 // if there is data to send, read out data, and update local watermark, send data
294 errCode = GetNextUnsyncData(context, outData, packetSize);
295 }
296 }
297 if (errCode == -E_UNFINISHED) {
298 LOGD("[DataSync][GetData] not finished.");
299 }
300 if (SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
301 std::string localHashName = DBCommon::TransferHashString(GetLocalDeviceName());
302 SingleVerDataSyncUtils::TransDbDataItemToSendDataItem(localHashName, outData);
303 }
304 return errCode;
305 }
306
GetMatchData(SingleVerSyncTaskContext * context,SyncEntry & syncOutData)307 int SingleVerDataSync::GetMatchData(SingleVerSyncTaskContext *context, SyncEntry &syncOutData)
308 {
309 uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
310 size_t packetSize = (version > SOFTWARE_VERSION_RELEASE_2_0) ?
311 DBConstant::MAX_HPMODE_PACK_ITEM_SIZE : DBConstant::MAX_NORMAL_PACK_ITEM_SIZE;
312 bool needCompressOnSync = false;
313 uint8_t compressionRate = DBConstant::DEFAULT_COMPTRESS_RATE;
314 (void)storage_->GetCompressionOption(needCompressOnSync, compressionRate);
315 int errCode = GetDataWithPerformanceRecord(context, syncOutData.entries, packetSize);
316 if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
317 context->SetTaskErrCode(errCode);
318 return errCode;
319 }
320
321 int innerCode = InterceptData(syncOutData);
322 if (innerCode != E_OK) {
323 context->SetTaskErrCode(innerCode);
324 return innerCode;
325 }
326
327 CompressAlgorithm remoteAlgo = context->ChooseCompressAlgo();
328 if (needCompressOnSync && remoteAlgo != CompressAlgorithm::NONE) {
329 int compressCode = GenericSingleVerKvEntry::Compress(syncOutData.entries, syncOutData.compressedEntries,
330 { remoteAlgo, version });
331 if (compressCode != E_OK) {
332 return compressCode;
333 }
334 }
335 return errCode;
336 }
337
GetUnsyncData(SingleVerSyncTaskContext * context,std::vector<SendDataItem> & outData,size_t packetSize)338 int SingleVerDataSync::GetUnsyncData(SingleVerSyncTaskContext *context, std::vector<SendDataItem> &outData,
339 size_t packetSize)
340 {
341 SyncTimeRange waterRange;
342 DataSizeSpecInfo syncDataSizeInfo = GetDataSizeSpecInfo(packetSize);
343 WaterMark startMark = 0;
344 SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
345 GetLocalWaterMark(curType, context->GetQuerySyncId(), context, startMark);
346 if ((waterRange.endTime == 0) || (startMark > waterRange.endTime)) {
347 return E_OK;
348 }
349 waterRange.beginTime = startMark;
350 if (curType == SyncType::QUERY_SYNC_TYPE) {
351 WaterMark deletedStartMark = 0;
352 GetLocalDeleteSyncWaterMark(context, deletedStartMark);
353 Timestamp lastQueryTimestamp = 0;
354 int errCode = metadata_->GetLastQueryTime(context->GetQuerySyncId(), context->GetDeviceId(),
355 lastQueryTimestamp);
356 if (errCode != E_OK) {
357 return errCode;
358 }
359 waterRange.deleteBeginTime = deletedStartMark;
360 waterRange.lastQueryTime = lastQueryTimestamp;
361 }
362 return GetUnsyncData(context, outData, syncDataSizeInfo, waterRange);
363 }
364
GetUnsyncData(SingleVerSyncTaskContext * context,std::vector<SendDataItem> & outData,DataSizeSpecInfo syncDataSizeInfo,SyncTimeRange & waterMarkInfo)365 int SingleVerDataSync::GetUnsyncData(SingleVerSyncTaskContext *context, std::vector<SendDataItem> &outData,
366 DataSizeSpecInfo syncDataSizeInfo, SyncTimeRange &waterMarkInfo)
367 {
368 SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
369 ContinueToken token = nullptr;
370 context->GetContinueToken(token);
371 if (token != nullptr) {
372 storage_->ReleaseContinueToken(token);
373 }
374 int errCode;
375 if (curType != SyncType::QUERY_SYNC_TYPE) {
376 errCode = storage_->GetSyncData(waterMarkInfo.beginTime, waterMarkInfo.endTime, outData, token,
377 syncDataSizeInfo);
378 } else {
379 QuerySyncObject queryObj = context->GetQuery();
380 errCode = storage_->GetSyncData(queryObj, waterMarkInfo, syncDataSizeInfo, token, outData);
381 }
382 context->SetContinueToken(token);
383 if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
384 LOGE("[DataSync][GetUnsyncData] get unsync data failed,errCode=%d", errCode);
385 }
386 return errCode;
387 }
388
GetNextUnsyncData(SingleVerSyncTaskContext * context,std::vector<SendDataItem> & outData,size_t packetSize)389 int SingleVerDataSync::GetNextUnsyncData(SingleVerSyncTaskContext *context, std::vector<SendDataItem> &outData,
390 size_t packetSize)
391 {
392 ContinueToken token;
393 context->GetContinueToken(token);
394 DataSizeSpecInfo syncDataSizeInfo = GetDataSizeSpecInfo(packetSize);
395 int errCode = storage_->GetSyncDataNext(outData, token, syncDataSizeInfo);
396 context->SetContinueToken(token);
397 if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
398 LOGE("[DataSync][GetNextUnsyncData] get next unsync data failed, errCode=%d", errCode);
399 }
400 return errCode;
401 }
402
SaveData(const SingleVerSyncTaskContext * context,const std::vector<SendDataItem> & inData,SyncType curType,const QuerySyncObject & query)403 int SingleVerDataSync::SaveData(const SingleVerSyncTaskContext *context, const std::vector<SendDataItem> &inData,
404 SyncType curType, const QuerySyncObject &query)
405 {
406 if (inData.empty()) {
407 return E_OK;
408 }
409 SingleVerDataSyncUtils::RecordClientId(*context, *storage_, metadata_);
410 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
411 if (performance != nullptr) {
412 performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_SAVE_DATA);
413 }
414 const auto localDeviceName = GetLocalDeviceName();
415 const std::string localHashName = DBCommon::TransferHashString(localDeviceName);
416 SingleVerDataSyncUtils::TransSendDataItemToLocal(context, localHashName, inData);
417 std::vector<SendDataItem> copyData = inData;
418 int errCode = storage_->InterceptData(copyData, GetDeviceId(), localDeviceName, false);
419 if (errCode != E_OK) {
420 LOGE("[DataSync][SaveData] intercept data failed, errCode=%d", errCode);
421 return errCode;
422 }
423 // query only support prefix key and don't have query in packet in 104 version
424 errCode = storage_->PutSyncDataWithQuery(query, copyData, context->GetDeviceId());
425 if (performance != nullptr) {
426 performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_SAVE_DATA);
427 }
428 if (errCode != E_OK) {
429 LOGE("[DataSync][SaveData] save sync data failed, errCode=%d", errCode);
430 }
431 return errCode;
432 }
433
ResetSyncStatus(int inMode,SingleVerSyncTaskContext * context)434 void SingleVerDataSync::ResetSyncStatus(int inMode, SingleVerSyncTaskContext *context)
435 {
436 mode_ = inMode;
437 maxSequenceIdHasSent_ = 0;
438 isAllDataHasSent_ = false;
439 context->ReSetSequenceId();
440 reSendMap_.clear();
441 if (context->GetRemoteSoftwareVersion() < SOFTWARE_VERSION_RELEASE_3_0) {
442 windowSize_ = LOW_VERSION_WINDOW_SIZE;
443 } else {
444 windowSize_ = HIGH_VERSION_WINDOW_SIZE;
445 }
446 int mode = SyncOperation::TransferSyncMode(inMode);
447 if (mode == SyncModeType::PUSH || mode == SyncModeType::PUSH_AND_PULL || mode == SyncModeType::PULL) {
448 sessionId_ = context->GetRequestSessionId();
449 } else {
450 sessionId_ = context->GetResponseSessionId();
451 }
452 }
453
GetSyncDataTimeRange(SyncType syncType,SingleVerSyncTaskContext * context,const std::vector<SendDataItem> & inData,UpdateWaterMark & isUpdate)454 SyncTimeRange SingleVerDataSync::GetSyncDataTimeRange(SyncType syncType, SingleVerSyncTaskContext *context,
455 const std::vector<SendDataItem> &inData, UpdateWaterMark &isUpdate)
456 {
457 WaterMark localMark = 0;
458 WaterMark deleteMark = 0;
459 GetLocalWaterMark(syncType, context->GetQuerySyncId(), context, localMark);
460 GetLocalDeleteSyncWaterMark(context, deleteMark);
461 return SingleVerDataSyncUtils::GetSyncDataTimeRange(syncType, localMark, deleteMark, inData, isUpdate);
462 }
463
SaveLocalWaterMark(SyncType syncType,const SingleVerSyncTaskContext * context,SyncTimeRange dataTimeRange,bool isCheckBeforUpdate) const464 int SingleVerDataSync::SaveLocalWaterMark(SyncType syncType, const SingleVerSyncTaskContext *context,
465 SyncTimeRange dataTimeRange, bool isCheckBeforUpdate) const
466 {
467 WaterMark localMark = 0;
468 int errCode = E_OK;
469 const std::string &deviceId = context->GetDeviceId();
470 std::string queryId = context->GetQuerySyncId();
471 if (syncType != SyncType::QUERY_SYNC_TYPE) {
472 if (isCheckBeforUpdate) {
473 GetLocalWaterMark(syncType, queryId, context, localMark);
474 if (localMark >= dataTimeRange.endTime) {
475 return E_OK;
476 }
477 }
478 errCode = metadata_->SaveLocalWaterMark(deviceId, dataTimeRange.endTime);
479 } else {
480 bool isNeedUpdateMark = true;
481 bool isNeedUpdateDeleteMark = true;
482 if (isCheckBeforUpdate) {
483 WaterMark deleteDataWaterMark = 0;
484 GetLocalWaterMark(syncType, queryId, context, localMark);
485 GetLocalDeleteSyncWaterMark(context, deleteDataWaterMark);
486 if (localMark >= dataTimeRange.endTime) {
487 isNeedUpdateMark = false;
488 }
489 if (deleteDataWaterMark >= dataTimeRange.deleteEndTime) {
490 isNeedUpdateDeleteMark = false;
491 }
492 }
493 if (isNeedUpdateMark) {
494 LOGD("label=%s,dev=%s,endTime=%" PRIu64, label_.c_str(), STR_MASK(GetDeviceId()), dataTimeRange.endTime);
495 errCode = metadata_->SetSendQueryWaterMark(queryId, deviceId, dataTimeRange.endTime);
496 if (errCode != E_OK) {
497 LOGE("[DataSync][SaveLocalWaterMark] save query metadata watermark failed,errCode=%d", errCode);
498 return errCode;
499 }
500 }
501 if (isNeedUpdateDeleteMark) {
502 LOGD("label=%s,dev=%s,deleteEndTime=%" PRIu64, label_.c_str(), STR_MASK(GetDeviceId()),
503 dataTimeRange.deleteEndTime);
504 errCode = metadata_->SetSendDeleteSyncWaterMark(context->GetDeleteSyncId(), dataTimeRange.deleteEndTime);
505 }
506 }
507 if (errCode != E_OK) {
508 LOGE("[DataSync][SaveLocalWaterMark] save metadata local watermark failed,errCode=%d", errCode);
509 }
510 return errCode;
511 }
512
GetPeerWaterMark(SyncType syncType,const std::string & queryIdentify,const DeviceID & deviceId,WaterMark & waterMark) const513 void SingleVerDataSync::GetPeerWaterMark(SyncType syncType, const std::string &queryIdentify,
514 const DeviceID &deviceId, WaterMark &waterMark) const
515 {
516 if (syncType != SyncType::QUERY_SYNC_TYPE) {
517 metadata_->GetPeerWaterMark(deviceId, waterMark);
518 return;
519 }
520 metadata_->GetRecvQueryWaterMark(queryIdentify, deviceId, waterMark);
521 }
522
GetPeerDeleteSyncWaterMark(const DeviceID & deviceId,WaterMark & waterMark)523 void SingleVerDataSync::GetPeerDeleteSyncWaterMark(const DeviceID &deviceId, WaterMark &waterMark)
524 {
525 metadata_->GetRecvDeleteSyncWaterMark(deviceId, waterMark);
526 }
527
GetLocalDeleteSyncWaterMark(const SingleVerSyncTaskContext * context,WaterMark & waterMark) const528 void SingleVerDataSync::GetLocalDeleteSyncWaterMark(const SingleVerSyncTaskContext *context,
529 WaterMark &waterMark) const
530 {
531 metadata_->GetSendDeleteSyncWaterMark(context->GetDeleteSyncId(), waterMark, context->IsAutoLiftWaterMark());
532 }
533
GetLocalWaterMark(SyncType syncType,const std::string & queryIdentify,const SingleVerSyncTaskContext * context,WaterMark & waterMark) const534 void SingleVerDataSync::GetLocalWaterMark(SyncType syncType, const std::string &queryIdentify,
535 const SingleVerSyncTaskContext *context, WaterMark &waterMark) const
536 {
537 if (syncType != SyncType::QUERY_SYNC_TYPE) {
538 metadata_->GetLocalWaterMark(context->GetDeviceId(), waterMark);
539 return;
540 }
541 metadata_->GetSendQueryWaterMark(queryIdentify, context->GetDeviceId(),
542 waterMark, context->IsAutoLiftWaterMark());
543 }
544
RemoveDeviceDataHandle(SingleVerSyncTaskContext * context,const Message * message,WaterMark maxSendDataTime)545 int SingleVerDataSync::RemoveDeviceDataHandle(SingleVerSyncTaskContext *context, const Message *message,
546 WaterMark maxSendDataTime)
547 {
548 bool isNeedClearRemoteData = false;
549 std::lock_guard<std::mutex> autoLock(removeDeviceDataLock_);
550 if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_3_0) {
551 uint64_t clearDeviceDataMark = 0;
552 metadata_->GetRemoveDataMark(context->GetDeviceId(), clearDeviceDataMark);
553 isNeedClearRemoteData = (clearDeviceDataMark == REMOVE_DEVICE_DATA_MARK);
554 } else {
555 const DataRequestPacket *packet = message->GetObject<DataRequestPacket>();
556 if (packet == nullptr) {
557 LOGE("[RemoveDeviceDataHandle] get packet object failed");
558 return -E_INVALID_ARGS;
559 }
560 SyncType curType = SyncOperation::GetSyncType(packet->GetMode());
561 WaterMark packetLocalMark = packet->GetLocalWaterMark();
562 WaterMark peerMark = 0;
563 GetPeerWaterMark(curType, context->GetQuerySyncId(), context->GetDeviceId(), peerMark);
564 isNeedClearRemoteData = ((packetLocalMark == 0) && (peerMark != 0));
565 }
566 if (!isNeedClearRemoteData) {
567 return E_OK;
568 }
569 int errCode = E_OK;
570 if (context->IsNeedClearRemoteStaleData()) {
571 // need to clear remote device history data
572 errCode = storage_->RemoveDeviceData(context->GetDeviceId(), true);
573 if (errCode != E_OK) {
574 (void)SendDataAck(context, message, errCode, maxSendDataTime);
575 return errCode;
576 }
577 if (context->GetRemoteSoftwareVersion() == SOFTWARE_VERSION_EARLIEST) {
578 // avoid repeat clear in ack
579 metadata_->SaveLocalWaterMark(context->GetDeviceId(), 0);
580 }
581 }
582 if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_3_0) {
583 errCode = metadata_->ResetMetaDataAfterRemoveData(context->GetDeviceId());
584 if (errCode != E_OK) {
585 (void)SendDataAck(context, message, errCode, maxSendDataTime);
586 return errCode;
587 }
588 }
589 return E_OK;
590 }
591
DealRemoveDeviceDataByAck(SingleVerSyncTaskContext * context,WaterMark ackWaterMark,const std::vector<uint64_t> & reserved)592 int SingleVerDataSync::DealRemoveDeviceDataByAck(SingleVerSyncTaskContext *context, WaterMark ackWaterMark,
593 const std::vector<uint64_t> &reserved)
594 {
595 bool isNeedClearRemoteData = false;
596 std::lock_guard<std::mutex> autoLock(removeDeviceDataLock_);
597 SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
598 if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_3_0) {
599 uint64_t clearDeviceDataMark = 0;
600 metadata_->GetRemoveDataMark(context->GetDeviceId(), clearDeviceDataMark);
601 isNeedClearRemoteData = (clearDeviceDataMark != 0);
602 } else if (reserved.empty()) {
603 WaterMark localMark = 0;
604 GetLocalWaterMark(curType, context->GetQuery().GetIdentify(), context, localMark);
605 isNeedClearRemoteData = ((localMark != 0) && (ackWaterMark == 0));
606 } else {
607 WaterMark peerMark = 0;
608 GetPeerWaterMark(curType, context->GetQuerySyncId(),
609 context->GetDeviceId(), peerMark);
610 isNeedClearRemoteData = ((reserved[ACK_PACKET_RESERVED_INDEX_LOCAL_WATER_MARK] == 0) && (peerMark != 0));
611 }
612 if (!isNeedClearRemoteData) {
613 return E_OK;
614 }
615 // need to clear remote historydata
616 LOGI("[DataSync][WaterMarkException] AckRecv reserved not empty,rebuilted,clear historydata,label=%s,dev=%s",
617 label_.c_str(), STR_MASK(GetDeviceId()));
618 int errCode = storage_->RemoveDeviceData(context->GetDeviceId(), true);
619 if (errCode != E_OK) {
620 return errCode;
621 }
622 if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_3_0) {
623 errCode = metadata_->ResetMetaDataAfterRemoveData(context->GetDeviceId());
624 }
625 return errCode;
626 }
627
SetSessionEndTimestamp(Timestamp end)628 void SingleVerDataSync::SetSessionEndTimestamp(Timestamp end)
629 {
630 sessionEndTimestamp_ = end;
631 }
632
GetSessionEndTimestamp() const633 Timestamp SingleVerDataSync::GetSessionEndTimestamp() const
634 {
635 return sessionEndTimestamp_;
636 }
637
UpdateSendInfo(SyncTimeRange dataTimeRange,SingleVerSyncTaskContext * context)638 void SingleVerDataSync::UpdateSendInfo(SyncTimeRange dataTimeRange, SingleVerSyncTaskContext *context)
639 {
640 ReSendInfo reSendInfo;
641 reSendInfo.start = dataTimeRange.beginTime;
642 reSendInfo.end = dataTimeRange.endTime;
643 reSendInfo.deleteBeginTime = dataTimeRange.deleteBeginTime;
644 reSendInfo.deleteEndTime = dataTimeRange.deleteEndTime;
645 reSendInfo.packetId = context->GetPacketId();
646 maxSequenceIdHasSent_++;
647 reSendMap_[maxSequenceIdHasSent_] = reSendInfo;
648 windowSize_--;
649 ContinueToken token;
650 context->GetContinueToken(token);
651 if (token == nullptr) {
652 isAllDataHasSent_ = true;
653 }
654 LOGI("[DataSync] mode=%d,start=%" PRIu64 ",end=%" PRIu64 ",deleteStart=%" PRIu64 ",deleteEnd=%" PRIu64 ","
655 "seqId=%" PRIu32 ",packetId=%" PRIu64 ",window_size=%d,isAllSend=%d,label=%s,device=%s", mode_,
656 reSendInfo.start, reSendInfo.end, reSendInfo.deleteBeginTime, reSendInfo.deleteEndTime, maxSequenceIdHasSent_,
657 reSendInfo.packetId, windowSize_, isAllDataHasSent_, label_.c_str(), STR_MASK(deviceId_));
658 }
659
FillDataRequestPacket(DataRequestPacket * packet,SingleVerSyncTaskContext * context,SyncEntry & syncData,int sendCode,int mode)660 void SingleVerDataSync::FillDataRequestPacket(DataRequestPacket *packet, SingleVerSyncTaskContext *context,
661 SyncEntry &syncData, int sendCode, int mode)
662 {
663 SingleVerDataSyncUtils::SetDataRequestCommonInfo(*context, *storage_, *packet, metadata_);
664 SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
665 uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
666 WaterMark localMark = 0;
667 WaterMark peerMark = 0;
668 WaterMark deleteMark = 0;
669 bool needCompressOnSync = false;
670 uint8_t compressionRate = DBConstant::DEFAULT_COMPTRESS_RATE;
671 (void)storage_->GetCompressionOption(needCompressOnSync, compressionRate);
672 std::string id = context->GetQuerySyncId();
673 GetLocalWaterMark(curType, id, context, localMark);
674 GetPeerWaterMark(curType, id, context->GetDeviceId(), peerMark);
675 GetLocalDeleteSyncWaterMark(context, deleteMark);
676 if (((mode != SyncModeType::RESPONSE_PULL && sendCode == E_OK)) ||
677 (mode == SyncModeType::RESPONSE_PULL && sendCode == SEND_FINISHED)) {
678 packet->SetLastSequence();
679 }
680 int tmpMode = mode;
681 if (mode == SyncModeType::RESPONSE_PULL) {
682 tmpMode = (curType == SyncType::QUERY_SYNC_TYPE) ? SyncModeType::QUERY_PUSH : SyncModeType::PUSH;
683 }
684 packet->SetData(syncData.entries);
685 packet->SetCompressData(syncData.compressedEntries);
686 packet->SetBasicInfo(sendCode, version, tmpMode);
687 packet->SetExtraConditions(RuntimeContext::GetInstance()->GetPermissionCheckParam(storage_->GetDbProperties()));
688 packet->SetWaterMark(localMark, peerMark, deleteMark);
689 if (SyncOperation::TransferSyncMode(mode) == SyncModeType::PUSH_AND_PULL) {
690 packet->SetEndWaterMark(context->GetEndMark());
691 packet->SetSessionId(context->GetRequestSessionId());
692 }
693 packet->SetQuery(context->GetQuery());
694 packet->SetQueryId(context->GetQuerySyncId());
695 CompressAlgorithm curAlgo = context->ChooseCompressAlgo();
696 // empty compress data should not mark compress
697 if (!syncData.compressedEntries.empty() && needCompressOnSync && curAlgo != CompressAlgorithm::NONE) {
698 packet->SetCompressDataMark();
699 packet->SetCompressAlgo(curAlgo);
700 }
701 SingleVerDataSyncUtils::SetPacketId(packet, context, version);
702 if (curType == SyncType::QUERY_SYNC_TYPE && (context->GetQuery().HasLimit() ||
703 context->GetQuery().HasOrderBy())) {
704 packet->SetUpdateWaterMark();
705 }
706 LOGD("[DataSync] curType=%d,local=%" PRIu64 ",del=%" PRIu64 ",end=%" PRIu64 ",label=%s,dev=%s,queryId=%s,"
707 "isCompress=%d", static_cast<int>(curType), localMark, deleteMark, context->GetEndMark(), label_.c_str(),
708 STR_MASK(GetDeviceId()), STR_MASK(context->GetQuery().GetIdentify()), packet->IsCompressData());
709 }
710
RequestStart(SingleVerSyncTaskContext * context,int mode)711 int SingleVerDataSync::RequestStart(SingleVerSyncTaskContext *context, int mode)
712 {
713 int errCode = QuerySyncCheck(context);
714 if (errCode != E_OK) {
715 LOGE("[DataSync][PushStart] check query failed, errCode=%d", errCode);
716 return errCode;
717 }
718 errCode = RemoveDeviceDataIfNeed(context);
719 if (errCode != E_OK) {
720 context->SetTaskErrCode(errCode);
721 return errCode;
722 }
723 SyncEntry syncData;
724 // get data
725 errCode = GetMatchData(context, syncData);
726 SingleVerDataSyncUtils::TranslateErrCodeIfNeed(mode, context->GetRemoteSoftwareVersion(), errCode);
727
728 if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
729 LOGE("[DataSync][PushStart] get data failed, errCode=%d", errCode);
730 return errCode;
731 }
732
733 DataRequestPacket *packet = new (std::nothrow) DataRequestPacket;
734 if (packet == nullptr) {
735 LOGE("[DataSync][PushStart] new DataRequestPacket error");
736 return -E_OUT_OF_MEMORY;
737 }
738 SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
739 UpdateWaterMark isUpdateWaterMark;
740 SyncTimeRange dataTime = GetSyncDataTimeRange(curType, context, syncData.entries, isUpdateWaterMark);
741 if (errCode == E_OK) {
742 SetSessionEndTimestamp(std::max(dataTime.endTime, dataTime.deleteEndTime));
743 }
744 FillDataRequestPacket(packet, context, syncData, errCode, mode);
745 errCode = SendDataPacket(curType, packet, context);
746 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
747 if (performance != nullptr) {
748 performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_MACHINE_START_TO_PUSH_SEND);
749 }
750 if (errCode == E_OK || errCode == -E_TIMEOUT) {
751 UpdateSendInfo(dataTime, context);
752 }
753 if (errCode == E_OK) {
754 if (curType == SyncType::QUERY_SYNC_TYPE && (context->GetQuery().HasLimit() ||
755 context->GetQuery().HasOrderBy())) {
756 LOGI("[DataSync][RequestStart] query contain limit/offset/orderby, no need to update watermark.");
757 return E_OK;
758 }
759 SyncTimeRange tmpDataTime = SingleVerDataSyncUtils::ReviseLocalMark(curType, dataTime, isUpdateWaterMark);
760 SaveLocalWaterMark(curType, context, tmpDataTime);
761 }
762 return errCode;
763 }
764
PushStart(SingleVerSyncTaskContext * context)765 int SingleVerDataSync::PushStart(SingleVerSyncTaskContext *context)
766 {
767 if (context == nullptr) {
768 return -E_INVALID_ARGS;
769 }
770 SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
771 return RequestStart(context,
772 (curType == SyncType::QUERY_SYNC_TYPE) ? SyncModeType::QUERY_PUSH : SyncModeType::PUSH);
773 }
774
PushPullStart(SingleVerSyncTaskContext * context)775 int SingleVerDataSync::PushPullStart(SingleVerSyncTaskContext *context)
776 {
777 if (context == nullptr) {
778 return -E_INVALID_ARGS;
779 }
780 return RequestStart(context, context->GetMode());
781 }
782
PullRequestStart(SingleVerSyncTaskContext * context)783 int SingleVerDataSync::PullRequestStart(SingleVerSyncTaskContext *context)
784 {
785 if (context == nullptr) {
786 return -E_INVALID_ARGS;
787 }
788 int errCode = QuerySyncCheck(context);
789 if (errCode != E_OK) {
790 return errCode;
791 }
792 errCode = RemoveDeviceDataIfNeed(context);
793 if (errCode != E_OK) {
794 context->SetTaskErrCode(errCode);
795 return errCode;
796 }
797 DataRequestPacket *packet = new (std::nothrow) DataRequestPacket;
798 if (packet == nullptr) {
799 LOGE("[DataSync][PullRequest]new DataRequestPacket error");
800 return -E_OUT_OF_MEMORY;
801 }
802 SyncType syncType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
803 WaterMark peerMark = 0;
804 WaterMark localMark = 0;
805 WaterMark deleteMark = 0;
806 GetPeerWaterMark(syncType, context->GetQuerySyncId(),
807 context->GetDeviceId(), peerMark);
808 GetLocalWaterMark(syncType, context->GetQuerySyncId(), context, localMark);
809 GetLocalDeleteSyncWaterMark(context, deleteMark);
810 uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
811 WaterMark endMark = context->GetEndMark();
812 SyncTimeRange dataTime = {localMark, deleteMark, localMark, deleteMark};
813 SingleVerDataSyncUtils::SetDataRequestCommonInfo(*context, *storage_, *packet, metadata_);
814 packet->SetBasicInfo(E_OK, version, context->GetMode());
815 packet->SetExtraConditions(RuntimeContext::GetInstance()->GetPermissionCheckParam(storage_->GetDbProperties()));
816 packet->SetWaterMark(localMark, peerMark, deleteMark);
817 packet->SetEndWaterMark(endMark);
818 packet->SetSessionId(context->GetRequestSessionId());
819 packet->SetQuery(context->GetQuery());
820 packet->SetQueryId(context->GetQuerySyncId());
821 packet->SetLastSequence();
822 SingleVerDataSyncUtils::SetPacketId(packet, context, version);
823 context->SetRetryStatus(SyncTaskContext::NO_NEED_RETRY);
824 LOGD("[DataSync][Pull] curType=%d,local=%" PRIu64 ",del=%" PRIu64 ",end=%" PRIu64 ",peer=%" PRIu64 ",label=%s,"
825 "dev=%s", static_cast<int>(syncType), localMark, deleteMark, endMark, peerMark,
826 label_.c_str(), STR_MASK(GetDeviceId()));
827 UpdateSendInfo(dataTime, context);
828 return SendDataPacket(syncType, packet, context);
829 }
830
PullResponseStart(SingleVerSyncTaskContext * context)831 int SingleVerDataSync::PullResponseStart(SingleVerSyncTaskContext *context)
832 {
833 if (context == nullptr) {
834 return -E_INVALID_ARGS;
835 }
836 SyncEntry syncData;
837 // get data
838 int errCode = GetMatchData(context, syncData);
839 if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
840 if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_2_0) {
841 (void)SendPullResponseDataPkt(errCode, syncData, context);
842 }
843 return errCode;
844 }
845 // if send finished
846 int ackCode = E_OK;
847 ContinueToken token = nullptr;
848 context->GetContinueToken(token);
849 if (errCode == E_OK && token == nullptr) {
850 LOGD("[DataSync][PullResponse] send last frame end");
851 ackCode = SEND_FINISHED;
852 }
853 SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
854 UpdateWaterMark isUpdateWaterMark;
855 SyncTimeRange dataTime = GetSyncDataTimeRange(curType, context, syncData.entries, isUpdateWaterMark);
856 if (errCode == E_OK) {
857 SetSessionEndTimestamp(std::max(dataTime.endTime, dataTime.deleteEndTime));
858 }
859 errCode = SendPullResponseDataPkt(ackCode, syncData, context);
860 if (errCode == E_OK || errCode == -E_TIMEOUT) {
861 UpdateSendInfo(dataTime, context);
862 }
863 if (errCode == E_OK) {
864 if (curType == SyncType::QUERY_SYNC_TYPE && (context->GetQuery().HasLimit() ||
865 context->GetQuery().HasOrderBy())) {
866 LOGI("[DataSync][PullResponseStart] query contain limit/offset/orderby, no need to update watermark.");
867 return E_OK;
868 }
869 SyncTimeRange tmpDataTime = SingleVerDataSyncUtils::ReviseLocalMark(curType, dataTime, isUpdateWaterMark);
870 SaveLocalWaterMark(curType, context, tmpDataTime);
871 }
872 return errCode;
873 }
874
UpdateQueryPeerWaterMark(SyncType syncType,const std::string & queryId,const SyncTimeRange & dataTime,const SingleVerSyncTaskContext * context,UpdateWaterMark isUpdateWaterMark)875 void SingleVerDataSync::UpdateQueryPeerWaterMark(SyncType syncType, const std::string &queryId,
876 const SyncTimeRange &dataTime, const SingleVerSyncTaskContext *context, UpdateWaterMark isUpdateWaterMark)
877 {
878 WaterMark tmpPeerWatermark = dataTime.endTime;
879 WaterMark tmpPeerDeletedWatermark = dataTime.deleteEndTime;
880 if (isUpdateWaterMark.normalUpdateMark) {
881 tmpPeerWatermark++;
882 }
883 if (isUpdateWaterMark.deleteUpdateMark) {
884 tmpPeerDeletedWatermark++;
885 }
886 UpdatePeerWaterMark(syncType, queryId, context, tmpPeerWatermark, tmpPeerDeletedWatermark);
887 }
888
UpdatePeerWaterMark(SyncType syncType,const std::string & queryId,const SingleVerSyncTaskContext * context,WaterMark peerWatermark,WaterMark peerDeletedWatermark)889 void SingleVerDataSync::UpdatePeerWaterMark(SyncType syncType, const std::string &queryId,
890 const SingleVerSyncTaskContext *context, WaterMark peerWatermark, WaterMark peerDeletedWatermark)
891 {
892 if (peerWatermark == 0 && peerDeletedWatermark == 0) {
893 return;
894 }
895 int errCode = E_OK;
896 if (syncType != SyncType::QUERY_SYNC_TYPE) {
897 errCode = metadata_->SavePeerWaterMark(context->GetDeviceId(), peerWatermark, true);
898 } else {
899 if (peerWatermark != 0) {
900 LOGD("label=%s,dev=%s,endTime=%" PRIu64, label_.c_str(), STR_MASK(GetDeviceId()), peerWatermark);
901 errCode = metadata_->SetRecvQueryWaterMark(queryId, context->GetDeviceId(), peerWatermark);
902 if (errCode != E_OK) {
903 LOGE("[DataSync][UpdatePeerWaterMark] save query peer water mark failed,errCode=%d", errCode);
904 }
905 }
906 if (peerDeletedWatermark != 0) {
907 LOGD("label=%s,dev=%s,peerDeletedTime=%" PRIu64,
908 label_.c_str(), STR_MASK(GetDeviceId()), peerDeletedWatermark);
909 errCode = metadata_->SetRecvDeleteSyncWaterMark(context->GetDeleteSyncId(), peerDeletedWatermark);
910 }
911 }
912 if (errCode != E_OK) {
913 LOGE("[DataSync][UpdatePeerWaterMark] save peer water mark failed,errCode=%d", errCode);
914 }
915 }
916
DoAbilitySyncIfNeed(SingleVerSyncTaskContext * context,const Message * message,bool isControlMsg)917 int SingleVerDataSync::DoAbilitySyncIfNeed(SingleVerSyncTaskContext *context, const Message *message, bool isControlMsg)
918 {
919 uint16_t remoteCommunicatorVersion = 0;
920 if (communicateHandle_->GetRemoteCommunicatorVersion(context->GetDeviceId(), remoteCommunicatorVersion) ==
921 -E_NOT_FOUND) {
922 LOGE("[DataSync][DoAbilitySyncIfNeed] get remote communicator version failed");
923 return -E_VERSION_NOT_SUPPORT;
924 }
925 // If remote is not the first version, we need check SOFTWARE_VERSION_BASE
926 if (remoteCommunicatorVersion == 0) {
927 LOGI("[DataSync] set remote version 0");
928 context->SetRemoteSoftwareVersion(SOFTWARE_VERSION_EARLIEST);
929 return E_OK;
930 } else {
931 LOGI("[DataSync][DoAbilitySyncIfNeed] need do ability sync");
932 if (isControlMsg) {
933 SendControlAck(context, message, -E_NEED_ABILITY_SYNC, 0);
934 } else {
935 (void)SendDataAck(context, message, -E_NEED_ABILITY_SYNC, 0);
936 }
937 return -E_NEED_ABILITY_SYNC;
938 }
939 }
940
DataRequestRecvPre(SingleVerSyncTaskContext * context,const Message * message)941 int SingleVerDataSync::DataRequestRecvPre(SingleVerSyncTaskContext *context, const Message *message)
942 {
943 if (context == nullptr || message == nullptr) {
944 return -E_INVALID_ARGS;
945 }
946 auto *packet = message->GetObject<DataRequestPacket>();
947 if (packet == nullptr) {
948 return -E_INVALID_ARGS;
949 }
950 if (context->GetRemoteSoftwareVersion() <= SOFTWARE_VERSION_BASE) {
951 return DoAbilitySyncIfNeed(context, message);
952 }
953 int32_t sendCode = packet->GetSendCode();
954 if (sendCode == -E_VERSION_NOT_SUPPORT) {
955 LOGE("[DataSync] Version mismatch: ver=%u, current=%u", packet->GetVersion(), SOFTWARE_VERSION_CURRENT);
956 (void)SendDataAck(context, message, -E_VERSION_NOT_SUPPORT, 0);
957 return -E_WAIT_NEXT_MESSAGE;
958 }
959 // only deal with pull response packet errCode
960 if (sendCode != E_OK && sendCode != SEND_FINISHED && sendCode != -E_UNFINISHED &&
961 message->GetSessionId() == context->GetRequestSessionId()) {
962 LOGE("[DataSync][DataRequestRecvPre] remote pullResponse getData sendCode=%d", sendCode);
963 return sendCode;
964 }
965 int errCode = RunPermissionCheck(context, message, packet);
966 if (errCode != E_OK) {
967 return errCode;
968 }
969 if (std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT) > SOFTWARE_VERSION_RELEASE_2_0) {
970 errCode = CheckSchemaStrategy(context, message);
971 }
972 if (errCode == E_OK) {
973 errCode = SingleVerDataSyncUtils::RequestQueryCheck(packet, storage_);
974 }
975 if (errCode != E_OK) {
976 (void)SendDataAck(context, message, errCode, 0);
977 return errCode;
978 }
979 errCode = SingleVerDataSyncUtils::SchemaVersionMatchCheck(*context, *packet, metadata_);
980 if (errCode != E_OK) {
981 (void)SendDataAck(context, message, errCode, 0);
982 }
983 return errCode;
984 }
985
DataRequestRecv(SingleVerSyncTaskContext * context,const Message * message,WaterMark & pullEndWatermark)986 int SingleVerDataSync::DataRequestRecv(SingleVerSyncTaskContext *context, const Message *message,
987 WaterMark &pullEndWatermark)
988 {
989 int errCode = DataRequestRecvPre(context, message);
990 if (errCode != E_OK) {
991 return errCode;
992 }
993 const DataRequestPacket *packet = message->GetObject<DataRequestPacket>();
994 const std::vector<SendDataItem> &data = packet->GetData();
995 SyncType curType = SyncOperation::GetSyncType(packet->GetMode());
996 LOGI("[DataSync][DataRequestRecv] curType=%d, remote ver=%" PRIu32 ", size=%zu, errCode=%d, queryId=%s,"
997 " Label=%s, dev=%s", static_cast<int>(curType), packet->GetVersion(), data.size(), packet->GetSendCode(),
998 STR_MASK(packet->GetQueryId()), label_.c_str(), STR_MASK(GetDeviceId()));
999 context->SetReceiveWaterMarkErr(false);
1000 UpdateWaterMark isUpdateWaterMark;
1001 SyncTimeRange dataTime = SingleVerDataSyncUtils::GetRecvDataTimeRange(curType, data, isUpdateWaterMark);
1002 errCode = RemoveDeviceDataHandle(context, message, dataTime.endTime);
1003 if (errCode != E_OK) {
1004 return errCode;
1005 }
1006 Metadata::MetaWaterMarkAutoLock autoLock(metadata_);
1007 if (WaterMarkErrHandle(curType, context, message)) {
1008 return E_OK;
1009 }
1010 GetPullEndWatermark(context, packet, pullEndWatermark);
1011 // save data first
1012 errCode = SaveData(context, data, curType, packet->GetQuery());
1013 if (errCode != E_OK) {
1014 (void)SendDataAck(context, message, errCode, dataTime.endTime);
1015 return errCode;
1016 }
1017 if (pullEndWatermark > 0 && !storage_->IsReadable()) { // pull mode
1018 pullEndWatermark = 0;
1019 errCode = SendDataAck(context, message, -E_EKEYREVOKED, dataTime.endTime);
1020 } else {
1021 // if data is empty, we don't know the max timestap of this packet.
1022 errCode = SendDataAck(context, message, !data.empty() ? E_OK : WATER_MARK_INVALID, dataTime.endTime);
1023 }
1024 RemotePushFinished(packet->GetSendCode(), packet->GetMode(), message->GetSessionId(),
1025 context->GetRequestSessionId());
1026 if (curType != SyncType::QUERY_SYNC_TYPE && isUpdateWaterMark.normalUpdateMark) {
1027 UpdatePeerWaterMark(curType, "", context, dataTime.endTime + 1, 0);
1028 } else if (curType == SyncType::QUERY_SYNC_TYPE && packet->IsNeedUpdateWaterMark()) {
1029 UpdateQueryPeerWaterMark(curType, packet->GetQueryId(), dataTime, context, isUpdateWaterMark);
1030 }
1031 if (errCode != E_OK) {
1032 return errCode;
1033 }
1034 if (packet->GetSendCode() == SEND_FINISHED) {
1035 return -E_RECV_FINISHED;
1036 }
1037 return errCode;
1038 }
1039
SendDataPacket(SyncType syncType,DataRequestPacket * packet,SingleVerSyncTaskContext * context)1040 int SingleVerDataSync::SendDataPacket(SyncType syncType, DataRequestPacket *packet,
1041 SingleVerSyncTaskContext *context)
1042 {
1043 Message *message = new (std::nothrow) Message(SingleVerDataSyncUtils::GetMessageId(syncType));
1044 if (message == nullptr) {
1045 LOGE("[DataSync][SendDataPacket] new message error");
1046 delete packet;
1047 packet = nullptr;
1048 return -E_OUT_OF_MEMORY;
1049 }
1050 uint32_t packetLen = packet->CalculateLen(SingleVerDataSyncUtils::GetMessageId(syncType));
1051 int errCode = message->SetExternalObject(packet);
1052 if (errCode != E_OK) {
1053 delete packet;
1054 packet = nullptr;
1055 delete message;
1056 message = nullptr;
1057 LOGE("[DataSync][SendDataPacket] set external object failed errCode=%d", errCode);
1058 return errCode;
1059 }
1060 SingleVerDataSyncUtils::SetMessageHeadInfo(*message, TYPE_REQUEST, context->GetDeviceId(),
1061 context->GetSequenceId(), context->GetRequestSessionId());
1062 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
1063 if (performance != nullptr) {
1064 performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_DATA_SEND_REQUEST_TO_ACK_RECV);
1065 }
1066 CommErrHandler handler = [this, context, sessionId = message->GetSessionId()](int ret, bool isDirectEnd) {
1067 SyncTaskContext::CommErrHandlerFunc(ret, context, sessionId, isDirectEnd);
1068 };
1069 errCode = Send(context, message, handler, packetLen);
1070 if (errCode != E_OK) {
1071 delete message;
1072 message = nullptr;
1073 }
1074
1075 return errCode;
1076 }
1077
SendPullResponseDataPkt(int ackCode,SyncEntry & syncOutData,SingleVerSyncTaskContext * context)1078 int SingleVerDataSync::SendPullResponseDataPkt(int ackCode, SyncEntry &syncOutData,
1079 SingleVerSyncTaskContext *context)
1080 {
1081 DataRequestPacket *packet = new (std::nothrow) DataRequestPacket;
1082 if (packet == nullptr) {
1083 LOGE("[DataSync][SendPullResponseDataPkt] new data request packet error");
1084 return -E_OUT_OF_MEMORY;
1085 }
1086 SyncType syncType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1087 FillDataRequestPacket(packet, context, syncOutData, ackCode, SyncModeType::RESPONSE_PULL);
1088 uint32_t packetLen = packet->CalculateLen(SingleVerDataSyncUtils::GetMessageId(syncType));
1089 Message *message = new (std::nothrow) Message(SingleVerDataSyncUtils::GetMessageId(syncType));
1090 if (message == nullptr) {
1091 LOGE("[DataSync][SendPullResponseDataPkt] new message error");
1092 delete packet;
1093 packet = nullptr;
1094 return -E_OUT_OF_MEMORY;
1095 }
1096 int errCode = message->SetExternalObject(packet);
1097 if (errCode != E_OK) {
1098 delete packet;
1099 packet = nullptr;
1100 delete message;
1101 message = nullptr;
1102 LOGE("[SendPullResponseDataPkt] set external object failed, errCode=%d", errCode);
1103 return errCode;
1104 }
1105 SingleVerDataSyncUtils::SetMessageHeadInfo(*message, TYPE_REQUEST, context->GetDeviceId(),
1106 context->GetSequenceId(), context->GetResponseSessionId());
1107 SendResetWatchDogPacket(context, packetLen);
1108 errCode = Send(context, message, nullptr, packetLen);
1109 if (errCode != E_OK) {
1110 delete message;
1111 message = nullptr;
1112 }
1113 return errCode;
1114 }
1115
SendFinishedDataAck(SingleVerSyncTaskContext * context,const Message * message)1116 void SingleVerDataSync::SendFinishedDataAck(SingleVerSyncTaskContext *context, const Message *message)
1117 {
1118 (void)SendDataAck(context, message, E_OK, 0);
1119 }
1120
SendDataAck(SingleVerSyncTaskContext * context,const Message * message,int32_t recvCode,WaterMark maxSendDataTime)1121 int SingleVerDataSync::SendDataAck(SingleVerSyncTaskContext *context, const Message *message, int32_t recvCode,
1122 WaterMark maxSendDataTime)
1123 {
1124 const DataRequestPacket *packet = message->GetObject<DataRequestPacket>();
1125 if (packet == nullptr) {
1126 return -E_INVALID_ARGS;
1127 }
1128 Message *ackMessage = new (std::nothrow) Message(message->GetMessageId());
1129 if (ackMessage == nullptr) {
1130 LOGE("[DataSync][SendDataAck] new message error");
1131 return -E_OUT_OF_MEMORY;
1132 }
1133 DataAckPacket ack;
1134 SetAckPacket(ack, context, packet, recvCode, maxSendDataTime);
1135 int errCode = ackMessage->SetCopiedObject(ack);
1136 if (errCode != E_OK) {
1137 delete ackMessage;
1138 ackMessage = nullptr;
1139 LOGE("[DataSync][SendDataAck] set copied object failed, errcode=%d", errCode);
1140 return errCode;
1141 }
1142 SingleVerDataSyncUtils::SetMessageHeadInfo(*ackMessage, TYPE_RESPONSE, context->GetDeviceId(),
1143 message->GetSequenceId(), message->GetSessionId());
1144
1145 errCode = Send(context, ackMessage, nullptr, 0);
1146 if (errCode != E_OK) {
1147 delete ackMessage;
1148 ackMessage = nullptr;
1149 }
1150 return errCode;
1151 }
1152
AckPacketIdCheck(const Message * message)1153 bool SingleVerDataSync::AckPacketIdCheck(const Message *message)
1154 {
1155 if (message == nullptr) {
1156 LOGE("[DataSync] AckRecv message nullptr");
1157 return false;
1158 }
1159 if (message->GetMessageType() == TYPE_NOTIFY || message->IsFeedbackError()) {
1160 return true;
1161 }
1162 const DataAckPacket *packet = message->GetObject<DataAckPacket>();
1163 if (packet == nullptr) {
1164 return false;
1165 }
1166 uint64_t packetId = packet->GetPacketId(); // above 102 version data request reserve[0] store packetId value
1167 std::lock_guard<std::mutex> lock(lock_);
1168 uint32_t sequenceId = message->GetSequenceId();
1169 if (reSendMap_.count(sequenceId) != 0) {
1170 uint64_t originalPacketId = reSendMap_[sequenceId].packetId;
1171 if (DataAckPacket::IsPacketIdValid(packetId) && packetId != originalPacketId) {
1172 LOGE("[DataSync] packetId[%" PRIu64 "] is not match with original[%" PRIu64 "]", packetId,
1173 originalPacketId);
1174 return false;
1175 }
1176 }
1177 return true;
1178 }
1179
AckRecv(SingleVerSyncTaskContext * context,const Message * message)1180 int SingleVerDataSync::AckRecv(SingleVerSyncTaskContext *context, const Message *message)
1181 {
1182 int errCode = SingleVerDataSyncUtils::AckMsgErrnoCheck(context, message);
1183 if (errCode != E_OK) {
1184 return errCode;
1185 }
1186 const DataAckPacket *packet = message->GetObject<DataAckPacket>();
1187 if (packet == nullptr) {
1188 return -E_INVALID_ARGS;
1189 }
1190 int32_t recvCode = packet->GetRecvCode();
1191 LOGD("[DataSync][AckRecv] ver=%u,recvCode=%d,myversion=%u,label=%s,dev=%s", packet->GetVersion(), recvCode,
1192 SOFTWARE_VERSION_CURRENT, label_.c_str(), STR_MASK(GetDeviceId()));
1193 if (recvCode == -E_VERSION_NOT_SUPPORT) {
1194 LOGE("[DataSync][AckRecv] Version mismatch");
1195 return -E_VERSION_NOT_SUPPORT;
1196 }
1197
1198 if (recvCode == -E_NEED_ABILITY_SYNC || recvCode == -E_NOT_PERMIT || recvCode == -E_NEED_TIME_SYNC) {
1199 // we should ReleaseContinueToken, avoid crash
1200 LOGI("[DataSync][AckRecv] Data sync abort,recvCode =%d,label =%s,dev=%s", recvCode, label_.c_str(),
1201 STR_MASK(GetDeviceId()));
1202 context->ReleaseContinueToken();
1203 return recvCode;
1204 }
1205 uint64_t data = packet->GetData();
1206 if (recvCode == LOCAL_WATER_MARK_NOT_INIT) {
1207 return DealWaterMarkException(context, data, packet->GetReserved());
1208 }
1209
1210 if (recvCode == -E_SAVE_DATA_NOTIFY && data != 0) {
1211 // data only use low 32bit
1212 context->StartFeedDogForSync(static_cast<uint32_t>(data), SyncDirectionFlag::RECEIVE);
1213 LOGI("[DataSync][AckRecv] notify ResetWatchDog=%" PRIu64 ",label=%s,dev=%s", data, label_.c_str(),
1214 STR_MASK(GetDeviceId()));
1215 }
1216
1217 if (recvCode != E_OK && recvCode != WATER_MARK_INVALID) {
1218 LOGW("[DataSync][AckRecv] Received a uncatched recvCode=%d,label=%s,dev=%s", recvCode,
1219 label_.c_str(), STR_MASK(GetDeviceId()));
1220 return recvCode;
1221 }
1222
1223 // Judge if send finished
1224 ContinueToken token;
1225 context->GetContinueToken(token);
1226 if (((message->GetSessionId() == context->GetResponseSessionId()) ||
1227 (message->GetSessionId() == context->GetRequestSessionId())) && (token == nullptr)) {
1228 return -E_NO_DATA_SEND;
1229 }
1230
1231 // send next data
1232 return -E_SEND_DATA;
1233 }
1234
SendSaveDataNotifyPacket(SingleVerSyncTaskContext * context,uint32_t pktVersion,uint32_t sessionId,uint32_t sequenceId,uint32_t inMsgId)1235 void SingleVerDataSync::SendSaveDataNotifyPacket(SingleVerSyncTaskContext *context, uint32_t pktVersion,
1236 uint32_t sessionId, uint32_t sequenceId, uint32_t inMsgId)
1237 {
1238 if (inMsgId != DATA_SYNC_MESSAGE && inMsgId != QUERY_SYNC_MESSAGE) {
1239 LOGE("[SingleVerDataSync] messageId not available.");
1240 return;
1241 }
1242 Message *ackMessage = new (std::nothrow) Message(inMsgId);
1243 if (ackMessage == nullptr) {
1244 LOGE("[DataSync][SaveDataNotify] new message failed");
1245 return;
1246 }
1247
1248 DataAckPacket ack;
1249 ack.SetRecvCode(-E_SAVE_DATA_NOTIFY);
1250 ack.SetVersion(pktVersion);
1251 int errCode = ackMessage->SetCopiedObject(ack);
1252 if (errCode != E_OK) {
1253 delete ackMessage;
1254 ackMessage = nullptr;
1255 LOGE("[DataSync][SendSaveDataNotifyPacket] set copied object failed,errcode=%d", errCode);
1256 return;
1257 }
1258 SingleVerDataSyncUtils::SetMessageHeadInfo(*ackMessage, TYPE_NOTIFY, context->GetDeviceId(), sequenceId, sessionId);
1259
1260 errCode = Send(context, ackMessage, nullptr, 0);
1261 if (errCode != E_OK) {
1262 delete ackMessage;
1263 ackMessage = nullptr;
1264 }
1265 LOGD("[DataSync][SaveDataNotify] Send SaveDataNotify packet Finished,errcode=%d,label=%s,dev=%s",
1266 errCode, label_.c_str(), STR_MASK(GetDeviceId()));
1267 }
1268
GetPullEndWatermark(const SingleVerSyncTaskContext * context,const DataRequestPacket * packet,WaterMark & pullEndWatermark) const1269 void SingleVerDataSync::GetPullEndWatermark(const SingleVerSyncTaskContext *context, const DataRequestPacket *packet,
1270 WaterMark &pullEndWatermark) const
1271 {
1272 if (packet == nullptr) {
1273 return;
1274 }
1275 int mode = SyncOperation::TransferSyncMode(packet->GetMode());
1276 if ((mode == SyncModeType::PULL) || (mode == SyncModeType::PUSH_AND_PULL)) {
1277 WaterMark endMark = packet->GetEndWaterMark();
1278 TimeOffset offset;
1279 metadata_->GetTimeOffset(context->GetDeviceId(), offset);
1280 pullEndWatermark = endMark - static_cast<WaterMark>(offset);
1281 LOGD("[DataSync][PullEndWatermark] packetEndMark=%" PRIu64 ",offset=%" PRId64 ",endWaterMark=%" PRIu64 ","
1282 "label=%s,dev=%s", endMark, offset, pullEndWatermark, label_.c_str(), STR_MASK(GetDeviceId()));
1283 }
1284 }
1285
DealWaterMarkException(SingleVerSyncTaskContext * context,WaterMark ackWaterMark,const std::vector<uint64_t> & reserved)1286 int SingleVerDataSync::DealWaterMarkException(SingleVerSyncTaskContext *context, WaterMark ackWaterMark,
1287 const std::vector<uint64_t> &reserved)
1288 {
1289 WaterMark deletedWaterMark = 0;
1290 SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1291 if (curType == SyncType::QUERY_SYNC_TYPE) {
1292 if (reserved.size() <= ACK_PACKET_RESERVED_INDEX_DELETE_WATER_MARK) {
1293 LOGE("[DataSync] get packet reserve size failed");
1294 return -E_INVALID_ARGS;
1295 }
1296 deletedWaterMark = reserved[ACK_PACKET_RESERVED_INDEX_DELETE_WATER_MARK];
1297 }
1298 LOGI("[DataSync][WaterMarkException] AckRecv water error, mark=%" PRIu64 ",deleteMark=%" PRIu64 ","
1299 "label=%s,dev=%s", ackWaterMark, deletedWaterMark, label_.c_str(), STR_MASK(GetDeviceId()));
1300 int errCode = SaveLocalWaterMark(curType, context,
1301 {0, 0, ackWaterMark, deletedWaterMark});
1302 if (errCode != E_OK) {
1303 return errCode;
1304 }
1305 context->SetRetryStatus(SyncTaskContext::NEED_RETRY);
1306 context->IncNegotiationCount();
1307 SingleVerDataSyncUtils::PushAndPullKeyRevokHandle(context);
1308 if (!context->IsNeedClearRemoteStaleData()) {
1309 return -E_RE_SEND_DATA;
1310 }
1311 errCode = DealRemoveDeviceDataByAck(context, ackWaterMark, reserved);
1312 if (errCode != E_OK) {
1313 return errCode;
1314 }
1315 return -E_RE_SEND_DATA;
1316 }
1317
RunPermissionCheck(SingleVerSyncTaskContext * context,const Message * message,const DataRequestPacket * packet)1318 int SingleVerDataSync::RunPermissionCheck(SingleVerSyncTaskContext *context, const Message *message,
1319 const DataRequestPacket *packet)
1320 {
1321 int mode = SyncOperation::TransferSyncMode(packet->GetMode());
1322 int errCode = SingleVerDataSyncUtils::RunPermissionCheck(context, storage_, label_, packet);
1323 if (errCode != E_OK) {
1324 if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_EARLIEST) { // ver 101 can't handle this errCode
1325 (void)SendDataAck(context, message, -E_NOT_PERMIT, 0);
1326 }
1327 return -E_NOT_PERMIT;
1328 }
1329 const std::vector<SendDataItem> &data = packet->GetData();
1330 WaterMark maxSendDataTime = SingleVerDataSyncUtils::GetMaxSendDataTime(data);
1331 uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1332 auto securityOption = packet->GetSecurityOption();
1333 if (context->GetRemoteSeccurityOption().securityLabel == SecurityLabel::NOT_SET &&
1334 securityOption.securityLabel != SecurityLabel::NOT_SET) {
1335 context->SetRemoteSeccurityOption(packet->GetSecurityOption());
1336 }
1337 if (version > SOFTWARE_VERSION_RELEASE_2_0 && (mode != SyncModeType::PULL) &&
1338 !context->GetReceivcPermitCheck()) {
1339 bool permitReceive = SingleVerDataSyncUtils::CheckPermitReceiveData(context, communicateHandle_, storage_);
1340 if (permitReceive) {
1341 context->SetReceivcPermitCheck(true);
1342 } else {
1343 (void)SendDataAck(context, message, -E_SECURITY_OPTION_CHECK_ERROR, maxSendDataTime);
1344 return -E_SECURITY_OPTION_CHECK_ERROR;
1345 }
1346 }
1347 return errCode;
1348 }
1349
1350 // used in pull response
SendResetWatchDogPacket(SingleVerSyncTaskContext * context,uint32_t packetLen)1351 void SingleVerDataSync::SendResetWatchDogPacket(SingleVerSyncTaskContext *context, uint32_t packetLen)
1352 {
1353 // When mtu less than 30k, we send data with bluetooth
1354 // In order not to block the bluetooth channel, we don't send notify packet here
1355 if (mtuSize_ >= packetLen || mtuSize_ < NOTIFY_MIN_MTU_SIZE) {
1356 return;
1357 }
1358 uint64_t data = static_cast<uint64_t>(packetLen) * static_cast<uint64_t>(context->GetTimeoutTime()) / mtuSize_;
1359
1360 Message *ackMessage = new (std::nothrow) Message(DATA_SYNC_MESSAGE);
1361 if (ackMessage == nullptr) {
1362 LOGE("[DataSync][ResetWatchDog] new message failed");
1363 return;
1364 }
1365
1366 DataAckPacket ack;
1367 ack.SetData(data);
1368 ack.SetRecvCode(-E_SAVE_DATA_NOTIFY);
1369 ack.SetVersion(std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT));
1370 int errCode = ackMessage->SetCopiedObject(ack);
1371 if (errCode != E_OK) {
1372 delete ackMessage;
1373 ackMessage = nullptr;
1374 LOGE("[DataSync][ResetWatchDog] set copied object failed, errcode=%d", errCode);
1375 return;
1376 }
1377 SingleVerDataSyncUtils::SetMessageHeadInfo(*ackMessage, TYPE_NOTIFY, context->GetDeviceId(),
1378 context->GetSequenceId(), context->GetResponseSessionId());
1379
1380 errCode = Send(context, ackMessage, nullptr, 0);
1381 if (errCode != E_OK) {
1382 delete ackMessage;
1383 ackMessage = nullptr;
1384 LOGE("[DataSync][ResetWatchDog] Send packet failed,errcode=%d,label=%s,dev=%s", errCode, label_.c_str(),
1385 STR_MASK(GetDeviceId()));
1386 } else {
1387 LOGI("[DataSync][ResetWatchDog] data = %" PRIu64 ",label=%s,dev=%s", data, label_.c_str(),
1388 STR_MASK(GetDeviceId()));
1389 }
1390 }
1391
ReSend(SingleVerSyncTaskContext * context,DataSyncReSendInfo reSendInfo)1392 int32_t SingleVerDataSync::ReSend(SingleVerSyncTaskContext *context, DataSyncReSendInfo reSendInfo)
1393 {
1394 if (context == nullptr) {
1395 return -E_INVALID_ARGS;
1396 }
1397 int errCode = RemoveDeviceDataIfNeed(context);
1398 if (errCode != E_OK) {
1399 context->SetTaskErrCode(errCode);
1400 return errCode;
1401 }
1402 SyncEntry syncData;
1403 errCode = GetReSendData(syncData, context, reSendInfo);
1404 if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
1405 return errCode;
1406 }
1407 SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1408 DataRequestPacket *packet = new (std::nothrow) DataRequestPacket;
1409 if (packet == nullptr) {
1410 LOGE("[DataSync][ReSend] new DataRequestPacket error");
1411 return -E_OUT_OF_MEMORY;
1412 }
1413 FillRequestReSendPacket(context, packet, reSendInfo, syncData, errCode);
1414 errCode = SendReSendPacket(packet, context, reSendInfo.sessionId, reSendInfo.sequenceId);
1415 if (curType == SyncType::QUERY_SYNC_TYPE && (context->GetQuery().HasLimit() || context->GetQuery().HasOrderBy())) {
1416 LOGI("[DataSync][ReSend] query contain limit/offset/orderby, no need to update watermark.");
1417 return errCode;
1418 }
1419 if (errCode == E_OK && SyncOperation::TransferSyncMode(context->GetMode()) != SyncModeType::PULL) {
1420 // resend.end may not update in localwatermark while E_TIMEOUT occurred in send message last time.
1421 SyncTimeRange dataTime {reSendInfo.start, reSendInfo.deleteDataStart, reSendInfo.end, reSendInfo.deleteDataEnd};
1422 if (reSendInfo.deleteDataEnd > reSendInfo.deleteDataStart && curType == SyncType::QUERY_SYNC_TYPE) {
1423 dataTime.deleteEndTime += 1;
1424 }
1425 if (reSendInfo.end > reSendInfo.start) {
1426 dataTime.endTime += 1;
1427 }
1428 errCode = SaveLocalWaterMark(curType, context, dataTime, true);
1429 if (errCode != E_OK) {
1430 LOGE("[DataSync][ReSend] SaveLocalWaterMark failed.");
1431 }
1432 }
1433 return errCode;
1434 }
1435
SendReSendPacket(DataRequestPacket * packet,SingleVerSyncTaskContext * context,uint32_t sessionId,uint32_t sequenceId)1436 int SingleVerDataSync::SendReSendPacket(DataRequestPacket *packet, SingleVerSyncTaskContext *context,
1437 uint32_t sessionId, uint32_t sequenceId)
1438 {
1439 SyncType syncType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1440 Message *message = new (std::nothrow) Message(SingleVerDataSyncUtils::GetMessageId(syncType));
1441 if (message == nullptr) {
1442 LOGE("[DataSync][SendDataPacket] new message error");
1443 delete packet;
1444 packet = nullptr;
1445 return -E_OUT_OF_MEMORY;
1446 }
1447 uint32_t packetLen = packet->CalculateLen(SingleVerDataSyncUtils::GetMessageId(syncType));
1448 int errCode = message->SetExternalObject(packet);
1449 if (errCode != E_OK) {
1450 delete packet;
1451 packet = nullptr;
1452 delete message;
1453 message = nullptr;
1454 LOGE("[DataSync][SendReSendPacket] SetExternalObject failed errCode=%d", errCode);
1455 return errCode;
1456 }
1457 SingleVerDataSyncUtils::SetMessageHeadInfo(*message, TYPE_REQUEST, context->GetDeviceId(), sequenceId, sessionId);
1458 CommErrHandler handler = [this, context, sessionId = message->GetSessionId()](int ret, bool isDirectEnd) {
1459 SyncTaskContext::CommErrHandlerFunc(ret, context, sessionId, isDirectEnd);
1460 };
1461 errCode = Send(context, message, handler, packetLen);
1462 if (errCode != E_OK) {
1463 delete message;
1464 message = nullptr;
1465 }
1466 return errCode;
1467 }
1468
CheckPermitSendData(int inMode,SingleVerSyncTaskContext * context)1469 int SingleVerDataSync::CheckPermitSendData(int inMode, SingleVerSyncTaskContext *context)
1470 {
1471 uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1472 int mode = SyncOperation::TransferSyncMode(inMode);
1473 // for pull mode it just need to get data, no need to send data.
1474 if (version <= SOFTWARE_VERSION_RELEASE_2_0 || mode == SyncModeType::PULL) {
1475 return E_OK;
1476 }
1477 if (context->GetSendPermitCheck()) {
1478 return E_OK;
1479 }
1480 bool isPermitSync = true;
1481 std::string deviceId = context->GetDeviceId();
1482 SecurityOption remoteSecOption = context->GetRemoteSeccurityOption();
1483 if (mode == SyncModeType::PUSH || mode == SyncModeType::PUSH_AND_PULL || mode == SyncModeType::RESPONSE_PULL) {
1484 isPermitSync = SingleVerDataSyncUtils::IsPermitRemoteDeviceRecvData(deviceId, remoteSecOption, storage_);
1485 }
1486 LOGI("[DataSync][PermitSendData] mode=%d,dev=%s,label=%d,flag=%d,PermitSync=%d", mode, STR_MASK(deviceId_),
1487 remoteSecOption.securityLabel, remoteSecOption.securityFlag, isPermitSync);
1488 if (isPermitSync) {
1489 context->SetSendPermitCheck(true);
1490 return E_OK;
1491 }
1492 if (mode == SyncModeType::PUSH || mode == SyncModeType::PUSH_AND_PULL) {
1493 context->SetTaskErrCode(-E_SECURITY_OPTION_CHECK_ERROR);
1494 return -E_SECURITY_OPTION_CHECK_ERROR;
1495 }
1496 if (mode == SyncModeType::RESPONSE_PULL) {
1497 SyncEntry syncData;
1498 (void)SendPullResponseDataPkt(-E_SECURITY_OPTION_CHECK_ERROR, syncData, context);
1499 return -E_SECURITY_OPTION_CHECK_ERROR;
1500 }
1501 if (mode == SyncModeType::SUBSCRIBE_QUERY) {
1502 return -E_SECURITY_OPTION_CHECK_ERROR;
1503 }
1504 return E_OK;
1505 }
1506
GetLabel() const1507 std::string SingleVerDataSync::GetLabel() const
1508 {
1509 return label_;
1510 }
1511
GetDeviceId() const1512 std::string SingleVerDataSync::GetDeviceId() const
1513 {
1514 return deviceId_;
1515 }
1516
WaterMarkErrHandle(SyncType syncType,SingleVerSyncTaskContext * context,const Message * message)1517 bool SingleVerDataSync::WaterMarkErrHandle(SyncType syncType, SingleVerSyncTaskContext *context, const Message *message)
1518 {
1519 const DataRequestPacket *packet = message->GetObject<DataRequestPacket>();
1520 if (packet == nullptr) {
1521 LOGE("[WaterMarkErrHandle] get packet object failed");
1522 return -E_INVALID_ARGS;
1523 }
1524 WaterMark packetLocalMark = packet->GetLocalWaterMark();
1525 WaterMark packetDeletedMark = packet->GetDeletedWaterMark();
1526 WaterMark peerMark = 0;
1527 WaterMark deletedMark = 0;
1528 GetPeerWaterMark(syncType, packet->GetQueryId(), context->GetDeviceId(), peerMark);
1529 if (syncType == SyncType::QUERY_SYNC_TYPE) {
1530 GetPeerDeleteSyncWaterMark(context->GetDeleteSyncId(), deletedMark);
1531 }
1532 if (syncType != SyncType::QUERY_SYNC_TYPE && packetLocalMark > peerMark) {
1533 LOGI("[DataSync][DataRequestRecv] packetLocalMark=%" PRIu64 ",current=%" PRIu64, packetLocalMark, peerMark);
1534 context->SetReceiveWaterMarkErr(true);
1535 (void)SendDataAck(context, message, LOCAL_WATER_MARK_NOT_INIT, 0);
1536 return true;
1537 }
1538 if (syncType == SyncType::QUERY_SYNC_TYPE && (packetLocalMark > peerMark || packetDeletedMark > deletedMark)) {
1539 LOGI("[DataSync][DataRequestRecv] packetDeletedMark=%" PRIu64 ",deletedMark=%" PRIu64 ","
1540 "packetLocalMark=%" PRIu64 ",peerMark=%" PRIu64, packetDeletedMark, deletedMark, packetLocalMark,
1541 peerMark);
1542 context->SetReceiveWaterMarkErr(true);
1543 (void)SendDataAck(context, message, LOCAL_WATER_MARK_NOT_INIT, 0);
1544 return true;
1545 }
1546 return false;
1547 }
1548
CheckSchemaStrategy(SingleVerSyncTaskContext * context,const Message * message)1549 int SingleVerDataSync::CheckSchemaStrategy(SingleVerSyncTaskContext *context, const Message *message)
1550 {
1551 auto *packet = message->GetObject<DataRequestPacket>();
1552 if (packet == nullptr) {
1553 return -E_INVALID_ARGS;
1554 }
1555 if (metadata_->IsAbilitySyncFinish(deviceId_)) {
1556 return E_OK;
1557 }
1558 auto query = packet->GetQuery();
1559 std::pair<bool, bool> schemaSyncStatus = context->GetSchemaSyncStatus(query);
1560 if (!schemaSyncStatus.second) {
1561 LOGE("[DataSync][CheckSchemaStrategy] isSchemaSync=%d check failed", schemaSyncStatus.second);
1562 (void)SendDataAck(context, message, -E_NEED_ABILITY_SYNC, 0);
1563 return -E_NEED_ABILITY_SYNC;
1564 }
1565 if (!schemaSyncStatus.first) {
1566 LOGE("[DataSync][CheckSchemaStrategy] Strategy permitSync=%d check failed", schemaSyncStatus.first);
1567 (void)SendDataAck(context, message, -E_SCHEMA_MISMATCH, 0);
1568 return -E_SCHEMA_MISMATCH;
1569 }
1570 return E_OK;
1571 }
1572
RemotePushFinished(int sendCode,int inMode,uint32_t msgSessionId,uint32_t contextSessionId)1573 void SingleVerDataSync::RemotePushFinished(int sendCode, int inMode, uint32_t msgSessionId, uint32_t contextSessionId)
1574 {
1575 int mode = SyncOperation::TransferSyncMode(inMode);
1576 if ((mode != SyncModeType::PUSH) && (mode != SyncModeType::PUSH_AND_PULL) && (mode != SyncModeType::QUERY_PUSH) &&
1577 (mode != SyncModeType::QUERY_PUSH_PULL)) {
1578 return;
1579 }
1580
1581 if ((sendCode == E_OK) && (msgSessionId != 0) && (msgSessionId != contextSessionId)) {
1582 storage_->NotifyRemotePushFinished(deviceId_);
1583 }
1584 }
1585
SetAckPacket(DataAckPacket & ackPacket,SingleVerSyncTaskContext * context,const DataRequestPacket * packet,int32_t recvCode,WaterMark maxSendDataTime)1586 void SingleVerDataSync::SetAckPacket(DataAckPacket &ackPacket, SingleVerSyncTaskContext *context,
1587 const DataRequestPacket *packet, int32_t recvCode, WaterMark maxSendDataTime)
1588 {
1589 SyncType curType = SyncOperation::GetSyncType(packet->GetMode());
1590 WaterMark localMark = 0;
1591 GetLocalWaterMark(curType, packet->GetQueryId(), context, localMark);
1592 ackPacket.SetRecvCode(recvCode);
1593 // send ack packet
1594 if ((recvCode == E_OK) && (maxSendDataTime != 0)) {
1595 ackPacket.SetData(maxSendDataTime + 1); // + 1 to next start
1596 } else if (recvCode != WATER_MARK_INVALID) {
1597 WaterMark mark = 0;
1598 GetPeerWaterMark(curType, packet->GetQueryId(), context->GetDeviceId(), mark);
1599 ackPacket.SetData(mark);
1600 }
1601 std::vector<uint64_t> reserved {localMark};
1602 uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1603 uint64_t packetId = 0;
1604 if (version > SOFTWARE_VERSION_RELEASE_2_0) {
1605 packetId = packet->GetPacketId(); // above 102 version data request reserve[0] store packetId value
1606 }
1607 if (version > SOFTWARE_VERSION_RELEASE_2_0 && packetId > 0) {
1608 reserved.push_back(packetId);
1609 }
1610 // while recv is not E_OK, data is peerMark, reserve[2] is deletedPeerMark value
1611 if (curType == SyncType::QUERY_SYNC_TYPE && recvCode != WATER_MARK_INVALID) {
1612 WaterMark deletedPeerMark;
1613 GetPeerDeleteSyncWaterMark(context->GetDeleteSyncId(), deletedPeerMark);
1614 reserved.push_back(deletedPeerMark); // query sync mode, reserve[2] store deletedPeerMark value
1615 }
1616 ackPacket.SetReserved(reserved);
1617 ackPacket.SetVersion(version);
1618 }
1619
GetReSendData(SyncEntry & syncData,SingleVerSyncTaskContext * context,DataSyncReSendInfo reSendInfo)1620 int SingleVerDataSync::GetReSendData(SyncEntry &syncData, SingleVerSyncTaskContext *context,
1621 DataSyncReSendInfo reSendInfo)
1622 {
1623 int mode = SyncOperation::TransferSyncMode(context->GetMode());
1624 if (mode == SyncModeType::PULL) {
1625 return E_OK;
1626 }
1627 ContinueToken token = nullptr;
1628 uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1629 size_t packetSize = (version > SOFTWARE_VERSION_RELEASE_2_0) ?
1630 DBConstant::MAX_HPMODE_PACK_ITEM_SIZE : DBConstant::MAX_NORMAL_PACK_ITEM_SIZE;
1631 DataSizeSpecInfo reSendDataSizeInfo = GetDataSizeSpecInfo(packetSize);
1632 SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1633 int errCode;
1634 if (curType != SyncType::QUERY_SYNC_TYPE) {
1635 errCode = storage_->GetSyncData(reSendInfo.start, reSendInfo.end + 1, syncData.entries, token,
1636 reSendDataSizeInfo);
1637 } else {
1638 QuerySyncObject queryObj = context->GetQuery();
1639 errCode = storage_->GetSyncData(queryObj, SyncTimeRange { reSendInfo.start, reSendInfo.deleteDataStart,
1640 reSendInfo.end + 1, reSendInfo.deleteDataEnd + 1 }, reSendDataSizeInfo, token, syncData.entries);
1641 }
1642 if (token != nullptr) {
1643 storage_->ReleaseContinueToken(token);
1644 }
1645 if (errCode == -E_BUSY || errCode == -E_EKEYREVOKED) {
1646 context->SetTaskErrCode(errCode);
1647 return errCode;
1648 }
1649 if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
1650 return errCode;
1651 }
1652 SingleVerDataSyncUtils::TransDbDataItemToSendDataItem(
1653 DBCommon::TransferHashString(GetLocalDeviceName()), syncData.entries);
1654
1655 int innerCode = InterceptData(syncData);
1656 if (innerCode != E_OK) {
1657 context->SetTaskErrCode(innerCode);
1658 return innerCode;
1659 }
1660
1661 bool needCompressOnSync = false;
1662 uint8_t compressionRate = DBConstant::DEFAULT_COMPTRESS_RATE;
1663 (void)storage_->GetCompressionOption(needCompressOnSync, compressionRate);
1664 CompressAlgorithm remoteAlgo = context->ChooseCompressAlgo();
1665 if (needCompressOnSync && remoteAlgo != CompressAlgorithm::NONE) {
1666 int compressCode = GenericSingleVerKvEntry::Compress(syncData.entries, syncData.compressedEntries,
1667 { remoteAlgo, version });
1668 if (compressCode != E_OK) {
1669 return compressCode;
1670 }
1671 }
1672 return errCode;
1673 }
1674
RemoveDeviceDataIfNeed(SingleVerSyncTaskContext * context)1675 int SingleVerDataSync::RemoveDeviceDataIfNeed(SingleVerSyncTaskContext *context)
1676 {
1677 if (context->GetRemoteSoftwareVersion() <= SOFTWARE_VERSION_RELEASE_3_0) {
1678 return E_OK;
1679 }
1680 uint64_t clearRemoteDataMark = 0;
1681 std::lock_guard<std::mutex> autoLock(removeDeviceDataLock_);
1682 metadata_->GetRemoveDataMark(context->GetDeviceId(), clearRemoteDataMark);
1683 if (clearRemoteDataMark == 0) {
1684 return E_OK;
1685 }
1686 int errCode = E_OK;
1687 if (context->IsNeedClearRemoteStaleData() && clearRemoteDataMark == REMOVE_DEVICE_DATA_MARK) {
1688 errCode = storage_->RemoveDeviceData(context->GetDeviceId(), true);
1689 if (errCode != E_OK) {
1690 LOGE("clear remote %s data failed,errCode=%d", STR_MASK(GetDeviceId()), errCode);
1691 return errCode;
1692 }
1693 }
1694 if (clearRemoteDataMark == REMOVE_DEVICE_DATA_MARK) {
1695 errCode = metadata_->ResetMetaDataAfterRemoveData(context->GetDeviceId());
1696 if (errCode != E_OK) {
1697 LOGE("set %s removeDataWaterMark to false failed,errCode=%d", STR_MASK(GetDeviceId()), errCode);
1698 return errCode;
1699 }
1700 }
1701 return E_OK;
1702 }
1703
UpdateMtuSize()1704 void SingleVerDataSync::UpdateMtuSize()
1705 {
1706 mtuSize_ = communicateHandle_->GetCommunicatorMtuSize(deviceId_) * 9 / 10; // get the 9/10 of the size
1707 }
1708
FillRequestReSendPacket(const SingleVerSyncTaskContext * context,DataRequestPacket * packet,DataSyncReSendInfo reSendInfo,SyncEntry & syncData,int sendCode)1709 void SingleVerDataSync::FillRequestReSendPacket(const SingleVerSyncTaskContext *context, DataRequestPacket *packet,
1710 DataSyncReSendInfo reSendInfo, SyncEntry &syncData, int sendCode)
1711 {
1712 SingleVerDataSyncUtils::SetDataRequestCommonInfo(*context, *storage_, *packet, metadata_);
1713 SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1714 WaterMark peerMark = 0;
1715 GetPeerWaterMark(curType, context->GetQuerySyncId(), context->GetDeviceId(),
1716 peerMark);
1717 uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1718 // transfer reSend mode, RESPONSE_PULL transfer to push or query push
1719 // PUSH_AND_PULL mode which sequenceId lager than first transfer to push or query push
1720 int reSendMode = SingleVerDataSyncUtils::GetReSendMode(context->GetMode(), reSendInfo.sequenceId, curType);
1721 if (GetSessionEndTimestamp() == std::max(reSendInfo.end, reSendInfo.deleteDataEnd) ||
1722 SyncOperation::TransferSyncMode(context->GetMode()) == SyncModeType::PULL) {
1723 LOGI("[DataSync][ReSend] set lastid,label=%s,dev=%s", label_.c_str(), STR_MASK(GetDeviceId()));
1724 packet->SetLastSequence();
1725 }
1726 if (sendCode == E_OK && GetSessionEndTimestamp() == std::max(reSendInfo.end, reSendInfo.deleteDataEnd) &&
1727 context->GetMode() == SyncModeType::RESPONSE_PULL) {
1728 sendCode = SEND_FINISHED;
1729 }
1730 packet->SetData(syncData.entries);
1731 packet->SetCompressData(syncData.compressedEntries);
1732 packet->SetBasicInfo(sendCode, version, reSendMode);
1733 packet->SetExtraConditions(RuntimeContext::GetInstance()->GetPermissionCheckParam(storage_->GetDbProperties()));
1734 packet->SetWaterMark(reSendInfo.start, peerMark, reSendInfo.deleteDataStart);
1735 if (SyncOperation::TransferSyncMode(reSendMode) != SyncModeType::PUSH || context->IsQuerySync()) {
1736 packet->SetEndWaterMark(context->GetEndMark());
1737 packet->SetQuery(context->GetQuery());
1738 }
1739 packet->SetQueryId(context->GetQuerySyncId());
1740 if (version > SOFTWARE_VERSION_RELEASE_2_0) {
1741 std::vector<uint64_t> reserved {reSendInfo.packetId};
1742 packet->SetReserved(reserved);
1743 }
1744 if (reSendMode == SyncModeType::PULL || reSendMode == SyncModeType::QUERY_PULL) {
1745 // resend pull packet dont set compress type
1746 return;
1747 }
1748 bool needCompressOnSync = false;
1749 uint8_t compressionRate = DBConstant::DEFAULT_COMPTRESS_RATE;
1750 (void)storage_->GetCompressionOption(needCompressOnSync, compressionRate);
1751 CompressAlgorithm curAlgo = context->ChooseCompressAlgo();
1752 if (needCompressOnSync && curAlgo != CompressAlgorithm::NONE) {
1753 packet->SetCompressDataMark();
1754 packet->SetCompressAlgo(curAlgo);
1755 }
1756 }
1757
GetDataSizeSpecInfo(size_t packetSize)1758 DataSizeSpecInfo SingleVerDataSync::GetDataSizeSpecInfo(size_t packetSize)
1759 {
1760 bool needCompressOnSync = false;
1761 uint8_t compressionRate = DBConstant::DEFAULT_COMPTRESS_RATE;
1762 (void)storage_->GetCompressionOption(needCompressOnSync, compressionRate);
1763 uint32_t blockSize = std::min(static_cast<uint32_t>(DBConstant::MAX_SYNC_BLOCK_SIZE),
1764 mtuSize_ * 100 / compressionRate); // compressionRate max is 100
1765 return {blockSize, packetSize};
1766 }
1767
InterceptData(SyncEntry & syncEntry)1768 int SingleVerDataSync::InterceptData(SyncEntry &syncEntry)
1769 {
1770 if (storage_ == nullptr) {
1771 LOGE("Invalid DB. Can not intercept data.");
1772 return -E_INVALID_DB;
1773 }
1774
1775 // GetLocalDeviceName get local device ID.
1776 // GetDeviceId get remote device ID.
1777 // If intercept data fail, entries will be released.
1778 return storage_->InterceptData(syncEntry.entries, GetLocalDeviceName(), GetDeviceId(), true);
1779 }
1780
ControlCmdStart(SingleVerSyncTaskContext * context)1781 int SingleVerDataSync::ControlCmdStart(SingleVerSyncTaskContext *context)
1782 {
1783 if (context == nullptr) {
1784 return -E_INVALID_ARGS;
1785 }
1786 std::shared_ptr<SubscribeManager> subManager = context->GetSubscribeManager();
1787 if (subManager == nullptr) {
1788 return -E_INVALID_ARGS;
1789 }
1790 int errCode = ControlCmdStartCheck(context);
1791 if (errCode != E_OK) {
1792 return errCode;
1793 }
1794 ControlRequestPacket* packet = new (std::nothrow) SubscribeRequest();
1795 if (packet == nullptr) {
1796 LOGE("[DataSync][ControlCmdStart] new SubscribeRequest error");
1797 return -E_OUT_OF_MEMORY;
1798 }
1799 if (context->GetMode() == SyncModeType::SUBSCRIBE_QUERY) {
1800 errCode = subManager->ReserveLocalSubscribeQuery(context->GetDeviceId(), context->GetQuery());
1801 if (errCode != E_OK) {
1802 LOGE("[DataSync][ControlCmdStart] reserve local subscribe query failed,err=%d", errCode);
1803 delete packet;
1804 packet = nullptr;
1805 return errCode;
1806 }
1807 }
1808 SingleVerDataSyncUtils::FillControlRequestPacket(packet, context);
1809 errCode = SendControlPacket(packet, context);
1810 if (errCode != E_OK && context->GetMode() == SyncModeType::SUBSCRIBE_QUERY) {
1811 subManager->DeleteLocalSubscribeQuery(context->GetDeviceId(), context->GetQuery());
1812 }
1813 return errCode;
1814 }
1815
ControlCmdRequestRecv(SingleVerSyncTaskContext * context,const Message * message)1816 int SingleVerDataSync::ControlCmdRequestRecv(SingleVerSyncTaskContext *context, const Message *message)
1817 {
1818 const ControlRequestPacket *packet = message->GetObject<ControlRequestPacket>();
1819 if (packet == nullptr) {
1820 return -E_INVALID_ARGS;
1821 }
1822 LOGI("[SingleVerDataSync] recv control cmd message,label=%s,dev=%s,controlType=%u", label_.c_str(),
1823 STR_MASK(GetDeviceId()), packet->GetcontrolCmdType());
1824 int errCode = ControlCmdRequestRecvPre(context, message);
1825 if (errCode != E_OK) {
1826 return errCode;
1827 }
1828 if (packet->GetcontrolCmdType() == ControlCmdType::SUBSCRIBE_QUERY_CMD) {
1829 errCode = SubscribeRequestRecv(context, message);
1830 } else if (packet->GetcontrolCmdType() == ControlCmdType::UNSUBSCRIBE_QUERY_CMD) {
1831 errCode = UnsubscribeRequestRecv(context, message);
1832 }
1833 return errCode;
1834 }
1835
ControlCmdAckRecv(SingleVerSyncTaskContext * context,const Message * message)1836 int SingleVerDataSync::ControlCmdAckRecv(SingleVerSyncTaskContext *context, const Message *message)
1837 {
1838 std::shared_ptr<SubscribeManager> subManager = context->GetSubscribeManager();
1839 if (subManager == nullptr) {
1840 return -E_INVALID_ARGS;
1841 }
1842 int errCode = SingleVerDataSyncUtils::AckMsgErrnoCheck(context, message);
1843 if (errCode != E_OK) {
1844 SingleVerDataSyncUtils::ControlAckErrorHandle(context, subManager);
1845 return errCode;
1846 }
1847 const ControlAckPacket *packet = message->GetObject<ControlAckPacket>();
1848 if (packet == nullptr) {
1849 return -E_INVALID_ARGS;
1850 }
1851 int32_t recvCode = packet->GetRecvCode();
1852 uint32_t cmdType = packet->GetcontrolCmdType();
1853 if (recvCode != E_OK) {
1854 LOGE("[DataSync][AckRecv] control sync abort,recvCode=%d,label=%s,dev=%s,type=%u", recvCode, label_.c_str(),
1855 STR_MASK(GetDeviceId()), cmdType);
1856 // for unsubscribe no need to do something
1857 SingleVerDataSyncUtils::ControlAckErrorHandle(context, subManager);
1858 return recvCode;
1859 }
1860 if (cmdType == ControlCmdType::SUBSCRIBE_QUERY_CMD) {
1861 errCode = subManager->ActiveLocalSubscribeQuery(context->GetDeviceId(), context->GetQuery());
1862 } else if (cmdType == ControlCmdType::UNSUBSCRIBE_QUERY_CMD) {
1863 subManager->RemoveLocalSubscribeQuery(context->GetDeviceId(), context->GetQuery());
1864 }
1865 if (errCode != E_OK) {
1866 LOGE("[DataSync] ack handle failed,label =%s,dev=%s,type=%u", label_.c_str(), STR_MASK(GetDeviceId()), cmdType);
1867 return errCode;
1868 }
1869 return -E_NO_DATA_SEND; // means control msg send finished
1870 }
1871
ControlCmdStartCheck(SingleVerSyncTaskContext * context)1872 int SingleVerDataSync::ControlCmdStartCheck(SingleVerSyncTaskContext *context)
1873 {
1874 if ((context->GetMode() != SyncModeType::SUBSCRIBE_QUERY) &&
1875 (context->GetMode() != SyncModeType::UNSUBSCRIBE_QUERY)) {
1876 LOGE("[ControlCmdStartCheck] not support controlCmd");
1877 return -E_INVALID_ARGS;
1878 }
1879 if (context->GetMode() == SyncModeType::SUBSCRIBE_QUERY &&
1880 context->GetQuery().HasInKeys() &&
1881 context->IsNotSupportAbility(SyncConfig::INKEYS_QUERY)) {
1882 return -E_NOT_SUPPORT;
1883 }
1884 if ((context->GetMode() != SyncModeType::SUBSCRIBE_QUERY) || context->GetReceivcPermitCheck()) {
1885 return E_OK;
1886 }
1887 bool permitReceive = SingleVerDataSyncUtils::CheckPermitReceiveData(context, communicateHandle_, storage_);
1888 if (permitReceive) {
1889 context->SetReceivcPermitCheck(true);
1890 } else {
1891 return -E_SECURITY_OPTION_CHECK_ERROR;
1892 }
1893 return E_OK;
1894 }
1895
SendControlPacket(ControlRequestPacket * packet,SingleVerSyncTaskContext * context)1896 int SingleVerDataSync::SendControlPacket(ControlRequestPacket *packet, SingleVerSyncTaskContext *context)
1897 {
1898 Message *message = new (std::nothrow) Message(CONTROL_SYNC_MESSAGE);
1899 if (message == nullptr) {
1900 LOGE("[DataSync][SendControlPacket] new message error");
1901 delete packet;
1902 packet = nullptr;
1903 return -E_OUT_OF_MEMORY;
1904 }
1905 uint32_t packetLen = packet->CalculateLen();
1906 int errCode = message->SetExternalObject(packet);
1907 if (errCode != E_OK) {
1908 delete packet;
1909 packet = nullptr;
1910 delete message;
1911 message = nullptr;
1912 LOGE("[DataSync][SendControlPacket] set external object failed errCode=%d", errCode);
1913 return errCode;
1914 }
1915 SingleVerDataSyncUtils::SetMessageHeadInfo(*message, TYPE_REQUEST, context->GetDeviceId(),
1916 context->GetSequenceId(), context->GetRequestSessionId());
1917 CommErrHandler handler = [this, context, sessionId = message->GetSessionId()](int ret, bool isDirectEnd) {
1918 SyncTaskContext::CommErrHandlerFunc(ret, context, sessionId, isDirectEnd);
1919 };
1920 errCode = Send(context, message, handler, packetLen);
1921 if (errCode != E_OK) {
1922 delete message;
1923 message = nullptr;
1924 }
1925 return errCode;
1926 }
1927
SendControlAck(SingleVerSyncTaskContext * context,const Message * message,int32_t recvCode,uint32_t controlCmdType,const CommErrHandler & handler)1928 int SingleVerDataSync::SendControlAck(SingleVerSyncTaskContext *context, const Message *message, int32_t recvCode,
1929 uint32_t controlCmdType, const CommErrHandler &handler)
1930 {
1931 Message *ackMessage = new (std::nothrow) Message(message->GetMessageId());
1932 if (ackMessage == nullptr) {
1933 LOGE("[DataSync][SendControlAck] new message error");
1934 return -E_OUT_OF_MEMORY;
1935 }
1936 uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1937 ControlAckPacket ack;
1938 ack.SetPacketHead(recvCode, version, static_cast<int32_t>(controlCmdType), 0);
1939 int errCode = ackMessage->SetCopiedObject(ack);
1940 if (errCode != E_OK) {
1941 delete ackMessage;
1942 ackMessage = nullptr;
1943 LOGE("[DataSync][SendControlAck] set copied object failed, errcode=%d", errCode);
1944 return errCode;
1945 }
1946 SingleVerDataSyncUtils::SetMessageHeadInfo(*ackMessage, TYPE_RESPONSE, context->GetDeviceId(),
1947 message->GetSequenceId(), message->GetSessionId());
1948 errCode = Send(context, ackMessage, handler, 0);
1949 if (errCode != E_OK) {
1950 delete ackMessage;
1951 ackMessage = nullptr;
1952 }
1953 return errCode;
1954 }
1955
ControlCmdRequestRecvPre(SingleVerSyncTaskContext * context,const Message * message)1956 int SingleVerDataSync::ControlCmdRequestRecvPre(SingleVerSyncTaskContext *context, const Message *message)
1957 {
1958 if (context == nullptr || message == nullptr) {
1959 return -E_INVALID_ARGS;
1960 }
1961 const ControlRequestPacket *packet = message->GetObject<ControlRequestPacket>();
1962 if (packet == nullptr) {
1963 return -E_INVALID_ARGS;
1964 }
1965 uint32_t controlCmdType = packet->GetcontrolCmdType();
1966 if (context->GetRemoteSoftwareVersion() <= SOFTWARE_VERSION_BASE) {
1967 return DoAbilitySyncIfNeed(context, message, true);
1968 }
1969 if (controlCmdType >= ControlCmdType::INVALID_CONTROL_CMD) {
1970 SendControlAck(context, message, -E_NOT_SUPPORT, controlCmdType);
1971 return -E_WAIT_NEXT_MESSAGE;
1972 }
1973 return E_OK;
1974 }
1975
SubscribeRequestRecvPre(SingleVerSyncTaskContext * context,const SubscribeRequest * packet,const Message * message)1976 int SingleVerDataSync::SubscribeRequestRecvPre(SingleVerSyncTaskContext *context, const SubscribeRequest *packet,
1977 const Message *message)
1978 {
1979 uint32_t controlCmdType = packet->GetcontrolCmdType();
1980 if (controlCmdType != ControlCmdType::SUBSCRIBE_QUERY_CMD) {
1981 return E_OK;
1982 }
1983 QuerySyncObject syncQuery = packet->GetQuery();
1984 int errCode;
1985 if (!packet->IsAutoSubscribe()) {
1986 errCode = storage_->CheckAndInitQueryCondition(syncQuery);
1987 if (errCode != E_OK) {
1988 LOGE("[SingleVerDataSync] check sync query failed,errCode=%d", errCode);
1989 SendControlAck(context, message, errCode, controlCmdType);
1990 return -E_WAIT_NEXT_MESSAGE;
1991 }
1992 }
1993 int mode = SingleVerDataSyncUtils::GetModeByControlCmdType(
1994 static_cast<ControlCmdType>(packet->GetcontrolCmdType()));
1995 if (mode >= SyncModeType::INVALID_MODE) {
1996 LOGE("[SingleVerDataSync] invalid mode");
1997 SendControlAck(context, message, -E_INVALID_ARGS, controlCmdType);
1998 return -E_WAIT_NEXT_MESSAGE;
1999 }
2000 errCode = CheckPermitSendData(mode, context);
2001 if (errCode != E_OK) {
2002 LOGE("[SingleVerDataSync] check sync query failed,errCode=%d", errCode);
2003 SendControlAck(context, message, errCode, controlCmdType);
2004 }
2005 return errCode;
2006 }
2007
SubscribeRequestRecv(SingleVerSyncTaskContext * context,const Message * message)2008 int SingleVerDataSync::SubscribeRequestRecv(SingleVerSyncTaskContext *context, const Message *message)
2009 {
2010 const SubscribeRequest *packet = message->GetObject<SubscribeRequest>();
2011 if (packet == nullptr) {
2012 return -E_INVALID_ARGS;
2013 }
2014 int errCode = SubscribeRequestRecvPre(context, packet, message);
2015 if (errCode != E_OK) {
2016 return errCode;
2017 }
2018 uint32_t controlCmdType = packet->GetcontrolCmdType();
2019 std::shared_ptr<SubscribeManager> subscribeManager = context->GetSubscribeManager();
2020 if (subscribeManager == nullptr) {
2021 LOGE("[SingleVerDataSync] subscribeManager check failed");
2022 SendControlAck(context, message, -E_NOT_REGISTER, controlCmdType);
2023 return -E_INVALID_ARGS;
2024 }
2025 errCode = storage_->AddSubscribe(packet->GetQuery().GetIdentify(), packet->GetQuery(), packet->IsAutoSubscribe());
2026 if (errCode != E_OK) {
2027 LOGE("[SingleVerDataSync] add trigger failed,err=%d,label=%s,dev=%s", errCode, label_.c_str(),
2028 STR_MASK(GetDeviceId()));
2029 SendControlAck(context, message, errCode, controlCmdType);
2030 return errCode;
2031 }
2032 errCode = subscribeManager->ReserveRemoteSubscribeQuery(context->GetDeviceId(), packet->GetQuery());
2033 if (errCode != E_OK) {
2034 LOGE("[SingleVerDataSync] add remote subscribe query failed,err=%d,label=%s,dev=%s", errCode, label_.c_str(),
2035 STR_MASK(GetDeviceId()));
2036 RemoveSubscribeIfNeed(packet->GetQuery().GetIdentify(), subscribeManager);
2037 SendControlAck(context, message, errCode, controlCmdType);
2038 return errCode;
2039 }
2040 errCode = SendControlAck(context, message, E_OK, controlCmdType);
2041 if (errCode != E_OK) {
2042 subscribeManager->DeleteRemoteSubscribeQuery(context->GetDeviceId(), packet->GetQuery());
2043 RemoveSubscribeIfNeed(packet->GetQuery().GetIdentify(), subscribeManager);
2044 LOGE("[SingleVerDataSync] send control msg failed,err=%d,label=%s,dev=%s", errCode, label_.c_str(),
2045 STR_MASK(GetDeviceId()));
2046 return errCode;
2047 }
2048 subscribeManager->ActiveRemoteSubscribeQuery(context->GetDeviceId(), packet->GetQuery());
2049 DBInfo dbInfo;
2050 storage_->GetDBInfo(dbInfo);
2051 RuntimeContext::GetInstance()->RecordRemoteSubscribe(dbInfo, context->GetDeviceId(), packet->GetQuery());
2052 return errCode;
2053 }
2054
UnsubscribeRequestRecv(SingleVerSyncTaskContext * context,const Message * message)2055 int SingleVerDataSync::UnsubscribeRequestRecv(SingleVerSyncTaskContext *context, const Message *message)
2056 {
2057 const SubscribeRequest *packet = message->GetObject<SubscribeRequest>();
2058 if (packet == nullptr) {
2059 return -E_INVALID_ARGS;
2060 }
2061 uint32_t controlCmdType = packet->GetcontrolCmdType();
2062 std::shared_ptr<SubscribeManager> subscribeManager = context->GetSubscribeManager();
2063 if (subscribeManager == nullptr) {
2064 LOGE("[SingleVerDataSync] subscribeManager check failed");
2065 SendControlAck(context, message, -E_NOT_REGISTER, controlCmdType);
2066 return -E_INVALID_ARGS;
2067 }
2068 int errCode;
2069 std::lock_guard<std::mutex> autoLock(unsubscribeLock_);
2070 if (subscribeManager->IsLastRemoteContainSubscribe(context->GetDeviceId(), packet->GetQuery().GetIdentify())) {
2071 errCode = storage_->RemoveSubscribe(packet->GetQuery().GetIdentify());
2072 if (errCode != E_OK) {
2073 LOGE("[SingleVerDataSync] remove trigger failed,err=%d,label=%s,dev=%s", errCode, label_.c_str(),
2074 STR_MASK(GetDeviceId()));
2075 SendControlAck(context, message, errCode, controlCmdType);
2076 return errCode;
2077 }
2078 }
2079 errCode = SendControlAck(context, message, E_OK, controlCmdType);
2080 if (errCode != E_OK) {
2081 LOGE("[SingleVerDataSync] send control msg failed,err=%d,label=%s,dev=%s", errCode, label_.c_str(),
2082 STR_MASK(GetDeviceId()));
2083 return errCode;
2084 }
2085 subscribeManager->RemoveRemoteSubscribeQuery(context->GetDeviceId(), packet->GetQuery());
2086 DBInfo dbInfo;
2087 storage_->GetDBInfo(dbInfo);
2088 RuntimeContext::GetInstance()->RemoveRemoteSubscribe(dbInfo, context->GetDeviceId(), packet->GetQuery());
2089 metadata_->RemoveQueryFromRecordSet(context->GetDeviceId(), packet->GetQuery().GetIdentify());
2090 return errCode;
2091 }
2092
PutDataMsg(Message * message)2093 void SingleVerDataSync::PutDataMsg(Message *message)
2094 {
2095 return msgSchedule_.PutMsg(message);
2096 }
2097
MoveNextDataMsg(SingleVerSyncTaskContext * context,bool & isNeedHandle,bool & isNeedContinue)2098 Message *SingleVerDataSync::MoveNextDataMsg(SingleVerSyncTaskContext *context, bool &isNeedHandle,
2099 bool &isNeedContinue)
2100 {
2101 return msgSchedule_.MoveNextMsg(context, isNeedHandle, isNeedContinue);
2102 }
2103
IsNeedReloadQueue()2104 bool SingleVerDataSync::IsNeedReloadQueue()
2105 {
2106 return msgSchedule_.IsNeedReloadQueue();
2107 }
2108
ScheduleInfoHandle(bool isNeedHandleStatus,bool isNeedClearMap,const Message * message)2109 void SingleVerDataSync::ScheduleInfoHandle(bool isNeedHandleStatus, bool isNeedClearMap, const Message *message)
2110 {
2111 msgSchedule_.ScheduleInfoHandle(isNeedHandleStatus, isNeedClearMap, message);
2112 }
2113
ClearDataMsg()2114 void SingleVerDataSync::ClearDataMsg()
2115 {
2116 msgSchedule_.ClearMsg();
2117 }
2118
QuerySyncCheck(SingleVerSyncTaskContext * context)2119 int SingleVerDataSync::QuerySyncCheck(SingleVerSyncTaskContext *context)
2120 {
2121 if (context == nullptr) {
2122 return -E_INVALID_ARGS;
2123 }
2124 bool isCheckStatus = false;
2125 int errCode = SingleVerDataSyncUtils::QuerySyncCheck(context, isCheckStatus);
2126 if (errCode != E_OK) {
2127 return errCode;
2128 }
2129 if (!isCheckStatus) {
2130 context->SetTaskErrCode(-E_NOT_SUPPORT);
2131 return -E_NOT_SUPPORT;
2132 }
2133 return E_OK;
2134 }
2135
RemoveSubscribeIfNeed(const std::string & queryId,const std::shared_ptr<SubscribeManager> & subscribeManager)2136 void SingleVerDataSync::RemoveSubscribeIfNeed(const std::string &queryId,
2137 const std::shared_ptr<SubscribeManager> &subscribeManager)
2138 {
2139 if (!subscribeManager->IsQueryExistSubscribe(queryId)) {
2140 storage_->RemoveSubscribe(queryId);
2141 }
2142 }
2143 } // namespace DistributedDB
2144