1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "virtual_single_ver_sync_db_Interface.h"
17 
18 #include <algorithm>
19 #include <thread>
20 
21 #include "data_compression.h"
22 #include "db_common.h"
23 #include "db_errno.h"
24 #include "generic_single_ver_kv_entry.h"
25 #include "intercepted_data_impl.h"
26 #include "log_print.h"
27 #include "platform_specific.h"
28 #include "query_object.h"
29 #include "securec.h"
30 
31 namespace DistributedDB {
32 namespace {
GetEntriesFromItems(std::vector<SingleVerKvEntry * > & entries,const std::vector<VirtualDataItem> & dataItems)33     int GetEntriesFromItems(std::vector<SingleVerKvEntry *> &entries, const std::vector<VirtualDataItem> &dataItems)
34     {
35         int errCode = E_OK;
36         for (const auto &item : dataItems) {
37             auto entry = new (std::nothrow) GenericSingleVerKvEntry();
38             if (entry == nullptr) {
39                 LOGE("Create entry failed.");
40                 errCode = -E_OUT_OF_MEMORY;
41                 break;
42             }
43             DataItem storageItem;
44             storageItem.key = item.key;
45             storageItem.value = item.value;
46             storageItem.flag = item.flag;
47             storageItem.timestamp = item.timestamp;
48             storageItem.writeTimestamp = item.writeTimestamp;
49             entry->SetEntryData(std::move(storageItem));
50             entries.push_back(entry);
51         }
52         if (errCode != E_OK) {
53             for (auto &kvEntry : entries) {
54                 delete kvEntry;
55                 kvEntry = nullptr;
56             }
57             entries.clear();
58         }
59         return errCode;
60     }
61 }
62 
VirtualSingleVerSyncDBInterface()63 VirtualSingleVerSyncDBInterface::VirtualSingleVerSyncDBInterface()
64 {
65     (void)OS::GetCurrentSysTimeInMicrosecond(dbCreateTime_);
66     LOGD("virtual device init db createTime");
67 }
68 
GetInterfaceType() const69 int VirtualSingleVerSyncDBInterface::GetInterfaceType() const
70 {
71     return SYNC_SVD;
72 }
73 
IncRefCount()74 void VirtualSingleVerSyncDBInterface::IncRefCount()
75 {
76 }
77 
DecRefCount()78 void VirtualSingleVerSyncDBInterface::DecRefCount()
79 {
80 }
81 
SetIdentifier(std::vector<uint8_t> & identifier)82 void VirtualSingleVerSyncDBInterface::SetIdentifier(std::vector<uint8_t> &identifier)
83 {
84     identifier_ = std::move(identifier);
85 }
86 
GetIdentifier() const87 std::vector<uint8_t> VirtualSingleVerSyncDBInterface::GetIdentifier() const
88 {
89     return identifier_;
90 }
91 
GetMetaData(const Key & key,Value & value) const92 int VirtualSingleVerSyncDBInterface::GetMetaData(const Key &key, Value &value) const
93 {
94     if (readBusy_) {
95         return -E_BUSY;
96     }
97     auto iter = metadata_.find(key);
98     if (iter != metadata_.end()) {
99         value = iter->second;
100         return E_OK;
101     }
102     return -E_NOT_FOUND;
103 }
104 
PutMetaData(const Key & key,const Value & value,bool isInTransaction)105 int VirtualSingleVerSyncDBInterface::PutMetaData(const Key &key, const Value &value, bool isInTransaction)
106 {
107     (void)isInTransaction;
108     if (busy_) {
109         return -E_BUSY;
110     }
111     metadata_[key] = value;
112     return E_OK;
113 }
114 
DeleteMetaData(const std::vector<Key> & keys)115 int VirtualSingleVerSyncDBInterface::DeleteMetaData(const std::vector<Key> &keys)
116 {
117     for (const auto &key : keys) {
118         (void)metadata_.erase(key);
119     }
120     return E_OK;
121 }
122 
GetAllMetaKeys(std::vector<Key> & keys) const123 int VirtualSingleVerSyncDBInterface::GetAllMetaKeys(std::vector<Key> &keys) const
124 {
125     for (auto iter = metadata_.begin(); iter != metadata_.end(); ++iter) {
126         keys.push_back(iter->first);
127     }
128     LOGD("GetAllMetaKeys size %zu", keys.size());
129     return E_OK;
130 }
131 
GetSyncData(Timestamp begin,Timestamp end,std::vector<DataItem> & dataItems,ContinueToken & continueStmtToken,const DataSizeSpecInfo & dataSizeInfo) const132 int VirtualSingleVerSyncDBInterface::GetSyncData(Timestamp begin, Timestamp end, std::vector<DataItem> &dataItems,
133     ContinueToken &continueStmtToken, const DataSizeSpecInfo &dataSizeInfo) const
134 {
135     return -E_NOT_SUPPORT;
136 }
137 
GetSyncDataNext(std::vector<DataItem> & dataItems,ContinueToken & continueStmtToken,const DataSizeSpecInfo & dataSizeInfo) const138 int VirtualSingleVerSyncDBInterface::GetSyncDataNext(std::vector<DataItem> &dataItems, ContinueToken &continueStmtToken,
139     const DataSizeSpecInfo &dataSizeInfo) const
140 {
141     return -E_NOT_SUPPORT;
142 }
143 
ReleaseContinueToken(ContinueToken & continueStmtToken) const144 void VirtualSingleVerSyncDBInterface::ReleaseContinueToken(ContinueToken& continueStmtToken) const
145 {
146     VirtualContinueToken *token = static_cast<VirtualContinueToken *>(continueStmtToken);
147     if (token != nullptr) {
148         delete token;
149         continueStmtToken = nullptr;
150     }
151     return;
152 }
153 
GetSchemaInfo() const154 SchemaObject VirtualSingleVerSyncDBInterface::GetSchemaInfo() const
155 {
156     return schemaObj_;
157 }
158 
CheckCompatible(const std::string & schema,uint8_t type) const159 bool VirtualSingleVerSyncDBInterface::CheckCompatible(const std::string& schema, uint8_t type) const
160 {
161     if (schema_.empty() && schema.empty() && ReadSchemaType(type) != SchemaType::UNRECOGNIZED) {
162         return true;
163     }
164     return (schemaObj_.CompareAgainstSchemaString(schema) == -E_SCHEMA_EQUAL_EXACTLY);
165 }
166 
PutData(const Key & key,const Value & value,const Timestamp & time,int flag)167 int VirtualSingleVerSyncDBInterface::PutData(const Key &key, const Value &value, const Timestamp &time, int flag)
168 {
169     VirtualDataItem item;
170     item.key = key;
171     item.value = value;
172     item.timestamp = time;
173     item.writeTimestamp = time;
174     item.flag = static_cast<uint64_t>(flag);
175     item.isLocal = true;
176     dbData_.push_back(item);
177     return E_OK;
178 }
179 
GetMaxTimestamp(Timestamp & stamp) const180 void VirtualSingleVerSyncDBInterface::GetMaxTimestamp(Timestamp& stamp) const
181 {
182     for (auto iter = dbData_.begin(); iter != dbData_.end(); ++iter) {
183         if (stamp < iter->writeTimestamp) {
184             stamp = iter->writeTimestamp;
185         }
186     }
187     LOGD("VirtualSingleVerSyncDBInterface::GetMaxTimestamp time = %" PRIu64, stamp);
188 }
189 
RemoveDeviceData(const std::string & deviceName,bool isNeedNotify)190 int VirtualSingleVerSyncDBInterface::RemoveDeviceData(const std::string &deviceName, bool isNeedNotify)
191 {
192     std::lock_guard<std::mutex> autoLock(deviceDataLock_);
193     deviceData_.erase(deviceName);
194     uint32_t devId = 0;
195     if (deviceMapping_.find(deviceName) != deviceMapping_.end()) {
196         devId = deviceMapping_[deviceName];
197     }
198     for (auto &item : dbData_) {
199         if (item.deviceId == devId && devId > 0) {
200             item.flag = VirtualDataItem::DELETE_FLAG;
201         }
202     }
203     LOGD("RemoveDeviceData FINISH");
204     return E_OK;
205 }
206 
GetSyncData(const Key & key,VirtualDataItem & dataItem)207 int VirtualSingleVerSyncDBInterface::GetSyncData(const Key &key, VirtualDataItem &dataItem)
208 {
209     auto iter = std::find_if(dbData_.begin(), dbData_.end(),
210         [key](const VirtualDataItem& item) { return item.key == key; });
211     if (iter != dbData_.end()) {
212         if (iter->flag == VirtualDataItem::DELETE_FLAG) {
213             return -E_NOT_FOUND;
214         }
215         dataItem.key = iter->key;
216         dataItem.value = iter->value;
217         dataItem.timestamp = iter->timestamp;
218         dataItem.writeTimestamp = iter->writeTimestamp;
219         dataItem.flag = iter->flag;
220         dataItem.isLocal = iter->isLocal;
221         return E_OK;
222     }
223     return -E_NOT_FOUND;
224 }
225 
GetSyncData(Timestamp begin,Timestamp end,std::vector<SingleVerKvEntry * > & entries,ContinueToken & continueStmtToken,const DataSizeSpecInfo & dataSizeInfo) const226 int VirtualSingleVerSyncDBInterface::GetSyncData(Timestamp begin, Timestamp end,
227     std::vector<SingleVerKvEntry *> &entries, ContinueToken &continueStmtToken,
228     const DataSizeSpecInfo &dataSizeInfo) const
229 {
230     std::vector<VirtualDataItem> dataItems;
231     int errCode = GetSyncData(begin, end, dataSizeInfo, dataItems, continueStmtToken);
232     if ((errCode != E_OK) && (errCode != -E_UNFINISHED)) {
233         LOGE("[VirtualSingleVerSyncDBInterface][GetSyncData] GetSyncData failed err %d", errCode);
234         return errCode;
235     }
236     int innerCode = GetEntriesFromItems(entries, dataItems);
237     if (innerCode != E_OK) {
238         return innerCode;
239     }
240     return errCode;
241 }
242 
GetSyncDataNext(std::vector<SingleVerKvEntry * > & entries,ContinueToken & continueStmtToken,const DataSizeSpecInfo & dataSizeInfo) const243 int VirtualSingleVerSyncDBInterface::GetSyncDataNext(std::vector<SingleVerKvEntry *> &entries,
244     ContinueToken &continueStmtToken, const DataSizeSpecInfo &dataSizeInfo) const
245 {
246     if (continueStmtToken == nullptr) {
247         return -E_INVALID_ARGS;
248     }
249     int errCode = DataControl();
250     if (errCode != E_OK) {
251         return errCode;
252     }
253     VirtualContinueToken *token = static_cast<VirtualContinueToken *>(continueStmtToken);
254     Timestamp currentWaterMark = 0;
255     std::vector<VirtualDataItem> dataItems;
256     bool isFinished = GetDataInner(token->begin, token->end, currentWaterMark, dataSizeInfo, dataItems);
257     if (isFinished) {
258         delete token;
259         continueStmtToken = nullptr;
260     } else {
261         currentWaterMark++;
262         token->begin = currentWaterMark;
263     }
264     int innerCode = GetEntriesFromItems(entries, dataItems);
265     if (innerCode != E_OK) {
266         return innerCode;
267     }
268     return isFinished ? E_OK : -E_UNFINISHED;
269 }
270 
GetSyncData(Timestamp begin,Timestamp end,const DataSizeSpecInfo & dataSizeInfo,std::vector<VirtualDataItem> & dataItems,ContinueToken & continueStmtToken) const271 int VirtualSingleVerSyncDBInterface::GetSyncData(Timestamp begin, Timestamp end, const DataSizeSpecInfo &dataSizeInfo,
272     std::vector<VirtualDataItem> &dataItems, ContinueToken &continueStmtToken) const
273 {
274     if (getDataDelayTime_ > 0) {
275         std::this_thread::sleep_for(std::chrono::milliseconds(getDataDelayTime_));
276     }
277     int errCode = DataControl();
278     if (errCode != E_OK) {
279         return errCode;
280     }
281     Timestamp currentWaterMark = 0;
282     bool isFinished = GetDataInner(begin, end, currentWaterMark, dataSizeInfo, dataItems);
283     if (!isFinished) {
284         VirtualContinueToken *token = new(std::nothrow) VirtualContinueToken();
285         if (token == nullptr) {
286             LOGD("virtual alloc token failed");
287             dataItems.clear();
288             return -E_OUT_OF_MEMORY;
289         }
290         currentWaterMark++;
291         token->begin = currentWaterMark;
292         token->end = end;
293         continueStmtToken = static_cast<VirtualContinueToken *>(token);
294     }
295     LOGD("dataItems size %zu", dataItems.size());
296     return isFinished ? E_OK : -E_UNFINISHED;
297 }
298 
SetSaveDataDelayTime(uint64_t milliDelayTime)299 void VirtualSingleVerSyncDBInterface::SetSaveDataDelayTime(uint64_t milliDelayTime)
300 {
301     saveDataDelayTime_ = milliDelayTime;
302 }
303 
GetSyncDataNext(std::vector<VirtualDataItem> & dataItems,uint32_t blockSize,ContinueToken & continueStmtToken) const304 int VirtualSingleVerSyncDBInterface::GetSyncDataNext(std::vector<VirtualDataItem>& dataItems,
305     uint32_t blockSize, ContinueToken& continueStmtToken) const
306 {
307     if (continueStmtToken == nullptr) {
308         return -E_NOT_SUPPORT;
309     }
310     return 0;
311 }
312 
PutSyncData(std::vector<VirtualDataItem> & dataItems,const std::string & deviceName)313 int VirtualSingleVerSyncDBInterface::PutSyncData(std::vector<VirtualDataItem>& dataItems,
314     const std::string &deviceName)
315 {
316     if (dataItems.size() > 0 && deviceMapping_.find(deviceName) == deviceMapping_.end()) {
317         availableDeviceId_++;
318         deviceMapping_[deviceName] = availableDeviceId_;
319         LOGD("put deviceName=%s into device map", deviceName.c_str());
320     }
321     for (auto iter = dataItems.begin(); iter != dataItems.end(); ++iter) {
322         LOGD("PutSyncData");
323         auto dbDataIter = std::find_if(dbData_.begin(), dbData_.end(),
324             [iter](VirtualDataItem item) { return item.key == iter->key; });
325         if ((dbDataIter != dbData_.end()) && (dbDataIter->writeTimestamp < iter->writeTimestamp)) {
326             // if has conflict, compare writeTimestamp
327             LOGI("conflict data time local %" PRIu64 ", remote %" PRIu64, dbDataIter->writeTimestamp,
328                 iter->writeTimestamp);
329             dbDataIter->key = iter->key;
330             dbDataIter->value = iter->value;
331             dbDataIter->timestamp = iter->timestamp;
332             dbDataIter->writeTimestamp = iter->writeTimestamp;
333             dbDataIter->flag = iter->flag;
334             dbDataIter->isLocal = false;
335             dbDataIter->deviceId = deviceMapping_[deviceName];
336         } else {
337             LOGI("PutSyncData, use remote data %" PRIu64, iter->timestamp);
338             VirtualDataItem dataItem;
339             dataItem.key = iter->key;
340             dataItem.value = iter->value;
341             dataItem.timestamp = iter->timestamp;
342             dataItem.writeTimestamp = iter->writeTimestamp;
343             dataItem.flag = iter->flag;
344             dataItem.isLocal = false;
345             dataItem.deviceId = deviceMapping_[deviceName];
346             dbData_.push_back(dataItem);
347         }
348     }
349     return E_OK;
350 }
351 
SetSchemaInfo(const std::string & schema)352 void VirtualSingleVerSyncDBInterface::SetSchemaInfo(const std::string& schema)
353 {
354     schema_ = schema;
355     SchemaObject emptyObj;
356     schemaObj_ = emptyObj;
357     schemaObj_.ParseFromSchemaString(schema);
358 }
359 
GetDbProperties() const360 const KvDBProperties &VirtualSingleVerSyncDBInterface::GetDbProperties() const
361 {
362     return properties_;
363 }
364 
GetSecurityOption(SecurityOption & option) const365 int VirtualSingleVerSyncDBInterface::GetSecurityOption(SecurityOption &option) const
366 {
367     if (getSecurityOptionCallBack_) {
368         return getSecurityOptionCallBack_(option);
369     }
370     if (secOption_.securityLabel == NOT_SET) {
371         return -E_NOT_SUPPORT;
372     }
373     option = secOption_;
374     return E_OK;
375 }
376 
IsReadable() const377 bool VirtualSingleVerSyncDBInterface::IsReadable() const
378 {
379     return true;
380 }
381 
SetSecurityOption(SecurityOption & option)382 void VirtualSingleVerSyncDBInterface::SetSecurityOption(SecurityOption &option)
383 {
384     secOption_ = option;
385 }
386 
NotifyRemotePushFinished(const std::string & targetId) const387 void VirtualSingleVerSyncDBInterface::NotifyRemotePushFinished(const std::string &targetId) const
388 {
389     std::lock_guard<std::mutex> autoLock(pushNotifierMutex_);
390     if (pushNotifier_) {
391         pushNotifier_(targetId);
392         LOGI("[VirtualSingleVerSyncDBInterface] Notify remote push finished");
393     }
394 }
395 
GetDatabaseCreateTimestamp(Timestamp & outTime) const396 int VirtualSingleVerSyncDBInterface::GetDatabaseCreateTimestamp(Timestamp &outTime) const
397 {
398     outTime = dbCreateTime_;
399     return E_OK;
400 }
401 
GetSyncData(QueryObject & query,const SyncTimeRange & timeRange,const DataSizeSpecInfo & dataSizeInfo,ContinueToken & continueStmtToken,std::vector<SingleVerKvEntry * > & entries) const402 int VirtualSingleVerSyncDBInterface::GetSyncData(QueryObject &query, const SyncTimeRange &timeRange,
403     const DataSizeSpecInfo &dataSizeInfo, ContinueToken &continueStmtToken,
404     std::vector<SingleVerKvEntry *> &entries) const
405 {
406     if (getDataDelayTime_ > 0) {
407         std::this_thread::sleep_for(std::chrono::milliseconds(getDataDelayTime_));
408     }
409     int errCode = DataControl();
410     if (errCode != E_OK) {
411         return errCode;
412     }
413     const auto &startKey = query.GetPrefixKey();
414     Key endKey = startKey;
415     endKey.resize(DBConstant::MAX_KEY_SIZE, UCHAR_MAX);
416 
417     std::vector<VirtualDataItem> dataItems;
418     for (const auto &data : dbData_) {
419         // Only get local data.
420         if (!data.isLocal) {
421             continue;
422         }
423 
424         if ((data.flag & VirtualDataItem::DELETE_FLAG) != 0) {
425             if (data.timestamp >= timeRange.deleteBeginTime && data.timestamp < timeRange.deleteEndTime) {
426                 dataItems.push_back(data);
427             }
428         } else {
429             if (data.timestamp >= timeRange.beginTime && data.timestamp < timeRange.endTime &&
430                 data.key >= startKey && data.key <= endKey) {
431                 dataItems.push_back(data);
432             }
433         }
434     }
435 
436     LOGD("dataItems size %zu", dataItems.size());
437     return GetEntriesFromItems(entries, dataItems);
438 }
439 
DeleteMetaDataByPrefixKey(const Key & keyPrefix) const440 int VirtualSingleVerSyncDBInterface::DeleteMetaDataByPrefixKey(const Key &keyPrefix) const
441 {
442     size_t prefixKeySize = keyPrefix.size();
443     for (auto iter = metadata_.begin(); iter != metadata_.end();) {
444         if (prefixKeySize <= iter->first.size() &&
445             keyPrefix == Key(iter->first.begin(), std::next(iter->first.begin(), prefixKeySize))) {
446             iter = metadata_.erase(iter);
447         } else {
448             ++iter;
449         }
450     }
451     return E_OK;
452 }
453 
GetCompressionOption(bool & needCompressOnSync,uint8_t & compressionRate) const454 int VirtualSingleVerSyncDBInterface::GetCompressionOption(bool &needCompressOnSync, uint8_t &compressionRate) const
455 {
456     if (compressSync_) {
457         needCompressOnSync = true;
458         compressionRate = 100; // compress rate 100
459     }
460     return E_OK;
461 }
462 
GetCompressionAlgo(std::set<CompressAlgorithm> & algorithmSet) const463 int VirtualSingleVerSyncDBInterface::GetCompressionAlgo(std::set<CompressAlgorithm> &algorithmSet) const
464 {
465     if (compressSync_) {
466         DataCompression::GetCompressionAlgo(algorithmSet);
467     }
468     return E_OK;
469 }
470 
PutSyncData(const DataItem & item)471 int VirtualSingleVerSyncDBInterface::PutSyncData(const DataItem &item)
472 {
473     return E_OK;
474 }
475 
CheckAndInitQueryCondition(QueryObject & query) const476 int VirtualSingleVerSyncDBInterface::CheckAndInitQueryCondition(QueryObject &query) const
477 {
478     return E_OK;
479 }
480 
InterceptData(std::vector<SingleVerKvEntry * > & entries,const std::string & sourceID,const std::string & targetID,bool isPush) const481 int VirtualSingleVerSyncDBInterface::InterceptData(std::vector<SingleVerKvEntry *> &entries,
482     const std::string &sourceID, const std::string &targetID, bool isPush) const
483 {
484     return E_OK;
485 }
486 
PutSyncDataWithQuery(const QueryObject & query,const std::vector<SingleVerKvEntry * > & entries,const std::string & deviceName)487 int VirtualSingleVerSyncDBInterface::PutSyncDataWithQuery(const QueryObject &query,
488     const std::vector<SingleVerKvEntry *> &entries, const std::string &deviceName)
489 {
490     std::function<void()> callback;
491     {
492         std::lock_guard<std::mutex> autoLock(saveDataMutex_);
493         callback = saveDataCallback_;
494     }
495     if (callback) {
496         callback();
497     }
498     std::this_thread::sleep_for(std::chrono::milliseconds(saveDataDelayTime_));
499     std::vector<VirtualDataItem> dataItems;
500     for (auto kvEntry : entries) {
501         auto genericKvEntry = static_cast<GenericSingleVerKvEntry *>(kvEntry);
502         VirtualDataItem item;
503         genericKvEntry->GetKey(item.key);
504         genericKvEntry->GetValue(item.value);
505         item.timestamp = genericKvEntry->GetTimestamp();
506         item.writeTimestamp = genericKvEntry->GetWriteTimestamp();
507         item.flag = genericKvEntry->GetFlag();
508         item.isLocal = false;
509         dataItems.push_back(item);
510     }
511     return PutSyncData(dataItems, deviceName);
512 }
513 
AddSubscribe(const std::string & subscribeId,const QueryObject & query,bool needCacheSubscribe)514 int VirtualSingleVerSyncDBInterface::AddSubscribe(const std::string &subscribeId, const QueryObject &query,
515     bool needCacheSubscribe)
516 {
517     return E_OK;
518 }
519 
RemoveSubscribe(const std::string & subscribeId)520 int VirtualSingleVerSyncDBInterface::RemoveSubscribe(const std::string &subscribeId)
521 {
522     return E_OK;
523 }
524 
RemoveSubscribe(const std::vector<std::string> & subscribeIds)525 int VirtualSingleVerSyncDBInterface::RemoveSubscribe(const std::vector<std::string> &subscribeIds)
526 {
527     return E_OK;
528 }
529 
SetBusy(bool busy,bool readBusy)530 void VirtualSingleVerSyncDBInterface::SetBusy(bool busy, bool readBusy)
531 {
532     busy_ = busy;
533     readBusy_ = readBusy;
534 }
535 
PutDeviceData(const std::string & deviceName,const Key & key,const Value & value)536 void VirtualSingleVerSyncDBInterface::PutDeviceData(const std::string &deviceName, const Key &key, const Value &value)
537 {
538     std::lock_guard<std::mutex> autoLock(deviceDataLock_);
539     deviceData_[deviceName][key] = value;
540 }
541 
GetDeviceData(const std::string & deviceName,const Key & key,Value & value)542 void VirtualSingleVerSyncDBInterface::GetDeviceData(const std::string &deviceName, const Key &key, Value &value)
543 {
544     std::lock_guard<std::mutex> autoLock(deviceDataLock_);
545     value = deviceData_[deviceName][key];
546 }
547 
SetDbProperties(KvDBProperties & kvDBProperties)548 void VirtualSingleVerSyncDBInterface::SetDbProperties(KvDBProperties &kvDBProperties)
549 {
550     properties_ = kvDBProperties;
551 }
552 
DelayGetSyncData(uint32_t milliDelayTime)553 void VirtualSingleVerSyncDBInterface::DelayGetSyncData(uint32_t milliDelayTime)
554 {
555     getDataDelayTime_ = milliDelayTime;
556 }
557 
SetGetDataErrCode(int whichTime,int errCode,bool isGetDataControl)558 void VirtualSingleVerSyncDBInterface::SetGetDataErrCode(int whichTime, int errCode, bool isGetDataControl)
559 {
560     countDown_ = whichTime;
561     expectedErrCode_ = errCode;
562     isGetDataControl_ = isGetDataControl;
563 }
564 
DataControl() const565 int VirtualSingleVerSyncDBInterface::DataControl() const
566 {
567     static int getDataTimes = 0;
568     if (countDown_ == -1) { // init -1
569         getDataTimes = 0;
570     }
571     if (isGetDataControl_ && countDown_ > 0) {
572         getDataTimes++;
573     }
574     if (isGetDataControl_ && countDown_ == getDataTimes) {
575         LOGD("virtual device get data failed = %d", expectedErrCode_);
576         getDataTimes = 0;
577         return expectedErrCode_;
578     }
579     return E_OK;
580 }
581 
ResetDataControl()582 void VirtualSingleVerSyncDBInterface::ResetDataControl()
583 {
584     countDown_ = -1;
585     expectedErrCode_ = E_OK;
586 }
587 
GetDataInner(Timestamp begin,Timestamp end,Timestamp & currentWaterMark,const DataSizeSpecInfo & dataSizeInfo,std::vector<VirtualDataItem> & dataItems) const588 bool VirtualSingleVerSyncDBInterface::GetDataInner(Timestamp begin, Timestamp end, Timestamp &currentWaterMark,
589     const DataSizeSpecInfo &dataSizeInfo, std::vector<VirtualDataItem> &dataItems) const
590 {
591     bool isFinished = true;
592     for (const auto &data : dbData_) {
593         if (data.isLocal) {
594             if (dataItems.size() >= dataSizeInfo.packetSize) {
595                 LOGD("virtual device dataItem size reach to packetSize=%u", dataSizeInfo.packetSize);
596                 isFinished = false;
597                 break;
598             }
599             if (data.writeTimestamp >= begin && data.writeTimestamp < end) {
600                 dataItems.push_back(data);
601                 currentWaterMark = data.writeTimestamp;
602             }
603         }
604     }
605     return isFinished;
606 }
607 
SetSaveDataCallback(const std::function<void ()> & callback)608 void VirtualSingleVerSyncDBInterface::SetSaveDataCallback(const std::function<void()> &callback)
609 {
610     std::lock_guard<std::mutex> autoLock(saveDataMutex_);
611     saveDataCallback_ = callback;
612 }
613 
ForkGetSecurityOption(std::function<int (SecurityOption &)> callBack)614 void VirtualSingleVerSyncDBInterface::ForkGetSecurityOption(std::function<int(SecurityOption &)> callBack)
615 {
616     getSecurityOptionCallBack_ = callBack;
617 }
618 
SetPushNotifier(const std::function<void (const std::string &)> & pushNotifier)619 void VirtualSingleVerSyncDBInterface::SetPushNotifier(const std::function<void(const std::string &)> &pushNotifier)
620 {
621     std::lock_guard<std::mutex> autoLock(pushNotifierMutex_);
622     pushNotifier_ = pushNotifier;
623 }
624 
SetCompressSync(bool compressSync)625 void VirtualSingleVerSyncDBInterface::SetCompressSync(bool compressSync)
626 {
627     compressSync_ = compressSync;
628 }
629 }  // namespace DistributedDB
630