1 /*
2 * Copyright (c) 2023-2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #define LOG_TAG "SyncManager"
16 #include "sync_manager.h"
17
18 #include <chrono>
19
20 #include "account/account_delegate.h"
21 #include "bootstrap.h"
22 #include "checker/checker_manager.h"
23 #include "cloud/cloud_server.h"
24 #include "cloud/schema_meta.h"
25 #include "cloud/sync_event.h"
26 #include "cloud_value_util.h"
27 #include "cloud/cloud_lock_event.h"
28 #include "cloud/cloud_report.h"
29 #include "device_manager_adapter.h"
30 #include "dfx/radar_reporter.h"
31 #include "eventcenter/event_center.h"
32 #include "log_print.h"
33 #include "metadata/meta_data_manager.h"
34 #include "sync_strategies/network_sync_strategy.h"
35 #include "user_delegate.h"
36 #include "utils/anonymous.h"
37 namespace OHOS::CloudData {
38 using namespace DistributedData;
39 using namespace DistributedDataDfx;
40 using namespace DistributedKv;
41 using namespace SharingUtil;
42 using namespace std::chrono;
43 using Account = OHOS::DistributedKv::AccountDelegate;
44 using DmAdapter = OHOS::DistributedData::DeviceManagerAdapter;
45 using Defer = EventCenter::Defer;
// Static sequence counter; GenerateId() pre-increments it and ORs the value into each generated sync id.
std::atomic<uint32_t> SyncManager::genId_ = 0;
SyncInfo(int32_t user,const std::string & bundleName,const Store & store,const Tables & tables,int32_t triggerMode)47 SyncManager::SyncInfo::SyncInfo(
48 int32_t user, const std::string &bundleName, const Store &store, const Tables &tables, int32_t triggerMode)
49 : user_(user), bundleName_(bundleName), triggerMode_(triggerMode)
50 {
51 if (!store.empty()) {
52 tables_[store] = tables;
53 }
54 syncId_ = SyncManager::GenerateId(user);
55 }
56
SyncInfo(int32_t user,const std::string & bundleName,const Stores & stores)57 SyncManager::SyncInfo::SyncInfo(int32_t user, const std::string &bundleName, const Stores &stores)
58 : user_(user), bundleName_(bundleName)
59 {
60 for (auto &store : stores) {
61 tables_[store] = {};
62 }
63 syncId_ = SyncManager::GenerateId(user);
64 }
65
SyncInfo(int32_t user,const std::string & bundleName,const MutliStoreTables & tables)66 SyncManager::SyncInfo::SyncInfo(int32_t user, const std::string &bundleName, const MutliStoreTables &tables)
67 : user_(user), bundleName_(bundleName), tables_(tables)
68 {
69 tables_ = tables;
70 syncId_ = SyncManager::GenerateId(user);
71 }
72
SyncInfo(const Param & param)73 SyncManager::SyncInfo::SyncInfo(const Param ¶m)
74 : user_(param.user), bundleName_(param.bundleName), triggerMode_(param.triggerMode)
75 {
76 if (!param.store.empty()) {
77 tables_[param.store] = param.tables;
78 }
79 syncId_ = SyncManager::GenerateId(param.user);
80 prepareTraceId_ = param.prepareTraceId;
81 }
82
SetMode(int32_t mode)83 void SyncManager::SyncInfo::SetMode(int32_t mode)
84 {
85 mode_ = mode;
86 }
87
SetWait(int32_t wait)88 void SyncManager::SyncInfo::SetWait(int32_t wait)
89 {
90 wait_ = wait;
91 }
92
SetAsyncDetail(GenAsync asyncDetail)93 void SyncManager::SyncInfo::SetAsyncDetail(GenAsync asyncDetail)
94 {
95 async_ = std::move(asyncDetail);
96 }
97
SetQuery(std::shared_ptr<GenQuery> query)98 void SyncManager::SyncInfo::SetQuery(std::shared_ptr<GenQuery> query)
99 {
100 query_ = query;
101 }
102
SetCompensation(bool isCompensation)103 void SyncManager::SyncInfo::SetCompensation(bool isCompensation)
104 {
105 isCompensation_ = isCompensation;
106 }
107
SetTriggerMode(int32_t triggerMode)108 void SyncManager::SyncInfo::SetTriggerMode(int32_t triggerMode)
109 {
110 triggerMode_ = triggerMode;
111 }
112
SetError(int32_t code) const113 void SyncManager::SyncInfo::SetError(int32_t code) const
114 {
115 if (async_) {
116 GenDetails details;
117 auto &detail = details[id_];
118 detail.progress = GenProgress::SYNC_FINISH;
119 detail.code = code;
120 async_(std::move(details));
121 }
122 }
123
GenerateQuery(const std::string & store,const Tables & tables)124 std::shared_ptr<GenQuery> SyncManager::SyncInfo::GenerateQuery(const std::string &store, const Tables &tables)
125 {
126 if (query_ != nullptr) {
127 return query_;
128 }
129 class SyncQuery final : public GenQuery {
130 public:
131 explicit SyncQuery(const std::vector<std::string> &tables) : tables_(tables) {}
132
133 bool IsEqual(uint64_t tid) override
134 {
135 return false;
136 }
137
138 std::vector<std::string> GetTables() override
139 {
140 return tables_;
141 }
142
143 private:
144 std::vector<std::string> tables_;
145 };
146 auto it = tables_.find(store);
147 return std::make_shared<SyncQuery>(it == tables_.end() || it->second.empty() ? tables : it->second);
148 }
149
Contains(const std::string & storeName)150 bool SyncManager::SyncInfo::Contains(const std::string &storeName)
151 {
152 return tables_.empty() || tables_.find(storeName) != tables_.end();
153 }
154
GetLockChangeHandler()155 std::function<void(const Event &)> SyncManager::GetLockChangeHandler()
156 {
157 return [](const Event &event) {
158 auto &evt = static_cast<const CloudLockEvent &>(event);
159 auto storeInfo = evt.GetStoreInfo();
160 auto callback = evt.GetCallback();
161 if (callback == nullptr) {
162 ZLOGE("callback is nullptr. bundleName: %{public}s, storeName: %{public}s, user: %{public}d.",
163 storeInfo.bundleName.c_str(), Anonymous::Change(storeInfo.storeName).c_str(), storeInfo.user);
164 return;
165 }
166 StoreMetaData meta(storeInfo);
167 meta.deviceId = DmAdapter::GetInstance().GetLocalDevice().uuid;
168 if (!MetaDataManager::GetInstance().LoadMeta(meta.GetKey(), meta, true)) {
169 ZLOGE("not found meta. bundleName: %{public}s, storeName: %{public}s, user: %{public}d.",
170 storeInfo.bundleName.c_str(), Anonymous::Change(storeInfo.storeName).c_str(), storeInfo.user);
171 return;
172 }
173 auto store = GetStore(meta, storeInfo.user);
174 if (store == nullptr) {
175 ZLOGE("failed to get store. bundleName: %{public}s, storeName: %{public}s, user: %{public}d.",
176 storeInfo.bundleName.c_str(), Anonymous::Change(storeInfo.storeName).c_str(), storeInfo.user);
177 return;
178 }
179 if (evt.GetEventId() == CloudEvent::LOCK_CLOUD_CONTAINER) {
180 auto [result, expiredTime] = store->LockCloudDB();
181 callback(result, expiredTime);
182 } else {
183 auto result = store->UnLockCloudDB();
184 callback(result, 0);
185 }
186 };
187 }
188
SyncManager()189 SyncManager::SyncManager()
190 {
191 EventCenter::GetInstance().Subscribe(CloudEvent::LOCK_CLOUD_CONTAINER, GetLockChangeHandler());
192 EventCenter::GetInstance().Subscribe(CloudEvent::UNLOCK_CLOUD_CONTAINER, GetLockChangeHandler());
193 EventCenter::GetInstance().Subscribe(CloudEvent::LOCAL_CHANGE, GetClientChangeHandler());
194 syncStrategy_ = std::make_shared<NetworkSyncStrategy>();
195 auto metaName = Bootstrap::GetInstance().GetProcessLabel();
196 kvApps_.insert(std::move(metaName));
197 auto stores = CheckerManager::GetInstance().GetStaticStores();
198 for (auto &store : stores) {
199 kvApps_.insert(std::move(store.bundleName));
200 }
201 stores = CheckerManager::GetInstance().GetDynamicStores();
202 for (auto &store : stores) {
203 kvApps_.insert(std::move(store.bundleName));
204 }
205 }
206
~SyncManager()207 SyncManager::~SyncManager()
208 {
209 if (executor_ != nullptr) {
210 actives_.ForEachCopies([this](auto &syncId, auto &taskId) {
211 executor_->Remove(taskId);
212 return false;
213 });
214 executor_ = nullptr;
215 }
216 }
217
Bind(std::shared_ptr<ExecutorPool> executor)218 int32_t SyncManager::Bind(std::shared_ptr<ExecutorPool> executor)
219 {
220 executor_ = executor;
221 return E_OK;
222 }
223
DoCloudSync(SyncInfo syncInfo)224 int32_t SyncManager::DoCloudSync(SyncInfo syncInfo)
225 {
226 if (executor_ == nullptr) {
227 return E_NOT_INIT;
228 }
229 auto syncId = GenerateId(syncInfo.user_);
230 auto ref = GenSyncRef(syncId);
231 actives_.Compute(syncId, [this, &ref, &syncInfo](const uint64_t &key, TaskId &taskId) mutable {
232 taskId = executor_->Execute(GetSyncTask(0, true, ref, std::move(syncInfo)));
233 return true;
234 });
235 return E_OK;
236 }
237
StopCloudSync(int32_t user)238 int32_t SyncManager::StopCloudSync(int32_t user)
239 {
240 if (executor_ == nullptr) {
241 return E_NOT_INIT;
242 }
243 actives_.ForEachCopies([this, user](auto &syncId, auto &taskId) {
244 if (Compare(syncId, user) == 0) {
245 executor_->Remove(taskId);
246 }
247 return false;
248 });
249 return E_OK;
250 }
251
IsValid(SyncInfo & info,CloudInfo & cloud)252 GeneralError SyncManager::IsValid(SyncInfo &info, CloudInfo &cloud)
253 {
254 if (!MetaDataManager::GetInstance().LoadMeta(cloud.GetKey(), cloud, true) ||
255 (info.id_ != SyncInfo::DEFAULT_ID && cloud.id != info.id_)) {
256 info.SetError(E_CLOUD_DISABLED);
257 ZLOGE("cloudInfo invalid:%{public}d, <syncId:%{public}s, metaId:%{public}s>", cloud.IsValid(),
258 Anonymous::Change(info.id_).c_str(), Anonymous::Change(cloud.id).c_str());
259 return E_CLOUD_DISABLED;
260 }
261 if (!cloud.enableCloud || (!info.bundleName_.empty() && !cloud.IsOn(info.bundleName_))) {
262 info.SetError(E_CLOUD_DISABLED);
263 ZLOGD("enable:%{public}d, bundleName:%{public}s", cloud.enableCloud, info.bundleName_.c_str());
264 return E_CLOUD_DISABLED;
265 }
266 if (!DmAdapter::GetInstance().IsNetworkAvailable()) {
267 info.SetError(E_NETWORK_ERROR);
268 ZLOGD("network unavailable");
269 return E_NETWORK_ERROR;
270 }
271 if (!Account::GetInstance()->IsVerified(info.user_)) {
272 info.SetError(E_USER_UNLOCK);
273 ZLOGD("user unverified");
274 return E_ERROR;
275 }
276 return E_OK;
277 }
278
/**
 * Builds the task that posts one SyncEvent per database taking part in the sync.
 *
 * NOTE: the returned closure captures cloud, info, schemas and traceIds BY REFERENCE,
 * so it must be executed while all of those arguments are still alive.
 *
 * @param schemas   schema metas to iterate over.
 * @param cloud     cloud info used for the per-bundle switch, account id and instance id.
 * @param info      sync request (mode, wait, query, callback, ...).
 * @param retry     retry flag forwarded into each SyncEvent.
 * @param traceIds  bundle name -> prepare trace id; a missing bundle yields an empty trace id.
 * @return the task described above.
 */
