1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "single_ver_data_sync.h"
17 
18 #include "db_common.h"
19 #include "db_types.h"
20 #include "generic_single_ver_kv_entry.h"
21 #include "intercepted_data_impl.h"
22 #include "log_print.h"
23 #include "message_transform.h"
24 #include "performance_analysis.h"
25 #include "single_ver_data_sync_utils.h"
26 #include "single_ver_sync_state_machine.h"
27 #include "subscribe_manager.h"
28 #ifdef RELATIONAL_STORE
29 #include "relational_db_sync_interface.h"
30 #endif
31 
32 namespace DistributedDB {
SingleVerDataSync()33 SingleVerDataSync::SingleVerDataSync()
34     : mtuSize_(0),
35       storage_(nullptr),
36       communicateHandle_(nullptr),
37       metadata_(nullptr)
38 {
39 }
40 
~SingleVerDataSync()41 SingleVerDataSync::~SingleVerDataSync()
42 {
43     storage_ = nullptr;
44     communicateHandle_ = nullptr;
45     metadata_ = nullptr;
46 }
47 
Initialize(ISyncInterface * inStorage,ICommunicator * inCommunicateHandle,const std::shared_ptr<Metadata> & inMetadata,const std::string & deviceId)48 int SingleVerDataSync::Initialize(ISyncInterface *inStorage, ICommunicator *inCommunicateHandle,
49     const std::shared_ptr<Metadata> &inMetadata, const std::string &deviceId)
50 {
51     if ((inStorage == nullptr) || (inCommunicateHandle == nullptr) || (inMetadata == nullptr)) {
52         return -E_INVALID_ARGS;
53     }
54     storage_ = static_cast<SyncGenericInterface *>(inStorage);
55     communicateHandle_ = inCommunicateHandle;
56     metadata_ = inMetadata;
57     mtuSize_ = DBConstant::MIN_MTU_SIZE; // default size is 1K, it will update when need sync data.
58     std::vector<uint8_t> label = inStorage->GetIdentifier();
59     label.resize(3); // only show 3 Bytes enough
60     label_ = DBCommon::VectorToHexString(label);
61     deviceId_ = deviceId;
62     msgSchedule_.Initialize(label_, deviceId_);
63     return E_OK;
64 }
65 
SyncStart(int mode,SingleVerSyncTaskContext * context)66 int SingleVerDataSync::SyncStart(int mode, SingleVerSyncTaskContext *context)
67 {
68     std::lock_guard<std::mutex> lock(lock_);
69     int errCode = CheckPermitSendData(mode, context);
70     if (errCode != E_OK) {
71         return errCode;
72     }
73     if (sessionId_ != 0) { // auto sync timeout resend
74         return ReSendData(context);
75     }
76     ResetSyncStatus(mode, context);
77     LOGI("[DataSync] SendStart,mode=%d,label=%s,device=%s", mode_, label_.c_str(), STR_MASK(deviceId_));
78     int tmpMode = SyncOperation::TransferSyncMode(mode);
79     if (tmpMode == SyncModeType::PUSH) {
80         errCode = PushStart(context);
81     } else if (tmpMode == SyncModeType::PUSH_AND_PULL) {
82         errCode = PushPullStart(context);
83     } else if (tmpMode == SyncModeType::PULL) {
84         errCode = PullRequestStart(context);
85     } else {
86         errCode = PullResponseStart(context);
87     }
88     if (context->IsSkipTimeoutError(errCode)) {
89         // if E_TIMEOUT occurred, means send message pressure is high, put into resend map and wait for resend.
90         // just return to avoid higher pressure for send.
91         return E_OK;
92     }
93     if (errCode != E_OK) {
94         LOGE("[DataSync] SendStart errCode=%d", errCode);
95         return errCode;
96     }
97     if (tmpMode == SyncModeType::PUSH_AND_PULL && context->GetTaskErrCode() == -E_EKEYREVOKED) {
98         LOGE("wait for recv finished for push and pull mode");
99         return -E_EKEYREVOKED;
100     }
101     return InnerSyncStart(context);
102 }
103 
InnerSyncStart(SingleVerSyncTaskContext * context)104 int SingleVerDataSync::InnerSyncStart(SingleVerSyncTaskContext *context)
105 {
106     int errCode = CheckPermitSendData(mode_, context);
107     if (errCode != E_OK) {
108         return errCode;
109     }
110     while (true) {
111         if (windowSize_ <= 0 || isAllDataHasSent_) {
112             LOGD("[DataSync] InnerDataSync winSize=%d,isAllSent=%d,label=%s,device=%s", windowSize_, isAllDataHasSent_,
113                 label_.c_str(), STR_MASK(deviceId_));
114             return E_OK;
115         }
116         int mode = SyncOperation::TransferSyncMode(mode_);
117         if (mode == SyncModeType::PULL) {
118             LOGE("[DataSync] unexpected error");
119             return -E_INVALID_ARGS;
120         }
121         context->IncSequenceId();
122         if (mode == SyncModeType::PUSH || mode == SyncModeType::PUSH_AND_PULL) {
123             errCode = PushStart(context);
124         } else {
125             errCode = PullResponseStart(context);
126         }
127         if ((mode == SyncModeType::PUSH_AND_PULL) && errCode == -E_EKEYREVOKED) {
128             LOGE("[DataSync] wait for recv finished,label=%s,device=%s", label_.c_str(), STR_MASK(deviceId_));
129             isAllDataHasSent_ = true;
130             return -E_EKEYREVOKED;
131         }
132         if (context->IsSkipTimeoutError(errCode)) {
133             // if E_TIMEOUT occurred, means send message pressure is high, put into resend map and wait for resend.
134             // just return to avoid higher pressure for send.
135             return E_OK;
136         }
137         if (errCode != E_OK) {
138             LOGE("[DataSync] InnerSend errCode=%d", errCode);
139             return errCode;
140         }
141     }
142     return E_OK;
143 }
144 
InnerClearSyncStatus()145 void SingleVerDataSync::InnerClearSyncStatus()
146 {
147     sessionId_ = 0;
148     reSendMap_.clear();
149     windowSize_ = 0;
150     maxSequenceIdHasSent_ = 0;
151     isAllDataHasSent_ = false;
152 }
153 
TryContinueSync(SingleVerSyncTaskContext * context,const Message * message)154 int SingleVerDataSync::TryContinueSync(SingleVerSyncTaskContext *context, const Message *message)
155 {
156     if (message == nullptr) {
157         LOGE("[DataSync] AckRecv message nullptr");
158         return -E_INVALID_ARGS;
159     }
160     const DataAckPacket *packet = message->GetObject<DataAckPacket>();
161     if (packet == nullptr) {
162         return -E_INVALID_ARGS;
163     }
164     uint64_t packetId = packet->GetPacketId(); // above 102 version data request reserve[0] store packetId value
165     uint32_t sessionId = message->GetSessionId();
166     uint32_t sequenceId = message->GetSequenceId();
167 
168     std::lock_guard<std::mutex> lock(lock_);
169     LOGI("[DataSync] recv ack seqId=%" PRIu32 ",packetId=%" PRIu64 ",winSize=%d,label=%s,dev=%s", sequenceId, packetId,
170         windowSize_, label_.c_str(), STR_MASK(deviceId_));
171     if (sessionId != sessionId_) {
172         LOGI("[DataSync] ignore ack,sessionId is different");
173         return E_OK;
174     }
175     Timestamp lastQueryTime = 0;
176     if (reSendMap_.count(sequenceId) != 0) {
177         lastQueryTime = reSendMap_[sequenceId].end;
178         reSendMap_.erase(sequenceId);
179         windowSize_++;
180     } else {
181         LOGI("[DataSync] ack seqId not in map");
182         return E_OK;
183     }
184     if (context->IsQuerySync() && storage_->GetInterfaceType() == ISyncInterface::SYNC_RELATION) {
185         Timestamp dbLastQueryTime = 0;
186         int errCode = metadata_->GetLastQueryTime(context->GetQuerySyncId(), context->GetDeviceId(), dbLastQueryTime);
187         if (errCode == E_OK && dbLastQueryTime < lastQueryTime) {
188             errCode = metadata_->SetLastQueryTime(context->GetQuerySyncId(), context->GetDeviceId(), lastQueryTime);
189         }
190         if (errCode != E_OK) {
191             return errCode;
192         }
193     }
194     if (!isAllDataHasSent_) {
195         return InnerSyncStart(context);
196     } else if (reSendMap_.empty()) {
197         context->SetOperationStatus(SyncOperation::OP_SEND_FINISHED);
198         InnerClearSyncStatus();
199         return -E_FINISHED;
200     }
201     return E_OK;
202 }
203 
ClearSyncStatus()204 void SingleVerDataSync::ClearSyncStatus()
205 {
206     std::lock_guard<std::mutex> lock(lock_);
207     InnerClearSyncStatus();
208 }
209 
ReSendData(SingleVerSyncTaskContext * context)210 int SingleVerDataSync::ReSendData(SingleVerSyncTaskContext *context)
211 {
212     if (reSendMap_.empty()) {
213         LOGI("[DataSync] ReSend map empty");
214         return -E_INTERNAL_ERROR;
215     }
216     uint32_t sequenceId = reSendMap_.begin()->first;
217     ReSendInfo reSendInfo = reSendMap_.begin()->second;
218     LOGI("[DataSync] ReSend mode=%d,start=%" PRIu64 ",end=%" PRIu64 ",delStart=%" PRIu64 ",delEnd=%" PRIu64 ","
219         "seqId=%" PRIu32 ",packetId=%" PRIu64 ",windowsize=%d,label=%s,deviceId=%s", mode_, reSendInfo.start,
220         reSendInfo.end, reSendInfo.deleteBeginTime, reSendInfo.deleteEndTime, sequenceId, reSendInfo.packetId,
221         windowSize_, label_.c_str(), STR_MASK(deviceId_));
222     DataSyncReSendInfo dataReSendInfo = {sessionId_, sequenceId, reSendInfo.start, reSendInfo.end,
223         reSendInfo.deleteBeginTime, reSendInfo.deleteEndTime, reSendInfo.packetId};
224     return ReSend(context, dataReSendInfo);
225 }
226 
GetLocalDeviceName()227 std::string SingleVerDataSync::GetLocalDeviceName()
228 {
229     std::string deviceInfo;
230     if (communicateHandle_ != nullptr) {
231         communicateHandle_->GetLocalIdentity(deviceInfo);
232     }
233     return deviceInfo;
234 }
235 
Send(SingleVerSyncTaskContext * context,const Message * message,const CommErrHandler & handler,uint32_t packetLen)236 int SingleVerDataSync::Send(SingleVerSyncTaskContext *context, const Message *message, const CommErrHandler &handler,
237     uint32_t packetLen)
238 {
239     bool startFeedDogRet = false;
240     if (packetLen > mtuSize_ && mtuSize_ > NOTIFY_MIN_MTU_SIZE) {
241         uint32_t time = static_cast<uint32_t>(static_cast<uint64_t>(packetLen) *
242             static_cast<uint64_t>(context->GetTimeoutTime()) / mtuSize_); // no overflow
243         startFeedDogRet = context->StartFeedDogForSync(time, SyncDirectionFlag::SEND);
244     }
245     SendConfig sendConfig;
246     SetSendConfigParam(storage_->GetDbProperties(), context->GetDeviceId(), false, SEND_TIME_OUT, sendConfig);
247     int errCode = communicateHandle_->SendMessage(context->GetDeviceId(), message, sendConfig, handler);
248     if (errCode != E_OK) {
249         LOGE("[DataSync][Send] send message failed, errCode=%d", errCode);
250         if (startFeedDogRet) {
251             context->StopFeedDogForSync(SyncDirectionFlag::SEND);
252         }
253     }
254     return errCode;
255 }
256 
GetDataWithPerformanceRecord(SingleVerSyncTaskContext * context,std::vector<SendDataItem> & outData,size_t packetSize)257 int SingleVerDataSync::GetDataWithPerformanceRecord(SingleVerSyncTaskContext *context,
258     std::vector<SendDataItem> &outData, size_t packetSize)
259 {
260     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
261     if (performance != nullptr) {
262         performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_READ_DATA);
263     }
264     // start a watch dog before get data
265     // it will send ack util get data finished
266     context->StartFeedDogForGetData(context->GetResponseSessionId());
267     int errCode = GetData(context, packetSize, outData);
268     context->StopFeedDogForGetData();
269     if (performance != nullptr) {
270         performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_READ_DATA);
271     }
272     if (!outData.empty()) {
273         SingleVerDataSyncUtils::RecordClientId(*context, *storage_, metadata_);
274     }
275     return errCode;
276 }
277 
GetData(SingleVerSyncTaskContext * context,size_t packetSize,std::vector<SendDataItem> & outData)278 int SingleVerDataSync::GetData(SingleVerSyncTaskContext *context, size_t packetSize, std::vector<SendDataItem> &outData)
279 {
280     int errCode;
281     UpdateMtuSize();
282     if (context->GetRetryStatus() == SyncTaskContext::NEED_RETRY) {
283         context->SetRetryStatus(SyncTaskContext::NO_NEED_RETRY);
284         LOGI("[DataSync][GetData] resend data");
285         errCode = GetUnsyncData(context, outData, packetSize);
286     } else {
287         ContinueToken token;
288         context->GetContinueToken(token);
289         if (token == nullptr) {
290             errCode = GetUnsyncData(context, outData, packetSize);
291         } else {
292             LOGD("[DataSync][GetData] get data from token");
293             // if there is data to send, read out data, and update local watermark, send data
294             errCode = GetNextUnsyncData(context, outData, packetSize);
295         }
296     }
297     if (errCode == -E_UNFINISHED) {
298         LOGD("[DataSync][GetData] not finished.");
299     }
300     if (SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
301         std::string localHashName = DBCommon::TransferHashString(GetLocalDeviceName());
302         SingleVerDataSyncUtils::TransDbDataItemToSendDataItem(localHashName, outData);
303     }
304     return errCode;
305 }
306 
GetMatchData(SingleVerSyncTaskContext * context,SyncEntry & syncOutData)307 int SingleVerDataSync::GetMatchData(SingleVerSyncTaskContext *context, SyncEntry &syncOutData)
308 {
309     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
310     size_t packetSize = (version > SOFTWARE_VERSION_RELEASE_2_0) ?
311         DBConstant::MAX_HPMODE_PACK_ITEM_SIZE : DBConstant::MAX_NORMAL_PACK_ITEM_SIZE;
312     bool needCompressOnSync = false;
313     uint8_t compressionRate = DBConstant::DEFAULT_COMPTRESS_RATE;
314     (void)storage_->GetCompressionOption(needCompressOnSync, compressionRate);
315     int errCode = GetDataWithPerformanceRecord(context, syncOutData.entries, packetSize);
316     if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
317         context->SetTaskErrCode(errCode);
318         return errCode;
319     }
320 
321     int innerCode = InterceptData(syncOutData);
322     if (innerCode != E_OK) {
323         context->SetTaskErrCode(innerCode);
324         return innerCode;
325     }
326 
327     CompressAlgorithm remoteAlgo = context->ChooseCompressAlgo();
328     if (needCompressOnSync && remoteAlgo != CompressAlgorithm::NONE) {
329         int compressCode = GenericSingleVerKvEntry::Compress(syncOutData.entries, syncOutData.compressedEntries,
330             { remoteAlgo, version });
331         if (compressCode != E_OK) {
332             return compressCode;
333         }
334     }
335     return errCode;
336 }
337 
GetUnsyncData(SingleVerSyncTaskContext * context,std::vector<SendDataItem> & outData,size_t packetSize)338 int SingleVerDataSync::GetUnsyncData(SingleVerSyncTaskContext *context, std::vector<SendDataItem> &outData,
339     size_t packetSize)
340 {
341     SyncTimeRange waterRange;
342     DataSizeSpecInfo syncDataSizeInfo = GetDataSizeSpecInfo(packetSize);
343     WaterMark startMark = 0;
344     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
345     GetLocalWaterMark(curType, context->GetQuerySyncId(), context, startMark);
346     if ((waterRange.endTime == 0) || (startMark > waterRange.endTime)) {
347         return E_OK;
348     }
349     waterRange.beginTime = startMark;
350     if (curType == SyncType::QUERY_SYNC_TYPE) {
351         WaterMark deletedStartMark = 0;
352         GetLocalDeleteSyncWaterMark(context, deletedStartMark);
353         Timestamp lastQueryTimestamp = 0;
354         int errCode = metadata_->GetLastQueryTime(context->GetQuerySyncId(), context->GetDeviceId(),
355             lastQueryTimestamp);
356         if (errCode != E_OK) {
357             return errCode;
358         }
359         waterRange.deleteBeginTime = deletedStartMark;
360         waterRange.lastQueryTime = lastQueryTimestamp;
361     }
362     return GetUnsyncData(context, outData, syncDataSizeInfo, waterRange);
363 }
364 
GetUnsyncData(SingleVerSyncTaskContext * context,std::vector<SendDataItem> & outData,DataSizeSpecInfo syncDataSizeInfo,SyncTimeRange & waterMarkInfo)365 int SingleVerDataSync::GetUnsyncData(SingleVerSyncTaskContext *context, std::vector<SendDataItem> &outData,
366     DataSizeSpecInfo syncDataSizeInfo, SyncTimeRange &waterMarkInfo)
367 {
368     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
369     ContinueToken token = nullptr;
370     context->GetContinueToken(token);
371     if (token != nullptr) {
372         storage_->ReleaseContinueToken(token);
373     }
374     int errCode;
375     if (curType != SyncType::QUERY_SYNC_TYPE) {
376         errCode = storage_->GetSyncData(waterMarkInfo.beginTime, waterMarkInfo.endTime, outData, token,
377             syncDataSizeInfo);
378     } else {
379         QuerySyncObject queryObj = context->GetQuery();
380         errCode = storage_->GetSyncData(queryObj, waterMarkInfo, syncDataSizeInfo, token, outData);
381     }
382     context->SetContinueToken(token);
383     if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
384         LOGE("[DataSync][GetUnsyncData] get unsync data failed,errCode=%d", errCode);
385     }
386     return errCode;
387 }
388 
GetNextUnsyncData(SingleVerSyncTaskContext * context,std::vector<SendDataItem> & outData,size_t packetSize)389 int SingleVerDataSync::GetNextUnsyncData(SingleVerSyncTaskContext *context, std::vector<SendDataItem> &outData,
390     size_t packetSize)
391 {
392     ContinueToken token;
393     context->GetContinueToken(token);
394     DataSizeSpecInfo syncDataSizeInfo = GetDataSizeSpecInfo(packetSize);
395     int errCode = storage_->GetSyncDataNext(outData, token, syncDataSizeInfo);
396     context->SetContinueToken(token);
397     if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
398         LOGE("[DataSync][GetNextUnsyncData] get next unsync data failed, errCode=%d", errCode);
399     }
400     return errCode;
401 }
402 
SaveData(const SingleVerSyncTaskContext * context,const std::vector<SendDataItem> & inData,SyncType curType,const QuerySyncObject & query)403 int SingleVerDataSync::SaveData(const SingleVerSyncTaskContext *context, const std::vector<SendDataItem> &inData,
404     SyncType curType, const QuerySyncObject &query)
405 {
406     if (inData.empty()) {
407         return E_OK;
408     }
409     SingleVerDataSyncUtils::RecordClientId(*context, *storage_, metadata_);
410     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
411     if (performance != nullptr) {
412         performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_SAVE_DATA);
413     }
414     const auto localDeviceName = GetLocalDeviceName();
415     const std::string localHashName = DBCommon::TransferHashString(localDeviceName);
416     SingleVerDataSyncUtils::TransSendDataItemToLocal(context, localHashName, inData);
417     std::vector<SendDataItem> copyData = inData;
418     int errCode = storage_->InterceptData(copyData, GetDeviceId(), localDeviceName, false);
419     if (errCode != E_OK) {
420         LOGE("[DataSync][SaveData] intercept data failed, errCode=%d", errCode);
421         return errCode;
422     }
423     // query only support prefix key and don't have query in packet in 104 version
424     errCode = storage_->PutSyncDataWithQuery(query, copyData, context->GetDeviceId());
425     if (performance != nullptr) {
426         performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_SAVE_DATA);
427     }
428     if (errCode != E_OK) {
429         LOGE("[DataSync][SaveData] save sync data failed, errCode=%d", errCode);
430     }
431     return errCode;
432 }
433 
ResetSyncStatus(int inMode,SingleVerSyncTaskContext * context)434 void SingleVerDataSync::ResetSyncStatus(int inMode, SingleVerSyncTaskContext *context)
435 {
436     mode_ = inMode;
437     maxSequenceIdHasSent_ = 0;
438     isAllDataHasSent_ = false;
439     context->ReSetSequenceId();
440     reSendMap_.clear();
441     if (context->GetRemoteSoftwareVersion() < SOFTWARE_VERSION_RELEASE_3_0) {
442         windowSize_ = LOW_VERSION_WINDOW_SIZE;
443     } else {
444         windowSize_ = HIGH_VERSION_WINDOW_SIZE;
445     }
446     int mode = SyncOperation::TransferSyncMode(inMode);
447     if (mode == SyncModeType::PUSH || mode == SyncModeType::PUSH_AND_PULL || mode == SyncModeType::PULL) {
448         sessionId_ = context->GetRequestSessionId();
449     } else {
450         sessionId_ = context->GetResponseSessionId();
451     }
452 }
453 
GetSyncDataTimeRange(SyncType syncType,SingleVerSyncTaskContext * context,const std::vector<SendDataItem> & inData,UpdateWaterMark & isUpdate)454 SyncTimeRange SingleVerDataSync::GetSyncDataTimeRange(SyncType syncType, SingleVerSyncTaskContext *context,
455     const std::vector<SendDataItem> &inData, UpdateWaterMark &isUpdate)
456 {
457     WaterMark localMark = 0;
458     WaterMark deleteMark = 0;
459     GetLocalWaterMark(syncType, context->GetQuerySyncId(), context, localMark);
460     GetLocalDeleteSyncWaterMark(context, deleteMark);
461     return SingleVerDataSyncUtils::GetSyncDataTimeRange(syncType, localMark, deleteMark, inData, isUpdate);
462 }
463 
SaveLocalWaterMark(SyncType syncType,const SingleVerSyncTaskContext * context,SyncTimeRange dataTimeRange,bool isCheckBeforUpdate) const464 int SingleVerDataSync::SaveLocalWaterMark(SyncType syncType, const SingleVerSyncTaskContext *context,
465     SyncTimeRange dataTimeRange, bool isCheckBeforUpdate) const
466 {
467     WaterMark localMark = 0;
468     int errCode = E_OK;
469     const std::string &deviceId = context->GetDeviceId();
470     std::string queryId = context->GetQuerySyncId();
471     if (syncType != SyncType::QUERY_SYNC_TYPE) {
472         if (isCheckBeforUpdate) {
473             GetLocalWaterMark(syncType, queryId, context, localMark);
474             if (localMark >= dataTimeRange.endTime) {
475                 return E_OK;
476             }
477         }
478         errCode = metadata_->SaveLocalWaterMark(deviceId, dataTimeRange.endTime);
479     } else {
480         bool isNeedUpdateMark = true;
481         bool isNeedUpdateDeleteMark = true;
482         if (isCheckBeforUpdate) {
483             WaterMark deleteDataWaterMark = 0;
484             GetLocalWaterMark(syncType, queryId, context, localMark);
485             GetLocalDeleteSyncWaterMark(context, deleteDataWaterMark);
486             if (localMark >= dataTimeRange.endTime) {
487                 isNeedUpdateMark = false;
488             }
489             if (deleteDataWaterMark >= dataTimeRange.deleteEndTime) {
490                 isNeedUpdateDeleteMark = false;
491             }
492         }
493         if (isNeedUpdateMark) {
494             LOGD("label=%s,dev=%s,endTime=%" PRIu64, label_.c_str(), STR_MASK(GetDeviceId()), dataTimeRange.endTime);
495             errCode = metadata_->SetSendQueryWaterMark(queryId, deviceId, dataTimeRange.endTime);
496             if (errCode != E_OK) {
497                 LOGE("[DataSync][SaveLocalWaterMark] save query metadata watermark failed,errCode=%d", errCode);
498                 return errCode;
499             }
500         }
501         if (isNeedUpdateDeleteMark) {
502             LOGD("label=%s,dev=%s,deleteEndTime=%" PRIu64, label_.c_str(), STR_MASK(GetDeviceId()),
503                 dataTimeRange.deleteEndTime);
504             errCode = metadata_->SetSendDeleteSyncWaterMark(context->GetDeleteSyncId(), dataTimeRange.deleteEndTime);
505         }
506     }
507     if (errCode != E_OK) {
508         LOGE("[DataSync][SaveLocalWaterMark] save metadata local watermark failed,errCode=%d", errCode);
509     }
510     return errCode;
511 }
512 
GetPeerWaterMark(SyncType syncType,const std::string & queryIdentify,const DeviceID & deviceId,WaterMark & waterMark) const513 void SingleVerDataSync::GetPeerWaterMark(SyncType syncType, const std::string &queryIdentify,
514     const DeviceID &deviceId, WaterMark &waterMark) const
515 {
516     if (syncType != SyncType::QUERY_SYNC_TYPE) {
517         metadata_->GetPeerWaterMark(deviceId, waterMark);
518         return;
519     }
520     metadata_->GetRecvQueryWaterMark(queryIdentify, deviceId, waterMark);
521 }
522 
GetPeerDeleteSyncWaterMark(const DeviceID & deviceId,WaterMark & waterMark)523 void SingleVerDataSync::GetPeerDeleteSyncWaterMark(const DeviceID &deviceId, WaterMark &waterMark)
524 {
525     metadata_->GetRecvDeleteSyncWaterMark(deviceId, waterMark);
526 }
527 
GetLocalDeleteSyncWaterMark(const SingleVerSyncTaskContext * context,WaterMark & waterMark) const528 void SingleVerDataSync::GetLocalDeleteSyncWaterMark(const SingleVerSyncTaskContext *context,
529     WaterMark &waterMark) const
530 {
531     metadata_->GetSendDeleteSyncWaterMark(context->GetDeleteSyncId(), waterMark, context->IsAutoLiftWaterMark());
532 }
533 
GetLocalWaterMark(SyncType syncType,const std::string & queryIdentify,const SingleVerSyncTaskContext * context,WaterMark & waterMark) const534 void SingleVerDataSync::GetLocalWaterMark(SyncType syncType, const std::string &queryIdentify,
535     const SingleVerSyncTaskContext *context, WaterMark &waterMark) const
536 {
537     if (syncType != SyncType::QUERY_SYNC_TYPE) {
538         metadata_->GetLocalWaterMark(context->GetDeviceId(), waterMark);
539         return;
540     }
541     metadata_->GetSendQueryWaterMark(queryIdentify, context->GetDeviceId(),
542         waterMark, context->IsAutoLiftWaterMark());
543 }
544 
RemoveDeviceDataHandle(SingleVerSyncTaskContext * context,const Message * message,WaterMark maxSendDataTime)545 int SingleVerDataSync::RemoveDeviceDataHandle(SingleVerSyncTaskContext *context, const Message *message,
546     WaterMark maxSendDataTime)
547 {
548     bool isNeedClearRemoteData = false;
549     std::lock_guard<std::mutex> autoLock(removeDeviceDataLock_);
550     if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_3_0) {
551         uint64_t clearDeviceDataMark = 0;
552         metadata_->GetRemoveDataMark(context->GetDeviceId(), clearDeviceDataMark);
553         isNeedClearRemoteData = (clearDeviceDataMark == REMOVE_DEVICE_DATA_MARK);
554     } else {
555         const DataRequestPacket *packet = message->GetObject<DataRequestPacket>();
556         if (packet == nullptr) {
557             LOGE("[RemoveDeviceDataHandle] get packet object failed");
558             return -E_INVALID_ARGS;
559         }
560         SyncType curType = SyncOperation::GetSyncType(packet->GetMode());
561         WaterMark packetLocalMark = packet->GetLocalWaterMark();
562         WaterMark peerMark = 0;
563         GetPeerWaterMark(curType, context->GetQuerySyncId(), context->GetDeviceId(), peerMark);
564         isNeedClearRemoteData = ((packetLocalMark == 0) && (peerMark != 0));
565     }
566     if (!isNeedClearRemoteData) {
567         return E_OK;
568     }
569     int errCode = E_OK;
570     if (context->IsNeedClearRemoteStaleData()) {
571         // need to clear remote device history data
572         errCode = storage_->RemoveDeviceData(context->GetDeviceId(), true);
573         if (errCode != E_OK) {
574             (void)SendDataAck(context, message, errCode, maxSendDataTime);
575             return errCode;
576         }
577         if (context->GetRemoteSoftwareVersion() == SOFTWARE_VERSION_EARLIEST) {
578             // avoid repeat clear in ack
579             metadata_->SaveLocalWaterMark(context->GetDeviceId(), 0);
580         }
581     }
582     if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_3_0) {
583         errCode = metadata_->ResetMetaDataAfterRemoveData(context->GetDeviceId());
584         if (errCode != E_OK) {
585             (void)SendDataAck(context, message, errCode, maxSendDataTime);
586             return errCode;
587         }
588     }
589     return E_OK;
590 }
591 
DealRemoveDeviceDataByAck(SingleVerSyncTaskContext * context,WaterMark ackWaterMark,const std::vector<uint64_t> & reserved)592 int SingleVerDataSync::DealRemoveDeviceDataByAck(SingleVerSyncTaskContext *context, WaterMark ackWaterMark,
593     const std::vector<uint64_t> &reserved)
594 {
595     bool isNeedClearRemoteData = false;
596     std::lock_guard<std::mutex> autoLock(removeDeviceDataLock_);
597     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
598     if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_3_0) {
599         uint64_t clearDeviceDataMark = 0;
600         metadata_->GetRemoveDataMark(context->GetDeviceId(), clearDeviceDataMark);
601         isNeedClearRemoteData = (clearDeviceDataMark != 0);
602     } else if (reserved.empty()) {
603         WaterMark localMark = 0;
604         GetLocalWaterMark(curType, context->GetQuery().GetIdentify(), context, localMark);
605         isNeedClearRemoteData = ((localMark != 0) && (ackWaterMark == 0));
606     } else {
607         WaterMark peerMark = 0;
608         GetPeerWaterMark(curType, context->GetQuerySyncId(),
609             context->GetDeviceId(), peerMark);
610         isNeedClearRemoteData = ((reserved[ACK_PACKET_RESERVED_INDEX_LOCAL_WATER_MARK] == 0) && (peerMark != 0));
611     }
612     if (!isNeedClearRemoteData) {
613         return E_OK;
614     }
615     // need to clear remote historydata
616     LOGI("[DataSync][WaterMarkException] AckRecv reserved not empty,rebuilted,clear historydata,label=%s,dev=%s",
617         label_.c_str(), STR_MASK(GetDeviceId()));
618     int errCode = storage_->RemoveDeviceData(context->GetDeviceId(), true);
619     if (errCode != E_OK) {
620         return errCode;
621     }
622     if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_3_0) {
623         errCode = metadata_->ResetMetaDataAfterRemoveData(context->GetDeviceId());
624     }
625     return errCode;
626 }
627 
SetSessionEndTimestamp(Timestamp end)628 void SingleVerDataSync::SetSessionEndTimestamp(Timestamp end)
629 {
630     sessionEndTimestamp_ = end;
631 }
632 
GetSessionEndTimestamp() const633 Timestamp SingleVerDataSync::GetSessionEndTimestamp() const
634 {
635     return sessionEndTimestamp_;
636 }
637 
UpdateSendInfo(SyncTimeRange dataTimeRange,SingleVerSyncTaskContext * context)638 void SingleVerDataSync::UpdateSendInfo(SyncTimeRange dataTimeRange, SingleVerSyncTaskContext *context)
639 {
640     ReSendInfo reSendInfo;
641     reSendInfo.start = dataTimeRange.beginTime;
642     reSendInfo.end = dataTimeRange.endTime;
643     reSendInfo.deleteBeginTime = dataTimeRange.deleteBeginTime;
644     reSendInfo.deleteEndTime = dataTimeRange.deleteEndTime;
645     reSendInfo.packetId = context->GetPacketId();
646     maxSequenceIdHasSent_++;
647     reSendMap_[maxSequenceIdHasSent_] = reSendInfo;
648     windowSize_--;
649     ContinueToken token;
650     context->GetContinueToken(token);
651     if (token == nullptr) {
652         isAllDataHasSent_ = true;
653     }
654     LOGI("[DataSync] mode=%d,start=%" PRIu64 ",end=%" PRIu64 ",deleteStart=%" PRIu64 ",deleteEnd=%" PRIu64 ","
655         "seqId=%" PRIu32 ",packetId=%" PRIu64 ",window_size=%d,isAllSend=%d,label=%s,device=%s", mode_,
656         reSendInfo.start, reSendInfo.end, reSendInfo.deleteBeginTime, reSendInfo.deleteEndTime, maxSequenceIdHasSent_,
657         reSendInfo.packetId, windowSize_, isAllDataHasSent_, label_.c_str(), STR_MASK(deviceId_));
658 }
659 
FillDataRequestPacket(DataRequestPacket * packet,SingleVerSyncTaskContext * context,SyncEntry & syncData,int sendCode,int mode)660 void SingleVerDataSync::FillDataRequestPacket(DataRequestPacket *packet, SingleVerSyncTaskContext *context,
661     SyncEntry &syncData, int sendCode, int mode)
662 {
663     SingleVerDataSyncUtils::SetDataRequestCommonInfo(*context, *storage_, *packet, metadata_);
664     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
665     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
666     WaterMark localMark = 0;
667     WaterMark peerMark = 0;
668     WaterMark deleteMark = 0;
669     bool needCompressOnSync = false;
670     uint8_t compressionRate = DBConstant::DEFAULT_COMPTRESS_RATE;
671     (void)storage_->GetCompressionOption(needCompressOnSync, compressionRate);
672     std::string id = context->GetQuerySyncId();
673     GetLocalWaterMark(curType, id, context, localMark);
674     GetPeerWaterMark(curType, id, context->GetDeviceId(), peerMark);
675     GetLocalDeleteSyncWaterMark(context, deleteMark);
676     if (((mode != SyncModeType::RESPONSE_PULL && sendCode == E_OK)) ||
677         (mode == SyncModeType::RESPONSE_PULL && sendCode == SEND_FINISHED)) {
678         packet->SetLastSequence();
679     }
680     int tmpMode = mode;
681     if (mode == SyncModeType::RESPONSE_PULL) {
682         tmpMode = (curType == SyncType::QUERY_SYNC_TYPE) ? SyncModeType::QUERY_PUSH : SyncModeType::PUSH;
683     }
684     packet->SetData(syncData.entries);
685     packet->SetCompressData(syncData.compressedEntries);
686     packet->SetBasicInfo(sendCode, version, tmpMode);
687     packet->SetExtraConditions(RuntimeContext::GetInstance()->GetPermissionCheckParam(storage_->GetDbProperties()));
688     packet->SetWaterMark(localMark, peerMark, deleteMark);
689     if (SyncOperation::TransferSyncMode(mode) == SyncModeType::PUSH_AND_PULL) {
690         packet->SetEndWaterMark(context->GetEndMark());
691         packet->SetSessionId(context->GetRequestSessionId());
692     }
693     packet->SetQuery(context->GetQuery());
694     packet->SetQueryId(context->GetQuerySyncId());
695     CompressAlgorithm curAlgo = context->ChooseCompressAlgo();
696     // empty compress data should not mark compress
697     if (!syncData.compressedEntries.empty() && needCompressOnSync && curAlgo != CompressAlgorithm::NONE) {
698         packet->SetCompressDataMark();
699         packet->SetCompressAlgo(curAlgo);
700     }
701     SingleVerDataSyncUtils::SetPacketId(packet, context, version);
702     if (curType == SyncType::QUERY_SYNC_TYPE && (context->GetQuery().HasLimit() ||
703         context->GetQuery().HasOrderBy())) {
704         packet->SetUpdateWaterMark();
705     }
706     LOGD("[DataSync] curType=%d,local=%" PRIu64 ",del=%" PRIu64 ",end=%" PRIu64 ",label=%s,dev=%s,queryId=%s,"
707         "isCompress=%d", static_cast<int>(curType), localMark, deleteMark, context->GetEndMark(), label_.c_str(),
708         STR_MASK(GetDeviceId()), STR_MASK(context->GetQuery().GetIdentify()), packet->IsCompressData());
709 }
710 
RequestStart(SingleVerSyncTaskContext * context,int mode)711 int SingleVerDataSync::RequestStart(SingleVerSyncTaskContext *context, int mode)
712 {
713     int errCode = QuerySyncCheck(context);
714     if (errCode != E_OK) {
715         LOGE("[DataSync][PushStart] check query failed, errCode=%d", errCode);
716         return errCode;
717     }
718     errCode = RemoveDeviceDataIfNeed(context);
719     if (errCode != E_OK) {
720         context->SetTaskErrCode(errCode);
721         return errCode;
722     }
723     SyncEntry syncData;
724     // get data
725     errCode = GetMatchData(context, syncData);
726     SingleVerDataSyncUtils::TranslateErrCodeIfNeed(mode, context->GetRemoteSoftwareVersion(), errCode);
727 
728     if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
729         LOGE("[DataSync][PushStart] get data failed, errCode=%d", errCode);
730         return errCode;
731     }
732 
733     DataRequestPacket *packet = new (std::nothrow) DataRequestPacket;
734     if (packet == nullptr) {
735         LOGE("[DataSync][PushStart] new DataRequestPacket error");
736         return -E_OUT_OF_MEMORY;
737     }
738     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
739     UpdateWaterMark isUpdateWaterMark;
740     SyncTimeRange dataTime = GetSyncDataTimeRange(curType, context, syncData.entries, isUpdateWaterMark);
741     if (errCode == E_OK) {
742         SetSessionEndTimestamp(std::max(dataTime.endTime, dataTime.deleteEndTime));
743     }
744     FillDataRequestPacket(packet, context, syncData, errCode, mode);
745     errCode = SendDataPacket(curType, packet, context);
746     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
747     if (performance != nullptr) {
748         performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_MACHINE_START_TO_PUSH_SEND);
749     }
750     if (errCode == E_OK || errCode == -E_TIMEOUT) {
751         UpdateSendInfo(dataTime, context);
752     }
753     if (errCode == E_OK) {
754         if (curType == SyncType::QUERY_SYNC_TYPE && (context->GetQuery().HasLimit() ||
755             context->GetQuery().HasOrderBy())) {
756             LOGI("[DataSync][RequestStart] query contain limit/offset/orderby, no need to update watermark.");
757             return E_OK;
758         }
759         SyncTimeRange tmpDataTime = SingleVerDataSyncUtils::ReviseLocalMark(curType, dataTime, isUpdateWaterMark);
760         SaveLocalWaterMark(curType, context, tmpDataTime);
761     }
762     return errCode;
763 }
764 
PushStart(SingleVerSyncTaskContext * context)765 int SingleVerDataSync::PushStart(SingleVerSyncTaskContext *context)
766 {
767     if (context == nullptr) {
768         return -E_INVALID_ARGS;
769     }
770     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
771     return RequestStart(context,
772         (curType == SyncType::QUERY_SYNC_TYPE) ? SyncModeType::QUERY_PUSH : SyncModeType::PUSH);
773 }
774 
PushPullStart(SingleVerSyncTaskContext * context)775 int SingleVerDataSync::PushPullStart(SingleVerSyncTaskContext *context)
776 {
777     if (context == nullptr) {
778         return -E_INVALID_ARGS;
779     }
780     return RequestStart(context, context->GetMode());
781 }
782 
PullRequestStart(SingleVerSyncTaskContext * context)783 int SingleVerDataSync::PullRequestStart(SingleVerSyncTaskContext *context)
784 {
785     if (context == nullptr) {
786         return -E_INVALID_ARGS;
787     }
788     int errCode = QuerySyncCheck(context);
789     if (errCode != E_OK) {
790         return errCode;
791     }
792     errCode = RemoveDeviceDataIfNeed(context);
793     if (errCode != E_OK) {
794         context->SetTaskErrCode(errCode);
795         return errCode;
796     }
797     DataRequestPacket *packet = new (std::nothrow) DataRequestPacket;
798     if (packet == nullptr) {
799         LOGE("[DataSync][PullRequest]new DataRequestPacket error");
800         return -E_OUT_OF_MEMORY;
801     }
802     SyncType syncType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
803     WaterMark peerMark = 0;
804     WaterMark localMark = 0;
805     WaterMark deleteMark = 0;
806     GetPeerWaterMark(syncType, context->GetQuerySyncId(),
807         context->GetDeviceId(), peerMark);
808     GetLocalWaterMark(syncType, context->GetQuerySyncId(), context, localMark);
809     GetLocalDeleteSyncWaterMark(context, deleteMark);
810     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
811     WaterMark endMark = context->GetEndMark();
812     SyncTimeRange dataTime = {localMark, deleteMark, localMark, deleteMark};
813     SingleVerDataSyncUtils::SetDataRequestCommonInfo(*context, *storage_, *packet, metadata_);
814     packet->SetBasicInfo(E_OK, version, context->GetMode());
815     packet->SetExtraConditions(RuntimeContext::GetInstance()->GetPermissionCheckParam(storage_->GetDbProperties()));
816     packet->SetWaterMark(localMark, peerMark, deleteMark);
817     packet->SetEndWaterMark(endMark);
818     packet->SetSessionId(context->GetRequestSessionId());
819     packet->SetQuery(context->GetQuery());
820     packet->SetQueryId(context->GetQuerySyncId());
821     packet->SetLastSequence();
822     SingleVerDataSyncUtils::SetPacketId(packet, context, version);
823     context->SetRetryStatus(SyncTaskContext::NO_NEED_RETRY);
824     LOGD("[DataSync][Pull] curType=%d,local=%" PRIu64 ",del=%" PRIu64 ",end=%" PRIu64 ",peer=%" PRIu64 ",label=%s,"
825         "dev=%s", static_cast<int>(syncType), localMark, deleteMark, endMark, peerMark,
826         label_.c_str(), STR_MASK(GetDeviceId()));
827     UpdateSendInfo(dataTime, context);
828     return SendDataPacket(syncType, packet, context);
829 }
830 
PullResponseStart(SingleVerSyncTaskContext * context)831 int SingleVerDataSync::PullResponseStart(SingleVerSyncTaskContext *context)
832 {
833     if (context == nullptr) {
834         return -E_INVALID_ARGS;
835     }
836     SyncEntry syncData;
837     // get data
838     int errCode = GetMatchData(context, syncData);
839     if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
840         if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_2_0) {
841             (void)SendPullResponseDataPkt(errCode, syncData, context);
842         }
843         return errCode;
844     }
845     // if send finished
846     int ackCode = E_OK;
847     ContinueToken token = nullptr;
848     context->GetContinueToken(token);
849     if (errCode == E_OK && token == nullptr) {
850         LOGD("[DataSync][PullResponse] send last frame end");
851         ackCode = SEND_FINISHED;
852     }
853     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
854     UpdateWaterMark isUpdateWaterMark;
855     SyncTimeRange dataTime = GetSyncDataTimeRange(curType, context, syncData.entries, isUpdateWaterMark);
856     if (errCode == E_OK) {
857         SetSessionEndTimestamp(std::max(dataTime.endTime, dataTime.deleteEndTime));
858     }
859     errCode = SendPullResponseDataPkt(ackCode, syncData, context);
860     if (errCode == E_OK || errCode == -E_TIMEOUT) {
861         UpdateSendInfo(dataTime, context);
862     }
863     if (errCode == E_OK) {
864         if (curType == SyncType::QUERY_SYNC_TYPE && (context->GetQuery().HasLimit() ||
865             context->GetQuery().HasOrderBy())) {
866             LOGI("[DataSync][PullResponseStart] query contain limit/offset/orderby, no need to update watermark.");
867             return E_OK;
868         }
869         SyncTimeRange tmpDataTime = SingleVerDataSyncUtils::ReviseLocalMark(curType, dataTime, isUpdateWaterMark);
870         SaveLocalWaterMark(curType, context, tmpDataTime);
871     }
872     return errCode;
873 }
874 
UpdateQueryPeerWaterMark(SyncType syncType,const std::string & queryId,const SyncTimeRange & dataTime,const SingleVerSyncTaskContext * context,UpdateWaterMark isUpdateWaterMark)875 void SingleVerDataSync::UpdateQueryPeerWaterMark(SyncType syncType, const std::string &queryId,
876     const SyncTimeRange &dataTime, const SingleVerSyncTaskContext *context, UpdateWaterMark isUpdateWaterMark)
877 {
878     WaterMark tmpPeerWatermark = dataTime.endTime;
879     WaterMark tmpPeerDeletedWatermark = dataTime.deleteEndTime;
880     if (isUpdateWaterMark.normalUpdateMark) {
881         tmpPeerWatermark++;
882     }
883     if (isUpdateWaterMark.deleteUpdateMark) {
884         tmpPeerDeletedWatermark++;
885     }
886     UpdatePeerWaterMark(syncType, queryId, context, tmpPeerWatermark, tmpPeerDeletedWatermark);
887 }
888 
UpdatePeerWaterMark(SyncType syncType,const std::string & queryId,const SingleVerSyncTaskContext * context,WaterMark peerWatermark,WaterMark peerDeletedWatermark)889 void SingleVerDataSync::UpdatePeerWaterMark(SyncType syncType, const std::string &queryId,
890     const SingleVerSyncTaskContext *context, WaterMark peerWatermark, WaterMark peerDeletedWatermark)
891 {
892     if (peerWatermark == 0 && peerDeletedWatermark == 0) {
893         return;
894     }
895     int errCode = E_OK;
896     if (syncType != SyncType::QUERY_SYNC_TYPE) {
897         errCode = metadata_->SavePeerWaterMark(context->GetDeviceId(), peerWatermark, true);
898     } else {
899         if (peerWatermark != 0) {
900             LOGD("label=%s,dev=%s,endTime=%" PRIu64, label_.c_str(), STR_MASK(GetDeviceId()), peerWatermark);
901             errCode = metadata_->SetRecvQueryWaterMark(queryId, context->GetDeviceId(), peerWatermark);
902             if (errCode != E_OK) {
903                 LOGE("[DataSync][UpdatePeerWaterMark] save query peer water mark failed,errCode=%d", errCode);
904             }
905         }
906         if (peerDeletedWatermark != 0) {
907             LOGD("label=%s,dev=%s,peerDeletedTime=%" PRIu64,
908                 label_.c_str(), STR_MASK(GetDeviceId()), peerDeletedWatermark);
909             errCode = metadata_->SetRecvDeleteSyncWaterMark(context->GetDeleteSyncId(), peerDeletedWatermark);
910         }
911     }
912     if (errCode != E_OK) {
913         LOGE("[DataSync][UpdatePeerWaterMark] save peer water mark failed,errCode=%d", errCode);
914     }
915 }
916 
DoAbilitySyncIfNeed(SingleVerSyncTaskContext * context,const Message * message,bool isControlMsg)917 int SingleVerDataSync::DoAbilitySyncIfNeed(SingleVerSyncTaskContext *context, const Message *message, bool isControlMsg)
918 {
919     uint16_t remoteCommunicatorVersion = 0;
920     if (communicateHandle_->GetRemoteCommunicatorVersion(context->GetDeviceId(), remoteCommunicatorVersion) ==
921         -E_NOT_FOUND) {
922         LOGE("[DataSync][DoAbilitySyncIfNeed] get remote communicator version failed");
923         return -E_VERSION_NOT_SUPPORT;
924     }
925     // If remote is not the first version, we need check SOFTWARE_VERSION_BASE
926     if (remoteCommunicatorVersion == 0) {
927         LOGI("[DataSync] set remote version 0");
928         context->SetRemoteSoftwareVersion(SOFTWARE_VERSION_EARLIEST);
929         return E_OK;
930     } else {
931         LOGI("[DataSync][DoAbilitySyncIfNeed] need do ability sync");
932         if (isControlMsg) {
933             SendControlAck(context, message, -E_NEED_ABILITY_SYNC, 0);
934         } else {
935             (void)SendDataAck(context, message, -E_NEED_ABILITY_SYNC, 0);
936         }
937         return -E_NEED_ABILITY_SYNC;
938     }
939 }
940 
DataRequestRecvPre(SingleVerSyncTaskContext * context,const Message * message)941 int SingleVerDataSync::DataRequestRecvPre(SingleVerSyncTaskContext *context, const Message *message)
942 {
943     if (context == nullptr || message == nullptr) {
944         return -E_INVALID_ARGS;
945     }
946     auto *packet = message->GetObject<DataRequestPacket>();
947     if (packet == nullptr) {
948         return -E_INVALID_ARGS;
949     }
950     if (context->GetRemoteSoftwareVersion() <= SOFTWARE_VERSION_BASE) {
951         return DoAbilitySyncIfNeed(context, message);
952     }
953     int32_t sendCode = packet->GetSendCode();
954     if (sendCode == -E_VERSION_NOT_SUPPORT) {
955         LOGE("[DataSync] Version mismatch: ver=%u, current=%u", packet->GetVersion(), SOFTWARE_VERSION_CURRENT);
956         (void)SendDataAck(context, message, -E_VERSION_NOT_SUPPORT, 0);
957         return -E_WAIT_NEXT_MESSAGE;
958     }
959     // only deal with pull response packet errCode
960     if (sendCode != E_OK && sendCode != SEND_FINISHED && sendCode != -E_UNFINISHED &&
961         message->GetSessionId() == context->GetRequestSessionId()) {
962         LOGE("[DataSync][DataRequestRecvPre] remote pullResponse getData sendCode=%d", sendCode);
963         return sendCode;
964     }
965     int errCode = RunPermissionCheck(context, message, packet);
966     if (errCode != E_OK) {
967         return errCode;
968     }
969     if (std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT) > SOFTWARE_VERSION_RELEASE_2_0) {
970         errCode = CheckSchemaStrategy(context, message);
971     }
972     if (errCode == E_OK) {
973         errCode = SingleVerDataSyncUtils::RequestQueryCheck(packet, storage_);
974     }
975     if (errCode != E_OK) {
976         (void)SendDataAck(context, message, errCode, 0);
977         return errCode;
978     }
979     errCode = SingleVerDataSyncUtils::SchemaVersionMatchCheck(*context, *packet, metadata_);
980     if (errCode != E_OK) {
981         (void)SendDataAck(context, message, errCode, 0);
982     }
983     return errCode;
984 }
985 
DataRequestRecv(SingleVerSyncTaskContext * context,const Message * message,WaterMark & pullEndWatermark)986 int SingleVerDataSync::DataRequestRecv(SingleVerSyncTaskContext *context, const Message *message,
987     WaterMark &pullEndWatermark)
988 {
989     int errCode = DataRequestRecvPre(context, message);
990     if (errCode != E_OK) {
991         return errCode;
992     }
993     const DataRequestPacket *packet = message->GetObject<DataRequestPacket>();
994     const std::vector<SendDataItem> &data = packet->GetData();
995     SyncType curType = SyncOperation::GetSyncType(packet->GetMode());
996     LOGI("[DataSync][DataRequestRecv] curType=%d, remote ver=%" PRIu32 ", size=%zu, errCode=%d, queryId=%s,"
997         " Label=%s, dev=%s", static_cast<int>(curType), packet->GetVersion(), data.size(), packet->GetSendCode(),
998         STR_MASK(packet->GetQueryId()), label_.c_str(), STR_MASK(GetDeviceId()));
999     context->SetReceiveWaterMarkErr(false);
1000     UpdateWaterMark isUpdateWaterMark;
1001     SyncTimeRange dataTime = SingleVerDataSyncUtils::GetRecvDataTimeRange(curType, data, isUpdateWaterMark);
1002     errCode = RemoveDeviceDataHandle(context, message, dataTime.endTime);
1003     if (errCode != E_OK) {
1004         return errCode;
1005     }
1006     Metadata::MetaWaterMarkAutoLock autoLock(metadata_);
1007     if (WaterMarkErrHandle(curType, context, message)) {
1008         return E_OK;
1009     }
1010     GetPullEndWatermark(context, packet, pullEndWatermark);
1011     // save data first
1012     errCode = SaveData(context, data, curType, packet->GetQuery());
1013     if (errCode != E_OK) {
1014         (void)SendDataAck(context, message, errCode, dataTime.endTime);
1015         return errCode;
1016     }
1017     if (pullEndWatermark > 0 && !storage_->IsReadable()) { // pull mode
1018         pullEndWatermark = 0;
1019         errCode = SendDataAck(context, message, -E_EKEYREVOKED, dataTime.endTime);
1020     } else {
1021         // if data is empty, we don't know the max timestap of this packet.
1022         errCode = SendDataAck(context, message, !data.empty() ? E_OK : WATER_MARK_INVALID, dataTime.endTime);
1023     }
1024     RemotePushFinished(packet->GetSendCode(), packet->GetMode(), message->GetSessionId(),
1025         context->GetRequestSessionId());
1026     if (curType != SyncType::QUERY_SYNC_TYPE && isUpdateWaterMark.normalUpdateMark) {
1027         UpdatePeerWaterMark(curType, "", context, dataTime.endTime + 1, 0);
1028     } else if (curType == SyncType::QUERY_SYNC_TYPE && packet->IsNeedUpdateWaterMark()) {
1029         UpdateQueryPeerWaterMark(curType, packet->GetQueryId(), dataTime, context, isUpdateWaterMark);
1030     }
1031     if (errCode != E_OK) {
1032         return errCode;
1033     }
1034     if (packet->GetSendCode() == SEND_FINISHED) {
1035         return -E_RECV_FINISHED;
1036     }
1037     return errCode;
1038 }
1039 
SendDataPacket(SyncType syncType,DataRequestPacket * packet,SingleVerSyncTaskContext * context)1040 int SingleVerDataSync::SendDataPacket(SyncType syncType, DataRequestPacket *packet,
1041     SingleVerSyncTaskContext *context)
1042 {
1043     Message *message = new (std::nothrow) Message(SingleVerDataSyncUtils::GetMessageId(syncType));
1044     if (message == nullptr) {
1045         LOGE("[DataSync][SendDataPacket] new message error");
1046         delete packet;
1047         packet = nullptr;
1048         return -E_OUT_OF_MEMORY;
1049     }
1050     uint32_t packetLen = packet->CalculateLen(SingleVerDataSyncUtils::GetMessageId(syncType));
1051     int errCode = message->SetExternalObject(packet);
1052     if (errCode != E_OK) {
1053         delete packet;
1054         packet = nullptr;
1055         delete message;
1056         message = nullptr;
1057         LOGE("[DataSync][SendDataPacket] set external object failed errCode=%d", errCode);
1058         return errCode;
1059     }
1060     SingleVerDataSyncUtils::SetMessageHeadInfo(*message, TYPE_REQUEST, context->GetDeviceId(),
1061         context->GetSequenceId(), context->GetRequestSessionId());
1062     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
1063     if (performance != nullptr) {
1064         performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_DATA_SEND_REQUEST_TO_ACK_RECV);
1065     }
1066     CommErrHandler handler = [this, context, sessionId = message->GetSessionId()](int ret, bool isDirectEnd) {
1067         SyncTaskContext::CommErrHandlerFunc(ret, context, sessionId, isDirectEnd);
1068     };
1069     errCode = Send(context, message, handler, packetLen);
1070     if (errCode != E_OK) {
1071         delete message;
1072         message = nullptr;
1073     }
1074 
1075     return errCode;
1076 }
1077 
SendPullResponseDataPkt(int ackCode,SyncEntry & syncOutData,SingleVerSyncTaskContext * context)1078 int SingleVerDataSync::SendPullResponseDataPkt(int ackCode, SyncEntry &syncOutData,
1079     SingleVerSyncTaskContext *context)
1080 {
1081     DataRequestPacket *packet = new (std::nothrow) DataRequestPacket;
1082     if (packet == nullptr) {
1083         LOGE("[DataSync][SendPullResponseDataPkt] new data request packet error");
1084         return -E_OUT_OF_MEMORY;
1085     }
1086     SyncType syncType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1087     FillDataRequestPacket(packet, context, syncOutData, ackCode, SyncModeType::RESPONSE_PULL);
1088     uint32_t packetLen = packet->CalculateLen(SingleVerDataSyncUtils::GetMessageId(syncType));
1089     Message *message = new (std::nothrow) Message(SingleVerDataSyncUtils::GetMessageId(syncType));
1090     if (message == nullptr) {
1091         LOGE("[DataSync][SendPullResponseDataPkt] new message error");
1092         delete packet;
1093         packet = nullptr;
1094         return -E_OUT_OF_MEMORY;
1095     }
1096     int errCode = message->SetExternalObject(packet);
1097     if (errCode != E_OK) {
1098         delete packet;
1099         packet = nullptr;
1100         delete message;
1101         message = nullptr;
1102         LOGE("[SendPullResponseDataPkt] set external object failed, errCode=%d", errCode);
1103         return errCode;
1104     }
1105     SingleVerDataSyncUtils::SetMessageHeadInfo(*message, TYPE_REQUEST, context->GetDeviceId(),
1106         context->GetSequenceId(), context->GetResponseSessionId());
1107     SendResetWatchDogPacket(context, packetLen);
1108     errCode = Send(context, message, nullptr, packetLen);
1109     if (errCode != E_OK) {
1110         delete message;
1111         message = nullptr;
1112     }
1113     return errCode;
1114 }
1115 
SendFinishedDataAck(SingleVerSyncTaskContext * context,const Message * message)1116 void SingleVerDataSync::SendFinishedDataAck(SingleVerSyncTaskContext *context, const Message *message)
1117 {
1118     (void)SendDataAck(context, message, E_OK, 0);
1119 }
1120 
SendDataAck(SingleVerSyncTaskContext * context,const Message * message,int32_t recvCode,WaterMark maxSendDataTime)1121 int SingleVerDataSync::SendDataAck(SingleVerSyncTaskContext *context, const Message *message, int32_t recvCode,
1122     WaterMark maxSendDataTime)
1123 {
1124     const DataRequestPacket *packet = message->GetObject<DataRequestPacket>();
1125     if (packet == nullptr) {
1126         return -E_INVALID_ARGS;
1127     }
1128     Message *ackMessage = new (std::nothrow) Message(message->GetMessageId());
1129     if (ackMessage == nullptr) {
1130         LOGE("[DataSync][SendDataAck] new message error");
1131         return -E_OUT_OF_MEMORY;
1132     }
1133     DataAckPacket ack;
1134     SetAckPacket(ack, context, packet, recvCode, maxSendDataTime);
1135     int errCode = ackMessage->SetCopiedObject(ack);
1136     if (errCode != E_OK) {
1137         delete ackMessage;
1138         ackMessage = nullptr;
1139         LOGE("[DataSync][SendDataAck] set copied object failed, errcode=%d", errCode);
1140         return errCode;
1141     }
1142     SingleVerDataSyncUtils::SetMessageHeadInfo(*ackMessage, TYPE_RESPONSE, context->GetDeviceId(),
1143         message->GetSequenceId(), message->GetSessionId());
1144 
1145     errCode = Send(context, ackMessage, nullptr, 0);
1146     if (errCode != E_OK) {
1147         delete ackMessage;
1148         ackMessage = nullptr;
1149     }
1150     return errCode;
1151 }
1152 
AckPacketIdCheck(const Message * message)1153 bool SingleVerDataSync::AckPacketIdCheck(const Message *message)
1154 {
1155     if (message == nullptr) {
1156         LOGE("[DataSync] AckRecv message nullptr");
1157         return false;
1158     }
1159     if (message->GetMessageType() == TYPE_NOTIFY || message->IsFeedbackError()) {
1160         return true;
1161     }
1162     const DataAckPacket *packet = message->GetObject<DataAckPacket>();
1163     if (packet == nullptr) {
1164         return false;
1165     }
1166     uint64_t packetId = packet->GetPacketId(); // above 102 version data request reserve[0] store packetId value
1167     std::lock_guard<std::mutex> lock(lock_);
1168     uint32_t sequenceId = message->GetSequenceId();
1169     if (reSendMap_.count(sequenceId) != 0) {
1170         uint64_t originalPacketId = reSendMap_[sequenceId].packetId;
1171         if (DataAckPacket::IsPacketIdValid(packetId) && packetId != originalPacketId) {
1172             LOGE("[DataSync] packetId[%" PRIu64 "] is not match with original[%" PRIu64 "]", packetId,
1173                 originalPacketId);
1174             return false;
1175         }
1176     }
1177     return true;
1178 }
1179 
AckRecv(SingleVerSyncTaskContext * context,const Message * message)1180 int SingleVerDataSync::AckRecv(SingleVerSyncTaskContext *context, const Message *message)
1181 {
1182     int errCode = SingleVerDataSyncUtils::AckMsgErrnoCheck(context, message);
1183     if (errCode != E_OK) {
1184         return errCode;
1185     }
1186     const DataAckPacket *packet = message->GetObject<DataAckPacket>();
1187     if (packet == nullptr) {
1188         return -E_INVALID_ARGS;
1189     }
1190     int32_t recvCode = packet->GetRecvCode();
1191     LOGD("[DataSync][AckRecv] ver=%u,recvCode=%d,myversion=%u,label=%s,dev=%s", packet->GetVersion(), recvCode,
1192         SOFTWARE_VERSION_CURRENT, label_.c_str(), STR_MASK(GetDeviceId()));
1193     if (recvCode == -E_VERSION_NOT_SUPPORT) {
1194         LOGE("[DataSync][AckRecv] Version mismatch");
1195         return -E_VERSION_NOT_SUPPORT;
1196     }
1197 
1198     if (recvCode == -E_NEED_ABILITY_SYNC || recvCode == -E_NOT_PERMIT || recvCode == -E_NEED_TIME_SYNC) {
1199         // we should ReleaseContinueToken, avoid crash
1200         LOGI("[DataSync][AckRecv] Data sync abort,recvCode =%d,label =%s,dev=%s", recvCode, label_.c_str(),
1201             STR_MASK(GetDeviceId()));
1202         context->ReleaseContinueToken();
1203         return recvCode;
1204     }
1205     uint64_t data = packet->GetData();
1206     if (recvCode == LOCAL_WATER_MARK_NOT_INIT) {
1207         return DealWaterMarkException(context, data, packet->GetReserved());
1208     }
1209 
1210     if (recvCode == -E_SAVE_DATA_NOTIFY && data != 0) {
1211         // data only use low 32bit
1212         context->StartFeedDogForSync(static_cast<uint32_t>(data), SyncDirectionFlag::RECEIVE);
1213         LOGI("[DataSync][AckRecv] notify ResetWatchDog=%" PRIu64 ",label=%s,dev=%s", data, label_.c_str(),
1214             STR_MASK(GetDeviceId()));
1215     }
1216 
1217     if (recvCode != E_OK && recvCode != WATER_MARK_INVALID) {
1218         LOGW("[DataSync][AckRecv] Received a uncatched recvCode=%d,label=%s,dev=%s", recvCode,
1219             label_.c_str(), STR_MASK(GetDeviceId()));
1220         return recvCode;
1221     }
1222 
1223     // Judge if send finished
1224     ContinueToken token;
1225     context->GetContinueToken(token);
1226     if (((message->GetSessionId() == context->GetResponseSessionId()) ||
1227         (message->GetSessionId() == context->GetRequestSessionId())) && (token == nullptr)) {
1228         return -E_NO_DATA_SEND;
1229     }
1230 
1231     // send next data
1232     return -E_SEND_DATA;
1233 }
1234 
SendSaveDataNotifyPacket(SingleVerSyncTaskContext * context,uint32_t pktVersion,uint32_t sessionId,uint32_t sequenceId,uint32_t inMsgId)1235 void SingleVerDataSync::SendSaveDataNotifyPacket(SingleVerSyncTaskContext *context, uint32_t pktVersion,
1236     uint32_t sessionId, uint32_t sequenceId, uint32_t inMsgId)
1237 {
1238     if (inMsgId != DATA_SYNC_MESSAGE && inMsgId != QUERY_SYNC_MESSAGE) {
1239         LOGE("[SingleVerDataSync] messageId not available.");
1240         return;
1241     }
1242     Message *ackMessage = new (std::nothrow) Message(inMsgId);
1243     if (ackMessage == nullptr) {
1244         LOGE("[DataSync][SaveDataNotify] new message failed");
1245         return;
1246     }
1247 
1248     DataAckPacket ack;
1249     ack.SetRecvCode(-E_SAVE_DATA_NOTIFY);
1250     ack.SetVersion(pktVersion);
1251     int errCode = ackMessage->SetCopiedObject(ack);
1252     if (errCode != E_OK) {
1253         delete ackMessage;
1254         ackMessage = nullptr;
1255         LOGE("[DataSync][SendSaveDataNotifyPacket] set copied object failed,errcode=%d", errCode);
1256         return;
1257     }
1258     SingleVerDataSyncUtils::SetMessageHeadInfo(*ackMessage, TYPE_NOTIFY, context->GetDeviceId(), sequenceId, sessionId);
1259 
1260     errCode = Send(context, ackMessage, nullptr, 0);
1261     if (errCode != E_OK) {
1262         delete ackMessage;
1263         ackMessage = nullptr;
1264     }
1265     LOGD("[DataSync][SaveDataNotify] Send SaveDataNotify packet Finished,errcode=%d,label=%s,dev=%s",
1266         errCode, label_.c_str(), STR_MASK(GetDeviceId()));
1267 }
1268 
GetPullEndWatermark(const SingleVerSyncTaskContext * context,const DataRequestPacket * packet,WaterMark & pullEndWatermark) const1269 void SingleVerDataSync::GetPullEndWatermark(const SingleVerSyncTaskContext *context, const DataRequestPacket *packet,
1270     WaterMark &pullEndWatermark) const
1271 {
1272     if (packet == nullptr) {
1273         return;
1274     }
1275     int mode = SyncOperation::TransferSyncMode(packet->GetMode());
1276     if ((mode == SyncModeType::PULL) || (mode == SyncModeType::PUSH_AND_PULL)) {
1277         WaterMark endMark = packet->GetEndWaterMark();
1278         TimeOffset offset;
1279         metadata_->GetTimeOffset(context->GetDeviceId(), offset);
1280         pullEndWatermark = endMark - static_cast<WaterMark>(offset);
1281         LOGD("[DataSync][PullEndWatermark] packetEndMark=%" PRIu64 ",offset=%" PRId64 ",endWaterMark=%" PRIu64 ","
1282             "label=%s,dev=%s", endMark, offset, pullEndWatermark, label_.c_str(), STR_MASK(GetDeviceId()));
1283     }
1284 }
1285 
DealWaterMarkException(SingleVerSyncTaskContext * context,WaterMark ackWaterMark,const std::vector<uint64_t> & reserved)1286 int SingleVerDataSync::DealWaterMarkException(SingleVerSyncTaskContext *context, WaterMark ackWaterMark,
1287     const std::vector<uint64_t> &reserved)
1288 {
1289     WaterMark deletedWaterMark = 0;
1290     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1291     if (curType == SyncType::QUERY_SYNC_TYPE) {
1292         if (reserved.size() <= ACK_PACKET_RESERVED_INDEX_DELETE_WATER_MARK) {
1293             LOGE("[DataSync] get packet reserve size failed");
1294             return -E_INVALID_ARGS;
1295         }
1296         deletedWaterMark = reserved[ACK_PACKET_RESERVED_INDEX_DELETE_WATER_MARK];
1297     }
1298     LOGI("[DataSync][WaterMarkException] AckRecv water error, mark=%" PRIu64 ",deleteMark=%" PRIu64 ","
1299         "label=%s,dev=%s", ackWaterMark, deletedWaterMark, label_.c_str(), STR_MASK(GetDeviceId()));
1300     int errCode = SaveLocalWaterMark(curType, context,
1301         {0, 0, ackWaterMark, deletedWaterMark});
1302     if (errCode != E_OK) {
1303         return errCode;
1304     }
1305     context->SetRetryStatus(SyncTaskContext::NEED_RETRY);
1306     context->IncNegotiationCount();
1307     SingleVerDataSyncUtils::PushAndPullKeyRevokHandle(context);
1308     if (!context->IsNeedClearRemoteStaleData()) {
1309         return -E_RE_SEND_DATA;
1310     }
1311     errCode = DealRemoveDeviceDataByAck(context, ackWaterMark, reserved);
1312     if (errCode != E_OK) {
1313         return errCode;
1314     }
1315     return -E_RE_SEND_DATA;
1316 }
1317 
RunPermissionCheck(SingleVerSyncTaskContext * context,const Message * message,const DataRequestPacket * packet)1318 int SingleVerDataSync::RunPermissionCheck(SingleVerSyncTaskContext *context, const Message *message,
1319     const DataRequestPacket *packet)
1320 {
1321     int mode = SyncOperation::TransferSyncMode(packet->GetMode());
1322     int errCode = SingleVerDataSyncUtils::RunPermissionCheck(context, storage_, label_, packet);
1323     if (errCode != E_OK) {
1324         if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_EARLIEST) { // ver 101 can't handle this errCode
1325             (void)SendDataAck(context, message, -E_NOT_PERMIT, 0);
1326         }
1327         return -E_NOT_PERMIT;
1328     }
1329     const std::vector<SendDataItem> &data = packet->GetData();
1330     WaterMark maxSendDataTime = SingleVerDataSyncUtils::GetMaxSendDataTime(data);
1331     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1332     auto securityOption = packet->GetSecurityOption();
1333     if (context->GetRemoteSeccurityOption().securityLabel == SecurityLabel::NOT_SET &&
1334         securityOption.securityLabel != SecurityLabel::NOT_SET) {
1335         context->SetRemoteSeccurityOption(packet->GetSecurityOption());
1336     }
1337     if (version > SOFTWARE_VERSION_RELEASE_2_0 && (mode != SyncModeType::PULL) &&
1338         !context->GetReceivcPermitCheck()) {
1339         bool permitReceive = SingleVerDataSyncUtils::CheckPermitReceiveData(context, communicateHandle_, storage_);
1340         if (permitReceive) {
1341             context->SetReceivcPermitCheck(true);
1342         } else {
1343             (void)SendDataAck(context, message, -E_SECURITY_OPTION_CHECK_ERROR, maxSendDataTime);
1344             return -E_SECURITY_OPTION_CHECK_ERROR;
1345         }
1346     }
1347     return errCode;
1348 }
1349 
1350 // used in pull response
SendResetWatchDogPacket(SingleVerSyncTaskContext * context,uint32_t packetLen)1351 void SingleVerDataSync::SendResetWatchDogPacket(SingleVerSyncTaskContext *context, uint32_t packetLen)
1352 {
1353     // When mtu less than 30k, we send data with bluetooth
1354     // In order not to block the bluetooth channel, we don't send notify packet here
1355     if (mtuSize_ >= packetLen || mtuSize_ < NOTIFY_MIN_MTU_SIZE) {
1356         return;
1357     }
1358     uint64_t data = static_cast<uint64_t>(packetLen) * static_cast<uint64_t>(context->GetTimeoutTime()) / mtuSize_;
1359 
1360     Message *ackMessage = new (std::nothrow) Message(DATA_SYNC_MESSAGE);
1361     if (ackMessage == nullptr) {
1362         LOGE("[DataSync][ResetWatchDog] new message failed");
1363         return;
1364     }
1365 
1366     DataAckPacket ack;
1367     ack.SetData(data);
1368     ack.SetRecvCode(-E_SAVE_DATA_NOTIFY);
1369     ack.SetVersion(std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT));
1370     int errCode = ackMessage->SetCopiedObject(ack);
1371     if (errCode != E_OK) {
1372         delete ackMessage;
1373         ackMessage = nullptr;
1374         LOGE("[DataSync][ResetWatchDog] set copied object failed, errcode=%d", errCode);
1375         return;
1376     }
1377     SingleVerDataSyncUtils::SetMessageHeadInfo(*ackMessage, TYPE_NOTIFY, context->GetDeviceId(),
1378         context->GetSequenceId(), context->GetResponseSessionId());
1379 
1380     errCode = Send(context, ackMessage, nullptr, 0);
1381     if (errCode != E_OK) {
1382         delete ackMessage;
1383         ackMessage = nullptr;
1384         LOGE("[DataSync][ResetWatchDog] Send packet failed,errcode=%d,label=%s,dev=%s", errCode, label_.c_str(),
1385             STR_MASK(GetDeviceId()));
1386     } else {
1387         LOGI("[DataSync][ResetWatchDog] data = %" PRIu64 ",label=%s,dev=%s", data, label_.c_str(),
1388             STR_MASK(GetDeviceId()));
1389     }
1390 }
1391 
ReSend(SingleVerSyncTaskContext * context,DataSyncReSendInfo reSendInfo)1392 int32_t SingleVerDataSync::ReSend(SingleVerSyncTaskContext *context, DataSyncReSendInfo reSendInfo)
1393 {
1394     if (context == nullptr) {
1395         return -E_INVALID_ARGS;
1396     }
1397     int errCode = RemoveDeviceDataIfNeed(context);
1398     if (errCode != E_OK) {
1399         context->SetTaskErrCode(errCode);
1400         return errCode;
1401     }
1402     SyncEntry syncData;
1403     errCode = GetReSendData(syncData, context, reSendInfo);
1404     if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
1405         return errCode;
1406     }
1407     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1408     DataRequestPacket *packet = new (std::nothrow) DataRequestPacket;
1409     if (packet == nullptr) {
1410         LOGE("[DataSync][ReSend] new DataRequestPacket error");
1411         return -E_OUT_OF_MEMORY;
1412     }
1413     FillRequestReSendPacket(context, packet, reSendInfo, syncData, errCode);
1414     errCode = SendReSendPacket(packet, context, reSendInfo.sessionId, reSendInfo.sequenceId);
1415     if (curType == SyncType::QUERY_SYNC_TYPE && (context->GetQuery().HasLimit() || context->GetQuery().HasOrderBy())) {
1416         LOGI("[DataSync][ReSend] query contain limit/offset/orderby, no need to update watermark.");
1417         return errCode;
1418     }
1419     if (errCode == E_OK && SyncOperation::TransferSyncMode(context->GetMode()) != SyncModeType::PULL) {
1420         // resend.end may not update in localwatermark while E_TIMEOUT occurred in send message last time.
1421         SyncTimeRange dataTime {reSendInfo.start, reSendInfo.deleteDataStart, reSendInfo.end, reSendInfo.deleteDataEnd};
1422         if (reSendInfo.deleteDataEnd > reSendInfo.deleteDataStart && curType == SyncType::QUERY_SYNC_TYPE) {
1423             dataTime.deleteEndTime += 1;
1424         }
1425         if (reSendInfo.end > reSendInfo.start) {
1426             dataTime.endTime += 1;
1427         }
1428         errCode = SaveLocalWaterMark(curType, context, dataTime, true);
1429         if (errCode != E_OK) {
1430             LOGE("[DataSync][ReSend] SaveLocalWaterMark failed.");
1431         }
1432     }
1433     return errCode;
1434 }
1435 
SendReSendPacket(DataRequestPacket * packet,SingleVerSyncTaskContext * context,uint32_t sessionId,uint32_t sequenceId)1436 int SingleVerDataSync::SendReSendPacket(DataRequestPacket *packet, SingleVerSyncTaskContext *context,
1437     uint32_t sessionId, uint32_t sequenceId)
1438 {
1439     SyncType syncType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1440     Message *message = new (std::nothrow) Message(SingleVerDataSyncUtils::GetMessageId(syncType));
1441     if (message == nullptr) {
1442         LOGE("[DataSync][SendDataPacket] new message error");
1443         delete packet;
1444         packet = nullptr;
1445         return -E_OUT_OF_MEMORY;
1446     }
1447     uint32_t packetLen = packet->CalculateLen(SingleVerDataSyncUtils::GetMessageId(syncType));
1448     int errCode = message->SetExternalObject(packet);
1449     if (errCode != E_OK) {
1450         delete packet;
1451         packet = nullptr;
1452         delete message;
1453         message = nullptr;
1454         LOGE("[DataSync][SendReSendPacket] SetExternalObject failed errCode=%d", errCode);
1455         return errCode;
1456     }
1457     SingleVerDataSyncUtils::SetMessageHeadInfo(*message, TYPE_REQUEST, context->GetDeviceId(), sequenceId, sessionId);
1458     CommErrHandler handler = [this, context, sessionId = message->GetSessionId()](int ret, bool isDirectEnd) {
1459         SyncTaskContext::CommErrHandlerFunc(ret, context, sessionId, isDirectEnd);
1460     };
1461     errCode = Send(context, message, handler, packetLen);
1462     if (errCode != E_OK) {
1463         delete message;
1464         message = nullptr;
1465     }
1466     return errCode;
1467 }
1468 
CheckPermitSendData(int inMode,SingleVerSyncTaskContext * context)1469 int SingleVerDataSync::CheckPermitSendData(int inMode, SingleVerSyncTaskContext *context)
1470 {
1471     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1472     int mode = SyncOperation::TransferSyncMode(inMode);
1473     // for pull mode it just need to get data, no need to send data.
1474     if (version <= SOFTWARE_VERSION_RELEASE_2_0 || mode == SyncModeType::PULL) {
1475         return E_OK;
1476     }
1477     if (context->GetSendPermitCheck()) {
1478         return E_OK;
1479     }
1480     bool isPermitSync = true;
1481     std::string deviceId = context->GetDeviceId();
1482     SecurityOption remoteSecOption = context->GetRemoteSeccurityOption();
1483     if (mode == SyncModeType::PUSH || mode == SyncModeType::PUSH_AND_PULL || mode == SyncModeType::RESPONSE_PULL) {
1484         isPermitSync = SingleVerDataSyncUtils::IsPermitRemoteDeviceRecvData(deviceId, remoteSecOption, storage_);
1485     }
1486     LOGI("[DataSync][PermitSendData] mode=%d,dev=%s,label=%d,flag=%d,PermitSync=%d", mode, STR_MASK(deviceId_),
1487         remoteSecOption.securityLabel, remoteSecOption.securityFlag, isPermitSync);
1488     if (isPermitSync) {
1489         context->SetSendPermitCheck(true);
1490         return E_OK;
1491     }
1492     if (mode == SyncModeType::PUSH || mode == SyncModeType::PUSH_AND_PULL) {
1493         context->SetTaskErrCode(-E_SECURITY_OPTION_CHECK_ERROR);
1494         return -E_SECURITY_OPTION_CHECK_ERROR;
1495     }
1496     if (mode == SyncModeType::RESPONSE_PULL) {
1497         SyncEntry syncData;
1498         (void)SendPullResponseDataPkt(-E_SECURITY_OPTION_CHECK_ERROR, syncData, context);
1499         return -E_SECURITY_OPTION_CHECK_ERROR;
1500     }
1501     if (mode == SyncModeType::SUBSCRIBE_QUERY) {
1502         return -E_SECURITY_OPTION_CHECK_ERROR;
1503     }
1504     return E_OK;
1505 }
1506 
GetLabel() const1507 std::string SingleVerDataSync::GetLabel() const
1508 {
1509     return label_;
1510 }
1511 
GetDeviceId() const1512 std::string SingleVerDataSync::GetDeviceId() const
1513 {
1514     return deviceId_;
1515 }
1516 
WaterMarkErrHandle(SyncType syncType,SingleVerSyncTaskContext * context,const Message * message)1517 bool SingleVerDataSync::WaterMarkErrHandle(SyncType syncType, SingleVerSyncTaskContext *context, const Message *message)
1518 {
1519     const DataRequestPacket *packet = message->GetObject<DataRequestPacket>();
1520     if (packet == nullptr) {
1521         LOGE("[WaterMarkErrHandle] get packet object failed");
1522         return -E_INVALID_ARGS;
1523     }
1524     WaterMark packetLocalMark = packet->GetLocalWaterMark();
1525     WaterMark packetDeletedMark = packet->GetDeletedWaterMark();
1526     WaterMark peerMark = 0;
1527     WaterMark deletedMark = 0;
1528     GetPeerWaterMark(syncType, packet->GetQueryId(), context->GetDeviceId(), peerMark);
1529     if (syncType == SyncType::QUERY_SYNC_TYPE) {
1530         GetPeerDeleteSyncWaterMark(context->GetDeleteSyncId(), deletedMark);
1531     }
1532     if (syncType != SyncType::QUERY_SYNC_TYPE && packetLocalMark > peerMark) {
1533         LOGI("[DataSync][DataRequestRecv] packetLocalMark=%" PRIu64 ",current=%" PRIu64, packetLocalMark, peerMark);
1534         context->SetReceiveWaterMarkErr(true);
1535         (void)SendDataAck(context, message, LOCAL_WATER_MARK_NOT_INIT, 0);
1536         return true;
1537     }
1538     if (syncType == SyncType::QUERY_SYNC_TYPE && (packetLocalMark > peerMark || packetDeletedMark > deletedMark)) {
1539         LOGI("[DataSync][DataRequestRecv] packetDeletedMark=%" PRIu64 ",deletedMark=%" PRIu64 ","
1540             "packetLocalMark=%" PRIu64 ",peerMark=%" PRIu64, packetDeletedMark, deletedMark, packetLocalMark,
1541             peerMark);
1542         context->SetReceiveWaterMarkErr(true);
1543         (void)SendDataAck(context, message, LOCAL_WATER_MARK_NOT_INIT, 0);
1544         return true;
1545     }
1546     return false;
1547 }
1548 
CheckSchemaStrategy(SingleVerSyncTaskContext * context,const Message * message)1549 int SingleVerDataSync::CheckSchemaStrategy(SingleVerSyncTaskContext *context, const Message *message)
1550 {
1551     auto *packet = message->GetObject<DataRequestPacket>();
1552     if (packet == nullptr) {
1553         return -E_INVALID_ARGS;
1554     }
1555     if (metadata_->IsAbilitySyncFinish(deviceId_)) {
1556         return E_OK;
1557     }
1558     auto query = packet->GetQuery();
1559     std::pair<bool, bool> schemaSyncStatus = context->GetSchemaSyncStatus(query);
1560     if (!schemaSyncStatus.second) {
1561         LOGE("[DataSync][CheckSchemaStrategy] isSchemaSync=%d check failed", schemaSyncStatus.second);
1562         (void)SendDataAck(context, message, -E_NEED_ABILITY_SYNC, 0);
1563         return -E_NEED_ABILITY_SYNC;
1564     }
1565     if (!schemaSyncStatus.first) {
1566         LOGE("[DataSync][CheckSchemaStrategy] Strategy permitSync=%d check failed", schemaSyncStatus.first);
1567         (void)SendDataAck(context, message, -E_SCHEMA_MISMATCH, 0);
1568         return -E_SCHEMA_MISMATCH;
1569     }
1570     return E_OK;
1571 }
1572 
RemotePushFinished(int sendCode,int inMode,uint32_t msgSessionId,uint32_t contextSessionId)1573 void SingleVerDataSync::RemotePushFinished(int sendCode, int inMode, uint32_t msgSessionId, uint32_t contextSessionId)
1574 {
1575     int mode = SyncOperation::TransferSyncMode(inMode);
1576     if ((mode != SyncModeType::PUSH) && (mode != SyncModeType::PUSH_AND_PULL) && (mode != SyncModeType::QUERY_PUSH) &&
1577         (mode != SyncModeType::QUERY_PUSH_PULL)) {
1578         return;
1579     }
1580 
1581     if ((sendCode == E_OK) && (msgSessionId != 0) && (msgSessionId != contextSessionId))  {
1582         storage_->NotifyRemotePushFinished(deviceId_);
1583     }
1584 }
1585 
SetAckPacket(DataAckPacket & ackPacket,SingleVerSyncTaskContext * context,const DataRequestPacket * packet,int32_t recvCode,WaterMark maxSendDataTime)1586 void SingleVerDataSync::SetAckPacket(DataAckPacket &ackPacket, SingleVerSyncTaskContext *context,
1587     const DataRequestPacket *packet, int32_t recvCode, WaterMark maxSendDataTime)
1588 {
1589     SyncType curType = SyncOperation::GetSyncType(packet->GetMode());
1590     WaterMark localMark = 0;
1591     GetLocalWaterMark(curType, packet->GetQueryId(), context, localMark);
1592     ackPacket.SetRecvCode(recvCode);
1593     // send ack packet
1594     if ((recvCode == E_OK) && (maxSendDataTime != 0)) {
1595         ackPacket.SetData(maxSendDataTime + 1); // + 1 to next start
1596     } else if (recvCode != WATER_MARK_INVALID) {
1597         WaterMark mark = 0;
1598         GetPeerWaterMark(curType, packet->GetQueryId(), context->GetDeviceId(), mark);
1599         ackPacket.SetData(mark);
1600     }
1601     std::vector<uint64_t> reserved {localMark};
1602     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1603     uint64_t packetId = 0;
1604     if (version > SOFTWARE_VERSION_RELEASE_2_0) {
1605         packetId = packet->GetPacketId(); // above 102 version data request reserve[0] store packetId value
1606     }
1607     if (version > SOFTWARE_VERSION_RELEASE_2_0 && packetId > 0) {
1608         reserved.push_back(packetId);
1609     }
1610     // while recv is not E_OK, data is peerMark, reserve[2] is deletedPeerMark value
1611     if (curType == SyncType::QUERY_SYNC_TYPE && recvCode != WATER_MARK_INVALID) {
1612         WaterMark deletedPeerMark;
1613         GetPeerDeleteSyncWaterMark(context->GetDeleteSyncId(), deletedPeerMark);
1614         reserved.push_back(deletedPeerMark); // query sync mode, reserve[2] store deletedPeerMark value
1615     }
1616     ackPacket.SetReserved(reserved);
1617     ackPacket.SetVersion(version);
1618 }
1619 
GetReSendData(SyncEntry & syncData,SingleVerSyncTaskContext * context,DataSyncReSendInfo reSendInfo)1620 int SingleVerDataSync::GetReSendData(SyncEntry &syncData, SingleVerSyncTaskContext *context,
1621     DataSyncReSendInfo reSendInfo)
1622 {
1623     int mode = SyncOperation::TransferSyncMode(context->GetMode());
1624     if (mode == SyncModeType::PULL) {
1625         return E_OK;
1626     }
1627     ContinueToken token = nullptr;
1628     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1629     size_t packetSize = (version > SOFTWARE_VERSION_RELEASE_2_0) ?
1630         DBConstant::MAX_HPMODE_PACK_ITEM_SIZE : DBConstant::MAX_NORMAL_PACK_ITEM_SIZE;
1631     DataSizeSpecInfo reSendDataSizeInfo = GetDataSizeSpecInfo(packetSize);
1632     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1633     int errCode;
1634     if (curType != SyncType::QUERY_SYNC_TYPE) {
1635         errCode = storage_->GetSyncData(reSendInfo.start, reSendInfo.end + 1, syncData.entries, token,
1636             reSendDataSizeInfo);
1637     } else {
1638         QuerySyncObject queryObj = context->GetQuery();
1639         errCode = storage_->GetSyncData(queryObj, SyncTimeRange { reSendInfo.start, reSendInfo.deleteDataStart,
1640             reSendInfo.end + 1, reSendInfo.deleteDataEnd + 1 }, reSendDataSizeInfo, token, syncData.entries);
1641     }
1642     if (token != nullptr) {
1643         storage_->ReleaseContinueToken(token);
1644     }
1645     if (errCode == -E_BUSY || errCode == -E_EKEYREVOKED) {
1646         context->SetTaskErrCode(errCode);
1647         return errCode;
1648     }
1649     if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
1650         return errCode;
1651     }
1652     SingleVerDataSyncUtils::TransDbDataItemToSendDataItem(
1653         DBCommon::TransferHashString(GetLocalDeviceName()), syncData.entries);
1654 
1655     int innerCode = InterceptData(syncData);
1656     if (innerCode != E_OK) {
1657         context->SetTaskErrCode(innerCode);
1658         return innerCode;
1659     }
1660 
1661     bool needCompressOnSync = false;
1662     uint8_t compressionRate = DBConstant::DEFAULT_COMPTRESS_RATE;
1663     (void)storage_->GetCompressionOption(needCompressOnSync, compressionRate);
1664     CompressAlgorithm remoteAlgo = context->ChooseCompressAlgo();
1665     if (needCompressOnSync && remoteAlgo != CompressAlgorithm::NONE) {
1666         int compressCode = GenericSingleVerKvEntry::Compress(syncData.entries, syncData.compressedEntries,
1667             { remoteAlgo, version });
1668         if (compressCode != E_OK) {
1669             return compressCode;
1670         }
1671     }
1672     return errCode;
1673 }
1674 
RemoveDeviceDataIfNeed(SingleVerSyncTaskContext * context)1675 int SingleVerDataSync::RemoveDeviceDataIfNeed(SingleVerSyncTaskContext *context)
1676 {
1677     if (context->GetRemoteSoftwareVersion() <= SOFTWARE_VERSION_RELEASE_3_0) {
1678         return E_OK;
1679     }
1680     uint64_t clearRemoteDataMark = 0;
1681     std::lock_guard<std::mutex> autoLock(removeDeviceDataLock_);
1682     metadata_->GetRemoveDataMark(context->GetDeviceId(), clearRemoteDataMark);
1683     if (clearRemoteDataMark == 0) {
1684         return E_OK;
1685     }
1686     int errCode = E_OK;
1687     if (context->IsNeedClearRemoteStaleData() && clearRemoteDataMark == REMOVE_DEVICE_DATA_MARK) {
1688         errCode = storage_->RemoveDeviceData(context->GetDeviceId(), true);
1689         if (errCode != E_OK) {
1690             LOGE("clear remote %s data failed,errCode=%d", STR_MASK(GetDeviceId()), errCode);
1691             return errCode;
1692         }
1693     }
1694     if (clearRemoteDataMark == REMOVE_DEVICE_DATA_MARK) {
1695         errCode = metadata_->ResetMetaDataAfterRemoveData(context->GetDeviceId());
1696         if (errCode != E_OK) {
1697             LOGE("set %s removeDataWaterMark to false failed,errCode=%d", STR_MASK(GetDeviceId()), errCode);
1698             return errCode;
1699         }
1700     }
1701     return E_OK;
1702 }
1703 
UpdateMtuSize()1704 void SingleVerDataSync::UpdateMtuSize()
1705 {
1706     mtuSize_ = communicateHandle_->GetCommunicatorMtuSize(deviceId_) * 9 / 10; // get the 9/10 of the size
1707 }
1708 
FillRequestReSendPacket(const SingleVerSyncTaskContext * context,DataRequestPacket * packet,DataSyncReSendInfo reSendInfo,SyncEntry & syncData,int sendCode)1709 void SingleVerDataSync::FillRequestReSendPacket(const SingleVerSyncTaskContext *context, DataRequestPacket *packet,
1710     DataSyncReSendInfo reSendInfo, SyncEntry &syncData, int sendCode)
1711 {
1712     SingleVerDataSyncUtils::SetDataRequestCommonInfo(*context, *storage_, *packet, metadata_);
1713     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1714     WaterMark peerMark = 0;
1715     GetPeerWaterMark(curType, context->GetQuerySyncId(), context->GetDeviceId(),
1716         peerMark);
1717     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1718     // transfer reSend mode, RESPONSE_PULL transfer to push or query push
1719     // PUSH_AND_PULL mode which sequenceId lager than first transfer to push or query push
1720     int reSendMode = SingleVerDataSyncUtils::GetReSendMode(context->GetMode(), reSendInfo.sequenceId, curType);
1721     if (GetSessionEndTimestamp() == std::max(reSendInfo.end, reSendInfo.deleteDataEnd) ||
1722         SyncOperation::TransferSyncMode(context->GetMode()) == SyncModeType::PULL) {
1723         LOGI("[DataSync][ReSend] set lastid,label=%s,dev=%s", label_.c_str(), STR_MASK(GetDeviceId()));
1724         packet->SetLastSequence();
1725     }
1726     if (sendCode == E_OK && GetSessionEndTimestamp() == std::max(reSendInfo.end, reSendInfo.deleteDataEnd) &&
1727         context->GetMode() == SyncModeType::RESPONSE_PULL) {
1728         sendCode = SEND_FINISHED;
1729     }
1730     packet->SetData(syncData.entries);
1731     packet->SetCompressData(syncData.compressedEntries);
1732     packet->SetBasicInfo(sendCode, version, reSendMode);
1733     packet->SetExtraConditions(RuntimeContext::GetInstance()->GetPermissionCheckParam(storage_->GetDbProperties()));
1734     packet->SetWaterMark(reSendInfo.start, peerMark, reSendInfo.deleteDataStart);
1735     if (SyncOperation::TransferSyncMode(reSendMode) != SyncModeType::PUSH || context->IsQuerySync()) {
1736         packet->SetEndWaterMark(context->GetEndMark());
1737         packet->SetQuery(context->GetQuery());
1738     }
1739     packet->SetQueryId(context->GetQuerySyncId());
1740     if (version > SOFTWARE_VERSION_RELEASE_2_0) {
1741         std::vector<uint64_t> reserved {reSendInfo.packetId};
1742         packet->SetReserved(reserved);
1743     }
1744     if (reSendMode == SyncModeType::PULL || reSendMode == SyncModeType::QUERY_PULL) {
1745         // resend pull packet dont set compress type
1746         return;
1747     }
1748     bool needCompressOnSync = false;
1749     uint8_t compressionRate = DBConstant::DEFAULT_COMPTRESS_RATE;
1750     (void)storage_->GetCompressionOption(needCompressOnSync, compressionRate);
1751     CompressAlgorithm curAlgo = context->ChooseCompressAlgo();
1752     if (needCompressOnSync && curAlgo != CompressAlgorithm::NONE) {
1753         packet->SetCompressDataMark();
1754         packet->SetCompressAlgo(curAlgo);
1755     }
1756 }
1757 
GetDataSizeSpecInfo(size_t packetSize)1758 DataSizeSpecInfo SingleVerDataSync::GetDataSizeSpecInfo(size_t packetSize)
1759 {
1760     bool needCompressOnSync = false;
1761     uint8_t compressionRate = DBConstant::DEFAULT_COMPTRESS_RATE;
1762     (void)storage_->GetCompressionOption(needCompressOnSync, compressionRate);
1763     uint32_t blockSize = std::min(static_cast<uint32_t>(DBConstant::MAX_SYNC_BLOCK_SIZE),
1764         mtuSize_ * 100 / compressionRate);  // compressionRate max is 100
1765     return {blockSize, packetSize};
1766 }
1767 
InterceptData(SyncEntry & syncEntry)1768 int SingleVerDataSync::InterceptData(SyncEntry &syncEntry)
1769 {
1770     if (storage_ == nullptr) {
1771         LOGE("Invalid DB. Can not intercept data.");
1772         return -E_INVALID_DB;
1773     }
1774 
1775     // GetLocalDeviceName get local device ID.
1776     // GetDeviceId get remote device ID.
1777     // If intercept data fail, entries will be released.
1778     return storage_->InterceptData(syncEntry.entries, GetLocalDeviceName(), GetDeviceId(), true);
1779 }
1780 
ControlCmdStart(SingleVerSyncTaskContext * context)1781 int SingleVerDataSync::ControlCmdStart(SingleVerSyncTaskContext *context)
1782 {
1783     if (context == nullptr) {
1784         return -E_INVALID_ARGS;
1785     }
1786     std::shared_ptr<SubscribeManager> subManager = context->GetSubscribeManager();
1787     if (subManager == nullptr) {
1788         return -E_INVALID_ARGS;
1789     }
1790     int errCode = ControlCmdStartCheck(context);
1791     if (errCode != E_OK) {
1792         return errCode;
1793     }
1794     ControlRequestPacket* packet = new (std::nothrow) SubscribeRequest();
1795     if (packet == nullptr) {
1796         LOGE("[DataSync][ControlCmdStart] new SubscribeRequest error");
1797         return -E_OUT_OF_MEMORY;
1798     }
1799     if (context->GetMode() == SyncModeType::SUBSCRIBE_QUERY) {
1800         errCode = subManager->ReserveLocalSubscribeQuery(context->GetDeviceId(), context->GetQuery());
1801         if (errCode != E_OK) {
1802             LOGE("[DataSync][ControlCmdStart] reserve local subscribe query failed,err=%d", errCode);
1803             delete packet;
1804             packet = nullptr;
1805             return errCode;
1806         }
1807     }
1808     SingleVerDataSyncUtils::FillControlRequestPacket(packet, context);
1809     errCode = SendControlPacket(packet, context);
1810     if (errCode != E_OK && context->GetMode() == SyncModeType::SUBSCRIBE_QUERY) {
1811         subManager->DeleteLocalSubscribeQuery(context->GetDeviceId(), context->GetQuery());
1812     }
1813     return errCode;
1814 }
1815 
ControlCmdRequestRecv(SingleVerSyncTaskContext * context,const Message * message)1816 int SingleVerDataSync::ControlCmdRequestRecv(SingleVerSyncTaskContext *context, const Message *message)
1817 {
1818     const ControlRequestPacket *packet = message->GetObject<ControlRequestPacket>();
1819     if (packet == nullptr) {
1820         return -E_INVALID_ARGS;
1821     }
1822     LOGI("[SingleVerDataSync] recv control cmd message,label=%s,dev=%s,controlType=%u", label_.c_str(),
1823         STR_MASK(GetDeviceId()), packet->GetcontrolCmdType());
1824     int errCode = ControlCmdRequestRecvPre(context, message);
1825     if (errCode != E_OK) {
1826         return errCode;
1827     }
1828     if (packet->GetcontrolCmdType() == ControlCmdType::SUBSCRIBE_QUERY_CMD) {
1829         errCode = SubscribeRequestRecv(context, message);
1830     } else if (packet->GetcontrolCmdType() == ControlCmdType::UNSUBSCRIBE_QUERY_CMD) {
1831         errCode = UnsubscribeRequestRecv(context, message);
1832     }
1833     return errCode;
1834 }
1835 
ControlCmdAckRecv(SingleVerSyncTaskContext * context,const Message * message)1836 int SingleVerDataSync::ControlCmdAckRecv(SingleVerSyncTaskContext *context, const Message *message)
1837 {
1838     std::shared_ptr<SubscribeManager> subManager = context->GetSubscribeManager();
1839     if (subManager == nullptr) {
1840         return -E_INVALID_ARGS;
1841     }
1842     int errCode = SingleVerDataSyncUtils::AckMsgErrnoCheck(context, message);
1843     if (errCode != E_OK) {
1844         SingleVerDataSyncUtils::ControlAckErrorHandle(context, subManager);
1845         return errCode;
1846     }
1847     const ControlAckPacket *packet = message->GetObject<ControlAckPacket>();
1848     if (packet == nullptr) {
1849         return -E_INVALID_ARGS;
1850     }
1851     int32_t recvCode = packet->GetRecvCode();
1852     uint32_t cmdType = packet->GetcontrolCmdType();
1853     if (recvCode != E_OK) {
1854         LOGE("[DataSync][AckRecv] control sync abort,recvCode=%d,label=%s,dev=%s,type=%u", recvCode, label_.c_str(),
1855             STR_MASK(GetDeviceId()), cmdType);
1856         // for unsubscribe no need to do something
1857         SingleVerDataSyncUtils::ControlAckErrorHandle(context, subManager);
1858         return recvCode;
1859     }
1860     if (cmdType == ControlCmdType::SUBSCRIBE_QUERY_CMD) {
1861         errCode = subManager->ActiveLocalSubscribeQuery(context->GetDeviceId(), context->GetQuery());
1862     } else if (cmdType == ControlCmdType::UNSUBSCRIBE_QUERY_CMD) {
1863         subManager->RemoveLocalSubscribeQuery(context->GetDeviceId(), context->GetQuery());
1864     }
1865     if (errCode != E_OK) {
1866         LOGE("[DataSync] ack handle failed,label =%s,dev=%s,type=%u", label_.c_str(), STR_MASK(GetDeviceId()), cmdType);
1867         return errCode;
1868     }
1869     return -E_NO_DATA_SEND; // means control msg send finished
1870 }
1871 
ControlCmdStartCheck(SingleVerSyncTaskContext * context)1872 int SingleVerDataSync::ControlCmdStartCheck(SingleVerSyncTaskContext *context)
1873 {
1874     if ((context->GetMode() != SyncModeType::SUBSCRIBE_QUERY) &&
1875         (context->GetMode() != SyncModeType::UNSUBSCRIBE_QUERY)) {
1876         LOGE("[ControlCmdStartCheck] not support controlCmd");
1877         return -E_INVALID_ARGS;
1878     }
1879     if (context->GetMode() == SyncModeType::SUBSCRIBE_QUERY &&
1880         context->GetQuery().HasInKeys() &&
1881         context->IsNotSupportAbility(SyncConfig::INKEYS_QUERY)) {
1882         return -E_NOT_SUPPORT;
1883     }
1884     if ((context->GetMode() != SyncModeType::SUBSCRIBE_QUERY) || context->GetReceivcPermitCheck()) {
1885         return E_OK;
1886     }
1887     bool permitReceive = SingleVerDataSyncUtils::CheckPermitReceiveData(context, communicateHandle_, storage_);
1888     if (permitReceive) {
1889         context->SetReceivcPermitCheck(true);
1890     } else {
1891         return -E_SECURITY_OPTION_CHECK_ERROR;
1892     }
1893     return E_OK;
1894 }
1895 
SendControlPacket(ControlRequestPacket * packet,SingleVerSyncTaskContext * context)1896 int SingleVerDataSync::SendControlPacket(ControlRequestPacket *packet, SingleVerSyncTaskContext *context)
1897 {
1898     Message *message = new (std::nothrow) Message(CONTROL_SYNC_MESSAGE);
1899     if (message == nullptr) {
1900         LOGE("[DataSync][SendControlPacket] new message error");
1901         delete packet;
1902         packet = nullptr;
1903         return -E_OUT_OF_MEMORY;
1904     }
1905     uint32_t packetLen = packet->CalculateLen();
1906     int errCode = message->SetExternalObject(packet);
1907     if (errCode != E_OK) {
1908         delete packet;
1909         packet = nullptr;
1910         delete message;
1911         message = nullptr;
1912         LOGE("[DataSync][SendControlPacket] set external object failed errCode=%d", errCode);
1913         return errCode;
1914     }
1915     SingleVerDataSyncUtils::SetMessageHeadInfo(*message, TYPE_REQUEST, context->GetDeviceId(),
1916         context->GetSequenceId(), context->GetRequestSessionId());
1917     CommErrHandler handler = [this, context, sessionId = message->GetSessionId()](int ret, bool isDirectEnd) {
1918         SyncTaskContext::CommErrHandlerFunc(ret, context, sessionId, isDirectEnd);
1919     };
1920     errCode = Send(context, message, handler, packetLen);
1921     if (errCode != E_OK) {
1922         delete message;
1923         message = nullptr;
1924     }
1925     return errCode;
1926 }
1927 
SendControlAck(SingleVerSyncTaskContext * context,const Message * message,int32_t recvCode,uint32_t controlCmdType,const CommErrHandler & handler)1928 int SingleVerDataSync::SendControlAck(SingleVerSyncTaskContext *context, const Message *message, int32_t recvCode,
1929     uint32_t controlCmdType, const CommErrHandler &handler)
1930 {
1931     Message *ackMessage = new (std::nothrow) Message(message->GetMessageId());
1932     if (ackMessage == nullptr) {
1933         LOGE("[DataSync][SendControlAck] new message error");
1934         return -E_OUT_OF_MEMORY;
1935     }
1936     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1937     ControlAckPacket ack;
1938     ack.SetPacketHead(recvCode, version, static_cast<int32_t>(controlCmdType), 0);
1939     int errCode = ackMessage->SetCopiedObject(ack);
1940     if (errCode != E_OK) {
1941         delete ackMessage;
1942         ackMessage = nullptr;
1943         LOGE("[DataSync][SendControlAck] set copied object failed, errcode=%d", errCode);
1944         return errCode;
1945     }
1946     SingleVerDataSyncUtils::SetMessageHeadInfo(*ackMessage, TYPE_RESPONSE, context->GetDeviceId(),
1947         message->GetSequenceId(), message->GetSessionId());
1948     errCode = Send(context, ackMessage, handler, 0);
1949     if (errCode != E_OK) {
1950         delete ackMessage;
1951         ackMessage = nullptr;
1952     }
1953     return errCode;
1954 }
1955 
ControlCmdRequestRecvPre(SingleVerSyncTaskContext * context,const Message * message)1956 int SingleVerDataSync::ControlCmdRequestRecvPre(SingleVerSyncTaskContext *context, const Message *message)
1957 {
1958     if (context == nullptr || message == nullptr) {
1959         return -E_INVALID_ARGS;
1960     }
1961     const ControlRequestPacket *packet = message->GetObject<ControlRequestPacket>();
1962     if (packet == nullptr) {
1963         return -E_INVALID_ARGS;
1964     }
1965     uint32_t controlCmdType = packet->GetcontrolCmdType();
1966     if (context->GetRemoteSoftwareVersion() <= SOFTWARE_VERSION_BASE) {
1967         return DoAbilitySyncIfNeed(context, message, true);
1968     }
1969     if (controlCmdType >= ControlCmdType::INVALID_CONTROL_CMD) {
1970         SendControlAck(context, message, -E_NOT_SUPPORT, controlCmdType);
1971         return -E_WAIT_NEXT_MESSAGE;
1972     }
1973     return E_OK;
1974 }
1975 
SubscribeRequestRecvPre(SingleVerSyncTaskContext * context,const SubscribeRequest * packet,const Message * message)1976 int SingleVerDataSync::SubscribeRequestRecvPre(SingleVerSyncTaskContext *context, const SubscribeRequest *packet,
1977     const Message *message)
1978 {
1979     uint32_t controlCmdType = packet->GetcontrolCmdType();
1980     if (controlCmdType != ControlCmdType::SUBSCRIBE_QUERY_CMD) {
1981         return E_OK;
1982     }
1983     QuerySyncObject syncQuery = packet->GetQuery();
1984     int errCode;
1985     if (!packet->IsAutoSubscribe()) {
1986         errCode = storage_->CheckAndInitQueryCondition(syncQuery);
1987         if (errCode != E_OK) {
1988             LOGE("[SingleVerDataSync] check sync query failed,errCode=%d", errCode);
1989             SendControlAck(context, message, errCode, controlCmdType);
1990             return -E_WAIT_NEXT_MESSAGE;
1991         }
1992     }
1993     int mode = SingleVerDataSyncUtils::GetModeByControlCmdType(
1994         static_cast<ControlCmdType>(packet->GetcontrolCmdType()));
1995     if (mode >= SyncModeType::INVALID_MODE) {
1996         LOGE("[SingleVerDataSync] invalid mode");
1997         SendControlAck(context, message, -E_INVALID_ARGS, controlCmdType);
1998         return -E_WAIT_NEXT_MESSAGE;
1999     }
2000     errCode = CheckPermitSendData(mode, context);
2001     if (errCode != E_OK) {
2002         LOGE("[SingleVerDataSync] check sync query failed,errCode=%d", errCode);
2003         SendControlAck(context, message, errCode, controlCmdType);
2004     }
2005     return errCode;
2006 }
2007 
SubscribeRequestRecv(SingleVerSyncTaskContext * context,const Message * message)2008 int SingleVerDataSync::SubscribeRequestRecv(SingleVerSyncTaskContext *context, const Message *message)
2009 {
2010     const SubscribeRequest *packet = message->GetObject<SubscribeRequest>();
2011     if (packet == nullptr) {
2012         return -E_INVALID_ARGS;
2013     }
2014     int errCode = SubscribeRequestRecvPre(context, packet, message);
2015     if (errCode != E_OK) {
2016         return errCode;
2017     }
2018     uint32_t controlCmdType = packet->GetcontrolCmdType();
2019     std::shared_ptr<SubscribeManager> subscribeManager = context->GetSubscribeManager();
2020     if (subscribeManager == nullptr) {
2021         LOGE("[SingleVerDataSync] subscribeManager check failed");
2022         SendControlAck(context, message, -E_NOT_REGISTER, controlCmdType);
2023         return -E_INVALID_ARGS;
2024     }
2025     errCode = storage_->AddSubscribe(packet->GetQuery().GetIdentify(), packet->GetQuery(), packet->IsAutoSubscribe());
2026     if (errCode != E_OK) {
2027         LOGE("[SingleVerDataSync] add trigger failed,err=%d,label=%s,dev=%s", errCode, label_.c_str(),
2028             STR_MASK(GetDeviceId()));
2029         SendControlAck(context, message, errCode, controlCmdType);
2030         return errCode;
2031     }
2032     errCode = subscribeManager->ReserveRemoteSubscribeQuery(context->GetDeviceId(), packet->GetQuery());
2033     if (errCode != E_OK) {
2034         LOGE("[SingleVerDataSync] add remote subscribe query failed,err=%d,label=%s,dev=%s", errCode, label_.c_str(),
2035             STR_MASK(GetDeviceId()));
2036         RemoveSubscribeIfNeed(packet->GetQuery().GetIdentify(), subscribeManager);
2037         SendControlAck(context, message, errCode, controlCmdType);
2038         return errCode;
2039     }
2040     errCode = SendControlAck(context, message, E_OK, controlCmdType);
2041     if (errCode != E_OK) {
2042         subscribeManager->DeleteRemoteSubscribeQuery(context->GetDeviceId(), packet->GetQuery());
2043         RemoveSubscribeIfNeed(packet->GetQuery().GetIdentify(), subscribeManager);
2044         LOGE("[SingleVerDataSync] send control msg failed,err=%d,label=%s,dev=%s", errCode, label_.c_str(),
2045             STR_MASK(GetDeviceId()));
2046         return errCode;
2047     }
2048     subscribeManager->ActiveRemoteSubscribeQuery(context->GetDeviceId(), packet->GetQuery());
2049     DBInfo dbInfo;
2050     storage_->GetDBInfo(dbInfo);
2051     RuntimeContext::GetInstance()->RecordRemoteSubscribe(dbInfo, context->GetDeviceId(), packet->GetQuery());
2052     return errCode;
2053 }
2054 
UnsubscribeRequestRecv(SingleVerSyncTaskContext * context,const Message * message)2055 int SingleVerDataSync::UnsubscribeRequestRecv(SingleVerSyncTaskContext *context, const Message *message)
2056 {
2057     const SubscribeRequest *packet = message->GetObject<SubscribeRequest>();
2058     if (packet == nullptr) {
2059         return -E_INVALID_ARGS;
2060     }
2061     uint32_t controlCmdType = packet->GetcontrolCmdType();
2062     std::shared_ptr<SubscribeManager> subscribeManager = context->GetSubscribeManager();
2063     if (subscribeManager == nullptr) {
2064         LOGE("[SingleVerDataSync] subscribeManager check failed");
2065         SendControlAck(context, message, -E_NOT_REGISTER, controlCmdType);
2066         return -E_INVALID_ARGS;
2067     }
2068     int errCode;
2069     std::lock_guard<std::mutex> autoLock(unsubscribeLock_);
2070     if (subscribeManager->IsLastRemoteContainSubscribe(context->GetDeviceId(), packet->GetQuery().GetIdentify())) {
2071         errCode = storage_->RemoveSubscribe(packet->GetQuery().GetIdentify());
2072         if (errCode != E_OK) {
2073             LOGE("[SingleVerDataSync] remove trigger failed,err=%d,label=%s,dev=%s", errCode, label_.c_str(),
2074                 STR_MASK(GetDeviceId()));
2075             SendControlAck(context, message, errCode, controlCmdType);
2076             return errCode;
2077         }
2078     }
2079     errCode = SendControlAck(context, message, E_OK, controlCmdType);
2080     if (errCode != E_OK) {
2081         LOGE("[SingleVerDataSync] send control msg failed,err=%d,label=%s,dev=%s", errCode, label_.c_str(),
2082             STR_MASK(GetDeviceId()));
2083         return errCode;
2084     }
2085     subscribeManager->RemoveRemoteSubscribeQuery(context->GetDeviceId(), packet->GetQuery());
2086     DBInfo dbInfo;
2087     storage_->GetDBInfo(dbInfo);
2088     RuntimeContext::GetInstance()->RemoveRemoteSubscribe(dbInfo, context->GetDeviceId(), packet->GetQuery());
2089     metadata_->RemoveQueryFromRecordSet(context->GetDeviceId(), packet->GetQuery().GetIdentify());
2090     return errCode;
2091 }
2092 
PutDataMsg(Message * message)2093 void SingleVerDataSync::PutDataMsg(Message *message)
2094 {
2095     return msgSchedule_.PutMsg(message);
2096 }
2097 
MoveNextDataMsg(SingleVerSyncTaskContext * context,bool & isNeedHandle,bool & isNeedContinue)2098 Message *SingleVerDataSync::MoveNextDataMsg(SingleVerSyncTaskContext *context, bool &isNeedHandle,
2099     bool &isNeedContinue)
2100 {
2101     return msgSchedule_.MoveNextMsg(context, isNeedHandle, isNeedContinue);
2102 }
2103 
IsNeedReloadQueue()2104 bool SingleVerDataSync::IsNeedReloadQueue()
2105 {
2106     return msgSchedule_.IsNeedReloadQueue();
2107 }
2108 
ScheduleInfoHandle(bool isNeedHandleStatus,bool isNeedClearMap,const Message * message)2109 void SingleVerDataSync::ScheduleInfoHandle(bool isNeedHandleStatus, bool isNeedClearMap, const Message *message)
2110 {
2111     msgSchedule_.ScheduleInfoHandle(isNeedHandleStatus, isNeedClearMap, message);
2112 }
2113 
ClearDataMsg()2114 void SingleVerDataSync::ClearDataMsg()
2115 {
2116     msgSchedule_.ClearMsg();
2117 }
2118 
QuerySyncCheck(SingleVerSyncTaskContext * context)2119 int SingleVerDataSync::QuerySyncCheck(SingleVerSyncTaskContext *context)
2120 {
2121     if (context == nullptr) {
2122         return -E_INVALID_ARGS;
2123     }
2124     bool isCheckStatus = false;
2125     int errCode = SingleVerDataSyncUtils::QuerySyncCheck(context, isCheckStatus);
2126     if (errCode != E_OK) {
2127         return errCode;
2128     }
2129     if (!isCheckStatus) {
2130         context->SetTaskErrCode(-E_NOT_SUPPORT);
2131         return -E_NOT_SUPPORT;
2132     }
2133     return E_OK;
2134 }
2135 
RemoveSubscribeIfNeed(const std::string & queryId,const std::shared_ptr<SubscribeManager> & subscribeManager)2136 void SingleVerDataSync::RemoveSubscribeIfNeed(const std::string &queryId,
2137     const std::shared_ptr<SubscribeManager> &subscribeManager)
2138 {
2139     if (!subscribeManager->IsQueryExistSubscribe(queryId)) {
2140         storage_->RemoveSubscribe(queryId);
2141     }
2142 }
2143 } // namespace DistributedDB
2144