1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define LOG_TAG "ObjectStoreManager"
16 
17 #include "object_manager.h"
18 
19 #include <regex>
20 
21 #include "accesstoken_kit.h"
22 #include "account/account_delegate.h"
23 #include "block_data.h"
24 #include "bootstrap.h"
25 #include "common/bytes.h"
26 #include "common/string_utils.h"
27 #include "datetime_ex.h"
28 #include "distributed_file_daemon_manager.h"
29 #include "kvstore_utils.h"
30 #include "log_print.h"
31 #include "metadata/meta_data_manager.h"
32 #include "metadata/store_meta_data.h"
33 #include "object_dms_handler.h"
34 #include "object_radar_reporter.h"
35 #include "utils/anonymous.h"
36 
37 namespace OHOS {
38 namespace DistributedObject {
39 using namespace OHOS::DistributedKv;
40 using namespace Security::AccessToken;
41 using StoreMetaData = OHOS::DistributedData::StoreMetaData;
42 using AccountDelegate = DistributedKv::AccountDelegate;
43 using Account = OHOS::DistributedKv::AccountDelegate;
44 using AccessTokenKit = Security::AccessToken::AccessTokenKit;
45 using ValueProxy = OHOS::DistributedData::ValueProxy;
46 using DistributedFileDaemonManager = Storage::DistributedFile::DistributedFileDaemonManager;
47 constexpr const char *SAVE_INFO = "p_###SAVEINFO###";
ObjectStoreManager()48 ObjectStoreManager::ObjectStoreManager()
49 {
50     ZLOGI("ObjectStoreManager construct");
51     RegisterAssetsLister();
52 }
53 
~ObjectStoreManager()54 ObjectStoreManager::~ObjectStoreManager()
55 {
56     ZLOGI("ObjectStoreManager destroy");
57     if (objectAssetsRecvListener_ != nullptr) {
58         auto status = DistributedFileDaemonManager::GetInstance().UnRegisterAssetCallback(objectAssetsRecvListener_);
59         if (status != DistributedDB::DBStatus::OK) {
60             ZLOGE("UnRegister assetsRecvListener err %{public}d", status);
61         }
62     }
63 }
64 
OpenObjectKvStore()65 DistributedDB::KvStoreNbDelegate *ObjectStoreManager::OpenObjectKvStore()
66 {
67     DistributedDB::KvStoreNbDelegate *store = nullptr;
68     DistributedDB::KvStoreNbDelegate::Option option;
69     option.createDirByStoreIdOnly = true;
70     option.syncDualTupleMode = true;
71     option.secOption = { DistributedDB::S1, DistributedDB::ECE };
72     if (objectDataListener_ == nullptr) {
73         objectDataListener_ = new ObjectDataListener();
74     }
75     ZLOGD("start GetKvStore");
76     kvStoreDelegateManager_->GetKvStore(ObjectCommon::OBJECTSTORE_DB_STOREID, option,
77         [&store, this](DistributedDB::DBStatus dbStatus, DistributedDB::KvStoreNbDelegate *kvStoreNbDelegate) {
78             if (dbStatus != DistributedDB::DBStatus::OK) {
79                 ZLOGE("GetKvStore fail %{public}d", dbStatus);
80                 return;
81             }
82             ZLOGI("GetKvStore successsfully");
83             store = kvStoreNbDelegate;
84             std::vector<uint8_t> tmpKey;
85             DistributedDB::DBStatus status = store->RegisterObserver(tmpKey,
86                 DistributedDB::ObserverMode::OBSERVER_CHANGES_FOREIGN,
87                 objectDataListener_);
88             if (status != DistributedDB::DBStatus::OK) {
89                 ZLOGE("RegisterObserver err %{public}d", status);
90             }
91         });
92     return store;
93 }
94 
RegisterAssetsLister()95 bool ObjectStoreManager::RegisterAssetsLister()
96 {
97     if (objectAssetsSendListener_ == nullptr) {
98         objectAssetsSendListener_ = new ObjectAssetsSendListener();
99     }
100     if (objectAssetsRecvListener_ == nullptr) {
101         objectAssetsRecvListener_ = new ObjectAssetsRecvListener();
102     }
103     auto status = DistributedFileDaemonManager::GetInstance().RegisterAssetCallback(objectAssetsRecvListener_);
104     if (status != DistributedDB::DBStatus::OK) {
105         ZLOGE("Register assetsRecvListener err %{public}d", status);
106         return false;
107     }
108     return true;
109 }
110 
ProcessSyncCallback(const std::map<std::string,int32_t> & results,const std::string & appId,const std::string & sessionId,const std::string & deviceId)111 void ObjectStoreManager::ProcessSyncCallback(const std::map<std::string, int32_t> &results, const std::string &appId,
112     const std::string &sessionId, const std::string &deviceId)
113 {
114     if (results.empty() || results.find(LOCAL_DEVICE) != results.end()) {
115         return;
116     }
117     int32_t result = Open();
118     if (result != OBJECT_SUCCESS) {
119         ZLOGE("Open failed, errCode = %{public}d", result);
120         return;
121     }
122     // delete local data
123     result = RevokeSaveToStore(GetPropertyPrefix(appId, sessionId, deviceId));
124     if (result != OBJECT_SUCCESS) {
125         ZLOGE("Save failed, status = %{public}d", result);
126     }
127     Close();
128     return;
129 }
130 
Save(const std::string & appId,const std::string & sessionId,const ObjectRecord & data,const std::string & deviceId,sptr<IRemoteObject> callback)131 int32_t ObjectStoreManager::Save(const std::string &appId, const std::string &sessionId,
132     const ObjectRecord &data, const std::string &deviceId, sptr<IRemoteObject> callback)
133 {
134     auto proxy = iface_cast<ObjectSaveCallbackProxy>(callback);
135     if (deviceId.size() == 0) {
136         ZLOGE("DeviceId empty, appId: %{public}s, sessionId: %{public}s", appId.c_str(), sessionId.c_str());
137         proxy->Completed(std::map<std::string, int32_t>());
138         return INVALID_ARGUMENT;
139     }
140     int32_t result = Open();
141     if (result != OBJECT_SUCCESS) {
142         ZLOGE("Open object kvstore failed, result: %{public}d", result);
143         ObjectStore::RadarReporter::ReportStateError(std::string(__FUNCTION__), ObjectStore::SAVE,
144             ObjectStore::SAVE_TO_STORE, ObjectStore::RADAR_FAILED, ObjectStore::GETKV_FAILED, ObjectStore::FINISHED);
145         proxy->Completed(std::map<std::string, int32_t>());
146         return STORE_NOT_OPEN;
147     }
148     SaveUserToMeta();
149     std::string dstBundleName = ObjectDmsHandler::GetInstance().GetDstBundleName(appId, deviceId);
150     result = SaveToStore(dstBundleName, sessionId, deviceId, data);
151     if (result != OBJECT_SUCCESS) {
152         ZLOGE("Save to store failed, result: %{public}d", result);
153         ObjectStore::RadarReporter::ReportStateError(std::string(__FUNCTION__), ObjectStore::SAVE,
154             ObjectStore::SAVE_TO_STORE, ObjectStore::RADAR_FAILED, result, ObjectStore::FINISHED);
155         Close();
156         proxy->Completed(std::map<std::string, int32_t>());
157         return result;
158     }
159     ZLOGI("Sync data, bundleName: %{public}s, sessionId: %{public}s, deviceId: %{public}s", dstBundleName.c_str(),
160         sessionId.c_str(), Anonymous::Change(deviceId).c_str());
161     SyncCallBack syncCallback =
162         [proxy, dstBundleName, sessionId, deviceId, this](const std::map<std::string, int32_t> &results) {
163             ProcessSyncCallback(results, dstBundleName, sessionId, deviceId);
164             proxy->Completed(results);
165         };
166     result = SyncOnStore(GetPropertyPrefix(dstBundleName, sessionId, deviceId), {deviceId}, syncCallback);
167     if (result != OBJECT_SUCCESS) {
168         ZLOGE("Sync data failed, result: %{public}d", result);
169         ObjectStore::RadarReporter::ReportStateError(std::string(__FUNCTION__), ObjectStore::SAVE,
170             ObjectStore::SYNC_DATA, ObjectStore::RADAR_FAILED, result, ObjectStore::FINISHED);
171         Close();
172         proxy->Completed(std::map<std::string, int32_t>());
173         return result;
174     }
175     Close();
176     return PushAssets(appId, dstBundleName, sessionId, data, deviceId);
177 }
178 
PushAssets(const std::string & srcBundleName,const std::string & dstBundleName,const std::string & sessionId,const ObjectRecord & data,const std::string & deviceId)179 int32_t ObjectStoreManager::PushAssets(const std::string &srcBundleName, const std::string &dstBundleName,
180     const std::string &sessionId, const ObjectRecord &data, const std::string &deviceId)
181 {
182     Assets assets = GetAssetsFromDBRecords(data);
183     if (assets.empty() || data.find(ObjectStore::FIELDS_PREFIX + ObjectStore::DEVICEID_KEY) == data.end()) {
184         return OBJECT_SUCCESS;
185     }
186     sptr<AssetObj> assetObj = new AssetObj();
187     assetObj->dstBundleName_ = dstBundleName;
188     assetObj->srcBundleName_ = srcBundleName;
189     assetObj->dstNetworkId_ = deviceId;
190     assetObj->sessionId_ = sessionId;
191     for (const auto& asset : assets) {
192         assetObj->uris_.push_back(asset.uri);
193     }
194     if (objectAssetsSendListener_ == nullptr) {
195         objectAssetsSendListener_ = new ObjectAssetsSendListener();
196     }
197     int userId = std::atoi(GetCurrentUser().c_str());
198     auto status =  ObjectAssetLoader::GetInstance()->PushAsset(userId, assetObj, objectAssetsSendListener_);
199     return status;
200 }
201 
RevokeSave(const std::string & appId,const std::string & sessionId,sptr<IRemoteObject> callback)202 int32_t ObjectStoreManager::RevokeSave(
203     const std::string &appId, const std::string &sessionId, sptr<IRemoteObject> callback)
204 {
205     auto proxy = iface_cast<ObjectRevokeSaveCallbackProxy>(callback);
206     int32_t result = Open();
207     if (result != OBJECT_SUCCESS) {
208         ZLOGE("Open failed, errCode = %{public}d", result);
209         proxy->Completed(STORE_NOT_OPEN);
210         return STORE_NOT_OPEN;
211     }
212 
213     result = RevokeSaveToStore(GetPrefixWithoutDeviceId(appId, sessionId));
214     if (result != OBJECT_SUCCESS) {
215         ZLOGE("RevokeSave failed, errCode = %{public}d", result);
216         Close();
217         proxy->Completed(result);
218         return result;
219     }
220     std::vector<std::string> deviceList;
221     auto deviceInfos = DmAdaper::GetInstance().GetRemoteDevices();
222     std::for_each(deviceInfos.begin(), deviceInfos.end(),
223         [&deviceList](AppDistributedKv::DeviceInfo info) { deviceList.emplace_back(info.networkId); });
224     if (!deviceList.empty()) {
225         SyncCallBack tmp = [proxy](const std::map<std::string, int32_t> &results) {
226             ZLOGI("revoke save finished");
227             proxy->Completed(OBJECT_SUCCESS);
228         };
229         result = SyncOnStore(GetPropertyPrefix(appId, sessionId), deviceList, tmp);
230         if (result != OBJECT_SUCCESS) {
231             ZLOGE("sync failed, errCode = %{public}d", result);
232             proxy->Completed(result);
233         }
234     } else {
235         proxy->Completed(OBJECT_SUCCESS);
236     };
237     Close();
238     return result;
239 }
240 
Retrieve(const std::string & bundleName,const std::string & sessionId,sptr<IRemoteObject> callback,uint32_t tokenId)241 int32_t ObjectStoreManager::Retrieve(
242     const std::string &bundleName, const std::string &sessionId, sptr<IRemoteObject> callback, uint32_t tokenId)
243 {
244     auto proxy = iface_cast<ObjectRetrieveCallbackProxy>(callback);
245     int32_t result = Open();
246     if (result != OBJECT_SUCCESS) {
247         ZLOGE("Open object kvstore failed, result: %{public}d", result);
248         proxy->Completed(ObjectRecord(), false);
249         return ObjectStore::GETKV_FAILED;
250     }
251     ObjectRecord results{};
252     int32_t status = RetrieveFromStore(bundleName, sessionId, results);
253     if (status != OBJECT_SUCCESS) {
254         ZLOGI("Retrieve from store failed, status: %{public}d", status);
255         Close();
256         proxy->Completed(ObjectRecord(), false);
257         return status;
258     }
259     bool allReady = false;
260     Assets assets = GetAssetsFromDBRecords(results);
261     if (assets.empty() || results.find(ObjectStore::FIELDS_PREFIX + ObjectStore::DEVICEID_KEY) == results.end()) {
262         allReady = true;
263     } else {
264         auto objectKey = bundleName + sessionId;
265         restoreStatus_.ComputeIfPresent(objectKey, [&allReady](const auto &key, auto &value) {
266             if (value == RestoreStatus::ALL_READY) {
267                 allReady = true;
268                 return false;
269             }
270             if (value == RestoreStatus::DATA_READY) {
271                 value = RestoreStatus::DATA_NOTIFIED;
272             }
273             return true;
274         });
275     }
276     status = RevokeSaveToStore(GetPrefixWithoutDeviceId(bundleName, sessionId));
277     if (status != OBJECT_SUCCESS) {
278         ZLOGE("Revoke save failed, status: %{public}d", status);
279         Close();
280         proxy->Completed(ObjectRecord(), false);
281         return status;
282     }
283     Close();
284     proxy->Completed(results, allReady);
285     if (allReady) {
286         ObjectStore::RadarReporter::ReportStateFinished(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
287             ObjectStore::NOTIFY, ObjectStore::RADAR_SUCCESS, ObjectStore::FINISHED);
288     }
289     return status;
290 }
291 
Clear()292 int32_t ObjectStoreManager::Clear()
293 {
294     ZLOGI("enter");
295     std::string userId = GetCurrentUser();
296     if (userId.empty()) {
297         return OBJECT_INNER_ERROR;
298     }
299     std::vector<StoreMetaData> metaData;
300     std::string appId = DistributedData::Bootstrap::GetInstance().GetProcessLabel();
301     std::string metaKey = GetMetaUserIdKey(userId, appId);
302     if (!DistributedData::MetaDataManager::GetInstance().LoadMeta(metaKey, metaData, true)) {
303         ZLOGE("no store of %{public}s", appId.c_str());
304         return OBJECT_STORE_NOT_FOUND;
305     }
306     for (const auto &storeMeta : metaData) {
307         if (storeMeta.storeType < StoreMetaData::StoreType::STORE_OBJECT_BEGIN
308             || storeMeta.storeType > StoreMetaData::StoreType::STORE_OBJECT_END) {
309             continue;
310         }
311         if (storeMeta.user == userId) {
312             ZLOGI("user is same, not need to change, mate user:%{public}s::user:%{public}s.",
313                 storeMeta.user.c_str(), userId.c_str());
314             return OBJECT_SUCCESS;
315         }
316     }
317     ZLOGD("user is change, need to change");
318     int32_t result = Open();
319     if (result != OBJECT_SUCCESS) {
320         ZLOGE("Open failed, errCode = %{public}d", result);
321         return STORE_NOT_OPEN;
322     }
323     result = RevokeSaveToStore("");
324     Close();
325     return result;
326 }
327 
DeleteByAppId(const std::string & appId,int32_t user)328 int32_t ObjectStoreManager::DeleteByAppId(const std::string &appId, int32_t user)
329 {
330     int32_t result = Open();
331     if (result != OBJECT_SUCCESS) {
332         ZLOGE("Open store failed, result: %{public}d, appId: %{public}s, user: %{public}d", result,
333             appId.c_str(), user);
334         return STORE_NOT_OPEN;
335     }
336     result = RevokeSaveToStore(appId);
337     if (result != OBJECT_SUCCESS) {
338         ZLOGE("Revoke save failed, result: %{public}d, appId: %{public}s, user: %{public}d", result,
339             appId.c_str(), user);
340     }
341     Close();
342     std::string userId = std::to_string(user);
343     std::string metaKey = GetMetaUserIdKey(userId, appId);
344     auto status = DistributedData::MetaDataManager::GetInstance().DelMeta(metaKey, true);
345     if (!status) {
346         ZLOGE("Delete meta failed, userId: %{public}s, appId: %{public}s", userId.c_str(), appId.c_str());
347     }
348     return result;
349 }
350 
RegisterRemoteCallback(const std::string & bundleName,const std::string & sessionId,pid_t pid,uint32_t tokenId,sptr<IRemoteObject> callback)351 void ObjectStoreManager::RegisterRemoteCallback(const std::string &bundleName, const std::string &sessionId,
352                                                 pid_t pid, uint32_t tokenId,
353                                                 sptr<IRemoteObject> callback)
354 {
355     if (bundleName.empty() || sessionId.empty()) {
356         ZLOGD("ObjectStoreManager::RegisterRemoteCallback empty");
357         return;
358     }
359     ZLOGD("ObjectStoreManager::RegisterRemoteCallback start");
360     auto proxy = iface_cast<ObjectChangeCallbackProxy>(callback);
361     std::string prefix = bundleName + sessionId;
362     callbacks_.Compute(tokenId, ([pid, &proxy, &prefix](const uint32_t key, CallbackInfo &value) {
363         if (value.pid != pid) {
364             value = CallbackInfo { pid };
365         }
366         value.observers_.insert_or_assign(prefix, proxy);
367         return !value.observers_.empty();
368     }));
369 }
370 
UnregisterRemoteCallback(const std::string & bundleName,pid_t pid,uint32_t tokenId,const std::string & sessionId)371 void ObjectStoreManager::UnregisterRemoteCallback(const std::string &bundleName, pid_t pid, uint32_t tokenId,
372                                                   const std::string &sessionId)
373 {
374     if (bundleName.empty()) {
375         ZLOGD("bundleName is empty");
376         return;
377     }
378     callbacks_.Compute(tokenId, ([pid, &sessionId, &bundleName](const uint32_t key, CallbackInfo &value) {
379         if (value.pid != pid) {
380             return true;
381         }
382         if (sessionId.empty()) {
383             return false;
384         }
385         std::string prefix = bundleName + sessionId;
386         for (auto it = value.observers_.begin(); it != value.observers_.end();) {
387             if ((*it).first == prefix) {
388                 it = value.observers_.erase(it);
389             } else {
390                 ++it;
391             }
392         }
393         return true;
394     }));
395 }
396 
NotifyChange(ObjectRecord & changedData)397 void ObjectStoreManager::NotifyChange(ObjectRecord &changedData)
398 {
399     ZLOGI("OnChange start, size:%{public}zu", changedData.size());
400     bool hasAsset = false;
401     SaveInfo saveInfo;
402     for (const auto &[key, value] : changedData) {
403         if (key.find(SAVE_INFO) != std::string::npos) {
404             DistributedData::Serializable::Unmarshall(std::string(value.begin(), value.end()), saveInfo);
405             break;
406         }
407     }
408     auto data = GetObjectData(changedData, saveInfo, hasAsset);
409     if (!hasAsset) {
410         ObjectStore::RadarReporter::ReportStateStart(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
411             ObjectStore::DATA_RECV, ObjectStore::RADAR_SUCCESS, ObjectStore::START, saveInfo.bundleName);
412         callbacks_.ForEach([this, &data](uint32_t tokenId, const CallbackInfo& value) {
413             DoNotify(tokenId, value, data, true); // no asset, data ready means all ready
414             return false;
415         });
416         return;
417     }
418     NotifyDataChanged(data, saveInfo);
419     SaveUserToMeta();
420 }
421 
GetObjectData(const ObjectRecord & changedData,SaveInfo & saveInfo,bool & hasAsset)422 std::map<std::string, ObjectRecord> ObjectStoreManager::GetObjectData(const ObjectRecord& changedData,
423     SaveInfo& saveInfo, bool& hasAsset)
424 {
425     std::map<std::string, ObjectRecord> data;
426     std::string keyPrefix = saveInfo.ToPropertyPrefix();
427     if (!keyPrefix.empty()) {
428         std::string observerKey = saveInfo.bundleName + saveInfo.sessionId;
429         for (const auto &[key, value] : changedData) {
430             if (key.size() < keyPrefix.size() || key.find(SAVE_INFO) != std::string::npos) {
431                 continue;
432             }
433             std::string propertyName = key.substr(keyPrefix.size());
434             data[observerKey].insert_or_assign(propertyName, value);
435             if (!hasAsset && IsAssetKey(propertyName)) {
436                 hasAsset = true;
437             }
438         }
439     } else {
440         for (const auto &item : changedData) {
441             std::vector<std::string> splitKeys = SplitEntryKey(item.first);
442             if (splitKeys.size() <= PROPERTY_NAME_INDEX) {
443                 continue;
444             }
445             if (saveInfo.sourceDeviceId.empty() || saveInfo.bundleName.empty()) {
446                 saveInfo.sourceDeviceId = splitKeys[SOURCE_DEVICE_UDID_INDEX];
447                 saveInfo.bundleName = splitKeys[BUNDLE_NAME_INDEX];
448                 saveInfo.sessionId = splitKeys[SESSION_ID_INDEX];
449                 saveInfo.timestamp = splitKeys[TIME_INDEX];
450             }
451             std::string prefix = splitKeys[BUNDLE_NAME_INDEX] + splitKeys[SESSION_ID_INDEX];
452             std::string propertyName = splitKeys[PROPERTY_NAME_INDEX];
453             data[prefix].insert_or_assign(propertyName, item.second);
454             if (IsAssetKey(propertyName)) {
455                 hasAsset = true;
456             }
457         }
458     }
459     return data;
460 }
461 
ComputeStatus(const std::string & objectKey,const SaveInfo & saveInfo,const std::map<std::string,ObjectRecord> & data)462 void ObjectStoreManager::ComputeStatus(const std::string& objectKey, const SaveInfo& saveInfo,
463     const std::map<std::string, ObjectRecord>& data)
464 {
465     restoreStatus_.Compute(objectKey, [this, &data, saveInfo] (const auto &key, auto &value) {
466         if (value == RestoreStatus::ASSETS_READY) {
467             value = RestoreStatus::ALL_READY;
468             ObjectStore::RadarReporter::ReportStage(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
469                 ObjectStore::DATA_RECV, ObjectStore::RADAR_SUCCESS);
470             callbacks_.ForEach([this, &data](uint32_t tokenId, const CallbackInfo& value) {
471                 DoNotify(tokenId, value, data, true);
472                 return false;
473             });
474         } else {
475             value = RestoreStatus::DATA_READY;
476             ObjectStore::RadarReporter::ReportStateStart(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
477                 ObjectStore::DATA_RECV, ObjectStore::RADAR_SUCCESS, ObjectStore::START, saveInfo.bundleName);
478             callbacks_.ForEach([this, &data](uint32_t tokenId, const CallbackInfo& value) {
479                 DoNotify(tokenId, value, data, false);
480                 return false;
481             });
482             WaitAssets(key, saveInfo, data);
483         }
484         return true;
485     });
486 }
487 
NotifyDataChanged(const std::map<std::string,ObjectRecord> & data,const SaveInfo & saveInfo)488 void ObjectStoreManager::NotifyDataChanged(const std::map<std::string, ObjectRecord>& data, const SaveInfo& saveInfo)
489 {
490     for (auto const& [objectKey, results] : data) {
491         restoreStatus_.ComputeIfAbsent(
492             objectKey, [](const std::string& key) -> auto {
493             return RestoreStatus::NONE;
494         });
495         ComputeStatus(objectKey, saveInfo, data);
496     }
497 }
498 
WaitAssets(const std::string & objectKey,const SaveInfo & saveInfo,const std::map<std::string,ObjectRecord> & data)499 int32_t ObjectStoreManager::WaitAssets(const std::string& objectKey, const SaveInfo& saveInfo,
500     const std::map<std::string, ObjectRecord>& data)
501 {
502     auto taskId = executors_->Schedule(std::chrono::seconds(WAIT_TIME), [this, objectKey, data, saveInfo]() {
503         ZLOGE("wait assets finisehd timeout, try pull assets, objectKey:%{public}s", objectKey.c_str());
504         PullAssets(data, saveInfo);
505         DoNotifyWaitAssetTimeout(objectKey);
506     });
507 
508     objectTimer_.ComputeIfAbsent(
509         objectKey, [taskId](const std::string& key) -> auto {
510             return taskId;
511     });
512     return  OBJECT_SUCCESS;
513 }
514 
PullAssets(const std::map<std::string,ObjectRecord> & data,const SaveInfo & saveInfo)515 void ObjectStoreManager::PullAssets(const std::map<std::string, ObjectRecord>& data, const SaveInfo& saveInfo)
516 {
517     std::map<std::string, Assets> changedAssets;
518     for (auto const& [objectId, result] : data) {
519         changedAssets[objectId] = GetAssetsFromDBRecords(result);
520     }
521     for (const auto& [objectId, assets] : changedAssets) {
522         std::string networkId = DmAdaper::GetInstance().ToNetworkID(saveInfo.sourceDeviceId);
523         auto block = std::make_shared<BlockData<std::tuple<bool, bool>>>(WAIT_TIME, std::tuple{ true, true });
524         ObjectAssetLoader::GetInstance()->TransferAssetsAsync(std::stoi(GetCurrentUser()),
525             saveInfo.bundleName, networkId, assets, [this, block](bool success) {
526                 block->SetValue({ false, success });
527         });
528         auto [timeout, success] = block->GetValue();
529         ZLOGI("Pull assets end, timeout: %{public}d, success: %{public}d, size:%{public}zu, deviceId: %{public}s",
530             timeout, success, assets.size(), DistributedData::Anonymous::Change(networkId).c_str());
531     }
532 }
533 
NotifyAssetsReady(const std::string & objectKey,const std::string & bundleName,const std::string & srcNetworkId)534 void ObjectStoreManager::NotifyAssetsReady(const std::string& objectKey, const std::string& bundleName,
535     const std::string& srcNetworkId)
536 {
537     restoreStatus_.ComputeIfAbsent(
538         objectKey, [](const std::string& key) -> auto {
539         return RestoreStatus::NONE;
540     });
541     restoreStatus_.Compute(objectKey, [this, &bundleName] (const auto &key, auto &value) {
542         if (value == RestoreStatus::DATA_NOTIFIED) {
543             value = RestoreStatus::ALL_READY;
544             ObjectStore::RadarReporter::ReportStage(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
545                 ObjectStore::ASSETS_RECV, ObjectStore::RADAR_SUCCESS);
546             callbacks_.ForEach([this, key](uint32_t tokenId, const CallbackInfo& value) {
547                 DoNotifyAssetsReady(tokenId, value,  key, true);
548                 return false;
549             });
550         } else if (value == RestoreStatus::DATA_READY) {
551             value = RestoreStatus::ALL_READY;
552             ObjectStore::RadarReporter::ReportStage(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
553                 ObjectStore::ASSETS_RECV, ObjectStore::RADAR_SUCCESS);
554             auto [has, taskId] = objectTimer_.Find(key);
555             if (has) {
556                 executors_->Remove(taskId);
557                 objectTimer_.Erase(key);
558             }
559         } else {
560             value = RestoreStatus::ASSETS_READY;
561             ObjectStore::RadarReporter::ReportStateStart(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
562                 ObjectStore::ASSETS_RECV, ObjectStore::RADAR_SUCCESS, ObjectStore::START, bundleName);
563         }
564         return true;
565     });
566 }
567 
NotifyAssetsStart(const std::string & objectKey,const std::string & srcNetworkId)568 void ObjectStoreManager::NotifyAssetsStart(const std::string& objectKey, const std::string& srcNetworkId)
569 {
570     restoreStatus_.ComputeIfAbsent(
571         objectKey, [](const std::string& key) -> auto {
572         return RestoreStatus::NONE;
573     });
574 }
575 
IsAssetKey(const std::string & key)576 bool ObjectStoreManager::IsAssetKey(const std::string& key)
577 {
578     return key.find(ObjectStore::ASSET_DOT) != std::string::npos;
579 }
580 
IsAssetComplete(const ObjectRecord & result,const std::string & assetPrefix)581 bool ObjectStoreManager::IsAssetComplete(const ObjectRecord& result, const std::string& assetPrefix)
582 {
583     if (result.find(assetPrefix + ObjectStore::NAME_SUFFIX) == result.end() ||
584         result.find(assetPrefix + ObjectStore::URI_SUFFIX) == result.end() ||
585         result.find(assetPrefix + ObjectStore::PATH_SUFFIX) == result.end() ||
586         result.find(assetPrefix + ObjectStore::CREATE_TIME_SUFFIX) == result.end() ||
587         result.find(assetPrefix + ObjectStore::MODIFY_TIME_SUFFIX) == result.end() ||
588         result.find(assetPrefix + ObjectStore::SIZE_SUFFIX) == result.end()) {
589         return false;
590     }
591     return true;
592 }
593 
GetAssetsFromDBRecords(const ObjectRecord & result)594 Assets ObjectStoreManager::GetAssetsFromDBRecords(const ObjectRecord& result)
595 {
596     Assets assets{};
597     std::set<std::string> assetKey;
598     for (const auto& [key, value] : result) {
599         std::string assetPrefix = key.substr(0, key.find(ObjectStore::ASSET_DOT));
600         if (!IsAssetKey(key) || assetKey.find(assetPrefix) != assetKey.end() ||
601             result.find(assetPrefix + ObjectStore::NAME_SUFFIX) == result.end() ||
602             result.find(assetPrefix + ObjectStore::URI_SUFFIX) == result.end()) {
603             continue;
604         }
605         Asset asset;
606         ObjectStore::StringUtils::BytesToStrWithType(
607             result.find(assetPrefix + ObjectStore::NAME_SUFFIX)->second, asset.name);
608         if (asset.name.find(ObjectStore::STRING_PREFIX) != std::string::npos) {
609             asset.name = asset.name.substr(ObjectStore::STRING_PREFIX_LEN);
610         }
611         ObjectStore::StringUtils::BytesToStrWithType(
612             result.find(assetPrefix + ObjectStore::URI_SUFFIX)->second, asset.uri);
613         if (asset.uri.find(ObjectStore::STRING_PREFIX) != std::string::npos) {
614             asset.uri = asset.uri.substr(ObjectStore::STRING_PREFIX_LEN);
615         }
616         ObjectStore::StringUtils::BytesToStrWithType(
617             result.find(assetPrefix + ObjectStore::MODIFY_TIME_SUFFIX)->second, asset.modifyTime);
618         if (asset.modifyTime.find(ObjectStore::STRING_PREFIX) != std::string::npos) {
619             asset.modifyTime = asset.modifyTime.substr(ObjectStore::STRING_PREFIX_LEN);
620         }
621         ObjectStore::StringUtils::BytesToStrWithType(
622             result.find(assetPrefix + ObjectStore::SIZE_SUFFIX)->second, asset.size);
623         if (asset.size.find(ObjectStore::STRING_PREFIX) != std::string::npos) {
624             asset.size = asset.size.substr(ObjectStore::STRING_PREFIX_LEN);
625         }
626         asset.hash = asset.modifyTime + "_" + asset.size;
627         assets.push_back(asset);
628         assetKey.insert(assetPrefix);
629     }
630     return assets;
631 }
632 
DoNotify(uint32_t tokenId,const CallbackInfo & value,const std::map<std::string,ObjectRecord> & data,bool allReady)633 void ObjectStoreManager::DoNotify(uint32_t tokenId, const CallbackInfo& value,
634     const std::map<std::string, ObjectRecord>& data, bool allReady)
635 {
636     for (const auto& observer : value.observers_) {
637         auto it = data.find(observer.first);
638         if (it == data.end()) {
639             continue;
640         }
641         observer.second->Completed((*it).second, allReady);
642         if (allReady) {
643             restoreStatus_.Erase(observer.first);
644             ObjectStore::RadarReporter::ReportStateFinished(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
645                 ObjectStore::NOTIFY, ObjectStore::RADAR_SUCCESS, ObjectStore::FINISHED);
646         } else {
647             restoreStatus_.ComputeIfPresent(observer.first, [](const auto &key, auto &value) {
648                 value = RestoreStatus::DATA_NOTIFIED;
649                 return true;
650             });
651         }
652     }
653 }
654 
DoNotifyAssetsReady(uint32_t tokenId,const CallbackInfo & value,const std::string & objectKey,bool allReady)655 void ObjectStoreManager::DoNotifyAssetsReady(uint32_t tokenId, const CallbackInfo& value,
656     const std::string& objectKey, bool allReady)
657 {
658     for (const auto& observer : value.observers_) {
659         if (objectKey != observer.first) {
660             continue;
661         }
662         observer.second->Completed(ObjectRecord(), allReady);
663         if (allReady) {
664             restoreStatus_.Erase(objectKey);
665             ObjectStore::RadarReporter::ReportStateFinished(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
666                 ObjectStore::NOTIFY, ObjectStore::RADAR_SUCCESS, ObjectStore::FINISHED);
667         }
668         auto [has, taskId] = objectTimer_.Find(objectKey);
669         if (has) {
670             executors_->Remove(taskId);
671             objectTimer_.Erase(objectKey);
672         }
673     }
674 }
675 
DoNotifyWaitAssetTimeout(const std::string & objectKey)676 void ObjectStoreManager::DoNotifyWaitAssetTimeout(const std::string &objectKey)
677 {
678     ObjectStore::RadarReporter::ReportStageError(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
679         ObjectStore::ASSETS_RECV, ObjectStore::RADAR_FAILED, ObjectStore::TIMEOUT);
680     callbacks_.ForEach([this, &objectKey](uint32_t tokenId, const CallbackInfo &value) {
681         for (const auto& observer : value.observers_) {
682             if (objectKey != observer.first) {
683                 continue;
684             }
685             observer.second->Completed(ObjectRecord(), true);
686             restoreStatus_.Erase(objectKey);
687             auto [has, taskId] = objectTimer_.Find(objectKey);
688             if (has) {
689                 executors_->Remove(taskId);
690                 objectTimer_.Erase(objectKey);
691             }
692             ObjectStore::RadarReporter::ReportStateError(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
693                 ObjectStore::NOTIFY, ObjectStore::RADAR_FAILED, ObjectStore::TIMEOUT, ObjectStore::FINISHED);
694         }
695         return false;
696     });
697 }
698 
SetData(const std::string & dataDir,const std::string & userId)699 void ObjectStoreManager::SetData(const std::string &dataDir, const std::string &userId)
700 {
701     ZLOGI("enter %{public}s", dataDir.c_str());
702     kvStoreDelegateManager_ =
703         new DistributedDB::KvStoreDelegateManager(DistributedData::Bootstrap::GetInstance().GetProcessLabel(), userId);
704     DistributedDB::KvStoreConfig kvStoreConfig { dataDir };
705     kvStoreDelegateManager_->SetKvStoreConfig(kvStoreConfig);
706     userId_ = userId;
707 }
708 
Open()709 int32_t ObjectStoreManager::Open()
710 {
711     if (kvStoreDelegateManager_ == nullptr) {
712         ZLOGE("Kvstore delegate manager not init");
713         return OBJECT_INNER_ERROR;
714     }
715     std::lock_guard<std::recursive_mutex> lock(kvStoreMutex_);
716     if (delegate_ == nullptr) {
717         delegate_ = OpenObjectKvStore();
718         if (delegate_ == nullptr) {
719             ZLOGE("Open object kvstore failed");
720             return OBJECT_DBSTATUS_ERROR;
721         }
722         syncCount_ = 1;
723         ZLOGI("Open object kvstore success");
724     } else {
725         syncCount_++;
726         ZLOGI("Object kvstore syncCount: %{public}d", syncCount_);
727     }
728     return OBJECT_SUCCESS;
729 }
730 
Close()731 void ObjectStoreManager::Close()
732 {
733     std::lock_guard<std::recursive_mutex> lock(kvStoreMutex_);
734     if (delegate_ == nullptr) {
735         return;
736     }
737     int32_t taskCount = delegate_->GetTaskCount();
738     if (taskCount > 0 && syncCount_ == 1) {
739         CloseAfterMinute();
740         ZLOGW("Store is busy, close after a minute, task count: %{public}d", taskCount);
741         return;
742     }
743     syncCount_--;
744     ZLOGI("closed a store, syncCount = %{public}d", syncCount_);
745     FlushClosedStore();
746 }
747 
SyncCompleted(const std::map<std::string,DistributedDB::DBStatus> & results,uint64_t sequenceId)748 void ObjectStoreManager::SyncCompleted(
749     const std::map<std::string, DistributedDB::DBStatus> &results, uint64_t sequenceId)
750 {
751     std::string userId;
752     SequenceSyncManager::Result result = SequenceSyncManager::GetInstance()->Process(sequenceId, results, userId);
753     if (result == SequenceSyncManager::SUCCESS_USER_HAS_FINISHED && userId == userId_) {
754         std::lock_guard<std::recursive_mutex> lock(kvStoreMutex_);
755         SetSyncStatus(false);
756         FlushClosedStore();
757     }
758     for (const auto &item : results) {
759         if (item.second == DistributedDB::DBStatus::OK) {
760             ZLOGI("Sync data success, sequenceId: 0x%{public}" PRIx64 "", sequenceId);
761             ObjectStore::RadarReporter::ReportStateFinished(std::string(__FUNCTION__), ObjectStore::SAVE,
762                 ObjectStore::SYNC_DATA, ObjectStore::RADAR_SUCCESS, ObjectStore::FINISHED);
763         } else {
764             ZLOGE("Sync data failed, sequenceId: 0x%{public}" PRIx64 ", status: %{public}d", sequenceId, item.second);
765             ObjectStore::RadarReporter::ReportStateError(std::string(__FUNCTION__), ObjectStore::SAVE,
766                 ObjectStore::SYNC_DATA, ObjectStore::RADAR_FAILED, item.second, ObjectStore::FINISHED);
767         }
768     }
769 }
770 
FlushClosedStore()771 void ObjectStoreManager::FlushClosedStore()
772 {
773     std::lock_guard<std::recursive_mutex> lock(kvStoreMutex_);
774     if (!isSyncing_ && syncCount_ == 0 && delegate_ != nullptr) {
775         ZLOGD("close store");
776         auto status = kvStoreDelegateManager_->CloseKvStore(delegate_);
777         if (status != DistributedDB::DBStatus::OK) {
778             int timeOut = 1000;
779             executors_->Schedule(std::chrono::milliseconds(timeOut), [this]() {
780                 FlushClosedStore();
781             });
782             ZLOGE("GetEntries fail %{public}d", status);
783             return;
784         }
785         delegate_ = nullptr;
786         if (objectDataListener_ != nullptr) {
787             delete objectDataListener_;
788             objectDataListener_ = nullptr;
789         }
790     }
791 }
792 
ProcessOldEntry(const std::string & appId)793 void ObjectStoreManager::ProcessOldEntry(const std::string &appId)
794 {
795     std::vector<DistributedDB::Entry> entries;
796     auto status = delegate_->GetEntries(std::vector<uint8_t>(appId.begin(), appId.end()), entries);
797     if (status != DistributedDB::DBStatus::NOT_FOUND) {
798         ZLOGI("Get old entries empty, bundleName: %{public}s", appId.c_str());
799         return;
800     }
801     if (status != DistributedDB::DBStatus::OK) {
802         ZLOGE("Get old entries failed, bundleName: %{public}s, status %{public}d", appId.c_str(), status);
803         return;
804     }
805     std::map<std::string, int64_t> sessionIds;
806     int64_t oldestTime = 0;
807     std::string deleteKey;
808     for (auto &item : entries) {
809         std::string key(item.key.begin(), item.key.end());
810         std::vector<std::string> splitKeys = SplitEntryKey(key);
811         if (splitKeys.empty()) {
812             continue;
813         }
814         std::string bundleName = splitKeys[BUNDLE_NAME_INDEX];
815         std::string sessionId = splitKeys[SESSION_ID_INDEX];
816         if (sessionIds.count(sessionId) == 0) {
817             char *end = nullptr;
818             sessionIds[sessionId] = strtol(splitKeys[TIME_INDEX].c_str(), &end, DECIMAL_BASE);
819         }
820         if (oldestTime == 0 || oldestTime > sessionIds[sessionId]) {
821             oldestTime = sessionIds[sessionId];
822             deleteKey = GetPrefixWithoutDeviceId(bundleName, sessionId);
823         }
824     }
825     if (sessionIds.size() < MAX_OBJECT_SIZE_PER_APP) {
826         return;
827     }
828     int32_t result = RevokeSaveToStore(deleteKey);
829     if (result != OBJECT_SUCCESS) {
830         ZLOGE("Delete old entries failed, deleteKey: %{public}s, status: %{public}d", deleteKey.c_str(), result);
831         return;
832     }
833     ZLOGI("Delete old entries success, deleteKey: %{public}s", deleteKey.c_str());
834 }
835 
SaveToStore(const std::string & appId,const std::string & sessionId,const std::string & toDeviceId,const ObjectRecord & data)836 int32_t ObjectStoreManager::SaveToStore(const std::string &appId, const std::string &sessionId,
837     const std::string &toDeviceId, const ObjectRecord &data)
838 {
839     ProcessOldEntry(appId);
840     RevokeSaveToStore(GetPropertyPrefix(appId, sessionId, toDeviceId));
841     std::string timestamp = std::to_string(GetSecondsSince1970ToNow());
842     std::string prefix = GetPropertyPrefix(appId, sessionId, toDeviceId) + timestamp + SEPERATOR;
843     DistributedDB::Entry saveInfoEntry;
844     std::string saveInfoKey = prefix + SAVE_INFO;
845     saveInfoEntry.key = std::vector<uint8_t>(saveInfoKey.begin(), saveInfoKey.end());
846     SaveInfo saveInfo(appId, sessionId, DmAdaper::GetInstance().GetLocalDevice().udid, toDeviceId, timestamp);
847     std::string saveInfoValue = DistributedData::Serializable::Marshall(saveInfo);
848     saveInfoEntry.value = std::vector<uint8_t>(saveInfoValue.begin(), saveInfoValue.end());
849     std::vector<DistributedDB::Entry> entries;
850     entries.emplace_back(saveInfoEntry);
851     for (auto &item : data) {
852         DistributedDB::Entry entry;
853         std::string key = GetPropertyPrefix(appId, sessionId, toDeviceId) + timestamp + SEPERATOR + item.first;
854         entry.key = std::vector<uint8_t>(key.begin(), key.end());
855         entry.value = item.second;
856         entries.emplace_back(entry);
857     }
858     auto status = delegate_->PutBatch(entries);
859     if (status != DistributedDB::DBStatus::OK) {
860         ZLOGE("PutBatch failed, bundleName: %{public}s, sessionId: %{public}s, dstNetworkId: %{public}s, "
861             "status: %{public}d", appId.c_str(), sessionId.c_str(), Anonymous::Change(toDeviceId).c_str(), status);
862         return status;
863     }
864     ZLOGI("PutBatch success, bundleName: %{public}s, sessionId: %{public}s, dstNetworkId: %{public}s, "
865         "count: %{public}zu", appId.c_str(), sessionId.c_str(), Anonymous::Change(toDeviceId).c_str(), entries.size());
866     return OBJECT_SUCCESS;
867 }
868 
SyncOnStore(const std::string & prefix,const std::vector<std::string> & deviceList,SyncCallBack & callback)869 int32_t ObjectStoreManager::SyncOnStore(
870     const std::string &prefix, const std::vector<std::string> &deviceList, SyncCallBack &callback)
871 {
872     std::vector<std::string> syncDevices;
873     for (auto &device : deviceList) {
874         if (device == LOCAL_DEVICE) {
875             ZLOGI("Save to local, do not need sync, prefix: %{public}s", prefix.c_str());
876             callback({{LOCAL_DEVICE, OBJECT_SUCCESS}});
877             return OBJECT_SUCCESS;
878         }
879         syncDevices.emplace_back(DmAdaper::GetInstance().GetUuidByNetworkId(device));
880     }
881     if (syncDevices.empty()) {
882         ZLOGI("Device list is empty, prefix: %{public}s", Anonymous::Change(prefix).c_str());
883         callback(std::map<std::string, int32_t>());
884         return OBJECT_SUCCESS;
885     }
886     uint64_t sequenceId = SequenceSyncManager::GetInstance()->AddNotifier(userId_, callback);
887     DistributedDB::Query dbQuery = DistributedDB::Query::Select();
888     dbQuery.PrefixKey(std::vector<uint8_t>(prefix.begin(), prefix.end()));
889     ZLOGI("Start sync data, sequenceId: 0x%{public}" PRIx64 "", sequenceId);
890     auto status = delegate_->Sync(syncDevices, DistributedDB::SyncMode::SYNC_MODE_PUSH_ONLY,
891         [this, sequenceId](const std::map<std::string, DistributedDB::DBStatus> &devicesMap) {
892             ZLOGI("Sync data finished, sequenceId: 0x%{public}" PRIx64 "", sequenceId);
893             std::map<std::string, DistributedDB::DBStatus> result;
894             for (auto &item : devicesMap) {
895                 result[DmAdaper::GetInstance().ToNetworkID(item.first)] = item.second;
896             }
897             SyncCompleted(result, sequenceId);
898         }, dbQuery, false);
899     if (status != DistributedDB::DBStatus::OK) {
900         ZLOGE("Sync data failed, prefix: %{public}s, sequenceId: 0x%{public}" PRIx64 ", status: %{public}d",
901             Anonymous::Change(prefix).c_str(), sequenceId, status);
902         std::string tmp;
903         SequenceSyncManager::GetInstance()->DeleteNotifier(sequenceId, tmp);
904         return status;
905     }
906     SetSyncStatus(true);
907     return OBJECT_SUCCESS;
908 }
909 
SetSyncStatus(bool status)910 int32_t ObjectStoreManager::SetSyncStatus(bool status)
911 {
912     std::lock_guard<std::recursive_mutex> lock(kvStoreMutex_);
913     isSyncing_ = status;
914     return OBJECT_SUCCESS;
915 }
916 
RevokeSaveToStore(const std::string & prefix)917 int32_t ObjectStoreManager::RevokeSaveToStore(const std::string &prefix)
918 {
919     std::vector<DistributedDB::Entry> entries;
920     auto status = delegate_->GetEntries(std::vector<uint8_t>(prefix.begin(), prefix.end()), entries);
921     if (status == DistributedDB::DBStatus::NOT_FOUND) {
922         ZLOGI("Get entries empty, prefix: %{public}s", Anonymous::Change(prefix).c_str());
923         return OBJECT_SUCCESS;
924     }
925     if (status != DistributedDB::DBStatus::OK) {
926         ZLOGE("Get entries failed, prefix: %{public}s, status: %{public}d", Anonymous::Change(prefix).c_str(), status);
927         return DB_ERROR;
928     }
929     std::vector<std::vector<uint8_t>> keys;
930     std::for_each(entries.begin(), entries.end(), [&keys](const DistributedDB::Entry &entry) {
931         keys.emplace_back(entry.key);
932     });
933     if (keys.empty()) {
934         return OBJECT_SUCCESS;
935     }
936     status = delegate_->DeleteBatch(keys);
937     if (status != DistributedDB::DBStatus::OK) {
938         ZLOGE("Delete entries failed, prefix: %{public}s, status: %{public}d", Anonymous::Change(prefix).c_str(),
939             status);
940         return DB_ERROR;
941     }
942     ZLOGI("Delete entries success, prefix: %{public}s, count: %{public}zu", Anonymous::Change(prefix).c_str(),
943         keys.size());
944     return OBJECT_SUCCESS;
945 }
946 
RetrieveFromStore(const std::string & appId,const std::string & sessionId,ObjectRecord & results)947 int32_t ObjectStoreManager::RetrieveFromStore(const std::string &appId, const std::string &sessionId,
948     ObjectRecord &results)
949 {
950     std::vector<DistributedDB::Entry> entries;
951     std::string prefix = GetPrefixWithoutDeviceId(appId, sessionId);
952     auto status = delegate_->GetEntries(std::vector<uint8_t>(prefix.begin(), prefix.end()), entries);
953     if (status == DistributedDB::DBStatus::NOT_FOUND) {
954         ZLOGI("Get entries empty, prefix: %{public}s, status: %{public}d", prefix.c_str(), status);
955         return KEY_NOT_FOUND;
956     }
957     if (status != DistributedDB::DBStatus::OK) {
958         ZLOGE("Get entries failed, prefix: %{public}s, status: %{public}d", prefix.c_str(), status);
959         return DB_ERROR;
960     }
961     ZLOGI("Get entries success, prefix: %{public}s, count: %{public}zu", prefix.c_str(), entries.size());
962     for (const auto &entry : entries) {
963         std::string key(entry.key.begin(), entry.key.end());
964         if (key.find(SAVE_INFO) != std::string::npos) {
965             continue;
966         }
967         auto splitKeys = SplitEntryKey(key);
968         if (!splitKeys.empty()) {
969             results[splitKeys[PROPERTY_NAME_INDEX]] = entry.value;
970         }
971     }
972     return OBJECT_SUCCESS;
973 }
974 
SaveInfo(const std::string & bundleName,const std::string & sessionId,const std::string & sourceDeviceId,const std::string & targetDeviceId,const std::string & timestamp)975 ObjectStoreManager::SaveInfo::SaveInfo(const std::string &bundleName, const std::string &sessionId,
976     const std::string &sourceDeviceId, const std::string &targetDeviceId, const std::string &timestamp)
977     : bundleName(bundleName), sessionId(sessionId), sourceDeviceId(sourceDeviceId), targetDeviceId(targetDeviceId),
978     timestamp(timestamp) {}
979 
Marshal(json & node) const980 bool ObjectStoreManager::SaveInfo::Marshal(json &node) const
981 {
982     SetValue(node[GET_NAME(bundleName)], bundleName);
983     SetValue(node[GET_NAME(sessionId)], sessionId);
984     SetValue(node[GET_NAME(sourceDeviceId)], sourceDeviceId);
985     SetValue(node[GET_NAME(targetDeviceId)], targetDeviceId);
986     SetValue(node[GET_NAME(timestamp)], timestamp);
987     return true;
988 }
989 
Unmarshal(const json & node)990 bool ObjectStoreManager::SaveInfo::Unmarshal(const json &node)
991 {
992     GetValue(node, GET_NAME(bundleName), bundleName);
993     GetValue(node, GET_NAME(sessionId), sessionId);
994     GetValue(node, GET_NAME(sourceDeviceId), sourceDeviceId);
995     GetValue(node, GET_NAME(targetDeviceId), targetDeviceId);
996     GetValue(node, GET_NAME(timestamp), timestamp);
997     return true;
998 }
999 
ToPropertyPrefix()1000 std::string ObjectStoreManager::SaveInfo::ToPropertyPrefix()
1001 {
1002     if (bundleName.empty() || sessionId.empty() || sourceDeviceId.empty() || targetDeviceId.empty() ||
1003         timestamp.empty()) {
1004         return "";
1005     }
1006     return bundleName + SEPERATOR + sessionId + SEPERATOR + sourceDeviceId + SEPERATOR + targetDeviceId + SEPERATOR +
1007         timestamp + SEPERATOR;
1008 }
1009 
SplitEntryKey(const std::string & key)1010 std::vector<std::string> ObjectStoreManager::SplitEntryKey(const std::string &key)
1011 {
1012     std::smatch match;
1013     std::regex timeRegex(TIME_REGEX);
1014     if (!std::regex_search(key, match, timeRegex)) {
1015         ZLOGW("Format error, key.size = %{public}zu", key.size());
1016         return {};
1017     }
1018     auto timePos = match.position();
1019     std::string fromTime = key.substr(timePos + 1);
1020     std::string beforeTime = key.substr(0, timePos);
1021 
1022     size_t targetDevicePos = beforeTime.find_last_of(SEPERATOR);
1023     if (targetDevicePos == std::string::npos) {
1024         ZLOGW("Format error, key.size = %{public}zu", key.size());
1025         return {};
1026     }
1027     std::string targetDevice = beforeTime.substr(targetDevicePos + 1);
1028     std::string beforeTargetDevice = beforeTime.substr(0, targetDevicePos);
1029 
1030     size_t sourceDeviceUdidPos = beforeTargetDevice.find_last_of(SEPERATOR);
1031     if (sourceDeviceUdidPos == std::string::npos) {
1032         ZLOGW("Format error, key.size = %{public}zu", key.size());
1033         return {};
1034     }
1035     std::string sourceDeviceUdid = beforeTargetDevice.substr(sourceDeviceUdidPos + 1);
1036     std::string beforeSourceDeviceUdid = beforeTargetDevice.substr(0, sourceDeviceUdidPos);
1037 
1038     size_t sessionIdPos = beforeSourceDeviceUdid.find_last_of(SEPERATOR);
1039     if (sessionIdPos == std::string::npos) {
1040         ZLOGW("Format error, key.size = %{public}zu", key.size());
1041         return {};
1042     }
1043     std::string sessionId = beforeSourceDeviceUdid.substr(sessionIdPos + 1);
1044     std::string bundleName = beforeSourceDeviceUdid.substr(0, sessionIdPos);
1045 
1046     size_t propertyNamePos = fromTime.find_first_of(SEPERATOR);
1047     if (propertyNamePos == std::string::npos) {
1048         ZLOGW("Format error, key.size = %{public}zu", key.size());
1049         return {};
1050     }
1051     std::string propertyName = fromTime.substr(propertyNamePos + 1);
1052     std::string time = fromTime.substr(0, propertyNamePos);
1053 
1054     return { bundleName, sessionId, sourceDeviceUdid, targetDevice, time, propertyName };
1055 }
1056 
GetCurrentUser()1057 std::string ObjectStoreManager::GetCurrentUser()
1058 {
1059     std::vector<int> users;
1060     AccountDelegate::GetInstance()->QueryUsers(users);
1061     if (users.empty()) {
1062         return "";
1063     }
1064     return std::to_string(users[0]);
1065 }
1066 
SaveUserToMeta()1067 void ObjectStoreManager::SaveUserToMeta()
1068 {
1069     ZLOGD("start.");
1070     std::string userId = GetCurrentUser();
1071     if (userId.empty()) {
1072         return;
1073     }
1074     std::string appId = DistributedData::Bootstrap::GetInstance().GetProcessLabel();
1075     StoreMetaData userMeta;
1076     userMeta.storeId = DistributedObject::ObjectCommon::OBJECTSTORE_DB_STOREID;
1077     userMeta.user = userId;
1078     userMeta.storeType = ObjectDistributedType::OBJECT_SINGLE_VERSION;
1079     std::string userMetaKey = GetMetaUserIdKey(userId, appId);
1080     auto saved = DistributedData::MetaDataManager::GetInstance().SaveMeta(userMetaKey, userMeta, true);
1081     if (!saved) {
1082         ZLOGE("userMeta save failed");
1083     }
1084 }
1085 
CloseAfterMinute()1086 void ObjectStoreManager::CloseAfterMinute()
1087 {
1088     executors_->Schedule(std::chrono::minutes(INTERVAL), std::bind(&ObjectStoreManager::Close, this));
1089 }
1090 
SetThreadPool(std::shared_ptr<ExecutorPool> executors)1091 void ObjectStoreManager::SetThreadPool(std::shared_ptr<ExecutorPool> executors)
1092 {
1093     executors_ = executors;
1094 }
1095 
AddNotifier(const std::string & userId,SyncCallBack & callback)1096 uint64_t SequenceSyncManager::AddNotifier(const std::string &userId, SyncCallBack &callback)
1097 {
1098     std::lock_guard<std::mutex> lock(notifierLock_);
1099     uint64_t sequenceId = KvStoreUtils::GenerateSequenceId();
1100     userIdSeqIdRelations_[userId].emplace_back(sequenceId);
1101     seqIdCallbackRelations_[sequenceId] = callback;
1102     return sequenceId;
1103 }
1104 
Process(uint64_t sequenceId,const std::map<std::string,DistributedDB::DBStatus> & results,std::string & userId)1105 SequenceSyncManager::Result SequenceSyncManager::Process(
1106     uint64_t sequenceId, const std::map<std::string, DistributedDB::DBStatus> &results, std::string &userId)
1107 {
1108     std::lock_guard<std::mutex> lock(notifierLock_);
1109     if (seqIdCallbackRelations_.count(sequenceId) == 0) {
1110         ZLOGE("not exist");
1111         return ERR_SID_NOT_EXIST;
1112     }
1113     std::map<std::string, int32_t> syncResults;
1114     for (auto &item : results) {
1115         syncResults[item.first] = item.second == DistributedDB::DBStatus::OK ? 0 : -1;
1116     }
1117     seqIdCallbackRelations_[sequenceId](syncResults);
1118     ZLOGD("end complete");
1119     return DeleteNotifierNoLock(sequenceId, userId);
1120 }
1121 
DeleteNotifier(uint64_t sequenceId,std::string & userId)1122 SequenceSyncManager::Result SequenceSyncManager::DeleteNotifier(uint64_t sequenceId, std::string &userId)
1123 {
1124     std::lock_guard<std::mutex> lock(notifierLock_);
1125     if (seqIdCallbackRelations_.count(sequenceId) == 0) {
1126         ZLOGE("not exist");
1127         return ERR_SID_NOT_EXIST;
1128     }
1129     return DeleteNotifierNoLock(sequenceId, userId);
1130 }
1131 
DeleteNotifierNoLock(uint64_t sequenceId,std::string & userId)1132 SequenceSyncManager::Result SequenceSyncManager::DeleteNotifierNoLock(uint64_t sequenceId, std::string &userId)
1133 {
1134     seqIdCallbackRelations_.erase(sequenceId);
1135     auto userIdIter = userIdSeqIdRelations_.begin();
1136     while (userIdIter != userIdSeqIdRelations_.end()) {
1137         auto sIdIter = std::find(userIdIter->second.begin(), userIdIter->second.end(), sequenceId);
1138         if (sIdIter != userIdIter->second.end()) {
1139             userIdIter->second.erase(sIdIter);
1140             if (userIdIter->second.empty()) {
1141                 ZLOGD("finished user callback, userId = %{public}s", userIdIter->first.c_str());
1142                 userId = userIdIter->first;
1143                 userIdSeqIdRelations_.erase(userIdIter);
1144                 return SUCCESS_USER_HAS_FINISHED;
1145             } else {
1146                 ZLOGD(" finished a callback but user not finished, userId = %{public}s", userIdIter->first.c_str());
1147                 return SUCCESS_USER_IN_USE;
1148             }
1149         }
1150         userIdIter++;
1151     }
1152     return SUCCESS_USER_HAS_FINISHED;
1153 }
1154 
BindAsset(const uint32_t tokenId,const std::string & appId,const std::string & sessionId,ObjectStore::Asset & asset,ObjectStore::AssetBindInfo & bindInfo)1155 int32_t ObjectStoreManager::BindAsset(const uint32_t tokenId, const std::string& appId, const std::string& sessionId,
1156     ObjectStore::Asset& asset, ObjectStore::AssetBindInfo& bindInfo)
1157 {
1158     auto snapshotKey = appId + SEPERATOR + sessionId;
1159     snapshots_.ComputeIfAbsent(
1160         snapshotKey, [](const std::string& key) -> auto {
1161             return std::make_shared<ObjectSnapshot>();
1162         });
1163     auto storeKey = appId + SEPERATOR + bindInfo.storeName;
1164     bindSnapshots_.ComputeIfAbsent(
1165         storeKey, [](const std::string& key) -> auto {
1166             return std::make_shared<std::map<std::string, std::shared_ptr<Snapshot>>>();
1167         });
1168     auto snapshots = snapshots_.Find(snapshotKey).second;
1169     bindSnapshots_.Compute(storeKey, [this, &asset, snapshots] (const auto &key, auto &value) {
1170         value->emplace(std::pair{asset.uri, snapshots});
1171         return true;
1172     });
1173 
1174     HapTokenInfo tokenInfo;
1175     auto status = AccessTokenKit::GetHapTokenInfo(tokenId, tokenInfo);
1176     if (status != RET_SUCCESS) {
1177         ZLOGE("token:0x%{public}x, result:%{public}d, bundleName:%{public}s", tokenId, status, appId.c_str());
1178         return GeneralError::E_ERROR;
1179     }
1180     StoreInfo storeInfo;
1181     storeInfo.bundleName = appId;
1182     storeInfo.tokenId = tokenId;
1183     storeInfo.instanceId = tokenInfo.instIndex;
1184     storeInfo.user = tokenInfo.userID;
1185     storeInfo.storeName = bindInfo.storeName;
1186 
1187     snapshots_.Compute(snapshotKey, [this, &asset, &bindInfo, &storeInfo] (const auto &key, auto &value) {
1188         value->BindAsset(ValueProxy::Convert(std::move(asset)), ConvertBindInfo(bindInfo), storeInfo);
1189         return true;
1190     });
1191     return OBJECT_SUCCESS;
1192 }
1193 
ConvertBindInfo(ObjectStore::AssetBindInfo & bindInfo)1194 DistributedData::AssetBindInfo ObjectStoreManager::ConvertBindInfo(ObjectStore::AssetBindInfo& bindInfo)
1195 {
1196     return DistributedData::AssetBindInfo{
1197         .storeName = std::move(bindInfo.storeName),
1198         .tableName = std::move(bindInfo.tableName),
1199         .primaryKey = ValueProxy::Convert(std::move(bindInfo.primaryKey)),
1200         .field = std::move(bindInfo.field),
1201         .assetName = std::move(bindInfo.assetName),
1202     };
1203 }
1204 
OnAssetChanged(const uint32_t tokenId,const std::string & appId,const std::string & sessionId,const std::string & deviceId,const ObjectStore::Asset & asset)1205 int32_t ObjectStoreManager::OnAssetChanged(const uint32_t tokenId, const std::string& appId,
1206     const std::string& sessionId, const std::string& deviceId, const ObjectStore::Asset& asset)
1207 {
1208     const int32_t userId = DistributedKv::AccountDelegate::GetInstance()->GetUserByToken(tokenId);
1209     auto objectAsset = asset;
1210     Asset dataAsset =  ValueProxy::Convert(std::move(objectAsset));
1211     auto snapshotKey = appId + SEPERATOR + sessionId;
1212     int32_t res = OBJECT_SUCCESS;
1213     bool exist = snapshots_.ComputeIfPresent(snapshotKey,
1214         [&res, &dataAsset, &deviceId](const std::string &key, std::shared_ptr<Snapshot> snapshot) {
1215             if (snapshot != nullptr) {
1216                 res = snapshot->OnDataChanged(dataAsset, deviceId); // needChange
1217             }
1218             return snapshot != nullptr;
1219         });
1220     if (exist) {
1221         return res;
1222     }
1223 
1224     auto block = std::make_shared<BlockData<std::tuple<bool, bool>>>(WAIT_TIME, std::tuple{ true, true });
1225     ObjectAssetLoader::GetInstance()->TransferAssetsAsync(userId, appId, deviceId, { dataAsset }, [block](bool ret) {
1226         block->SetValue({ false, ret });
1227     });
1228     auto [timeout, success] = block->GetValue();
1229     if (timeout || !success) {
1230         ZLOGE("transfer failed, timeout: %{public}d, success: %{public}d, name: %{public}s, deviceId: %{public}s ",
1231             timeout, success, asset.name.c_str(), DistributedData::Anonymous::Change(deviceId).c_str());
1232         return OBJECT_INNER_ERROR;
1233     }
1234     return OBJECT_SUCCESS;
1235 }
1236 
GetSnapShots(const std::string & bundleName,const std::string & storeName)1237 ObjectStoreManager::UriToSnapshot ObjectStoreManager::GetSnapShots(const std::string& bundleName,
1238     const std::string& storeName)
1239 {
1240     auto storeKey = bundleName + SEPERATOR + storeName;
1241     bindSnapshots_.ComputeIfAbsent(
1242         storeKey, [](const std::string& key) -> auto {
1243             return std::make_shared<std::map<std::string, std::shared_ptr<Snapshot>>>();
1244         });
1245     return bindSnapshots_.Find(storeKey).second;
1246 }
1247 
DeleteSnapshot(const std::string & bundleName,const std::string & sessionId)1248 void ObjectStoreManager::DeleteSnapshot(const std::string& bundleName, const std::string& sessionId)
1249 {
1250     auto snapshotKey = bundleName + SEPERATOR + sessionId;
1251     auto snapshot = snapshots_.Find(snapshotKey).second;
1252     if (snapshot == nullptr) {
1253         ZLOGD("Not find snapshot, don't need delete");
1254         return;
1255     }
1256     bindSnapshots_.ForEach([snapshot](auto& key, auto& value) {
1257         for (auto pos = value->begin(); pos != value->end();) {
1258             if (pos->second == snapshot) {
1259                 pos = value->erase(pos);
1260             } else {
1261                 ++pos;
1262             }
1263         }
1264         return true;
1265     });
1266     snapshots_.Erase(snapshotKey);
1267 }
1268 } // namespace DistributedObject
1269 } // namespace OHOS
1270