std::function<void()> SyncManager::GetPostEventTask(const std::vector<SchemaMeta> &schemas, CloudInfo &cloud,
    SyncInfo &info, bool retry, const TraceIds &traceIds)
{
    return [this, &cloud, &info, &schemas, retry, &traceIds]() {
        // Set as soon as at least one SyncEvent has been posted.
        bool isPostEvent = false;
        for (auto &schema : schemas) {
            auto it = traceIds.find(schema.bundleName);
            // Bundle switched off in the cloud info: record the sync as finished with E_ERROR and skip it.
            if (!cloud.IsOn(schema.bundleName)) {
                UpdateFinishSyncInfo({ cloud.id, schema.bundleName, "" }, info.syncId_, E_ERROR);
                Report({ cloud.user, schema.bundleName, it == traceIds.end() ? "" : it->second, SyncStage::END,
                    E_ERROR });
                continue;
            }
            for (const auto &database : schema.databases) {
                // Database not part of this request: record E_ERROR for the bundle and move on.
                if (!info.Contains(database.name)) {
                    UpdateFinishSyncInfo({ cloud.id, schema.bundleName, "" }, info.syncId_, E_ERROR);
                    Report({ cloud.user, schema.bundleName, it == traceIds.end() ? "" : it->second, SyncStage::END,
                        E_ERROR });
                    continue;
                }
                StoreInfo storeInfo = { 0, schema.bundleName, database.name, cloud.apps[schema.bundleName].instanceId,
                    info.user_, "", info.syncId_ };
                // Let the sync strategy veto the sync for this store.
                auto status = syncStrategy_->CheckSyncAction(storeInfo);
                if (status != SUCCESS) {
                    ZLOGW("Verification strategy failed, status:%{public}d. %{public}d:%{public}s:%{public}s", status,
                        storeInfo.user, storeInfo.bundleName.c_str(), Anonymous::Change(storeInfo.storeName).c_str());
                    UpdateFinishSyncInfo({ cloud.id, schema.bundleName, "" }, info.syncId_, status);
                    Report({ cloud.user, schema.bundleName, it == traceIds.end() ? "" : it->second, SyncStage::END,
                        status });
                    info.SetError(status);
                    continue;
                }
                // Uses the explicit query of the request if one is set, else a table-list query.
                auto query = info.GenerateQuery(database.name, database.GetTableNames());
                SyncParam syncParam = { info.mode_, info.wait_, info.isCompensation_, info.triggerMode_,
                    it == traceIds.end() ? "" : it->second, cloud.user };
                auto evt = std::make_unique<SyncEvent>(std::move(storeInfo),
                    SyncEvent::EventInfo{ syncParam, retry, std::move(query), info.async_ });
                EventCenter::GetInstance().PostEvent(std::move(evt));
                isPostEvent = true;
            }
        }
        // Nothing was posted at all: tell the caller the sync failed.
        if (!isPostEvent) {
            ZLOGE("schema is invalid, user: %{public}d", cloud.user);
            info.SetError(E_ERROR);
        }
    };
}
326
/**
 * Builds the executor task that performs one sync attempt.
 *
 * @param times     number of attempts made so far; incremented before being captured.
 * @param retry     retry flag forwarded to the posted sync events.
 * @param ref       reference token kept alive inside the task (see GenSyncRef).
 * @param syncInfo  request to execute; moved into the task.
 * @return the task closure.
 */
