1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #define LOG_TAG "RdbStoreImpl"
16 #include "rdb_store_impl.h"
17
18 #include <algorithm>
19 #include <chrono>
20 #include <cinttypes>
21 #include <cstdint>
22 #include <memory>
23 #include <mutex>
24 #include <sstream>
25 #include <string>
26 #include <unistd.h>
27
28 #include "cache_result_set.h"
29 #include "directory_ex.h"
30 #include "logger.h"
31 #include "rdb_common.h"
32 #include "rdb_errno.h"
33 #include "rdb_fault_hiview_reporter.h"
34 #include "rdb_radar_reporter.h"
35 #include "rdb_security_manager.h"
36 #include "rdb_sql_statistic.h"
37 #include "rdb_store.h"
38 #include "rdb_trace.h"
39 #include "rdb_types.h"
40 #include "relational_store_client.h"
41 #include "sqlite_global_config.h"
42 #include "sqlite_sql_builder.h"
43 #include "sqlite_statement.h"
44 #include "sqlite_utils.h"
45 #include "step_result_set.h"
46 #include "values_buckets.h"
47 #include "task_executor.h"
48 #include "traits.h"
49 #include "transaction.h"
50 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
51 #include "delay_notify.h"
52 #include "raw_data_parser.h"
53 #include "rdb_device_manager_adapter.h"
54 #include "rdb_manager_impl.h"
55 #include "relational_store_manager.h"
56 #include "runtime_config.h"
57 #include "security_policy.h"
58 #include "sqlite_shared_result_set.h"
59 #endif
60
// ISFILE(filePath): evaluates to true when the string `filePath` contains no
// directory separator ("\\" on WINDOWS_PLATFORM, "/" elsewhere).
// The macro argument is parenthesized so that an arbitrary string expression
// (for example a concatenation) can be passed, not only a plain identifier.
#ifdef WINDOWS_PLATFORM
#define ISFILE(filePath) (((filePath).find("\\") == std::string::npos))
#else
#define ISFILE(filePath) (((filePath).find("/") == std::string::npos))
#endif
66
67 namespace OHOS::NativeRdb {
// Namespace imports and short aliases used by the definitions in this file.
using namespace OHOS::Rdb;
using namespace std::chrono;
using SqlStatistic = DistributedRdb::SqlStatistic;
using RdbNotifyConfig = DistributedRdb::RdbNotifyConfig;
using Reportor = RdbFaultHiViewReporter;
// RdbMgr is only aliased on platforms other than Windows/Mac/Android/iOS.
#if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
using RdbMgr = DistributedRdb::RdbManagerImpl;
#endif

// SQL text of the transaction-control statements.
static constexpr const char *BEGIN_TRANSACTION_SQL = "begin;";
static constexpr const char *COMMIT_TRANSACTION_SQL = "commit;";
static constexpr const char *ROLLBACK_TRANSACTION_SQL = "rollback;";
// NOTE(review): looks like a tag/name for the backup-restore flow; its use is not visible in this part of the file.
static constexpr const char *BACKUP_RESTORE = "backup.restore";
// NOTE(review): unit is not stated here - presumably milliseconds; confirm at the use site.
constexpr int64_t TIME_OUT = 1500;
82
InitSyncerParam(const RdbStoreConfig & config,bool created)83 void RdbStoreImpl::InitSyncerParam(const RdbStoreConfig &config, bool created)
84 {
85 syncerParam_.bundleName_ = config.GetBundleName();
86 syncerParam_.hapName_ = config.GetModuleName();
87 syncerParam_.storeName_ = config.GetName();
88 syncerParam_.customDir_ = config.GetCustomDir();
89 syncerParam_.area_ = config.GetArea();
90 syncerParam_.level_ = static_cast<int32_t>(config.GetSecurityLevel());
91 syncerParam_.type_ = config.GetDistributedType();
92 syncerParam_.isEncrypt_ = config.IsEncrypt();
93 syncerParam_.isAutoClean_ = config.GetAutoClean();
94 syncerParam_.isSearchable_ = config.IsSearchable();
95 syncerParam_.password_ = config.GetEncryptKey();
96 syncerParam_.haMode_ = config.GetHaMode();
97 syncerParam_.roleType_ = config.GetRoleType();
98 if (created) {
99 syncerParam_.infos_ = Connection::Collect(config);
100 }
101 }
102
InnerOpen()103 int RdbStoreImpl::InnerOpen()
104 {
105 isOpen_ = true;
106 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
107 if (isReadOnly_) {
108 return E_OK;
109 }
110
111 AfterOpen(syncerParam_);
112 int errCode = RegisterDataChangeCallback();
113 if (errCode != E_OK) {
114 LOG_ERROR("RegisterCallBackObserver is failed, err is %{public}d.", errCode);
115 }
116 #endif
117 return E_OK;
118 }
119
120 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
AfterOpen(const RdbParam & param,int32_t retry)121 void RdbStoreImpl::AfterOpen(const RdbParam ¶m, int32_t retry)
122 {
123 auto [err, service] = RdbMgr::GetInstance().GetRdbService(param);
124 if (err == E_NOT_SUPPORT) {
125 return;
126 }
127 if (err != E_OK || service == nullptr) {
128 LOG_ERROR("GetRdbService failed, err: %{public}d, storeName: %{public}s.", err,
129 SqliteUtils::Anonymous(param.storeName_).c_str());
130 auto pool = TaskExecutor::GetInstance().GetExecutor();
131 if (err == E_SERVICE_NOT_FOUND && pool != nullptr && retry++ < MAX_RETRY_TIMES) {
132 pool->Schedule(std::chrono::seconds(RETRY_INTERVAL), [param, retry]() {
133 AfterOpen(param, retry);
134 });
135 }
136 return;
137 }
138 err = service->AfterOpen(param);
139 if (err != E_OK) {
140 LOG_ERROR("AfterOpen failed, err: %{public}d, storeName: %{public}s.", err,
141 SqliteUtils::Anonymous(param.storeName_).c_str());
142 }
143 }
144
GetModifyTime(const std::string & table,const std::string & columnName,std::vector<PRIKey> & keys)145 RdbStore::ModifyTime RdbStoreImpl::GetModifyTime(const std::string &table, const std::string &columnName,
146 std::vector<PRIKey> &keys)
147 {
148 if (table.empty() || columnName.empty() || keys.empty()) {
149 LOG_ERROR("invalid para.");
150 return {};
151 }
152
153 auto logTable = DistributedDB::RelationalStoreManager::GetDistributedLogTableName(table);
154 if (SqliteUtils::StrToUpper(columnName) == ROW_ID) {
155 return GetModifyTimeByRowId(logTable, keys);
156 }
157 std::vector<ValueObject> hashKeys;
158 hashKeys.reserve(keys.size());
159 std::map<std::vector<uint8_t>, PRIKey> keyMap;
160 std::map<std::string, DistributedDB::Type> tmp;
161 for (const auto &key : keys) {
162 DistributedDB::Type value;
163 RawDataParser::Convert(key, value);
164 tmp[columnName] = value;
165 auto hashKey = DistributedDB::RelationalStoreManager::CalcPrimaryKeyHash(tmp);
166 if (hashKey.empty()) {
167 LOG_DEBUG("hash key fail");
168 continue;
169 }
170 hashKeys.emplace_back(ValueObject(hashKey));
171 keyMap[hashKey] = key;
172 }
173
174 std::string sql;
175 sql.append("select hash_key as key, timestamp/10000 as modify_time from ");
176 sql.append(logTable);
177 sql.append(" where hash_key in (");
178 sql.append(SqliteSqlBuilder::GetSqlArgs(hashKeys.size()));
179 sql.append(")");
180 auto resultSet = QueryByStep(sql, hashKeys, true);
181 int count = 0;
182 if (resultSet == nullptr || resultSet->GetRowCount(count) != E_OK || count <= 0) {
183 LOG_ERROR("get resultSet err.");
184 return {};
185 }
186 return { resultSet, keyMap, false };
187 }
188
GetModifyTimeByRowId(const std::string & logTable,std::vector<PRIKey> & keys)189 RdbStore::ModifyTime RdbStoreImpl::GetModifyTimeByRowId(const std::string &logTable, std::vector<PRIKey> &keys)
190 {
191 std::string sql;
192 sql.append("select data_key as key, timestamp/10000 as modify_time from ");
193 sql.append(logTable);
194 sql.append(" where data_key in (");
195 sql.append(SqliteSqlBuilder::GetSqlArgs(keys.size()));
196 sql.append(")");
197 std::vector<ValueObject> args;
198 args.reserve(keys.size());
199 for (auto &key : keys) {
200 ValueObject::Type value;
201 RawDataParser::Convert(key, value);
202 args.emplace_back(ValueObject(value));
203 }
204 auto resultSet = QueryByStep(sql, args, true);
205 int count = 0;
206 if (resultSet == nullptr || resultSet->GetRowCount(count) != E_OK || count <= 0) {
207 LOG_ERROR("get resultSet err.");
208 return {};
209 }
210 return ModifyTime(resultSet, {}, true);
211 }
212
CleanDirtyData(const std::string & table,uint64_t cursor)213 int RdbStoreImpl::CleanDirtyData(const std::string &table, uint64_t cursor)
214 {
215 if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
216 LOG_ERROR("not support. table:%{public}s, isRead:%{public}d, dbType:%{public}d",
217 SqliteUtils::Anonymous(table).c_str(), isReadOnly_, config_.GetDBType());
218 return E_NOT_SUPPORT;
219 }
220 auto connection = connectionPool_->AcquireConnection(false);
221 if (connection == nullptr) {
222 LOG_ERROR("db is busy. table:%{public}s", SqliteUtils::Anonymous(table).c_str());
223 return E_DATABASE_BUSY;
224 }
225 int errCode = connection->CleanDirtyData(table, cursor);
226 return errCode;
227 }
228
GetLogTableName(const std::string & tableName)229 std::string RdbStoreImpl::GetLogTableName(const std::string &tableName)
230 {
231 return DistributedDB::RelationalStoreManager::GetDistributedLogTableName(tableName);
232 }
233
QuerySharingResource(const AbsRdbPredicates & predicates,const Fields & columns)234 std::pair<int32_t, std::shared_ptr<ResultSet>> RdbStoreImpl::QuerySharingResource(const AbsRdbPredicates &predicates,
235 const Fields &columns)
236 {
237 if (config_.GetDBType() == DB_VECTOR) {
238 return { E_NOT_SUPPORT, nullptr };
239 }
240 auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
241 if (errCode != E_OK) {
242 return { errCode, nullptr };
243 }
244 auto [status, resultSet] =
245 service->QuerySharingResource(syncerParam_, predicates.GetDistributedPredicates(), columns);
246 if (status != E_OK) {
247 return { status, nullptr };
248 }
249 return { status, resultSet };
250 }
251
RemoteQuery(const std::string & device,const AbsRdbPredicates & predicates,const Fields & columns,int & errCode)252 std::shared_ptr<ResultSet> RdbStoreImpl::RemoteQuery(const std::string &device, const AbsRdbPredicates &predicates,
253 const Fields &columns, int &errCode)
254 {
255 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
256 if (config_.GetDBType() == DB_VECTOR) {
257 return nullptr;
258 }
259 std::vector<std::string> selectionArgs = predicates.GetWhereArgs();
260 std::string sql = SqliteSqlBuilder::BuildQueryString(predicates, columns);
261 auto [err, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
262 if (err == E_NOT_SUPPORT) {
263 errCode = err;
264 return nullptr;
265 }
266 if (err != E_OK) {
267 LOG_ERROR("RdbStoreImpl::RemoteQuery get service failed");
268 errCode = err;
269 return nullptr;
270 }
271 auto [status, resultSet] = service->RemoteQuery(syncerParam_, device, sql, selectionArgs);
272 errCode = status;
273 return resultSet;
274 }
275
NotifyDataChange()276 void RdbStoreImpl::NotifyDataChange()
277 {
278 int errCode = RegisterDataChangeCallback();
279 if (errCode != E_OK) {
280 LOG_ERROR("RegisterDataChangeCallback is failed, err is %{public}d.", errCode);
281 }
282 DistributedRdb::RdbChangedData rdbChangedData;
283 if (delayNotifier_ != nullptr) {
284 delayNotifier_->UpdateNotify(rdbChangedData, true);
285 }
286 }
287
SetDistributedTables(const std::vector<std::string> & tables,int32_t type,const DistributedRdb::DistributedConfig & distributedConfig)288 int RdbStoreImpl::SetDistributedTables(const std::vector<std::string> &tables, int32_t type,
289 const DistributedRdb::DistributedConfig &distributedConfig)
290 {
291 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
292 if (config_.GetDBType() == DB_VECTOR || isReadOnly_) {
293 return E_NOT_SUPPORT;
294 }
295 if (tables.empty()) {
296 LOG_WARN("The distributed tables to be set is empty.");
297 return E_OK;
298 }
299 auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
300 if (errCode != E_OK) {
301 return errCode;
302 }
303 int32_t errorCode = service->SetDistributedTables(syncerParam_, tables, distributedConfig.references,
304 distributedConfig.isRebuild, type);
305 if (errorCode != E_OK) {
306 LOG_ERROR("Fail to set distributed tables, error=%{public}d", errorCode);
307 return errorCode;
308 }
309 if (type != DistributedRdb::DISTRIBUTED_CLOUD) {
310 return E_OK;
311 }
312 auto conn = connectionPool_->AcquireConnection(false);
313 if (conn != nullptr) {
314 auto strategy = conn->GenerateExchangeStrategy(slaveStatus_);
315 if (strategy == ExchangeStrategy::BACKUP) {
316 (void)conn->Backup({}, {}, false, slaveStatus_);
317 }
318 }
319 {
320 std::unique_lock<decltype(rwMutex_)> lock(rwMutex_);
321 if (distributedConfig.autoSync) {
322 cloudInfo_->AddTables(tables);
323 } else {
324 cloudInfo_->RmvTables(tables);
325 return E_OK;
326 }
327 }
328 auto isRebuilt = RebuiltType::NONE;
329 GetRebuilt(isRebuilt);
330 if (isRebuilt == RebuiltType::REBUILT) {
331 DoCloudSync("");
332 }
333 return E_OK;
334 }
335
ObtainDistributedTableName(const std::string & device,const std::string & table,int & errCode)336 std::string RdbStoreImpl::ObtainDistributedTableName(const std::string &device, const std::string &table, int &errCode)
337 {
338 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
339 if (config_.GetDBType() == DB_VECTOR) {
340 return "";
341 }
342 std::string uuid;
343 DeviceManagerAdaptor::RdbDeviceManagerAdaptor &deviceManager =
344 DeviceManagerAdaptor::RdbDeviceManagerAdaptor::GetInstance(syncerParam_.bundleName_);
345 errCode = deviceManager.GetEncryptedUuidByNetworkId(device, uuid);
346 if (errCode != E_OK) {
347 LOG_ERROR("GetUuid is failed.");
348 return "";
349 }
350
351 auto translateCall = [uuid](const std::string &oriDevId, const DistributedDB::StoreInfo &info) {
352 return uuid;
353 };
354 DistributedDB::RuntimeConfig::SetTranslateToDeviceIdCallback(translateCall);
355
356 return DistributedDB::RelationalStoreManager::GetDistributedTableName(uuid, table);
357 }
358
Sync(const SyncOption & option,const AbsRdbPredicates & predicate,const AsyncBrief & callback)359 int RdbStoreImpl::Sync(const SyncOption &option, const AbsRdbPredicates &predicate, const AsyncBrief &callback)
360 {
361 if (config_.GetDBType() == DB_VECTOR) {
362 return E_NOT_SUPPORT;
363 }
364 return Sync(option, predicate, [callback](Details &&details) {
365 Briefs briefs;
366 for (auto &[key, value] : details) {
367 briefs.insert_or_assign(key, value.code);
368 }
369 if (callback != nullptr) {
370 callback(briefs);
371 }
372 });
373 }
374
Sync(const SyncOption & option,const std::vector<std::string> & tables,const AsyncDetail & async)375 int RdbStoreImpl::Sync(const SyncOption &option, const std::vector<std::string> &tables, const AsyncDetail &async)
376 {
377 return Sync(option, AbsRdbPredicates(tables), async);
378 }
379
Sync(const SyncOption & option,const AbsRdbPredicates & predicate,const AsyncDetail & async)380 int RdbStoreImpl::Sync(const SyncOption &option, const AbsRdbPredicates &predicate, const AsyncDetail &async)
381 {
382 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
383 DistributedRdb::RdbService::Option rdbOption;
384 rdbOption.mode = option.mode;
385 rdbOption.isAsync = !option.isBlock;
386 RdbRadar ret(Scene::SCENE_SYNC, __FUNCTION__, config_.GetBundleName());
387 ret = InnerSync(syncerParam_, rdbOption, predicate.GetDistributedPredicates(), async);
388 return ret;
389 }
390
InnerSync(const RdbParam & param,const Options & option,const Memo & predicates,const AsyncDetail & async)391 int RdbStoreImpl::InnerSync(const RdbParam ¶m, const Options &option, const Memo &predicates,
392 const AsyncDetail &async)
393 {
394 auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(param);
395 if (errCode == E_NOT_SUPPORT) {
396 return errCode;
397 }
398 if (errCode != E_OK) {
399 LOG_ERROR("GetRdbService is failed, err is %{public}d, bundleName is %{public}s.", errCode,
400 param.bundleName_.c_str());
401 return errCode;
402 }
403 errCode = service->Sync(param, option, predicates, async);
404 if (errCode != E_OK) {
405 LOG_ERROR("Sync is failed, err is %{public}d.", errCode);
406 return errCode;
407 }
408 return E_OK;
409 }
410
GetUri(const std::string & event)411 Uri RdbStoreImpl::GetUri(const std::string &event)
412 {
413 std::string rdbUri;
414 if (config_.GetDataGroupId().empty()) {
415 rdbUri = SCHEME_RDB + config_.GetBundleName() + "/" + path_ + "/" + event;
416 } else {
417 rdbUri = SCHEME_RDB + config_.GetDataGroupId() + "/" + path_ + "/" + event;
418 }
419 return Uri(rdbUri);
420 }
421
SubscribeLocal(const SubscribeOption & option,RdbStoreObserver * observer)422 int RdbStoreImpl::SubscribeLocal(const SubscribeOption& option, RdbStoreObserver *observer)
423 {
424 std::lock_guard<std::mutex> lock(mutex_);
425 localObservers_.try_emplace(option.event);
426 auto &list = localObservers_.find(option.event)->second;
427 for (auto it = list.begin(); it != list.end(); it++) {
428 if ((*it)->getObserver() == observer) {
429 LOG_ERROR("duplicate subscribe.");
430 return E_OK;
431 }
432 }
433
434 localObservers_[option.event].push_back(std::make_shared<RdbStoreLocalObserver>(observer));
435 return E_OK;
436 }
437
SubscribeLocalShared(const SubscribeOption & option,RdbStoreObserver * observer)438 int RdbStoreImpl::SubscribeLocalShared(const SubscribeOption& option, RdbStoreObserver *observer)
439 {
440 std::lock_guard<std::mutex> lock(mutex_);
441 localSharedObservers_.try_emplace(option.event);
442 auto &list = localSharedObservers_.find(option.event)->second;
443 for (auto it = list.begin(); it != list.end(); it++) {
444 if ((*it)->getObserver() == observer) {
445 LOG_ERROR("duplicate subscribe.");
446 return E_OK;
447 }
448 }
449
450 auto client = OHOS::AAFwk::DataObsMgrClient::GetInstance();
451 if (client == nullptr) {
452 LOG_ERROR("Failed to get DataObsMgrClient.");
453 return E_GET_DATAOBSMGRCLIENT_FAIL;
454 }
455 sptr<RdbStoreLocalSharedObserver> localSharedObserver(new (std::nothrow) RdbStoreLocalSharedObserver(observer));
456 int32_t err = client->RegisterObserver(GetUri(option.event), localSharedObserver);
457 if (err != 0) {
458 LOG_ERROR("Subscribe failed.");
459 return err;
460 }
461 localSharedObservers_[option.event].push_back(std::move(localSharedObserver));
462 return E_OK;
463 }
464
SubscribeLocalDetail(const SubscribeOption & option,const std::shared_ptr<RdbStoreObserver> & observer)465 int32_t RdbStoreImpl::SubscribeLocalDetail(const SubscribeOption &option,
466 const std::shared_ptr<RdbStoreObserver> &observer)
467 {
468 auto connection = connectionPool_->AcquireConnection(false);
469 if (connection == nullptr) {
470 return E_DATABASE_BUSY;
471 }
472 int32_t errCode = connection->Subscribe(option.event, observer);
473 if (errCode != E_OK) {
474 LOG_ERROR("subscribe local detail observer failed. db name:%{public}s errCode:%{public}" PRId32,
475 SqliteUtils::Anonymous(config_.GetName()).c_str(), errCode);
476 }
477 return errCode;
478 }
479
SubscribeRemote(const SubscribeOption & option,RdbStoreObserver * observer)480 int RdbStoreImpl::SubscribeRemote(const SubscribeOption& option, RdbStoreObserver *observer)
481 {
482 auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
483 if (errCode != E_OK) {
484 return errCode;
485 }
486 return service->Subscribe(syncerParam_, option, observer);
487 }
488
Subscribe(const SubscribeOption & option,RdbStoreObserver * observer)489 int RdbStoreImpl::Subscribe(const SubscribeOption &option, RdbStoreObserver *observer)
490 {
491 if (config_.GetDBType() == DB_VECTOR) {
492 return E_NOT_SUPPORT;
493 }
494 if (option.mode == SubscribeMode::LOCAL) {
495 return SubscribeLocal(option, observer);
496 }
497 if (option.mode == SubscribeMode::LOCAL_SHARED) {
498 return SubscribeLocalShared(option, observer);
499 }
500 return SubscribeRemote(option, observer);
501 }
502
UnSubscribeLocal(const SubscribeOption & option,RdbStoreObserver * observer)503 int RdbStoreImpl::UnSubscribeLocal(const SubscribeOption& option, RdbStoreObserver *observer)
504 {
505 std::lock_guard<std::mutex> lock(mutex_);
506 auto obs = localObservers_.find(option.event);
507 if (obs == localObservers_.end()) {
508 return E_OK;
509 }
510
511 auto &list = obs->second;
512 for (auto it = list.begin(); it != list.end(); it++) {
513 if ((*it)->getObserver() == observer) {
514 it = list.erase(it);
515 break;
516 }
517 }
518
519 if (list.empty()) {
520 localObservers_.erase(option.event);
521 }
522 return E_OK;
523 }
524
UnSubscribeLocalAll(const SubscribeOption & option)525 int RdbStoreImpl::UnSubscribeLocalAll(const SubscribeOption& option)
526 {
527 std::lock_guard<std::mutex> lock(mutex_);
528 auto obs = localObservers_.find(option.event);
529 if (obs == localObservers_.end()) {
530 return E_OK;
531 }
532
533 localObservers_.erase(option.event);
534 return E_OK;
535 }
536
UnSubscribeLocalShared(const SubscribeOption & option,RdbStoreObserver * observer)537 int RdbStoreImpl::UnSubscribeLocalShared(const SubscribeOption& option, RdbStoreObserver *observer)
538 {
539 std::lock_guard<std::mutex> lock(mutex_);
540 auto obs = localSharedObservers_.find(option.event);
541 if (obs == localSharedObservers_.end()) {
542 return E_OK;
543 }
544
545 auto client = OHOS::AAFwk::DataObsMgrClient::GetInstance();
546 if (client == nullptr) {
547 LOG_ERROR("Failed to get DataObsMgrClient.");
548 return E_GET_DATAOBSMGRCLIENT_FAIL;
549 }
550
551 auto &list = obs->second;
552 for (auto it = list.begin(); it != list.end(); it++) {
553 if ((*it)->getObserver() == observer) {
554 int32_t err = client->UnregisterObserver(GetUri(option.event), *it);
555 if (err != 0) {
556 LOG_ERROR("UnSubscribeLocalShared failed.");
557 return err;
558 }
559 list.erase(it);
560 break;
561 }
562 }
563 if (list.empty()) {
564 localSharedObservers_.erase(option.event);
565 }
566 return E_OK;
567 }
568
UnSubscribeLocalSharedAll(const SubscribeOption & option)569 int RdbStoreImpl::UnSubscribeLocalSharedAll(const SubscribeOption& option)
570 {
571 std::lock_guard<std::mutex> lock(mutex_);
572 auto obs = localSharedObservers_.find(option.event);
573 if (obs == localSharedObservers_.end()) {
574 return E_OK;
575 }
576
577 auto client = OHOS::AAFwk::DataObsMgrClient::GetInstance();
578 if (client == nullptr) {
579 LOG_ERROR("Failed to get DataObsMgrClient.");
580 return E_GET_DATAOBSMGRCLIENT_FAIL;
581 }
582
583 auto &list = obs->second;
584 auto it = list.begin();
585 while (it != list.end()) {
586 int32_t err = client->UnregisterObserver(GetUri(option.event), *it);
587 if (err != 0) {
588 LOG_ERROR("UnSubscribe failed.");
589 return err;
590 }
591 it = list.erase(it);
592 }
593
594 localSharedObservers_.erase(option.event);
595 return E_OK;
596 }
597
UnsubscribeLocalDetail(const SubscribeOption & option,const std::shared_ptr<RdbStoreObserver> & observer)598 int32_t RdbStoreImpl::UnsubscribeLocalDetail(const SubscribeOption& option,
599 const std::shared_ptr<RdbStoreObserver> &observer)
600 {
601 auto connection = connectionPool_->AcquireConnection(false);
602 if (connection == nullptr) {
603 return E_DATABASE_BUSY;
604 }
605 int32_t errCode = connection->Unsubscribe(option.event, observer);
606 if (errCode != E_OK) {
607 LOG_ERROR("unsubscribe local detail observer failed. db name:%{public}s errCode:%{public}" PRId32,
608 SqliteUtils::Anonymous(config_.GetName()).c_str(), errCode);
609 }
610 return errCode;
611 }
612
UnSubscribeRemote(const SubscribeOption & option,RdbStoreObserver * observer)613 int RdbStoreImpl::UnSubscribeRemote(const SubscribeOption& option, RdbStoreObserver *observer)
614 {
615 auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
616 if (errCode != E_OK) {
617 return errCode;
618 }
619 return service->UnSubscribe(syncerParam_, option, observer);
620 }
621
UnSubscribe(const SubscribeOption & option,RdbStoreObserver * observer)622 int RdbStoreImpl::UnSubscribe(const SubscribeOption &option, RdbStoreObserver *observer)
623 {
624 if (config_.GetDBType() == DB_VECTOR) {
625 return E_NOT_SUPPORT;
626 }
627 if (option.mode == SubscribeMode::LOCAL && observer) {
628 return UnSubscribeLocal(option, observer);
629 } else if (option.mode == SubscribeMode::LOCAL && !observer) {
630 return UnSubscribeLocalAll(option);
631 } else if (option.mode == SubscribeMode::LOCAL_SHARED && observer) {
632 return UnSubscribeLocalShared(option, observer);
633 } else if (option.mode == SubscribeMode::LOCAL_SHARED && !observer) {
634 return UnSubscribeLocalSharedAll(option);
635 }
636 return UnSubscribeRemote(option, observer);
637 }
638
SubscribeObserver(const SubscribeOption & option,const std::shared_ptr<RdbStoreObserver> & observer)639 int RdbStoreImpl::SubscribeObserver(const SubscribeOption& option, const std::shared_ptr<RdbStoreObserver> &observer)
640 {
641 if (config_.GetDBType() == DB_VECTOR) {
642 return E_NOT_SUPPORT;
643 }
644 return SubscribeLocalDetail(option, observer);
645 }
646
UnsubscribeObserver(const SubscribeOption & option,const std::shared_ptr<RdbStoreObserver> & observer)647 int RdbStoreImpl::UnsubscribeObserver(const SubscribeOption& option, const std::shared_ptr<RdbStoreObserver> &observer)
648 {
649 if (config_.GetDBType() == DB_VECTOR) {
650 return E_NOT_SUPPORT;
651 }
652 return UnsubscribeLocalDetail(option, observer);
653 }
654
Notify(const std::string & event)655 int RdbStoreImpl::Notify(const std::string &event)
656 {
657 if (config_.GetDBType() == DB_VECTOR) {
658 return E_NOT_SUPPORT;
659 }
660 auto client = OHOS::AAFwk::DataObsMgrClient::GetInstance();
661 if (client == nullptr) {
662 LOG_ERROR("Failed to get DataObsMgrClient.");
663 return E_GET_DATAOBSMGRCLIENT_FAIL;
664 }
665 int32_t err = client->NotifyChange(GetUri(event));
666 if (err != 0) {
667 LOG_ERROR("Notify failed.");
668 }
669
670 std::lock_guard<std::mutex> lock(mutex_);
671 auto obs = localObservers_.find(event);
672 if (obs != localObservers_.end()) {
673 auto &list = obs->second;
674 for (auto &it : list) {
675 it->OnChange();
676 }
677 }
678 return E_OK;
679 }
680
SetSearchable(bool isSearchable)681 int RdbStoreImpl::SetSearchable(bool isSearchable)
682 {
683 auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
684 if (errCode != E_OK || service == nullptr) {
685 LOG_ERROR("GetRdbService is failed, err is %{public}d.", errCode);
686 return errCode;
687 }
688 return service->SetSearchable(syncerParam_, isSearchable);
689 }
690
RegisterAutoSyncCallback(std::shared_ptr<DetailProgressObserver> observer)691 int RdbStoreImpl::RegisterAutoSyncCallback(std::shared_ptr<DetailProgressObserver> observer)
692 {
693 if (config_.GetDBType() == DB_VECTOR) {
694 return E_NOT_SUPPORT;
695 }
696 auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
697 if (errCode != E_OK) {
698 return errCode;
699 }
700 return service->RegisterAutoSyncCallback(syncerParam_, observer);
701 }
702
UnregisterAutoSyncCallback(std::shared_ptr<DetailProgressObserver> observer)703 int RdbStoreImpl::UnregisterAutoSyncCallback(std::shared_ptr<DetailProgressObserver> observer)
704 {
705 if (config_.GetDBType() == DB_VECTOR) {
706 return E_NOT_SUPPORT;
707 }
708 auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
709 if (errCode != E_OK) {
710 return errCode;
711 }
712 return service->UnregisterAutoSyncCallback(syncerParam_, observer);
713 }
714
InitDelayNotifier()715 void RdbStoreImpl::InitDelayNotifier()
716 {
717 if (delayNotifier_ != nullptr) {
718 return;
719 }
720 delayNotifier_ = std::make_shared<DelayNotify>();
721 if (delayNotifier_ == nullptr) {
722 LOG_ERROR("Init delay notifier failed.");
723 return;
724 }
725 delayNotifier_->SetExecutorPool(TaskExecutor::GetInstance().GetExecutor());
726 delayNotifier_->SetTask([param = syncerParam_]
727 (const DistributedRdb::RdbChangedData& rdbChangedData, const RdbNotifyConfig& rdbNotifyConfig) -> int {
728 auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(param);
729 if (errCode == E_NOT_SUPPORT) {
730 return errCode;
731 }
732 if (errCode != E_OK || service == nullptr) {
733 LOG_ERROR("GetRdbService is failed, err is %{public}d.", errCode);
734 return errCode;
735 }
736 return service->NotifyDataChange(param, rdbChangedData, rdbNotifyConfig);
737 });
738 }
739
RegisterDataChangeCallback()740 int RdbStoreImpl::RegisterDataChangeCallback()
741 {
742 if (!config_.IsSearchable()) {
743 return E_OK;
744 }
745
746 if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
747 return E_NOT_SUPPORT;
748 }
749 InitDelayNotifier();
750 auto callBack = [delayNotifier = delayNotifier_](const std::set<std::string> &tables) {
751 DistributedRdb::RdbChangedData rdbChangedData;
752 for (const auto& table : tables) {
753 rdbChangedData.tableData[table].isTrackedDataChange = true;
754 }
755 if (delayNotifier != nullptr) {
756 delayNotifier->UpdateNotify(rdbChangedData);
757 }
758 };
759 auto connection = connectionPool_->AcquireConnection(false);
760 if (connection == nullptr) {
761 return E_DATABASE_BUSY;
762 }
763 return connection->SubscribeTableChanges(callBack);
764 }
765
GetHashKeyForLockRow(const AbsRdbPredicates & predicates,std::vector<std::vector<uint8_t>> & hashKeys)766 int RdbStoreImpl::GetHashKeyForLockRow(const AbsRdbPredicates &predicates, std::vector<std::vector<uint8_t>> &hashKeys)
767 {
768 std::string table = predicates.GetTableName();
769 if (table.empty()) {
770 return E_EMPTY_TABLE_NAME;
771 }
772 auto logTable = GetLogTableName(table);
773 std::string sql;
774 sql.append("SELECT ").append(logTable).append(".hash_key ").append("FROM ").append(logTable);
775 sql.append(" INNER JOIN ").append(table).append(" ON ");
776 sql.append(table).append(".ROWID = ").append(logTable).append(".data_key");
777 auto whereClause = predicates.GetWhereClause();
778 if (!whereClause.empty()) {
779 SqliteUtils::Replace(whereClause, SqliteUtils::REP, logTable + ".");
780 sql.append(" WHERE ").append(whereClause);
781 }
782
783 auto result = QuerySql(sql, predicates.GetBindArgs());
784 if (result == nullptr) {
785 return E_ERROR;
786 }
787 int count = 0;
788 if (result->GetRowCount(count) != E_OK) {
789 return E_NO_ROW_IN_QUERY;
790 }
791
792 if (count <= 0) {
793 return E_NO_ROW_IN_QUERY;
794 }
795 while (result->GoToNextRow() == E_OK) {
796 std::vector<uint8_t> hashKey;
797 if (result->GetBlob(0, hashKey) != E_OK) {
798 return E_ERROR;
799 }
800 hashKeys.push_back(std::move(hashKey));
801 }
802 return E_OK;
803 }
804
ModifyLockStatus(const AbsRdbPredicates & predicates,bool isLock)805 int RdbStoreImpl::ModifyLockStatus(const AbsRdbPredicates &predicates, bool isLock)
806 {
807 std::vector<std::vector<uint8_t>> hashKeys;
808 int ret = GetHashKeyForLockRow(predicates, hashKeys);
809 if (ret != E_OK) {
810 LOG_ERROR("GetHashKeyForLockRow failed, err is %{public}d.", ret);
811 return ret;
812 }
813 auto [err, statement] = GetStatement(GlobalExpr::PRAGMA_VERSION);
814 if (statement == nullptr || err != E_OK) {
815 return err;
816 }
817 int errCode = statement->ModifyLockStatus(predicates.GetTableName(), hashKeys, isLock);
818 if (errCode == E_WAIT_COMPENSATED_SYNC) {
819 LOG_DEBUG("Start compensation sync.");
820 DistributedRdb::RdbService::Option option = { DistributedRdb::TIME_FIRST, 0, true, true, true };
821 auto memo = AbsRdbPredicates(predicates.GetTableName()).GetDistributedPredicates();
822 InnerSync(syncerParam_, option, memo, nullptr);
823 return E_OK;
824 }
825 if (errCode != E_OK) {
826 LOG_ERROR("ModifyLockStatus failed, err is %{public}d.", errCode);
827 }
828 return errCode;
829 }
830
LockCloudContainer()831 std::pair<int32_t, uint32_t> RdbStoreImpl::LockCloudContainer()
832 {
833 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
834 RdbRadar ret(Scene::SCENE_SYNC, __FUNCTION__, config_.GetBundleName());
835 auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
836 if (errCode == E_NOT_SUPPORT) {
837 LOG_ERROR("not support");
838 return { errCode, 0 };
839 }
840 if (errCode != E_OK) {
841 LOG_ERROR("GetRdbService is failed, err is %{public}d, bundleName is %{public}s.", errCode,
842 syncerParam_.bundleName_.c_str());
843 return { errCode, 0 };
844 }
845 auto result = service->LockCloudContainer(syncerParam_);
846 if (result.first != E_OK) {
847 LOG_ERROR("LockCloudContainer failed, err is %{public}d.", result.first);
848 }
849 return result;
850 }
851
UnlockCloudContainer()852 int32_t RdbStoreImpl::UnlockCloudContainer()
853 {
854 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
855 RdbRadar ret(Scene::SCENE_SYNC, __FUNCTION__, config_.GetBundleName());
856 auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
857 if (errCode == E_NOT_SUPPORT) {
858 LOG_ERROR("not support");
859 return errCode;
860 }
861 if (errCode != E_OK) {
862 LOG_ERROR("GetRdbService is failed, err is %{public}d, bundleName is %{public}s.", errCode,
863 syncerParam_.bundleName_.c_str());
864 return errCode;
865 }
866 errCode = service->UnlockCloudContainer(syncerParam_);
867 if (errCode != E_OK) {
868 LOG_ERROR("UnlockCloudContainer failed, err is %{public}d.", errCode);
869 }
870 return errCode;
871 }
872 #endif
873
RdbStoreImpl(const RdbStoreConfig & config)874 RdbStoreImpl::RdbStoreImpl(const RdbStoreConfig &config)
875 : isMemoryRdb_(config.IsMemoryRdb()), config_(config), name_(config.GetName()),
876 fileType_(config.GetDatabaseFileType())
877 {
878 path_ = (config.GetRoleType() == VISITOR) ? config.GetVisitorDir() : config.GetPath();
879 isReadOnly_ = config.IsReadOnly() || config.GetRoleType() == VISITOR;
880 }
881
RdbStoreImpl(const RdbStoreConfig & config,int & errCode)882 RdbStoreImpl::RdbStoreImpl(const RdbStoreConfig &config, int &errCode)
883 : isMemoryRdb_(config.IsMemoryRdb()), config_(config), name_(config.GetName()),
884 fileType_(config.GetDatabaseFileType())
885 {
886 isReadOnly_ = config.IsReadOnly() || config.GetRoleType() == VISITOR;
887 path_ = (config.GetRoleType() == VISITOR) ? config.GetVisitorDir() : config.GetPath();
888 bool created = access(path_.c_str(), F_OK) != 0;
889 connectionPool_ = ConnectionPool::Create(config_, errCode);
890 if (connectionPool_ == nullptr && (errCode == E_SQLITE_CORRUPT || errCode == E_INVALID_SECRET_KEY) &&
891 !isReadOnly_) {
892 LOG_ERROR("database corrupt, errCode:0x%{public}x, rebuild database %{public}s", errCode,
893 SqliteUtils::Anonymous(name_).c_str());
894 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
895 RdbParam param;
896 param.bundleName_ = config_.GetBundleName();
897 param.storeName_ = config_.GetName();
898 auto [err, service] = RdbMgr::GetInstance().GetRdbService(param);
899 if (service != nullptr) {
900 service->Disable(param);
901 }
902 #endif
903 config_.SetIter(0);
904 if (config_.IsEncrypt()) {
905 auto key = config_.GetEncryptKey();
906 RdbSecurityManager::GetInstance().RestoreKeyFile(path_, key);
907 key.assign(key.size(), 0);
908 }
909 std::tie(rebuild_, connectionPool_) = ConnectionPool::HandleDataCorruption(config_, errCode);
910 created = true;
911 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
912 if (service != nullptr) {
913 service->Enable(param);
914 }
915 #endif
916 }
917 if (connectionPool_ == nullptr || errCode != E_OK) {
918 connectionPool_ = nullptr;
919 LOG_ERROR("Create connPool failed, err is %{public}d, path:%{public}s", errCode,
920 SqliteUtils::Anonymous(path_).c_str());
921 return;
922 }
923 InitSyncerParam(config_, created);
924 InnerOpen();
925 }
926
~RdbStoreImpl()927 RdbStoreImpl::~RdbStoreImpl()
928 {
929 connectionPool_ = nullptr;
930 trxConnMap_ = {};
931 for (auto &trans : transactions_) {
932 auto realTrans = trans.lock();
933 if (realTrans) {
934 (void)realTrans->Close();
935 }
936 }
937 transactions_ = {};
938 }
939
GetConfig()940 const RdbStoreConfig &RdbStoreImpl::GetConfig()
941 {
942 return config_;
943 }
944
Insert(const std::string & table,const Row & row,Resolution resolution)945 std::pair<int, int64_t> RdbStoreImpl::Insert(const std::string &table, const Row &row, Resolution resolution)
946 {
947 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
948 if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
949 return { E_NOT_SUPPORT, -1 };
950 }
951 if (table.empty()) {
952 return { E_EMPTY_TABLE_NAME, -1 };
953 }
954
955 if (row.IsEmpty()) {
956 return { E_EMPTY_VALUES_BUCKET, -1 };
957 }
958
959 auto conflictClause = SqliteUtils::GetConflictClause(static_cast<int>(resolution));
960 if (conflictClause == nullptr) {
961 return { E_INVALID_CONFLICT_FLAG, -1 };
962 }
963 SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
964 std::string sql;
965 sql.append("INSERT").append(conflictClause).append(" INTO ").append(table).append("(");
966 size_t bindArgsSize = row.values_.size();
967 std::vector<ValueObject> bindArgs;
968 bindArgs.reserve(bindArgsSize);
969 const char *split = "";
970 for (const auto &[key, val] : row.values_) {
971 sql.append(split).append(key);
972 if (val.GetType() == ValueObject::TYPE_ASSETS && resolution == ConflictResolution::ON_CONFLICT_REPLACE) {
973 return { E_INVALID_ARGS, -1 };
974 }
975 SqliteSqlBuilder::UpdateAssetStatus(val, AssetValue::STATUS_INSERT);
976 bindArgs.push_back(val); // columnValue
977 split = ",";
978 }
979
980 sql.append(") VALUES (");
981 if (bindArgsSize > 0) {
982 sql.append(SqliteSqlBuilder::GetSqlArgs(bindArgsSize));
983 }
984
985 sql.append(")");
986 int64_t rowid = -1;
987 auto errCode = ExecuteForLastInsertedRowId(rowid, sql, bindArgs);
988 if (errCode == E_OK) {
989 DoCloudSync(table);
990 }
991
992 return { errCode, rowid };
993 }
994
BatchInsert(const std::string & table,const ValuesBuckets & rows)995 std::pair<int, int64_t> RdbStoreImpl::BatchInsert(const std::string &table, const ValuesBuckets &rows)
996 {
997 if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
998 return { E_NOT_SUPPORT, -1 };
999 }
1000
1001 if (rows.RowSize() == 0) {
1002 return { E_OK, 0 };
1003 }
1004
1005 SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
1006 auto connection = connectionPool_->AcquireConnection(false);
1007 if (connection == nullptr) {
1008 return { E_DATABASE_BUSY, -1 };
1009 }
1010
1011 auto executeSqlArgs = SqliteSqlBuilder::GenerateSqls(table, rows, connection->GetMaxVariable());
1012 if (executeSqlArgs.empty()) {
1013 LOG_ERROR("empty, table=%{public}s, values:%{public}zu, max number:%{public}d.", table.c_str(), rows.RowSize(),
1014 connection->GetMaxVariable());
1015 return { E_INVALID_ARGS, -1 };
1016 }
1017 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
1018 PauseDelayNotify pauseDelayNotify(delayNotifier_);
1019 #endif
1020 for (const auto &[sql, bindArgs] : executeSqlArgs) {
1021 auto [errCode, statement] = GetStatement(sql, connection);
1022 if (statement == nullptr) {
1023 LOG_ERROR("statement is nullptr, errCode:0x%{public}x, args:%{public}zu, table:%{public}s, sql:%{public}s",
1024 errCode, bindArgs.size(), table.c_str(), sql.c_str());
1025 return { E_OK, -1 };
1026 }
1027 for (const auto &args : bindArgs) {
1028 auto errCode = statement->Execute(args);
1029 if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
1030 connectionPool_->Dump(true, "BATCH");
1031 return { errCode, -1 };
1032 }
1033 if (errCode != E_OK) {
1034 LOG_ERROR("failed, errCode:%{public}d,args:%{public}zu,table:%{public}s,sql:%{public}s", errCode,
1035 bindArgs.size(), table.c_str(), sql.c_str());
1036 return { E_OK, -1 };
1037 }
1038 }
1039 }
1040 connection = nullptr;
1041 DoCloudSync(table);
1042 return { E_OK, int64_t(rows.RowSize()) };
1043 }
1044
Update(const std::string & table,const Row & row,const std::string & where,const Values & args,Resolution resolution)1045 std::pair<int, int> RdbStoreImpl::Update(const std::string &table, const Row &row, const std::string &where,
1046 const Values &args, Resolution resolution)
1047 {
1048 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
1049 if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
1050 return { E_NOT_SUPPORT, -1 };
1051 }
1052 if (table.empty()) {
1053 return { E_EMPTY_TABLE_NAME, -1 };
1054 }
1055
1056 if (row.IsEmpty()) {
1057 return { E_EMPTY_VALUES_BUCKET, -1 };
1058 }
1059
1060 auto clause = SqliteUtils::GetConflictClause(static_cast<int>(resolution));
1061 if (clause == nullptr) {
1062 return { E_INVALID_CONFLICT_FLAG, -1 };
1063 }
1064 SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
1065 std::string sql;
1066 sql.append("UPDATE").append(clause).append(" ").append(table).append(" SET ");
1067 std::vector<ValueObject> tmpBindArgs;
1068 size_t tmpBindSize = row.values_.size() + args.size();
1069 tmpBindArgs.reserve(tmpBindSize);
1070 const char *split = "";
1071 for (auto &[key, val] : row.values_) {
1072 sql.append(split);
1073 if (val.GetType() == ValueObject::TYPE_ASSETS) {
1074 sql.append(key).append("=merge_assets(").append(key).append(", ?)"); // columnName
1075 } else if (val.GetType() == ValueObject::TYPE_ASSET) {
1076 sql.append(key).append("=merge_asset(").append(key).append(", ?)"); // columnName
1077 } else {
1078 sql.append(key).append("=?"); // columnName
1079 }
1080 tmpBindArgs.push_back(val); // columnValue
1081 split = ",";
1082 }
1083
1084 if (!where.empty()) {
1085 sql.append(" WHERE ").append(where);
1086 }
1087
1088 tmpBindArgs.insert(tmpBindArgs.end(), args.begin(), args.end());
1089
1090 int64_t changes = 0;
1091 auto errCode = ExecuteForChangedRowCount(changes, sql, tmpBindArgs);
1092 if (errCode == E_OK) {
1093 DoCloudSync(table);
1094 }
1095 return { errCode, int32_t(changes) };
1096 }
1097
Delete(int & deletedRows,const std::string & table,const std::string & whereClause,const Values & args)1098 int RdbStoreImpl::Delete(int &deletedRows, const std::string &table, const std::string &whereClause, const Values &args)
1099 {
1100 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
1101 if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
1102 return E_NOT_SUPPORT;
1103 }
1104 if (table.empty()) {
1105 return E_EMPTY_TABLE_NAME;
1106 }
1107
1108 SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
1109 std::string sql;
1110 sql.append("DELETE FROM ").append(table);
1111 if (!whereClause.empty()) {
1112 sql.append(" WHERE ").append(whereClause);
1113 }
1114 int64_t changes = 0;
1115 auto errCode = ExecuteForChangedRowCount(changes, sql, args);
1116 if (errCode != E_OK) {
1117 return errCode;
1118 }
1119 deletedRows = changes;
1120 DoCloudSync(table);
1121 return E_OK;
1122 }
1123
QuerySql(const std::string & sql,const Values & bindArgs)1124 std::shared_ptr<AbsSharedResultSet> RdbStoreImpl::QuerySql(const std::string &sql, const Values &bindArgs)
1125 {
1126 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
1127 if (config_.GetDBType() == DB_VECTOR) {
1128 return nullptr;
1129 }
1130 SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
1131 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
1132 auto start = std::chrono::steady_clock::now();
1133 return std::make_shared<SqliteSharedResultSet>(start, connectionPool_->AcquireRef(true), sql, bindArgs, path_);
1134 #else
1135 (void)sql;
1136 (void)bindArgs;
1137 return nullptr;
1138 #endif
1139 }
1140
QueryByStep(const std::string & sql,const Values & args,bool preCount)1141 std::shared_ptr<ResultSet> RdbStoreImpl::QueryByStep(const std::string &sql, const Values &args, bool preCount)
1142 {
1143 SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
1144 auto start = std::chrono::steady_clock::now();
1145 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
1146 return std::make_shared<StepResultSet>(start, connectionPool_->AcquireRef(true), sql, args, preCount);
1147 #else
1148 return std::make_shared<StepResultSet>(start, connectionPool_->AcquireRef(true), sql, args, false);
1149 #endif
1150 }
1151
Count(int64_t & outValue,const AbsRdbPredicates & predicates)1152 int RdbStoreImpl::Count(int64_t &outValue, const AbsRdbPredicates &predicates)
1153 {
1154 if (config_.GetDBType() == DB_VECTOR) {
1155 return E_NOT_SUPPORT;
1156 }
1157 std::string sql = SqliteSqlBuilder::BuildCountString(predicates);
1158 return ExecuteAndGetLong(outValue, sql, predicates.GetBindArgs());
1159 }
1160
ExecuteSql(const std::string & sql,const Values & args)1161 int RdbStoreImpl::ExecuteSql(const std::string &sql, const Values &args)
1162 {
1163 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
1164 if (config_.GetDBType() == DB_VECTOR || isReadOnly_) {
1165 return E_NOT_SUPPORT;
1166 }
1167 int ret = CheckAttach(sql);
1168 if (ret != E_OK) {
1169 return ret;
1170 }
1171 SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
1172 auto [errCode, statement] = BeginExecuteSql(sql);
1173 if (statement == nullptr) {
1174 return errCode;
1175 }
1176 errCode = statement->Execute(args);
1177 if (errCode != E_OK) {
1178 LOG_ERROR("failed,error:0x%{public}x sql:%{public}s.", errCode, sql.c_str());
1179 if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
1180 connectionPool_->Dump(true, "EXECUTE");
1181 }
1182 return errCode;
1183 }
1184 int sqlType = SqliteUtils::GetSqlStatementType(sql);
1185 if (sqlType == SqliteUtils::STATEMENT_DDL) {
1186 statement->Reset();
1187 statement->Prepare("PRAGMA schema_version");
1188 auto [err, version] = statement->ExecuteForValue();
1189 statement = nullptr;
1190 if (vSchema_ < static_cast<int64_t>(version)) {
1191 LOG_INFO("db:%{public}s exe DDL schema<%{public}" PRIi64 "->%{public}" PRIi64 "> sql:%{public}s.",
1192 SqliteUtils::Anonymous(name_).c_str(), vSchema_, static_cast<int64_t>(version), sql.c_str());
1193 vSchema_ = version;
1194 errCode = connectionPool_->RestartReaders();
1195 }
1196 }
1197 statement = nullptr;
1198 if (errCode == E_OK && (sqlType == SqliteUtils::STATEMENT_UPDATE || sqlType == SqliteUtils::STATEMENT_INSERT)) {
1199 DoCloudSync("");
1200 }
1201 return errCode;
1202 }
1203
Execute(const std::string & sql,const Values & args,int64_t trxId)1204 std::pair<int32_t, ValueObject> RdbStoreImpl::Execute(const std::string &sql, const Values &args, int64_t trxId)
1205 {
1206 ValueObject object;
1207 if (isReadOnly_) {
1208 return { E_NOT_SUPPORT, object };
1209 }
1210
1211 SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
1212 int sqlType = SqliteUtils::GetSqlStatementType(sql);
1213 if (!SqliteUtils::IsSupportSqlForExecute(sqlType)) {
1214 LOG_ERROR("Not support the sqlType: %{public}d, sql: %{public}s", sqlType, sql.c_str());
1215 return { E_NOT_SUPPORT_THE_SQL, object };
1216 }
1217
1218 if (config_.IsVector() && trxId > 0) {
1219 return { ExecuteByTrxId(sql, trxId, false, args), ValueObject() };
1220 }
1221
1222 auto connect = connectionPool_->AcquireConnection(false);
1223 if (connect == nullptr) {
1224 return { E_DATABASE_BUSY, object };
1225 }
1226
1227 auto [errCode, statement] = GetStatement(sql, connect);
1228 if (errCode != E_OK) {
1229 return { errCode, object };
1230 }
1231
1232 errCode = statement->Execute(args);
1233 if (errCode != E_OK) {
1234 LOG_ERROR("failed,error:0x%{public}x sql:%{public}s.", errCode, sql.c_str());
1235 if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
1236 connectionPool_->Dump(true, "EXECUTE");
1237 }
1238 return { errCode, object };
1239 }
1240
1241 if (config_.IsVector()) {
1242 return { errCode, object };
1243 }
1244
1245 return HandleDifferentSqlTypes(statement, sql, object, sqlType);
1246 }
1247
HandleDifferentSqlTypes(std::shared_ptr<Statement> statement,const std::string & sql,const ValueObject & object,int sqlType)1248 std::pair<int32_t, ValueObject> RdbStoreImpl::HandleDifferentSqlTypes(std::shared_ptr<Statement> statement,
1249 const std::string &sql, const ValueObject &object, int sqlType)
1250 {
1251 int32_t errCode = E_OK;
1252 if (sqlType == SqliteUtils::STATEMENT_INSERT) {
1253 int64_t outValue = statement->Changes() > 0 ? statement->LastInsertRowId() : -1;
1254 return { errCode, ValueObject(outValue) };
1255 }
1256
1257 if (sqlType == SqliteUtils::STATEMENT_UPDATE) {
1258 int outValue = statement->Changes();
1259 return { errCode, ValueObject(outValue) };
1260 }
1261
1262 if (sqlType == SqliteUtils::STATEMENT_PRAGMA) {
1263 if (statement->GetColumnCount() == 1) {
1264 return statement->GetColumn(0);
1265 }
1266
1267 if (statement->GetColumnCount() > 1) {
1268 LOG_ERROR("Not support the sql:%{public}s, column count more than 1", sql.c_str());
1269 return { E_NOT_SUPPORT_THE_SQL, object };
1270 }
1271 }
1272
1273 if (sqlType == SqliteUtils::STATEMENT_DDL) {
1274 statement->Reset();
1275 statement->Prepare("PRAGMA schema_version");
1276 auto [err, version] = statement->ExecuteForValue();
1277 if (vSchema_ < static_cast<int64_t>(version)) {
1278 LOG_INFO("db:%{public}s exe DDL schema<%{public}" PRIi64 "->%{public}" PRIi64 "> sql:%{public}s.",
1279 SqliteUtils::Anonymous(name_).c_str(), vSchema_, static_cast<int64_t>(version), sql.c_str());
1280 vSchema_ = version;
1281 errCode = connectionPool_->RestartReaders();
1282 }
1283 }
1284 return { errCode, object };
1285 }
1286
ExecuteAndGetLong(int64_t & outValue,const std::string & sql,const Values & args)1287 int RdbStoreImpl::ExecuteAndGetLong(int64_t &outValue, const std::string &sql, const Values &args)
1288 {
1289 if (config_.GetDBType() == DB_VECTOR) {
1290 return E_NOT_SUPPORT;
1291 }
1292 auto [errCode, statement] = BeginExecuteSql(sql);
1293 if (statement == nullptr) {
1294 return errCode;
1295 }
1296 auto [err, object] = statement->ExecuteForValue(args);
1297 if (err != E_OK) {
1298 LOG_ERROR("failed, sql %{public}s, ERROR is %{public}d.", sql.c_str(), err);
1299 }
1300 outValue = object;
1301 return err;
1302 }
1303
ExecuteAndGetString(std::string & outValue,const std::string & sql,const Values & args)1304 int RdbStoreImpl::ExecuteAndGetString(std::string &outValue, const std::string &sql, const Values &args)
1305 {
1306 if (config_.GetDBType() == DB_VECTOR) {
1307 return E_NOT_SUPPORT;
1308 }
1309 auto [errCode, statement] = BeginExecuteSql(sql);
1310 if (statement == nullptr) {
1311 return errCode;
1312 }
1313 ValueObject object;
1314 std::tie(errCode, object) = statement->ExecuteForValue(args);
1315 if (errCode != E_OK) {
1316 LOG_ERROR("failed, sql %{public}s, ERROR is %{public}d.", sql.c_str(), errCode);
1317 }
1318 outValue = static_cast<std::string>(object);
1319 return errCode;
1320 }
1321
ExecuteForLastInsertedRowId(int64_t & outValue,const std::string & sql,const Values & args)1322 int RdbStoreImpl::ExecuteForLastInsertedRowId(int64_t &outValue, const std::string &sql, const Values &args)
1323 {
1324 if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
1325 return E_NOT_SUPPORT;
1326 }
1327 auto begin = std::chrono::steady_clock::now();
1328 auto [errCode, statement] = GetStatement(sql, false);
1329 if (statement == nullptr) {
1330 return errCode;
1331 }
1332 auto beginExec = std::chrono::steady_clock::now();
1333 errCode = statement->Execute(args);
1334 if (errCode != E_OK) {
1335 if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
1336 connectionPool_->Dump(true, "INSERT");
1337 }
1338 return errCode;
1339 }
1340 auto beginResult = std::chrono::steady_clock::now();
1341 outValue = statement->Changes() > 0 ? statement->LastInsertRowId() : -1;
1342 auto allEnd = std::chrono::steady_clock::now();
1343 int64_t totalCostTime = std::chrono::duration_cast<std::chrono::milliseconds>(begin - allEnd).count();
1344 if (totalCostTime >= TIME_OUT) {
1345 int64_t prepareCost =
1346 std::chrono::duration_cast<std::chrono::milliseconds>(beginExec - begin).count();
1347 int64_t execCost =
1348 std::chrono::duration_cast<std::chrono::milliseconds>(beginExec - beginResult).count();
1349 int64_t resultCost = std::chrono::duration_cast<std::chrono::milliseconds>(allEnd - beginResult).count();
1350 LOG_WARN("total[%{public}" PRId64 "] stmt[%{public}" PRId64 "] exec[%{public}" PRId64
1351 "] result[%{public}" PRId64 "] "
1352 "sql[%{public}s]",
1353 totalCostTime, prepareCost, execCost, resultCost, SqliteUtils::Anonymous(sql).c_str());
1354 }
1355 return E_OK;
1356 }
1357
ExecuteForChangedRowCount(int64_t & outValue,const std::string & sql,const Values & args)1358 int RdbStoreImpl::ExecuteForChangedRowCount(int64_t &outValue, const std::string &sql, const Values &args)
1359 {
1360 if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
1361 return E_NOT_SUPPORT;
1362 }
1363 auto [errCode, statement] = GetStatement(sql, false);
1364 if (statement == nullptr) {
1365 return errCode;
1366 }
1367 errCode = statement->Execute(args);
1368 if (errCode != E_OK) {
1369 if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
1370 connectionPool_->Dump(true, "UPG DEL");
1371 }
1372 return errCode;
1373 }
1374 outValue = statement->Changes();
1375 return E_OK;
1376 }
1377
GetDataBasePath(const std::string & databasePath,std::string & backupFilePath)1378 int RdbStoreImpl::GetDataBasePath(const std::string &databasePath, std::string &backupFilePath)
1379 {
1380 if (databasePath.empty()) {
1381 return E_INVALID_FILE_PATH;
1382 }
1383
1384 if (ISFILE(databasePath)) {
1385 backupFilePath = ExtractFilePath(path_) + databasePath;
1386 } else {
1387 // 2 represents two characters starting from the len - 2 position
1388 if (!PathToRealPath(ExtractFilePath(databasePath), backupFilePath) || databasePath.back() == '/' ||
1389 databasePath.substr(databasePath.length() - 2, 2) == "\\") {
1390 LOG_ERROR("Invalid databasePath.");
1391 return E_INVALID_FILE_PATH;
1392 }
1393 backupFilePath = databasePath;
1394 }
1395
1396 if (backupFilePath == path_) {
1397 LOG_ERROR("The backupPath and path should not be same.");
1398 return E_INVALID_FILE_PATH;
1399 }
1400
1401 LOG_INFO("databasePath is %{public}s.", SqliteUtils::Anonymous(backupFilePath).c_str());
1402 return E_OK;
1403 }
1404
GetSlaveName(const std::string & path,std::string & backupFilePath)1405 int RdbStoreImpl::GetSlaveName(const std::string &path, std::string &backupFilePath)
1406 {
1407 std::string suffix(".db");
1408 std::string slaveSuffix("_slave.db");
1409 auto pos = path.find(suffix);
1410 if (pos == std::string::npos) {
1411 backupFilePath = path + slaveSuffix;
1412 } else {
1413 backupFilePath = std::string(path, 0, pos) + slaveSuffix;
1414 }
1415 return E_OK;
1416 }
1417
1418 /**
1419 * Backup a database from a specified encrypted or unencrypted database file.
1420 */
Backup(const std::string & databasePath,const std::vector<uint8_t> & encryptKey)1421 int RdbStoreImpl::Backup(const std::string &databasePath, const std::vector<uint8_t> &encryptKey)
1422 {
1423 LOG_INFO("Backup db: %{public}s.", SqliteUtils::Anonymous(config_.GetName()).c_str());
1424 if (isReadOnly_) {
1425 return E_NOT_SUPPORT;
1426 }
1427 std::string backupFilePath;
1428 if (TryGetMasterSlaveBackupPath(databasePath, backupFilePath)) {
1429 return InnerBackup(backupFilePath, encryptKey);
1430 }
1431
1432 int ret = GetDataBasePath(databasePath, backupFilePath);
1433 if (ret != E_OK) {
1434 return ret;
1435 }
1436
1437 RdbSecurityManager::KeyFiles keyFiles(path_ + BACKUP_RESTORE);
1438 keyFiles.Lock();
1439
1440 auto deleteDirtyFiles = [&backupFilePath] {
1441 auto res = SqliteUtils::DeleteFile(backupFilePath);
1442 res = SqliteUtils::DeleteFile(backupFilePath + "-shm") && res;
1443 res = SqliteUtils::DeleteFile(backupFilePath + "-wal") && res;
1444 return res;
1445 };
1446
1447 auto walFile = backupFilePath + "-wal";
1448 if (access(walFile.c_str(), F_OK) == E_OK) {
1449 if (!deleteDirtyFiles()) {
1450 keyFiles.Unlock();
1451 return E_ERROR;
1452 }
1453 }
1454 std::string tempPath = backupFilePath + ".tmp";
1455 if (access(tempPath.c_str(), F_OK) == E_OK) {
1456 SqliteUtils::DeleteFile(backupFilePath);
1457 } else {
1458 if (access(backupFilePath.c_str(), F_OK) == E_OK && !SqliteUtils::RenameFile(backupFilePath, tempPath)) {
1459 LOG_ERROR("rename backup file failed, path:%{public}s, errno:%{public}d",
1460 SqliteUtils::Anonymous(backupFilePath).c_str(), errno);
1461 keyFiles.Unlock();
1462 return E_ERROR;
1463 }
1464 }
1465 ret = InnerBackup(backupFilePath, encryptKey);
1466 if (ret != E_OK || access(walFile.c_str(), F_OK) == E_OK) {
1467 if (deleteDirtyFiles()) {
1468 SqliteUtils::RenameFile(tempPath, backupFilePath);
1469 }
1470 } else {
1471 SqliteUtils::DeleteFile(tempPath);
1472 }
1473 keyFiles.Unlock();
1474 return ret;
1475 }
1476
CreateBackupBindArgs(const std::string & databasePath,const std::vector<uint8_t> & destEncryptKey)1477 std::vector<ValueObject> RdbStoreImpl::CreateBackupBindArgs(const std::string &databasePath,
1478 const std::vector<uint8_t> &destEncryptKey)
1479 {
1480 std::vector<ValueObject> bindArgs;
1481 bindArgs.emplace_back(databasePath);
1482 if (!destEncryptKey.empty() && !config_.IsEncrypt()) {
1483 bindArgs.emplace_back(destEncryptKey);
1484 } else if (config_.IsEncrypt()) {
1485 std::vector<uint8_t> key = config_.GetEncryptKey();
1486 bindArgs.emplace_back(key);
1487 key.assign(key.size(), 0);
1488 } else {
1489 bindArgs.emplace_back("");
1490 }
1491 return bindArgs;
1492 }
1493
1494 /**
1495 * Backup a database from a specified encrypted or unencrypted database file.
1496 */
InnerBackup(const std::string & databasePath,const std::vector<uint8_t> & destEncryptKey)1497 int RdbStoreImpl::InnerBackup(const std::string &databasePath, const std::vector<uint8_t> &destEncryptKey)
1498 {
1499 if (isReadOnly_) {
1500 return E_NOT_SUPPORT;
1501 }
1502
1503 if (config_.GetDBType() == DB_VECTOR) {
1504 if (config_.IsEncrypt()) {
1505 return E_NOT_SUPPORT;
1506 }
1507
1508 auto conn = connectionPool_->AcquireConnection(false);
1509 if (conn == nullptr) {
1510 return E_BASE;
1511 }
1512
1513 return conn->Backup(databasePath, {}, false, slaveStatus_);
1514 }
1515
1516 if (config_.GetHaMode() != HAMode::SINGLE && SqliteUtils::IsSlaveDbName(databasePath)) {
1517 auto conn = connectionPool_->AcquireConnection(false);
1518 return conn == nullptr ? E_BASE : conn->Backup(databasePath, {}, false, slaveStatus_);
1519 }
1520
1521 auto [result, conn] = CreateWritableConn();
1522 if (result != E_OK) {
1523 return result;
1524 }
1525
1526 if (config_.IsEncrypt()) {
1527 result = SetDefaultEncryptAlgo(conn, config_);
1528 if (result != E_OK) {
1529 return result;
1530 }
1531 }
1532
1533 std::vector<ValueObject> bindArgs = CreateBackupBindArgs(databasePath, destEncryptKey);
1534 auto [errCode, statement] = conn->CreateStatement(GlobalExpr::ATTACH_BACKUP_SQL, conn);
1535 errCode = statement->Execute(bindArgs);
1536 if (errCode != E_OK) {
1537 return errCode;
1538 }
1539 errCode = statement->Prepare(GlobalExpr::PRAGMA_BACKUP_JOUR_MODE_WAL);
1540 errCode = statement->Execute();
1541 if (errCode != E_OK) {
1542 return errCode;
1543 }
1544 errCode = statement->Prepare(GlobalExpr::EXPORT_SQL);
1545 int ret = statement->Execute();
1546 errCode = statement->Prepare(GlobalExpr::DETACH_BACKUP_SQL);
1547 int res = statement->Execute();
1548 return (res == E_OK) ? ret : res;
1549 }
1550
BeginExecuteSql(const std::string & sql)1551 std::pair<int32_t, RdbStoreImpl::Stmt> RdbStoreImpl::BeginExecuteSql(const std::string& sql)
1552 {
1553 int type = SqliteUtils::GetSqlStatementType(sql);
1554 if (SqliteUtils::IsSpecial(type)) {
1555 return { E_NOT_SUPPORT, nullptr };
1556 }
1557
1558 bool assumeReadOnly = SqliteUtils::IsSqlReadOnly(type);
1559 auto conn = connectionPool_->AcquireConnection(assumeReadOnly);
1560 if (conn == nullptr) {
1561 return { E_DATABASE_BUSY, nullptr };
1562 }
1563
1564 auto [errCode, statement] = conn->CreateStatement(sql, conn);
1565 if (statement == nullptr) {
1566 return { errCode, nullptr };
1567 }
1568
1569 if (statement->ReadOnly() && conn->IsWriter()) {
1570 statement = nullptr;
1571 conn = nullptr;
1572 return GetStatement(sql, true);
1573 }
1574
1575 return { errCode, statement };
1576 }
1577
IsHoldingConnection()1578 bool RdbStoreImpl::IsHoldingConnection()
1579 {
1580 return connectionPool_ != nullptr;
1581 }
1582
SetDefaultEncryptSql(const std::shared_ptr<Statement> & statement,std::string sql,const RdbStoreConfig & config)1583 int RdbStoreImpl::SetDefaultEncryptSql(
1584 const std::shared_ptr<Statement> &statement, std::string sql, const RdbStoreConfig &config)
1585 {
1586 auto errCode = statement->Prepare(sql);
1587 if (errCode != E_OK) {
1588 LOG_ERROR("Prepare failed: %{public}s, %{public}d, %{public}d, %{public}d, %{public}d, %{public}u",
1589 SqliteUtils::Anonymous(config.GetName()).c_str(), config.GetCryptoParam().iterNum,
1590 config.GetCryptoParam().encryptAlgo, config.GetCryptoParam().hmacAlgo, config.GetCryptoParam().kdfAlgo,
1591 config.GetCryptoParam().cryptoPageSize);
1592 return errCode;
1593 }
1594 errCode = statement->Execute();
1595 if (errCode != E_OK) {
1596 LOG_ERROR("Execute failed: %{public}s, %{public}d, %{public}d, %{public}d, %{public}d, %{public}u",
1597 SqliteUtils::Anonymous(config.GetName()).c_str(), config.GetCryptoParam().iterNum,
1598 config.GetCryptoParam().encryptAlgo, config.GetCryptoParam().hmacAlgo, config.GetCryptoParam().kdfAlgo,
1599 config.GetCryptoParam().cryptoPageSize);
1600 return errCode;
1601 }
1602 return E_OK;
1603 }
1604
SetDefaultEncryptAlgo(const ConnectionPool::SharedConn & conn,const RdbStoreConfig & config)1605 int RdbStoreImpl::SetDefaultEncryptAlgo(const ConnectionPool::SharedConn &conn, const RdbStoreConfig &config)
1606 {
1607 if (conn == nullptr) {
1608 return E_DATABASE_BUSY;
1609 }
1610
1611 if (!config.GetCryptoParam().IsValid()) {
1612 LOG_ERROR("Invalid crypto param, name:%{public}s", SqliteUtils::Anonymous(config.GetName()).c_str());
1613 return E_INVALID_ARGS;
1614 }
1615
1616 std::string sql = std::string(GlobalExpr::CIPHER_DEFAULT_ATTACH_CIPHER_PREFIX) +
1617 SqliteUtils::EncryptAlgoDescription(config.GetEncryptAlgo()) +
1618 std::string(GlobalExpr::ALGO_SUFFIX);
1619 auto [errCode, statement] = conn->CreateStatement(sql, conn);
1620 errCode = SetDefaultEncryptSql(statement, sql, config);
1621 if (errCode != E_OK) {
1622 return errCode;
1623 }
1624
1625 if (config.GetIter() > 0) {
1626 sql = std::string(GlobalExpr::CIPHER_DEFAULT_ATTACH_KDF_ITER_PREFIX) + std::to_string(config.GetIter());
1627 errCode = SetDefaultEncryptSql(statement, sql, config);
1628 if (errCode != E_OK) {
1629 return errCode;
1630 }
1631 }
1632
1633 sql = std::string(GlobalExpr::CIPHER_DEFAULT_ATTACH_HMAC_ALGO_PREFIX) +
1634 SqliteUtils::HmacAlgoDescription(config.GetCryptoParam().hmacAlgo) + std::string(GlobalExpr::ALGO_SUFFIX);
1635 errCode = SetDefaultEncryptSql(statement, sql, config);
1636 if (errCode != E_OK) {
1637 return errCode;
1638 }
1639
1640 sql = std::string(GlobalExpr::CIPHER_DEFAULT_ATTACH_KDF_ALGO_PREFIX) +
1641 SqliteUtils::KdfAlgoDescription(config.GetCryptoParam().kdfAlgo) +
1642 std::string(GlobalExpr::ALGO_SUFFIX);
1643 errCode = SetDefaultEncryptSql(statement, sql, config);
1644 if (errCode != E_OK) {
1645 return errCode;
1646 }
1647
1648 sql = std::string(GlobalExpr::CIPHER_DEFAULT_ATTACH_PAGE_SIZE_PREFIX) +
1649 std::to_string(config.GetCryptoParam().cryptoPageSize);
1650 return SetDefaultEncryptSql(statement, sql, config);
1651 }
1652
AttachInner(const RdbStoreConfig & config,const std::string & attachName,const std::string & dbPath,const std::vector<uint8_t> & key,int32_t waitTime)1653 int RdbStoreImpl::AttachInner(const RdbStoreConfig &config, const std::string &attachName, const std::string &dbPath,
1654 const std::vector<uint8_t> &key, int32_t waitTime)
1655 {
1656 auto [conn, readers] = connectionPool_->AcquireAll(waitTime);
1657 if (conn == nullptr) {
1658 return E_DATABASE_BUSY;
1659 }
1660
1661 if (config_.GetStorageMode() != StorageMode::MODE_MEMORY &&
1662 conn->GetJournalMode() == static_cast<int32_t>(JournalMode::MODE_WAL)) {
1663 // close first to prevent the connection from being put back.
1664 connectionPool_->CloseAllConnections();
1665 conn = nullptr;
1666 readers.clear();
1667 auto [err, newConn] = connectionPool_->DisableWal();
1668 if (err != E_OK) {
1669 return err;
1670 }
1671 conn = newConn;
1672 }
1673 std::vector<ValueObject> bindArgs;
1674 bindArgs.emplace_back(ValueObject(dbPath));
1675 bindArgs.emplace_back(ValueObject(attachName));
1676 if (!key.empty()) {
1677 auto ret = SetDefaultEncryptAlgo(conn, config);
1678 if (ret != E_OK) {
1679 return ret;
1680 }
1681 bindArgs.emplace_back(ValueObject(key));
1682 auto [errCode, statement] = conn->CreateStatement(GlobalExpr::ATTACH_WITH_KEY_SQL, conn);
1683 if (statement == nullptr || errCode != E_OK) {
1684 LOG_ERROR("Attach get statement failed, code is %{public}d", errCode);
1685 return E_ERROR;
1686 }
1687 return statement->Execute(bindArgs);
1688 }
1689
1690 auto [errCode, statement] = conn->CreateStatement(GlobalExpr::ATTACH_SQL, conn);
1691 if (statement == nullptr || errCode != E_OK) {
1692 LOG_ERROR("Attach get statement failed, code is %{public}d", errCode);
1693 return errCode;
1694 }
1695 return statement->Execute(bindArgs);
1696 }
1697
1698 /**
1699 * Attaches a database.
1700 */
Attach(const RdbStoreConfig & config,const std::string & attachName,int32_t waitTime)1701 std::pair<int32_t, int32_t> RdbStoreImpl::Attach(
1702 const RdbStoreConfig &config, const std::string &attachName, int32_t waitTime)
1703 {
1704 if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR) || config_.GetHaMode() != HAMode::SINGLE) {
1705 return { E_NOT_SUPPORT, 0 };
1706 }
1707 std::string dbPath;
1708 int err = SqliteGlobalConfig::GetDbPath(config, dbPath);
1709 if (err != E_OK || access(dbPath.c_str(), F_OK) != E_OK) {
1710 return { E_INVALID_FILE_PATH, 0 };
1711 }
1712
1713 // encrypted databases are not supported to attach a non encrypted database.
1714 if (!config.IsEncrypt() && config_.IsEncrypt()) {
1715 return { E_NOT_SUPPORT, 0 };
1716 }
1717
1718 if (attachedInfo_.Contains(attachName)) {
1719 return { E_ATTACHED_DATABASE_EXIST, 0 };
1720 }
1721
1722 std::vector<uint8_t> key;
1723 config.Initialize();
1724 if (config.IsEncrypt()) {
1725 key = config.GetEncryptKey();
1726 }
1727 err = AttachInner(config, attachName, dbPath, key, waitTime);
1728 key.assign(key.size(), 0);
1729 if (err == E_SQLITE_ERROR) {
1730 // only when attachName is already in use, SQLITE-ERROR will be reported here.
1731 return { E_ATTACHED_DATABASE_EXIST, 0 };
1732 } else if (err != E_OK) {
1733 LOG_ERROR("failed, errCode[%{public}d] fileName[%{public}s] attachName[%{public}s] attach fileName"
1734 "[%{public}s]",
1735 err, SqliteUtils::Anonymous(config_.GetName()).c_str(), attachName.c_str(),
1736 SqliteUtils::Anonymous(config.GetName()).c_str());
1737 return { err, 0 };
1738 }
1739 if (!attachedInfo_.Insert(attachName, dbPath)) {
1740 return { E_ATTACHED_DATABASE_EXIST, 0 };
1741 }
1742 return { E_OK, attachedInfo_.Size() };
1743 }
1744
Detach(const std::string & attachName,int32_t waitTime)1745 std::pair<int32_t, int32_t> RdbStoreImpl::Detach(const std::string &attachName, int32_t waitTime)
1746 {
1747 if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
1748 return { E_NOT_SUPPORT, 0 };
1749 }
1750 if (!attachedInfo_.Contains(attachName)) {
1751 return { E_OK, attachedInfo_.Size() };
1752 }
1753
1754 auto [connection, readers] = connectionPool_->AcquireAll(waitTime);
1755 if (connection == nullptr) {
1756 return { E_DATABASE_BUSY, 0 };
1757 }
1758 std::vector<ValueObject> bindArgs;
1759 bindArgs.push_back(ValueObject(attachName));
1760
1761 auto [errCode, statement] = connection->CreateStatement(GlobalExpr::DETACH_SQL, connection);
1762 if (statement == nullptr || errCode != E_OK) {
1763 LOG_ERROR("Detach get statement failed, errCode %{public}d", errCode);
1764 return { errCode, 0 };
1765 }
1766 errCode = statement->Execute(bindArgs);
1767 if (errCode != E_OK) {
1768 LOG_ERROR("failed, errCode[%{public}d] fileName[%{public}s] attachName[%{public}s] attach", errCode,
1769 SqliteUtils::Anonymous(config_.GetName()).c_str(), attachName.c_str());
1770 return { errCode, 0 };
1771 }
1772
1773 attachedInfo_.Erase(attachName);
1774 if (!attachedInfo_.Empty()) {
1775 return { E_OK, attachedInfo_.Size() };
1776 }
1777 statement = nullptr;
1778 // close first to prevent the connection from being put back.
1779 connectionPool_->CloseAllConnections();
1780 connection = nullptr;
1781 readers.clear();
1782 errCode = connectionPool_->EnableWal();
1783 return { errCode, 0 };
1784 }
1785
1786 /**
1787 * Obtains the database version.
1788 */
GetVersion(int & version)1789 int RdbStoreImpl::GetVersion(int &version)
1790 {
1791 auto [errCode, statement] = GetStatement(GlobalExpr::PRAGMA_VERSION, isReadOnly_);
1792 if (statement == nullptr) {
1793 return errCode;
1794 }
1795 ValueObject value;
1796 std::tie(errCode, value) = statement->ExecuteForValue();
1797 auto val = std::get_if<int64_t>(&value.value);
1798 if (val != nullptr) {
1799 version = static_cast<int>(*val);
1800 }
1801 return errCode;
1802 }
1803
1804 /**
1805 * Sets the version of a new database.
1806 */
SetVersion(int version)1807 int RdbStoreImpl::SetVersion(int version)
1808 {
1809 if (isReadOnly_) {
1810 return E_NOT_SUPPORT;
1811 }
1812 std::string sql = std::string(GlobalExpr::PRAGMA_VERSION) + " = " + std::to_string(version);
1813 auto [errCode, statement] = GetStatement(sql);
1814 if (statement == nullptr) {
1815 return errCode;
1816 }
1817 return statement->Execute();
1818 }
1819 /**
1820 * Begins a transaction in EXCLUSIVE mode.
1821 */
BeginTransaction()1822 int RdbStoreImpl::BeginTransaction()
1823 {
1824 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
1825 std::lock_guard<std::mutex> lockGuard(connectionPool_->GetTransactionStackMutex());
1826 if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
1827 return E_NOT_SUPPORT;
1828 }
1829 // size + 1 means the number of transactions in process
1830 size_t transactionId = connectionPool_->GetTransactionStack().size() + 1;
1831 BaseTransaction transaction(connectionPool_->GetTransactionStack().size());
1832 auto [errCode, statement] = GetStatement(transaction.GetTransactionStr());
1833 if (statement == nullptr) {
1834 return errCode;
1835 }
1836 errCode = statement->Execute();
1837 if (errCode != E_OK) {
1838 if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
1839 connectionPool_->Dump(true, "BEGIN");
1840 }
1841 LOG_ERROR("transaction id: %{public}zu, storeName: %{public}s, errCode: %{public}d",
1842 transactionId, SqliteUtils::Anonymous(name_).c_str(), errCode);
1843 return errCode;
1844 }
1845 connectionPool_->SetInTransaction(true);
1846 connectionPool_->GetTransactionStack().push(transaction);
1847 // 1 means the number of transactions in process
1848 if (transactionId > 1) {
1849 LOG_WARN("transaction id: %{public}zu, storeName: %{public}s, errCode: %{public}d",
1850 transactionId, SqliteUtils::Anonymous(name_).c_str(), errCode);
1851 }
1852
1853 return E_OK;
1854 }
1855
BeginTrans()1856 std::pair<int, int64_t> RdbStoreImpl::BeginTrans()
1857 {
1858 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
1859 if (!config_.IsVector() || isReadOnly_) {
1860 return {E_NOT_SUPPORT, 0};
1861 }
1862
1863 int64_t tmpTrxId = 0;
1864 auto [errCode, connection] = connectionPool_->CreateTransConn(false);
1865 if (connection == nullptr) {
1866 LOG_ERROR("Get null connection, storeName: %{public}s errCode:0x%{public}x.",
1867 SqliteUtils::Anonymous(name_).c_str(), errCode);
1868 return {errCode, 0};
1869 }
1870 tmpTrxId = newTrxId_.fetch_add(1);
1871 trxConnMap_.Insert(tmpTrxId, connection);
1872 errCode = ExecuteByTrxId(BEGIN_TRANSACTION_SQL, tmpTrxId);
1873 if (errCode != E_OK) {
1874 trxConnMap_.Erase(tmpTrxId);
1875 }
1876 return {errCode, tmpTrxId};
1877 }
1878
1879 /**
1880 * Begins a transaction in EXCLUSIVE mode.
1881 */
RollBack()1882 int RdbStoreImpl::RollBack()
1883 {
1884 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
1885 std::lock_guard<std::mutex> lockGuard(connectionPool_->GetTransactionStackMutex());
1886 if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
1887 return E_NOT_SUPPORT;
1888 }
1889 size_t transactionId = connectionPool_->GetTransactionStack().size();
1890
1891 if (connectionPool_->GetTransactionStack().empty()) {
1892 LOG_ERROR("transaction id: %{public}zu, storeName: %{public}s", transactionId,
1893 SqliteUtils::Anonymous(name_).c_str());
1894 return E_NO_TRANSACTION_IN_SESSION;
1895 }
1896 BaseTransaction transaction = connectionPool_->GetTransactionStack().top();
1897 connectionPool_->GetTransactionStack().pop();
1898 if (transaction.GetType() != TransType::ROLLBACK_SELF && !connectionPool_->GetTransactionStack().empty()) {
1899 connectionPool_->GetTransactionStack().top().SetChildFailure(true);
1900 }
1901 auto [errCode, statement] = GetStatement(transaction.GetRollbackStr());
1902 if (statement == nullptr) {
1903 if (errCode == E_DATABASE_BUSY) {
1904 Reportor::Report(Reportor::Create(config_, errCode, "ErrorType: RollBusy"));
1905 }
1906 // size + 1 means the number of transactions in process
1907 LOG_ERROR("transaction id: %{public}zu, storeName: %{public}s", transactionId + 1,
1908 SqliteUtils::Anonymous(name_).c_str());
1909 return E_DATABASE_BUSY;
1910 }
1911 errCode = statement->Execute();
1912 if (errCode != E_OK) {
1913 if (errCode == E_SQLITE_BUSY || errCode == E_SQLITE_LOCKED) {
1914 Reportor::Report(Reportor::Create(config_, errCode, "ErrorType: RollBusy"));
1915 }
1916 LOG_ERROR("failed, id: %{public}zu, storeName: %{public}s, errCode: %{public}d",
1917 transactionId, SqliteUtils::Anonymous(name_).c_str(), errCode);
1918 return errCode;
1919 }
1920 if (connectionPool_->GetTransactionStack().empty()) {
1921 connectionPool_->SetInTransaction(false);
1922 }
1923 // 1 means the number of transactions in process
1924 if (transactionId > 1) {
1925 LOG_WARN("transaction id: %{public}zu, storeName: %{public}s, errCode: %{public}d",
1926 transactionId, SqliteUtils::Anonymous(name_).c_str(), errCode);
1927 }
1928 return E_OK;
1929 }
1930
ExecuteByTrxId(const std::string & sql,int64_t trxId,bool closeConnAfterExecute,const std::vector<ValueObject> & bindArgs)1931 int RdbStoreImpl::ExecuteByTrxId(const std::string &sql, int64_t trxId, bool closeConnAfterExecute,
1932 const std::vector<ValueObject> &bindArgs)
1933 {
1934 if ((!config_.IsVector()) || isReadOnly_) {
1935 return E_NOT_SUPPORT;
1936 }
1937 if (trxId == 0) {
1938 return E_INVALID_ARGS;
1939 }
1940
1941 if (!trxConnMap_.Contains(trxId)) {
1942 LOG_ERROR("trxId hasn't appeared before %{public}" PRIu64, trxId);
1943 return E_INVALID_ARGS;
1944 }
1945 auto time = static_cast<uint64_t>(duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count());
1946 auto result = trxConnMap_.Find(trxId);
1947 auto connection = result.second;
1948 if (connection == nullptr) {
1949 LOG_ERROR("Get null connection, storeName: %{public}s time:%{public}" PRIu64 ".",
1950 SqliteUtils::Anonymous(name_).c_str(), time);
1951 return E_ERROR;
1952 }
1953 auto [ret, statement] = GetStatement(sql, connection);
1954 if (ret != E_OK) {
1955 return ret;
1956 }
1957 ret = statement->Execute(bindArgs);
1958 if (ret != E_OK) {
1959 LOG_ERROR("transaction id: %{public}" PRIu64 ", storeName: %{public}s, errCode: %{public}d" PRIu64, trxId,
1960 SqliteUtils::Anonymous(name_).c_str(), ret);
1961 trxConnMap_.Erase(trxId);
1962 return ret;
1963 }
1964 if (closeConnAfterExecute) {
1965 trxConnMap_.Erase(trxId);
1966 }
1967 return E_OK;
1968 }
1969
RollBack(int64_t trxId)1970 int RdbStoreImpl::RollBack(int64_t trxId)
1971 {
1972 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
1973 return ExecuteByTrxId(ROLLBACK_TRANSACTION_SQL, trxId, true);
1974 }
1975
1976 /**
1977 * Begins a transaction in EXCLUSIVE mode.
1978 */
Commit()1979 int RdbStoreImpl::Commit()
1980 {
1981 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
1982 std::lock_guard<std::mutex> lockGuard(connectionPool_->GetTransactionStackMutex());
1983 if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
1984 return E_NOT_SUPPORT;
1985 }
1986 size_t transactionId = connectionPool_->GetTransactionStack().size();
1987
1988 if (connectionPool_->GetTransactionStack().empty()) {
1989 return E_OK;
1990 }
1991 BaseTransaction transaction = connectionPool_->GetTransactionStack().top();
1992 std::string sqlStr = transaction.GetCommitStr();
1993 if (sqlStr.size() <= 1) {
1994 LOG_WARN("id: %{public}zu, storeName: %{public}s, sql: %{public}s",
1995 transactionId, SqliteUtils::Anonymous(name_).c_str(), sqlStr.c_str());
1996 connectionPool_->GetTransactionStack().pop();
1997 return E_OK;
1998 }
1999 auto [errCode, statement] = GetStatement(sqlStr);
2000 if (statement == nullptr) {
2001 if (errCode == E_DATABASE_BUSY || errCode == E_SQLITE_BUSY || E_SQLITE_LOCKED) {
2002 Reportor::Report(Reportor::Create(config_, E_DATABASE_BUSY, "ErrorType: Busy"));
2003 }
2004 LOG_ERROR("id: %{public}zu, storeName: %{public}s, statement error", transactionId,
2005 SqliteUtils::Anonymous(name_).c_str());
2006 return E_DATABASE_BUSY;
2007 }
2008 errCode = statement->Execute();
2009 if (errCode != E_OK) {
2010 if (errCode == E_SQLITE_BUSY || errCode == E_SQLITE_LOCKED) {
2011 Reportor::Report(Reportor::Create(config_, errCode, "ErrorType: CommitBusy"));
2012 }
2013 LOG_ERROR("failed, id: %{public}zu, storeName: %{public}s, errCode: %{public}d",
2014 transactionId, SqliteUtils::Anonymous(name_).c_str(), errCode);
2015 return errCode;
2016 }
2017 connectionPool_->SetInTransaction(false);
2018 // 1 means the number of transactions in process
2019 if (transactionId > 1) {
2020 LOG_WARN("id: %{public}zu, storeName: %{public}s, errCode: %{public}d", transactionId,
2021 SqliteUtils::Anonymous(name_).c_str(), errCode);
2022 }
2023 connectionPool_->GetTransactionStack().pop();
2024 return E_OK;
2025 }
2026
Commit(int64_t trxId)2027 int RdbStoreImpl::Commit(int64_t trxId)
2028 {
2029 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
2030 return ExecuteByTrxId(COMMIT_TRANSACTION_SQL, trxId, true);
2031 }
2032
IsInTransaction()2033 bool RdbStoreImpl::IsInTransaction()
2034 {
2035 if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
2036 return false;
2037 }
2038 return connectionPool_->IsInTransaction();
2039 }
2040
CheckAttach(const std::string & sql)2041 int RdbStoreImpl::CheckAttach(const std::string &sql)
2042 {
2043 size_t index = sql.find_first_not_of(' ');
2044 if (index == std::string::npos) {
2045 return E_OK;
2046 }
2047
2048 /* The first 3 characters can determine the type */
2049 std::string sqlType = sql.substr(index, 3);
2050 sqlType = SqliteUtils::StrToUpper(sqlType);
2051 if (sqlType != "ATT") {
2052 return E_OK;
2053 }
2054
2055 auto [errCode, statement] = GetStatement(GlobalExpr::PRAGMA_JOUR_MODE_EXP);
2056 if (statement == nullptr) {
2057 return errCode;
2058 }
2059
2060 errCode = statement->Execute();
2061 if (errCode != E_OK) {
2062 LOG_ERROR("RdbStoreImpl CheckAttach fail to get journal mode : %{public}d", errCode);
2063 return errCode;
2064 }
2065 auto [errorCode, valueObject] = statement->GetColumn(0);
2066 if (errorCode != E_OK) {
2067 LOG_ERROR("RdbStoreImpl CheckAttach fail to get journal mode : %{public}d", errorCode);
2068 return errorCode;
2069 }
2070 auto journal = std::get_if<std::string>(&valueObject.value);
2071 auto journalMode = SqliteUtils::StrToUpper((journal == nullptr) ? "" : *journal);
2072 if (journalMode == RdbStoreConfig::DB_DEFAULT_JOURNAL_MODE) {
2073 LOG_ERROR("RdbStoreImpl attach is not supported in WAL mode");
2074 return E_NOT_SUPPORTED_ATTACH_IN_WAL_MODE;
2075 }
2076
2077 return E_OK;
2078 }
2079
IsOpen() const2080 bool RdbStoreImpl::IsOpen() const
2081 {
2082 return isOpen_;
2083 }
2084
GetPath()2085 std::string RdbStoreImpl::GetPath()
2086 {
2087 return path_;
2088 }
2089
IsReadOnly() const2090 bool RdbStoreImpl::IsReadOnly() const
2091 {
2092 return isReadOnly_;
2093 }
2094
IsMemoryRdb() const2095 bool RdbStoreImpl::IsMemoryRdb() const
2096 {
2097 return isMemoryRdb_;
2098 }
2099
GetName()2100 std::string RdbStoreImpl::GetName()
2101 {
2102 return name_;
2103 }
2104
DoCloudSync(const std::string & table)2105 void RdbStoreImpl::DoCloudSync(const std::string &table)
2106 {
2107 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
2108 auto needSync = cloudInfo_->Change(table);
2109 if (!needSync) {
2110 return;
2111 }
2112 auto pool = TaskExecutor::GetInstance().GetExecutor();
2113 if (pool == nullptr) {
2114 return;
2115 }
2116 auto interval =
2117 std::chrono::duration_cast<std::chrono::steady_clock::duration>(std::chrono::milliseconds(INTERVAL));
2118 pool->Schedule(interval, [cloudInfo = std::weak_ptr<CloudTables>(cloudInfo_), param = syncerParam_]() {
2119 auto changeInfo = cloudInfo.lock();
2120 if (changeInfo == nullptr) {
2121 return ;
2122 }
2123 auto tables = changeInfo->Steal();
2124 if (tables.empty()) {
2125 return;
2126 }
2127 DistributedRdb::RdbService::Option option = { DistributedRdb::TIME_FIRST, 0, true, true };
2128 auto memo = AbsRdbPredicates(std::vector<std::string>(tables.begin(), tables.end())).GetDistributedPredicates();
2129 InnerSync(param, option, memo, nullptr);
2130 });
2131 #endif
2132 }
GetFileType()2133 std::string RdbStoreImpl::GetFileType()
2134 {
2135 return fileType_;
2136 }
2137
2138 /**
2139 * Sets the database locale.
2140 */
ConfigLocale(const std::string & localeStr)2141 int RdbStoreImpl::ConfigLocale(const std::string &localeStr)
2142 {
2143 if (!isOpen_) {
2144 LOG_ERROR("The connection pool has been closed.");
2145 return E_ERROR;
2146 }
2147
2148 if (connectionPool_ == nullptr) {
2149 LOG_ERROR("connectionPool_ is null.");
2150 return E_ERROR;
2151 }
2152 return connectionPool_->ConfigLocale(localeStr);
2153 }
2154
GetDestPath(const std::string & backupPath,std::string & destPath)2155 int RdbStoreImpl::GetDestPath(const std::string &backupPath, std::string &destPath)
2156 {
2157 int ret = GetDataBasePath(backupPath, destPath);
2158 if (ret != E_OK) {
2159 return ret;
2160 }
2161 std::string tempPath = destPath + ".tmp";
2162 if (access(tempPath.c_str(), F_OK) == E_OK) {
2163 destPath = tempPath;
2164 } else {
2165 auto walFile = destPath + "-wal";
2166 if (access(walFile.c_str(), F_OK) == E_OK) {
2167 return E_ERROR;
2168 }
2169 }
2170
2171 if (access(destPath.c_str(), F_OK) != E_OK) {
2172 LOG_ERROR("The backupFilePath does not exists.");
2173 return E_INVALID_FILE_PATH;
2174 }
2175 return E_OK;
2176 }
2177
Restore(const std::string & backupPath,const std::vector<uint8_t> & newKey)2178 int RdbStoreImpl::Restore(const std::string &backupPath, const std::vector<uint8_t> &newKey)
2179 {
2180 LOG_INFO("Restore db: %{public}s.", SqliteUtils::Anonymous(config_.GetName()).c_str());
2181 if (isReadOnly_) {
2182 return E_NOT_SUPPORT;
2183 }
2184
2185 if (!isOpen_ || connectionPool_ == nullptr) {
2186 LOG_ERROR("The pool is: %{public}d, pool is null: %{public}d", isOpen_, connectionPool_ == nullptr);
2187 return E_ERROR;
2188 }
2189
2190 RdbSecurityManager::KeyFiles keyFiles(path_ + BACKUP_RESTORE);
2191 keyFiles.Lock();
2192 std::string destPath;
2193 bool isOK = TryGetMasterSlaveBackupPath(backupPath, destPath, true);
2194 if (!isOK) {
2195 int ret = GetDestPath(backupPath, destPath);
2196 if (ret != E_OK) {
2197 keyFiles.Unlock();
2198 return ret;
2199 }
2200 }
2201 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
2202 auto [err, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
2203 if (service != nullptr) {
2204 service->Disable(syncerParam_);
2205 }
2206 #endif
2207 bool corrupt = Reportor::IsReportCorruptedFault(path_);
2208 int errCode = connectionPool_->ChangeDbFileForRestore(path_, destPath, newKey, slaveStatus_);
2209 keyFiles.Unlock();
2210 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
2211 SecurityPolicy::SetSecurityLabel(config_);
2212 if (service != nullptr) {
2213 service->Enable(syncerParam_);
2214 if (errCode == E_OK) {
2215 auto syncerParam = syncerParam_;
2216 syncerParam.infos_ = Connection::Collect(config_);
2217 service->AfterOpen(syncerParam);
2218 NotifyDataChange();
2219 }
2220 }
2221 #endif
2222 if (errCode == E_OK) {
2223 Reportor::ReportRestore(Reportor::Create(config_, E_OK), corrupt);
2224 rebuild_ = RebuiltType::NONE;
2225 }
2226 DoCloudSync("");
2227 return errCode;
2228 }
2229
CreateWritableConn()2230 std::pair<int32_t, std::shared_ptr<Connection>> RdbStoreImpl::CreateWritableConn()
2231 {
2232 auto config = config_;
2233 config.SetHaMode(HAMode::SINGLE);
2234 auto [result, conn] = Connection::Create(config, true);
2235 if (result != E_OK || conn == nullptr) {
2236 LOG_ERROR("create connection failed, err:%{public}d", result);
2237 return { result, nullptr };
2238 }
2239 return { E_OK, conn };
2240 }
2241
GetStatement(const std::string & sql,std::shared_ptr<Connection> conn) const2242 std::pair<int32_t, std::shared_ptr<Statement>> RdbStoreImpl::GetStatement(
2243 const std::string &sql, std::shared_ptr<Connection> conn) const
2244 {
2245 if (conn == nullptr) {
2246 return { E_DATABASE_BUSY, nullptr };
2247 }
2248 return conn->CreateStatement(sql, conn);
2249 }
2250
GetStatement(const std::string & sql,bool read) const2251 std::pair<int32_t, std::shared_ptr<Statement>> RdbStoreImpl::GetStatement(const std::string& sql, bool read) const
2252 {
2253 auto conn = connectionPool_->AcquireConnection(read);
2254 if (conn == nullptr) {
2255 return { E_DATABASE_BUSY, nullptr };
2256 }
2257 return conn->CreateStatement(sql, conn);
2258 }
2259
GetRebuilt(RebuiltType & rebuilt)2260 int RdbStoreImpl::GetRebuilt(RebuiltType &rebuilt)
2261 {
2262 rebuilt = static_cast<RebuiltType>(rebuild_);
2263 return E_OK;
2264 }
2265
InterruptBackup()2266 int RdbStoreImpl::InterruptBackup()
2267 {
2268 if (config_.GetHaMode() != HAMode::MANUAL_TRIGGER) {
2269 return E_NOT_SUPPORT;
2270 }
2271 if (slaveStatus_ == SlaveStatus::BACKING_UP) {
2272 slaveStatus_ = SlaveStatus::BACKUP_INTERRUPT;
2273 return E_OK;
2274 }
2275 return E_INVALID_INTERRUPT;
2276 }
2277
GetBackupStatus() const2278 int32_t RdbStoreImpl::GetBackupStatus() const
2279 {
2280 if (config_.GetHaMode() != HAMode::MANUAL_TRIGGER && config_.GetHaMode() != HAMode::MAIN_REPLICA) {
2281 return SlaveStatus::UNDEFINED;
2282 }
2283 return slaveStatus_;
2284 }
2285
TryGetMasterSlaveBackupPath(const std::string & srcPath,std::string & destPath,bool isRestore)2286 bool RdbStoreImpl::TryGetMasterSlaveBackupPath(const std::string &srcPath, std::string &destPath, bool isRestore)
2287 {
2288 if (!srcPath.empty() || config_.GetHaMode() == HAMode::SINGLE || config_.GetDBType() != DB_SQLITE) {
2289 return false;
2290 }
2291 int ret = GetSlaveName(config_.GetPath(), destPath);
2292 if (ret != E_OK) {
2293 destPath = {};
2294 return false;
2295 }
2296 if (isRestore && access(destPath.c_str(), F_OK) != 0) {
2297 LOG_WARN("The backup path can not access: %{public}s", SqliteUtils::Anonymous(destPath).c_str());
2298 return false;
2299 }
2300 return true;
2301 }
2302
IsSlaveDiffFromMaster() const2303 bool RdbStoreImpl::IsSlaveDiffFromMaster() const
2304 {
2305 std::string failureFlagFile = config_.GetPath() + "-slaveFailure";
2306 std::string slaveDbPath = SqliteUtils::GetSlavePath(config_.GetPath());
2307 return access(failureFlagFile.c_str(), F_OK) == 0 || access(slaveDbPath.c_str(), F_OK) != 0;
2308 }
2309
ExchangeSlaverToMaster()2310 int32_t RdbStoreImpl::ExchangeSlaverToMaster()
2311 {
2312 if (isReadOnly_) {
2313 return E_OK;
2314 }
2315 auto conn = connectionPool_->AcquireConnection(false);
2316 if (conn == nullptr) {
2317 return E_DATABASE_BUSY;
2318 }
2319 auto strategy = conn->GenerateExchangeStrategy(slaveStatus_);
2320 if (strategy != ExchangeStrategy::NOT_HANDLE) {
2321 LOG_WARN("exchange st:%{public}d, %{public}s,", strategy, SqliteUtils::Anonymous(config_.GetName()).c_str());
2322 }
2323 int ret = E_OK;
2324 if (strategy == ExchangeStrategy::RESTORE) {
2325 conn = nullptr;
2326 // disable is required before restore
2327 ret = Restore({}, {});
2328 } else if (strategy == ExchangeStrategy::BACKUP) {
2329 // async backup
2330 ret = conn->Backup({}, {}, true, slaveStatus_);
2331 }
2332 return ret;
2333 }
2334
GetDbType() const2335 int32_t RdbStoreImpl::GetDbType() const
2336 {
2337 return config_.GetDBType();
2338 }
2339
CreateTransaction(int32_t type)2340 std::pair<int32_t, std::shared_ptr<Transaction>> RdbStoreImpl::CreateTransaction(int32_t type)
2341 {
2342 if (isReadOnly_) {
2343 return { E_NOT_SUPPORT, nullptr};
2344 }
2345
2346 auto [errCode, conn] = connectionPool_->CreateTransConn();
2347 if (conn == nullptr) {
2348 return { errCode, nullptr };
2349 }
2350 std::shared_ptr<Transaction> trans;
2351 std::tie(errCode, trans) = Transaction::Create(type, conn, config_.GetName());
2352 if (trans == nullptr) {
2353 if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
2354 connectionPool_->Dump(true, "TRANS");
2355 }
2356 return { errCode, nullptr };
2357 }
2358
2359 std::lock_guard<decltype(mutex_)> guard(mutex_);
2360 for (auto it = transactions_.begin(); it != transactions_.end();) {
2361 if (it->expired()) {
2362 it = transactions_.erase(it);
2363 } else {
2364 it++;
2365 }
2366 }
2367 transactions_.push_back(trans);
2368 return { errCode, trans };
2369 }
2370
AddTables(const std::vector<std::string> & tables)2371 int32_t RdbStoreImpl::CloudTables::AddTables(const std::vector<std::string> &tables)
2372 {
2373 std::lock_guard<std::mutex> lock(mutex_);
2374 for (auto &table : tables) {
2375 tables_.insert(table);
2376 }
2377 return E_OK;
2378 }
2379
RmvTables(const std::vector<std::string> & tables)2380 int32_t RdbStoreImpl::CloudTables::RmvTables(const std::vector<std::string> &tables)
2381 {
2382 std::lock_guard<std::mutex> lock(mutex_);
2383 for (auto &table : tables) {
2384 tables_.erase(table);
2385 }
2386 return E_OK;
2387 }
2388
Change(const std::string & table)2389 bool RdbStoreImpl::CloudTables::Change(const std::string &table)
2390 {
2391 bool needSync = false;
2392 {
2393 std::lock_guard<std::mutex> lock(mutex_);
2394 if (tables_.empty() || (!table.empty() && tables_.find(table) == tables_.end())) {
2395 return needSync;
2396 }
2397 // from empty, then need schedule the cloud sync, others only wait the schedule execute.
2398 needSync = changes_.empty();
2399 if (!table.empty()) {
2400 changes_.insert(table);
2401 } else {
2402 changes_.insert(tables_.begin(), tables_.end());
2403 }
2404 }
2405 return needSync;
2406 }
2407
Steal()2408 std::set<std::string> RdbStoreImpl::CloudTables::Steal()
2409 {
2410 std::set<std::string> result;
2411 {
2412 std::lock_guard<std::mutex> lock(mutex_);
2413 result = std::move(changes_);
2414 }
2415 return result;
2416 }
2417 } // namespace OHOS::NativeRdb