1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #define LOG_TAG "ObjectStoreManager"
16
17 #include "object_manager.h"
18
19 #include <regex>
20
21 #include "accesstoken_kit.h"
22 #include "account/account_delegate.h"
23 #include "block_data.h"
24 #include "bootstrap.h"
25 #include "common/bytes.h"
26 #include "common/string_utils.h"
27 #include "datetime_ex.h"
28 #include "distributed_file_daemon_manager.h"
29 #include "kvstore_utils.h"
30 #include "log_print.h"
31 #include "metadata/meta_data_manager.h"
32 #include "metadata/store_meta_data.h"
33 #include "object_dms_handler.h"
34 #include "object_radar_reporter.h"
35 #include "utils/anonymous.h"
36
37 namespace OHOS {
38 namespace DistributedObject {
39 using namespace OHOS::DistributedKv;
40 using namespace Security::AccessToken;
41 using StoreMetaData = OHOS::DistributedData::StoreMetaData;
42 using AccountDelegate = DistributedKv::AccountDelegate;
43 using Account = OHOS::DistributedKv::AccountDelegate;
44 using AccessTokenKit = Security::AccessToken::AccessTokenKit;
45 using ValueProxy = OHOS::DistributedData::ValueProxy;
46 using DistributedFileDaemonManager = Storage::DistributedFile::DistributedFileDaemonManager;
47 constexpr const char *SAVE_INFO = "p_###SAVEINFO###";
ObjectStoreManager()48 ObjectStoreManager::ObjectStoreManager()
49 {
50 ZLOGI("ObjectStoreManager construct");
51 RegisterAssetsLister();
52 }
53
~ObjectStoreManager()54 ObjectStoreManager::~ObjectStoreManager()
55 {
56 ZLOGI("ObjectStoreManager destroy");
57 if (objectAssetsRecvListener_ != nullptr) {
58 auto status = DistributedFileDaemonManager::GetInstance().UnRegisterAssetCallback(objectAssetsRecvListener_);
59 if (status != DistributedDB::DBStatus::OK) {
60 ZLOGE("UnRegister assetsRecvListener err %{public}d", status);
61 }
62 }
63 }
64
OpenObjectKvStore()65 DistributedDB::KvStoreNbDelegate *ObjectStoreManager::OpenObjectKvStore()
66 {
67 DistributedDB::KvStoreNbDelegate *store = nullptr;
68 DistributedDB::KvStoreNbDelegate::Option option;
69 option.createDirByStoreIdOnly = true;
70 option.syncDualTupleMode = true;
71 option.secOption = { DistributedDB::S1, DistributedDB::ECE };
72 if (objectDataListener_ == nullptr) {
73 objectDataListener_ = new ObjectDataListener();
74 }
75 ZLOGD("start GetKvStore");
76 kvStoreDelegateManager_->GetKvStore(ObjectCommon::OBJECTSTORE_DB_STOREID, option,
77 [&store, this](DistributedDB::DBStatus dbStatus, DistributedDB::KvStoreNbDelegate *kvStoreNbDelegate) {
78 if (dbStatus != DistributedDB::DBStatus::OK) {
79 ZLOGE("GetKvStore fail %{public}d", dbStatus);
80 return;
81 }
82 ZLOGI("GetKvStore successsfully");
83 store = kvStoreNbDelegate;
84 std::vector<uint8_t> tmpKey;
85 DistributedDB::DBStatus status = store->RegisterObserver(tmpKey,
86 DistributedDB::ObserverMode::OBSERVER_CHANGES_FOREIGN,
87 objectDataListener_);
88 if (status != DistributedDB::DBStatus::OK) {
89 ZLOGE("RegisterObserver err %{public}d", status);
90 }
91 });
92 return store;
93 }
94
RegisterAssetsLister()95 bool ObjectStoreManager::RegisterAssetsLister()
96 {
97 if (objectAssetsSendListener_ == nullptr) {
98 objectAssetsSendListener_ = new ObjectAssetsSendListener();
99 }
100 if (objectAssetsRecvListener_ == nullptr) {
101 objectAssetsRecvListener_ = new ObjectAssetsRecvListener();
102 }
103 auto status = DistributedFileDaemonManager::GetInstance().RegisterAssetCallback(objectAssetsRecvListener_);
104 if (status != DistributedDB::DBStatus::OK) {
105 ZLOGE("Register assetsRecvListener err %{public}d", status);
106 return false;
107 }
108 return true;
109 }
110
ProcessSyncCallback(const std::map<std::string,int32_t> & results,const std::string & appId,const std::string & sessionId,const std::string & deviceId)111 void ObjectStoreManager::ProcessSyncCallback(const std::map<std::string, int32_t> &results, const std::string &appId,
112 const std::string &sessionId, const std::string &deviceId)
113 {
114 if (results.empty() || results.find(LOCAL_DEVICE) != results.end()) {
115 return;
116 }
117 int32_t result = Open();
118 if (result != OBJECT_SUCCESS) {
119 ZLOGE("Open failed, errCode = %{public}d", result);
120 return;
121 }
122 // delete local data
123 result = RevokeSaveToStore(GetPropertyPrefix(appId, sessionId, deviceId));
124 if (result != OBJECT_SUCCESS) {
125 ZLOGE("Save failed, status = %{public}d", result);
126 }
127 Close();
128 return;
129 }
130
Save(const std::string & appId,const std::string & sessionId,const ObjectRecord & data,const std::string & deviceId,sptr<IRemoteObject> callback)131 int32_t ObjectStoreManager::Save(const std::string &appId, const std::string &sessionId,
132 const ObjectRecord &data, const std::string &deviceId, sptr<IRemoteObject> callback)
133 {
134 auto proxy = iface_cast<ObjectSaveCallbackProxy>(callback);
135 if (deviceId.size() == 0) {
136 ZLOGE("DeviceId empty, appId: %{public}s, sessionId: %{public}s", appId.c_str(), sessionId.c_str());
137 proxy->Completed(std::map<std::string, int32_t>());
138 return INVALID_ARGUMENT;
139 }
140 int32_t result = Open();
141 if (result != OBJECT_SUCCESS) {
142 ZLOGE("Open object kvstore failed, result: %{public}d", result);
143 ObjectStore::RadarReporter::ReportStateError(std::string(__FUNCTION__), ObjectStore::SAVE,
144 ObjectStore::SAVE_TO_STORE, ObjectStore::RADAR_FAILED, ObjectStore::GETKV_FAILED, ObjectStore::FINISHED);
145 proxy->Completed(std::map<std::string, int32_t>());
146 return STORE_NOT_OPEN;
147 }
148 SaveUserToMeta();
149 std::string dstBundleName = ObjectDmsHandler::GetInstance().GetDstBundleName(appId, deviceId);
150 result = SaveToStore(dstBundleName, sessionId, deviceId, data);
151 if (result != OBJECT_SUCCESS) {
152 ZLOGE("Save to store failed, result: %{public}d", result);
153 ObjectStore::RadarReporter::ReportStateError(std::string(__FUNCTION__), ObjectStore::SAVE,
154 ObjectStore::SAVE_TO_STORE, ObjectStore::RADAR_FAILED, result, ObjectStore::FINISHED);
155 Close();
156 proxy->Completed(std::map<std::string, int32_t>());
157 return result;
158 }
159 ZLOGI("Sync data, bundleName: %{public}s, sessionId: %{public}s, deviceId: %{public}s", dstBundleName.c_str(),
160 sessionId.c_str(), Anonymous::Change(deviceId).c_str());
161 SyncCallBack syncCallback =
162 [proxy, dstBundleName, sessionId, deviceId, this](const std::map<std::string, int32_t> &results) {
163 ProcessSyncCallback(results, dstBundleName, sessionId, deviceId);
164 proxy->Completed(results);
165 };
166 result = SyncOnStore(GetPropertyPrefix(dstBundleName, sessionId, deviceId), {deviceId}, syncCallback);
167 if (result != OBJECT_SUCCESS) {
168 ZLOGE("Sync data failed, result: %{public}d", result);
169 ObjectStore::RadarReporter::ReportStateError(std::string(__FUNCTION__), ObjectStore::SAVE,
170 ObjectStore::SYNC_DATA, ObjectStore::RADAR_FAILED, result, ObjectStore::FINISHED);
171 Close();
172 proxy->Completed(std::map<std::string, int32_t>());
173 return result;
174 }
175 Close();
176 return PushAssets(appId, dstBundleName, sessionId, data, deviceId);
177 }
178
PushAssets(const std::string & srcBundleName,const std::string & dstBundleName,const std::string & sessionId,const ObjectRecord & data,const std::string & deviceId)179 int32_t ObjectStoreManager::PushAssets(const std::string &srcBundleName, const std::string &dstBundleName,
180 const std::string &sessionId, const ObjectRecord &data, const std::string &deviceId)
181 {
182 Assets assets = GetAssetsFromDBRecords(data);
183 if (assets.empty() || data.find(ObjectStore::FIELDS_PREFIX + ObjectStore::DEVICEID_KEY) == data.end()) {
184 return OBJECT_SUCCESS;
185 }
186 sptr<AssetObj> assetObj = new AssetObj();
187 assetObj->dstBundleName_ = dstBundleName;
188 assetObj->srcBundleName_ = srcBundleName;
189 assetObj->dstNetworkId_ = deviceId;
190 assetObj->sessionId_ = sessionId;
191 for (const auto& asset : assets) {
192 assetObj->uris_.push_back(asset.uri);
193 }
194 if (objectAssetsSendListener_ == nullptr) {
195 objectAssetsSendListener_ = new ObjectAssetsSendListener();
196 }
197 int userId = std::atoi(GetCurrentUser().c_str());
198 auto status = ObjectAssetLoader::GetInstance()->PushAsset(userId, assetObj, objectAssetsSendListener_);
199 return status;
200 }
201
RevokeSave(const std::string & appId,const std::string & sessionId,sptr<IRemoteObject> callback)202 int32_t ObjectStoreManager::RevokeSave(
203 const std::string &appId, const std::string &sessionId, sptr<IRemoteObject> callback)
204 {
205 auto proxy = iface_cast<ObjectRevokeSaveCallbackProxy>(callback);
206 int32_t result = Open();
207 if (result != OBJECT_SUCCESS) {
208 ZLOGE("Open failed, errCode = %{public}d", result);
209 proxy->Completed(STORE_NOT_OPEN);
210 return STORE_NOT_OPEN;
211 }
212
213 result = RevokeSaveToStore(GetPrefixWithoutDeviceId(appId, sessionId));
214 if (result != OBJECT_SUCCESS) {
215 ZLOGE("RevokeSave failed, errCode = %{public}d", result);
216 Close();
217 proxy->Completed(result);
218 return result;
219 }
220 std::vector<std::string> deviceList;
221 auto deviceInfos = DmAdaper::GetInstance().GetRemoteDevices();
222 std::for_each(deviceInfos.begin(), deviceInfos.end(),
223 [&deviceList](AppDistributedKv::DeviceInfo info) { deviceList.emplace_back(info.networkId); });
224 if (!deviceList.empty()) {
225 SyncCallBack tmp = [proxy](const std::map<std::string, int32_t> &results) {
226 ZLOGI("revoke save finished");
227 proxy->Completed(OBJECT_SUCCESS);
228 };
229 result = SyncOnStore(GetPropertyPrefix(appId, sessionId), deviceList, tmp);
230 if (result != OBJECT_SUCCESS) {
231 ZLOGE("sync failed, errCode = %{public}d", result);
232 proxy->Completed(result);
233 }
234 } else {
235 proxy->Completed(OBJECT_SUCCESS);
236 };
237 Close();
238 return result;
239 }
240
Retrieve(const std::string & bundleName,const std::string & sessionId,sptr<IRemoteObject> callback,uint32_t tokenId)241 int32_t ObjectStoreManager::Retrieve(
242 const std::string &bundleName, const std::string &sessionId, sptr<IRemoteObject> callback, uint32_t tokenId)
243 {
244 auto proxy = iface_cast<ObjectRetrieveCallbackProxy>(callback);
245 int32_t result = Open();
246 if (result != OBJECT_SUCCESS) {
247 ZLOGE("Open object kvstore failed, result: %{public}d", result);
248 proxy->Completed(ObjectRecord(), false);
249 return ObjectStore::GETKV_FAILED;
250 }
251 ObjectRecord results{};
252 int32_t status = RetrieveFromStore(bundleName, sessionId, results);
253 if (status != OBJECT_SUCCESS) {
254 ZLOGI("Retrieve from store failed, status: %{public}d", status);
255 Close();
256 proxy->Completed(ObjectRecord(), false);
257 return status;
258 }
259 bool allReady = false;
260 Assets assets = GetAssetsFromDBRecords(results);
261 if (assets.empty() || results.find(ObjectStore::FIELDS_PREFIX + ObjectStore::DEVICEID_KEY) == results.end()) {
262 allReady = true;
263 } else {
264 auto objectKey = bundleName + sessionId;
265 restoreStatus_.ComputeIfPresent(objectKey, [&allReady](const auto &key, auto &value) {
266 if (value == RestoreStatus::ALL_READY) {
267 allReady = true;
268 return false;
269 }
270 if (value == RestoreStatus::DATA_READY) {
271 value = RestoreStatus::DATA_NOTIFIED;
272 }
273 return true;
274 });
275 }
276 status = RevokeSaveToStore(GetPrefixWithoutDeviceId(bundleName, sessionId));
277 if (status != OBJECT_SUCCESS) {
278 ZLOGE("Revoke save failed, status: %{public}d", status);
279 Close();
280 proxy->Completed(ObjectRecord(), false);
281 return status;
282 }
283 Close();
284 proxy->Completed(results, allReady);
285 if (allReady) {
286 ObjectStore::RadarReporter::ReportStateFinished(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
287 ObjectStore::NOTIFY, ObjectStore::RADAR_SUCCESS, ObjectStore::FINISHED);
288 }
289 return status;
290 }
291
Clear()292 int32_t ObjectStoreManager::Clear()
293 {
294 ZLOGI("enter");
295 std::string userId = GetCurrentUser();
296 if (userId.empty()) {
297 return OBJECT_INNER_ERROR;
298 }
299 std::vector<StoreMetaData> metaData;
300 std::string appId = DistributedData::Bootstrap::GetInstance().GetProcessLabel();
301 std::string metaKey = GetMetaUserIdKey(userId, appId);
302 if (!DistributedData::MetaDataManager::GetInstance().LoadMeta(metaKey, metaData, true)) {
303 ZLOGE("no store of %{public}s", appId.c_str());
304 return OBJECT_STORE_NOT_FOUND;
305 }
306 for (const auto &storeMeta : metaData) {
307 if (storeMeta.storeType < StoreMetaData::StoreType::STORE_OBJECT_BEGIN
308 || storeMeta.storeType > StoreMetaData::StoreType::STORE_OBJECT_END) {
309 continue;
310 }
311 if (storeMeta.user == userId) {
312 ZLOGI("user is same, not need to change, mate user:%{public}s::user:%{public}s.",
313 storeMeta.user.c_str(), userId.c_str());
314 return OBJECT_SUCCESS;
315 }
316 }
317 ZLOGD("user is change, need to change");
318 int32_t result = Open();
319 if (result != OBJECT_SUCCESS) {
320 ZLOGE("Open failed, errCode = %{public}d", result);
321 return STORE_NOT_OPEN;
322 }
323 result = RevokeSaveToStore("");
324 Close();
325 return result;
326 }
327
DeleteByAppId(const std::string & appId,int32_t user)328 int32_t ObjectStoreManager::DeleteByAppId(const std::string &appId, int32_t user)
329 {
330 int32_t result = Open();
331 if (result != OBJECT_SUCCESS) {
332 ZLOGE("Open store failed, result: %{public}d, appId: %{public}s, user: %{public}d", result,
333 appId.c_str(), user);
334 return STORE_NOT_OPEN;
335 }
336 result = RevokeSaveToStore(appId);
337 if (result != OBJECT_SUCCESS) {
338 ZLOGE("Revoke save failed, result: %{public}d, appId: %{public}s, user: %{public}d", result,
339 appId.c_str(), user);
340 }
341 Close();
342 std::string userId = std::to_string(user);
343 std::string metaKey = GetMetaUserIdKey(userId, appId);
344 auto status = DistributedData::MetaDataManager::GetInstance().DelMeta(metaKey, true);
345 if (!status) {
346 ZLOGE("Delete meta failed, userId: %{public}s, appId: %{public}s", userId.c_str(), appId.c_str());
347 }
348 return result;
349 }
350
RegisterRemoteCallback(const std::string & bundleName,const std::string & sessionId,pid_t pid,uint32_t tokenId,sptr<IRemoteObject> callback)351 void ObjectStoreManager::RegisterRemoteCallback(const std::string &bundleName, const std::string &sessionId,
352 pid_t pid, uint32_t tokenId,
353 sptr<IRemoteObject> callback)
354 {
355 if (bundleName.empty() || sessionId.empty()) {
356 ZLOGD("ObjectStoreManager::RegisterRemoteCallback empty");
357 return;
358 }
359 ZLOGD("ObjectStoreManager::RegisterRemoteCallback start");
360 auto proxy = iface_cast<ObjectChangeCallbackProxy>(callback);
361 std::string prefix = bundleName + sessionId;
362 callbacks_.Compute(tokenId, ([pid, &proxy, &prefix](const uint32_t key, CallbackInfo &value) {
363 if (value.pid != pid) {
364 value = CallbackInfo { pid };
365 }
366 value.observers_.insert_or_assign(prefix, proxy);
367 return !value.observers_.empty();
368 }));
369 }
370
UnregisterRemoteCallback(const std::string & bundleName,pid_t pid,uint32_t tokenId,const std::string & sessionId)371 void ObjectStoreManager::UnregisterRemoteCallback(const std::string &bundleName, pid_t pid, uint32_t tokenId,
372 const std::string &sessionId)
373 {
374 if (bundleName.empty()) {
375 ZLOGD("bundleName is empty");
376 return;
377 }
378 callbacks_.Compute(tokenId, ([pid, &sessionId, &bundleName](const uint32_t key, CallbackInfo &value) {
379 if (value.pid != pid) {
380 return true;
381 }
382 if (sessionId.empty()) {
383 return false;
384 }
385 std::string prefix = bundleName + sessionId;
386 for (auto it = value.observers_.begin(); it != value.observers_.end();) {
387 if ((*it).first == prefix) {
388 it = value.observers_.erase(it);
389 } else {
390 ++it;
391 }
392 }
393 return true;
394 }));
395 }
396
NotifyChange(ObjectRecord & changedData)397 void ObjectStoreManager::NotifyChange(ObjectRecord &changedData)
398 {
399 ZLOGI("OnChange start, size:%{public}zu", changedData.size());
400 bool hasAsset = false;
401 SaveInfo saveInfo;
402 for (const auto &[key, value] : changedData) {
403 if (key.find(SAVE_INFO) != std::string::npos) {
404 DistributedData::Serializable::Unmarshall(std::string(value.begin(), value.end()), saveInfo);
405 break;
406 }
407 }
408 auto data = GetObjectData(changedData, saveInfo, hasAsset);
409 if (!hasAsset) {
410 ObjectStore::RadarReporter::ReportStateStart(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
411 ObjectStore::DATA_RECV, ObjectStore::RADAR_SUCCESS, ObjectStore::START, saveInfo.bundleName);
412 callbacks_.ForEach([this, &data](uint32_t tokenId, const CallbackInfo& value) {
413 DoNotify(tokenId, value, data, true); // no asset, data ready means all ready
414 return false;
415 });
416 return;
417 }
418 NotifyDataChanged(data, saveInfo);
419 SaveUserToMeta();
420 }
421
GetObjectData(const ObjectRecord & changedData,SaveInfo & saveInfo,bool & hasAsset)422 std::map<std::string, ObjectRecord> ObjectStoreManager::GetObjectData(const ObjectRecord& changedData,
423 SaveInfo& saveInfo, bool& hasAsset)
424 {
425 std::map<std::string, ObjectRecord> data;
426 std::string keyPrefix = saveInfo.ToPropertyPrefix();
427 if (!keyPrefix.empty()) {
428 std::string observerKey = saveInfo.bundleName + saveInfo.sessionId;
429 for (const auto &[key, value] : changedData) {
430 if (key.size() < keyPrefix.size() || key.find(SAVE_INFO) != std::string::npos) {
431 continue;
432 }
433 std::string propertyName = key.substr(keyPrefix.size());
434 data[observerKey].insert_or_assign(propertyName, value);
435 if (!hasAsset && IsAssetKey(propertyName)) {
436 hasAsset = true;
437 }
438 }
439 } else {
440 for (const auto &item : changedData) {
441 std::vector<std::string> splitKeys = SplitEntryKey(item.first);
442 if (splitKeys.size() <= PROPERTY_NAME_INDEX) {
443 continue;
444 }
445 if (saveInfo.sourceDeviceId.empty() || saveInfo.bundleName.empty()) {
446 saveInfo.sourceDeviceId = splitKeys[SOURCE_DEVICE_UDID_INDEX];
447 saveInfo.bundleName = splitKeys[BUNDLE_NAME_INDEX];
448 saveInfo.sessionId = splitKeys[SESSION_ID_INDEX];
449 saveInfo.timestamp = splitKeys[TIME_INDEX];
450 }
451 std::string prefix = splitKeys[BUNDLE_NAME_INDEX] + splitKeys[SESSION_ID_INDEX];
452 std::string propertyName = splitKeys[PROPERTY_NAME_INDEX];
453 data[prefix].insert_or_assign(propertyName, item.second);
454 if (IsAssetKey(propertyName)) {
455 hasAsset = true;
456 }
457 }
458 }
459 return data;
460 }
461
ComputeStatus(const std::string & objectKey,const SaveInfo & saveInfo,const std::map<std::string,ObjectRecord> & data)462 void ObjectStoreManager::ComputeStatus(const std::string& objectKey, const SaveInfo& saveInfo,
463 const std::map<std::string, ObjectRecord>& data)
464 {
465 restoreStatus_.Compute(objectKey, [this, &data, saveInfo] (const auto &key, auto &value) {
466 if (value == RestoreStatus::ASSETS_READY) {
467 value = RestoreStatus::ALL_READY;
468 ObjectStore::RadarReporter::ReportStage(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
469 ObjectStore::DATA_RECV, ObjectStore::RADAR_SUCCESS);
470 callbacks_.ForEach([this, &data](uint32_t tokenId, const CallbackInfo& value) {
471 DoNotify(tokenId, value, data, true);
472 return false;
473 });
474 } else {
475 value = RestoreStatus::DATA_READY;
476 ObjectStore::RadarReporter::ReportStateStart(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
477 ObjectStore::DATA_RECV, ObjectStore::RADAR_SUCCESS, ObjectStore::START, saveInfo.bundleName);
478 callbacks_.ForEach([this, &data](uint32_t tokenId, const CallbackInfo& value) {
479 DoNotify(tokenId, value, data, false);
480 return false;
481 });
482 WaitAssets(key, saveInfo, data);
483 }
484 return true;
485 });
486 }
487
NotifyDataChanged(const std::map<std::string,ObjectRecord> & data,const SaveInfo & saveInfo)488 void ObjectStoreManager::NotifyDataChanged(const std::map<std::string, ObjectRecord>& data, const SaveInfo& saveInfo)
489 {
490 for (auto const& [objectKey, results] : data) {
491 restoreStatus_.ComputeIfAbsent(
492 objectKey, [](const std::string& key) -> auto {
493 return RestoreStatus::NONE;
494 });
495 ComputeStatus(objectKey, saveInfo, data);
496 }
497 }
498
WaitAssets(const std::string & objectKey,const SaveInfo & saveInfo,const std::map<std::string,ObjectRecord> & data)499 int32_t ObjectStoreManager::WaitAssets(const std::string& objectKey, const SaveInfo& saveInfo,
500 const std::map<std::string, ObjectRecord>& data)
501 {
502 auto taskId = executors_->Schedule(std::chrono::seconds(WAIT_TIME), [this, objectKey, data, saveInfo]() {
503 ZLOGE("wait assets finisehd timeout, try pull assets, objectKey:%{public}s", objectKey.c_str());
504 PullAssets(data, saveInfo);
505 DoNotifyWaitAssetTimeout(objectKey);
506 });
507
508 objectTimer_.ComputeIfAbsent(
509 objectKey, [taskId](const std::string& key) -> auto {
510 return taskId;
511 });
512 return OBJECT_SUCCESS;
513 }
514
PullAssets(const std::map<std::string,ObjectRecord> & data,const SaveInfo & saveInfo)515 void ObjectStoreManager::PullAssets(const std::map<std::string, ObjectRecord>& data, const SaveInfo& saveInfo)
516 {
517 std::map<std::string, Assets> changedAssets;
518 for (auto const& [objectId, result] : data) {
519 changedAssets[objectId] = GetAssetsFromDBRecords(result);
520 }
521 for (const auto& [objectId, assets] : changedAssets) {
522 std::string networkId = DmAdaper::GetInstance().ToNetworkID(saveInfo.sourceDeviceId);
523 auto block = std::make_shared<BlockData<std::tuple<bool, bool>>>(WAIT_TIME, std::tuple{ true, true });
524 ObjectAssetLoader::GetInstance()->TransferAssetsAsync(std::stoi(GetCurrentUser()),
525 saveInfo.bundleName, networkId, assets, [this, block](bool success) {
526 block->SetValue({ false, success });
527 });
528 auto [timeout, success] = block->GetValue();
529 ZLOGI("Pull assets end, timeout: %{public}d, success: %{public}d, size:%{public}zu, deviceId: %{public}s",
530 timeout, success, assets.size(), DistributedData::Anonymous::Change(networkId).c_str());
531 }
532 }
533
NotifyAssetsReady(const std::string & objectKey,const std::string & bundleName,const std::string & srcNetworkId)534 void ObjectStoreManager::NotifyAssetsReady(const std::string& objectKey, const std::string& bundleName,
535 const std::string& srcNetworkId)
536 {
537 restoreStatus_.ComputeIfAbsent(
538 objectKey, [](const std::string& key) -> auto {
539 return RestoreStatus::NONE;
540 });
541 restoreStatus_.Compute(objectKey, [this, &bundleName] (const auto &key, auto &value) {
542 if (value == RestoreStatus::DATA_NOTIFIED) {
543 value = RestoreStatus::ALL_READY;
544 ObjectStore::RadarReporter::ReportStage(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
545 ObjectStore::ASSETS_RECV, ObjectStore::RADAR_SUCCESS);
546 callbacks_.ForEach([this, key](uint32_t tokenId, const CallbackInfo& value) {
547 DoNotifyAssetsReady(tokenId, value, key, true);
548 return false;
549 });
550 } else if (value == RestoreStatus::DATA_READY) {
551 value = RestoreStatus::ALL_READY;
552 ObjectStore::RadarReporter::ReportStage(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
553 ObjectStore::ASSETS_RECV, ObjectStore::RADAR_SUCCESS);
554 auto [has, taskId] = objectTimer_.Find(key);
555 if (has) {
556 executors_->Remove(taskId);
557 objectTimer_.Erase(key);
558 }
559 } else {
560 value = RestoreStatus::ASSETS_READY;
561 ObjectStore::RadarReporter::ReportStateStart(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
562 ObjectStore::ASSETS_RECV, ObjectStore::RADAR_SUCCESS, ObjectStore::START, bundleName);
563 }
564 return true;
565 });
566 }
567
NotifyAssetsStart(const std::string & objectKey,const std::string & srcNetworkId)568 void ObjectStoreManager::NotifyAssetsStart(const std::string& objectKey, const std::string& srcNetworkId)
569 {
570 restoreStatus_.ComputeIfAbsent(
571 objectKey, [](const std::string& key) -> auto {
572 return RestoreStatus::NONE;
573 });
574 }
575
IsAssetKey(const std::string & key)576 bool ObjectStoreManager::IsAssetKey(const std::string& key)
577 {
578 return key.find(ObjectStore::ASSET_DOT) != std::string::npos;
579 }
580
IsAssetComplete(const ObjectRecord & result,const std::string & assetPrefix)581 bool ObjectStoreManager::IsAssetComplete(const ObjectRecord& result, const std::string& assetPrefix)
582 {
583 if (result.find(assetPrefix + ObjectStore::NAME_SUFFIX) == result.end() ||
584 result.find(assetPrefix + ObjectStore::URI_SUFFIX) == result.end() ||
585 result.find(assetPrefix + ObjectStore::PATH_SUFFIX) == result.end() ||
586 result.find(assetPrefix + ObjectStore::CREATE_TIME_SUFFIX) == result.end() ||
587 result.find(assetPrefix + ObjectStore::MODIFY_TIME_SUFFIX) == result.end() ||
588 result.find(assetPrefix + ObjectStore::SIZE_SUFFIX) == result.end()) {
589 return false;
590 }
591 return true;
592 }
593
GetAssetsFromDBRecords(const ObjectRecord & result)594 Assets ObjectStoreManager::GetAssetsFromDBRecords(const ObjectRecord& result)
595 {
596 Assets assets{};
597 std::set<std::string> assetKey;
598 for (const auto& [key, value] : result) {
599 std::string assetPrefix = key.substr(0, key.find(ObjectStore::ASSET_DOT));
600 if (!IsAssetKey(key) || assetKey.find(assetPrefix) != assetKey.end() ||
601 result.find(assetPrefix + ObjectStore::NAME_SUFFIX) == result.end() ||
602 result.find(assetPrefix + ObjectStore::URI_SUFFIX) == result.end()) {
603 continue;
604 }
605 Asset asset;
606 ObjectStore::StringUtils::BytesToStrWithType(
607 result.find(assetPrefix + ObjectStore::NAME_SUFFIX)->second, asset.name);
608 if (asset.name.find(ObjectStore::STRING_PREFIX) != std::string::npos) {
609 asset.name = asset.name.substr(ObjectStore::STRING_PREFIX_LEN);
610 }
611 ObjectStore::StringUtils::BytesToStrWithType(
612 result.find(assetPrefix + ObjectStore::URI_SUFFIX)->second, asset.uri);
613 if (asset.uri.find(ObjectStore::STRING_PREFIX) != std::string::npos) {
614 asset.uri = asset.uri.substr(ObjectStore::STRING_PREFIX_LEN);
615 }
616 ObjectStore::StringUtils::BytesToStrWithType(
617 result.find(assetPrefix + ObjectStore::MODIFY_TIME_SUFFIX)->second, asset.modifyTime);
618 if (asset.modifyTime.find(ObjectStore::STRING_PREFIX) != std::string::npos) {
619 asset.modifyTime = asset.modifyTime.substr(ObjectStore::STRING_PREFIX_LEN);
620 }
621 ObjectStore::StringUtils::BytesToStrWithType(
622 result.find(assetPrefix + ObjectStore::SIZE_SUFFIX)->second, asset.size);
623 if (asset.size.find(ObjectStore::STRING_PREFIX) != std::string::npos) {
624 asset.size = asset.size.substr(ObjectStore::STRING_PREFIX_LEN);
625 }
626 asset.hash = asset.modifyTime + "_" + asset.size;
627 assets.push_back(asset);
628 assetKey.insert(assetPrefix);
629 }
630 return assets;
631 }
632
DoNotify(uint32_t tokenId,const CallbackInfo & value,const std::map<std::string,ObjectRecord> & data,bool allReady)633 void ObjectStoreManager::DoNotify(uint32_t tokenId, const CallbackInfo& value,
634 const std::map<std::string, ObjectRecord>& data, bool allReady)
635 {
636 for (const auto& observer : value.observers_) {
637 auto it = data.find(observer.first);
638 if (it == data.end()) {
639 continue;
640 }
641 observer.second->Completed((*it).second, allReady);
642 if (allReady) {
643 restoreStatus_.Erase(observer.first);
644 ObjectStore::RadarReporter::ReportStateFinished(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
645 ObjectStore::NOTIFY, ObjectStore::RADAR_SUCCESS, ObjectStore::FINISHED);
646 } else {
647 restoreStatus_.ComputeIfPresent(observer.first, [](const auto &key, auto &value) {
648 value = RestoreStatus::DATA_NOTIFIED;
649 return true;
650 });
651 }
652 }
653 }
654
DoNotifyAssetsReady(uint32_t tokenId,const CallbackInfo & value,const std::string & objectKey,bool allReady)655 void ObjectStoreManager::DoNotifyAssetsReady(uint32_t tokenId, const CallbackInfo& value,
656 const std::string& objectKey, bool allReady)
657 {
658 for (const auto& observer : value.observers_) {
659 if (objectKey != observer.first) {
660 continue;
661 }
662 observer.second->Completed(ObjectRecord(), allReady);
663 if (allReady) {
664 restoreStatus_.Erase(objectKey);
665 ObjectStore::RadarReporter::ReportStateFinished(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
666 ObjectStore::NOTIFY, ObjectStore::RADAR_SUCCESS, ObjectStore::FINISHED);
667 }
668 auto [has, taskId] = objectTimer_.Find(objectKey);
669 if (has) {
670 executors_->Remove(taskId);
671 objectTimer_.Erase(objectKey);
672 }
673 }
674 }
675
DoNotifyWaitAssetTimeout(const std::string & objectKey)676 void ObjectStoreManager::DoNotifyWaitAssetTimeout(const std::string &objectKey)
677 {
678 ObjectStore::RadarReporter::ReportStageError(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
679 ObjectStore::ASSETS_RECV, ObjectStore::RADAR_FAILED, ObjectStore::TIMEOUT);
680 callbacks_.ForEach([this, &objectKey](uint32_t tokenId, const CallbackInfo &value) {
681 for (const auto& observer : value.observers_) {
682 if (objectKey != observer.first) {
683 continue;
684 }
685 observer.second->Completed(ObjectRecord(), true);
686 restoreStatus_.Erase(objectKey);
687 auto [has, taskId] = objectTimer_.Find(objectKey);
688 if (has) {
689 executors_->Remove(taskId);
690 objectTimer_.Erase(objectKey);
691 }
692 ObjectStore::RadarReporter::ReportStateError(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
693 ObjectStore::NOTIFY, ObjectStore::RADAR_FAILED, ObjectStore::TIMEOUT, ObjectStore::FINISHED);
694 }
695 return false;
696 });
697 }
698
SetData(const std::string & dataDir,const std::string & userId)699 void ObjectStoreManager::SetData(const std::string &dataDir, const std::string &userId)
700 {
701 ZLOGI("enter %{public}s", dataDir.c_str());
702 kvStoreDelegateManager_ =
703 new DistributedDB::KvStoreDelegateManager(DistributedData::Bootstrap::GetInstance().GetProcessLabel(), userId);
704 DistributedDB::KvStoreConfig kvStoreConfig { dataDir };
705 kvStoreDelegateManager_->SetKvStoreConfig(kvStoreConfig);
706 userId_ = userId;
707 }
708
Open()709 int32_t ObjectStoreManager::Open()
710 {
711 if (kvStoreDelegateManager_ == nullptr) {
712 ZLOGE("Kvstore delegate manager not init");
713 return OBJECT_INNER_ERROR;
714 }
715 std::lock_guard<std::recursive_mutex> lock(kvStoreMutex_);
716 if (delegate_ == nullptr) {
717 delegate_ = OpenObjectKvStore();
718 if (delegate_ == nullptr) {
719 ZLOGE("Open object kvstore failed");
720 return OBJECT_DBSTATUS_ERROR;
721 }
722 syncCount_ = 1;
723 ZLOGI("Open object kvstore success");
724 } else {
725 syncCount_++;
726 ZLOGI("Object kvstore syncCount: %{public}d", syncCount_);
727 }
728 return OBJECT_SUCCESS;
729 }
730
Close()731 void ObjectStoreManager::Close()
732 {
733 std::lock_guard<std::recursive_mutex> lock(kvStoreMutex_);
734 if (delegate_ == nullptr) {
735 return;
736 }
737 int32_t taskCount = delegate_->GetTaskCount();
738 if (taskCount > 0 && syncCount_ == 1) {
739 CloseAfterMinute();
740 ZLOGW("Store is busy, close after a minute, task count: %{public}d", taskCount);
741 return;
742 }
743 syncCount_--;
744 ZLOGI("closed a store, syncCount = %{public}d", syncCount_);
745 FlushClosedStore();
746 }
747
SyncCompleted(const std::map<std::string,DistributedDB::DBStatus> & results,uint64_t sequenceId)748 void ObjectStoreManager::SyncCompleted(
749 const std::map<std::string, DistributedDB::DBStatus> &results, uint64_t sequenceId)
750 {
751 std::string userId;
752 SequenceSyncManager::Result result = SequenceSyncManager::GetInstance()->Process(sequenceId, results, userId);
753 if (result == SequenceSyncManager::SUCCESS_USER_HAS_FINISHED && userId == userId_) {
754 std::lock_guard<std::recursive_mutex> lock(kvStoreMutex_);
755 SetSyncStatus(false);
756 FlushClosedStore();
757 }
758 for (const auto &item : results) {
759 if (item.second == DistributedDB::DBStatus::OK) {
760 ZLOGI("Sync data success, sequenceId: 0x%{public}" PRIx64 "", sequenceId);
761 ObjectStore::RadarReporter::ReportStateFinished(std::string(__FUNCTION__), ObjectStore::SAVE,
762 ObjectStore::SYNC_DATA, ObjectStore::RADAR_SUCCESS, ObjectStore::FINISHED);
763 } else {
764 ZLOGE("Sync data failed, sequenceId: 0x%{public}" PRIx64 ", status: %{public}d", sequenceId, item.second);
765 ObjectStore::RadarReporter::ReportStateError(std::string(__FUNCTION__), ObjectStore::SAVE,
766 ObjectStore::SYNC_DATA, ObjectStore::RADAR_FAILED, item.second, ObjectStore::FINISHED);
767 }
768 }
769 }
770
FlushClosedStore()771 void ObjectStoreManager::FlushClosedStore()
772 {
773 std::lock_guard<std::recursive_mutex> lock(kvStoreMutex_);
774 if (!isSyncing_ && syncCount_ == 0 && delegate_ != nullptr) {
775 ZLOGD("close store");
776 auto status = kvStoreDelegateManager_->CloseKvStore(delegate_);
777 if (status != DistributedDB::DBStatus::OK) {
778 int timeOut = 1000;
779 executors_->Schedule(std::chrono::milliseconds(timeOut), [this]() {
780 FlushClosedStore();
781 });
782 ZLOGE("GetEntries fail %{public}d", status);
783 return;
784 }
785 delegate_ = nullptr;
786 if (objectDataListener_ != nullptr) {
787 delete objectDataListener_;
788 objectDataListener_ = nullptr;
789 }
790 }
791 }
792
ProcessOldEntry(const std::string & appId)793 void ObjectStoreManager::ProcessOldEntry(const std::string &appId)
794 {
795 std::vector<DistributedDB::Entry> entries;
796 auto status = delegate_->GetEntries(std::vector<uint8_t>(appId.begin(), appId.end()), entries);
797 if (status != DistributedDB::DBStatus::NOT_FOUND) {
798 ZLOGI("Get old entries empty, bundleName: %{public}s", appId.c_str());
799 return;
800 }
801 if (status != DistributedDB::DBStatus::OK) {
802 ZLOGE("Get old entries failed, bundleName: %{public}s, status %{public}d", appId.c_str(), status);
803 return;
804 }
805 std::map<std::string, int64_t> sessionIds;
806 int64_t oldestTime = 0;
807 std::string deleteKey;
808 for (auto &item : entries) {
809 std::string key(item.key.begin(), item.key.end());
810 std::vector<std::string> splitKeys = SplitEntryKey(key);
811 if (splitKeys.empty()) {
812 continue;
813 }
814 std::string bundleName = splitKeys[BUNDLE_NAME_INDEX];
815 std::string sessionId = splitKeys[SESSION_ID_INDEX];
816 if (sessionIds.count(sessionId) == 0) {
817 char *end = nullptr;
818 sessionIds[sessionId] = strtol(splitKeys[TIME_INDEX].c_str(), &end, DECIMAL_BASE);
819 }
820 if (oldestTime == 0 || oldestTime > sessionIds[sessionId]) {
821 oldestTime = sessionIds[sessionId];
822 deleteKey = GetPrefixWithoutDeviceId(bundleName, sessionId);
823 }
824 }
825 if (sessionIds.size() < MAX_OBJECT_SIZE_PER_APP) {
826 return;
827 }
828 int32_t result = RevokeSaveToStore(deleteKey);
829 if (result != OBJECT_SUCCESS) {
830 ZLOGE("Delete old entries failed, deleteKey: %{public}s, status: %{public}d", deleteKey.c_str(), result);
831 return;
832 }
833 ZLOGI("Delete old entries success, deleteKey: %{public}s", deleteKey.c_str());
834 }
835
SaveToStore(const std::string & appId,const std::string & sessionId,const std::string & toDeviceId,const ObjectRecord & data)836 int32_t ObjectStoreManager::SaveToStore(const std::string &appId, const std::string &sessionId,
837 const std::string &toDeviceId, const ObjectRecord &data)
838 {
839 ProcessOldEntry(appId);
840 RevokeSaveToStore(GetPropertyPrefix(appId, sessionId, toDeviceId));
841 std::string timestamp = std::to_string(GetSecondsSince1970ToNow());
842 std::string prefix = GetPropertyPrefix(appId, sessionId, toDeviceId) + timestamp + SEPERATOR;
843 DistributedDB::Entry saveInfoEntry;
844 std::string saveInfoKey = prefix + SAVE_INFO;
845 saveInfoEntry.key = std::vector<uint8_t>(saveInfoKey.begin(), saveInfoKey.end());
846 SaveInfo saveInfo(appId, sessionId, DmAdaper::GetInstance().GetLocalDevice().udid, toDeviceId, timestamp);
847 std::string saveInfoValue = DistributedData::Serializable::Marshall(saveInfo);
848 saveInfoEntry.value = std::vector<uint8_t>(saveInfoValue.begin(), saveInfoValue.end());
849 std::vector<DistributedDB::Entry> entries;
850 entries.emplace_back(saveInfoEntry);
851 for (auto &item : data) {
852 DistributedDB::Entry entry;
853 std::string key = GetPropertyPrefix(appId, sessionId, toDeviceId) + timestamp + SEPERATOR + item.first;
854 entry.key = std::vector<uint8_t>(key.begin(), key.end());
855 entry.value = item.second;
856 entries.emplace_back(entry);
857 }
858 auto status = delegate_->PutBatch(entries);
859 if (status != DistributedDB::DBStatus::OK) {
860 ZLOGE("PutBatch failed, bundleName: %{public}s, sessionId: %{public}s, dstNetworkId: %{public}s, "
861 "status: %{public}d", appId.c_str(), sessionId.c_str(), Anonymous::Change(toDeviceId).c_str(), status);
862 return status;
863 }
864 ZLOGI("PutBatch success, bundleName: %{public}s, sessionId: %{public}s, dstNetworkId: %{public}s, "
865 "count: %{public}zu", appId.c_str(), sessionId.c_str(), Anonymous::Change(toDeviceId).c_str(), entries.size());
866 return OBJECT_SUCCESS;
867 }
868
SyncOnStore(const std::string & prefix,const std::vector<std::string> & deviceList,SyncCallBack & callback)869 int32_t ObjectStoreManager::SyncOnStore(
870 const std::string &prefix, const std::vector<std::string> &deviceList, SyncCallBack &callback)
871 {
872 std::vector<std::string> syncDevices;
873 for (auto &device : deviceList) {
874 if (device == LOCAL_DEVICE) {
875 ZLOGI("Save to local, do not need sync, prefix: %{public}s", prefix.c_str());
876 callback({{LOCAL_DEVICE, OBJECT_SUCCESS}});
877 return OBJECT_SUCCESS;
878 }
879 syncDevices.emplace_back(DmAdaper::GetInstance().GetUuidByNetworkId(device));
880 }
881 if (syncDevices.empty()) {
882 ZLOGI("Device list is empty, prefix: %{public}s", Anonymous::Change(prefix).c_str());
883 callback(std::map<std::string, int32_t>());
884 return OBJECT_SUCCESS;
885 }
886 uint64_t sequenceId = SequenceSyncManager::GetInstance()->AddNotifier(userId_, callback);
887 DistributedDB::Query dbQuery = DistributedDB::Query::Select();
888 dbQuery.PrefixKey(std::vector<uint8_t>(prefix.begin(), prefix.end()));
889 ZLOGI("Start sync data, sequenceId: 0x%{public}" PRIx64 "", sequenceId);
890 auto status = delegate_->Sync(syncDevices, DistributedDB::SyncMode::SYNC_MODE_PUSH_ONLY,
891 [this, sequenceId](const std::map<std::string, DistributedDB::DBStatus> &devicesMap) {
892 ZLOGI("Sync data finished, sequenceId: 0x%{public}" PRIx64 "", sequenceId);
893 std::map<std::string, DistributedDB::DBStatus> result;
894 for (auto &item : devicesMap) {
895 result[DmAdaper::GetInstance().ToNetworkID(item.first)] = item.second;
896 }
897 SyncCompleted(result, sequenceId);
898 }, dbQuery, false);
899 if (status != DistributedDB::DBStatus::OK) {
900 ZLOGE("Sync data failed, prefix: %{public}s, sequenceId: 0x%{public}" PRIx64 ", status: %{public}d",
901 Anonymous::Change(prefix).c_str(), sequenceId, status);
902 std::string tmp;
903 SequenceSyncManager::GetInstance()->DeleteNotifier(sequenceId, tmp);
904 return status;
905 }
906 SetSyncStatus(true);
907 return OBJECT_SUCCESS;
908 }
909
SetSyncStatus(bool status)910 int32_t ObjectStoreManager::SetSyncStatus(bool status)
911 {
912 std::lock_guard<std::recursive_mutex> lock(kvStoreMutex_);
913 isSyncing_ = status;
914 return OBJECT_SUCCESS;
915 }
916
RevokeSaveToStore(const std::string & prefix)917 int32_t ObjectStoreManager::RevokeSaveToStore(const std::string &prefix)
918 {
919 std::vector<DistributedDB::Entry> entries;
920 auto status = delegate_->GetEntries(std::vector<uint8_t>(prefix.begin(), prefix.end()), entries);
921 if (status == DistributedDB::DBStatus::NOT_FOUND) {
922 ZLOGI("Get entries empty, prefix: %{public}s", Anonymous::Change(prefix).c_str());
923 return OBJECT_SUCCESS;
924 }
925 if (status != DistributedDB::DBStatus::OK) {
926 ZLOGE("Get entries failed, prefix: %{public}s, status: %{public}d", Anonymous::Change(prefix).c_str(), status);
927 return DB_ERROR;
928 }
929 std::vector<std::vector<uint8_t>> keys;
930 std::for_each(entries.begin(), entries.end(), [&keys](const DistributedDB::Entry &entry) {
931 keys.emplace_back(entry.key);
932 });
933 if (keys.empty()) {
934 return OBJECT_SUCCESS;
935 }
936 status = delegate_->DeleteBatch(keys);
937 if (status != DistributedDB::DBStatus::OK) {
938 ZLOGE("Delete entries failed, prefix: %{public}s, status: %{public}d", Anonymous::Change(prefix).c_str(),
939 status);
940 return DB_ERROR;
941 }
942 ZLOGI("Delete entries success, prefix: %{public}s, count: %{public}zu", Anonymous::Change(prefix).c_str(),
943 keys.size());
944 return OBJECT_SUCCESS;
945 }
946
RetrieveFromStore(const std::string & appId,const std::string & sessionId,ObjectRecord & results)947 int32_t ObjectStoreManager::RetrieveFromStore(const std::string &appId, const std::string &sessionId,
948 ObjectRecord &results)
949 {
950 std::vector<DistributedDB::Entry> entries;
951 std::string prefix = GetPrefixWithoutDeviceId(appId, sessionId);
952 auto status = delegate_->GetEntries(std::vector<uint8_t>(prefix.begin(), prefix.end()), entries);
953 if (status == DistributedDB::DBStatus::NOT_FOUND) {
954 ZLOGI("Get entries empty, prefix: %{public}s, status: %{public}d", prefix.c_str(), status);
955 return KEY_NOT_FOUND;
956 }
957 if (status != DistributedDB::DBStatus::OK) {
958 ZLOGE("Get entries failed, prefix: %{public}s, status: %{public}d", prefix.c_str(), status);
959 return DB_ERROR;
960 }
961 ZLOGI("Get entries success, prefix: %{public}s, count: %{public}zu", prefix.c_str(), entries.size());
962 for (const auto &entry : entries) {
963 std::string key(entry.key.begin(), entry.key.end());
964 if (key.find(SAVE_INFO) != std::string::npos) {
965 continue;
966 }
967 auto splitKeys = SplitEntryKey(key);
968 if (!splitKeys.empty()) {
969 results[splitKeys[PROPERTY_NAME_INDEX]] = entry.value;
970 }
971 }
972 return OBJECT_SUCCESS;
973 }
974
SaveInfo(const std::string & bundleName,const std::string & sessionId,const std::string & sourceDeviceId,const std::string & targetDeviceId,const std::string & timestamp)975 ObjectStoreManager::SaveInfo::SaveInfo(const std::string &bundleName, const std::string &sessionId,
976 const std::string &sourceDeviceId, const std::string &targetDeviceId, const std::string ×tamp)
977 : bundleName(bundleName), sessionId(sessionId), sourceDeviceId(sourceDeviceId), targetDeviceId(targetDeviceId),
978 timestamp(timestamp) {}
979
Marshal(json & node) const980 bool ObjectStoreManager::SaveInfo::Marshal(json &node) const
981 {
982 SetValue(node[GET_NAME(bundleName)], bundleName);
983 SetValue(node[GET_NAME(sessionId)], sessionId);
984 SetValue(node[GET_NAME(sourceDeviceId)], sourceDeviceId);
985 SetValue(node[GET_NAME(targetDeviceId)], targetDeviceId);
986 SetValue(node[GET_NAME(timestamp)], timestamp);
987 return true;
988 }
989
Unmarshal(const json & node)990 bool ObjectStoreManager::SaveInfo::Unmarshal(const json &node)
991 {
992 GetValue(node, GET_NAME(bundleName), bundleName);
993 GetValue(node, GET_NAME(sessionId), sessionId);
994 GetValue(node, GET_NAME(sourceDeviceId), sourceDeviceId);
995 GetValue(node, GET_NAME(targetDeviceId), targetDeviceId);
996 GetValue(node, GET_NAME(timestamp), timestamp);
997 return true;
998 }
999
ToPropertyPrefix()1000 std::string ObjectStoreManager::SaveInfo::ToPropertyPrefix()
1001 {
1002 if (bundleName.empty() || sessionId.empty() || sourceDeviceId.empty() || targetDeviceId.empty() ||
1003 timestamp.empty()) {
1004 return "";
1005 }
1006 return bundleName + SEPERATOR + sessionId + SEPERATOR + sourceDeviceId + SEPERATOR + targetDeviceId + SEPERATOR +
1007 timestamp + SEPERATOR;
1008 }
1009
SplitEntryKey(const std::string & key)1010 std::vector<std::string> ObjectStoreManager::SplitEntryKey(const std::string &key)
1011 {
1012 std::smatch match;
1013 std::regex timeRegex(TIME_REGEX);
1014 if (!std::regex_search(key, match, timeRegex)) {
1015 ZLOGW("Format error, key.size = %{public}zu", key.size());
1016 return {};
1017 }
1018 auto timePos = match.position();
1019 std::string fromTime = key.substr(timePos + 1);
1020 std::string beforeTime = key.substr(0, timePos);
1021
1022 size_t targetDevicePos = beforeTime.find_last_of(SEPERATOR);
1023 if (targetDevicePos == std::string::npos) {
1024 ZLOGW("Format error, key.size = %{public}zu", key.size());
1025 return {};
1026 }
1027 std::string targetDevice = beforeTime.substr(targetDevicePos + 1);
1028 std::string beforeTargetDevice = beforeTime.substr(0, targetDevicePos);
1029
1030 size_t sourceDeviceUdidPos = beforeTargetDevice.find_last_of(SEPERATOR);
1031 if (sourceDeviceUdidPos == std::string::npos) {
1032 ZLOGW("Format error, key.size = %{public}zu", key.size());
1033 return {};
1034 }
1035 std::string sourceDeviceUdid = beforeTargetDevice.substr(sourceDeviceUdidPos + 1);
1036 std::string beforeSourceDeviceUdid = beforeTargetDevice.substr(0, sourceDeviceUdidPos);
1037
1038 size_t sessionIdPos = beforeSourceDeviceUdid.find_last_of(SEPERATOR);
1039 if (sessionIdPos == std::string::npos) {
1040 ZLOGW("Format error, key.size = %{public}zu", key.size());
1041 return {};
1042 }
1043 std::string sessionId = beforeSourceDeviceUdid.substr(sessionIdPos + 1);
1044 std::string bundleName = beforeSourceDeviceUdid.substr(0, sessionIdPos);
1045
1046 size_t propertyNamePos = fromTime.find_first_of(SEPERATOR);
1047 if (propertyNamePos == std::string::npos) {
1048 ZLOGW("Format error, key.size = %{public}zu", key.size());
1049 return {};
1050 }
1051 std::string propertyName = fromTime.substr(propertyNamePos + 1);
1052 std::string time = fromTime.substr(0, propertyNamePos);
1053
1054 return { bundleName, sessionId, sourceDeviceUdid, targetDevice, time, propertyName };
1055 }
1056
GetCurrentUser()1057 std::string ObjectStoreManager::GetCurrentUser()
1058 {
1059 std::vector<int> users;
1060 AccountDelegate::GetInstance()->QueryUsers(users);
1061 if (users.empty()) {
1062 return "";
1063 }
1064 return std::to_string(users[0]);
1065 }
1066
SaveUserToMeta()1067 void ObjectStoreManager::SaveUserToMeta()
1068 {
1069 ZLOGD("start.");
1070 std::string userId = GetCurrentUser();
1071 if (userId.empty()) {
1072 return;
1073 }
1074 std::string appId = DistributedData::Bootstrap::GetInstance().GetProcessLabel();
1075 StoreMetaData userMeta;
1076 userMeta.storeId = DistributedObject::ObjectCommon::OBJECTSTORE_DB_STOREID;
1077 userMeta.user = userId;
1078 userMeta.storeType = ObjectDistributedType::OBJECT_SINGLE_VERSION;
1079 std::string userMetaKey = GetMetaUserIdKey(userId, appId);
1080 auto saved = DistributedData::MetaDataManager::GetInstance().SaveMeta(userMetaKey, userMeta, true);
1081 if (!saved) {
1082 ZLOGE("userMeta save failed");
1083 }
1084 }
1085
CloseAfterMinute()1086 void ObjectStoreManager::CloseAfterMinute()
1087 {
1088 executors_->Schedule(std::chrono::minutes(INTERVAL), std::bind(&ObjectStoreManager::Close, this));
1089 }
1090
SetThreadPool(std::shared_ptr<ExecutorPool> executors)1091 void ObjectStoreManager::SetThreadPool(std::shared_ptr<ExecutorPool> executors)
1092 {
1093 executors_ = executors;
1094 }
1095
AddNotifier(const std::string & userId,SyncCallBack & callback)1096 uint64_t SequenceSyncManager::AddNotifier(const std::string &userId, SyncCallBack &callback)
1097 {
1098 std::lock_guard<std::mutex> lock(notifierLock_);
1099 uint64_t sequenceId = KvStoreUtils::GenerateSequenceId();
1100 userIdSeqIdRelations_[userId].emplace_back(sequenceId);
1101 seqIdCallbackRelations_[sequenceId] = callback;
1102 return sequenceId;
1103 }
1104
Process(uint64_t sequenceId,const std::map<std::string,DistributedDB::DBStatus> & results,std::string & userId)1105 SequenceSyncManager::Result SequenceSyncManager::Process(
1106 uint64_t sequenceId, const std::map<std::string, DistributedDB::DBStatus> &results, std::string &userId)
1107 {
1108 std::lock_guard<std::mutex> lock(notifierLock_);
1109 if (seqIdCallbackRelations_.count(sequenceId) == 0) {
1110 ZLOGE("not exist");
1111 return ERR_SID_NOT_EXIST;
1112 }
1113 std::map<std::string, int32_t> syncResults;
1114 for (auto &item : results) {
1115 syncResults[item.first] = item.second == DistributedDB::DBStatus::OK ? 0 : -1;
1116 }
1117 seqIdCallbackRelations_[sequenceId](syncResults);
1118 ZLOGD("end complete");
1119 return DeleteNotifierNoLock(sequenceId, userId);
1120 }
1121
DeleteNotifier(uint64_t sequenceId,std::string & userId)1122 SequenceSyncManager::Result SequenceSyncManager::DeleteNotifier(uint64_t sequenceId, std::string &userId)
1123 {
1124 std::lock_guard<std::mutex> lock(notifierLock_);
1125 if (seqIdCallbackRelations_.count(sequenceId) == 0) {
1126 ZLOGE("not exist");
1127 return ERR_SID_NOT_EXIST;
1128 }
1129 return DeleteNotifierNoLock(sequenceId, userId);
1130 }
1131
DeleteNotifierNoLock(uint64_t sequenceId,std::string & userId)1132 SequenceSyncManager::Result SequenceSyncManager::DeleteNotifierNoLock(uint64_t sequenceId, std::string &userId)
1133 {
1134 seqIdCallbackRelations_.erase(sequenceId);
1135 auto userIdIter = userIdSeqIdRelations_.begin();
1136 while (userIdIter != userIdSeqIdRelations_.end()) {
1137 auto sIdIter = std::find(userIdIter->second.begin(), userIdIter->second.end(), sequenceId);
1138 if (sIdIter != userIdIter->second.end()) {
1139 userIdIter->second.erase(sIdIter);
1140 if (userIdIter->second.empty()) {
1141 ZLOGD("finished user callback, userId = %{public}s", userIdIter->first.c_str());
1142 userId = userIdIter->first;
1143 userIdSeqIdRelations_.erase(userIdIter);
1144 return SUCCESS_USER_HAS_FINISHED;
1145 } else {
1146 ZLOGD(" finished a callback but user not finished, userId = %{public}s", userIdIter->first.c_str());
1147 return SUCCESS_USER_IN_USE;
1148 }
1149 }
1150 userIdIter++;
1151 }
1152 return SUCCESS_USER_HAS_FINISHED;
1153 }
1154
BindAsset(const uint32_t tokenId,const std::string & appId,const std::string & sessionId,ObjectStore::Asset & asset,ObjectStore::AssetBindInfo & bindInfo)1155 int32_t ObjectStoreManager::BindAsset(const uint32_t tokenId, const std::string& appId, const std::string& sessionId,
1156 ObjectStore::Asset& asset, ObjectStore::AssetBindInfo& bindInfo)
1157 {
1158 auto snapshotKey = appId + SEPERATOR + sessionId;
1159 snapshots_.ComputeIfAbsent(
1160 snapshotKey, [](const std::string& key) -> auto {
1161 return std::make_shared<ObjectSnapshot>();
1162 });
1163 auto storeKey = appId + SEPERATOR + bindInfo.storeName;
1164 bindSnapshots_.ComputeIfAbsent(
1165 storeKey, [](const std::string& key) -> auto {
1166 return std::make_shared<std::map<std::string, std::shared_ptr<Snapshot>>>();
1167 });
1168 auto snapshots = snapshots_.Find(snapshotKey).second;
1169 bindSnapshots_.Compute(storeKey, [this, &asset, snapshots] (const auto &key, auto &value) {
1170 value->emplace(std::pair{asset.uri, snapshots});
1171 return true;
1172 });
1173
1174 HapTokenInfo tokenInfo;
1175 auto status = AccessTokenKit::GetHapTokenInfo(tokenId, tokenInfo);
1176 if (status != RET_SUCCESS) {
1177 ZLOGE("token:0x%{public}x, result:%{public}d, bundleName:%{public}s", tokenId, status, appId.c_str());
1178 return GeneralError::E_ERROR;
1179 }
1180 StoreInfo storeInfo;
1181 storeInfo.bundleName = appId;
1182 storeInfo.tokenId = tokenId;
1183 storeInfo.instanceId = tokenInfo.instIndex;
1184 storeInfo.user = tokenInfo.userID;
1185 storeInfo.storeName = bindInfo.storeName;
1186
1187 snapshots_.Compute(snapshotKey, [this, &asset, &bindInfo, &storeInfo] (const auto &key, auto &value) {
1188 value->BindAsset(ValueProxy::Convert(std::move(asset)), ConvertBindInfo(bindInfo), storeInfo);
1189 return true;
1190 });
1191 return OBJECT_SUCCESS;
1192 }
1193
ConvertBindInfo(ObjectStore::AssetBindInfo & bindInfo)1194 DistributedData::AssetBindInfo ObjectStoreManager::ConvertBindInfo(ObjectStore::AssetBindInfo& bindInfo)
1195 {
1196 return DistributedData::AssetBindInfo{
1197 .storeName = std::move(bindInfo.storeName),
1198 .tableName = std::move(bindInfo.tableName),
1199 .primaryKey = ValueProxy::Convert(std::move(bindInfo.primaryKey)),
1200 .field = std::move(bindInfo.field),
1201 .assetName = std::move(bindInfo.assetName),
1202 };
1203 }
1204
OnAssetChanged(const uint32_t tokenId,const std::string & appId,const std::string & sessionId,const std::string & deviceId,const ObjectStore::Asset & asset)1205 int32_t ObjectStoreManager::OnAssetChanged(const uint32_t tokenId, const std::string& appId,
1206 const std::string& sessionId, const std::string& deviceId, const ObjectStore::Asset& asset)
1207 {
1208 const int32_t userId = DistributedKv::AccountDelegate::GetInstance()->GetUserByToken(tokenId);
1209 auto objectAsset = asset;
1210 Asset dataAsset = ValueProxy::Convert(std::move(objectAsset));
1211 auto snapshotKey = appId + SEPERATOR + sessionId;
1212 int32_t res = OBJECT_SUCCESS;
1213 bool exist = snapshots_.ComputeIfPresent(snapshotKey,
1214 [&res, &dataAsset, &deviceId](const std::string &key, std::shared_ptr<Snapshot> snapshot) {
1215 if (snapshot != nullptr) {
1216 res = snapshot->OnDataChanged(dataAsset, deviceId); // needChange
1217 }
1218 return snapshot != nullptr;
1219 });
1220 if (exist) {
1221 return res;
1222 }
1223
1224 auto block = std::make_shared<BlockData<std::tuple<bool, bool>>>(WAIT_TIME, std::tuple{ true, true });
1225 ObjectAssetLoader::GetInstance()->TransferAssetsAsync(userId, appId, deviceId, { dataAsset }, [block](bool ret) {
1226 block->SetValue({ false, ret });
1227 });
1228 auto [timeout, success] = block->GetValue();
1229 if (timeout || !success) {
1230 ZLOGE("transfer failed, timeout: %{public}d, success: %{public}d, name: %{public}s, deviceId: %{public}s ",
1231 timeout, success, asset.name.c_str(), DistributedData::Anonymous::Change(deviceId).c_str());
1232 return OBJECT_INNER_ERROR;
1233 }
1234 return OBJECT_SUCCESS;
1235 }
1236
GetSnapShots(const std::string & bundleName,const std::string & storeName)1237 ObjectStoreManager::UriToSnapshot ObjectStoreManager::GetSnapShots(const std::string& bundleName,
1238 const std::string& storeName)
1239 {
1240 auto storeKey = bundleName + SEPERATOR + storeName;
1241 bindSnapshots_.ComputeIfAbsent(
1242 storeKey, [](const std::string& key) -> auto {
1243 return std::make_shared<std::map<std::string, std::shared_ptr<Snapshot>>>();
1244 });
1245 return bindSnapshots_.Find(storeKey).second;
1246 }
1247
DeleteSnapshot(const std::string & bundleName,const std::string & sessionId)1248 void ObjectStoreManager::DeleteSnapshot(const std::string& bundleName, const std::string& sessionId)
1249 {
1250 auto snapshotKey = bundleName + SEPERATOR + sessionId;
1251 auto snapshot = snapshots_.Find(snapshotKey).second;
1252 if (snapshot == nullptr) {
1253 ZLOGD("Not find snapshot, don't need delete");
1254 return;
1255 }
1256 bindSnapshots_.ForEach([snapshot](auto& key, auto& value) {
1257 for (auto pos = value->begin(); pos != value->end();) {
1258 if (pos->second == snapshot) {
1259 pos = value->erase(pos);
1260 } else {
1261 ++pos;
1262 }
1263 }
1264 return true;
1265 });
1266 snapshots_.Erase(snapshotKey);
1267 }
1268 } // namespace DistributedObject
1269 } // namespace OHOS
1270