ExecutorPool::Task SyncManager::GetSyncTask(int32_t times, bool retry, RefCount ref, SyncInfo &&syncInfo)
{
    times++;
    return [this, times, retry, keep = std::move(ref), info = std::move(syncInfo)]() mutable {
        // The attempt is running now, so it no longer counts as a pending retry.
        activeInfos_.Erase(info.syncId_);
        // For user 0 this may substitute a real user id into info.user_ (see InitDefaultUser).
        bool createdByDefaultUser = InitDefaultUser(info.user_);
        CloudInfo cloud;
        cloud.user = info.user_;

        auto cloudSyncInfos = GetCloudSyncInfo(info, cloud);
        if (cloudSyncInfos.empty()) {
            ZLOGD("get cloud info failed, user: %{public}d.", cloud.user);
            info.SetError(E_CLOUD_DISABLED);
            return;
        }
        auto traceIds = GetPrepareTraceId(info, cloud);
        BatchReport(info.user_, traceIds, SyncStage::PREPARE, E_OK);
        UpdateStartSyncInfo(cloudSyncInfos);
        // IsValid() notifies the caller through info.SetError() itself on failure.
        auto code = IsValid(info, cloud);
        if (code != E_OK) {
            BatchUpdateFinishState(cloudSyncInfos, code);
            BatchReport(info.user_, traceIds, SyncStage::END, code);
            return;
        }

        auto retryer = GetRetryer(times, info, cloud.user);
        auto schemas = GetSchemaMeta(cloud, info.bundleName_);
        if (schemas.empty()) {
            // No schema stored yet: request one and look again.
            UpdateSchema(info);
            schemas = GetSchemaMeta(cloud, info.bundleName_);
            if (schemas.empty()) {
                // Still no schema: hand over to the retryer and close this attempt as E_CLOUD_DISABLED.
                auto it = traceIds.find(info.bundleName_);
                retryer(RETRY_INTERVAL, E_RETRY_TIMEOUT, GenStore::CLOUD_ERR_OFFSET + E_CLOUD_DISABLED,
                    it == traceIds.end() ? "" : it->second);
                BatchUpdateFinishState(cloudSyncInfos, E_CLOUD_DISABLED);
                BatchReport(info.user_, traceIds, SyncStage::END, E_CLOUD_DISABLED);
                return;
            }
        }
        // NOTE(review): Defer presumably routes the CLOUD_SYNC events posted below to this handler
        // for the lifetime of `defer` - confirm against EventCenter::Defer.
        Defer defer(GetSyncHandler(std::move(retryer)), CloudEvent::CLOUD_SYNC);
        // Restore the original "default user" value before the events are built.
        if (createdByDefaultUser) {
            info.user_ = 0;
        }
        // The post task captures its arguments by reference, so it is run right here.
        auto task = GetPostEventTask(schemas, cloud, info, retry, traceIds);
        task();
    };
}
374
/**
 * Builds the handler that executes a SyncEvent against its store.
 *
 * @param retryer  retry callback, used when the event asks for automatic retry.
 * @return handler that loads the store meta, opens the store and starts the sync;
 *         failures on any of these steps are reported to the event's async callback.
 */
