1 /*
2  * Copyright (c) 2023-2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define LOG_TAG "RdbGeneralStore"
16 
17 #include "rdb_general_store.h"
18 
19 #include <chrono>
20 #include <cinttypes>
21 
22 #include "cache_cursor.h"
23 #include "changeevent/remote_change_event.h"
24 #include "cloud/asset_loader.h"
25 #include "cloud/cloud_db.h"
26 #include "cloud/cloud_lock_event.h"
27 #include "cloud/cloud_store_types.h"
28 #include "cloud/schema_meta.h"
29 #include "cloud_service.h"
30 #include "commonevent/data_sync_event.h"
31 #include "communicator/device_manager_adapter.h"
32 #include "crypto_manager.h"
33 #include "device_manager_adapter.h"
34 #include "eventcenter/event_center.h"
35 #include "log_print.h"
36 #include "metadata/meta_data_manager.h"
37 #include "metadata/secret_key_meta_data.h"
38 #include "rdb_cursor.h"
39 #include "rdb_helper.h"
40 #include "rdb_query.h"
41 #include "rdb_result_set_impl.h"
42 #include "relational_store_manager.h"
43 #include "snapshot/bind_event.h"
44 #include "utils/anonymous.h"
45 #include "value_proxy.h"
46 namespace OHOS::DistributedRdb {
47 using namespace DistributedData;
48 using namespace DistributedDB;
49 using namespace NativeRdb;
50 using namespace CloudData;
51 using namespace std::chrono;
52 using DBField = DistributedDB::Field;
53 using DBTable = DistributedDB::TableSchema;
54 using DBSchema = DistributedDB::DataBaseSchema;
55 using ClearMode = DistributedDB::ClearMode;
56 using DBStatus = DistributedDB::DBStatus;
57 using DmAdapter = OHOS::DistributedData::DeviceManagerAdapter;
58 
59 constexpr const char *INSERT = "INSERT INTO ";
60 constexpr const char *REPLACE = "REPLACE INTO ";
61 constexpr const char *VALUES = " VALUES ";
62 constexpr const char *LOGOUT_DELETE_FLAG = "DELETE#ALL_CLOUDDATA";
63 constexpr const LockAction LOCK_ACTION = static_cast<LockAction>(static_cast<uint32_t>(LockAction::INSERT) |
64     static_cast<uint32_t>(LockAction::UPDATE) | static_cast<uint32_t>(LockAction::DELETE) |
65     static_cast<uint32_t>(LockAction::DOWNLOAD));
66 constexpr uint32_t CLOUD_SYNC_FLAG = 1;
67 constexpr uint32_t SEARCHABLE_FLAG = 2;
68 constexpr uint32_t LOCK_TIMEOUT = 3600; // second
69 
GetDBSchema(const Database & database)70 static DBSchema GetDBSchema(const Database &database)
71 {
72     DBSchema schema;
73     schema.tables.resize(database.tables.size());
74     for (size_t i = 0; i < database.tables.size(); i++) {
75         const Table &table = database.tables[i];
76         DBTable &dbTable = schema.tables[i];
77         dbTable.name = table.name;
78         dbTable.sharedTableName = table.sharedTableName;
79         for (auto &field : table.fields) {
80             DBField dbField;
81             dbField.colName = field.colName;
82             dbField.type = field.type;
83             dbField.primary = field.primary;
84             dbField.nullable = field.nullable;
85             dbTable.fields.push_back(std::move(dbField));
86         }
87     }
88     return schema;
89 }
90 
InitStoreInfo(const StoreMetaData & meta)91 void RdbGeneralStore::InitStoreInfo(const StoreMetaData &meta)
92 {
93     storeInfo_.tokenId = meta.tokenId;
94     storeInfo_.bundleName = meta.bundleName;
95     storeInfo_.storeName = meta.storeId;
96     storeInfo_.instanceId = meta.instanceId;
97     storeInfo_.user = std::stoi(meta.user);
98     storeInfo_.deviceId = DeviceManagerAdapter::GetInstance().GetLocalDevice().uuid;
99 }
100 
RdbGeneralStore(const StoreMetaData & meta)101 RdbGeneralStore::RdbGeneralStore(const StoreMetaData &meta)
102     : manager_(meta.appId, meta.user, meta.instanceId), tasks_(std::make_shared<ConcurrentMap<SyncId, FinishTask>>())
103 {
104     observer_.storeId_ = meta.storeId;
105     observer_.meta_ = meta;
106     RelationalStoreDelegate::Option option;
107     option.syncDualTupleMode = true;
108     option.observer = &observer_;
109     if (meta.isEncrypt) {
110         std::string key = meta.GetSecretKey();
111         SecretKeyMetaData secretKeyMeta;
112         MetaDataManager::GetInstance().LoadMeta(key, secretKeyMeta, true);
113         std::vector<uint8_t> decryptKey;
114         CryptoManager::GetInstance().Decrypt(secretKeyMeta.sKey, decryptKey);
115         option.passwd.SetValue(decryptKey.data(), decryptKey.size());
116         std::fill(decryptKey.begin(), decryptKey.end(), 0);
117         option.isEncryptedDb = meta.isEncrypt;
118         option.cipher = CipherType::AES_256_GCM;
119         for (uint32_t i = 0; i < ITERS_COUNT; ++i) {
120             option.iterateTimes = ITERS[i];
121             auto ret = manager_.OpenStore(meta.dataDir, meta.storeId, option, delegate_);
122             if (ret == DBStatus::OK && delegate_ != nullptr) {
123                 break;
124             }
125             manager_.CloseStore(delegate_);
126             delegate_ = nullptr;
127         }
128     } else {
129         auto ret = manager_.OpenStore(meta.dataDir, meta.storeId, option, delegate_);
130         if (ret != DBStatus::OK || delegate_ == nullptr) {
131             manager_.CloseStore(delegate_);
132             delegate_ = nullptr;
133         }
134     }
135 
136     InitStoreInfo(meta);
137 
138     if (meta.isSearchable) {
139         syncNotifyFlag_ |= SEARCHABLE_FLAG;
140     }
141 
142     if (delegate_ != nullptr && meta.isManualClean) {
143         PragmaData data =
144             static_cast<PragmaData>(const_cast<void *>(static_cast<const void *>(&meta.isManualClean)));
145         delegate_->Pragma(PragmaCmd::LOGIC_DELETE_SYNC_DATA, data);
146     }
147     ZLOGI("Get rdb store, tokenId:%{public}u, bundleName:%{public}s, storeName:%{public}s, user:%{public}s,"
148           "isEncrypt:%{public}d, isManualClean:%{public}d, isSearchable:%{public}d",
149           meta.tokenId, meta.bundleName.c_str(), Anonymous::Change(meta.storeId).c_str(), meta.user.c_str(),
150           meta.isEncrypt, meta.isManualClean, meta.isSearchable);
151 }
152 
~RdbGeneralStore()153 RdbGeneralStore::~RdbGeneralStore()
154 {
155     manager_.CloseStore(delegate_);
156     delegate_ = nullptr;
157     bindInfo_.loader_ = nullptr;
158     if (bindInfo_.db_ != nullptr) {
159         bindInfo_.db_->Close();
160     }
161     bindInfo_.db_ = nullptr;
162     rdbCloud_ = nullptr;
163     rdbLoader_ = nullptr;
164     RemoveTasks();
165     tasks_ = nullptr;
166     executor_ = nullptr;
167 }
168 
BindSnapshots(std::shared_ptr<std::map<std::string,std::shared_ptr<Snapshot>>> bindAssets)169 int32_t RdbGeneralStore::BindSnapshots(std::shared_ptr<std::map<std::string, std::shared_ptr<Snapshot>>> bindAssets)
170 {
171     if (snapshots_.bindAssets == nullptr) {
172         snapshots_.bindAssets = bindAssets;
173     }
174     return GenErr::E_OK;
175 }
176 
Bind(Database & database,const std::map<uint32_t,BindInfo> & bindInfos,const CloudConfig & config)177 int32_t RdbGeneralStore::Bind(Database &database, const std::map<uint32_t, BindInfo> &bindInfos,
178     const CloudConfig &config)
179 {
180     if (bindInfos.empty()) {
181         return GeneralError::E_OK;
182     }
183     auto bindInfo = bindInfos.begin()->second;
184     if (bindInfo.db_ == nullptr || bindInfo.loader_ == nullptr) {
185         return GeneralError::E_INVALID_ARGS;
186     }
187 
188     if (isBound_.exchange(true)) {
189         return GeneralError::E_OK;
190     }
191 
192     BindEvent::BindEventInfo eventInfo;
193     eventInfo.tokenId = storeInfo_.tokenId;
194     eventInfo.bundleName = storeInfo_.bundleName;
195     eventInfo.storeName = storeInfo_.storeName;
196     eventInfo.user = storeInfo_.user;
197     eventInfo.instanceId = storeInfo_.instanceId;
198 
199     auto evt = std::make_unique<BindEvent>(BindEvent::BIND_SNAPSHOT, std::move(eventInfo));
200     EventCenter::GetInstance().PostEvent(std::move(evt));
201     bindInfo_ = std::move(bindInfo);
202     {
203         std::unique_lock<decltype(rdbCloudMutex_)> lock(rdbCloudMutex_);
204         rdbCloud_ = std::make_shared<RdbCloud>(bindInfo_.db_, &snapshots_);
205         rdbLoader_ = std::make_shared<RdbAssetLoader>(bindInfo_.loader_, &snapshots_);
206     }
207 
208     DistributedDB::CloudSyncConfig dbConfig;
209     dbConfig.maxUploadCount = config.maxNumber;
210     dbConfig.maxUploadSize = config.maxSize;
211     dbConfig.maxRetryConflictTimes = config.maxRetryConflictTimes;
212     DBSchema schema = GetDBSchema(database);
213     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
214     if (delegate_ == nullptr) {
215         ZLOGE("database:%{public}s already closed!", Anonymous::Change(database.name).c_str());
216         return GeneralError::E_ALREADY_CLOSED;
217     }
218     delegate_->SetCloudDB(rdbCloud_);
219     delegate_->SetIAssetLoader(rdbLoader_);
220     delegate_->SetCloudDbSchema(std::move(schema));
221     delegate_->SetCloudSyncConfig(dbConfig);
222 
223     syncNotifyFlag_ |= CLOUD_SYNC_FLAG;
224     return GeneralError::E_OK;
225 }
226 
IsBound()227 bool RdbGeneralStore::IsBound()
228 {
229     return isBound_;
230 }
231 
Close(bool isForce)232 int32_t RdbGeneralStore::Close(bool isForce)
233 {
234     {
235         std::unique_lock<decltype(rwMutex_)> lock(rwMutex_, std::chrono::seconds(isForce ? LOCK_TIMEOUT : 0));
236         if (!lock) {
237             return GeneralError::E_BUSY;
238         }
239 
240         if (delegate_ == nullptr) {
241             return GeneralError::E_OK;
242         }
243         if (!isForce && delegate_->GetCloudSyncTaskCount() > 0) {
244             return GeneralError::E_BUSY;
245         }
246         if (isForce && bindInfo_.loader_ != nullptr) {
247             bindInfo_.loader_->Cancel();
248         }
249         auto status = manager_.CloseStore(delegate_);
250         if (status != DBStatus::OK) {
251             return status;
252         }
253         delegate_ = nullptr;
254     }
255     RemoveTasks();
256     bindInfo_.loader_ = nullptr;
257     if (bindInfo_.db_ != nullptr) {
258         bindInfo_.db_->Close();
259     }
260     bindInfo_.db_ = nullptr;
261     {
262         std::unique_lock<decltype(rdbCloudMutex_)> lock(rdbCloudMutex_);
263         rdbCloud_ = nullptr;
264         rdbLoader_ = nullptr;
265     }
266     return GeneralError::E_OK;
267 }
268 
Execute(const std::string & table,const std::string & sql)269 int32_t RdbGeneralStore::Execute(const std::string &table, const std::string &sql)
270 {
271     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
272     if (delegate_ == nullptr) {
273         ZLOGE("Database already closed! database:%{public}s, table:%{public}s, sql:%{public}s",
274             Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(table).c_str(),
275             Anonymous::Change(sql).c_str());
276         return GeneralError::E_ERROR;
277     }
278     std::vector<DistributedDB::VBucket> changedData;
279     auto status = delegate_->ExecuteSql({ sql, {}, false }, changedData);
280     if (status != DBStatus::OK) {
281         ZLOGE("Failed! ret:%{public}d, sql:%{public}s, data size:%{public}zu", status, Anonymous::Change(sql).c_str(),
282               changedData.size());
283         return GeneralError::E_ERROR;
284     }
285     return GeneralError::E_OK;
286 }
287 
SqlConcatenate(VBucket & value,std::string & strColumnSql,std::string & strRowValueSql)288 size_t RdbGeneralStore::SqlConcatenate(VBucket &value, std::string &strColumnSql, std::string &strRowValueSql)
289 {
290     strColumnSql += " (";
291     strRowValueSql += " (";
292     auto columnSize = value.size();
293     for (auto column = value.begin(); column != value.end(); ++column) {
294         strRowValueSql += " ?,";
295         strColumnSql += " " + column->first + ",";
296     }
297     if (columnSize != 0) {
298         strColumnSql.pop_back();
299         strColumnSql += ")";
300         strRowValueSql.pop_back();
301         strRowValueSql += ")";
302     }
303     return columnSize;
304 }
305 
Insert(const std::string & table,VBuckets && values)306 int32_t RdbGeneralStore::Insert(const std::string &table, VBuckets &&values)
307 {
308     if (table.empty() || values.size() == 0) {
309         ZLOGE("Insert:table maybe empty:%{public}d,value size is:%{public}zu", table.empty(), values.size());
310         return GeneralError::E_INVALID_ARGS;
311     }
312 
313     std::string strColumnSql;
314     std::string strRowValueSql;
315     auto value = values.front();
316     size_t columnSize = SqlConcatenate(value, strColumnSql, strRowValueSql);
317     if (columnSize == 0) {
318         ZLOGE("Insert: columnSize error, can't be 0!");
319         return GeneralError::E_INVALID_ARGS;
320     }
321 
322     Values args;
323     std::string strValueSql;
324     for (auto &rowData : values) {
325         if (rowData.size() != columnSize) {
326             ZLOGE("Insert: VBucket size error, Each VBucket in values must be of the same length!");
327             return GeneralError::E_INVALID_ARGS;
328         }
329         for (auto column = rowData.begin(); column != rowData.end(); ++column) {
330             args.push_back(std::move(column->second));
331         }
332         strValueSql += strRowValueSql + ",";
333     }
334     strValueSql.pop_back();
335     std::string sql = INSERT + table + strColumnSql + VALUES + strValueSql;
336 
337     std::vector<DistributedDB::VBucket> changedData;
338     std::vector<DistributedDB::Type> bindArgs = ValueProxy::Convert(std::move(args));
339     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
340     if (delegate_ == nullptr) {
341         ZLOGE("Database already closed! database:%{public}s, table:%{public}s",
342             Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(table).c_str());
343         return GeneralError::E_ERROR;
344     }
345     auto status = delegate_->ExecuteSql({ sql, std::move(bindArgs), false }, changedData);
346     if (status != DBStatus::OK) {
347         if (IsPrintLog(status)) {
348             auto time =
349                 static_cast<uint64_t>(duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count());
350                 ZLOGE("Failed! ret:%{public}d, sql:%{public}s, data size:%{public}zu times %{public}" PRIu64 ".",
351                 status, Anonymous::Change(sql).c_str(), changedData.size(), time);
352         }
353         return GeneralError::E_ERROR;
354     }
355     return GeneralError::E_OK;
356 }
357 
IsPrintLog(DBStatus status)358 bool RdbGeneralStore::IsPrintLog(DBStatus status)
359 {
360     bool isPrint = false;
361     if (status == lastError_) {
362         if (++lastErrCnt_ % PRINT_ERROR_CNT == 0) {
363             isPrint = true;
364         }
365     } else {
366         isPrint = true;
367         lastErrCnt_ = 0;
368         lastError_ = status;
369     }
370     return isPrint;
371 }
372 
373 /**
374  * This function does not support batch updates in rdb, it only supports single data updates.
375  */
Update(const std::string & table,const std::string & setSql,Values && values,const std::string & whereSql,Values && conditions)376 int32_t RdbGeneralStore::Update(const std::string &table, const std::string &setSql, Values &&values,
377     const std::string &whereSql, Values &&conditions)
378 {
379     if (table.empty()) {
380         ZLOGE("Update: table can't be empty!");
381         return GeneralError::E_INVALID_ARGS;
382     }
383     if (setSql.empty() || values.size() == 0) {
384         ZLOGE("Update: setSql and values can't be empty!");
385         return GeneralError::E_INVALID_ARGS;
386     }
387     if (whereSql.empty() || conditions.size() == 0) {
388         ZLOGE("Update: whereSql and conditions can't be empty!");
389         return GeneralError::E_INVALID_ARGS;
390     }
391 
392     std::string sqlIn = " UPDATE " + table + " SET " + setSql + " WHERE " + whereSql;
393     Values args;
394     for (auto &value : values) {
395         args.push_back(std::move(value));
396     }
397     for (auto &condition : conditions) {
398         args.push_back(std::move(condition));
399     }
400 
401     std::vector<DistributedDB::VBucket> changedData;
402     std::vector<DistributedDB::Type> bindArgs = ValueProxy::Convert(std::move(args));
403     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
404     if (delegate_ == nullptr) {
405         ZLOGE("Database already closed! database:%{public}s, table:%{public}s",
406             Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(table).c_str());
407         return GeneralError::E_ERROR;
408     }
409     auto status = delegate_->ExecuteSql({ sqlIn, std::move(bindArgs), false }, changedData);
410     if (status != DBStatus::OK) {
411         ZLOGE("Failed! ret:%{public}d, sql:%{public}s, data size:%{public}zu", status, Anonymous::Change(sqlIn).c_str(),
412               changedData.size());
413         return GeneralError::E_ERROR;
414     }
415     return GeneralError::E_OK;
416 }
417 
Replace(const std::string & table,VBucket && value)418 int32_t RdbGeneralStore::Replace(const std::string &table, VBucket &&value)
419 {
420     if (table.empty() || value.size() == 0) {
421         return GeneralError::E_INVALID_ARGS;
422     }
423     std::string columnSql;
424     std::string valueSql;
425     SqlConcatenate(value, columnSql, valueSql);
426     std::string sql = REPLACE + table + columnSql + VALUES + valueSql;
427 
428     Values args;
429     for (auto &[k, v] : value) {
430         args.emplace_back(std::move(v));
431     }
432     std::vector<DistributedDB::VBucket> changedData;
433     std::vector<DistributedDB::Type> bindArgs = ValueProxy::Convert(std::move(args));
434     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
435     if (delegate_ == nullptr) {
436         ZLOGE("Database already closed! database:%{public}s, table:%{public}s",
437             Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(table).c_str());
438         return GeneralError::E_ERROR;
439     }
440     auto status = delegate_->ExecuteSql({ sql, std::move(bindArgs) }, changedData);
441     if (status != DBStatus::OK) {
442         ZLOGE("Replace failed! ret:%{public}d, table:%{public}s, sql:%{public}s, fields:%{public}s",
443             status, Anonymous::Change(table).c_str(), Anonymous::Change(sql).c_str(), columnSql.c_str());
444         return GeneralError::E_ERROR;
445     }
446     return GeneralError::E_OK;
447 }
448 
Delete(const std::string & table,const std::string & sql,Values && args)449 int32_t RdbGeneralStore::Delete(const std::string &table, const std::string &sql, Values &&args)
450 {
451     return 0;
452 }
453 
Query(const std::string & table,const std::string & sql,Values && args)454 std::pair<int32_t, std::shared_ptr<Cursor>> RdbGeneralStore::Query(__attribute__((unused))const std::string &table,
455     const std::string &sql, Values &&args)
456 {
457     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
458     if (delegate_ == nullptr) {
459         ZLOGE("Database already closed! database:%{public}s", Anonymous::Change(storeInfo_.storeName).c_str());
460         return { GeneralError::E_ALREADY_CLOSED, nullptr };
461     }
462     auto [errCode, records] = QuerySql(sql, std::move(args));
463     return { errCode, std::make_shared<CacheCursor>(std::move(records)) };
464 }
465 
Query(const std::string & table,GenQuery & query)466 std::pair<int32_t, std::shared_ptr<Cursor>> RdbGeneralStore::Query(const std::string &table, GenQuery &query)
467 {
468     RdbQuery *rdbQuery = nullptr;
469     auto ret = query.QueryInterface(rdbQuery);
470     if (ret != GeneralError::E_OK || rdbQuery == nullptr) {
471         ZLOGE("not RdbQuery!");
472         return { GeneralError::E_INVALID_ARGS, nullptr };
473     }
474     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
475     if (delegate_ == nullptr) {
476         ZLOGE("Database already closed! database:%{public}s, table:%{public}s",
477             Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(table).c_str());
478         return { GeneralError::E_ALREADY_CLOSED, nullptr };
479     }
480     if (rdbQuery->IsRemoteQuery()) {
481         if (rdbQuery->GetDevices().size() != 1) {
482             ZLOGE("RemoteQuery: devices size error! size:%{public}zu", rdbQuery->GetDevices().size());
483             return { GeneralError::E_ERROR, nullptr };
484         }
485         auto cursor = RemoteQuery(*rdbQuery->GetDevices().begin(), rdbQuery->GetRemoteCondition());
486         return { cursor != nullptr ? GeneralError::E_OK : GeneralError::E_ERROR, cursor};
487     }
488     return { GeneralError::E_ERROR, nullptr };
489 }
490 
MergeMigratedData(const std::string & tableName,VBuckets && values)491 int32_t RdbGeneralStore::MergeMigratedData(const std::string &tableName, VBuckets &&values)
492 {
493     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
494     if (delegate_ == nullptr) {
495         ZLOGE("Database already closed! database:%{public}s, table:%{public}s",
496             Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(tableName).c_str());
497         return GeneralError::E_ERROR;
498     }
499 
500     auto status = delegate_->UpsertData(tableName, ValueProxy::Convert(std::move(values)));
501     return status == DistributedDB::OK ? GeneralError::E_OK : GeneralError::E_ERROR;
502 }
503 
CleanTrackerData(const std::string & tableName,int64_t cursor)504 int32_t RdbGeneralStore::CleanTrackerData(const std::string &tableName, int64_t cursor)
505 {
506     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
507     if (delegate_ == nullptr) {
508         ZLOGE("Database already closed! database:%{public}s, table:%{public}s",
509               Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(tableName).c_str());
510         return GeneralError::E_ERROR;
511     }
512 
513     auto status = delegate_->CleanTrackerData(tableName, cursor);
514     return status == DistributedDB::OK ? GeneralError::E_OK : GeneralError::E_ERROR;
515 }
516 
Sync(const Devices & devices,GenQuery & query,DetailAsync async,const SyncParam & syncParam)517 std::pair<int32_t, int32_t> RdbGeneralStore::Sync(const Devices &devices, GenQuery &query, DetailAsync async,
518     const SyncParam &syncParam)
519 {
520     DistributedDB::Query dbQuery;
521     RdbQuery *rdbQuery = nullptr;
522     bool isPriority = false;
523     auto ret = query.QueryInterface(rdbQuery);
524     if (ret != GeneralError::E_OK || rdbQuery == nullptr) {
525         dbQuery.FromTable(GetIntersection(query.GetTables(), GetTables()));
526     } else {
527         dbQuery = rdbQuery->GetQuery();
528         isPriority = rdbQuery->IsPriority();
529     }
530     auto syncMode = GeneralStore::GetSyncMode(syncParam.mode);
531     auto dbMode = DistributedDB::SyncMode(syncMode);
532     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
533     if (delegate_ == nullptr) {
534         ZLOGE("store already closed! devices count:%{public}zu, the 1st:%{public}s, mode:%{public}d, "
535               "wait:%{public}d", devices.size(),
536               devices.empty() ? "null" : Anonymous::Change(*devices.begin()).c_str(), syncParam.mode, syncParam.wait);
537         return { GeneralError::E_ALREADY_CLOSED, DBStatus::OK };
538     }
539     auto highMode = GetHighMode(static_cast<uint32_t>(syncParam.mode));
540     SyncId syncId = ++syncTaskId_;
541     auto dbStatus = DistributedDB::INVALID_ARGS;
542     if (syncMode < NEARBY_END) {
543         dbStatus = delegate_->Sync(devices, dbMode, dbQuery, GetDBBriefCB(std::move(async)), syncParam.wait != 0);
544     } else if (syncMode > NEARBY_END && syncMode < CLOUD_END) {
545         auto callback = GetDBProcessCB(std::move(async), syncMode, syncId, highMode);
546         if (executor_ != nullptr && tasks_ != nullptr) {
547             auto id = executor_->Schedule(std::chrono::minutes(INTERVAL), GetFinishTask(syncId));
548             tasks_->Insert(syncId, { id, callback });
549         }
550         dbStatus = delegate_->Sync({ devices, dbMode, dbQuery, syncParam.wait,
551                                        (isPriority || highMode == MANUAL_SYNC_MODE), syncParam.isCompensation, {},
552                                        highMode == AUTO_SYNC_MODE, LOCK_ACTION, syncParam.prepareTraceId },
553             tasks_ != nullptr ? GetCB(syncId) : callback, syncId);
554         if (dbStatus == DBStatus::OK || tasks_ == nullptr) {
555             return { ConvertStatus(dbStatus), dbStatus };
556         }
557         tasks_->ComputeIfPresent(syncId, [executor = executor_](SyncId syncId, const FinishTask &task) {
558             if (executor != nullptr) {
559                 executor->Remove(task.taskId);
560             }
561             return false;
562         });
563     }
564     return { ConvertStatus(dbStatus), dbStatus };
565 }
566 
PreSharing(GenQuery & query)567 std::pair<int32_t, std::shared_ptr<Cursor>> RdbGeneralStore::PreSharing(GenQuery &query)
568 {
569     RdbQuery *rdbQuery = nullptr;
570     auto ret = query.QueryInterface(rdbQuery);
571     if (ret != GeneralError::E_OK || rdbQuery == nullptr) {
572         ZLOGE("not RdbQuery!");
573         return { GeneralError::E_INVALID_ARGS, nullptr };
574     }
575     auto tables = rdbQuery->GetTables();
576     auto statement = rdbQuery->GetStatement();
577     if (statement.empty() || tables.empty()) {
578         ZLOGE("statement size:%{public}zu, tables size:%{public}zu", statement.size(), tables.size());
579         return { GeneralError::E_INVALID_ARGS, nullptr };
580     }
581     std::string sql = BuildSql(*tables.begin(), statement, rdbQuery->GetColumns());
582     VBuckets values;
583     {
584         std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
585         if (delegate_ == nullptr) {
586             ZLOGE("Database already closed! database:%{public}s", Anonymous::Change(storeInfo_.storeName).c_str());
587             return { GeneralError::E_ALREADY_CLOSED, nullptr };
588         }
589         auto [errCode, ret] = QuerySql(sql, rdbQuery->GetBindArgs());
590         values = std::move(ret);
591     }
592     auto rdbCloud = GetRdbCloud();
593     if (rdbCloud == nullptr || values.empty()) {
594         ZLOGW("rdbCloud is %{public}s, values size:%{public}zu", rdbCloud == nullptr ? "nullptr" : "not nullptr",
595             values.size());
596         return { GeneralError::E_CLOUD_DISABLED, nullptr };
597     }
598     VBuckets extends = ExtractExtend(values);
599     rdbCloud->PreSharing(*tables.begin(), extends);
600     for (auto value = values.begin(), extend = extends.begin(); value != values.end() && extend != extends.end();
601          ++value, ++extend) {
602         value->insert_or_assign(DistributedRdb::Field::SHARING_RESOURCE_FIELD, (*extend)[SchemaMeta::SHARING_RESOURCE]);
603         value->erase(CLOUD_GID);
604     }
605     return { GeneralError::E_OK, std::make_shared<CacheCursor>(std::move(values)) };
606 }
607 
ExtractExtend(VBuckets & values) const608 VBuckets RdbGeneralStore::ExtractExtend(VBuckets &values) const
609 {
610     VBuckets extends(values.size());
611     for (auto value = values.begin(), extend = extends.begin(); value != values.end() && extend != extends.end();
612          ++value, ++extend) {
613         auto it = value->find(CLOUD_GID);
614         if (it == value->end()) {
615             continue;
616         }
617         auto gid = std::get_if<std::string>(&(it->second));
618         if (gid == nullptr || gid->empty()) {
619             continue;
620         }
621         extend->insert_or_assign(SchemaMeta::GID_FIELD, std::move(*gid));
622     }
623     return extends;
624 }
625 
BuildSql(const std::string & table,const std::string & statement,const std::vector<std::string> & columns) const626 std::string RdbGeneralStore::BuildSql(
627     const std::string &table, const std::string &statement, const std::vector<std::string> &columns) const
628 {
629     std::string sql = "select ";
630     sql.append(CLOUD_GID);
631     std::string sqlNode = "select rowid";
632     for (auto &column : columns) {
633         sql.append(", ").append(column);
634         sqlNode.append(", ").append(column);
635     }
636     sqlNode.append(" from ").append(table).append(statement);
637     auto logTable = RelationalStoreManager::GetDistributedLogTableName(table);
638     sql.append(" from ").append(logTable).append(", (").append(sqlNode);
639     sql.append(") where ").append(DATE_KEY).append(" = rowid");
640     return sql;
641 }
642 
Clean(const std::vector<std::string> & devices,int32_t mode,const std::string & tableName)643 int32_t RdbGeneralStore::Clean(const std::vector<std::string> &devices, int32_t mode, const std::string &tableName)
644 {
645     if (mode < 0 || mode > CLEAN_MODE_BUTT) {
646         return GeneralError::E_INVALID_ARGS;
647     }
648     DBStatus status = DistributedDB::DB_ERROR;
649     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
650     if (delegate_ == nullptr) {
651         ZLOGE("store already closed! devices count:%{public}zu, the 1st:%{public}s, mode:%{public}d, "
652               "tableName:%{public}s",
653             devices.size(), devices.empty() ? "null" : Anonymous::Change(*devices.begin()).c_str(), mode,
654             Anonymous::Change(tableName).c_str());
655         return GeneralError::E_ALREADY_CLOSED;
656     }
657     switch (mode) {
658         case CLOUD_INFO:
659             status = delegate_->RemoveDeviceData("", static_cast<ClearMode>(CLOUD_INFO));
660             if (status == DistributedDB::OK) {
661                 status = delegate_->RemoveDeviceData("", ClearMode::CLEAR_SHARED_TABLE);
662                 break;
663             }
664             (void)delegate_->RemoveDeviceData("", ClearMode::CLEAR_SHARED_TABLE);
665             break;
666         case CLOUD_DATA:
667             status = delegate_->RemoveDeviceData("", static_cast<ClearMode>(CLOUD_DATA));
668             if (status == DistributedDB::OK) {
669                 status = delegate_->RemoveDeviceData("", ClearMode::CLEAR_SHARED_TABLE);
670                 break;
671             }
672             (void)delegate_->RemoveDeviceData("", ClearMode::CLEAR_SHARED_TABLE);
673             break;
674         case NEARBY_DATA:
675             if (devices.empty()) {
676                 status = delegate_->RemoveDeviceData();
677                 break;
678             }
679             for (auto device : devices) {
680                 status = delegate_->RemoveDeviceData(device, tableName);
681             }
682             break;
683         default:
684             return GeneralError::E_ERROR;
685     }
686     return status == DistributedDB::OK ? GeneralError::E_OK : GeneralError::E_ERROR;
687 }
688 
Watch(int32_t origin,Watcher & watcher)689 int32_t RdbGeneralStore::Watch(int32_t origin, Watcher &watcher)
690 {
691     if (origin != Watcher::Origin::ORIGIN_ALL || observer_.watcher_ != nullptr) {
692         return GeneralError::E_INVALID_ARGS;
693     }
694 
695     observer_.watcher_ = &watcher;
696     return GeneralError::E_OK;
697 }
698 
Unwatch(int32_t origin,Watcher & watcher)699 int32_t RdbGeneralStore::Unwatch(int32_t origin, Watcher &watcher)
700 {
701     if (origin != Watcher::Origin::ORIGIN_ALL || observer_.watcher_ != &watcher) {
702         return GeneralError::E_INVALID_ARGS;
703     }
704 
705     observer_.watcher_ = nullptr;
706     return GeneralError::E_OK;
707 }
708 
GetDBBriefCB(DetailAsync async)709 RdbGeneralStore::DBBriefCB RdbGeneralStore::GetDBBriefCB(DetailAsync async)
710 {
711     if (!async) {
712         return [](auto &) {};
713     }
714     return [async = std::move(async)](
715         const std::map<std::string, std::vector<TableStatus>> &result) {
716         DistributedData::GenDetails details;
717         for (auto &[key, tables] : result) {
718             auto &value = details[key];
719             value.progress = FINISHED;
720             value.code = GeneralError::E_OK;
721             for (auto &table : tables) {
722                 if (table.status != DBStatus::OK) {
723                     value.code = GeneralError::E_ERROR;
724                 }
725             }
726         }
727         async(details);
728     };
729 }
730 
GetDBProcessCB(DetailAsync async,uint32_t syncMode,SyncId syncId,uint32_t highMode)731 RdbGeneralStore::DBProcessCB RdbGeneralStore::GetDBProcessCB(DetailAsync async, uint32_t syncMode, SyncId syncId,
732     uint32_t highMode)
733 {
734     std::shared_lock<std::shared_mutex> lock(asyncMutex_);
735     return [async, autoAsync = async_, highMode, storeInfo = storeInfo_, flag = syncNotifyFlag_, syncMode, syncId,
736         rdbCloud = GetRdbCloud()](const std::map<std::string, SyncProcess> &processes) {
737         DistributedData::GenDetails details;
738         for (auto &[id, process] : processes) {
739             bool isDownload = false;
740             auto &detail = details[id];
741             detail.progress = process.process;
742             detail.code = ConvertStatus(process.errCode);
743             detail.dbCode = process.errCode;
744             uint32_t totalCount = 0;
745             for (auto [key, value] : process.tableProcess) {
746                 auto &table = detail.details[key];
747                 table.upload.total = value.upLoadInfo.total;
748                 table.upload.success = value.upLoadInfo.successCount;
749                 table.upload.failed = value.upLoadInfo.failCount;
750                 table.upload.untreated = table.upload.total - table.upload.success - table.upload.failed;
751                 totalCount += table.upload.total;
752                 isDownload = table.download.total > 0;
753                 table.download.total = value.downLoadInfo.total;
754                 table.download.success = value.downLoadInfo.successCount;
755                 table.download.failed = value.downLoadInfo.failCount;
756                 table.download.untreated = table.download.total - table.download.success - table.download.failed;
757                 detail.changeCount = (process.process == FINISHED)
758                                         ? value.downLoadInfo.insertCount + value.downLoadInfo.updateCount +
759                                               value.downLoadInfo.deleteCount
760                                         : 0;
761                 totalCount += table.download.total;
762             }
763             if (process.process == FINISHED) {
764                 RdbGeneralStore::OnSyncFinish(storeInfo, flag, syncMode, syncId);
765             } else {
766                 RdbGeneralStore::OnSyncStart(storeInfo, flag, syncMode, syncId, totalCount);
767             }
768 
769             if (isDownload && (process.process == FINISHED || process.process == PROCESSING) && rdbCloud != nullptr &&
770                 (rdbCloud->GetLockFlag() & RdbCloud::FLAG::APPLICATION)) {
771                 rdbCloud->LockCloudDB(RdbCloud::FLAG::APPLICATION);
772             }
773         }
774         if (async) {
775             async(details);
776         }
777 
778         if (highMode == AUTO_SYNC_MODE && autoAsync
779             && (details.empty() || details.begin()->second.code != E_SYNC_TASK_MERGED)) {
780             autoAsync(details);
781         }
782     };
783 }
784 
Release()785 int32_t RdbGeneralStore::Release()
786 {
787     auto ref = 1;
788     {
789         std::lock_guard<decltype(mutex_)> lock(mutex_);
790         if (ref_ == 0) {
791             return 0;
792         }
793         ref = --ref_;
794     }
795     ZLOGD("ref:%{public}d", ref);
796     if (ref == 0) {
797         delete this;
798     }
799     return ref;
800 }
801 
AddRef()802 int32_t RdbGeneralStore::AddRef()
803 {
804     std::lock_guard<decltype(mutex_)> lock(mutex_);
805     if (ref_ == 0) {
806         return 0;
807     }
808     return ++ref_;
809 }
810 
SetDistributedTables(const std::vector<std::string> & tables,int32_t type,const std::vector<Reference> & references)811 int32_t RdbGeneralStore::SetDistributedTables(const std::vector<std::string> &tables, int32_t type,
812     const std::vector<Reference> &references)
813 {
814     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
815     if (delegate_ == nullptr) {
816         ZLOGE("Database already closed! database:%{public}s, tables size:%{public}zu, type:%{public}d",
817             Anonymous::Change(storeInfo_.storeName).c_str(), tables.size(), type);
818         return GeneralError::E_ALREADY_CLOSED;
819     }
820     for (const auto &table : tables) {
821         ZLOGD("tableName:%{public}s, type:%{public}d", Anonymous::Change(table).c_str(), type);
822         auto dBStatus = delegate_->CreateDistributedTable(table, static_cast<DistributedDB::TableSyncType>(type));
823         if (dBStatus != DistributedDB::DBStatus::OK) {
824             ZLOGE("create distributed table failed, table:%{public}s, err:%{public}d",
825                 Anonymous::Change(table).c_str(), dBStatus);
826             return GeneralError::E_ERROR;
827         }
828     }
829     std::vector<DistributedDB::TableReferenceProperty> properties;
830     for (const auto &reference : references) {
831         properties.push_back({ reference.sourceTable, reference.targetTable, reference.refFields });
832     }
833     auto status = delegate_->SetReference(properties);
834     if (status != DistributedDB::DBStatus::OK) {
835         ZLOGE("distributed table set reference failed, err:%{public}d", status);
836         return GeneralError::E_ERROR;
837     }
838     return GeneralError::E_OK;
839 }
840 
SetTrackerTable(const std::string & tableName,const std::set<std::string> & trackerColNames,const std::string & extendColName,bool isForceUpgrade)841 int32_t RdbGeneralStore::SetTrackerTable(const std::string &tableName, const std::set<std::string> &trackerColNames,
842     const std::string &extendColName, bool isForceUpgrade)
843 {
844     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
845     if (delegate_ == nullptr) {
846         ZLOGE("database already closed! database:%{public}s, tables name:%{public}s",
847             Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(tableName).c_str());
848         return GeneralError::E_ALREADY_CLOSED;
849     }
850     auto status = delegate_->SetTrackerTable({ tableName, extendColName, trackerColNames, isForceUpgrade });
851     if (status == DBStatus::WITH_INVENTORY_DATA) {
852         ZLOGI("Set tracker table with inventory data, database:%{public}s, tables name:%{public}s",
853             Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(tableName).c_str());
854         return GeneralError::E_WITH_INVENTORY_DATA;
855     }
856     if (status != DBStatus::OK) {
857         ZLOGE("Set tracker table failed! ret:%{public}d, database:%{public}s, tables name:%{public}s",
858             status, Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(tableName).c_str());
859         return GeneralError::E_ERROR;
860     }
861     return GeneralError::E_OK;
862 }
863 
RemoteQuery(const std::string & device,const DistributedDB::RemoteCondition & remoteCondition)864 std::shared_ptr<Cursor> RdbGeneralStore::RemoteQuery(const std::string &device,
865     const DistributedDB::RemoteCondition &remoteCondition)
866 {
867     std::shared_ptr<DistributedDB::ResultSet> dbResultSet;
868     DistributedDB::DBStatus status =
869         delegate_->RemoteQuery(device, remoteCondition, REMOTE_QUERY_TIME_OUT, dbResultSet);
870     if (status != DistributedDB::DBStatus::OK) {
871         ZLOGE("DistributedDB remote query failed, device:%{public}s, status is  %{public}d.",
872             Anonymous::Change(device).c_str(), status);
873         return nullptr;
874     }
875     return std::make_shared<RdbCursor>(dbResultSet);
876 }
877 
ConvertStatus(DistributedDB::DBStatus status)878 RdbGeneralStore::GenErr RdbGeneralStore::ConvertStatus(DistributedDB::DBStatus status)
879 {
880     switch (status) {
881         case DBStatus::OK:
882             return GenErr::E_OK;
883         case DBStatus::CLOUD_NETWORK_ERROR:
884             return GenErr::E_NETWORK_ERROR;
885         case DBStatus::CLOUD_LOCK_ERROR:
886             return GenErr::E_LOCKED_BY_OTHERS;
887         case DBStatus::CLOUD_FULL_RECORDS:
888             return GenErr::E_RECODE_LIMIT_EXCEEDED;
889         case DBStatus::CLOUD_ASSET_SPACE_INSUFFICIENT:
890             return GenErr::E_NO_SPACE_FOR_ASSET;
891         case DBStatus::BUSY:
892             return GenErr::E_BUSY;
893         case DBStatus::CLOUD_SYNC_TASK_MERGED:
894             return GenErr::E_SYNC_TASK_MERGED;
895         default:
896             ZLOGI("status:0x%{public}x", status);
897             break;
898     }
899     return GenErr::E_ERROR;
900 }
901 
IsValid()902 bool RdbGeneralStore::IsValid()
903 {
904     return delegate_ != nullptr;
905 }
906 
RegisterDetailProgressObserver(GeneralStore::DetailAsync async)907 int32_t RdbGeneralStore::RegisterDetailProgressObserver(GeneralStore::DetailAsync async)
908 {
909     std::unique_lock<std::shared_mutex> lock(asyncMutex_);
910     async_ = std::move(async);
911     return GenErr::E_OK;
912 }
913 
UnregisterDetailProgressObserver()914 int32_t RdbGeneralStore::UnregisterDetailProgressObserver()
915 {
916     std::unique_lock<std::shared_mutex> lock(asyncMutex_);
917     async_ = nullptr;
918     return GenErr::E_OK;
919 }
920 
QuerySql(const std::string & sql,Values && args)921 std::pair<int32_t, VBuckets> RdbGeneralStore::QuerySql(const std::string &sql, Values &&args)
922 {
923     std::vector<DistributedDB::VBucket> changedData;
924     std::vector<DistributedDB::Type> bindArgs = ValueProxy::Convert(std::move(args));
925     auto status = delegate_->ExecuteSql({ sql, std::move(bindArgs), true }, changedData);
926     if (status != DBStatus::OK) {
927         ZLOGE("Failed! ret:%{public}d, sql:%{public}s, data size:%{public}zu", status, Anonymous::Change(sql).c_str(),
928             changedData.size());
929         return { GenErr::E_ERROR, {} };
930     }
931     return { GenErr::E_OK, ValueProxy::Convert(std::move(changedData)) };
932 }
933 
GetWaterVersion(const std::string & deviceId)934 std::vector<std::string> RdbGeneralStore::GetWaterVersion(const std::string &deviceId)
935 {
936     return {};
937 }
938 
OnSyncStart(const StoreInfo & storeInfo,uint32_t flag,uint32_t syncMode,uint32_t traceId,uint32_t syncCount)939 void RdbGeneralStore::OnSyncStart(const StoreInfo &storeInfo, uint32_t flag, uint32_t syncMode, uint32_t traceId,
940     uint32_t syncCount)
941 {
942     uint32_t requiredFlag = (CLOUD_SYNC_FLAG | SEARCHABLE_FLAG);
943     if (requiredFlag != (requiredFlag & flag)) {
944         return;
945     }
946     StoreInfo info = storeInfo;
947     auto evt = std::make_unique<DataSyncEvent>(std::move(info), syncMode, DataSyncEvent::DataSyncStatus::START,
948         traceId, syncCount);
949     EventCenter::GetInstance().PostEvent(std::move(evt));
950 }
951 
OnSyncFinish(const StoreInfo & storeInfo,uint32_t flag,uint32_t syncMode,uint32_t traceId)952 void RdbGeneralStore::OnSyncFinish(const StoreInfo &storeInfo, uint32_t flag, uint32_t syncMode, uint32_t traceId)
953 {
954     uint32_t requiredFlag = (CLOUD_SYNC_FLAG | SEARCHABLE_FLAG);
955     if (requiredFlag != (requiredFlag & flag)) {
956         return;
957     }
958     StoreInfo info = storeInfo;
959     auto evt = std::make_unique<DataSyncEvent>(std::move(info), syncMode, DataSyncEvent::DataSyncStatus::FINISH,
960         traceId);
961     EventCenter::GetInstance().PostEvent(std::move(evt));
962 }
963 
GetTables()964 std::set<std::string> RdbGeneralStore::GetTables()
965 {
966     std::set<std::string> tables;
967     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
968     if (delegate_ == nullptr) {
969         ZLOGE("Database already closed! database:%{public}s", Anonymous::Change(storeInfo_.storeName).c_str());
970         return tables;
971     }
972     auto [errCode, res] = QuerySql(QUERY_TABLES_SQL, {});
973     if (errCode != GenErr::E_OK) {
974         return tables;
975     }
976     for (auto &table : res) {
977         auto it = table.find("name");
978         if (it == table.end() || TYPE_INDEX<std::string> != it->second.index()) {
979             ZLOGW("error res! database:%{public}s", Anonymous::Change(storeInfo_.storeName).c_str());
980             continue;
981         }
982         tables.emplace(std::move(*std::get_if<std::string>(&(it->second))));
983     }
984     return tables;
985 }
986 
GetIntersection(std::vector<std::string> && syncTables,const std::set<std::string> & localTables)987 std::vector<std::string> RdbGeneralStore::GetIntersection(std::vector<std::string> &&syncTables,
988     const std::set<std::string> &localTables)
989 {
990     std::vector<std::string> res;
991     for (auto &it : syncTables) {
992         if (localTables.count(it)) {
993             res.push_back(std::move(it));
994         }
995     }
996     return res;
997 }
998 
PostDataChange(const StoreMetaData & meta,const std::vector<std::string> & tables,ChangeType type)999 void RdbGeneralStore::ObserverProxy::PostDataChange(const StoreMetaData &meta,
1000     const std::vector<std::string> &tables, ChangeType type)
1001 {
1002     RemoteChangeEvent::DataInfo info;
1003     info.userId = meta.user;
1004     info.storeId = meta.storeId;
1005     info.deviceId = meta.deviceId;
1006     info.bundleName = meta.bundleName;
1007     info.tables = tables;
1008     info.changeType = type;
1009     auto evt = std::make_unique<RemoteChangeEvent>(RemoteChangeEvent::DATA_CHANGE, std::move(info));
1010     EventCenter::GetInstance().PostEvent(std::move(evt));
1011 }
1012 
OnChange(const DBChangedIF & data)1013 void RdbGeneralStore::ObserverProxy::OnChange(const DBChangedIF &data)
1014 {
1015     if (!HasWatcher()) {
1016         return;
1017     }
1018     std::string device = data.GetDataChangeDevice();
1019     auto networkId = DmAdapter::GetInstance().ToNetworkID(device);
1020     ZLOGD("store:%{public}s data change from :%{public}s", Anonymous::Change(storeId_).c_str(),
1021         Anonymous::Change(device).c_str());
1022     GenOrigin genOrigin;
1023     genOrigin.origin = GenOrigin::ORIGIN_NEARBY;
1024     genOrigin.dataType = GenOrigin::BASIC_DATA;
1025     DistributedDB::StoreProperty property;
1026     data.GetStoreProperty(property);
1027     genOrigin.id.push_back(networkId);
1028     genOrigin.store = storeId_;
1029     GeneralWatcher::ChangeInfo changeInfo{};
1030     watcher_->OnChange(genOrigin, {}, std::move(changeInfo));
1031     return;
1032 }
1033 
OnChange(DBOrigin origin,const std::string & originalId,DBChangedData && data)1034 void RdbGeneralStore::ObserverProxy::OnChange(DBOrigin origin, const std::string &originalId, DBChangedData &&data)
1035 {
1036     if (!HasWatcher()) {
1037         return;
1038     }
1039     ZLOGD("store:%{public}s table:%{public}s data change from :%{public}s", Anonymous::Change(storeId_).c_str(),
1040         Anonymous::Change(data.tableName).c_str(), Anonymous::Change(originalId).c_str());
1041     GenOrigin genOrigin;
1042     genOrigin.origin = (origin == DBOrigin::ORIGIN_LOCAL)
1043                            ? GenOrigin::ORIGIN_LOCAL
1044                            : (origin == DBOrigin::ORIGIN_CLOUD) ? GenOrigin::ORIGIN_CLOUD : GenOrigin::ORIGIN_NEARBY;
1045     genOrigin.dataType = data.type == DistributedDB::ASSET ? GenOrigin::ASSET_DATA : GenOrigin::BASIC_DATA;
1046     genOrigin.id.push_back(originalId);
1047     genOrigin.store = storeId_;
1048     Watcher::PRIFields fields;
1049     Watcher::ChangeInfo changeInfo;
1050     bool notifyFlag = false;
1051     for (uint32_t i = 0; i < DistributedDB::OP_BUTT; ++i) {
1052         auto &info = changeInfo[data.tableName][i];
1053         for (auto &priData : data.primaryData[i]) {
1054             Watcher::PRIValue value;
1055             Convert(std::move(*(priData.begin())), value);
1056             if (notifyFlag || origin != DBOrigin::ORIGIN_CLOUD || i != DistributedDB::OP_DELETE) {
1057                 info.push_back(std::move(value));
1058                 continue;
1059             }
1060             auto deleteKey = std::get_if<std::string>(&value);
1061             if (deleteKey != nullptr && (*deleteKey == LOGOUT_DELETE_FLAG)) {
1062                 // notify to start app
1063                 notifyFlag = true;
1064             }
1065             info.push_back(std::move(value));
1066         }
1067     }
1068     if (notifyFlag) {
1069         PostDataChange(meta_, {}, CLOUD_DATA_CLEAN);
1070     }
1071     if (!data.field.empty()) {
1072         fields[std::move(data.tableName)] = std::move(*(data.field.begin()));
1073     }
1074     watcher_->OnChange(genOrigin, fields, std::move(changeInfo));
1075 }
1076 
LockCloudDB()1077 std::pair<int32_t, uint32_t> RdbGeneralStore::LockCloudDB()
1078 {
1079     auto rdbCloud = GetRdbCloud();
1080     if (rdbCloud == nullptr) {
1081         return { GeneralError::E_ERROR, 0 };
1082     }
1083     return rdbCloud->LockCloudDB(RdbCloud::FLAG::APPLICATION);
1084 }
1085 
UnLockCloudDB()1086 int32_t RdbGeneralStore::UnLockCloudDB()
1087 {
1088     auto rdbCloud = GetRdbCloud();
1089     if (rdbCloud == nullptr) {
1090         return GeneralError::E_ERROR;
1091     }
1092     return rdbCloud->UnLockCloudDB(RdbCloud::FLAG::APPLICATION);
1093 }
1094 
GetRdbCloud() const1095 std::shared_ptr<RdbCloud> RdbGeneralStore::GetRdbCloud() const
1096 {
1097     std::shared_lock<decltype(rdbCloudMutex_)> lock(rdbCloudMutex_);
1098     return rdbCloud_;
1099 }
1100 
IsFinished(SyncId syncId) const1101 bool RdbGeneralStore::IsFinished(SyncId syncId) const
1102 {
1103     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
1104     if (delegate_ == nullptr) {
1105         ZLOGE("database already closed! database:%{public}s", Anonymous::Change(storeInfo_.storeName).c_str());
1106         return true;
1107     }
1108     return delegate_->GetCloudTaskStatus(syncId).process == DistributedDB::FINISHED;
1109 }
1110 
GetFinishTask(SyncId syncId)1111 Executor::Task RdbGeneralStore::GetFinishTask(SyncId syncId)
1112 {
1113     return [this, executor = executor_, task = tasks_, syncId]() {
1114         auto [exist, finishTask] = task->Find(syncId);
1115         if (!exist || finishTask.cb == nullptr) {
1116             task->Erase(syncId);
1117             return;
1118         }
1119         if (!IsFinished(syncId)) {
1120             task->ComputeIfPresent(syncId, [executor = executor_, this](SyncId syncId, FinishTask &task) {
1121                 task.taskId = executor->Schedule(std::chrono::minutes(INTERVAL), GetFinishTask(syncId));
1122                 return true;
1123             });
1124             return;
1125         }
1126         DBProcessCB cb;
1127         task->ComputeIfPresent(syncId, [&cb, executor = executor_](SyncId syncId, const FinishTask &task) {
1128             cb = task.cb;
1129             return false;
1130         });
1131         if (cb != nullptr) {
1132             ZLOGW("database:%{public}s syncId:%{public}" PRIu64 " miss finished. ",
1133                   Anonymous::Change(storeInfo_.storeName).c_str(), syncId);
1134             std::map<std::string, SyncProcess> result;
1135             result.insert({ "", { DistributedDB::FINISHED, DBStatus::DB_ERROR } });
1136             cb(result);
1137         }
1138     };
1139 }
1140 
SetExecutor(std::shared_ptr<Executor> executor)1141 void RdbGeneralStore::SetExecutor(std::shared_ptr<Executor> executor)
1142 {
1143     if (executor_ == nullptr) {
1144         executor_ = executor;
1145     }
1146 }
1147 
RemoveTasks()1148 void RdbGeneralStore::RemoveTasks()
1149 {
1150     if (tasks_ == nullptr) {
1151         return;
1152     }
1153     std::list<DBProcessCB> cbs;
1154     std::list<TaskId> taskIds;
1155     tasks_->EraseIf([&cbs, &taskIds, store = storeInfo_.storeName](SyncId syncId, const FinishTask &task) {
1156         if (task.cb != nullptr) {
1157             ZLOGW("database:%{public}s syncId:%{public}" PRIu64 " miss finished. ", Anonymous::Change(store).c_str(),
1158                   syncId);
1159         }
1160         cbs.push_back(std::move(task.cb));
1161         taskIds.push_back(task.taskId);
1162         return true;
1163     });
1164     if (executor_ != nullptr) {
1165         for (auto taskId : taskIds) {
1166             executor_->Remove(taskId, true);
1167         }
1168     }
1169     std::map<std::string, SyncProcess> result;
1170     result.insert({ "", { DistributedDB::FINISHED, DBStatus::DB_ERROR } });
1171     for (auto &cb : cbs) {
1172         if (cb != nullptr) {
1173             cb(result);
1174         }
1175     }
1176 }
1177 
GetCB(SyncId syncId)1178 RdbGeneralStore::DBProcessCB RdbGeneralStore::GetCB(SyncId syncId)
1179 {
1180     return [task = tasks_, executor = executor_, syncId](const std::map<std::string, SyncProcess> &progress) {
1181         if (task == nullptr) {
1182             return;
1183         }
1184         DBProcessCB cb;
1185         task->ComputeIfPresent(syncId, [&cb, &progress, executor](SyncId syncId, FinishTask &finishTask) {
1186             cb = finishTask.cb;
1187             bool isFinished = !progress.empty() && progress.begin()->second.process == DistributedDB::FINISHED;
1188             if (isFinished) {
1189                 finishTask.cb = nullptr;
1190             }
1191             return true;
1192         });
1193         if (cb != nullptr) {
1194             cb(progress);
1195         }
1196         return;
1197     };
1198 }
1199 } // namespace OHOS::DistributedRdb