1 /*
2  * Copyright (c) 2023-2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define LOG_TAG "SyncManager"
16 #include "sync_manager.h"
17 
18 #include <chrono>
19 
20 #include "account/account_delegate.h"
21 #include "bootstrap.h"
22 #include "checker/checker_manager.h"
23 #include "cloud/cloud_server.h"
24 #include "cloud/schema_meta.h"
25 #include "cloud/sync_event.h"
26 #include "cloud_value_util.h"
27 #include "cloud/cloud_lock_event.h"
28 #include "cloud/cloud_report.h"
29 #include "device_manager_adapter.h"
30 #include "dfx/radar_reporter.h"
31 #include "eventcenter/event_center.h"
32 #include "log_print.h"
33 #include "metadata/meta_data_manager.h"
34 #include "sync_strategies/network_sync_strategy.h"
35 #include "user_delegate.h"
36 #include "utils/anonymous.h"
37 namespace OHOS::CloudData {
38 using namespace DistributedData;
39 using namespace DistributedDataDfx;
40 using namespace DistributedKv;
41 using namespace SharingUtil;
42 using namespace std::chrono;
43 using Account = OHOS::DistributedKv::AccountDelegate;
44 using DmAdapter = OHOS::DistributedData::DeviceManagerAdapter;
45 using Defer = EventCenter::Defer;
// Shared counter; GenerateId() pre-increments it to form the low-order part of each generated id.
std::atomic<uint32_t> SyncManager::genId_ = 0;
SyncInfo(int32_t user,const std::string & bundleName,const Store & store,const Tables & tables,int32_t triggerMode)47 SyncManager::SyncInfo::SyncInfo(
48     int32_t user, const std::string &bundleName, const Store &store, const Tables &tables, int32_t triggerMode)
49     : user_(user), bundleName_(bundleName), triggerMode_(triggerMode)
50 {
51     if (!store.empty()) {
52         tables_[store] = tables;
53     }
54     syncId_ = SyncManager::GenerateId(user);
55 }
56 
SyncInfo(int32_t user,const std::string & bundleName,const Stores & stores)57 SyncManager::SyncInfo::SyncInfo(int32_t user, const std::string &bundleName, const Stores &stores)
58     : user_(user), bundleName_(bundleName)
59 {
60     for (auto &store : stores) {
61         tables_[store] = {};
62     }
63     syncId_ = SyncManager::GenerateId(user);
64 }
65 
SyncInfo(int32_t user,const std::string & bundleName,const MutliStoreTables & tables)66 SyncManager::SyncInfo::SyncInfo(int32_t user, const std::string &bundleName, const MutliStoreTables &tables)
67     : user_(user), bundleName_(bundleName), tables_(tables)
68 {
69     tables_ = tables;
70     syncId_ = SyncManager::GenerateId(user);
71 }
72 
SyncInfo(const Param & param)73 SyncManager::SyncInfo::SyncInfo(const Param &param)
74     : user_(param.user), bundleName_(param.bundleName), triggerMode_(param.triggerMode)
75 {
76     if (!param.store.empty()) {
77         tables_[param.store] = param.tables;
78     }
79     syncId_ = SyncManager::GenerateId(param.user);
80     prepareTraceId_ = param.prepareTraceId;
81 }
82 
SetMode(int32_t mode)83 void SyncManager::SyncInfo::SetMode(int32_t mode)
84 {
85     mode_ = mode;
86 }
87 
SetWait(int32_t wait)88 void SyncManager::SyncInfo::SetWait(int32_t wait)
89 {
90     wait_ = wait;
91 }
92 
SetAsyncDetail(GenAsync asyncDetail)93 void SyncManager::SyncInfo::SetAsyncDetail(GenAsync asyncDetail)
94 {
95     async_ = std::move(asyncDetail);
96 }
97 
SetQuery(std::shared_ptr<GenQuery> query)98 void SyncManager::SyncInfo::SetQuery(std::shared_ptr<GenQuery> query)
99 {
100     query_ = query;
101 }
102 
SetCompensation(bool isCompensation)103 void SyncManager::SyncInfo::SetCompensation(bool isCompensation)
104 {
105     isCompensation_ = isCompensation;
106 }
107 
SetTriggerMode(int32_t triggerMode)108 void SyncManager::SyncInfo::SetTriggerMode(int32_t triggerMode)
109 {
110     triggerMode_ = triggerMode;
111 }
112 
SetError(int32_t code) const113 void SyncManager::SyncInfo::SetError(int32_t code) const
114 {
115     if (async_) {
116         GenDetails details;
117         auto &detail = details[id_];
118         detail.progress = GenProgress::SYNC_FINISH;
119         detail.code = code;
120         async_(std::move(details));
121     }
122 }
123 
GenerateQuery(const std::string & store,const Tables & tables)124 std::shared_ptr<GenQuery> SyncManager::SyncInfo::GenerateQuery(const std::string &store, const Tables &tables)
125 {
126     if (query_ != nullptr) {
127         return query_;
128     }
129     class SyncQuery final : public GenQuery {
130     public:
131         explicit SyncQuery(const std::vector<std::string> &tables) : tables_(tables) {}
132 
133         bool IsEqual(uint64_t tid) override
134         {
135             return false;
136         }
137 
138         std::vector<std::string> GetTables() override
139         {
140             return tables_;
141         }
142 
143     private:
144         std::vector<std::string> tables_;
145     };
146     auto it = tables_.find(store);
147     return std::make_shared<SyncQuery>(it == tables_.end() || it->second.empty() ? tables : it->second);
148 }
149 
Contains(const std::string & storeName)150 bool SyncManager::SyncInfo::Contains(const std::string &storeName)
151 {
152     return tables_.empty() || tables_.find(storeName) != tables_.end();
153 }
154 
GetLockChangeHandler()155 std::function<void(const Event &)> SyncManager::GetLockChangeHandler()
156 {
157     return [](const Event &event) {
158         auto &evt = static_cast<const CloudLockEvent &>(event);
159         auto storeInfo = evt.GetStoreInfo();
160         auto callback = evt.GetCallback();
161         if (callback == nullptr) {
162             ZLOGE("callback is nullptr. bundleName: %{public}s, storeName: %{public}s, user: %{public}d.",
163                 storeInfo.bundleName.c_str(), Anonymous::Change(storeInfo.storeName).c_str(), storeInfo.user);
164             return;
165         }
166         StoreMetaData meta(storeInfo);
167         meta.deviceId = DmAdapter::GetInstance().GetLocalDevice().uuid;
168         if (!MetaDataManager::GetInstance().LoadMeta(meta.GetKey(), meta, true)) {
169             ZLOGE("not found meta. bundleName: %{public}s, storeName: %{public}s, user: %{public}d.",
170                 storeInfo.bundleName.c_str(), Anonymous::Change(storeInfo.storeName).c_str(), storeInfo.user);
171             return;
172         }
173         auto store = GetStore(meta, storeInfo.user);
174         if (store == nullptr) {
175             ZLOGE("failed to get store. bundleName: %{public}s, storeName: %{public}s, user: %{public}d.",
176                 storeInfo.bundleName.c_str(), Anonymous::Change(storeInfo.storeName).c_str(), storeInfo.user);
177             return;
178         }
179         if (evt.GetEventId() == CloudEvent::LOCK_CLOUD_CONTAINER) {
180             auto [result, expiredTime] = store->LockCloudDB();
181             callback(result, expiredTime);
182         } else {
183             auto result = store->UnLockCloudDB();
184             callback(result, 0);
185         }
186     };
187 }
188 
SyncManager()189 SyncManager::SyncManager()
190 {
191     EventCenter::GetInstance().Subscribe(CloudEvent::LOCK_CLOUD_CONTAINER, GetLockChangeHandler());
192     EventCenter::GetInstance().Subscribe(CloudEvent::UNLOCK_CLOUD_CONTAINER, GetLockChangeHandler());
193     EventCenter::GetInstance().Subscribe(CloudEvent::LOCAL_CHANGE, GetClientChangeHandler());
194     syncStrategy_ = std::make_shared<NetworkSyncStrategy>();
195     auto metaName = Bootstrap::GetInstance().GetProcessLabel();
196     kvApps_.insert(std::move(metaName));
197     auto stores = CheckerManager::GetInstance().GetStaticStores();
198     for (auto &store : stores) {
199         kvApps_.insert(std::move(store.bundleName));
200     }
201     stores = CheckerManager::GetInstance().GetDynamicStores();
202     for (auto &store : stores) {
203         kvApps_.insert(std::move(store.bundleName));
204     }
205 }
206 
~SyncManager()207 SyncManager::~SyncManager()
208 {
209     if (executor_ != nullptr) {
210         actives_.ForEachCopies([this](auto &syncId, auto &taskId) {
211             executor_->Remove(taskId);
212             return false;
213         });
214         executor_ = nullptr;
215     }
216 }
217 
Bind(std::shared_ptr<ExecutorPool> executor)218 int32_t SyncManager::Bind(std::shared_ptr<ExecutorPool> executor)
219 {
220     executor_ = executor;
221     return E_OK;
222 }
223 
DoCloudSync(SyncInfo syncInfo)224 int32_t SyncManager::DoCloudSync(SyncInfo syncInfo)
225 {
226     if (executor_ == nullptr) {
227         return E_NOT_INIT;
228     }
229     auto syncId = GenerateId(syncInfo.user_);
230     auto ref = GenSyncRef(syncId);
231     actives_.Compute(syncId, [this, &ref, &syncInfo](const uint64_t &key, TaskId &taskId) mutable {
232         taskId = executor_->Execute(GetSyncTask(0, true, ref, std::move(syncInfo)));
233         return true;
234     });
235     return E_OK;
236 }
237 
StopCloudSync(int32_t user)238 int32_t SyncManager::StopCloudSync(int32_t user)
239 {
240     if (executor_ == nullptr) {
241         return E_NOT_INIT;
242     }
243     actives_.ForEachCopies([this, user](auto &syncId, auto &taskId) {
244         if (Compare(syncId, user) == 0) {
245             executor_->Remove(taskId);
246         }
247         return false;
248     });
249     return E_OK;
250 }
251 
IsValid(SyncInfo & info,CloudInfo & cloud)252 GeneralError SyncManager::IsValid(SyncInfo &info, CloudInfo &cloud)
253 {
254     if (!MetaDataManager::GetInstance().LoadMeta(cloud.GetKey(), cloud, true) ||
255         (info.id_ != SyncInfo::DEFAULT_ID && cloud.id != info.id_)) {
256         info.SetError(E_CLOUD_DISABLED);
257         ZLOGE("cloudInfo invalid:%{public}d, <syncId:%{public}s, metaId:%{public}s>", cloud.IsValid(),
258             Anonymous::Change(info.id_).c_str(), Anonymous::Change(cloud.id).c_str());
259         return E_CLOUD_DISABLED;
260     }
261     if (!cloud.enableCloud || (!info.bundleName_.empty() && !cloud.IsOn(info.bundleName_))) {
262         info.SetError(E_CLOUD_DISABLED);
263         ZLOGD("enable:%{public}d, bundleName:%{public}s", cloud.enableCloud, info.bundleName_.c_str());
264         return E_CLOUD_DISABLED;
265     }
266     if (!DmAdapter::GetInstance().IsNetworkAvailable()) {
267         info.SetError(E_NETWORK_ERROR);
268         ZLOGD("network unavailable");
269         return E_NETWORK_ERROR;
270     }
271     if (!Account::GetInstance()->IsVerified(info.user_)) {
272         info.SetError(E_USER_UNLOCK);
273         ZLOGD("user unverified");
274         return E_ERROR;
275     }
276     return E_OK;
277 }
278 
// Builds a closure that posts one SyncEvent per eligible database found in `schemas`.
//
// The closure captures `cloud`, `info`, `schemas` and `traceIds` BY REFERENCE, so all four must
// outlive every invocation of the returned task.
//
// For each schema:
//   - if the bundle is switched off in `cloud`, the sync is recorded as finished with E_ERROR,
//     an END report is emitted and the schema is skipped;
//   - for each database not covered by `info`, the same E_ERROR bookkeeping is done and the
//     database is skipped;
//   - if the sync strategy rejects the store, the strategy status is recorded, reported and
//     passed to info.SetError(), and the database is skipped;
//   - otherwise a SyncEvent is posted for the database.
// If no event was posted at all, info.SetError(E_ERROR) is called.
std::function<void()> SyncManager::GetPostEventTask(const std::vector<SchemaMeta> &schemas, CloudInfo &cloud,
    SyncInfo &info, bool retry, const TraceIds &traceIds)
{
    return [this, &cloud, &info, &schemas, retry, &traceIds]() {
        bool isPostEvent = false;
        for (auto &schema : schemas) {
            // Trace id for this bundle; an empty string is used when none was prepared.
            auto it = traceIds.find(schema.bundleName);
            if (!cloud.IsOn(schema.bundleName)) {
                UpdateFinishSyncInfo({ cloud.id, schema.bundleName, "" }, info.syncId_, E_ERROR);
                Report({ cloud.user, schema.bundleName, it == traceIds.end() ? "" : it->second, SyncStage::END,
                         E_ERROR });
                continue;
            }
            for (const auto &database : schema.databases) {
                if (!info.Contains(database.name)) {
                    UpdateFinishSyncInfo({ cloud.id, schema.bundleName, "" }, info.syncId_, E_ERROR);
                    Report({ cloud.user, schema.bundleName, it == traceIds.end() ? "" : it->second, SyncStage::END,
                             E_ERROR });
                    continue;
                }
                // NOTE(review): cloud.apps[...] uses operator[]; if apps is a std::map this inserts a
                // default entry for unknown bundles - confirm against CloudInfo.
                StoreInfo storeInfo = { 0, schema.bundleName, database.name, cloud.apps[schema.bundleName].instanceId,
                    info.user_, "", info.syncId_ };
                auto status = syncStrategy_->CheckSyncAction(storeInfo);
                if (status != SUCCESS) {
                    ZLOGW("Verification strategy failed, status:%{public}d. %{public}d:%{public}s:%{public}s", status,
                        storeInfo.user, storeInfo.bundleName.c_str(), Anonymous::Change(storeInfo.storeName).c_str());
                    UpdateFinishSyncInfo({ cloud.id, schema.bundleName, "" }, info.syncId_, status);
                    Report({ cloud.user, schema.bundleName, it == traceIds.end() ? "" : it->second, SyncStage::END,
                        status });
                    info.SetError(status);
                    continue;
                }
                auto query = info.GenerateQuery(database.name, database.GetTableNames());
                SyncParam syncParam = { info.mode_, info.wait_, info.isCompensation_, info.triggerMode_,
                    it == traceIds.end() ? "" : it->second, cloud.user };
                auto evt = std::make_unique<SyncEvent>(std::move(storeInfo),
                    SyncEvent::EventInfo{ syncParam, retry, std::move(query), info.async_ });
                EventCenter::GetInstance().PostEvent(std::move(evt));
                isPostEvent = true;
            }
        }
        // Nothing was eligible: surface a generic error to the requester.
        if (!isPostEvent) {
            ZLOGE("schema is invalid, user: %{public}d", cloud.user);
            info.SetError(E_ERROR);
        }
    };
}
326 
// Builds the executor task that performs one sync attempt for `syncInfo`.
//
// `times` is incremented before it is captured, so the value handed to GetRetryer() counts this
// attempt. `ref` is moved into the closure as `keep` and is not otherwise used; it stays alive
// as long as the task object does.
//
// Task steps: drop the pending-retry marker, resolve the cloud info, record the start, validate
// preconditions, load (or request) the schema, then post the per-database sync events.
ExecutorPool::Task SyncManager::GetSyncTask(int32_t times, bool retry, RefCount ref, SyncInfo &&syncInfo)
{
    times++;
    return [this, times, retry, keep = std::move(ref), info = std::move(syncInfo)]() mutable {
        // This attempt is now running, so it is no longer a pending retry.
        activeInfos_.Erase(info.syncId_);
        bool createdByDefaultUser = InitDefaultUser(info.user_);
        CloudInfo cloud;
        cloud.user = info.user_;

        auto cloudSyncInfos = GetCloudSyncInfo(info, cloud);
        if (cloudSyncInfos.empty()) {
            ZLOGD("get cloud info failed, user: %{public}d.", cloud.user);
            info.SetError(E_CLOUD_DISABLED);
            return;
        }
        auto traceIds = GetPrepareTraceId(info, cloud);
        BatchReport(info.user_, traceIds, SyncStage::PREPARE, E_OK);
        UpdateStartSyncInfo(cloudSyncInfos);
        auto code = IsValid(info, cloud);
        if (code != E_OK) {
            BatchUpdateFinishState(cloudSyncInfos, code);
            BatchReport(info.user_, traceIds, SyncStage::END, code);
            return;
        }

        auto retryer = GetRetryer(times, info, cloud.user);
        auto schemas = GetSchemaMeta(cloud, info.bundleName_);
        if (schemas.empty()) {
            // No schema stored yet: request one and look again before giving up on this attempt.
            UpdateSchema(info);
            schemas = GetSchemaMeta(cloud, info.bundleName_);
            if (schemas.empty()) {
                auto it = traceIds.find(info.bundleName_);
                retryer(RETRY_INTERVAL, E_RETRY_TIMEOUT, GenStore::CLOUD_ERR_OFFSET + E_CLOUD_DISABLED,
                    it == traceIds.end() ? "" : it->second);
                BatchUpdateFinishState(cloudSyncInfos, E_CLOUD_DISABLED);
                BatchReport(info.user_, traceIds, SyncStage::END, E_CLOUD_DISABLED);
                return;
            }
        }
        // NOTE(review): presumably Defer keeps the CLOUD_SYNC handler registered for the rest of this
        // scope - confirm against EventCenter::Defer.
        Defer defer(GetSyncHandler(std::move(retryer)), CloudEvent::CLOUD_SYNC);
        if (createdByDefaultUser) {
            info.user_ = 0;
        }
        // The post task captures cloud/info/schemas/traceIds by reference; it is run immediately,
        // while those locals are still alive.
        auto task = GetPostEventTask(schemas, cloud, info, retry, traceIds);
        task();
    };
}
374 
// Builds the handler that executes a SyncEvent against its store.
//
// The handler loads the store's metadata and opens the store; either failure is routed to
// DoExceptionalCallback(). Otherwise it reports the start of the sync and calls store->Sync()
// with a retrying callback (when the event asks for auto retry) or a plain one.
// When Sync() itself returns an error, the async callback (if any) is notified with that status
// and the sync is recorded as finished with E_ERROR. Unless the status is E_NOT_SUPPORT, the
// failure is also reported to Radar and the cloud reporter.
std::function<void(const Event &)> SyncManager::GetSyncHandler(Retryer retryer)
{
    return [this, retryer](const Event &event) {
        auto &evt = static_cast<const SyncEvent &>(event);
        auto &storeInfo = evt.GetStoreInfo();
        GenAsync async = evt.GetAsyncDetail();
        auto prepareTraceId = evt.GetPrepareTraceId();
        auto user = evt.GetUser();
        // Pre-built "finished" detail used for the failure notifications below.
        GenDetails details;
        auto &detail = details[SyncInfo::DEFAULT_ID];
        detail.progress = GenProgress::SYNC_FINISH;
        auto [result, meta] = GetMetaData(storeInfo);
        if (!result) {
            return DoExceptionalCallback(async, details, storeInfo, prepareTraceId);
        }
        auto store = GetStore(meta, storeInfo.user);
        if (store == nullptr) {
            ZLOGE("store null, storeId:%{public}s, prepareTraceId:%{public}s", meta.GetStoreAlias().c_str(),
                prepareTraceId.c_str());
            return DoExceptionalCallback(async, details, storeInfo, prepareTraceId);
        }
        ZLOGI("database:<%{public}d:%{public}s:%{public}s:%{public}s> sync start", storeInfo.user,
            storeInfo.bundleName.c_str(), meta.GetStoreAlias().c_str(), prepareTraceId.c_str());
        RadarReporter::Report(
            { storeInfo.bundleName.c_str(), CLOUD_SYNC, TRIGGER_SYNC, storeInfo.syncId, evt.GetTriggerMode() },
            "GetSyncHandler", BizState::BEGIN);
        Report({ user, storeInfo.bundleName, prepareTraceId, SyncStage::START, E_OK });
        SyncParam syncParam = { evt.GetMode(), evt.GetWait(), evt.IsCompensation(), MODE_DEFAULT, prepareTraceId };
        // NOTE(review): evt.GetQuery() is dereferenced without a null check - confirm it can never be null.
        auto [status, dbCode] = store->Sync({ SyncInfo::DEFAULT_ID }, *(evt.GetQuery()),
            evt.AutoRetry() ? RetryCallback(storeInfo, retryer, evt.GetTriggerMode(), prepareTraceId, user)
                            : GetCallback(evt.GetAsyncDetail(), storeInfo, evt.GetTriggerMode(), prepareTraceId, user),
            syncParam);
        if (status != E_OK) {
            if (async) {
                detail.code = status;
                async(std::move(details));
            }
            UpdateFinishSyncInfo({ GetAccountId(storeInfo.user), storeInfo.bundleName, "" }, storeInfo.syncId, E_ERROR);
            if (status == GeneralError::E_NOT_SUPPORT) {
                return;
            }
            // Use the database's own code when it supplied one, otherwise derive one from `status`.
            auto code = dbCode == 0 ? GenStore::CLOUD_ERR_OFFSET + status : dbCode;
            RadarReporter::Report({ storeInfo.bundleName.c_str(), CLOUD_SYNC, FINISH_SYNC, storeInfo.syncId,
                                  evt.GetTriggerMode(), false, code }, "GetSyncHandler", BizState::END);
            Report({ user, storeInfo.bundleName, prepareTraceId, SyncStage::END, code });
        }
    };
}
423 
GetClientChangeHandler()424 std::function<void(const Event &)> SyncManager::GetClientChangeHandler()
425 {
426     return [this](const Event &event) {
427         auto &evt = static_cast<const SyncEvent &>(event);
428         auto store = evt.GetStoreInfo();
429         SyncInfo syncInfo(store.user, store.bundleName, store.storeName);
430         syncInfo.SetMode(evt.GetMode());
431         syncInfo.SetWait(evt.GetWait());
432         syncInfo.SetAsyncDetail(evt.GetAsyncDetail());
433         syncInfo.SetQuery(evt.GetQuery());
434         syncInfo.SetCompensation(evt.IsCompensation());
435         syncInfo.SetTriggerMode(evt.GetTriggerMode());
436         auto times = evt.AutoRetry() ? RETRY_TIMES - CLIENT_RETRY_TIMES : RETRY_TIMES;
437         executor_->Execute(GetSyncTask(times, evt.AutoRetry(), RefCount(), std::move(syncInfo)));
438     };
439 }
440 
// Builds the retry policy for a sync attempt. Both returned closures always return true.
//
// - times >= RETRY_TIMES: the budget is exhausted. Any code other than E_OK /
//   E_SYNC_TASK_MERGED is reported as the final result and no new attempt is scheduled.
// - otherwise: E_NO_SPACE_FOR_ASSET and E_RECODE_LIMIT_EXCEEDED are reported as final without
//   retrying; any other failure schedules a new sync task after `interval`, unless a retry for
//   the same sync id is already pending in activeInfos_.
SyncManager::Retryer SyncManager::GetRetryer(int32_t times, const SyncInfo &syncInfo, int32_t user)
{
    if (times >= RETRY_TIMES) {
        return [this, user, info = SyncInfo(syncInfo)](Duration, int32_t code, int32_t dbCode,
                   const std::string &prepareTraceId) mutable {
            if (code == E_OK || code == E_SYNC_TASK_MERGED) {
                return true;
            }
            info.SetError(code);
            RadarReporter::Report({ info.bundleName_.c_str(), CLOUD_SYNC, FINISH_SYNC, info.syncId_, info.triggerMode_,
                                      false, dbCode },
                "GetRetryer", BizState::END);
            // A dbCode equal to the bare DB_ERR_OFFSET is reported as 0.
            Report({ user, info.bundleName_, prepareTraceId, SyncStage::END,
                dbCode == GenStore::DB_ERR_OFFSET ? 0 : dbCode });
            return true;
        };
    }
    return [this, times, user, info = SyncInfo(syncInfo)](Duration interval, int32_t code, int32_t dbCode,
               const std::string &prepareTraceId) mutable {
        if (code == E_OK || code == E_SYNC_TASK_MERGED) {
            return true;
        }
        if (code == E_NO_SPACE_FOR_ASSET || code == E_RECODE_LIMIT_EXCEEDED) {
            info.SetError(code);
            RadarReporter::Report({ info.bundleName_.c_str(), CLOUD_SYNC, FINISH_SYNC, info.syncId_, info.triggerMode_,
                                      false, dbCode },
                "GetRetryer", BizState::END);
            Report({ user, info.bundleName_, prepareTraceId, SyncStage::END,
                dbCode == GenStore::DB_ERR_OFFSET ? 0 : dbCode });
            return true;
        }

        // Schedule a retry only if none is pending for this sync id. GetSyncTask() erases the
        // activeInfos_ entry again when the scheduled task starts running.
        activeInfos_.ComputeIfAbsent(info.syncId_, [this, times, interval, &info](uint64_t key) mutable {
            auto syncId = GenerateId(info.user_);
            auto ref = GenSyncRef(syncId);
            actives_.Compute(syncId, [this, times, interval, &ref, &info](const uint64_t &key, TaskId &value) mutable {
                // NOTE(review): `info` is moved into the new task here, so the copy captured by this
                // retryer is in a moved-from state on later invocations - confirm this is acceptable.
                value = executor_->Schedule(interval, GetSyncTask(times, true, ref, std::move(info)));
                return true;
            });
            return syncId;
        });
        return true;
    };
}
485 
GenerateId(int32_t user)486 uint64_t SyncManager::GenerateId(int32_t user)
487 {
488     uint64_t syncId = static_cast<uint64_t>(user) & 0xFFFFFFFF;
489     return (syncId << MV_BIT) | (++genId_);
490 }
491 
GenSyncRef(uint64_t syncId)492 RefCount SyncManager::GenSyncRef(uint64_t syncId)
493 {
494     return RefCount([syncId, this]() {
495         actives_.Erase(syncId);
496     });
497 }
498 
Compare(uint64_t syncId,int32_t user)499 int32_t SyncManager::Compare(uint64_t syncId, int32_t user)
500 {
501     uint64_t inner = static_cast<uint64_t>(user) & 0xFFFFFFFF;
502     return (syncId & USER_MARK) == (inner << MV_BIT);
503 }
504 
UpdateSchema(const SyncManager::SyncInfo & syncInfo)505 void SyncManager::UpdateSchema(const SyncManager::SyncInfo &syncInfo)
506 {
507     StoreInfo storeInfo;
508     storeInfo.user = syncInfo.user_;
509     storeInfo.bundleName = syncInfo.bundleName_;
510     EventCenter::GetInstance().PostEvent(std::make_unique<CloudEvent>(CloudEvent::GET_SCHEMA, storeInfo));
511 }
512 
GetBindInfos(const StoreMetaData & meta,const std::vector<int32_t> & users,const Database & schemaDatabase)513 std::map<uint32_t, GeneralStore::BindInfo> SyncManager::GetBindInfos(const StoreMetaData &meta,
514     const std::vector<int32_t> &users, const Database &schemaDatabase)
515 {
516     auto instance = CloudServer::GetInstance();
517     if (instance == nullptr) {
518         ZLOGD("not support cloud sync");
519         return {};
520     }
521     std::map<uint32_t, GeneralStore::BindInfo> bindInfos;
522     for (auto &activeUser : users) {
523         if (activeUser == 0) {
524             continue;
525         }
526         auto cloudDB = instance->ConnectCloudDB(meta.bundleName, activeUser, schemaDatabase);
527         if (cloudDB == nullptr) {
528             ZLOGE("failed, no cloud DB <%{public}d:0x%{public}x %{public}s<->%{public}s>", meta.tokenId, activeUser,
529                 Anonymous::Change(schemaDatabase.name).c_str(), Anonymous::Change(schemaDatabase.alias).c_str());
530             return {};
531         }
532         if (meta.storeType >= StoreMetaData::StoreType::STORE_KV_BEGIN &&
533             meta.storeType <= StoreMetaData::StoreType::STORE_KV_END) {
534             bindInfos.insert_or_assign(activeUser, GeneralStore::BindInfo{ std::move(cloudDB), nullptr });
535             continue;
536         }
537         auto assetLoader = instance->ConnectAssetLoader(meta.bundleName, activeUser, schemaDatabase);
538         if (assetLoader == nullptr) {
539             ZLOGE("failed, no cloud DB <%{public}d:0x%{public}x %{public}s<->%{public}s>", meta.tokenId, activeUser,
540                 Anonymous::Change(schemaDatabase.name).c_str(), Anonymous::Change(schemaDatabase.alias).c_str());
541             return {};
542         }
543         bindInfos.insert_or_assign(activeUser, GeneralStore::BindInfo{ std::move(cloudDB), std::move(assetLoader) });
544     }
545     return bindInfos;
546 }
547 
// Opens the store described by `meta` and makes sure it is bound to the cloud.
//
// Returns nullptr when: the (non-zero) user is not verified, the cloud server is unavailable,
// the store cannot be opened, no schema is stored for the bundle, or `mustBind` is set and not
// every user could be connected.
// For user 0 the store is bound for all foreground users; otherwise only for `user`.
AutoCache::Store SyncManager::GetStore(const StoreMetaData &meta, int32_t user, bool mustBind)
{
    if (user != 0 && !Account::GetInstance()->IsVerified(user)) {
        ZLOGW("user:%{public}d is locked!", user);
        return nullptr;
    }
    auto instance = CloudServer::GetInstance();
    if (instance == nullptr) {
        ZLOGD("not support cloud sync");
        return nullptr;
    }
    auto store = AutoCache::GetInstance().GetStore(meta, {});
    if (store == nullptr) {
        ZLOGE("store null, storeId:%{public}s", meta.GetStoreAlias().c_str());
        return nullptr;
    }
    // Binding is only done once; an already bound store is returned as is.
    if (!store->IsBound()) {
        std::vector<int32_t> users{};
        CloudInfo info;
        if (user == 0) {
            AccountDelegate::GetInstance()->QueryForegroundUsers(users);
        } else {
            users.push_back(user);
        }
        // The first user selects which schema and cloud info are loaded below.
        // NOTE(review): when `users` is empty, info.user keeps CloudInfo's default - confirm that is intended.
        if (!users.empty()) {
            info.user = users[0];
        }
        SchemaMeta schemaMeta;
        std::string schemaKey = info.GetSchemaKey(meta.bundleName, meta.instanceId);
        if (!MetaDataManager::GetInstance().LoadMeta(schemaKey, schemaMeta, true)) {
            ZLOGE("failed, no schema bundleName:%{public}s, storeId:%{public}s", meta.bundleName.c_str(),
                meta.GetStoreAlias().c_str());
            return nullptr;
        }
        auto dbMeta = schemaMeta.GetDataBase(meta.storeId);
        std::map<uint32_t, GeneralStore::BindInfo> bindInfos = GetBindInfos(meta, users, dbMeta);
        if (mustBind && bindInfos.size() != users.size()) {
            return nullptr;
        }
        // Limits are taken from the stored cloud info when present; otherwise the CloudConfig defaults apply.
        GeneralStore::CloudConfig config;
        if (MetaDataManager::GetInstance().LoadMeta(info.GetKey(), info, true)) {
            config.maxNumber = info.maxNumber;
            config.maxSize = info.maxSize;
        }
        store->Bind(dbMeta, bindInfos, config);
    }
    return store;
}
596 
Report(const ReportParam & reportParam)597 void SyncManager::Report(const ReportParam &reportParam)
598 {
599     auto cloudReport = CloudReport::GetInstance();
600     if (cloudReport == nullptr) {
601         return;
602     }
603     cloudReport->Report(reportParam);
604 }
605 
GetPrepareTraceId(const SyncInfo & info,const CloudInfo & cloud)606 SyncManager::TraceIds SyncManager::GetPrepareTraceId(const SyncInfo &info, const CloudInfo &cloud)
607 {
608     TraceIds traceIds;
609     if (!info.prepareTraceId_.empty()) {
610         traceIds.emplace(info.bundleName_, info.prepareTraceId_);
611         return traceIds;
612     }
613     auto cloudReport = CloudReport::GetInstance();
614     if (cloudReport == nullptr) {
615         return traceIds;
616     }
617     if (info.bundleName_.empty()) {
618         for (const auto &it : cloud.apps) {
619             traceIds.emplace(it.first, cloudReport->GetPrepareTraceId(info.user_));
620         }
621     } else {
622         traceIds.emplace(info.bundleName_, cloudReport->GetPrepareTraceId(info.user_));
623     }
624     return traceIds;
625 }
626 
NeedGetCloudInfo(CloudInfo & cloud)627 bool SyncManager::NeedGetCloudInfo(CloudInfo &cloud)
628 {
629     return (!MetaDataManager::GetInstance().LoadMeta(cloud.GetKey(), cloud, true) || !cloud.enableCloud) &&
630            DmAdapter::GetInstance().IsNetworkAvailable() && Account::GetInstance()->IsLoginAccount();
631 }
632 
GetCloudSyncInfo(const SyncInfo & info,CloudInfo & cloud)633 std::vector<std::tuple<QueryKey, uint64_t>> SyncManager::GetCloudSyncInfo(const SyncInfo &info, CloudInfo &cloud)
634 {
635     std::vector<std::tuple<QueryKey, uint64_t>> cloudSyncInfos;
636     if (NeedGetCloudInfo(cloud)) {
637         ZLOGI("get cloud info from server, user: %{public}d.", cloud.user);
638         auto instance = CloudServer::GetInstance();
639         if (instance == nullptr) {
640             return cloudSyncInfos;
641         }
642         cloud = instance->GetServerInfo(cloud.user, false);
643         if (!cloud.IsValid()) {
644             ZLOGE("cloud is empty, user: %{public}d", cloud.user);
645             return cloudSyncInfos;
646         }
647         if (!MetaDataManager::GetInstance().SaveMeta(cloud.GetKey(), cloud, true)) {
648             ZLOGW("save cloud info fail, user: %{public}d", cloud.user);
649         }
650     }
651     if (info.bundleName_.empty()) {
652         for (const auto &it : cloud.apps) {
653             QueryKey queryKey{ .accountId = cloud.id, .bundleName = it.first, .storeId = "" };
654             cloudSyncInfos.emplace_back(std::make_tuple(queryKey, info.syncId_));
655         }
656     } else {
657         QueryKey queryKey{ .accountId = cloud.id, .bundleName = info.bundleName_, .storeId = "" };
658         cloudSyncInfos.emplace_back(std::make_tuple(queryKey, info.syncId_));
659     }
660     return cloudSyncInfos;
661 }
662 
GetLastResults(const std::string & storeId,std::map<SyncId,CloudSyncInfo> & infos,QueryLastResults & results)663 void SyncManager::GetLastResults(
664     const std::string &storeId, std::map<SyncId, CloudSyncInfo> &infos, QueryLastResults &results)
665 {
666     for (auto &[key, info] : infos) {
667         if (info.code != -1) {
668             results.insert(std::pair<std::string, CloudSyncInfo>(storeId, info));
669         }
670     }
671 }
672 
NeedSaveSyncInfo(const QueryKey & queryKey)673 bool SyncManager::NeedSaveSyncInfo(const QueryKey &queryKey)
674 {
675     if (queryKey.accountId.empty()) {
676         return false;
677     }
678     return kvApps_.find(queryKey.bundleName) == kvApps_.end();
679 }
680 
QueryLastSyncInfo(const std::vector<QueryKey> & queryKeys,QueryLastResults & results)681 int32_t SyncManager::QueryLastSyncInfo(const std::vector<QueryKey> &queryKeys, QueryLastResults &results)
682 {
683     for (const auto &queryKey : queryKeys) {
684         std::string storeId = queryKey.storeId;
685         QueryKey key{ .accountId = queryKey.accountId, .bundleName = queryKey.bundleName, .storeId = "" };
686         lastSyncInfos_.ComputeIfPresent(
687             key, [&storeId, &results](auto &key, std::map<SyncId, CloudSyncInfo> &vals) {
688                 GetLastResults(storeId, vals, results);
689                 return !vals.empty();
690             });
691     }
692     return SUCCESS;
693 }
694 
UpdateStartSyncInfo(const std::vector<std::tuple<QueryKey,uint64_t>> & cloudSyncInfos)695 void SyncManager::UpdateStartSyncInfo(const std::vector<std::tuple<QueryKey, uint64_t>> &cloudSyncInfos)
696 {
697     int64_t startTime = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
698     for (const auto &[queryKey, syncId] : cloudSyncInfos) {
699         if (!NeedSaveSyncInfo(queryKey)) {
700             continue;
701         }
702         lastSyncInfos_.Compute(queryKey, [id = syncId, startTime](auto &, std::map<SyncId, CloudSyncInfo> &val) {
703             val[id] = { .startTime = startTime };
704             return !val.empty();
705         });
706     }
707 }
708 
UpdateFinishSyncInfo(const QueryKey & queryKey,uint64_t syncId,int32_t code)709 void SyncManager::UpdateFinishSyncInfo(const QueryKey &queryKey, uint64_t syncId, int32_t code)
710 {
711     if (!NeedSaveSyncInfo(queryKey)) {
712         return;
713     }
714     lastSyncInfos_.ComputeIfPresent(queryKey, [syncId, code](auto &key, std::map<SyncId, CloudSyncInfo> &val) {
715         auto now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
716         for (auto iter = val.begin(); iter != val.end();) {
717             bool isExpired = ((now - iter->second.startTime) >= EXPIRATION_TIME) && iter->second.code == -1;
718             if ((iter->first != syncId && ((iter->second.code != -1) || isExpired))) {
719                 iter = val.erase(iter);
720             } else if (iter->first == syncId) {
721                 iter->second.finishTime = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
722                 iter->second.code = code;
723                 iter++;
724             } else {
725                 iter++;
726             }
727         }
728         return true;
729     });
730 }
731 
GetCallback(const GenAsync & async,const StoreInfo & storeInfo,int32_t triggerMode,const std::string & prepareTraceId,int32_t user)732 std::function<void(const GenDetails &result)> SyncManager::GetCallback(const GenAsync &async,
733     const StoreInfo &storeInfo, int32_t triggerMode, const std::string &prepareTraceId, int32_t user)
734 {
735     return [this, async, storeInfo, triggerMode, prepareTraceId, user](const GenDetails &result) {
736         if (async != nullptr) {
737             async(result);
738         }
739 
740         if (result.empty()) {
741             ZLOGE("result is empty");
742             return;
743         }
744 
745         if (result.begin()->second.progress != GenProgress::SYNC_FINISH) {
746             return;
747         }
748 
749         int32_t dbCode = (result.begin()->second.dbCode == GenStore::DB_ERR_OFFSET) ? 0 : result.begin()->second.dbCode;
750         RadarReporter::Report({ storeInfo.bundleName.c_str(), CLOUD_SYNC, FINISH_SYNC, storeInfo.syncId, triggerMode,
751                                   result.begin()->second.changeCount, dbCode },
752             "GetCallback", BizState::END);
753         Report({ user, storeInfo.bundleName, prepareTraceId, SyncStage::END, dbCode });
754 
755         auto id = GetAccountId(storeInfo.user);
756         if (id.empty()) {
757             ZLOGD("account id is empty");
758             return;
759         }
760         QueryKey queryKey{
761             .accountId = id,
762             .bundleName = storeInfo.bundleName,
763             .storeId = ""
764         };
765 
766         int32_t code = result.begin()->second.code;
767         UpdateFinishSyncInfo(queryKey, storeInfo.syncId, code);
768     };
769 }
770 
GetAccountId(int32_t user)771 std::string SyncManager::GetAccountId(int32_t user)
772 {
773     CloudInfo cloudInfo;
774     cloudInfo.user = user;
775     if (!MetaDataManager::GetInstance().LoadMeta(cloudInfo.GetKey(), cloudInfo, true)) {
776         ZLOGE("not exist meta, user:%{public}d.", cloudInfo.user);
777         return "";
778     }
779     return cloudInfo.id;
780 }
781 
GetInterval(int32_t code)782 ExecutorPool::Duration SyncManager::GetInterval(int32_t code)
783 {
784     switch (code) {
785         case E_LOCKED_BY_OTHERS:
786             return LOCKED_INTERVAL;
787         case E_BUSY:
788             return BUSY_INTERVAL;
789         default:
790             return RETRY_INTERVAL;
791     }
792 }
793 
GetSchemaMeta(const CloudInfo & cloud,const std::string & bundleName)794 std::vector<SchemaMeta> SyncManager::GetSchemaMeta(const CloudInfo &cloud, const std::string &bundleName)
795 {
796     std::vector<SchemaMeta> schemas;
797     auto key = cloud.GetSchemaPrefix(bundleName);
798     MetaDataManager::GetInstance().LoadMeta(key, schemas, true);
799     return schemas;
800 }
801 
DoExceptionalCallback(const GenAsync & async,GenDetails & details,const StoreInfo & storeInfo,const std::string & prepareTraceId)802 void SyncManager::DoExceptionalCallback(const GenAsync &async, GenDetails &details, const StoreInfo &storeInfo,
803     const std::string &prepareTraceId)
804 {
805     if (async) {
806         details[SyncInfo::DEFAULT_ID].code = E_ERROR;
807         async(details);
808     }
809     QueryKey queryKey{ GetAccountId(storeInfo.user), storeInfo.bundleName, "" };
810     UpdateFinishSyncInfo(queryKey, storeInfo.syncId, E_ERROR);
811     Report({ storeInfo.user, storeInfo.bundleName, prepareTraceId, SyncStage::END, E_ERROR });
812 }
813 
InitDefaultUser(int32_t & user)814 bool SyncManager::InitDefaultUser(int32_t &user)
815 {
816     if (user != 0) {
817         return false;
818     }
819     std::vector<int32_t> users;
820     AccountDelegate::GetInstance()->QueryUsers(users);
821     if (!users.empty()) {
822         user = users[0];
823     }
824     return true;
825 }
826 
RetryCallback(const StoreInfo & storeInfo,Retryer retryer,int32_t triggerMode,const std::string & prepareTraceId,int32_t user)827 std::function<void(const DistributedData::GenDetails &result)> SyncManager::RetryCallback(const StoreInfo &storeInfo,
828     Retryer retryer, int32_t triggerMode, const std::string &prepareTraceId, int32_t user)
829 {
830     return [this, retryer, storeInfo, triggerMode, prepareTraceId, user](const GenDetails &details) {
831         if (details.empty()) {
832             ZLOGE("retry, details empty");
833             return;
834         }
835         int32_t code = details.begin()->second.code;
836         int32_t dbCode = details.begin()->second.dbCode;
837         if (details.begin()->second.progress == GenProgress::SYNC_FINISH) {
838             QueryKey queryKey{ GetAccountId(storeInfo.user), storeInfo.bundleName, "" };
839             UpdateFinishSyncInfo(queryKey, storeInfo.syncId, code);
840             if (code == E_OK) {
841                 RadarReporter::Report({ storeInfo.bundleName.c_str(), CLOUD_SYNC, FINISH_SYNC, storeInfo.syncId,
842                                           triggerMode, details.begin()->second.changeCount },
843                     "RetryCallback", BizState::END);
844                 Report({ user, storeInfo.bundleName, prepareTraceId, SyncStage::END,
845                     dbCode == GenStore::DB_ERR_OFFSET ? 0 : dbCode });
846             }
847         }
848         retryer(GetInterval(code), code, dbCode, prepareTraceId);
849     };
850 }
851 
BatchUpdateFinishState(const std::vector<std::tuple<QueryKey,uint64_t>> & cloudSyncInfos,int32_t code)852 void SyncManager::BatchUpdateFinishState(const std::vector<std::tuple<QueryKey, uint64_t>> &cloudSyncInfos,
853     int32_t code)
854 {
855     for (const auto &[queryKey, syncId] : cloudSyncInfos) {
856         UpdateFinishSyncInfo(queryKey, syncId, code);
857     }
858 }
859 
BatchReport(int32_t userId,const TraceIds & traceIds,SyncStage syncStage,int32_t errCode)860 void SyncManager::BatchReport(int32_t userId, const TraceIds &traceIds, SyncStage syncStage, int32_t errCode)
861 {
862     for (const auto &[bundle, id] : traceIds) {
863         Report({ userId, bundle, id, syncStage, errCode });
864     }
865 }
866 
GetMetaData(const StoreInfo & storeInfo)867 std::pair<bool, StoreMetaData> SyncManager::GetMetaData(const StoreInfo &storeInfo)
868 {
869     StoreMetaData meta(storeInfo);
870     meta.deviceId = DmAdapter::GetInstance().GetLocalDevice().uuid;
871     if (!MetaDataManager::GetInstance().LoadMeta(meta.GetKey(), meta, true)) {
872         meta.user = "0"; // check if it is a public store.
873         StoreMetaDataLocal localMetaData;
874         if (!MetaDataManager::GetInstance().LoadMeta(meta.GetKeyLocal(), localMetaData, true) ||
875             !localMetaData.isPublic || !MetaDataManager::GetInstance().LoadMeta(meta.GetKey(), meta, true)) {
876             ZLOGE("failed, no store meta. bundleName:%{public}s, storeId:%{public}s", meta.bundleName.c_str(),
877                   meta.GetStoreAlias().c_str());
878             return { false, meta };
879         }
880     }
881     return { true, meta };
882 }
883 } // namespace OHOS::CloudData