std::function<void(const Event &)> SyncManager::GetSyncHandler(Retryer retryer)
{
    return [this, retryer](const Event &event) {
        auto &evt = static_cast<const SyncEvent &>(event);
        auto &storeInfo = evt.GetStoreInfo();
        GenAsync async = evt.GetAsyncDetail();
        auto prepareTraceId = evt.GetPrepareTraceId();
        auto user = evt.GetUser();
        // Pre-built "finished" detail, only delivered on the failure paths below.
        GenDetails details;
        auto &detail = details[SyncInfo::DEFAULT_ID];
        detail.progress = GenProgress::SYNC_FINISH;
        auto [result, meta] = GetMetaData(storeInfo);
        if (!result) {
            return DoExceptionalCallback(async, details, storeInfo, prepareTraceId);
        }
        auto store = GetStore(meta, storeInfo.user);
        if (store == nullptr) {
            ZLOGE("store null, storeId:%{public}s, prepareTraceId:%{public}s", meta.GetStoreAlias().c_str(),
                prepareTraceId.c_str());
            return DoExceptionalCallback(async, details, storeInfo, prepareTraceId);
        }
        ZLOGI("database:<%{public}d:%{public}s:%{public}s:%{public}s> sync start", storeInfo.user,
            storeInfo.bundleName.c_str(), meta.GetStoreAlias().c_str(), prepareTraceId.c_str());
        RadarReporter::Report(
            { storeInfo.bundleName.c_str(), CLOUD_SYNC, TRIGGER_SYNC, storeInfo.syncId, evt.GetTriggerMode() },
            "GetSyncHandler", BizState::BEGIN);
        Report({ user, storeInfo.bundleName, prepareTraceId, SyncStage::START, E_OK });
        SyncParam syncParam = { evt.GetMode(), evt.GetWait(), evt.IsCompensation(), MODE_DEFAULT, prepareTraceId };
        // Auto-retry events get the retrying callback, all others the plain completion callback.
        auto [status, dbCode] = store->Sync({ SyncInfo::DEFAULT_ID }, *(evt.GetQuery()),
            evt.AutoRetry() ? RetryCallback(storeInfo, retryer, evt.GetTriggerMode(), prepareTraceId, user)
                            : GetCallback(evt.GetAsyncDetail(), storeInfo, evt.GetTriggerMode(), prepareTraceId, user),
            syncParam);
        if (status != E_OK) {
            if (async) {
                detail.code = status;
                async(std::move(details));
            }
            UpdateFinishSyncInfo({ GetAccountId(storeInfo.user), storeInfo.bundleName, "" }, storeInfo.syncId, E_ERROR);
            // E_NOT_SUPPORT is not reported to radar / cloud report.
            if (status == GeneralError::E_NOT_SUPPORT) {
                return;
            }
            // Prefer the database error code; otherwise derive one from the status.
            auto code = dbCode == 0 ? GenStore::CLOUD_ERR_OFFSET + status : dbCode;
            RadarReporter::Report({ storeInfo.bundleName.c_str(), CLOUD_SYNC, FINISH_SYNC, storeInfo.syncId,
                evt.GetTriggerMode(), false, code }, "GetSyncHandler", BizState::END);
            Report({ user, storeInfo.bundleName, prepareTraceId, SyncStage::END, code });
        }
    };
}
423
GetClientChangeHandler()424 std::function<void(const Event &)> SyncManager::GetClientChangeHandler()
425 {
426 return [this](const Event &event) {
427 auto &evt = static_cast<const SyncEvent &>(event);
428 auto store = evt.GetStoreInfo();
429 SyncInfo syncInfo(store.user, store.bundleName, store.storeName);
430 syncInfo.SetMode(evt.GetMode());
431 syncInfo.SetWait(evt.GetWait());
432 syncInfo.SetAsyncDetail(evt.GetAsyncDetail());
433 syncInfo.SetQuery(evt.GetQuery());
434 syncInfo.SetCompensation(evt.IsCompensation());
435 syncInfo.SetTriggerMode(evt.GetTriggerMode());
436 auto times = evt.AutoRetry() ? RETRY_TIMES - CLIENT_RETRY_TIMES : RETRY_TIMES;
437 executor_->Execute(GetSyncTask(times, evt.AutoRetry(), RefCount(), std::move(syncInfo)));
438 };
439 }
440
/**
 * Builds the retry callback for a sync attempt.
 *
 * @param times     number of attempts made so far.
 * @param syncInfo  request being synchronised; a copy is stored inside the returned closure.
 * @param user      user id used for cloud reports.
 * @return a callable (interval, code, dbCode, prepareTraceId) -> bool that always returns true.
 *         Once times has reached RETRY_TIMES it only reports failures; otherwise it
 *         schedules another attempt for retryable errors.
 */
