1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "virtual_single_ver_sync_db_Interface.h"
17
18 #include <algorithm>
19 #include <thread>
20
21 #include "data_compression.h"
22 #include "db_common.h"
23 #include "db_errno.h"
24 #include "generic_single_ver_kv_entry.h"
25 #include "intercepted_data_impl.h"
26 #include "log_print.h"
27 #include "platform_specific.h"
28 #include "query_object.h"
29 #include "securec.h"
30
31 namespace DistributedDB {
32 namespace {
GetEntriesFromItems(std::vector<SingleVerKvEntry * > & entries,const std::vector<VirtualDataItem> & dataItems)33 int GetEntriesFromItems(std::vector<SingleVerKvEntry *> &entries, const std::vector<VirtualDataItem> &dataItems)
34 {
35 int errCode = E_OK;
36 for (const auto &item : dataItems) {
37 auto entry = new (std::nothrow) GenericSingleVerKvEntry();
38 if (entry == nullptr) {
39 LOGE("Create entry failed.");
40 errCode = -E_OUT_OF_MEMORY;
41 break;
42 }
43 DataItem storageItem;
44 storageItem.key = item.key;
45 storageItem.value = item.value;
46 storageItem.flag = item.flag;
47 storageItem.timestamp = item.timestamp;
48 storageItem.writeTimestamp = item.writeTimestamp;
49 entry->SetEntryData(std::move(storageItem));
50 entries.push_back(entry);
51 }
52 if (errCode != E_OK) {
53 for (auto &kvEntry : entries) {
54 delete kvEntry;
55 kvEntry = nullptr;
56 }
57 entries.clear();
58 }
59 return errCode;
60 }
61 }
62
VirtualSingleVerSyncDBInterface()63 VirtualSingleVerSyncDBInterface::VirtualSingleVerSyncDBInterface()
64 {
65 (void)OS::GetCurrentSysTimeInMicrosecond(dbCreateTime_);
66 LOGD("virtual device init db createTime");
67 }
68
GetInterfaceType() const69 int VirtualSingleVerSyncDBInterface::GetInterfaceType() const
70 {
71 return SYNC_SVD;
72 }
73
IncRefCount()74 void VirtualSingleVerSyncDBInterface::IncRefCount()
75 {
76 }
77
DecRefCount()78 void VirtualSingleVerSyncDBInterface::DecRefCount()
79 {
80 }
81
SetIdentifier(std::vector<uint8_t> & identifier)82 void VirtualSingleVerSyncDBInterface::SetIdentifier(std::vector<uint8_t> &identifier)
83 {
84 identifier_ = std::move(identifier);
85 }
86
GetIdentifier() const87 std::vector<uint8_t> VirtualSingleVerSyncDBInterface::GetIdentifier() const
88 {
89 return identifier_;
90 }
91
GetMetaData(const Key & key,Value & value) const92 int VirtualSingleVerSyncDBInterface::GetMetaData(const Key &key, Value &value) const
93 {
94 if (readBusy_) {
95 return -E_BUSY;
96 }
97 auto iter = metadata_.find(key);
98 if (iter != metadata_.end()) {
99 value = iter->second;
100 return E_OK;
101 }
102 return -E_NOT_FOUND;
103 }
104
PutMetaData(const Key & key,const Value & value,bool isInTransaction)105 int VirtualSingleVerSyncDBInterface::PutMetaData(const Key &key, const Value &value, bool isInTransaction)
106 {
107 (void)isInTransaction;
108 if (busy_) {
109 return -E_BUSY;
110 }
111 metadata_[key] = value;
112 return E_OK;
113 }
114
DeleteMetaData(const std::vector<Key> & keys)115 int VirtualSingleVerSyncDBInterface::DeleteMetaData(const std::vector<Key> &keys)
116 {
117 for (const auto &key : keys) {
118 (void)metadata_.erase(key);
119 }
120 return E_OK;
121 }
122
GetAllMetaKeys(std::vector<Key> & keys) const123 int VirtualSingleVerSyncDBInterface::GetAllMetaKeys(std::vector<Key> &keys) const
124 {
125 for (auto iter = metadata_.begin(); iter != metadata_.end(); ++iter) {
126 keys.push_back(iter->first);
127 }
128 LOGD("GetAllMetaKeys size %zu", keys.size());
129 return E_OK;
130 }
131
GetSyncData(Timestamp begin,Timestamp end,std::vector<DataItem> & dataItems,ContinueToken & continueStmtToken,const DataSizeSpecInfo & dataSizeInfo) const132 int VirtualSingleVerSyncDBInterface::GetSyncData(Timestamp begin, Timestamp end, std::vector<DataItem> &dataItems,
133 ContinueToken &continueStmtToken, const DataSizeSpecInfo &dataSizeInfo) const
134 {
135 return -E_NOT_SUPPORT;
136 }
137
GetSyncDataNext(std::vector<DataItem> & dataItems,ContinueToken & continueStmtToken,const DataSizeSpecInfo & dataSizeInfo) const138 int VirtualSingleVerSyncDBInterface::GetSyncDataNext(std::vector<DataItem> &dataItems, ContinueToken &continueStmtToken,
139 const DataSizeSpecInfo &dataSizeInfo) const
140 {
141 return -E_NOT_SUPPORT;
142 }
143
ReleaseContinueToken(ContinueToken & continueStmtToken) const144 void VirtualSingleVerSyncDBInterface::ReleaseContinueToken(ContinueToken& continueStmtToken) const
145 {
146 VirtualContinueToken *token = static_cast<VirtualContinueToken *>(continueStmtToken);
147 if (token != nullptr) {
148 delete token;
149 continueStmtToken = nullptr;
150 }
151 return;
152 }
153
GetSchemaInfo() const154 SchemaObject VirtualSingleVerSyncDBInterface::GetSchemaInfo() const
155 {
156 return schemaObj_;
157 }
158
CheckCompatible(const std::string & schema,uint8_t type) const159 bool VirtualSingleVerSyncDBInterface::CheckCompatible(const std::string& schema, uint8_t type) const
160 {
161 if (schema_.empty() && schema.empty() && ReadSchemaType(type) != SchemaType::UNRECOGNIZED) {
162 return true;
163 }
164 return (schemaObj_.CompareAgainstSchemaString(schema) == -E_SCHEMA_EQUAL_EXACTLY);
165 }
166
PutData(const Key & key,const Value & value,const Timestamp & time,int flag)167 int VirtualSingleVerSyncDBInterface::PutData(const Key &key, const Value &value, const Timestamp &time, int flag)
168 {
169 VirtualDataItem item;
170 item.key = key;
171 item.value = value;
172 item.timestamp = time;
173 item.writeTimestamp = time;
174 item.flag = static_cast<uint64_t>(flag);
175 item.isLocal = true;
176 dbData_.push_back(item);
177 return E_OK;
178 }
179
GetMaxTimestamp(Timestamp & stamp) const180 void VirtualSingleVerSyncDBInterface::GetMaxTimestamp(Timestamp& stamp) const
181 {
182 for (auto iter = dbData_.begin(); iter != dbData_.end(); ++iter) {
183 if (stamp < iter->writeTimestamp) {
184 stamp = iter->writeTimestamp;
185 }
186 }
187 LOGD("VirtualSingleVerSyncDBInterface::GetMaxTimestamp time = %" PRIu64, stamp);
188 }
189
RemoveDeviceData(const std::string & deviceName,bool isNeedNotify)190 int VirtualSingleVerSyncDBInterface::RemoveDeviceData(const std::string &deviceName, bool isNeedNotify)
191 {
192 std::lock_guard<std::mutex> autoLock(deviceDataLock_);
193 deviceData_.erase(deviceName);
194 uint32_t devId = 0;
195 if (deviceMapping_.find(deviceName) != deviceMapping_.end()) {
196 devId = deviceMapping_[deviceName];
197 }
198 for (auto &item : dbData_) {
199 if (item.deviceId == devId && devId > 0) {
200 item.flag = VirtualDataItem::DELETE_FLAG;
201 }
202 }
203 LOGD("RemoveDeviceData FINISH");
204 return E_OK;
205 }
206
GetSyncData(const Key & key,VirtualDataItem & dataItem)207 int VirtualSingleVerSyncDBInterface::GetSyncData(const Key &key, VirtualDataItem &dataItem)
208 {
209 auto iter = std::find_if(dbData_.begin(), dbData_.end(),
210 [key](const VirtualDataItem& item) { return item.key == key; });
211 if (iter != dbData_.end()) {
212 if (iter->flag == VirtualDataItem::DELETE_FLAG) {
213 return -E_NOT_FOUND;
214 }
215 dataItem.key = iter->key;
216 dataItem.value = iter->value;
217 dataItem.timestamp = iter->timestamp;
218 dataItem.writeTimestamp = iter->writeTimestamp;
219 dataItem.flag = iter->flag;
220 dataItem.isLocal = iter->isLocal;
221 return E_OK;
222 }
223 return -E_NOT_FOUND;
224 }
225
GetSyncData(Timestamp begin,Timestamp end,std::vector<SingleVerKvEntry * > & entries,ContinueToken & continueStmtToken,const DataSizeSpecInfo & dataSizeInfo) const226 int VirtualSingleVerSyncDBInterface::GetSyncData(Timestamp begin, Timestamp end,
227 std::vector<SingleVerKvEntry *> &entries, ContinueToken &continueStmtToken,
228 const DataSizeSpecInfo &dataSizeInfo) const
229 {
230 std::vector<VirtualDataItem> dataItems;
231 int errCode = GetSyncData(begin, end, dataSizeInfo, dataItems, continueStmtToken);
232 if ((errCode != E_OK) && (errCode != -E_UNFINISHED)) {
233 LOGE("[VirtualSingleVerSyncDBInterface][GetSyncData] GetSyncData failed err %d", errCode);
234 return errCode;
235 }
236 int innerCode = GetEntriesFromItems(entries, dataItems);
237 if (innerCode != E_OK) {
238 return innerCode;
239 }
240 return errCode;
241 }
242
GetSyncDataNext(std::vector<SingleVerKvEntry * > & entries,ContinueToken & continueStmtToken,const DataSizeSpecInfo & dataSizeInfo) const243 int VirtualSingleVerSyncDBInterface::GetSyncDataNext(std::vector<SingleVerKvEntry *> &entries,
244 ContinueToken &continueStmtToken, const DataSizeSpecInfo &dataSizeInfo) const
245 {
246 if (continueStmtToken == nullptr) {
247 return -E_INVALID_ARGS;
248 }
249 int errCode = DataControl();
250 if (errCode != E_OK) {
251 return errCode;
252 }
253 VirtualContinueToken *token = static_cast<VirtualContinueToken *>(continueStmtToken);
254 Timestamp currentWaterMark = 0;
255 std::vector<VirtualDataItem> dataItems;
256 bool isFinished = GetDataInner(token->begin, token->end, currentWaterMark, dataSizeInfo, dataItems);
257 if (isFinished) {
258 delete token;
259 continueStmtToken = nullptr;
260 } else {
261 currentWaterMark++;
262 token->begin = currentWaterMark;
263 }
264 int innerCode = GetEntriesFromItems(entries, dataItems);
265 if (innerCode != E_OK) {
266 return innerCode;
267 }
268 return isFinished ? E_OK : -E_UNFINISHED;
269 }
270
GetSyncData(Timestamp begin,Timestamp end,const DataSizeSpecInfo & dataSizeInfo,std::vector<VirtualDataItem> & dataItems,ContinueToken & continueStmtToken) const271 int VirtualSingleVerSyncDBInterface::GetSyncData(Timestamp begin, Timestamp end, const DataSizeSpecInfo &dataSizeInfo,
272 std::vector<VirtualDataItem> &dataItems, ContinueToken &continueStmtToken) const
273 {
274 if (getDataDelayTime_ > 0) {
275 std::this_thread::sleep_for(std::chrono::milliseconds(getDataDelayTime_));
276 }
277 int errCode = DataControl();
278 if (errCode != E_OK) {
279 return errCode;
280 }
281 Timestamp currentWaterMark = 0;
282 bool isFinished = GetDataInner(begin, end, currentWaterMark, dataSizeInfo, dataItems);
283 if (!isFinished) {
284 VirtualContinueToken *token = new(std::nothrow) VirtualContinueToken();
285 if (token == nullptr) {
286 LOGD("virtual alloc token failed");
287 dataItems.clear();
288 return -E_OUT_OF_MEMORY;
289 }
290 currentWaterMark++;
291 token->begin = currentWaterMark;
292 token->end = end;
293 continueStmtToken = static_cast<VirtualContinueToken *>(token);
294 }
295 LOGD("dataItems size %zu", dataItems.size());
296 return isFinished ? E_OK : -E_UNFINISHED;
297 }
298
SetSaveDataDelayTime(uint64_t milliDelayTime)299 void VirtualSingleVerSyncDBInterface::SetSaveDataDelayTime(uint64_t milliDelayTime)
300 {
301 saveDataDelayTime_ = milliDelayTime;
302 }
303
GetSyncDataNext(std::vector<VirtualDataItem> & dataItems,uint32_t blockSize,ContinueToken & continueStmtToken) const304 int VirtualSingleVerSyncDBInterface::GetSyncDataNext(std::vector<VirtualDataItem>& dataItems,
305 uint32_t blockSize, ContinueToken& continueStmtToken) const
306 {
307 if (continueStmtToken == nullptr) {
308 return -E_NOT_SUPPORT;
309 }
310 return 0;
311 }
312
PutSyncData(std::vector<VirtualDataItem> & dataItems,const std::string & deviceName)313 int VirtualSingleVerSyncDBInterface::PutSyncData(std::vector<VirtualDataItem>& dataItems,
314 const std::string &deviceName)
315 {
316 if (dataItems.size() > 0 && deviceMapping_.find(deviceName) == deviceMapping_.end()) {
317 availableDeviceId_++;
318 deviceMapping_[deviceName] = availableDeviceId_;
319 LOGD("put deviceName=%s into device map", deviceName.c_str());
320 }
321 for (auto iter = dataItems.begin(); iter != dataItems.end(); ++iter) {
322 LOGD("PutSyncData");
323 auto dbDataIter = std::find_if(dbData_.begin(), dbData_.end(),
324 [iter](VirtualDataItem item) { return item.key == iter->key; });
325 if ((dbDataIter != dbData_.end()) && (dbDataIter->writeTimestamp < iter->writeTimestamp)) {
326 // if has conflict, compare writeTimestamp
327 LOGI("conflict data time local %" PRIu64 ", remote %" PRIu64, dbDataIter->writeTimestamp,
328 iter->writeTimestamp);
329 dbDataIter->key = iter->key;
330 dbDataIter->value = iter->value;
331 dbDataIter->timestamp = iter->timestamp;
332 dbDataIter->writeTimestamp = iter->writeTimestamp;
333 dbDataIter->flag = iter->flag;
334 dbDataIter->isLocal = false;
335 dbDataIter->deviceId = deviceMapping_[deviceName];
336 } else {
337 LOGI("PutSyncData, use remote data %" PRIu64, iter->timestamp);
338 VirtualDataItem dataItem;
339 dataItem.key = iter->key;
340 dataItem.value = iter->value;
341 dataItem.timestamp = iter->timestamp;
342 dataItem.writeTimestamp = iter->writeTimestamp;
343 dataItem.flag = iter->flag;
344 dataItem.isLocal = false;
345 dataItem.deviceId = deviceMapping_[deviceName];
346 dbData_.push_back(dataItem);
347 }
348 }
349 return E_OK;
350 }
351
SetSchemaInfo(const std::string & schema)352 void VirtualSingleVerSyncDBInterface::SetSchemaInfo(const std::string& schema)
353 {
354 schema_ = schema;
355 SchemaObject emptyObj;
356 schemaObj_ = emptyObj;
357 schemaObj_.ParseFromSchemaString(schema);
358 }
359
GetDbProperties() const360 const KvDBProperties &VirtualSingleVerSyncDBInterface::GetDbProperties() const
361 {
362 return properties_;
363 }
364
GetSecurityOption(SecurityOption & option) const365 int VirtualSingleVerSyncDBInterface::GetSecurityOption(SecurityOption &option) const
366 {
367 if (getSecurityOptionCallBack_) {
368 return getSecurityOptionCallBack_(option);
369 }
370 if (secOption_.securityLabel == NOT_SET) {
371 return -E_NOT_SUPPORT;
372 }
373 option = secOption_;
374 return E_OK;
375 }
376
IsReadable() const377 bool VirtualSingleVerSyncDBInterface::IsReadable() const
378 {
379 return true;
380 }
381
SetSecurityOption(SecurityOption & option)382 void VirtualSingleVerSyncDBInterface::SetSecurityOption(SecurityOption &option)
383 {
384 secOption_ = option;
385 }
386
NotifyRemotePushFinished(const std::string & targetId) const387 void VirtualSingleVerSyncDBInterface::NotifyRemotePushFinished(const std::string &targetId) const
388 {
389 std::lock_guard<std::mutex> autoLock(pushNotifierMutex_);
390 if (pushNotifier_) {
391 pushNotifier_(targetId);
392 LOGI("[VirtualSingleVerSyncDBInterface] Notify remote push finished");
393 }
394 }
395
GetDatabaseCreateTimestamp(Timestamp & outTime) const396 int VirtualSingleVerSyncDBInterface::GetDatabaseCreateTimestamp(Timestamp &outTime) const
397 {
398 outTime = dbCreateTime_;
399 return E_OK;
400 }
401
GetSyncData(QueryObject & query,const SyncTimeRange & timeRange,const DataSizeSpecInfo & dataSizeInfo,ContinueToken & continueStmtToken,std::vector<SingleVerKvEntry * > & entries) const402 int VirtualSingleVerSyncDBInterface::GetSyncData(QueryObject &query, const SyncTimeRange &timeRange,
403 const DataSizeSpecInfo &dataSizeInfo, ContinueToken &continueStmtToken,
404 std::vector<SingleVerKvEntry *> &entries) const
405 {
406 if (getDataDelayTime_ > 0) {
407 std::this_thread::sleep_for(std::chrono::milliseconds(getDataDelayTime_));
408 }
409 int errCode = DataControl();
410 if (errCode != E_OK) {
411 return errCode;
412 }
413 const auto &startKey = query.GetPrefixKey();
414 Key endKey = startKey;
415 endKey.resize(DBConstant::MAX_KEY_SIZE, UCHAR_MAX);
416
417 std::vector<VirtualDataItem> dataItems;
418 for (const auto &data : dbData_) {
419 // Only get local data.
420 if (!data.isLocal) {
421 continue;
422 }
423
424 if ((data.flag & VirtualDataItem::DELETE_FLAG) != 0) {
425 if (data.timestamp >= timeRange.deleteBeginTime && data.timestamp < timeRange.deleteEndTime) {
426 dataItems.push_back(data);
427 }
428 } else {
429 if (data.timestamp >= timeRange.beginTime && data.timestamp < timeRange.endTime &&
430 data.key >= startKey && data.key <= endKey) {
431 dataItems.push_back(data);
432 }
433 }
434 }
435
436 LOGD("dataItems size %zu", dataItems.size());
437 return GetEntriesFromItems(entries, dataItems);
438 }
439
DeleteMetaDataByPrefixKey(const Key & keyPrefix) const440 int VirtualSingleVerSyncDBInterface::DeleteMetaDataByPrefixKey(const Key &keyPrefix) const
441 {
442 size_t prefixKeySize = keyPrefix.size();
443 for (auto iter = metadata_.begin(); iter != metadata_.end();) {
444 if (prefixKeySize <= iter->first.size() &&
445 keyPrefix == Key(iter->first.begin(), std::next(iter->first.begin(), prefixKeySize))) {
446 iter = metadata_.erase(iter);
447 } else {
448 ++iter;
449 }
450 }
451 return E_OK;
452 }
453
GetCompressionOption(bool & needCompressOnSync,uint8_t & compressionRate) const454 int VirtualSingleVerSyncDBInterface::GetCompressionOption(bool &needCompressOnSync, uint8_t &compressionRate) const
455 {
456 if (compressSync_) {
457 needCompressOnSync = true;
458 compressionRate = 100; // compress rate 100
459 }
460 return E_OK;
461 }
462
GetCompressionAlgo(std::set<CompressAlgorithm> & algorithmSet) const463 int VirtualSingleVerSyncDBInterface::GetCompressionAlgo(std::set<CompressAlgorithm> &algorithmSet) const
464 {
465 if (compressSync_) {
466 DataCompression::GetCompressionAlgo(algorithmSet);
467 }
468 return E_OK;
469 }
470
PutSyncData(const DataItem & item)471 int VirtualSingleVerSyncDBInterface::PutSyncData(const DataItem &item)
472 {
473 return E_OK;
474 }
475
CheckAndInitQueryCondition(QueryObject & query) const476 int VirtualSingleVerSyncDBInterface::CheckAndInitQueryCondition(QueryObject &query) const
477 {
478 return E_OK;
479 }
480
InterceptData(std::vector<SingleVerKvEntry * > & entries,const std::string & sourceID,const std::string & targetID,bool isPush) const481 int VirtualSingleVerSyncDBInterface::InterceptData(std::vector<SingleVerKvEntry *> &entries,
482 const std::string &sourceID, const std::string &targetID, bool isPush) const
483 {
484 return E_OK;
485 }
486
PutSyncDataWithQuery(const QueryObject & query,const std::vector<SingleVerKvEntry * > & entries,const std::string & deviceName)487 int VirtualSingleVerSyncDBInterface::PutSyncDataWithQuery(const QueryObject &query,
488 const std::vector<SingleVerKvEntry *> &entries, const std::string &deviceName)
489 {
490 std::function<void()> callback;
491 {
492 std::lock_guard<std::mutex> autoLock(saveDataMutex_);
493 callback = saveDataCallback_;
494 }
495 if (callback) {
496 callback();
497 }
498 std::this_thread::sleep_for(std::chrono::milliseconds(saveDataDelayTime_));
499 std::vector<VirtualDataItem> dataItems;
500 for (auto kvEntry : entries) {
501 auto genericKvEntry = static_cast<GenericSingleVerKvEntry *>(kvEntry);
502 VirtualDataItem item;
503 genericKvEntry->GetKey(item.key);
504 genericKvEntry->GetValue(item.value);
505 item.timestamp = genericKvEntry->GetTimestamp();
506 item.writeTimestamp = genericKvEntry->GetWriteTimestamp();
507 item.flag = genericKvEntry->GetFlag();
508 item.isLocal = false;
509 dataItems.push_back(item);
510 }
511 return PutSyncData(dataItems, deviceName);
512 }
513
AddSubscribe(const std::string & subscribeId,const QueryObject & query,bool needCacheSubscribe)514 int VirtualSingleVerSyncDBInterface::AddSubscribe(const std::string &subscribeId, const QueryObject &query,
515 bool needCacheSubscribe)
516 {
517 return E_OK;
518 }
519
RemoveSubscribe(const std::string & subscribeId)520 int VirtualSingleVerSyncDBInterface::RemoveSubscribe(const std::string &subscribeId)
521 {
522 return E_OK;
523 }
524
RemoveSubscribe(const std::vector<std::string> & subscribeIds)525 int VirtualSingleVerSyncDBInterface::RemoveSubscribe(const std::vector<std::string> &subscribeIds)
526 {
527 return E_OK;
528 }
529
SetBusy(bool busy,bool readBusy)530 void VirtualSingleVerSyncDBInterface::SetBusy(bool busy, bool readBusy)
531 {
532 busy_ = busy;
533 readBusy_ = readBusy;
534 }
535
PutDeviceData(const std::string & deviceName,const Key & key,const Value & value)536 void VirtualSingleVerSyncDBInterface::PutDeviceData(const std::string &deviceName, const Key &key, const Value &value)
537 {
538 std::lock_guard<std::mutex> autoLock(deviceDataLock_);
539 deviceData_[deviceName][key] = value;
540 }
541
GetDeviceData(const std::string & deviceName,const Key & key,Value & value)542 void VirtualSingleVerSyncDBInterface::GetDeviceData(const std::string &deviceName, const Key &key, Value &value)
543 {
544 std::lock_guard<std::mutex> autoLock(deviceDataLock_);
545 value = deviceData_[deviceName][key];
546 }
547
SetDbProperties(KvDBProperties & kvDBProperties)548 void VirtualSingleVerSyncDBInterface::SetDbProperties(KvDBProperties &kvDBProperties)
549 {
550 properties_ = kvDBProperties;
551 }
552
DelayGetSyncData(uint32_t milliDelayTime)553 void VirtualSingleVerSyncDBInterface::DelayGetSyncData(uint32_t milliDelayTime)
554 {
555 getDataDelayTime_ = milliDelayTime;
556 }
557
SetGetDataErrCode(int whichTime,int errCode,bool isGetDataControl)558 void VirtualSingleVerSyncDBInterface::SetGetDataErrCode(int whichTime, int errCode, bool isGetDataControl)
559 {
560 countDown_ = whichTime;
561 expectedErrCode_ = errCode;
562 isGetDataControl_ = isGetDataControl;
563 }
564
DataControl() const565 int VirtualSingleVerSyncDBInterface::DataControl() const
566 {
567 static int getDataTimes = 0;
568 if (countDown_ == -1) { // init -1
569 getDataTimes = 0;
570 }
571 if (isGetDataControl_ && countDown_ > 0) {
572 getDataTimes++;
573 }
574 if (isGetDataControl_ && countDown_ == getDataTimes) {
575 LOGD("virtual device get data failed = %d", expectedErrCode_);
576 getDataTimes = 0;
577 return expectedErrCode_;
578 }
579 return E_OK;
580 }
581
ResetDataControl()582 void VirtualSingleVerSyncDBInterface::ResetDataControl()
583 {
584 countDown_ = -1;
585 expectedErrCode_ = E_OK;
586 }
587
GetDataInner(Timestamp begin,Timestamp end,Timestamp & currentWaterMark,const DataSizeSpecInfo & dataSizeInfo,std::vector<VirtualDataItem> & dataItems) const588 bool VirtualSingleVerSyncDBInterface::GetDataInner(Timestamp begin, Timestamp end, Timestamp ¤tWaterMark,
589 const DataSizeSpecInfo &dataSizeInfo, std::vector<VirtualDataItem> &dataItems) const
590 {
591 bool isFinished = true;
592 for (const auto &data : dbData_) {
593 if (data.isLocal) {
594 if (dataItems.size() >= dataSizeInfo.packetSize) {
595 LOGD("virtual device dataItem size reach to packetSize=%u", dataSizeInfo.packetSize);
596 isFinished = false;
597 break;
598 }
599 if (data.writeTimestamp >= begin && data.writeTimestamp < end) {
600 dataItems.push_back(data);
601 currentWaterMark = data.writeTimestamp;
602 }
603 }
604 }
605 return isFinished;
606 }
607
SetSaveDataCallback(const std::function<void ()> & callback)608 void VirtualSingleVerSyncDBInterface::SetSaveDataCallback(const std::function<void()> &callback)
609 {
610 std::lock_guard<std::mutex> autoLock(saveDataMutex_);
611 saveDataCallback_ = callback;
612 }
613
ForkGetSecurityOption(std::function<int (SecurityOption &)> callBack)614 void VirtualSingleVerSyncDBInterface::ForkGetSecurityOption(std::function<int(SecurityOption &)> callBack)
615 {
616 getSecurityOptionCallBack_ = callBack;
617 }
618
SetPushNotifier(const std::function<void (const std::string &)> & pushNotifier)619 void VirtualSingleVerSyncDBInterface::SetPushNotifier(const std::function<void(const std::string &)> &pushNotifier)
620 {
621 std::lock_guard<std::mutex> autoLock(pushNotifierMutex_);
622 pushNotifier_ = pushNotifier;
623 }
624
SetCompressSync(bool compressSync)625 void VirtualSingleVerSyncDBInterface::SetCompressSync(bool compressSync)
626 {
627 compressSync_ = compressSync;
628 }
629 } // namespace DistributedDB
630