1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #define LOG_TAG "KVDBGeneralStore"
16 #include "kvdb_general_store.h"
17
18 #include <endian.h>
19 #include "bootstrap.h"
20 #include "checker/checker_manager.h"
21 #include "cloud/cloud_sync_finished_event.h"
22 #include "cloud/schema_meta.h"
23 #include "crypto_manager.h"
24 #include "device_matrix.h"
25 #include "directory/directory_manager.h"
26 #include "eventcenter/event_center.h"
27 #include "kvdb_query.h"
28 #include "log_print.h"
29 #include "metadata/meta_data_manager.h"
30 #include "metadata/secret_key_meta_data.h"
31 #include "metadata/store_meta_data_local.h"
32 #include "query_helper.h"
33 #include "rdb_cloud.h"
34 #include "snapshot/bind_event.h"
35 #include "types.h"
36 #include "user_delegate.h"
37 #include "utils/anonymous.h"
38 #include "water_version_manager.h"
39 #include "device_manager_adapter.h"
40 #include "utils/anonymous.h"
41 #include "app_id_mapping/app_id_mapping_config_manager.h"
42
43 namespace OHOS::DistributedKv {
44 using namespace DistributedData;
45 using namespace DistributedDB;
46 using DBField = DistributedDB::Field;
47 using DBTable = DistributedDB::TableSchema;
48 using DBSchema = DistributedDB::DataBaseSchema;
49 using ClearMode = DistributedDB::ClearMode;
50 using DMAdapter = DistributedData::DeviceManagerAdapter;
51 using DBInterceptedData = DistributedDB::InterceptedData;
52 constexpr int UUID_WIDTH = 4;
53 const std::map<DBStatus, KVDBGeneralStore::GenErr> KVDBGeneralStore::dbStatusMap_ = {
54 { DBStatus::OK, GenErr::E_OK },
55 { DBStatus::CLOUD_NETWORK_ERROR, GenErr::E_NETWORK_ERROR },
56 { DBStatus::CLOUD_LOCK_ERROR, GenErr::E_LOCKED_BY_OTHERS },
57 { DBStatus::CLOUD_FULL_RECORDS, GenErr::E_RECODE_LIMIT_EXCEEDED },
58 { DBStatus::CLOUD_ASSET_SPACE_INSUFFICIENT, GenErr::E_NO_SPACE_FOR_ASSET },
59 { DBStatus::CLOUD_SYNC_TASK_MERGED, GenErr::E_SYNC_TASK_MERGED },
60 { DBStatus::BUSY, GenErr::E_DB_ERROR },
61 { DBStatus::DB_ERROR, GenErr::E_DB_ERROR },
62 { DBStatus::INVALID_ARGS, GenErr::E_INVALID_ARGS },
63 { DBStatus::NOT_FOUND, GenErr::E_RECORD_NOT_FOUND },
64 { DBStatus::INVALID_VALUE_FIELDS, GenErr::E_INVALID_VALUE_FIELDS },
65 { DBStatus::INVALID_FIELD_TYPE, GenErr::E_INVALID_FIELD_TYPE },
66 { DBStatus::CONSTRAIN_VIOLATION, GenErr::E_CONSTRAIN_VIOLATION },
67 { DBStatus::INVALID_FORMAT, GenErr::E_INVALID_FORMAT },
68 { DBStatus::INVALID_QUERY_FORMAT, GenErr::E_INVALID_QUERY_FORMAT },
69 { DBStatus::INVALID_QUERY_FIELD, GenErr::E_INVALID_QUERY_FIELD },
70 { DBStatus::NOT_SUPPORT, GenErr::E_NOT_SUPPORT },
71 { DBStatus::TIME_OUT, GenErr::E_TIME_OUT },
72 { DBStatus::OVER_MAX_LIMITS, GenErr::E_OVER_MAX_LIMITS },
73 { DBStatus::EKEYREVOKED_ERROR, GenErr::E_SECURITY_LEVEL_ERROR },
74 { DBStatus::SECURITY_OPTION_CHECK_ERROR, GenErr::E_SECURITY_LEVEL_ERROR },
75 };
76
77 constexpr uint32_t LOCK_TIMEOUT = 3600; // second
GetDBSchema(const Database & database)78 static DBSchema GetDBSchema(const Database &database)
79 {
80 DBSchema schema;
81 schema.tables.resize(database.tables.size());
82 for (size_t i = 0; i < database.tables.size(); i++) {
83 const Table &table = database.tables[i];
84 DBTable &dbTable = schema.tables[i];
85 dbTable.name = table.name;
86 dbTable.sharedTableName = table.sharedTableName;
87 for (auto &field : table.fields) {
88 DBField dbField;
89 dbField.colName = field.colName;
90 dbField.type = field.type;
91 dbField.primary = field.primary;
92 dbField.nullable = field.nullable;
93 dbTable.fields.push_back(std::move(dbField));
94 }
95 }
96 return schema;
97 }
GetDBPassword(const StoreMetaData & data)98 KVDBGeneralStore::DBPassword KVDBGeneralStore::GetDBPassword(const StoreMetaData &data)
99 {
100 DBPassword dbPassword;
101 if (!data.isEncrypt) {
102 return dbPassword;
103 }
104
105 SecretKeyMetaData secretKey;
106 secretKey.storeType = data.storeType;
107 auto storeKey = data.GetSecretKey();
108 MetaDataManager::GetInstance().LoadMeta(storeKey, secretKey, true);
109 std::vector<uint8_t> password;
110 CryptoManager::GetInstance().Decrypt(secretKey.sKey, password);
111 dbPassword.SetValue(password.data(), password.size());
112 password.assign(password.size(), 0);
113 return dbPassword;
114 }
115
GetDBSecurity(int32_t secLevel)116 KVDBGeneralStore::DBSecurity KVDBGeneralStore::GetDBSecurity(int32_t secLevel)
117 {
118 if (secLevel < SecurityLevel::NO_LABEL || secLevel > SecurityLevel::S4) {
119 return { DistributedDB::NOT_SET, DistributedDB::ECE };
120 }
121 if (secLevel == SecurityLevel::S3) {
122 return { DistributedDB::S3, DistributedDB::SECE };
123 }
124 if (secLevel == SecurityLevel::S4) {
125 return { DistributedDB::S4, DistributedDB::ECE };
126 }
127 return { secLevel, DistributedDB::ECE };
128 }
129
GetDBOption(const StoreMetaData & data,const DBPassword & password)130 KVDBGeneralStore::DBOption KVDBGeneralStore::GetDBOption(const StoreMetaData &data, const DBPassword &password)
131 {
132 DBOption dbOption;
133 dbOption.createIfNecessary = false;
134 dbOption.isMemoryDb = false;
135 dbOption.isEncryptedDb = data.isEncrypt;
136 dbOption.isNeedCompressOnSync = data.isNeedCompress;
137 if (data.isEncrypt) {
138 dbOption.cipher = DistributedDB::CipherType::AES_256_GCM;
139 dbOption.passwd = password;
140 }
141 StoreMetaDataLocal local;
142 MetaDataManager::GetInstance().LoadMeta(data.GetKeyLocal(), local, true);
143 if (local.isPublic || data.storeType == KvStoreType::DEVICE_COLLABORATION) {
144 dbOption.conflictResolvePolicy = DistributedDB::DEVICE_COLLABORATION;
145 } else if (data.storeType == KvStoreType::SINGLE_VERSION) {
146 dbOption.conflictResolvePolicy = DistributedDB::LAST_WIN;
147 }
148 if (data.appId == Bootstrap::GetInstance().GetProcessLabel()) {
149 dbOption.compressionRate = META_COMPRESS_RATE;
150 dbOption.conflictResolvePolicy = DistributedDB::LAST_WIN;
151 } else {
152 dbOption.syncDualTupleMode = true; // tuple of (appid+storeid)
153 }
154 dbOption.schema = data.schema;
155 dbOption.createDirByStoreIdOnly = true;
156 dbOption.secOption = GetDBSecurity(data.securityLevel);
157 return dbOption;
158 }
159
KVDBGeneralStore(const StoreMetaData & meta)160 KVDBGeneralStore::KVDBGeneralStore(const StoreMetaData &meta)
161 : manager_(meta.appId, meta.appId == Bootstrap::GetInstance().GetProcessLabel() ? defaultAccountId : meta.user,
162 meta.instanceId)
163 {
164 observer_.storeId_ = meta.storeId;
165 StoreMetaDataLocal local;
166 MetaDataManager::GetInstance().LoadMeta(meta.GetKeyLocal(), local, true);
167 isPublic_ = local.isPublic;
168 DBStatus status = DBStatus::NOT_FOUND;
169 manager_.SetKvStoreConfig({ meta.dataDir });
170 std::unique_lock<decltype(rwMutex_)> lock(rwMutex_);
171 manager_.GetKvStore(
172 meta.storeId, GetDBOption(meta, GetDBPassword(meta)), [&status, this](auto dbStatus, auto *tmpStore) {
173 status = dbStatus;
174 delegate_ = tmpStore;
175 });
176 if (delegate_ == nullptr || status != DBStatus::OK) {
177 ZLOGE("GetKvStore failed. delegate is null?[%{public}d], status = %{public}d", delegate_ == nullptr, status);
178 manager_.CloseKvStore(delegate_);
179 return;
180 }
181 SetDBPushDataInterceptor(meta.storeType);
182 SetDBReceiveDataInterceptor(meta.storeType);
183 delegate_->RegisterObserver({}, DistributedDB::OBSERVER_CHANGES_FOREIGN, &observer_);
184 delegate_->RegisterObserver({}, DistributedDB::OBSERVER_CHANGES_CLOUD, &observer_);
185 if (DeviceMatrix::GetInstance().IsDynamic(meta) || DeviceMatrix::GetInstance().IsStatics(meta)) {
186 delegate_->SetRemotePushFinishedNotify([meta](const DistributedDB::RemotePushNotifyInfo &info) {
187 DeviceMatrix::GetInstance().OnExchanged(info.deviceId, meta, DeviceMatrix::ChangeType::CHANGE_REMOTE);
188 });
189 }
190 if (meta.isAutoSync) {
191 bool param = true;
192 auto data = static_cast<DistributedDB::PragmaData>(¶m);
193 delegate_->Pragma(DistributedDB::SET_SYNC_RETRY, data);
194 }
195 storeInfo_.tokenId = meta.tokenId;
196 storeInfo_.bundleName = meta.bundleName;
197 storeInfo_.storeName = meta.storeId;
198 storeInfo_.instanceId = meta.instanceId;
199 storeInfo_.user = std::stoi(meta.user);
200 enableCloud_ = meta.enableCloud;
201 }
202
~KVDBGeneralStore()203 KVDBGeneralStore::~KVDBGeneralStore()
204 {
205 {
206 std::unique_lock<decltype(rwMutex_)> lock(rwMutex_);
207 if (delegate_ != nullptr) {
208 delegate_->UnRegisterObserver(&observer_);
209 }
210 manager_.CloseKvStore(delegate_);
211 delegate_ = nullptr;
212 }
213 for (auto &bindInfo_ : bindInfos_) {
214 if (bindInfo_.db_ != nullptr) {
215 bindInfo_.db_->Close();
216 }
217 }
218 bindInfos_.clear();
219 dbClouds_.clear();
220 }
221
BindSnapshots(std::shared_ptr<std::map<std::string,std::shared_ptr<Snapshot>>> bindAssets)222 int32_t KVDBGeneralStore::BindSnapshots(std::shared_ptr<std::map<std::string, std::shared_ptr<Snapshot>>> bindAssets)
223 {
224 return GenErr::E_NOT_SUPPORT;
225 }
226
Bind(Database & database,const std::map<uint32_t,BindInfo> & bindInfos,const CloudConfig & config)227 int32_t KVDBGeneralStore::Bind(Database &database, const std::map<uint32_t, BindInfo> &bindInfos,
228 const CloudConfig &config)
229 {
230 if (bindInfos.empty()) {
231 ZLOGW("No cloudDB!");
232 return GeneralError::E_OK;
233 }
234
235 std::map<std::string, DataBaseSchema> schemas;
236 auto dbSchema = GetDBSchema(database);
237 for (auto &[userId, bindInfo] : bindInfos) {
238 if (bindInfo.db_ == nullptr) {
239 return GeneralError::E_INVALID_ARGS;
240 }
241
242 if (isBound_.exchange(true)) {
243 return GeneralError::E_OK;
244 }
245
246 dbClouds_.insert({ std::to_string(userId), std::make_shared<DistributedRdb::RdbCloud>(bindInfo.db_, nullptr) });
247 bindInfos_.insert(std::move(bindInfo));
248 schemas.insert({ std::to_string(userId), dbSchema });
249 }
250 DistributedDB::CloudSyncConfig dbConfig;
251 dbConfig.maxUploadCount = config.maxNumber;
252 dbConfig.maxUploadSize = config.maxSize;
253 dbConfig.maxRetryConflictTimes = config.maxRetryConflictTimes;
254 std::unique_lock<decltype(rwMutex_)> lock(rwMutex_);
255 if (delegate_ == nullptr) {
256 return GeneralError::E_ALREADY_CLOSED;
257 }
258 delegate_->SetCloudDB(dbClouds_);
259 delegate_->SetCloudDbSchema(std::move(schemas));
260 delegate_->SetCloudSyncConfig(dbConfig);
261 return GeneralError::E_OK;
262 }
263
IsBound()264 bool KVDBGeneralStore::IsBound()
265 {
266 return isBound_;
267 }
268
Close(bool isForce)269 int32_t KVDBGeneralStore::Close(bool isForce)
270 {
271 std::unique_lock<decltype(rwMutex_)> lock(rwMutex_, std::chrono::seconds(isForce ? LOCK_TIMEOUT : 0));
272 if (!lock) {
273 return GeneralError::E_BUSY;
274 }
275
276 if (delegate_ == nullptr) {
277 return GeneralError::E_OK;
278 }
279 if (!isForce && delegate_->GetTaskCount() > 0) {
280 return GeneralError::E_BUSY;
281 }
282 if (delegate_ != nullptr) {
283 delegate_->UnRegisterObserver(&observer_);
284 }
285 auto status = manager_.CloseKvStore(delegate_);
286 if (status != DBStatus::OK) {
287 return status;
288 }
289 delegate_ = nullptr;
290 return GeneralError::E_OK;
291 }
292
Execute(const std::string & table,const std::string & sql)293 int32_t KVDBGeneralStore::Execute(const std::string &table, const std::string &sql)
294 {
295 return GeneralError::E_NOT_SUPPORT;
296 }
297
Insert(const std::string & table,VBuckets && values)298 int32_t KVDBGeneralStore::Insert(const std::string &table, VBuckets &&values)
299 {
300 return GeneralError::E_NOT_SUPPORT;
301 }
302
Update(const std::string & table,const std::string & setSql,Values && values,const std::string & whereSql,Values && conditions)303 int32_t KVDBGeneralStore::Update(const std::string &table, const std::string &setSql, Values &&values,
304 const std::string &whereSql, Values &&conditions)
305 {
306 return GeneralError::E_NOT_SUPPORT;
307 }
308
Delete(const std::string & table,const std::string & sql,Values && args)309 int32_t KVDBGeneralStore::Delete(const std::string &table, const std::string &sql, Values &&args)
310 {
311 return GeneralError::E_NOT_SUPPORT;
312 }
313
Replace(const std::string & table,VBucket && value)314 int32_t KVDBGeneralStore::Replace(const std::string &table, VBucket &&value)
315 {
316 return GeneralError::E_NOT_SUPPORT;
317 }
318
Query(const std::string & table,const std::string & sql,Values && args)319 std::pair<int32_t, std::shared_ptr<Cursor>> KVDBGeneralStore::Query(
320 __attribute__((unused)) const std::string &table, const std::string &sql, Values &&args)
321 {
322 return { GeneralError::E_NOT_SUPPORT, nullptr };
323 }
324
Query(const std::string & table,GenQuery & query)325 std::pair<int32_t, std::shared_ptr<Cursor>> KVDBGeneralStore::Query(const std::string &table, GenQuery &query)
326 {
327 return { GeneralError::E_NOT_SUPPORT, nullptr };
328 }
329
MergeMigratedData(const std::string & tableName,VBuckets && values)330 int32_t KVDBGeneralStore::MergeMigratedData(const std::string &tableName, VBuckets &&values)
331 {
332 return GeneralError::E_NOT_SUPPORT;
333 }
334
CleanTrackerData(const std::string & tableName,int64_t cursor)335 int32_t KVDBGeneralStore::CleanTrackerData(const std::string &tableName, int64_t cursor)
336 {
337 return GeneralError::E_NOT_SUPPORT;
338 }
339
GetDBSyncCompleteCB(DetailAsync async)340 KVDBGeneralStore::DBSyncCallback KVDBGeneralStore::GetDBSyncCompleteCB(DetailAsync async)
341 {
342 if (!async) {
343 return [](auto &) {};
344 }
345 return [async = std::move(async)](const std::map<std::string, DBStatus> &status) {
346 GenDetails details;
347 for (auto &[key, dbStatus] : status) {
348 auto &value = details[key];
349 value.progress = FINISHED;
350 value.code = GeneralError::E_OK;
351 if (dbStatus != DBStatus::OK) {
352 value.code = dbStatus;
353 }
354 }
355 async(details);
356 };
357 }
358
CloudSync(const Devices & devices,DistributedDB::SyncMode cloudSyncMode,DetailAsync async,int64_t wait,const std::string & prepareTraceId)359 DBStatus KVDBGeneralStore::CloudSync(const Devices &devices, DistributedDB::SyncMode cloudSyncMode, DetailAsync async,
360 int64_t wait, const std::string &prepareTraceId)
361 {
362 DistributedDB::CloudSyncOption syncOption;
363 syncOption.devices = devices;
364 syncOption.mode = cloudSyncMode;
365 syncOption.waitTime = wait;
366 syncOption.prepareTraceId = prepareTraceId;
367 syncOption.lockAction = DistributedDB::LockAction::NONE;
368 if (storeInfo_.user == 0) {
369 std::vector<int32_t> users;
370 AccountDelegate::GetInstance()->QueryUsers(users);
371 syncOption.users.push_back(std::to_string(users[0]));
372 } else {
373 syncOption.users.push_back(std::to_string(storeInfo_.user));
374 }
375 return delegate_->Sync(syncOption, GetDBProcessCB(async));
376 }
377
Sync(const Devices & devices,GenQuery & query,DetailAsync async,const SyncParam & syncParam)378 std::pair<int32_t, int32_t> KVDBGeneralStore::Sync(const Devices &devices, GenQuery &query, DetailAsync async,
379 const SyncParam &syncParam)
380 {
381 auto syncMode = GeneralStore::GetSyncMode(syncParam.mode);
382 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
383 if (delegate_ == nullptr) {
384 ZLOGE("store already closed! devices count:%{public}zu, the 1st:%{public}s, mode:%{public}d", devices.size(),
385 devices.empty() ? "null" : Anonymous::Change(*devices.begin()).c_str(), syncParam.mode);
386 return { GeneralError::E_ALREADY_CLOSED, DBStatus::OK };
387 }
388 DBStatus dbStatus;
389 auto dbMode = DistributedDB::SyncMode(syncMode);
390 if (syncMode > NEARBY_END && syncMode < CLOUD_END) {
391 if (!enableCloud_) {
392 return { GeneralError::E_NOT_SUPPORT, DBStatus::OK };
393 }
394 dbStatus = CloudSync(devices, dbMode, async, syncParam.wait, syncParam.prepareTraceId);
395 } else {
396 if (devices.empty()) {
397 ZLOGE("Devices is empty! mode:%{public}d", syncParam.mode);
398 return { GeneralError::E_INVALID_ARGS, DBStatus::OK };
399 }
400 KVDBQuery *kvQuery = nullptr;
401 auto ret = query.QueryInterface(kvQuery);
402 DistributedDB::Query dbQuery;
403 if (ret == GeneralError::E_OK && kvQuery != nullptr && kvQuery->IsValidQuery()) {
404 dbQuery = kvQuery->GetDBQuery();
405 } else {
406 return { GeneralError::E_INVALID_ARGS, DBStatus::OK };
407 }
408 if (syncMode == NEARBY_SUBSCRIBE_REMOTE) {
409 dbStatus = delegate_->SubscribeRemoteQuery(devices, GetDBSyncCompleteCB(std::move(async)), dbQuery, false);
410 } else if (syncMode == NEARBY_UNSUBSCRIBE_REMOTE) {
411 dbStatus =
412 delegate_->UnSubscribeRemoteQuery(devices, GetDBSyncCompleteCB(std::move(async)), dbQuery, false);
413 } else if (syncMode < NEARBY_END) {
414 if (kvQuery->IsEmpty()) {
415 dbStatus = delegate_->Sync(devices, dbMode, GetDBSyncCompleteCB(std::move(async)), false);
416 } else {
417 dbStatus = delegate_->Sync(devices, dbMode, GetDBSyncCompleteCB(std::move(async)), dbQuery, false);
418 }
419 } else {
420 ZLOGE("Err sync mode! sync mode:%{public}d", syncMode);
421 dbStatus = DistributedDB::INVALID_ARGS;
422 }
423 }
424 return { ConvertStatus(dbStatus), dbStatus };
425 }
426
SetEqualIdentifier(const std::string & appId,const std::string & storeId,std::string account)427 void KVDBGeneralStore::SetEqualIdentifier(const std::string &appId, const std::string &storeId, std::string account)
428 {
429 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
430 if (delegate_ == nullptr) {
431 ZLOGE("store already closed! appId:%{public}s storeId:%{public}s", appId.c_str(),
432 Anonymous::Change(storeId).c_str());
433 return;
434 }
435 std::vector<std::string> sameAccountDevs {};
436 std::vector<std::string> defaultAccountDevs {};
437 auto uuids = DMAdapter::ToUUID(DMAdapter::GetInstance().GetRemoteDevices());
438 if (uuids.empty()) {
439 ZLOGI("no remote device to sync.appId:%{public}s", appId.c_str());
440 return;
441 }
442 GetIdentifierParams(sameAccountDevs, uuids, IDENTICAL_ACCOUNT);
443 GetIdentifierParams(defaultAccountDevs, uuids, NO_ACCOUNT);
444 if (!sameAccountDevs.empty()) {
445 auto accountId = account.empty() ? AccountDelegate::GetInstance()->GetUnencryptedAccountId() : account;
446 auto convertedIds = AppIdMappingConfigManager::GetInstance().Convert(appId, accountId);
447 auto identifier = KvManager::GetKvStoreIdentifier(convertedIds.second, convertedIds.first, storeId);
448 ZLOGI("same account store:%{public}s, user:%{public}s, device:%{public}.10s, appId:%{public}s",
449 Anonymous::Change(storeId).c_str(), Anonymous::Change(convertedIds.second).c_str(),
450 DistributedData::Serializable::Marshall(sameAccountDevs).c_str(), convertedIds.first.c_str());
451 delegate_->SetEqualIdentifier(identifier, sameAccountDevs);
452 }
453 if (!defaultAccountDevs.empty()) {
454 auto convertedIds = AppIdMappingConfigManager::GetInstance().Convert(appId, defaultAccountId);
455 auto identifier = KvManager::GetKvStoreIdentifier(convertedIds.second, convertedIds.first, storeId);
456 ZLOGI("no account store:%{public}s, device:%{public}.10s, appId:%{public}s",
457 Anonymous::Change(storeId).c_str(),
458 DistributedData::Serializable::Marshall(defaultAccountDevs).c_str(), convertedIds.first.c_str());
459 delegate_->SetEqualIdentifier(identifier, defaultAccountDevs);
460 }
461 }
462
GetIdentifierParams(std::vector<std::string> & devices,const std::vector<std::string> & uuids,int32_t authType)463 void KVDBGeneralStore::GetIdentifierParams(std::vector<std::string> &devices,
464 const std::vector<std::string> &uuids, int32_t authType)
465 {
466 for (const auto &devId : uuids) {
467 if (DMAdapter::GetInstance().IsOHOSType(devId)) {
468 continue;
469 }
470 if (DMAdapter::GetInstance().GetAuthType(devId) != authType) {
471 continue;
472 }
473 devices.push_back(devId);
474 }
475 ZLOGI("devices size: %{public}zu", devices.size());
476 }
477
PreSharing(GenQuery & query)478 std::pair<int32_t, std::shared_ptr<Cursor>> KVDBGeneralStore::PreSharing(GenQuery &query)
479 {
480 return { GeneralError::E_NOT_SUPPORT, nullptr };
481 }
482
Clean(const std::vector<std::string> & devices,int32_t mode,const std::string & tableName)483 int32_t KVDBGeneralStore::Clean(const std::vector<std::string> &devices, int32_t mode, const std::string &tableName)
484 {
485 if (mode < 0 || mode > CLEAN_MODE_BUTT) {
486 return GeneralError::E_INVALID_ARGS;
487 }
488 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
489 if (delegate_ == nullptr) {
490 ZLOGE("store already closed! devices count:%{public}zu, the 1st:%{public}s, mode:%{public}d", devices.size(),
491 devices.empty() ? "null" : Anonymous::Change(*devices.begin()).c_str(), mode);
492 return GeneralError::E_ALREADY_CLOSED;
493 }
494 DBStatus status = OK;
495 switch (mode) {
496 case CLOUD_INFO:
497 status = delegate_->RemoveDeviceData(
498 "", isPublic_ ? static_cast<ClearMode>(CLOUD_DATA) : static_cast<ClearMode>(CLOUD_INFO));
499 break;
500 case CLOUD_DATA:
501 status = delegate_->RemoveDeviceData("", static_cast<ClearMode>(CLOUD_DATA));
502 break;
503 case NEARBY_DATA:
504 if (devices.empty()) {
505 status = delegate_->RemoveDeviceData();
506 break;
507 }
508 for (auto device : devices) {
509 status = delegate_->RemoveDeviceData(device);
510 }
511 break;
512 default:
513 return GeneralError::E_ERROR;
514 }
515 return ConvertStatus(status);
516 }
517
Watch(int32_t origin,Watcher & watcher)518 int32_t KVDBGeneralStore::Watch(int32_t origin, Watcher &watcher)
519 {
520 if (origin != Watcher::Origin::ORIGIN_ALL || observer_.watcher_ != nullptr) {
521 return GeneralError::E_INVALID_ARGS;
522 }
523
524 observer_.watcher_ = &watcher;
525 return GeneralError::E_OK;
526 }
527
Unwatch(int32_t origin,Watcher & watcher)528 int32_t KVDBGeneralStore::Unwatch(int32_t origin, Watcher &watcher)
529 {
530 if (origin != Watcher::Origin::ORIGIN_ALL || observer_.watcher_ != &watcher) {
531 return GeneralError::E_INVALID_ARGS;
532 }
533
534 observer_.watcher_ = nullptr;
535 return GeneralError::E_OK;
536 }
537
Release()538 int32_t KVDBGeneralStore::Release()
539 {
540 auto ref = 1;
541 {
542 std::lock_guard<decltype(mutex_)> lock(mutex_);
543 if (ref_ == 0) {
544 return 0;
545 }
546 ref = --ref_;
547 }
548 ZLOGD("ref:%{public}d", ref);
549 if (ref == 0) {
550 delete this;
551 }
552 return ref;
553 }
554
AddRef()555 int32_t KVDBGeneralStore::AddRef()
556 {
557 std::lock_guard<decltype(mutex_)> lock(mutex_);
558 if (ref_ == 0) {
559 return 0;
560 }
561 return ++ref_;
562 }
563
SetDistributedTables(const std::vector<std::string> & tables,int32_t type,const std::vector<Reference> & references)564 int32_t KVDBGeneralStore::SetDistributedTables(
565 const std::vector<std::string> &tables, int32_t type, const std::vector<Reference> &references)
566 {
567 return GeneralError::E_OK;
568 }
569
SetTrackerTable(const std::string & tableName,const std::set<std::string> & trackerColNames,const std::string & extendColName,bool isForceUpgrade)570 int32_t KVDBGeneralStore::SetTrackerTable(const std::string &tableName, const std::set<std::string> &trackerColNames,
571 const std::string &extendColName, bool isForceUpgrade)
572 {
573 return GeneralError::E_OK;
574 }
575
ConvertStatus(DistributedDB::DBStatus status)576 KVDBGeneralStore::GenErr KVDBGeneralStore::ConvertStatus(DistributedDB::DBStatus status)
577 {
578 auto it = dbStatusMap_.find(status);
579 if (it != dbStatusMap_.end()) {
580 return it->second;
581 }
582 ZLOGI("status:0x%{public}x", status);
583 return GenErr::E_ERROR;
584 }
585
IsValid()586 bool KVDBGeneralStore::IsValid()
587 {
588 std::unique_lock<decltype(rwMutex_)> lock(rwMutex_);
589 return delegate_ != nullptr;
590 }
591
RegisterDetailProgressObserver(GeneralStore::DetailAsync async)592 int32_t KVDBGeneralStore::RegisterDetailProgressObserver(GeneralStore::DetailAsync async)
593 {
594 return GenErr::E_OK;
595 }
596
UnregisterDetailProgressObserver()597 int32_t KVDBGeneralStore::UnregisterDetailProgressObserver()
598 {
599 return GenErr::E_OK;
600 }
601
GetWaterVersion(const std::string & deviceId)602 std::vector<std::string> KVDBGeneralStore::GetWaterVersion(const std::string &deviceId)
603 {
604 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
605 if (delegate_ == nullptr) {
606 ZLOGE("store already closed! deviceId:%{public}s", Anonymous::Change(deviceId).c_str());
607 return {};
608 }
609 auto [status, versions] = delegate_->GetCloudVersion(deviceId);
610 if (status != DBStatus::OK || versions.empty()) {
611 return {};
612 }
613 std::vector<std::string> res;
614 for (auto &[_, version] : versions) {
615 res.push_back(std::move(version));
616 }
617 return res;
618 }
619
OnChange(DBOrigin origin,const std::string & originalId,DBChangeData && data)620 void KVDBGeneralStore::ObserverProxy::OnChange(DBOrigin origin, const std::string &originalId, DBChangeData &&data)
621 {
622 if (!HasWatcher()) {
623 return;
624 }
625 GenOrigin genOrigin;
626 genOrigin.origin = (origin == DBOrigin::ORIGIN_CLOUD) ? GenOrigin::ORIGIN_CLOUD : GenOrigin::ORIGIN_NEARBY;
627 genOrigin.id.push_back(originalId);
628 genOrigin.store = storeId_;
629 Watcher::ChangeInfo changeInfo;
630 for (uint32_t i = 0; i < DistributedDB::OP_BUTT; ++i) {
631 auto &info = changeInfo[storeId_][i];
632 for (auto &priData : data.primaryData[i]) {
633 Watcher::PRIValue value;
634 Convert(std::move(*(priData.begin())), value);
635 info.push_back(std::move(value));
636 }
637 }
638 watcher_->OnChange(genOrigin, {}, std::move(changeInfo));
639 }
640
OnChange(const DistributedDB::KvStoreChangedData & data)641 void KVDBGeneralStore::ObserverProxy::OnChange(const DistributedDB::KvStoreChangedData &data)
642 {
643 if (!HasWatcher()) {
644 return;
645 }
646 const auto &inserts = data.GetEntriesInserted();
647 const auto &deletes = data.GetEntriesDeleted();
648 const auto &updates = data.GetEntriesUpdated();
649 Watcher::ChangeData changeData;
650 ConvertChangeData(inserts, changeData[storeId_][DistributedDB::OP_INSERT]);
651 ConvertChangeData(deletes, changeData[storeId_][DistributedDB::OP_DELETE]);
652 ConvertChangeData(updates, changeData[storeId_][DistributedDB::OP_UPDATE]);
653 GenOrigin genOrigin;
654 genOrigin.origin = GenOrigin::ORIGIN_NEARBY;
655 genOrigin.store = storeId_;
656
657 watcher_->OnChange(genOrigin, {}, std::move(changeData));
658 }
659
ConvertChangeData(const std::list<DBEntry> & entries,std::vector<Values> & values)660 void KVDBGeneralStore::ObserverProxy::ConvertChangeData(const std::list<DBEntry> &entries, std::vector<Values> &values)
661 {
662 for (auto &entry : entries) {
663 auto value = std::vector<Value>{ entry.key, entry.value };
664 values.push_back(value);
665 }
666 }
667
GetDBProcessCB(DetailAsync async)668 KVDBGeneralStore::DBProcessCB KVDBGeneralStore::GetDBProcessCB(DetailAsync async)
669 {
670 return [async](const std::map<std::string, SyncProcess> &processes) {
671 if (!async) {
672 return;
673 }
674 DistributedData::GenDetails details;
675 for (auto &[id, process] : processes) {
676 auto &detail = details[id];
677 detail.progress = process.process;
678 detail.code = ConvertStatus(process.errCode);
679 detail.dbCode = DB_ERR_OFFSET + process.errCode;
680 for (auto [key, value] : process.tableProcess) {
681 auto &table = detail.details[key];
682 table.upload.total = value.upLoadInfo.total;
683 table.upload.success = value.upLoadInfo.successCount;
684 table.upload.failed = value.upLoadInfo.failCount;
685 table.upload.untreated = table.upload.total - table.upload.success - table.upload.failed;
686 table.download.total = value.downLoadInfo.total;
687 table.download.success = value.downLoadInfo.successCount;
688 table.download.failed = value.downLoadInfo.failCount;
689 table.download.untreated = table.download.total - table.download.success - table.download.failed;
690 detail.changeCount = (process.process == FINISHED)
691 ? value.downLoadInfo.insertCount + value.downLoadInfo.updateCount +
692 value.downLoadInfo.deleteCount
693 : 0;
694 }
695 }
696 if (async) {
697 async(details);
698 }
699 };
700 }
701
SetDBPushDataInterceptor(int32_t storeType)702 void KVDBGeneralStore::SetDBPushDataInterceptor(int32_t storeType)
703 {
704 delegate_->SetPushDataInterceptor(
705 [this, storeType](DBInterceptedData &data, const std::string &sourceID, const std::string &targetID) {
706 int errCode = DBStatus::OK;
707 if (storeType != KvStoreType::DEVICE_COLLABORATION || DMAdapter::GetInstance().IsOHOSType(targetID)) {
708 return errCode;
709 }
710 if (targetID.empty()) {
711 ZLOGE("targetID empty");
712 return static_cast<int>(DBStatus::DB_ERROR);
713 }
714 auto entries = data.GetEntries();
715 for (size_t i = 0; i < entries.size(); i++) {
716 if (entries[i].key.empty()) {
717 continue;
718 }
719 auto oriKey = entries[i].key;
720 auto newKey = GetNewKey(oriKey, sourceID);
721 errCode = data.ModifyKey(i, newKey);
722 if (errCode != DBStatus::OK) {
723 ZLOGE("ModifyKey err: %{public}d", errCode);
724 break;
725 }
726 }
727 return errCode;
728 }
729 );
730 }
731
SetDBReceiveDataInterceptor(int32_t storeType)732 void KVDBGeneralStore::SetDBReceiveDataInterceptor(int32_t storeType)
733 {
734 delegate_->SetReceiveDataInterceptor(
735 [this, storeType](DBInterceptedData &data, const std::string &sourceID, const std::string &targetID) {
736 int errCode = DBStatus::OK;
737 if (storeType != KvStoreType::DEVICE_COLLABORATION || DMAdapter::GetInstance().IsOHOSType(sourceID)) {
738 return errCode;
739 }
740 if (sourceID.empty()) {
741 ZLOGE("targetID empty");
742 return static_cast<int>(DBStatus::DB_ERROR);
743 }
744 auto entries = data.GetEntries();
745 for (size_t i = 0; i < entries.size(); i++) {
746 if (entries[i].key.empty()) {
747 continue;
748 }
749
750 auto networkId = DMAdapter::GetInstance().ToNetworkID(sourceID);
751 auto encyptedUuid = DMAdapter::GetInstance().GetEncryptedUuidByNetworkId(networkId);
752 if (encyptedUuid.empty()) {
753 ZLOGE("get encyptedUuid failed");
754 continue;
755 }
756
757 auto oriKey = entries[i].key;
758 auto newKey = GetNewKey(oriKey, encyptedUuid);
759 errCode = data.ModifyKey(i, newKey);
760 if (errCode != DBStatus::OK) {
761 ZLOGE("ModifyKey err: %{public}d", errCode);
762 break;
763 }
764 }
765 return errCode;
766 }
767 );
768 }
769
GetNewKey(std::vector<uint8_t> & key,const std::string & uuid)770 std::vector<uint8_t> KVDBGeneralStore::GetNewKey(std::vector<uint8_t> &key, const std::string &uuid)
771 {
772 uint32_t remoteLen = *(reinterpret_cast<uint32_t *>(&(*(key.end() - sizeof(uint32_t)))));
773 remoteLen = le32toh(remoteLen);
774 uint32_t uuidLen = uuid.size();
775
776 std::vector<uint8_t> out;
777 std::vector<uint8_t> oriKey(key.begin() + remoteLen, key.end() - UUID_WIDTH);
778 out.insert(out.end(), uuid.begin(), uuid.end());
779 out.insert(out.end(), oriKey.begin(), oriKey.end());
780 uuidLen = htole32(uuidLen);
781 uint8_t *buf = reinterpret_cast<uint8_t *>(&uuidLen);
782 out.insert(out.end(), buf, buf + sizeof(uuidLen));
783 return out;
784 }
785
SetConfig(const GeneralStore::StoreConfig & storeConfig)786 void KVDBGeneralStore::SetConfig(const GeneralStore::StoreConfig &storeConfig)
787 {
788 enableCloud_ = storeConfig.enableCloud_;
789 }
790
LockCloudDB()791 std::pair<int32_t, uint32_t> KVDBGeneralStore::LockCloudDB()
792 {
793 return { GeneralError::E_NOT_SUPPORT, 0 };
794 }
795
UnLockCloudDB()796 int32_t KVDBGeneralStore::UnLockCloudDB()
797 {
798 return GeneralError::E_NOT_SUPPORT;
799 }
800
SetExecutor(std::shared_ptr<Executor> executor)801 void KVDBGeneralStore::SetExecutor(std::shared_ptr<Executor> executor)
802 {
803 return;
804 }
805 } // namespace OHOS::DistributedKv