SyncManager::Retryer SyncManager::GetRetryer(int32_t times, const SyncInfo &syncInfo, int32_t user)
{
    // Retry budget exhausted: report the failure, never reschedule.
    if (times >= RETRY_TIMES) {
        return [this, user, info = SyncInfo(syncInfo)](Duration, int32_t code, int32_t dbCode,
                   const std::string &prepareTraceId) mutable {
            if (code == E_OK || code == E_SYNC_TASK_MERGED) {
                return true;
            }
            info.SetError(code);
            RadarReporter::Report({ info.bundleName_.c_str(), CLOUD_SYNC, FINISH_SYNC, info.syncId_, info.triggerMode_,
                                      false, dbCode },
                "GetRetryer", BizState::END);
            Report({ user, info.bundleName_, prepareTraceId, SyncStage::END,
                dbCode == GenStore::DB_ERR_OFFSET ? 0 : dbCode });
            return true;
        };
    }
    return [this, times, user, info = SyncInfo(syncInfo)](Duration interval, int32_t code, int32_t dbCode,
               const std::string &prepareTraceId) mutable {
        if (code == E_OK || code == E_SYNC_TASK_MERGED) {
            return true;
        }
        // These two errors are reported immediately instead of being retried.
        if (code == E_NO_SPACE_FOR_ASSET || code == E_RECODE_LIMIT_EXCEEDED) {
            info.SetError(code);
            RadarReporter::Report({ info.bundleName_.c_str(), CLOUD_SYNC, FINISH_SYNC, info.syncId_, info.triggerMode_,
                                      false, dbCode },
                "GetRetryer", BizState::END);
            Report({ user, info.bundleName_, prepareTraceId, SyncStage::END,
                dbCode == GenStore::DB_ERR_OFFSET ? 0 : dbCode });
            return true;
        }

        // Schedule a retry unless activeInfos_ already holds an entry for this sync id.
        activeInfos_.ComputeIfAbsent(info.syncId_, [this, times, interval, &info](uint64_t key) mutable {
            auto syncId = GenerateId(info.user_);
            auto ref = GenSyncRef(syncId);
            actives_.Compute(syncId, [this, times, interval, &ref, &info](const uint64_t &key, TaskId &value) mutable {
                // NOTE: info is moved into the scheduled task here; the closure's copy is moved-from afterwards.
                value = executor_->Schedule(interval, GetSyncTask(times, true, ref, std::move(info)));
                return true;
            });
            return syncId;
        });
        return true;
    };
}
485
GenerateId(int32_t user)486 uint64_t SyncManager::GenerateId(int32_t user)
487 {
488 uint64_t syncId = static_cast<uint64_t>(user) & 0xFFFFFFFF;
489 return (syncId << MV_BIT) | (++genId_);
490 }
491
GenSyncRef(uint64_t syncId)492 RefCount SyncManager::GenSyncRef(uint64_t syncId)
493 {
494 return RefCount([syncId, this]() {
495 actives_.Erase(syncId);
496 });
497 }
498
Compare(uint64_t syncId,int32_t user)499 int32_t SyncManager::Compare(uint64_t syncId, int32_t user)
500 {
501 uint64_t inner = static_cast<uint64_t>(user) & 0xFFFFFFFF;
502 return (syncId & USER_MARK) == (inner << MV_BIT);
503 }
504
UpdateSchema(const SyncManager::SyncInfo & syncInfo)505 void SyncManager::UpdateSchema(const SyncManager::SyncInfo &syncInfo)
506 {
507 StoreInfo storeInfo;
508 storeInfo.user = syncInfo.user_;
509 storeInfo.bundleName = syncInfo.bundleName_;
510 EventCenter::GetInstance().PostEvent(std::make_unique<CloudEvent>(CloudEvent::GET_SCHEMA, storeInfo));
511 }
512
GetBindInfos(const StoreMetaData & meta,const std::vector<int32_t> & users,const Database & schemaDatabase)513 std::map<uint32_t, GeneralStore::BindInfo> SyncManager::GetBindInfos(const StoreMetaData &meta,
514 const std::vector<int32_t> &users, const Database &schemaDatabase)
515 {
516 auto instance = CloudServer::GetInstance();
517 if (instance == nullptr) {
518 ZLOGD("not support cloud sync");
519 return {};
520 }
521 std::map<uint32_t, GeneralStore::BindInfo> bindInfos;
522 for (auto &activeUser : users) {
523 if (activeUser == 0) {
524 continue;
525 }
526 auto cloudDB = instance->ConnectCloudDB(meta.bundleName, activeUser, schemaDatabase);
527 if (cloudDB == nullptr) {
528 ZLOGE("failed, no cloud DB <%{public}d:0x%{public}x %{public}s<->%{public}s>", meta.tokenId, activeUser,
529 Anonymous::Change(schemaDatabase.name).c_str(), Anonymous::Change(schemaDatabase.alias).c_str());
530 return {};
531 }
532 if (meta.storeType >= StoreMetaData::StoreType::STORE_KV_BEGIN &&
533 meta.storeType <= StoreMetaData::StoreType::STORE_KV_END) {
534 bindInfos.insert_or_assign(activeUser, GeneralStore::BindInfo{ std::move(cloudDB), nullptr });
535 continue;
536 }
537 auto assetLoader = instance->ConnectAssetLoader(meta.bundleName, activeUser, schemaDatabase);
538 if (assetLoader == nullptr) {
539 ZLOGE("failed, no cloud DB <%{public}d:0x%{public}x %{public}s<->%{public}s>", meta.tokenId, activeUser,
540 Anonymous::Change(schemaDatabase.name).c_str(), Anonymous::Change(schemaDatabase.alias).c_str());
541 return {};
542 }
543 bindInfos.insert_or_assign(activeUser, GeneralStore::BindInfo{ std::move(cloudDB), std::move(assetLoader) });
544 }
545 return bindInfos;
546 }
547
/**
 * Opens the store described by @p meta and makes sure it is bound to the cloud.
 *
 * @param meta      meta of the store to open.
 * @param user      user to bind for; 0 means "all foreground users".
 * @param mustBind  when true, fail unless bind info was obtained for every user.
 * @return the store, or nullptr when the user is not verified, the cloud server is
 *         unavailable, the store cannot be opened, no schema is stored, or mustBind is violated.
 */
