1 /*
2 * Copyright (c) 2023-2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #define LOG_TAG "RdbGeneralStore"
16
17 #include "rdb_general_store.h"
18
19 #include <chrono>
20 #include <cinttypes>
21
22 #include "cache_cursor.h"
23 #include "changeevent/remote_change_event.h"
24 #include "cloud/asset_loader.h"
25 #include "cloud/cloud_db.h"
26 #include "cloud/cloud_lock_event.h"
27 #include "cloud/cloud_store_types.h"
28 #include "cloud/schema_meta.h"
29 #include "cloud_service.h"
30 #include "commonevent/data_sync_event.h"
31 #include "communicator/device_manager_adapter.h"
32 #include "crypto_manager.h"
33 #include "device_manager_adapter.h"
34 #include "eventcenter/event_center.h"
35 #include "log_print.h"
36 #include "metadata/meta_data_manager.h"
37 #include "metadata/secret_key_meta_data.h"
38 #include "rdb_cursor.h"
39 #include "rdb_helper.h"
40 #include "rdb_query.h"
41 #include "rdb_result_set_impl.h"
42 #include "relational_store_manager.h"
43 #include "snapshot/bind_event.h"
44 #include "utils/anonymous.h"
45 #include "value_proxy.h"
46 namespace OHOS::DistributedRdb {
47 using namespace DistributedData;
48 using namespace DistributedDB;
49 using namespace NativeRdb;
50 using namespace CloudData;
51 using namespace std::chrono;
52 using DBField = DistributedDB::Field;
53 using DBTable = DistributedDB::TableSchema;
54 using DBSchema = DistributedDB::DataBaseSchema;
55 using ClearMode = DistributedDB::ClearMode;
56 using DBStatus = DistributedDB::DBStatus;
57 using DmAdapter = OHOS::DistributedData::DeviceManagerAdapter;
58
59 constexpr const char *INSERT = "INSERT INTO ";
60 constexpr const char *REPLACE = "REPLACE INTO ";
61 constexpr const char *VALUES = " VALUES ";
62 constexpr const char *LOGOUT_DELETE_FLAG = "DELETE#ALL_CLOUDDATA";
63 constexpr const LockAction LOCK_ACTION = static_cast<LockAction>(static_cast<uint32_t>(LockAction::INSERT) |
64 static_cast<uint32_t>(LockAction::UPDATE) | static_cast<uint32_t>(LockAction::DELETE) |
65 static_cast<uint32_t>(LockAction::DOWNLOAD));
66 constexpr uint32_t CLOUD_SYNC_FLAG = 1;
67 constexpr uint32_t SEARCHABLE_FLAG = 2;
68 constexpr uint32_t LOCK_TIMEOUT = 3600; // second
69
GetDBSchema(const Database & database)70 static DBSchema GetDBSchema(const Database &database)
71 {
72 DBSchema schema;
73 schema.tables.resize(database.tables.size());
74 for (size_t i = 0; i < database.tables.size(); i++) {
75 const Table &table = database.tables[i];
76 DBTable &dbTable = schema.tables[i];
77 dbTable.name = table.name;
78 dbTable.sharedTableName = table.sharedTableName;
79 for (auto &field : table.fields) {
80 DBField dbField;
81 dbField.colName = field.colName;
82 dbField.type = field.type;
83 dbField.primary = field.primary;
84 dbField.nullable = field.nullable;
85 dbTable.fields.push_back(std::move(dbField));
86 }
87 }
88 return schema;
89 }
90
InitStoreInfo(const StoreMetaData & meta)91 void RdbGeneralStore::InitStoreInfo(const StoreMetaData &meta)
92 {
93 storeInfo_.tokenId = meta.tokenId;
94 storeInfo_.bundleName = meta.bundleName;
95 storeInfo_.storeName = meta.storeId;
96 storeInfo_.instanceId = meta.instanceId;
97 storeInfo_.user = std::stoi(meta.user);
98 storeInfo_.deviceId = DeviceManagerAdapter::GetInstance().GetLocalDevice().uuid;
99 }
100
RdbGeneralStore(const StoreMetaData & meta)101 RdbGeneralStore::RdbGeneralStore(const StoreMetaData &meta)
102 : manager_(meta.appId, meta.user, meta.instanceId), tasks_(std::make_shared<ConcurrentMap<SyncId, FinishTask>>())
103 {
104 observer_.storeId_ = meta.storeId;
105 observer_.meta_ = meta;
106 RelationalStoreDelegate::Option option;
107 option.syncDualTupleMode = true;
108 option.observer = &observer_;
109 if (meta.isEncrypt) {
110 std::string key = meta.GetSecretKey();
111 SecretKeyMetaData secretKeyMeta;
112 MetaDataManager::GetInstance().LoadMeta(key, secretKeyMeta, true);
113 std::vector<uint8_t> decryptKey;
114 CryptoManager::GetInstance().Decrypt(secretKeyMeta.sKey, decryptKey);
115 option.passwd.SetValue(decryptKey.data(), decryptKey.size());
116 std::fill(decryptKey.begin(), decryptKey.end(), 0);
117 option.isEncryptedDb = meta.isEncrypt;
118 option.cipher = CipherType::AES_256_GCM;
119 for (uint32_t i = 0; i < ITERS_COUNT; ++i) {
120 option.iterateTimes = ITERS[i];
121 auto ret = manager_.OpenStore(meta.dataDir, meta.storeId, option, delegate_);
122 if (ret == DBStatus::OK && delegate_ != nullptr) {
123 break;
124 }
125 manager_.CloseStore(delegate_);
126 delegate_ = nullptr;
127 }
128 } else {
129 auto ret = manager_.OpenStore(meta.dataDir, meta.storeId, option, delegate_);
130 if (ret != DBStatus::OK || delegate_ == nullptr) {
131 manager_.CloseStore(delegate_);
132 delegate_ = nullptr;
133 }
134 }
135
136 InitStoreInfo(meta);
137
138 if (meta.isSearchable) {
139 syncNotifyFlag_ |= SEARCHABLE_FLAG;
140 }
141
142 if (delegate_ != nullptr && meta.isManualClean) {
143 PragmaData data =
144 static_cast<PragmaData>(const_cast<void *>(static_cast<const void *>(&meta.isManualClean)));
145 delegate_->Pragma(PragmaCmd::LOGIC_DELETE_SYNC_DATA, data);
146 }
147 ZLOGI("Get rdb store, tokenId:%{public}u, bundleName:%{public}s, storeName:%{public}s, user:%{public}s,"
148 "isEncrypt:%{public}d, isManualClean:%{public}d, isSearchable:%{public}d",
149 meta.tokenId, meta.bundleName.c_str(), Anonymous::Change(meta.storeId).c_str(), meta.user.c_str(),
150 meta.isEncrypt, meta.isManualClean, meta.isSearchable);
151 }
152
~RdbGeneralStore()153 RdbGeneralStore::~RdbGeneralStore()
154 {
155 manager_.CloseStore(delegate_);
156 delegate_ = nullptr;
157 bindInfo_.loader_ = nullptr;
158 if (bindInfo_.db_ != nullptr) {
159 bindInfo_.db_->Close();
160 }
161 bindInfo_.db_ = nullptr;
162 rdbCloud_ = nullptr;
163 rdbLoader_ = nullptr;
164 RemoveTasks();
165 tasks_ = nullptr;
166 executor_ = nullptr;
167 }
168
BindSnapshots(std::shared_ptr<std::map<std::string,std::shared_ptr<Snapshot>>> bindAssets)169 int32_t RdbGeneralStore::BindSnapshots(std::shared_ptr<std::map<std::string, std::shared_ptr<Snapshot>>> bindAssets)
170 {
171 if (snapshots_.bindAssets == nullptr) {
172 snapshots_.bindAssets = bindAssets;
173 }
174 return GenErr::E_OK;
175 }
176
Bind(Database & database,const std::map<uint32_t,BindInfo> & bindInfos,const CloudConfig & config)177 int32_t RdbGeneralStore::Bind(Database &database, const std::map<uint32_t, BindInfo> &bindInfos,
178 const CloudConfig &config)
179 {
180 if (bindInfos.empty()) {
181 return GeneralError::E_OK;
182 }
183 auto bindInfo = bindInfos.begin()->second;
184 if (bindInfo.db_ == nullptr || bindInfo.loader_ == nullptr) {
185 return GeneralError::E_INVALID_ARGS;
186 }
187
188 if (isBound_.exchange(true)) {
189 return GeneralError::E_OK;
190 }
191
192 BindEvent::BindEventInfo eventInfo;
193 eventInfo.tokenId = storeInfo_.tokenId;
194 eventInfo.bundleName = storeInfo_.bundleName;
195 eventInfo.storeName = storeInfo_.storeName;
196 eventInfo.user = storeInfo_.user;
197 eventInfo.instanceId = storeInfo_.instanceId;
198
199 auto evt = std::make_unique<BindEvent>(BindEvent::BIND_SNAPSHOT, std::move(eventInfo));
200 EventCenter::GetInstance().PostEvent(std::move(evt));
201 bindInfo_ = std::move(bindInfo);
202 {
203 std::unique_lock<decltype(rdbCloudMutex_)> lock(rdbCloudMutex_);
204 rdbCloud_ = std::make_shared<RdbCloud>(bindInfo_.db_, &snapshots_);
205 rdbLoader_ = std::make_shared<RdbAssetLoader>(bindInfo_.loader_, &snapshots_);
206 }
207
208 DistributedDB::CloudSyncConfig dbConfig;
209 dbConfig.maxUploadCount = config.maxNumber;
210 dbConfig.maxUploadSize = config.maxSize;
211 dbConfig.maxRetryConflictTimes = config.maxRetryConflictTimes;
212 DBSchema schema = GetDBSchema(database);
213 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
214 if (delegate_ == nullptr) {
215 ZLOGE("database:%{public}s already closed!", Anonymous::Change(database.name).c_str());
216 return GeneralError::E_ALREADY_CLOSED;
217 }
218 delegate_->SetCloudDB(rdbCloud_);
219 delegate_->SetIAssetLoader(rdbLoader_);
220 delegate_->SetCloudDbSchema(std::move(schema));
221 delegate_->SetCloudSyncConfig(dbConfig);
222
223 syncNotifyFlag_ |= CLOUD_SYNC_FLAG;
224 return GeneralError::E_OK;
225 }
226
IsBound()227 bool RdbGeneralStore::IsBound()
228 {
229 return isBound_;
230 }
231
Close(bool isForce)232 int32_t RdbGeneralStore::Close(bool isForce)
233 {
234 {
235 std::unique_lock<decltype(rwMutex_)> lock(rwMutex_, std::chrono::seconds(isForce ? LOCK_TIMEOUT : 0));
236 if (!lock) {
237 return GeneralError::E_BUSY;
238 }
239
240 if (delegate_ == nullptr) {
241 return GeneralError::E_OK;
242 }
243 if (!isForce && delegate_->GetCloudSyncTaskCount() > 0) {
244 return GeneralError::E_BUSY;
245 }
246 if (isForce && bindInfo_.loader_ != nullptr) {
247 bindInfo_.loader_->Cancel();
248 }
249 auto status = manager_.CloseStore(delegate_);
250 if (status != DBStatus::OK) {
251 return status;
252 }
253 delegate_ = nullptr;
254 }
255 RemoveTasks();
256 bindInfo_.loader_ = nullptr;
257 if (bindInfo_.db_ != nullptr) {
258 bindInfo_.db_->Close();
259 }
260 bindInfo_.db_ = nullptr;
261 {
262 std::unique_lock<decltype(rdbCloudMutex_)> lock(rdbCloudMutex_);
263 rdbCloud_ = nullptr;
264 rdbLoader_ = nullptr;
265 }
266 return GeneralError::E_OK;
267 }
268
Execute(const std::string & table,const std::string & sql)269 int32_t RdbGeneralStore::Execute(const std::string &table, const std::string &sql)
270 {
271 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
272 if (delegate_ == nullptr) {
273 ZLOGE("Database already closed! database:%{public}s, table:%{public}s, sql:%{public}s",
274 Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(table).c_str(),
275 Anonymous::Change(sql).c_str());
276 return GeneralError::E_ERROR;
277 }
278 std::vector<DistributedDB::VBucket> changedData;
279 auto status = delegate_->ExecuteSql({ sql, {}, false }, changedData);
280 if (status != DBStatus::OK) {
281 ZLOGE("Failed! ret:%{public}d, sql:%{public}s, data size:%{public}zu", status, Anonymous::Change(sql).c_str(),
282 changedData.size());
283 return GeneralError::E_ERROR;
284 }
285 return GeneralError::E_OK;
286 }
287
SqlConcatenate(VBucket & value,std::string & strColumnSql,std::string & strRowValueSql)288 size_t RdbGeneralStore::SqlConcatenate(VBucket &value, std::string &strColumnSql, std::string &strRowValueSql)
289 {
290 strColumnSql += " (";
291 strRowValueSql += " (";
292 auto columnSize = value.size();
293 for (auto column = value.begin(); column != value.end(); ++column) {
294 strRowValueSql += " ?,";
295 strColumnSql += " " + column->first + ",";
296 }
297 if (columnSize != 0) {
298 strColumnSql.pop_back();
299 strColumnSql += ")";
300 strRowValueSql.pop_back();
301 strRowValueSql += ")";
302 }
303 return columnSize;
304 }
305
Insert(const std::string & table,VBuckets && values)306 int32_t RdbGeneralStore::Insert(const std::string &table, VBuckets &&values)
307 {
308 if (table.empty() || values.size() == 0) {
309 ZLOGE("Insert:table maybe empty:%{public}d,value size is:%{public}zu", table.empty(), values.size());
310 return GeneralError::E_INVALID_ARGS;
311 }
312
313 std::string strColumnSql;
314 std::string strRowValueSql;
315 auto value = values.front();
316 size_t columnSize = SqlConcatenate(value, strColumnSql, strRowValueSql);
317 if (columnSize == 0) {
318 ZLOGE("Insert: columnSize error, can't be 0!");
319 return GeneralError::E_INVALID_ARGS;
320 }
321
322 Values args;
323 std::string strValueSql;
324 for (auto &rowData : values) {
325 if (rowData.size() != columnSize) {
326 ZLOGE("Insert: VBucket size error, Each VBucket in values must be of the same length!");
327 return GeneralError::E_INVALID_ARGS;
328 }
329 for (auto column = rowData.begin(); column != rowData.end(); ++column) {
330 args.push_back(std::move(column->second));
331 }
332 strValueSql += strRowValueSql + ",";
333 }
334 strValueSql.pop_back();
335 std::string sql = INSERT + table + strColumnSql + VALUES + strValueSql;
336
337 std::vector<DistributedDB::VBucket> changedData;
338 std::vector<DistributedDB::Type> bindArgs = ValueProxy::Convert(std::move(args));
339 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
340 if (delegate_ == nullptr) {
341 ZLOGE("Database already closed! database:%{public}s, table:%{public}s",
342 Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(table).c_str());
343 return GeneralError::E_ERROR;
344 }
345 auto status = delegate_->ExecuteSql({ sql, std::move(bindArgs), false }, changedData);
346 if (status != DBStatus::OK) {
347 if (IsPrintLog(status)) {
348 auto time =
349 static_cast<uint64_t>(duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count());
350 ZLOGE("Failed! ret:%{public}d, sql:%{public}s, data size:%{public}zu times %{public}" PRIu64 ".",
351 status, Anonymous::Change(sql).c_str(), changedData.size(), time);
352 }
353 return GeneralError::E_ERROR;
354 }
355 return GeneralError::E_OK;
356 }
357
IsPrintLog(DBStatus status)358 bool RdbGeneralStore::IsPrintLog(DBStatus status)
359 {
360 bool isPrint = false;
361 if (status == lastError_) {
362 if (++lastErrCnt_ % PRINT_ERROR_CNT == 0) {
363 isPrint = true;
364 }
365 } else {
366 isPrint = true;
367 lastErrCnt_ = 0;
368 lastError_ = status;
369 }
370 return isPrint;
371 }
372
373 /**
374 * This function does not support batch updates in rdb, it only supports single data updates.
375 */
Update(const std::string & table,const std::string & setSql,Values && values,const std::string & whereSql,Values && conditions)376 int32_t RdbGeneralStore::Update(const std::string &table, const std::string &setSql, Values &&values,
377 const std::string &whereSql, Values &&conditions)
378 {
379 if (table.empty()) {
380 ZLOGE("Update: table can't be empty!");
381 return GeneralError::E_INVALID_ARGS;
382 }
383 if (setSql.empty() || values.size() == 0) {
384 ZLOGE("Update: setSql and values can't be empty!");
385 return GeneralError::E_INVALID_ARGS;
386 }
387 if (whereSql.empty() || conditions.size() == 0) {
388 ZLOGE("Update: whereSql and conditions can't be empty!");
389 return GeneralError::E_INVALID_ARGS;
390 }
391
392 std::string sqlIn = " UPDATE " + table + " SET " + setSql + " WHERE " + whereSql;
393 Values args;
394 for (auto &value : values) {
395 args.push_back(std::move(value));
396 }
397 for (auto &condition : conditions) {
398 args.push_back(std::move(condition));
399 }
400
401 std::vector<DistributedDB::VBucket> changedData;
402 std::vector<DistributedDB::Type> bindArgs = ValueProxy::Convert(std::move(args));
403 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
404 if (delegate_ == nullptr) {
405 ZLOGE("Database already closed! database:%{public}s, table:%{public}s",
406 Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(table).c_str());
407 return GeneralError::E_ERROR;
408 }
409 auto status = delegate_->ExecuteSql({ sqlIn, std::move(bindArgs), false }, changedData);
410 if (status != DBStatus::OK) {
411 ZLOGE("Failed! ret:%{public}d, sql:%{public}s, data size:%{public}zu", status, Anonymous::Change(sqlIn).c_str(),
412 changedData.size());
413 return GeneralError::E_ERROR;
414 }
415 return GeneralError::E_OK;
416 }
417
Replace(const std::string & table,VBucket && value)418 int32_t RdbGeneralStore::Replace(const std::string &table, VBucket &&value)
419 {
420 if (table.empty() || value.size() == 0) {
421 return GeneralError::E_INVALID_ARGS;
422 }
423 std::string columnSql;
424 std::string valueSql;
425 SqlConcatenate(value, columnSql, valueSql);
426 std::string sql = REPLACE + table + columnSql + VALUES + valueSql;
427
428 Values args;
429 for (auto &[k, v] : value) {
430 args.emplace_back(std::move(v));
431 }
432 std::vector<DistributedDB::VBucket> changedData;
433 std::vector<DistributedDB::Type> bindArgs = ValueProxy::Convert(std::move(args));
434 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
435 if (delegate_ == nullptr) {
436 ZLOGE("Database already closed! database:%{public}s, table:%{public}s",
437 Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(table).c_str());
438 return GeneralError::E_ERROR;
439 }
440 auto status = delegate_->ExecuteSql({ sql, std::move(bindArgs) }, changedData);
441 if (status != DBStatus::OK) {
442 ZLOGE("Replace failed! ret:%{public}d, table:%{public}s, sql:%{public}s, fields:%{public}s",
443 status, Anonymous::Change(table).c_str(), Anonymous::Change(sql).c_str(), columnSql.c_str());
444 return GeneralError::E_ERROR;
445 }
446 return GeneralError::E_OK;
447 }
448
Delete(const std::string & table,const std::string & sql,Values && args)449 int32_t RdbGeneralStore::Delete(const std::string &table, const std::string &sql, Values &&args)
450 {
451 return 0;
452 }
453
Query(const std::string & table,const std::string & sql,Values && args)454 std::pair<int32_t, std::shared_ptr<Cursor>> RdbGeneralStore::Query(__attribute__((unused))const std::string &table,
455 const std::string &sql, Values &&args)
456 {
457 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
458 if (delegate_ == nullptr) {
459 ZLOGE("Database already closed! database:%{public}s", Anonymous::Change(storeInfo_.storeName).c_str());
460 return { GeneralError::E_ALREADY_CLOSED, nullptr };
461 }
462 auto [errCode, records] = QuerySql(sql, std::move(args));
463 return { errCode, std::make_shared<CacheCursor>(std::move(records)) };
464 }
465
Query(const std::string & table,GenQuery & query)466 std::pair<int32_t, std::shared_ptr<Cursor>> RdbGeneralStore::Query(const std::string &table, GenQuery &query)
467 {
468 RdbQuery *rdbQuery = nullptr;
469 auto ret = query.QueryInterface(rdbQuery);
470 if (ret != GeneralError::E_OK || rdbQuery == nullptr) {
471 ZLOGE("not RdbQuery!");
472 return { GeneralError::E_INVALID_ARGS, nullptr };
473 }
474 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
475 if (delegate_ == nullptr) {
476 ZLOGE("Database already closed! database:%{public}s, table:%{public}s",
477 Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(table).c_str());
478 return { GeneralError::E_ALREADY_CLOSED, nullptr };
479 }
480 if (rdbQuery->IsRemoteQuery()) {
481 if (rdbQuery->GetDevices().size() != 1) {
482 ZLOGE("RemoteQuery: devices size error! size:%{public}zu", rdbQuery->GetDevices().size());
483 return { GeneralError::E_ERROR, nullptr };
484 }
485 auto cursor = RemoteQuery(*rdbQuery->GetDevices().begin(), rdbQuery->GetRemoteCondition());
486 return { cursor != nullptr ? GeneralError::E_OK : GeneralError::E_ERROR, cursor};
487 }
488 return { GeneralError::E_ERROR, nullptr };
489 }
490
MergeMigratedData(const std::string & tableName,VBuckets && values)491 int32_t RdbGeneralStore::MergeMigratedData(const std::string &tableName, VBuckets &&values)
492 {
493 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
494 if (delegate_ == nullptr) {
495 ZLOGE("Database already closed! database:%{public}s, table:%{public}s",
496 Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(tableName).c_str());
497 return GeneralError::E_ERROR;
498 }
499
500 auto status = delegate_->UpsertData(tableName, ValueProxy::Convert(std::move(values)));
501 return status == DistributedDB::OK ? GeneralError::E_OK : GeneralError::E_ERROR;
502 }
503
CleanTrackerData(const std::string & tableName,int64_t cursor)504 int32_t RdbGeneralStore::CleanTrackerData(const std::string &tableName, int64_t cursor)
505 {
506 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
507 if (delegate_ == nullptr) {
508 ZLOGE("Database already closed! database:%{public}s, table:%{public}s",
509 Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(tableName).c_str());
510 return GeneralError::E_ERROR;
511 }
512
513 auto status = delegate_->CleanTrackerData(tableName, cursor);
514 return status == DistributedDB::OK ? GeneralError::E_OK : GeneralError::E_ERROR;
515 }
516
Sync(const Devices & devices,GenQuery & query,DetailAsync async,const SyncParam & syncParam)517 std::pair<int32_t, int32_t> RdbGeneralStore::Sync(const Devices &devices, GenQuery &query, DetailAsync async,
518 const SyncParam &syncParam)
519 {
520 DistributedDB::Query dbQuery;
521 RdbQuery *rdbQuery = nullptr;
522 bool isPriority = false;
523 auto ret = query.QueryInterface(rdbQuery);
524 if (ret != GeneralError::E_OK || rdbQuery == nullptr) {
525 dbQuery.FromTable(GetIntersection(query.GetTables(), GetTables()));
526 } else {
527 dbQuery = rdbQuery->GetQuery();
528 isPriority = rdbQuery->IsPriority();
529 }
530 auto syncMode = GeneralStore::GetSyncMode(syncParam.mode);
531 auto dbMode = DistributedDB::SyncMode(syncMode);
532 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
533 if (delegate_ == nullptr) {
534 ZLOGE("store already closed! devices count:%{public}zu, the 1st:%{public}s, mode:%{public}d, "
535 "wait:%{public}d", devices.size(),
536 devices.empty() ? "null" : Anonymous::Change(*devices.begin()).c_str(), syncParam.mode, syncParam.wait);
537 return { GeneralError::E_ALREADY_CLOSED, DBStatus::OK };
538 }
539 auto highMode = GetHighMode(static_cast<uint32_t>(syncParam.mode));
540 SyncId syncId = ++syncTaskId_;
541 auto dbStatus = DistributedDB::INVALID_ARGS;
542 if (syncMode < NEARBY_END) {
543 dbStatus = delegate_->Sync(devices, dbMode, dbQuery, GetDBBriefCB(std::move(async)), syncParam.wait != 0);
544 } else if (syncMode > NEARBY_END && syncMode < CLOUD_END) {
545 auto callback = GetDBProcessCB(std::move(async), syncMode, syncId, highMode);
546 if (executor_ != nullptr && tasks_ != nullptr) {
547 auto id = executor_->Schedule(std::chrono::minutes(INTERVAL), GetFinishTask(syncId));
548 tasks_->Insert(syncId, { id, callback });
549 }
550 dbStatus = delegate_->Sync({ devices, dbMode, dbQuery, syncParam.wait,
551 (isPriority || highMode == MANUAL_SYNC_MODE), syncParam.isCompensation, {},
552 highMode == AUTO_SYNC_MODE, LOCK_ACTION, syncParam.prepareTraceId },
553 tasks_ != nullptr ? GetCB(syncId) : callback, syncId);
554 if (dbStatus == DBStatus::OK || tasks_ == nullptr) {
555 return { ConvertStatus(dbStatus), dbStatus };
556 }
557 tasks_->ComputeIfPresent(syncId, [executor = executor_](SyncId syncId, const FinishTask &task) {
558 if (executor != nullptr) {
559 executor->Remove(task.taskId);
560 }
561 return false;
562 });
563 }
564 return { ConvertStatus(dbStatus), dbStatus };
565 }
566
PreSharing(GenQuery & query)567 std::pair<int32_t, std::shared_ptr<Cursor>> RdbGeneralStore::PreSharing(GenQuery &query)
568 {
569 RdbQuery *rdbQuery = nullptr;
570 auto ret = query.QueryInterface(rdbQuery);
571 if (ret != GeneralError::E_OK || rdbQuery == nullptr) {
572 ZLOGE("not RdbQuery!");
573 return { GeneralError::E_INVALID_ARGS, nullptr };
574 }
575 auto tables = rdbQuery->GetTables();
576 auto statement = rdbQuery->GetStatement();
577 if (statement.empty() || tables.empty()) {
578 ZLOGE("statement size:%{public}zu, tables size:%{public}zu", statement.size(), tables.size());
579 return { GeneralError::E_INVALID_ARGS, nullptr };
580 }
581 std::string sql = BuildSql(*tables.begin(), statement, rdbQuery->GetColumns());
582 VBuckets values;
583 {
584 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
585 if (delegate_ == nullptr) {
586 ZLOGE("Database already closed! database:%{public}s", Anonymous::Change(storeInfo_.storeName).c_str());
587 return { GeneralError::E_ALREADY_CLOSED, nullptr };
588 }
589 auto [errCode, ret] = QuerySql(sql, rdbQuery->GetBindArgs());
590 values = std::move(ret);
591 }
592 auto rdbCloud = GetRdbCloud();
593 if (rdbCloud == nullptr || values.empty()) {
594 ZLOGW("rdbCloud is %{public}s, values size:%{public}zu", rdbCloud == nullptr ? "nullptr" : "not nullptr",
595 values.size());
596 return { GeneralError::E_CLOUD_DISABLED, nullptr };
597 }
598 VBuckets extends = ExtractExtend(values);
599 rdbCloud->PreSharing(*tables.begin(), extends);
600 for (auto value = values.begin(), extend = extends.begin(); value != values.end() && extend != extends.end();
601 ++value, ++extend) {
602 value->insert_or_assign(DistributedRdb::Field::SHARING_RESOURCE_FIELD, (*extend)[SchemaMeta::SHARING_RESOURCE]);
603 value->erase(CLOUD_GID);
604 }
605 return { GeneralError::E_OK, std::make_shared<CacheCursor>(std::move(values)) };
606 }
607
ExtractExtend(VBuckets & values) const608 VBuckets RdbGeneralStore::ExtractExtend(VBuckets &values) const
609 {
610 VBuckets extends(values.size());
611 for (auto value = values.begin(), extend = extends.begin(); value != values.end() && extend != extends.end();
612 ++value, ++extend) {
613 auto it = value->find(CLOUD_GID);
614 if (it == value->end()) {
615 continue;
616 }
617 auto gid = std::get_if<std::string>(&(it->second));
618 if (gid == nullptr || gid->empty()) {
619 continue;
620 }
621 extend->insert_or_assign(SchemaMeta::GID_FIELD, std::move(*gid));
622 }
623 return extends;
624 }
625
BuildSql(const std::string & table,const std::string & statement,const std::vector<std::string> & columns) const626 std::string RdbGeneralStore::BuildSql(
627 const std::string &table, const std::string &statement, const std::vector<std::string> &columns) const
628 {
629 std::string sql = "select ";
630 sql.append(CLOUD_GID);
631 std::string sqlNode = "select rowid";
632 for (auto &column : columns) {
633 sql.append(", ").append(column);
634 sqlNode.append(", ").append(column);
635 }
636 sqlNode.append(" from ").append(table).append(statement);
637 auto logTable = RelationalStoreManager::GetDistributedLogTableName(table);
638 sql.append(" from ").append(logTable).append(", (").append(sqlNode);
639 sql.append(") where ").append(DATE_KEY).append(" = rowid");
640 return sql;
641 }
642
Clean(const std::vector<std::string> & devices,int32_t mode,const std::string & tableName)643 int32_t RdbGeneralStore::Clean(const std::vector<std::string> &devices, int32_t mode, const std::string &tableName)
644 {
645 if (mode < 0 || mode > CLEAN_MODE_BUTT) {
646 return GeneralError::E_INVALID_ARGS;
647 }
648 DBStatus status = DistributedDB::DB_ERROR;
649 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
650 if (delegate_ == nullptr) {
651 ZLOGE("store already closed! devices count:%{public}zu, the 1st:%{public}s, mode:%{public}d, "
652 "tableName:%{public}s",
653 devices.size(), devices.empty() ? "null" : Anonymous::Change(*devices.begin()).c_str(), mode,
654 Anonymous::Change(tableName).c_str());
655 return GeneralError::E_ALREADY_CLOSED;
656 }
657 switch (mode) {
658 case CLOUD_INFO:
659 status = delegate_->RemoveDeviceData("", static_cast<ClearMode>(CLOUD_INFO));
660 if (status == DistributedDB::OK) {
661 status = delegate_->RemoveDeviceData("", ClearMode::CLEAR_SHARED_TABLE);
662 break;
663 }
664 (void)delegate_->RemoveDeviceData("", ClearMode::CLEAR_SHARED_TABLE);
665 break;
666 case CLOUD_DATA:
667 status = delegate_->RemoveDeviceData("", static_cast<ClearMode>(CLOUD_DATA));
668 if (status == DistributedDB::OK) {
669 status = delegate_->RemoveDeviceData("", ClearMode::CLEAR_SHARED_TABLE);
670 break;
671 }
672 (void)delegate_->RemoveDeviceData("", ClearMode::CLEAR_SHARED_TABLE);
673 break;
674 case NEARBY_DATA:
675 if (devices.empty()) {
676 status = delegate_->RemoveDeviceData();
677 break;
678 }
679 for (auto device : devices) {
680 status = delegate_->RemoveDeviceData(device, tableName);
681 }
682 break;
683 default:
684 return GeneralError::E_ERROR;
685 }
686 return status == DistributedDB::OK ? GeneralError::E_OK : GeneralError::E_ERROR;
687 }
688
Watch(int32_t origin,Watcher & watcher)689 int32_t RdbGeneralStore::Watch(int32_t origin, Watcher &watcher)
690 {
691 if (origin != Watcher::Origin::ORIGIN_ALL || observer_.watcher_ != nullptr) {
692 return GeneralError::E_INVALID_ARGS;
693 }
694
695 observer_.watcher_ = &watcher;
696 return GeneralError::E_OK;
697 }
698
Unwatch(int32_t origin,Watcher & watcher)699 int32_t RdbGeneralStore::Unwatch(int32_t origin, Watcher &watcher)
700 {
701 if (origin != Watcher::Origin::ORIGIN_ALL || observer_.watcher_ != &watcher) {
702 return GeneralError::E_INVALID_ARGS;
703 }
704
705 observer_.watcher_ = nullptr;
706 return GeneralError::E_OK;
707 }
708
GetDBBriefCB(DetailAsync async)709 RdbGeneralStore::DBBriefCB RdbGeneralStore::GetDBBriefCB(DetailAsync async)
710 {
711 if (!async) {
712 return [](auto &) {};
713 }
714 return [async = std::move(async)](
715 const std::map<std::string, std::vector<TableStatus>> &result) {
716 DistributedData::GenDetails details;
717 for (auto &[key, tables] : result) {
718 auto &value = details[key];
719 value.progress = FINISHED;
720 value.code = GeneralError::E_OK;
721 for (auto &table : tables) {
722 if (table.status != DBStatus::OK) {
723 value.code = GeneralError::E_ERROR;
724 }
725 }
726 }
727 async(details);
728 };
729 }
730
GetDBProcessCB(DetailAsync async,uint32_t syncMode,SyncId syncId,uint32_t highMode)731 RdbGeneralStore::DBProcessCB RdbGeneralStore::GetDBProcessCB(DetailAsync async, uint32_t syncMode, SyncId syncId,
732 uint32_t highMode)
733 {
734 std::shared_lock<std::shared_mutex> lock(asyncMutex_);
735 return [async, autoAsync = async_, highMode, storeInfo = storeInfo_, flag = syncNotifyFlag_, syncMode, syncId,
736 rdbCloud = GetRdbCloud()](const std::map<std::string, SyncProcess> &processes) {
737 DistributedData::GenDetails details;
738 for (auto &[id, process] : processes) {
739 bool isDownload = false;
740 auto &detail = details[id];
741 detail.progress = process.process;
742 detail.code = ConvertStatus(process.errCode);
743 detail.dbCode = process.errCode;
744 uint32_t totalCount = 0;
745 for (auto [key, value] : process.tableProcess) {
746 auto &table = detail.details[key];
747 table.upload.total = value.upLoadInfo.total;
748 table.upload.success = value.upLoadInfo.successCount;
749 table.upload.failed = value.upLoadInfo.failCount;
750 table.upload.untreated = table.upload.total - table.upload.success - table.upload.failed;
751 totalCount += table.upload.total;
752 isDownload = table.download.total > 0;
753 table.download.total = value.downLoadInfo.total;
754 table.download.success = value.downLoadInfo.successCount;
755 table.download.failed = value.downLoadInfo.failCount;
756 table.download.untreated = table.download.total - table.download.success - table.download.failed;
757 detail.changeCount = (process.process == FINISHED)
758 ? value.downLoadInfo.insertCount + value.downLoadInfo.updateCount +
759 value.downLoadInfo.deleteCount
760 : 0;
761 totalCount += table.download.total;
762 }
763 if (process.process == FINISHED) {
764 RdbGeneralStore::OnSyncFinish(storeInfo, flag, syncMode, syncId);
765 } else {
766 RdbGeneralStore::OnSyncStart(storeInfo, flag, syncMode, syncId, totalCount);
767 }
768
769 if (isDownload && (process.process == FINISHED || process.process == PROCESSING) && rdbCloud != nullptr &&
770 (rdbCloud->GetLockFlag() & RdbCloud::FLAG::APPLICATION)) {
771 rdbCloud->LockCloudDB(RdbCloud::FLAG::APPLICATION);
772 }
773 }
774 if (async) {
775 async(details);
776 }
777
778 if (highMode == AUTO_SYNC_MODE && autoAsync
779 && (details.empty() || details.begin()->second.code != E_SYNC_TASK_MERGED)) {
780 autoAsync(details);
781 }
782 };
783 }
784
Release()785 int32_t RdbGeneralStore::Release()
786 {
787 auto ref = 1;
788 {
789 std::lock_guard<decltype(mutex_)> lock(mutex_);
790 if (ref_ == 0) {
791 return 0;
792 }
793 ref = --ref_;
794 }
795 ZLOGD("ref:%{public}d", ref);
796 if (ref == 0) {
797 delete this;
798 }
799 return ref;
800 }
801
AddRef()802 int32_t RdbGeneralStore::AddRef()
803 {
804 std::lock_guard<decltype(mutex_)> lock(mutex_);
805 if (ref_ == 0) {
806 return 0;
807 }
808 return ++ref_;
809 }
810
SetDistributedTables(const std::vector<std::string> & tables,int32_t type,const std::vector<Reference> & references)811 int32_t RdbGeneralStore::SetDistributedTables(const std::vector<std::string> &tables, int32_t type,
812 const std::vector<Reference> &references)
813 {
814 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
815 if (delegate_ == nullptr) {
816 ZLOGE("Database already closed! database:%{public}s, tables size:%{public}zu, type:%{public}d",
817 Anonymous::Change(storeInfo_.storeName).c_str(), tables.size(), type);
818 return GeneralError::E_ALREADY_CLOSED;
819 }
820 for (const auto &table : tables) {
821 ZLOGD("tableName:%{public}s, type:%{public}d", Anonymous::Change(table).c_str(), type);
822 auto dBStatus = delegate_->CreateDistributedTable(table, static_cast<DistributedDB::TableSyncType>(type));
823 if (dBStatus != DistributedDB::DBStatus::OK) {
824 ZLOGE("create distributed table failed, table:%{public}s, err:%{public}d",
825 Anonymous::Change(table).c_str(), dBStatus);
826 return GeneralError::E_ERROR;
827 }
828 }
829 std::vector<DistributedDB::TableReferenceProperty> properties;
830 for (const auto &reference : references) {
831 properties.push_back({ reference.sourceTable, reference.targetTable, reference.refFields });
832 }
833 auto status = delegate_->SetReference(properties);
834 if (status != DistributedDB::DBStatus::OK) {
835 ZLOGE("distributed table set reference failed, err:%{public}d", status);
836 return GeneralError::E_ERROR;
837 }
838 return GeneralError::E_OK;
839 }
840
SetTrackerTable(const std::string & tableName,const std::set<std::string> & trackerColNames,const std::string & extendColName,bool isForceUpgrade)841 int32_t RdbGeneralStore::SetTrackerTable(const std::string &tableName, const std::set<std::string> &trackerColNames,
842 const std::string &extendColName, bool isForceUpgrade)
843 {
844 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
845 if (delegate_ == nullptr) {
846 ZLOGE("database already closed! database:%{public}s, tables name:%{public}s",
847 Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(tableName).c_str());
848 return GeneralError::E_ALREADY_CLOSED;
849 }
850 auto status = delegate_->SetTrackerTable({ tableName, extendColName, trackerColNames, isForceUpgrade });
851 if (status == DBStatus::WITH_INVENTORY_DATA) {
852 ZLOGI("Set tracker table with inventory data, database:%{public}s, tables name:%{public}s",
853 Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(tableName).c_str());
854 return GeneralError::E_WITH_INVENTORY_DATA;
855 }
856 if (status != DBStatus::OK) {
857 ZLOGE("Set tracker table failed! ret:%{public}d, database:%{public}s, tables name:%{public}s",
858 status, Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(tableName).c_str());
859 return GeneralError::E_ERROR;
860 }
861 return GeneralError::E_OK;
862 }
863
RemoteQuery(const std::string & device,const DistributedDB::RemoteCondition & remoteCondition)864 std::shared_ptr<Cursor> RdbGeneralStore::RemoteQuery(const std::string &device,
865 const DistributedDB::RemoteCondition &remoteCondition)
866 {
867 std::shared_ptr<DistributedDB::ResultSet> dbResultSet;
868 DistributedDB::DBStatus status =
869 delegate_->RemoteQuery(device, remoteCondition, REMOTE_QUERY_TIME_OUT, dbResultSet);
870 if (status != DistributedDB::DBStatus::OK) {
871 ZLOGE("DistributedDB remote query failed, device:%{public}s, status is %{public}d.",
872 Anonymous::Change(device).c_str(), status);
873 return nullptr;
874 }
875 return std::make_shared<RdbCursor>(dbResultSet);
876 }
877
ConvertStatus(DistributedDB::DBStatus status)878 RdbGeneralStore::GenErr RdbGeneralStore::ConvertStatus(DistributedDB::DBStatus status)
879 {
880 switch (status) {
881 case DBStatus::OK:
882 return GenErr::E_OK;
883 case DBStatus::CLOUD_NETWORK_ERROR:
884 return GenErr::E_NETWORK_ERROR;
885 case DBStatus::CLOUD_LOCK_ERROR:
886 return GenErr::E_LOCKED_BY_OTHERS;
887 case DBStatus::CLOUD_FULL_RECORDS:
888 return GenErr::E_RECODE_LIMIT_EXCEEDED;
889 case DBStatus::CLOUD_ASSET_SPACE_INSUFFICIENT:
890 return GenErr::E_NO_SPACE_FOR_ASSET;
891 case DBStatus::BUSY:
892 return GenErr::E_BUSY;
893 case DBStatus::CLOUD_SYNC_TASK_MERGED:
894 return GenErr::E_SYNC_TASK_MERGED;
895 default:
896 ZLOGI("status:0x%{public}x", status);
897 break;
898 }
899 return GenErr::E_ERROR;
900 }
901
IsValid()902 bool RdbGeneralStore::IsValid()
903 {
904 return delegate_ != nullptr;
905 }
906
RegisterDetailProgressObserver(GeneralStore::DetailAsync async)907 int32_t RdbGeneralStore::RegisterDetailProgressObserver(GeneralStore::DetailAsync async)
908 {
909 std::unique_lock<std::shared_mutex> lock(asyncMutex_);
910 async_ = std::move(async);
911 return GenErr::E_OK;
912 }
913
UnregisterDetailProgressObserver()914 int32_t RdbGeneralStore::UnregisterDetailProgressObserver()
915 {
916 std::unique_lock<std::shared_mutex> lock(asyncMutex_);
917 async_ = nullptr;
918 return GenErr::E_OK;
919 }
920
QuerySql(const std::string & sql,Values && args)921 std::pair<int32_t, VBuckets> RdbGeneralStore::QuerySql(const std::string &sql, Values &&args)
922 {
923 std::vector<DistributedDB::VBucket> changedData;
924 std::vector<DistributedDB::Type> bindArgs = ValueProxy::Convert(std::move(args));
925 auto status = delegate_->ExecuteSql({ sql, std::move(bindArgs), true }, changedData);
926 if (status != DBStatus::OK) {
927 ZLOGE("Failed! ret:%{public}d, sql:%{public}s, data size:%{public}zu", status, Anonymous::Change(sql).c_str(),
928 changedData.size());
929 return { GenErr::E_ERROR, {} };
930 }
931 return { GenErr::E_OK, ValueProxy::Convert(std::move(changedData)) };
932 }
933
GetWaterVersion(const std::string & deviceId)934 std::vector<std::string> RdbGeneralStore::GetWaterVersion(const std::string &deviceId)
935 {
936 return {};
937 }
938
OnSyncStart(const StoreInfo & storeInfo,uint32_t flag,uint32_t syncMode,uint32_t traceId,uint32_t syncCount)939 void RdbGeneralStore::OnSyncStart(const StoreInfo &storeInfo, uint32_t flag, uint32_t syncMode, uint32_t traceId,
940 uint32_t syncCount)
941 {
942 uint32_t requiredFlag = (CLOUD_SYNC_FLAG | SEARCHABLE_FLAG);
943 if (requiredFlag != (requiredFlag & flag)) {
944 return;
945 }
946 StoreInfo info = storeInfo;
947 auto evt = std::make_unique<DataSyncEvent>(std::move(info), syncMode, DataSyncEvent::DataSyncStatus::START,
948 traceId, syncCount);
949 EventCenter::GetInstance().PostEvent(std::move(evt));
950 }
951
OnSyncFinish(const StoreInfo & storeInfo,uint32_t flag,uint32_t syncMode,uint32_t traceId)952 void RdbGeneralStore::OnSyncFinish(const StoreInfo &storeInfo, uint32_t flag, uint32_t syncMode, uint32_t traceId)
953 {
954 uint32_t requiredFlag = (CLOUD_SYNC_FLAG | SEARCHABLE_FLAG);
955 if (requiredFlag != (requiredFlag & flag)) {
956 return;
957 }
958 StoreInfo info = storeInfo;
959 auto evt = std::make_unique<DataSyncEvent>(std::move(info), syncMode, DataSyncEvent::DataSyncStatus::FINISH,
960 traceId);
961 EventCenter::GetInstance().PostEvent(std::move(evt));
962 }
963
GetTables()964 std::set<std::string> RdbGeneralStore::GetTables()
965 {
966 std::set<std::string> tables;
967 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
968 if (delegate_ == nullptr) {
969 ZLOGE("Database already closed! database:%{public}s", Anonymous::Change(storeInfo_.storeName).c_str());
970 return tables;
971 }
972 auto [errCode, res] = QuerySql(QUERY_TABLES_SQL, {});
973 if (errCode != GenErr::E_OK) {
974 return tables;
975 }
976 for (auto &table : res) {
977 auto it = table.find("name");
978 if (it == table.end() || TYPE_INDEX<std::string> != it->second.index()) {
979 ZLOGW("error res! database:%{public}s", Anonymous::Change(storeInfo_.storeName).c_str());
980 continue;
981 }
982 tables.emplace(std::move(*std::get_if<std::string>(&(it->second))));
983 }
984 return tables;
985 }
986
GetIntersection(std::vector<std::string> && syncTables,const std::set<std::string> & localTables)987 std::vector<std::string> RdbGeneralStore::GetIntersection(std::vector<std::string> &&syncTables,
988 const std::set<std::string> &localTables)
989 {
990 std::vector<std::string> res;
991 for (auto &it : syncTables) {
992 if (localTables.count(it)) {
993 res.push_back(std::move(it));
994 }
995 }
996 return res;
997 }
998
PostDataChange(const StoreMetaData & meta,const std::vector<std::string> & tables,ChangeType type)999 void RdbGeneralStore::ObserverProxy::PostDataChange(const StoreMetaData &meta,
1000 const std::vector<std::string> &tables, ChangeType type)
1001 {
1002 RemoteChangeEvent::DataInfo info;
1003 info.userId = meta.user;
1004 info.storeId = meta.storeId;
1005 info.deviceId = meta.deviceId;
1006 info.bundleName = meta.bundleName;
1007 info.tables = tables;
1008 info.changeType = type;
1009 auto evt = std::make_unique<RemoteChangeEvent>(RemoteChangeEvent::DATA_CHANGE, std::move(info));
1010 EventCenter::GetInstance().PostEvent(std::move(evt));
1011 }
1012
OnChange(const DBChangedIF & data)1013 void RdbGeneralStore::ObserverProxy::OnChange(const DBChangedIF &data)
1014 {
1015 if (!HasWatcher()) {
1016 return;
1017 }
1018 std::string device = data.GetDataChangeDevice();
1019 auto networkId = DmAdapter::GetInstance().ToNetworkID(device);
1020 ZLOGD("store:%{public}s data change from :%{public}s", Anonymous::Change(storeId_).c_str(),
1021 Anonymous::Change(device).c_str());
1022 GenOrigin genOrigin;
1023 genOrigin.origin = GenOrigin::ORIGIN_NEARBY;
1024 genOrigin.dataType = GenOrigin::BASIC_DATA;
1025 DistributedDB::StoreProperty property;
1026 data.GetStoreProperty(property);
1027 genOrigin.id.push_back(networkId);
1028 genOrigin.store = storeId_;
1029 GeneralWatcher::ChangeInfo changeInfo{};
1030 watcher_->OnChange(genOrigin, {}, std::move(changeInfo));
1031 return;
1032 }
1033
OnChange(DBOrigin origin,const std::string & originalId,DBChangedData && data)1034 void RdbGeneralStore::ObserverProxy::OnChange(DBOrigin origin, const std::string &originalId, DBChangedData &&data)
1035 {
1036 if (!HasWatcher()) {
1037 return;
1038 }
1039 ZLOGD("store:%{public}s table:%{public}s data change from :%{public}s", Anonymous::Change(storeId_).c_str(),
1040 Anonymous::Change(data.tableName).c_str(), Anonymous::Change(originalId).c_str());
1041 GenOrigin genOrigin;
1042 genOrigin.origin = (origin == DBOrigin::ORIGIN_LOCAL)
1043 ? GenOrigin::ORIGIN_LOCAL
1044 : (origin == DBOrigin::ORIGIN_CLOUD) ? GenOrigin::ORIGIN_CLOUD : GenOrigin::ORIGIN_NEARBY;
1045 genOrigin.dataType = data.type == DistributedDB::ASSET ? GenOrigin::ASSET_DATA : GenOrigin::BASIC_DATA;
1046 genOrigin.id.push_back(originalId);
1047 genOrigin.store = storeId_;
1048 Watcher::PRIFields fields;
1049 Watcher::ChangeInfo changeInfo;
1050 bool notifyFlag = false;
1051 for (uint32_t i = 0; i < DistributedDB::OP_BUTT; ++i) {
1052 auto &info = changeInfo[data.tableName][i];
1053 for (auto &priData : data.primaryData[i]) {
1054 Watcher::PRIValue value;
1055 Convert(std::move(*(priData.begin())), value);
1056 if (notifyFlag || origin != DBOrigin::ORIGIN_CLOUD || i != DistributedDB::OP_DELETE) {
1057 info.push_back(std::move(value));
1058 continue;
1059 }
1060 auto deleteKey = std::get_if<std::string>(&value);
1061 if (deleteKey != nullptr && (*deleteKey == LOGOUT_DELETE_FLAG)) {
1062 // notify to start app
1063 notifyFlag = true;
1064 }
1065 info.push_back(std::move(value));
1066 }
1067 }
1068 if (notifyFlag) {
1069 PostDataChange(meta_, {}, CLOUD_DATA_CLEAN);
1070 }
1071 if (!data.field.empty()) {
1072 fields[std::move(data.tableName)] = std::move(*(data.field.begin()));
1073 }
1074 watcher_->OnChange(genOrigin, fields, std::move(changeInfo));
1075 }
1076
LockCloudDB()1077 std::pair<int32_t, uint32_t> RdbGeneralStore::LockCloudDB()
1078 {
1079 auto rdbCloud = GetRdbCloud();
1080 if (rdbCloud == nullptr) {
1081 return { GeneralError::E_ERROR, 0 };
1082 }
1083 return rdbCloud->LockCloudDB(RdbCloud::FLAG::APPLICATION);
1084 }
1085
UnLockCloudDB()1086 int32_t RdbGeneralStore::UnLockCloudDB()
1087 {
1088 auto rdbCloud = GetRdbCloud();
1089 if (rdbCloud == nullptr) {
1090 return GeneralError::E_ERROR;
1091 }
1092 return rdbCloud->UnLockCloudDB(RdbCloud::FLAG::APPLICATION);
1093 }
1094
GetRdbCloud() const1095 std::shared_ptr<RdbCloud> RdbGeneralStore::GetRdbCloud() const
1096 {
1097 std::shared_lock<decltype(rdbCloudMutex_)> lock(rdbCloudMutex_);
1098 return rdbCloud_;
1099 }
1100
IsFinished(SyncId syncId) const1101 bool RdbGeneralStore::IsFinished(SyncId syncId) const
1102 {
1103 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
1104 if (delegate_ == nullptr) {
1105 ZLOGE("database already closed! database:%{public}s", Anonymous::Change(storeInfo_.storeName).c_str());
1106 return true;
1107 }
1108 return delegate_->GetCloudTaskStatus(syncId).process == DistributedDB::FINISHED;
1109 }
1110
GetFinishTask(SyncId syncId)1111 Executor::Task RdbGeneralStore::GetFinishTask(SyncId syncId)
1112 {
1113 return [this, executor = executor_, task = tasks_, syncId]() {
1114 auto [exist, finishTask] = task->Find(syncId);
1115 if (!exist || finishTask.cb == nullptr) {
1116 task->Erase(syncId);
1117 return;
1118 }
1119 if (!IsFinished(syncId)) {
1120 task->ComputeIfPresent(syncId, [executor = executor_, this](SyncId syncId, FinishTask &task) {
1121 task.taskId = executor->Schedule(std::chrono::minutes(INTERVAL), GetFinishTask(syncId));
1122 return true;
1123 });
1124 return;
1125 }
1126 DBProcessCB cb;
1127 task->ComputeIfPresent(syncId, [&cb, executor = executor_](SyncId syncId, const FinishTask &task) {
1128 cb = task.cb;
1129 return false;
1130 });
1131 if (cb != nullptr) {
1132 ZLOGW("database:%{public}s syncId:%{public}" PRIu64 " miss finished. ",
1133 Anonymous::Change(storeInfo_.storeName).c_str(), syncId);
1134 std::map<std::string, SyncProcess> result;
1135 result.insert({ "", { DistributedDB::FINISHED, DBStatus::DB_ERROR } });
1136 cb(result);
1137 }
1138 };
1139 }
1140
SetExecutor(std::shared_ptr<Executor> executor)1141 void RdbGeneralStore::SetExecutor(std::shared_ptr<Executor> executor)
1142 {
1143 if (executor_ == nullptr) {
1144 executor_ = executor;
1145 }
1146 }
1147
RemoveTasks()1148 void RdbGeneralStore::RemoveTasks()
1149 {
1150 if (tasks_ == nullptr) {
1151 return;
1152 }
1153 std::list<DBProcessCB> cbs;
1154 std::list<TaskId> taskIds;
1155 tasks_->EraseIf([&cbs, &taskIds, store = storeInfo_.storeName](SyncId syncId, const FinishTask &task) {
1156 if (task.cb != nullptr) {
1157 ZLOGW("database:%{public}s syncId:%{public}" PRIu64 " miss finished. ", Anonymous::Change(store).c_str(),
1158 syncId);
1159 }
1160 cbs.push_back(std::move(task.cb));
1161 taskIds.push_back(task.taskId);
1162 return true;
1163 });
1164 if (executor_ != nullptr) {
1165 for (auto taskId : taskIds) {
1166 executor_->Remove(taskId, true);
1167 }
1168 }
1169 std::map<std::string, SyncProcess> result;
1170 result.insert({ "", { DistributedDB::FINISHED, DBStatus::DB_ERROR } });
1171 for (auto &cb : cbs) {
1172 if (cb != nullptr) {
1173 cb(result);
1174 }
1175 }
1176 }
1177
GetCB(SyncId syncId)1178 RdbGeneralStore::DBProcessCB RdbGeneralStore::GetCB(SyncId syncId)
1179 {
1180 return [task = tasks_, executor = executor_, syncId](const std::map<std::string, SyncProcess> &progress) {
1181 if (task == nullptr) {
1182 return;
1183 }
1184 DBProcessCB cb;
1185 task->ComputeIfPresent(syncId, [&cb, &progress, executor](SyncId syncId, FinishTask &finishTask) {
1186 cb = finishTask.cb;
1187 bool isFinished = !progress.empty() && progress.begin()->second.process == DistributedDB::FINISHED;
1188 if (isFinished) {
1189 finishTask.cb = nullptr;
1190 }
1191 return true;
1192 });
1193 if (cb != nullptr) {
1194 cb(progress);
1195 }
1196 return;
1197 };
1198 }
1199 } // namespace OHOS::DistributedRdb