1 /*
2  * Copyright (c) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define LOG_TAG "KVDBGeneralStore"
16 #include "kvdb_general_store.h"
17 
18 #include <endian.h>
19 #include "bootstrap.h"
20 #include "checker/checker_manager.h"
21 #include "cloud/cloud_sync_finished_event.h"
22 #include "cloud/schema_meta.h"
23 #include "crypto_manager.h"
24 #include "device_matrix.h"
25 #include "directory/directory_manager.h"
26 #include "eventcenter/event_center.h"
27 #include "kvdb_query.h"
28 #include "log_print.h"
29 #include "metadata/meta_data_manager.h"
30 #include "metadata/secret_key_meta_data.h"
31 #include "metadata/store_meta_data_local.h"
32 #include "query_helper.h"
33 #include "rdb_cloud.h"
34 #include "snapshot/bind_event.h"
35 #include "types.h"
36 #include "user_delegate.h"
37 #include "utils/anonymous.h"
38 #include "water_version_manager.h"
39 #include "device_manager_adapter.h"
40 #include "utils/anonymous.h"
41 #include "app_id_mapping/app_id_mapping_config_manager.h"
42 
43 namespace OHOS::DistributedKv {
44 using namespace DistributedData;
45 using namespace DistributedDB;
46 using DBField = DistributedDB::Field;
47 using DBTable = DistributedDB::TableSchema;
48 using DBSchema = DistributedDB::DataBaseSchema;
49 using ClearMode = DistributedDB::ClearMode;
50 using DMAdapter = DistributedData::DeviceManagerAdapter;
51 using DBInterceptedData = DistributedDB::InterceptedData;
// Number of trailing bytes in a key that GetNewKey() strips from the original key when building the new one.
constexpr int UUID_WIDTH = 4;
// Translation table from DistributedDB status codes to general-store error codes.
// ConvertStatus() looks statuses up here and reports GenErr::E_ERROR for any status that has no entry.
const std::map<DBStatus, KVDBGeneralStore::GenErr> KVDBGeneralStore::dbStatusMap_ = {
    { DBStatus::OK, GenErr::E_OK },
    { DBStatus::CLOUD_NETWORK_ERROR, GenErr::E_NETWORK_ERROR },
    { DBStatus::CLOUD_LOCK_ERROR, GenErr::E_LOCKED_BY_OTHERS },
    { DBStatus::CLOUD_FULL_RECORDS, GenErr::E_RECODE_LIMIT_EXCEEDED },
    { DBStatus::CLOUD_ASSET_SPACE_INSUFFICIENT, GenErr::E_NO_SPACE_FOR_ASSET },
    { DBStatus::CLOUD_SYNC_TASK_MERGED, GenErr::E_SYNC_TASK_MERGED },
    // Both BUSY and DB_ERROR are reported as a generic database error.
    { DBStatus::BUSY, GenErr::E_DB_ERROR },
    { DBStatus::DB_ERROR, GenErr::E_DB_ERROR },
    { DBStatus::INVALID_ARGS, GenErr::E_INVALID_ARGS },
    { DBStatus::NOT_FOUND, GenErr::E_RECORD_NOT_FOUND },
    { DBStatus::INVALID_VALUE_FIELDS, GenErr::E_INVALID_VALUE_FIELDS },
    { DBStatus::INVALID_FIELD_TYPE, GenErr::E_INVALID_FIELD_TYPE },
    { DBStatus::CONSTRAIN_VIOLATION, GenErr::E_CONSTRAIN_VIOLATION },
    { DBStatus::INVALID_FORMAT, GenErr::E_INVALID_FORMAT },
    { DBStatus::INVALID_QUERY_FORMAT, GenErr::E_INVALID_QUERY_FORMAT },
    { DBStatus::INVALID_QUERY_FIELD, GenErr::E_INVALID_QUERY_FIELD },
    { DBStatus::NOT_SUPPORT, GenErr::E_NOT_SUPPORT },
    { DBStatus::TIME_OUT, GenErr::E_TIME_OUT },
    { DBStatus::OVER_MAX_LIMITS, GenErr::E_OVER_MAX_LIMITS },
    // Both security-related failures are reported as a security level error.
    { DBStatus::EKEYREVOKED_ERROR, GenErr::E_SECURITY_LEVEL_ERROR },
    { DBStatus::SECURITY_OPTION_CHECK_ERROR, GenErr::E_SECURITY_LEVEL_ERROR },
};
76 
// Seconds a forced Close() waits to acquire the store lock before giving up with E_BUSY.
constexpr uint32_t LOCK_TIMEOUT = 3600; // second
GetDBSchema(const Database & database)78 static DBSchema GetDBSchema(const Database &database)
79 {
80     DBSchema schema;
81     schema.tables.resize(database.tables.size());
82     for (size_t i = 0; i < database.tables.size(); i++) {
83         const Table &table = database.tables[i];
84         DBTable &dbTable = schema.tables[i];
85         dbTable.name = table.name;
86         dbTable.sharedTableName = table.sharedTableName;
87         for (auto &field : table.fields) {
88             DBField dbField;
89             dbField.colName = field.colName;
90             dbField.type = field.type;
91             dbField.primary = field.primary;
92             dbField.nullable = field.nullable;
93             dbTable.fields.push_back(std::move(dbField));
94         }
95     }
96     return schema;
97 }
GetDBPassword(const StoreMetaData & data)98 KVDBGeneralStore::DBPassword KVDBGeneralStore::GetDBPassword(const StoreMetaData &data)
99 {
100     DBPassword dbPassword;
101     if (!data.isEncrypt) {
102         return dbPassword;
103     }
104 
105     SecretKeyMetaData secretKey;
106     secretKey.storeType = data.storeType;
107     auto storeKey = data.GetSecretKey();
108     MetaDataManager::GetInstance().LoadMeta(storeKey, secretKey, true);
109     std::vector<uint8_t> password;
110     CryptoManager::GetInstance().Decrypt(secretKey.sKey, password);
111     dbPassword.SetValue(password.data(), password.size());
112     password.assign(password.size(), 0);
113     return dbPassword;
114 }
115 
GetDBSecurity(int32_t secLevel)116 KVDBGeneralStore::DBSecurity KVDBGeneralStore::GetDBSecurity(int32_t secLevel)
117 {
118     if (secLevel < SecurityLevel::NO_LABEL || secLevel > SecurityLevel::S4) {
119         return { DistributedDB::NOT_SET, DistributedDB::ECE };
120     }
121     if (secLevel == SecurityLevel::S3) {
122         return { DistributedDB::S3, DistributedDB::SECE };
123     }
124     if (secLevel == SecurityLevel::S4) {
125         return { DistributedDB::S4, DistributedDB::ECE };
126     }
127     return { secLevel, DistributedDB::ECE };
128 }
129 
GetDBOption(const StoreMetaData & data,const DBPassword & password)130 KVDBGeneralStore::DBOption KVDBGeneralStore::GetDBOption(const StoreMetaData &data, const DBPassword &password)
131 {
132     DBOption dbOption;
133     dbOption.createIfNecessary = false;
134     dbOption.isMemoryDb = false;
135     dbOption.isEncryptedDb = data.isEncrypt;
136     dbOption.isNeedCompressOnSync = data.isNeedCompress;
137     if (data.isEncrypt) {
138         dbOption.cipher = DistributedDB::CipherType::AES_256_GCM;
139         dbOption.passwd = password;
140     }
141     StoreMetaDataLocal local;
142     MetaDataManager::GetInstance().LoadMeta(data.GetKeyLocal(), local, true);
143     if (local.isPublic || data.storeType == KvStoreType::DEVICE_COLLABORATION) {
144         dbOption.conflictResolvePolicy = DistributedDB::DEVICE_COLLABORATION;
145     } else if (data.storeType == KvStoreType::SINGLE_VERSION) {
146         dbOption.conflictResolvePolicy = DistributedDB::LAST_WIN;
147     }
148     if (data.appId == Bootstrap::GetInstance().GetProcessLabel()) {
149         dbOption.compressionRate = META_COMPRESS_RATE;
150         dbOption.conflictResolvePolicy = DistributedDB::LAST_WIN;
151     } else {
152         dbOption.syncDualTupleMode = true; // tuple of (appid+storeid)
153     }
154     dbOption.schema = data.schema;
155     dbOption.createDirByStoreIdOnly = true;
156     dbOption.secOption = GetDBSecurity(data.securityLevel);
157     return dbOption;
158 }
159 
KVDBGeneralStore(const StoreMetaData & meta)160 KVDBGeneralStore::KVDBGeneralStore(const StoreMetaData &meta)
161     : manager_(meta.appId, meta.appId == Bootstrap::GetInstance().GetProcessLabel() ? defaultAccountId : meta.user,
162           meta.instanceId)
163 {
164     observer_.storeId_ = meta.storeId;
165     StoreMetaDataLocal local;
166     MetaDataManager::GetInstance().LoadMeta(meta.GetKeyLocal(), local, true);
167     isPublic_ = local.isPublic;
168     DBStatus status = DBStatus::NOT_FOUND;
169     manager_.SetKvStoreConfig({ meta.dataDir });
170     std::unique_lock<decltype(rwMutex_)> lock(rwMutex_);
171     manager_.GetKvStore(
172         meta.storeId, GetDBOption(meta, GetDBPassword(meta)), [&status, this](auto dbStatus, auto *tmpStore) {
173             status = dbStatus;
174             delegate_ = tmpStore;
175         });
176     if (delegate_ == nullptr || status != DBStatus::OK) {
177         ZLOGE("GetKvStore failed. delegate is null?[%{public}d], status = %{public}d", delegate_ == nullptr, status);
178         manager_.CloseKvStore(delegate_);
179         return;
180     }
181     SetDBPushDataInterceptor(meta.storeType);
182     SetDBReceiveDataInterceptor(meta.storeType);
183     delegate_->RegisterObserver({}, DistributedDB::OBSERVER_CHANGES_FOREIGN, &observer_);
184     delegate_->RegisterObserver({}, DistributedDB::OBSERVER_CHANGES_CLOUD, &observer_);
185     if (DeviceMatrix::GetInstance().IsDynamic(meta) || DeviceMatrix::GetInstance().IsStatics(meta)) {
186         delegate_->SetRemotePushFinishedNotify([meta](const DistributedDB::RemotePushNotifyInfo &info) {
187             DeviceMatrix::GetInstance().OnExchanged(info.deviceId, meta, DeviceMatrix::ChangeType::CHANGE_REMOTE);
188         });
189     }
190     if (meta.isAutoSync) {
191         bool param = true;
192         auto data = static_cast<DistributedDB::PragmaData>(&param);
193         delegate_->Pragma(DistributedDB::SET_SYNC_RETRY, data);
194     }
195     storeInfo_.tokenId = meta.tokenId;
196     storeInfo_.bundleName = meta.bundleName;
197     storeInfo_.storeName = meta.storeId;
198     storeInfo_.instanceId = meta.instanceId;
199     storeInfo_.user = std::stoi(meta.user);
200     enableCloud_ = meta.enableCloud;
201 }
202 
~KVDBGeneralStore()203 KVDBGeneralStore::~KVDBGeneralStore()
204 {
205     {
206         std::unique_lock<decltype(rwMutex_)> lock(rwMutex_);
207         if (delegate_ != nullptr) {
208             delegate_->UnRegisterObserver(&observer_);
209         }
210         manager_.CloseKvStore(delegate_);
211         delegate_ = nullptr;
212     }
213     for (auto &bindInfo_ : bindInfos_) {
214         if (bindInfo_.db_ != nullptr) {
215             bindInfo_.db_->Close();
216         }
217     }
218     bindInfos_.clear();
219     dbClouds_.clear();
220 }
221 
// Snapshot binding is not implemented for this store; always returns E_NOT_SUPPORT.
int32_t KVDBGeneralStore::BindSnapshots(std::shared_ptr<std::map<std::string, std::shared_ptr<Snapshot>>> bindAssets)
{
    return GenErr::E_NOT_SUPPORT;
}
226 
Bind(Database & database,const std::map<uint32_t,BindInfo> & bindInfos,const CloudConfig & config)227 int32_t KVDBGeneralStore::Bind(Database &database, const std::map<uint32_t, BindInfo> &bindInfos,
228     const CloudConfig &config)
229 {
230     if (bindInfos.empty()) {
231         ZLOGW("No cloudDB!");
232         return GeneralError::E_OK;
233     }
234 
235     std::map<std::string, DataBaseSchema> schemas;
236     auto dbSchema = GetDBSchema(database);
237     for (auto &[userId, bindInfo] : bindInfos) {
238         if (bindInfo.db_ == nullptr) {
239             return GeneralError::E_INVALID_ARGS;
240         }
241 
242         if (isBound_.exchange(true)) {
243             return GeneralError::E_OK;
244         }
245 
246         dbClouds_.insert({ std::to_string(userId), std::make_shared<DistributedRdb::RdbCloud>(bindInfo.db_, nullptr) });
247         bindInfos_.insert(std::move(bindInfo));
248         schemas.insert({ std::to_string(userId), dbSchema });
249     }
250     DistributedDB::CloudSyncConfig dbConfig;
251     dbConfig.maxUploadCount = config.maxNumber;
252     dbConfig.maxUploadSize = config.maxSize;
253     dbConfig.maxRetryConflictTimes = config.maxRetryConflictTimes;
254     std::unique_lock<decltype(rwMutex_)> lock(rwMutex_);
255     if (delegate_ == nullptr) {
256         return GeneralError::E_ALREADY_CLOSED;
257     }
258     delegate_->SetCloudDB(dbClouds_);
259     delegate_->SetCloudDbSchema(std::move(schemas));
260     delegate_->SetCloudSyncConfig(dbConfig);
261     return GeneralError::E_OK;
262 }
263 
// Reports the current value of the bound flag that Bind() sets.
bool KVDBGeneralStore::IsBound()
{
    return isBound_;
}
268 
Close(bool isForce)269 int32_t KVDBGeneralStore::Close(bool isForce)
270 {
271     std::unique_lock<decltype(rwMutex_)> lock(rwMutex_, std::chrono::seconds(isForce ? LOCK_TIMEOUT : 0));
272     if (!lock) {
273         return GeneralError::E_BUSY;
274     }
275 
276     if (delegate_ == nullptr) {
277         return GeneralError::E_OK;
278     }
279     if (!isForce && delegate_->GetTaskCount() > 0) {
280         return GeneralError::E_BUSY;
281     }
282     if (delegate_ != nullptr) {
283         delegate_->UnRegisterObserver(&observer_);
284     }
285     auto status = manager_.CloseKvStore(delegate_);
286     if (status != DBStatus::OK) {
287         return status;
288     }
289     delegate_ = nullptr;
290     return GeneralError::E_OK;
291 }
292 
// SQL execution is not implemented for this store; always returns E_NOT_SUPPORT.
int32_t KVDBGeneralStore::Execute(const std::string &table, const std::string &sql)
{
    return GeneralError::E_NOT_SUPPORT;
}
297 
// Row insertion is not implemented for this store; always returns E_NOT_SUPPORT.
int32_t KVDBGeneralStore::Insert(const std::string &table, VBuckets &&values)
{
    return GeneralError::E_NOT_SUPPORT;
}
302 
// Row update is not implemented for this store; always returns E_NOT_SUPPORT.
int32_t KVDBGeneralStore::Update(const std::string &table, const std::string &setSql, Values &&values,
    const std::string &whereSql, Values &&conditions)
{
    return GeneralError::E_NOT_SUPPORT;
}
308 
// Row deletion is not implemented for this store; always returns E_NOT_SUPPORT.
int32_t KVDBGeneralStore::Delete(const std::string &table, const std::string &sql, Values &&args)
{
    return GeneralError::E_NOT_SUPPORT;
}
313 
// Row replacement is not implemented for this store; always returns E_NOT_SUPPORT.
int32_t KVDBGeneralStore::Replace(const std::string &table, VBucket &&value)
{
    return GeneralError::E_NOT_SUPPORT;
}
318 
// SQL queries are not implemented for this store; always returns { E_NOT_SUPPORT, nullptr }.
std::pair<int32_t, std::shared_ptr<Cursor>> KVDBGeneralStore::Query(
    __attribute__((unused)) const std::string &table, const std::string &sql, Values &&args)
{
    return { GeneralError::E_NOT_SUPPORT, nullptr };
}
324 
// GenQuery-based queries are not implemented for this store; always returns { E_NOT_SUPPORT, nullptr }.
std::pair<int32_t, std::shared_ptr<Cursor>> KVDBGeneralStore::Query(const std::string &table, GenQuery &query)
{
    return { GeneralError::E_NOT_SUPPORT, nullptr };
}
329 
// Merging migrated data is not implemented for this store; always returns E_NOT_SUPPORT.
int32_t KVDBGeneralStore::MergeMigratedData(const std::string &tableName, VBuckets &&values)
{
    return GeneralError::E_NOT_SUPPORT;
}
334 
// Tracker data cleanup is not implemented for this store; always returns E_NOT_SUPPORT.
int32_t KVDBGeneralStore::CleanTrackerData(const std::string &tableName, int64_t cursor)
{
    return GeneralError::E_NOT_SUPPORT;
}
339 
GetDBSyncCompleteCB(DetailAsync async)340 KVDBGeneralStore::DBSyncCallback KVDBGeneralStore::GetDBSyncCompleteCB(DetailAsync async)
341 {
342     if (!async) {
343         return [](auto &) {};
344     }
345     return [async = std::move(async)](const std::map<std::string, DBStatus> &status) {
346         GenDetails details;
347         for (auto &[key, dbStatus] : status) {
348             auto &value = details[key];
349             value.progress = FINISHED;
350             value.code = GeneralError::E_OK;
351             if (dbStatus != DBStatus::OK) {
352                 value.code = dbStatus;
353             }
354         }
355         async(details);
356     };
357 }
358 
CloudSync(const Devices & devices,DistributedDB::SyncMode cloudSyncMode,DetailAsync async,int64_t wait,const std::string & prepareTraceId)359 DBStatus KVDBGeneralStore::CloudSync(const Devices &devices, DistributedDB::SyncMode cloudSyncMode, DetailAsync async,
360     int64_t wait, const std::string &prepareTraceId)
361 {
362     DistributedDB::CloudSyncOption syncOption;
363     syncOption.devices = devices;
364     syncOption.mode = cloudSyncMode;
365     syncOption.waitTime = wait;
366     syncOption.prepareTraceId = prepareTraceId;
367     syncOption.lockAction = DistributedDB::LockAction::NONE;
368     if (storeInfo_.user == 0) {
369         std::vector<int32_t> users;
370         AccountDelegate::GetInstance()->QueryUsers(users);
371         syncOption.users.push_back(std::to_string(users[0]));
372     } else {
373         syncOption.users.push_back(std::to_string(storeInfo_.user));
374     }
375     return delegate_->Sync(syncOption, GetDBProcessCB(async));
376 }
377 
Sync(const Devices & devices,GenQuery & query,DetailAsync async,const SyncParam & syncParam)378 std::pair<int32_t, int32_t> KVDBGeneralStore::Sync(const Devices &devices, GenQuery &query, DetailAsync async,
379     const SyncParam &syncParam)
380 {
381     auto syncMode = GeneralStore::GetSyncMode(syncParam.mode);
382     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
383     if (delegate_ == nullptr) {
384         ZLOGE("store already closed! devices count:%{public}zu, the 1st:%{public}s, mode:%{public}d", devices.size(),
385             devices.empty() ? "null" : Anonymous::Change(*devices.begin()).c_str(), syncParam.mode);
386         return { GeneralError::E_ALREADY_CLOSED, DBStatus::OK };
387     }
388     DBStatus dbStatus;
389     auto dbMode = DistributedDB::SyncMode(syncMode);
390     if (syncMode > NEARBY_END && syncMode < CLOUD_END) {
391         if (!enableCloud_) {
392             return { GeneralError::E_NOT_SUPPORT, DBStatus::OK };
393         }
394         dbStatus = CloudSync(devices, dbMode, async, syncParam.wait, syncParam.prepareTraceId);
395     } else {
396         if (devices.empty()) {
397             ZLOGE("Devices is empty! mode:%{public}d", syncParam.mode);
398             return { GeneralError::E_INVALID_ARGS, DBStatus::OK };
399         }
400         KVDBQuery *kvQuery = nullptr;
401         auto ret = query.QueryInterface(kvQuery);
402         DistributedDB::Query dbQuery;
403         if (ret == GeneralError::E_OK && kvQuery != nullptr && kvQuery->IsValidQuery()) {
404             dbQuery = kvQuery->GetDBQuery();
405         } else {
406             return { GeneralError::E_INVALID_ARGS, DBStatus::OK };
407         }
408         if (syncMode == NEARBY_SUBSCRIBE_REMOTE) {
409             dbStatus = delegate_->SubscribeRemoteQuery(devices, GetDBSyncCompleteCB(std::move(async)), dbQuery, false);
410         } else if (syncMode == NEARBY_UNSUBSCRIBE_REMOTE) {
411             dbStatus =
412                 delegate_->UnSubscribeRemoteQuery(devices, GetDBSyncCompleteCB(std::move(async)), dbQuery, false);
413         } else if (syncMode < NEARBY_END) {
414             if (kvQuery->IsEmpty()) {
415                 dbStatus = delegate_->Sync(devices, dbMode, GetDBSyncCompleteCB(std::move(async)), false);
416             } else {
417                 dbStatus = delegate_->Sync(devices, dbMode, GetDBSyncCompleteCB(std::move(async)), dbQuery, false);
418             }
419         } else {
420             ZLOGE("Err sync mode! sync mode:%{public}d", syncMode);
421             dbStatus = DistributedDB::INVALID_ARGS;
422         }
423     }
424     return { ConvertStatus(dbStatus), dbStatus };
425 }
426 
SetEqualIdentifier(const std::string & appId,const std::string & storeId,std::string account)427 void KVDBGeneralStore::SetEqualIdentifier(const std::string &appId, const std::string &storeId, std::string account)
428 {
429     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
430     if (delegate_ == nullptr) {
431         ZLOGE("store already closed! appId:%{public}s storeId:%{public}s", appId.c_str(),
432             Anonymous::Change(storeId).c_str());
433         return;
434     }
435     std::vector<std::string> sameAccountDevs {};
436     std::vector<std::string> defaultAccountDevs {};
437     auto uuids = DMAdapter::ToUUID(DMAdapter::GetInstance().GetRemoteDevices());
438     if (uuids.empty()) {
439         ZLOGI("no remote device to sync.appId:%{public}s", appId.c_str());
440         return;
441     }
442     GetIdentifierParams(sameAccountDevs, uuids, IDENTICAL_ACCOUNT);
443     GetIdentifierParams(defaultAccountDevs, uuids, NO_ACCOUNT);
444     if (!sameAccountDevs.empty()) {
445         auto accountId = account.empty() ? AccountDelegate::GetInstance()->GetUnencryptedAccountId() : account;
446         auto convertedIds = AppIdMappingConfigManager::GetInstance().Convert(appId, accountId);
447         auto identifier = KvManager::GetKvStoreIdentifier(convertedIds.second, convertedIds.first, storeId);
448         ZLOGI("same account store:%{public}s, user:%{public}s, device:%{public}.10s, appId:%{public}s",
449             Anonymous::Change(storeId).c_str(), Anonymous::Change(convertedIds.second).c_str(),
450             DistributedData::Serializable::Marshall(sameAccountDevs).c_str(), convertedIds.first.c_str());
451         delegate_->SetEqualIdentifier(identifier, sameAccountDevs);
452     }
453     if (!defaultAccountDevs.empty()) {
454         auto convertedIds = AppIdMappingConfigManager::GetInstance().Convert(appId, defaultAccountId);
455         auto identifier = KvManager::GetKvStoreIdentifier(convertedIds.second, convertedIds.first, storeId);
456         ZLOGI("no account store:%{public}s, device:%{public}.10s, appId:%{public}s",
457             Anonymous::Change(storeId).c_str(),
458             DistributedData::Serializable::Marshall(defaultAccountDevs).c_str(), convertedIds.first.c_str());
459         delegate_->SetEqualIdentifier(identifier, defaultAccountDevs);
460     }
461 }
462 
GetIdentifierParams(std::vector<std::string> & devices,const std::vector<std::string> & uuids,int32_t authType)463 void KVDBGeneralStore::GetIdentifierParams(std::vector<std::string> &devices,
464     const std::vector<std::string> &uuids, int32_t authType)
465 {
466     for (const auto &devId : uuids) {
467         if (DMAdapter::GetInstance().IsOHOSType(devId)) {
468             continue;
469         }
470         if (DMAdapter::GetInstance().GetAuthType(devId) != authType) {
471             continue;
472         }
473         devices.push_back(devId);
474     }
475     ZLOGI("devices size: %{public}zu", devices.size());
476 }
477 
// Pre-sharing is not implemented for this store; always returns { E_NOT_SUPPORT, nullptr }.
std::pair<int32_t, std::shared_ptr<Cursor>> KVDBGeneralStore::PreSharing(GenQuery &query)
{
    return { GeneralError::E_NOT_SUPPORT, nullptr };
}
482 
Clean(const std::vector<std::string> & devices,int32_t mode,const std::string & tableName)483 int32_t KVDBGeneralStore::Clean(const std::vector<std::string> &devices, int32_t mode, const std::string &tableName)
484 {
485     if (mode < 0 || mode > CLEAN_MODE_BUTT) {
486         return GeneralError::E_INVALID_ARGS;
487     }
488     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
489     if (delegate_ == nullptr) {
490         ZLOGE("store already closed! devices count:%{public}zu, the 1st:%{public}s, mode:%{public}d", devices.size(),
491             devices.empty() ? "null" : Anonymous::Change(*devices.begin()).c_str(), mode);
492         return GeneralError::E_ALREADY_CLOSED;
493     }
494     DBStatus status = OK;
495     switch (mode) {
496         case CLOUD_INFO:
497             status = delegate_->RemoveDeviceData(
498                 "", isPublic_ ? static_cast<ClearMode>(CLOUD_DATA) : static_cast<ClearMode>(CLOUD_INFO));
499             break;
500         case CLOUD_DATA:
501             status = delegate_->RemoveDeviceData("", static_cast<ClearMode>(CLOUD_DATA));
502             break;
503         case NEARBY_DATA:
504             if (devices.empty()) {
505                 status = delegate_->RemoveDeviceData();
506                 break;
507             }
508             for (auto device : devices) {
509                 status = delegate_->RemoveDeviceData(device);
510             }
511             break;
512         default:
513             return GeneralError::E_ERROR;
514     }
515     return ConvertStatus(status);
516 }
517 
Watch(int32_t origin,Watcher & watcher)518 int32_t KVDBGeneralStore::Watch(int32_t origin, Watcher &watcher)
519 {
520     if (origin != Watcher::Origin::ORIGIN_ALL || observer_.watcher_ != nullptr) {
521         return GeneralError::E_INVALID_ARGS;
522     }
523 
524     observer_.watcher_ = &watcher;
525     return GeneralError::E_OK;
526 }
527 
Unwatch(int32_t origin,Watcher & watcher)528 int32_t KVDBGeneralStore::Unwatch(int32_t origin, Watcher &watcher)
529 {
530     if (origin != Watcher::Origin::ORIGIN_ALL || observer_.watcher_ != &watcher) {
531         return GeneralError::E_INVALID_ARGS;
532     }
533 
534     observer_.watcher_ = nullptr;
535     return GeneralError::E_OK;
536 }
537 
Release()538 int32_t KVDBGeneralStore::Release()
539 {
540     auto ref = 1;
541     {
542         std::lock_guard<decltype(mutex_)> lock(mutex_);
543         if (ref_ == 0) {
544             return 0;
545         }
546         ref = --ref_;
547     }
548     ZLOGD("ref:%{public}d", ref);
549     if (ref == 0) {
550         delete this;
551     }
552     return ref;
553 }
554 
AddRef()555 int32_t KVDBGeneralStore::AddRef()
556 {
557     std::lock_guard<decltype(mutex_)> lock(mutex_);
558     if (ref_ == 0) {
559         return 0;
560     }
561     return ++ref_;
562 }
563 
// No-op for this store: the arguments are ignored and E_OK is always returned.
int32_t KVDBGeneralStore::SetDistributedTables(
    const std::vector<std::string> &tables, int32_t type, const std::vector<Reference> &references)
{
    return GeneralError::E_OK;
}
569 
// No-op for this store: the arguments are ignored and E_OK is always returned.
int32_t KVDBGeneralStore::SetTrackerTable(const std::string &tableName, const std::set<std::string> &trackerColNames,
    const std::string &extendColName, bool isForceUpgrade)
{
    return GeneralError::E_OK;
}
575 
ConvertStatus(DistributedDB::DBStatus status)576 KVDBGeneralStore::GenErr KVDBGeneralStore::ConvertStatus(DistributedDB::DBStatus status)
577 {
578     auto it = dbStatusMap_.find(status);
579     if (it != dbStatusMap_.end()) {
580         return it->second;
581     }
582     ZLOGI("status:0x%{public}x", status);
583     return GenErr::E_ERROR;
584 }
585 
IsValid()586 bool KVDBGeneralStore::IsValid()
587 {
588     std::unique_lock<decltype(rwMutex_)> lock(rwMutex_);
589     return delegate_ != nullptr;
590 }
591 
// No-op for this store: the callback is not stored and E_OK is always returned.
int32_t KVDBGeneralStore::RegisterDetailProgressObserver(GeneralStore::DetailAsync async)
{
    return GenErr::E_OK;
}
596 
// No-op for this store: always returns E_OK.
int32_t KVDBGeneralStore::UnregisterDetailProgressObserver()
{
    return GenErr::E_OK;
}
601 
GetWaterVersion(const std::string & deviceId)602 std::vector<std::string> KVDBGeneralStore::GetWaterVersion(const std::string &deviceId)
603 {
604     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
605     if (delegate_ == nullptr) {
606         ZLOGE("store already closed! deviceId:%{public}s", Anonymous::Change(deviceId).c_str());
607         return {};
608     }
609     auto [status, versions] = delegate_->GetCloudVersion(deviceId);
610     if (status != DBStatus::OK || versions.empty()) {
611         return {};
612     }
613     std::vector<std::string> res;
614     for (auto &[_, version] : versions) {
615         res.push_back(std::move(version));
616     }
617     return res;
618 }
619 
OnChange(DBOrigin origin,const std::string & originalId,DBChangeData && data)620 void KVDBGeneralStore::ObserverProxy::OnChange(DBOrigin origin, const std::string &originalId, DBChangeData &&data)
621 {
622     if (!HasWatcher()) {
623         return;
624     }
625     GenOrigin genOrigin;
626     genOrigin.origin = (origin == DBOrigin::ORIGIN_CLOUD) ? GenOrigin::ORIGIN_CLOUD : GenOrigin::ORIGIN_NEARBY;
627     genOrigin.id.push_back(originalId);
628     genOrigin.store = storeId_;
629     Watcher::ChangeInfo changeInfo;
630     for (uint32_t i = 0; i < DistributedDB::OP_BUTT; ++i) {
631         auto &info = changeInfo[storeId_][i];
632         for (auto &priData : data.primaryData[i]) {
633             Watcher::PRIValue value;
634             Convert(std::move(*(priData.begin())), value);
635             info.push_back(std::move(value));
636         }
637     }
638     watcher_->OnChange(genOrigin, {}, std::move(changeInfo));
639 }
640 
OnChange(const DistributedDB::KvStoreChangedData & data)641 void KVDBGeneralStore::ObserverProxy::OnChange(const DistributedDB::KvStoreChangedData &data)
642 {
643     if (!HasWatcher()) {
644         return;
645     }
646     const auto &inserts = data.GetEntriesInserted();
647     const auto &deletes = data.GetEntriesDeleted();
648     const auto &updates = data.GetEntriesUpdated();
649     Watcher::ChangeData changeData;
650     ConvertChangeData(inserts, changeData[storeId_][DistributedDB::OP_INSERT]);
651     ConvertChangeData(deletes, changeData[storeId_][DistributedDB::OP_DELETE]);
652     ConvertChangeData(updates, changeData[storeId_][DistributedDB::OP_UPDATE]);
653     GenOrigin genOrigin;
654     genOrigin.origin = GenOrigin::ORIGIN_NEARBY;
655     genOrigin.store = storeId_;
656 
657     watcher_->OnChange(genOrigin, {}, std::move(changeData));
658 }
659 
ConvertChangeData(const std::list<DBEntry> & entries,std::vector<Values> & values)660 void KVDBGeneralStore::ObserverProxy::ConvertChangeData(const std::list<DBEntry> &entries, std::vector<Values> &values)
661 {
662     for (auto &entry : entries) {
663         auto value = std::vector<Value>{ entry.key, entry.value };
664         values.push_back(value);
665     }
666 }
667 
GetDBProcessCB(DetailAsync async)668 KVDBGeneralStore::DBProcessCB KVDBGeneralStore::GetDBProcessCB(DetailAsync async)
669 {
670     return [async](const std::map<std::string, SyncProcess> &processes) {
671         if (!async) {
672             return;
673         }
674         DistributedData::GenDetails details;
675         for (auto &[id, process] : processes) {
676             auto &detail = details[id];
677             detail.progress = process.process;
678             detail.code = ConvertStatus(process.errCode);
679             detail.dbCode = DB_ERR_OFFSET + process.errCode;
680             for (auto [key, value] : process.tableProcess) {
681                 auto &table = detail.details[key];
682                 table.upload.total = value.upLoadInfo.total;
683                 table.upload.success = value.upLoadInfo.successCount;
684                 table.upload.failed = value.upLoadInfo.failCount;
685                 table.upload.untreated = table.upload.total - table.upload.success - table.upload.failed;
686                 table.download.total = value.downLoadInfo.total;
687                 table.download.success = value.downLoadInfo.successCount;
688                 table.download.failed = value.downLoadInfo.failCount;
689                 table.download.untreated = table.download.total - table.download.success - table.download.failed;
690                 detail.changeCount = (process.process == FINISHED)
691                                         ? value.downLoadInfo.insertCount + value.downLoadInfo.updateCount +
692                                               value.downLoadInfo.deleteCount
693                                         : 0;
694             }
695         }
696         if (async) {
697             async(details);
698         }
699     };
700 }
701 
SetDBPushDataInterceptor(int32_t storeType)702 void KVDBGeneralStore::SetDBPushDataInterceptor(int32_t storeType)
703 {
704     delegate_->SetPushDataInterceptor(
705         [this, storeType](DBInterceptedData &data, const std::string &sourceID, const std::string &targetID) {
706             int errCode = DBStatus::OK;
707             if (storeType != KvStoreType::DEVICE_COLLABORATION || DMAdapter::GetInstance().IsOHOSType(targetID)) {
708                 return errCode;
709             }
710             if (targetID.empty()) {
711                 ZLOGE("targetID empty");
712                 return static_cast<int>(DBStatus::DB_ERROR);
713             }
714             auto entries = data.GetEntries();
715             for (size_t i = 0; i < entries.size(); i++) {
716                 if (entries[i].key.empty()) {
717                     continue;
718                 }
719                 auto oriKey = entries[i].key;
720                 auto newKey = GetNewKey(oriKey, sourceID);
721                 errCode = data.ModifyKey(i, newKey);
722                 if (errCode != DBStatus::OK) {
723                     ZLOGE("ModifyKey err: %{public}d", errCode);
724                     break;
725                 }
726             }
727             return errCode;
728         }
729     );
730 }
731 
SetDBReceiveDataInterceptor(int32_t storeType)732 void KVDBGeneralStore::SetDBReceiveDataInterceptor(int32_t storeType)
733 {
734     delegate_->SetReceiveDataInterceptor(
735         [this, storeType](DBInterceptedData &data, const std::string &sourceID, const std::string &targetID) {
736             int errCode = DBStatus::OK;
737             if (storeType != KvStoreType::DEVICE_COLLABORATION || DMAdapter::GetInstance().IsOHOSType(sourceID)) {
738                 return errCode;
739             }
740             if (sourceID.empty()) {
741                 ZLOGE("targetID empty");
742                 return static_cast<int>(DBStatus::DB_ERROR);
743             }
744             auto entries = data.GetEntries();
745             for (size_t i = 0; i < entries.size(); i++) {
746                 if (entries[i].key.empty()) {
747                     continue;
748                 }
749 
750                 auto networkId = DMAdapter::GetInstance().ToNetworkID(sourceID);
751                 auto encyptedUuid = DMAdapter::GetInstance().GetEncryptedUuidByNetworkId(networkId);
752                 if (encyptedUuid.empty()) {
753                     ZLOGE("get encyptedUuid failed");
754                     continue;
755                 }
756 
757                 auto oriKey = entries[i].key;
758                 auto newKey = GetNewKey(oriKey, encyptedUuid);
759                 errCode = data.ModifyKey(i, newKey);
760                 if (errCode != DBStatus::OK) {
761                     ZLOGE("ModifyKey err: %{public}d", errCode);
762                     break;
763                 }
764             }
765             return errCode;
766         }
767     );
768 }
769 
GetNewKey(std::vector<uint8_t> & key,const std::string & uuid)770 std::vector<uint8_t> KVDBGeneralStore::GetNewKey(std::vector<uint8_t> &key, const std::string &uuid)
771 {
772     uint32_t remoteLen = *(reinterpret_cast<uint32_t *>(&(*(key.end() - sizeof(uint32_t)))));
773     remoteLen = le32toh(remoteLen);
774     uint32_t uuidLen = uuid.size();
775 
776     std::vector<uint8_t> out;
777     std::vector<uint8_t> oriKey(key.begin() + remoteLen, key.end() - UUID_WIDTH);
778     out.insert(out.end(), uuid.begin(), uuid.end());
779     out.insert(out.end(), oriKey.begin(), oriKey.end());
780     uuidLen = htole32(uuidLen);
781     uint8_t *buf = reinterpret_cast<uint8_t *>(&uuidLen);
782     out.insert(out.end(), buf, buf + sizeof(uuidLen));
783     return out;
784 }
785 
// Applies the store configuration; only the cloud-enable flag is taken over.
void KVDBGeneralStore::SetConfig(const GeneralStore::StoreConfig &storeConfig)
{
    enableCloud_ = storeConfig.enableCloud_;
}
790 
// Cloud database locking is not implemented for this store; always returns { E_NOT_SUPPORT, 0 }.
std::pair<int32_t, uint32_t> KVDBGeneralStore::LockCloudDB()
{
    return { GeneralError::E_NOT_SUPPORT, 0 };
}
795 
// Cloud database unlocking is not implemented for this store; always returns E_NOT_SUPPORT.
int32_t KVDBGeneralStore::UnLockCloudDB()
{
    return GeneralError::E_NOT_SUPPORT;
}
800 
// No-op for this store: the executor is not stored or used.
void KVDBGeneralStore::SetExecutor(std::shared_ptr<Executor> executor)
{
    return;
}
805 } // namespace OHOS::DistributedKv