AutoCache::Store SyncManager::GetStore(const StoreMetaData &meta, int32_t user, bool mustBind)
{
    if (user != 0 && !Account::GetInstance()->IsVerified(user)) {
        ZLOGW("user:%{public}d is locked!", user);
        return nullptr;
    }
    auto instance = CloudServer::GetInstance();
    if (instance == nullptr) {
        ZLOGD("not support cloud sync");
        return nullptr;
    }
    auto store = AutoCache::GetInstance().GetStore(meta, {});
    if (store == nullptr) {
        ZLOGE("store null, storeId:%{public}s", meta.GetStoreAlias().c_str());
        return nullptr;
    }
    // Binding is only performed once per store instance.
    if (!store->IsBound()) {
        std::vector<int32_t> users{};
        CloudInfo info;
        if (user == 0) {
            AccountDelegate::GetInstance()->QueryForegroundUsers(users);
        } else {
            users.push_back(user);
        }
        // The first user determines which cloud info / schema key is looked up.
        if (!users.empty()) {
            info.user = users[0];
        }
        SchemaMeta schemaMeta;
        std::string schemaKey = info.GetSchemaKey(meta.bundleName, meta.instanceId);
        if (!MetaDataManager::GetInstance().LoadMeta(schemaKey, schemaMeta, true)) {
            ZLOGE("failed, no schema bundleName:%{public}s, storeId:%{public}s", meta.bundleName.c_str(),
                meta.GetStoreAlias().c_str());
            return nullptr;
        }
        auto dbMeta = schemaMeta.GetDataBase(meta.storeId);
        std::map<uint32_t, GeneralStore::BindInfo> bindInfos = GetBindInfos(meta, users, dbMeta);
        if (mustBind && bindInfos.size() != users.size()) {
            return nullptr;
        }
        // Limits stay at their CloudConfig defaults when the cloud info cannot be loaded.
        GeneralStore::CloudConfig config;
        if (MetaDataManager::GetInstance().LoadMeta(info.GetKey(), info, true)) {
            config.maxNumber = info.maxNumber;
            config.maxSize = info.maxSize;
        }
        store->Bind(dbMeta, bindInfos, config);
    }
    return store;
}
596
Report(const ReportParam & reportParam)597 void SyncManager::Report(const ReportParam &reportParam)
598 {
599 auto cloudReport = CloudReport::GetInstance();
600 if (cloudReport == nullptr) {
601 return;
602 }
603 cloudReport->Report(reportParam);
604 }
605
GetPrepareTraceId(const SyncInfo & info,const CloudInfo & cloud)606 SyncManager::TraceIds SyncManager::GetPrepareTraceId(const SyncInfo &info, const CloudInfo &cloud)
607 {
608 TraceIds traceIds;
609 if (!info.prepareTraceId_.empty()) {
610 traceIds.emplace(info.bundleName_, info.prepareTraceId_);
611 return traceIds;
612 }
613 auto cloudReport = CloudReport::GetInstance();
614 if (cloudReport == nullptr) {
615 return traceIds;
616 }
617 if (info.bundleName_.empty()) {
618 for (const auto &it : cloud.apps) {
619 traceIds.emplace(it.first, cloudReport->GetPrepareTraceId(info.user_));
620 }
621 } else {
622 traceIds.emplace(info.bundleName_, cloudReport->GetPrepareTraceId(info.user_));
623 }
624 return traceIds;
625 }
626
NeedGetCloudInfo(CloudInfo & cloud)627 bool SyncManager::NeedGetCloudInfo(CloudInfo &cloud)
628 {
629 return (!MetaDataManager::GetInstance().LoadMeta(cloud.GetKey(), cloud, true) || !cloud.enableCloud) &&
630 DmAdapter::GetInstance().IsNetworkAvailable() && Account::GetInstance()->IsLoginAccount();
631 }
632
GetCloudSyncInfo(const SyncInfo & info,CloudInfo & cloud)633 std::vector<std::tuple<QueryKey, uint64_t>> SyncManager::GetCloudSyncInfo(const SyncInfo &info, CloudInfo &cloud)
634 {
635 std::vector<std::tuple<QueryKey, uint64_t>> cloudSyncInfos;
636 if (NeedGetCloudInfo(cloud)) {
637 ZLOGI("get cloud info from server, user: %{public}d.", cloud.user);
638 auto instance = CloudServer::GetInstance();
639 if (instance == nullptr) {
640 return cloudSyncInfos;
641 }
642 cloud = instance->GetServerInfo(cloud.user, false);
643 if (!cloud.IsValid()) {
644 ZLOGE("cloud is empty, user: %{public}d", cloud.user);
645 return cloudSyncInfos;
646 }
647 if (!MetaDataManager::GetInstance().SaveMeta(cloud.GetKey(), cloud, true)) {
648 ZLOGW("save cloud info fail, user: %{public}d", cloud.user);
649 }
650 }
651 if (info.bundleName_.empty()) {
652 for (const auto &it : cloud.apps) {
653 QueryKey queryKey{ .accountId = cloud.id, .bundleName = it.first, .storeId = "" };
654 cloudSyncInfos.emplace_back(std::make_tuple(queryKey, info.syncId_));
655 }
656 } else {
657 QueryKey queryKey{ .accountId = cloud.id, .bundleName = info.bundleName_, .storeId = "" };
658 cloudSyncInfos.emplace_back(std::make_tuple(queryKey, info.syncId_));
659 }
660 return cloudSyncInfos;
661 }
662
GetLastResults(const std::string & storeId,std::map<SyncId,CloudSyncInfo> & infos,QueryLastResults & results)663 void SyncManager::GetLastResults(
664 const std::string &storeId, std::map<SyncId, CloudSyncInfo> &infos, QueryLastResults &results)
665 {
666 for (auto &[key, info] : infos) {
667 if (info.code != -1) {
668 results.insert(std::pair<std::string, CloudSyncInfo>(storeId, info));
669 }
670 }
671 }
672
NeedSaveSyncInfo(const QueryKey & queryKey)673 bool SyncManager::NeedSaveSyncInfo(const QueryKey &queryKey)
674 {
675 if (queryKey.accountId.empty()) {
676 return false;
677 }
678 return kvApps_.find(queryKey.bundleName) == kvApps_.end();
679 }
680
QueryLastSyncInfo(const std::vector<QueryKey> & queryKeys,QueryLastResults & results)681 int32_t SyncManager::QueryLastSyncInfo(const std::vector<QueryKey> &queryKeys, QueryLastResults &results)
682 {
683 for (const auto &queryKey : queryKeys) {
684 std::string storeId = queryKey.storeId;
685 QueryKey key{ .accountId = queryKey.accountId, .bundleName = queryKey.bundleName, .storeId = "" };
686 lastSyncInfos_.ComputeIfPresent(
687 key, [&storeId, &results](auto &key, std::map<SyncId, CloudSyncInfo> &vals) {
688 GetLastResults(storeId, vals, results);
689 return !vals.empty();
690 });
691 }
692 return SUCCESS;
693 }
694
UpdateStartSyncInfo(const std::vector<std::tuple<QueryKey,uint64_t>> & cloudSyncInfos)695 void SyncManager::UpdateStartSyncInfo(const std::vector<std::tuple<QueryKey, uint64_t>> &cloudSyncInfos)
696 {
697 int64_t startTime = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
698 for (const auto &[queryKey, syncId] : cloudSyncInfos) {
699 if (!NeedSaveSyncInfo(queryKey)) {
700 continue;
701 }
702 lastSyncInfos_.Compute(queryKey, [id = syncId, startTime](auto &, std::map<SyncId, CloudSyncInfo> &val) {
703 val[id] = { .startTime = startTime };
704 return !val.empty();
705 });
706 }
707 }
708
UpdateFinishSyncInfo(const QueryKey & queryKey,uint64_t syncId,int32_t code)709 void SyncManager::UpdateFinishSyncInfo(const QueryKey &queryKey, uint64_t syncId, int32_t code)
710 {
711 if (!NeedSaveSyncInfo(queryKey)) {
712 return;
713 }
714 lastSyncInfos_.ComputeIfPresent(queryKey, [syncId, code](auto &key, std::map<SyncId, CloudSyncInfo> &val) {
715 auto now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
716 for (auto iter = val.begin(); iter != val.end();) {
717 bool isExpired = ((now - iter->second.startTime) >= EXPIRATION_TIME) && iter->second.code == -1;
718 if ((iter->first != syncId && ((iter->second.code != -1) || isExpired))) {
719 iter = val.erase(iter);
720 } else if (iter->first == syncId) {
721 iter->second.finishTime = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
722 iter->second.code = code;
723 iter++;
724 } else {
725 iter++;
726 }
727 }
728 return true;
729 });
730 }
731
GetCallback(const GenAsync & async,const StoreInfo & storeInfo,int32_t triggerMode,const std::string & prepareTraceId,int32_t user)732 std::function<void(const GenDetails &result)> SyncManager::GetCallback(const GenAsync &async,
733 const StoreInfo &storeInfo, int32_t triggerMode, const std::string &prepareTraceId, int32_t user)
734 {
735 return [this, async, storeInfo, triggerMode, prepareTraceId, user](const GenDetails &result) {
736 if (async != nullptr) {
737 async(result);
738 }
739
740 if (result.empty()) {
741 ZLOGE("result is empty");
742 return;
743 }
744
745 if (result.begin()->second.progress != GenProgress::SYNC_FINISH) {
746 return;
747 }
748
749 int32_t dbCode = (result.begin()->second.dbCode == GenStore::DB_ERR_OFFSET) ? 0 : result.begin()->second.dbCode;
750 RadarReporter::Report({ storeInfo.bundleName.c_str(), CLOUD_SYNC, FINISH_SYNC, storeInfo.syncId, triggerMode,
751 result.begin()->second.changeCount, dbCode },
752 "GetCallback", BizState::END);
753 Report({ user, storeInfo.bundleName, prepareTraceId, SyncStage::END, dbCode });
754
755 auto id = GetAccountId(storeInfo.user);
756 if (id.empty()) {
757 ZLOGD("account id is empty");
758 return;
759 }
760 QueryKey queryKey{
761 .accountId = id,
762 .bundleName = storeInfo.bundleName,
763 .storeId = ""
764 };
765
766 int32_t code = result.begin()->second.code;
767 UpdateFinishSyncInfo(queryKey, storeInfo.syncId, code);
768 };
769 }
770
GetAccountId(int32_t user)771 std::string SyncManager::GetAccountId(int32_t user)
772 {
773 CloudInfo cloudInfo;
774 cloudInfo.user = user;
775 if (!MetaDataManager::GetInstance().LoadMeta(cloudInfo.GetKey(), cloudInfo, true)) {
776 ZLOGE("not exist meta, user:%{public}d.", cloudInfo.user);
777 return "";
778 }
779 return cloudInfo.id;
780 }
781
GetInterval(int32_t code)782 ExecutorPool::Duration SyncManager::GetInterval(int32_t code)
783 {
784 switch (code) {
785 case E_LOCKED_BY_OTHERS:
786 return LOCKED_INTERVAL;
787 case E_BUSY:
788 return BUSY_INTERVAL;
789 default:
790 return RETRY_INTERVAL;
791 }
792 }
793
GetSchemaMeta(const CloudInfo & cloud,const std::string & bundleName)794 std::vector<SchemaMeta> SyncManager::GetSchemaMeta(const CloudInfo &cloud, const std::string &bundleName)
795 {
796 std::vector<SchemaMeta> schemas;
797 auto key = cloud.GetSchemaPrefix(bundleName);
798 MetaDataManager::GetInstance().LoadMeta(key, schemas, true);
799 return schemas;
800 }
801
DoExceptionalCallback(const GenAsync & async,GenDetails & details,const StoreInfo & storeInfo,const std::string & prepareTraceId)802 void SyncManager::DoExceptionalCallback(const GenAsync &async, GenDetails &details, const StoreInfo &storeInfo,
803 const std::string &prepareTraceId)
804 {
805 if (async) {
806 details[SyncInfo::DEFAULT_ID].code = E_ERROR;
807 async(details);
808 }
809 QueryKey queryKey{ GetAccountId(storeInfo.user), storeInfo.bundleName, "" };
810 UpdateFinishSyncInfo(queryKey, storeInfo.syncId, E_ERROR);
811 Report({ storeInfo.user, storeInfo.bundleName, prepareTraceId, SyncStage::END, E_ERROR });
812 }
813
InitDefaultUser(int32_t & user)814 bool SyncManager::InitDefaultUser(int32_t &user)
815 {
816 if (user != 0) {
817 return false;
818 }
819 std::vector<int32_t> users;
820 AccountDelegate::GetInstance()->QueryUsers(users);
821 if (!users.empty()) {
822 user = users[0];
823 }
824 return true;
825 }
826
RetryCallback(const StoreInfo & storeInfo,Retryer retryer,int32_t triggerMode,const std::string & prepareTraceId,int32_t user)827 std::function<void(const DistributedData::GenDetails &result)> SyncManager::RetryCallback(const StoreInfo &storeInfo,
828 Retryer retryer, int32_t triggerMode, const std::string &prepareTraceId, int32_t user)
829 {
830 return [this, retryer, storeInfo, triggerMode, prepareTraceId, user](const GenDetails &details) {
831 if (details.empty()) {
832 ZLOGE("retry, details empty");
833 return;
834 }
835 int32_t code = details.begin()->second.code;
836 int32_t dbCode = details.begin()->second.dbCode;
837 if (details.begin()->second.progress == GenProgress::SYNC_FINISH) {
838 QueryKey queryKey{ GetAccountId(storeInfo.user), storeInfo.bundleName, "" };
839 UpdateFinishSyncInfo(queryKey, storeInfo.syncId, code);
840 if (code == E_OK) {
841 RadarReporter::Report({ storeInfo.bundleName.c_str(), CLOUD_SYNC, FINISH_SYNC, storeInfo.syncId,
842 triggerMode, details.begin()->second.changeCount },
843 "RetryCallback", BizState::END);
844 Report({ user, storeInfo.bundleName, prepareTraceId, SyncStage::END,
845 dbCode == GenStore::DB_ERR_OFFSET ? 0 : dbCode });
846 }
847 }
848 retryer(GetInterval(code), code, dbCode, prepareTraceId);
849 };
850 }
851
BatchUpdateFinishState(const std::vector<std::tuple<QueryKey,uint64_t>> & cloudSyncInfos,int32_t code)852 void SyncManager::BatchUpdateFinishState(const std::vector<std::tuple<QueryKey, uint64_t>> &cloudSyncInfos,
853 int32_t code)
854 {
855 for (const auto &[queryKey, syncId] : cloudSyncInfos) {
856 UpdateFinishSyncInfo(queryKey, syncId, code);
857 }
858 }
859
BatchReport(int32_t userId,const TraceIds & traceIds,SyncStage syncStage,int32_t errCode)860 void SyncManager::BatchReport(int32_t userId, const TraceIds &traceIds, SyncStage syncStage, int32_t errCode)
861 {
862 for (const auto &[bundle, id] : traceIds) {
863 Report({ userId, bundle, id, syncStage, errCode });
864 }
865 }
866
GetMetaData(const StoreInfo & storeInfo)867 std::pair<bool, StoreMetaData> SyncManager::GetMetaData(const StoreInfo &storeInfo)
868 {
869 StoreMetaData meta(storeInfo);
870 meta.deviceId = DmAdapter::GetInstance().GetLocalDevice().uuid;
871 if (!MetaDataManager::GetInstance().LoadMeta(meta.GetKey(), meta, true)) {
872 meta.user = "0"; // check if it is a public store.
873 StoreMetaDataLocal localMetaData;
874 if (!MetaDataManager::GetInstance().LoadMeta(meta.GetKeyLocal(), localMetaData, true) ||
875 !localMetaData.isPublic || !MetaDataManager::GetInstance().LoadMeta(meta.GetKey(), meta, true)) {
876 ZLOGE("failed, no store meta. bundleName:%{public}s, storeId:%{public}s", meta.bundleName.c_str(),
877 meta.GetStoreAlias().c_str());
878 return { false, meta };
879 }
880 }
881 return { true, meta };
882 }
883 } // namespace OHOS::CloudData