1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define LOG_TAG "RdbStoreImpl"
16 #include "rdb_store_impl.h"
17 
18 #include <algorithm>
19 #include <chrono>
20 #include <cinttypes>
21 #include <cstdint>
22 #include <memory>
23 #include <mutex>
24 #include <sstream>
25 #include <string>
26 #include <unistd.h>
27 
28 #include "cache_result_set.h"
29 #include "directory_ex.h"
30 #include "logger.h"
31 #include "rdb_common.h"
32 #include "rdb_errno.h"
33 #include "rdb_fault_hiview_reporter.h"
34 #include "rdb_radar_reporter.h"
35 #include "rdb_security_manager.h"
36 #include "rdb_sql_statistic.h"
37 #include "rdb_store.h"
38 #include "rdb_trace.h"
39 #include "rdb_types.h"
40 #include "relational_store_client.h"
41 #include "sqlite_global_config.h"
42 #include "sqlite_sql_builder.h"
43 #include "sqlite_statement.h"
44 #include "sqlite_utils.h"
45 #include "step_result_set.h"
46 #include "values_buckets.h"
47 #include "task_executor.h"
48 #include "traits.h"
49 #include "transaction.h"
50 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
51 #include "delay_notify.h"
52 #include "raw_data_parser.h"
53 #include "rdb_device_manager_adapter.h"
54 #include "rdb_manager_impl.h"
55 #include "relational_store_manager.h"
56 #include "runtime_config.h"
57 #include "security_policy.h"
58 #include "sqlite_shared_result_set.h"
59 #endif
60 
// ISFILE(filePath) evaluates to true when filePath contains no platform path
// separator ("\\" when WINDOWS_PLATFORM is defined, "/" otherwise).
// The argument is parenthesized so that expression arguments such as `dir + name`
// are evaluated as a whole before .find() is applied.
#ifdef WINDOWS_PLATFORM
#define ISFILE(filePath) (((filePath).find("\\") == std::string::npos))
#else
#define ISFILE(filePath) (((filePath).find("/") == std::string::npos))
#endif
66 
67 namespace OHOS::NativeRdb {
68 using namespace OHOS::Rdb;
69 using namespace std::chrono;
70 using SqlStatistic = DistributedRdb::SqlStatistic;
71 using RdbNotifyConfig = DistributedRdb::RdbNotifyConfig;
72 using Reportor = RdbFaultHiViewReporter;
73 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
74 using RdbMgr = DistributedRdb::RdbManagerImpl;
75 #endif
76 
// SQL text of the transaction-control statements.
static constexpr const char *BEGIN_TRANSACTION_SQL = "begin;";
static constexpr const char *COMMIT_TRANSACTION_SQL = "commit;";
static constexpr const char *ROLLBACK_TRANSACTION_SQL = "rollback;";
// NOTE(review): looks like a marker name used by the backup/restore flow — not used in this part of the file; confirm.
static constexpr const char *BACKUP_RESTORE = "backup.restore";
// NOTE(review): timeout threshold; the unit is not visible here (presumably milliseconds) — confirm against its users.
constexpr int64_t TIME_OUT = 1500;
82 
InitSyncerParam(const RdbStoreConfig & config,bool created)83 void RdbStoreImpl::InitSyncerParam(const RdbStoreConfig &config, bool created)
84 {
85     syncerParam_.bundleName_ = config.GetBundleName();
86     syncerParam_.hapName_ = config.GetModuleName();
87     syncerParam_.storeName_ = config.GetName();
88     syncerParam_.customDir_ = config.GetCustomDir();
89     syncerParam_.area_ = config.GetArea();
90     syncerParam_.level_ = static_cast<int32_t>(config.GetSecurityLevel());
91     syncerParam_.type_ = config.GetDistributedType();
92     syncerParam_.isEncrypt_ = config.IsEncrypt();
93     syncerParam_.isAutoClean_ = config.GetAutoClean();
94     syncerParam_.isSearchable_ = config.IsSearchable();
95     syncerParam_.password_ = config.GetEncryptKey();
96     syncerParam_.haMode_ = config.GetHaMode();
97     syncerParam_.roleType_ = config.GetRoleType();
98     if (created) {
99         syncerParam_.infos_ = Connection::Collect(config);
100     }
101 }
102 
InnerOpen()103 int RdbStoreImpl::InnerOpen()
104 {
105     isOpen_ = true;
106 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
107     if (isReadOnly_) {
108         return E_OK;
109     }
110 
111     AfterOpen(syncerParam_);
112     int errCode = RegisterDataChangeCallback();
113     if (errCode != E_OK) {
114         LOG_ERROR("RegisterCallBackObserver is failed, err is %{public}d.", errCode);
115     }
116 #endif
117     return E_OK;
118 }
119 
120 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
AfterOpen(const RdbParam & param,int32_t retry)121 void RdbStoreImpl::AfterOpen(const RdbParam &param, int32_t retry)
122 {
123     auto [err, service] = RdbMgr::GetInstance().GetRdbService(param);
124     if (err == E_NOT_SUPPORT) {
125         return;
126     }
127     if (err != E_OK || service == nullptr) {
128         LOG_ERROR("GetRdbService failed, err: %{public}d, storeName: %{public}s.", err,
129             SqliteUtils::Anonymous(param.storeName_).c_str());
130         auto pool = TaskExecutor::GetInstance().GetExecutor();
131         if (err == E_SERVICE_NOT_FOUND && pool != nullptr && retry++ < MAX_RETRY_TIMES) {
132             pool->Schedule(std::chrono::seconds(RETRY_INTERVAL), [param, retry]() {
133                 AfterOpen(param, retry);
134             });
135         }
136         return;
137     }
138     err = service->AfterOpen(param);
139     if (err != E_OK) {
140         LOG_ERROR("AfterOpen failed, err: %{public}d, storeName: %{public}s.", err,
141             SqliteUtils::Anonymous(param.storeName_).c_str());
142     }
143 }
144 
GetModifyTime(const std::string & table,const std::string & columnName,std::vector<PRIKey> & keys)145 RdbStore::ModifyTime RdbStoreImpl::GetModifyTime(const std::string &table, const std::string &columnName,
146     std::vector<PRIKey> &keys)
147 {
148     if (table.empty() || columnName.empty() || keys.empty()) {
149         LOG_ERROR("invalid para.");
150         return {};
151     }
152 
153     auto logTable = DistributedDB::RelationalStoreManager::GetDistributedLogTableName(table);
154     if (SqliteUtils::StrToUpper(columnName) == ROW_ID) {
155         return GetModifyTimeByRowId(logTable, keys);
156     }
157     std::vector<ValueObject> hashKeys;
158     hashKeys.reserve(keys.size());
159     std::map<std::vector<uint8_t>, PRIKey> keyMap;
160     std::map<std::string, DistributedDB::Type> tmp;
161     for (const auto &key : keys) {
162         DistributedDB::Type value;
163         RawDataParser::Convert(key, value);
164         tmp[columnName] = value;
165         auto hashKey = DistributedDB::RelationalStoreManager::CalcPrimaryKeyHash(tmp);
166         if (hashKey.empty()) {
167             LOG_DEBUG("hash key fail");
168             continue;
169         }
170         hashKeys.emplace_back(ValueObject(hashKey));
171         keyMap[hashKey] = key;
172     }
173 
174     std::string sql;
175     sql.append("select hash_key as key, timestamp/10000 as modify_time from ");
176     sql.append(logTable);
177     sql.append(" where hash_key in (");
178     sql.append(SqliteSqlBuilder::GetSqlArgs(hashKeys.size()));
179     sql.append(")");
180     auto resultSet = QueryByStep(sql, hashKeys, true);
181     int count = 0;
182     if (resultSet == nullptr || resultSet->GetRowCount(count) != E_OK || count <= 0) {
183         LOG_ERROR("get resultSet err.");
184         return {};
185     }
186     return { resultSet, keyMap, false };
187 }
188 
GetModifyTimeByRowId(const std::string & logTable,std::vector<PRIKey> & keys)189 RdbStore::ModifyTime RdbStoreImpl::GetModifyTimeByRowId(const std::string &logTable, std::vector<PRIKey> &keys)
190 {
191     std::string sql;
192     sql.append("select data_key as key, timestamp/10000 as modify_time from ");
193     sql.append(logTable);
194     sql.append(" where data_key in (");
195     sql.append(SqliteSqlBuilder::GetSqlArgs(keys.size()));
196     sql.append(")");
197     std::vector<ValueObject> args;
198     args.reserve(keys.size());
199     for (auto &key : keys) {
200         ValueObject::Type value;
201         RawDataParser::Convert(key, value);
202         args.emplace_back(ValueObject(value));
203     }
204     auto resultSet = QueryByStep(sql, args, true);
205     int count = 0;
206     if (resultSet == nullptr || resultSet->GetRowCount(count) != E_OK || count <= 0) {
207         LOG_ERROR("get resultSet err.");
208         return {};
209     }
210     return ModifyTime(resultSet, {}, true);
211 }
212 
CleanDirtyData(const std::string & table,uint64_t cursor)213 int RdbStoreImpl::CleanDirtyData(const std::string &table, uint64_t cursor)
214 {
215     if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
216         LOG_ERROR("not support. table:%{public}s, isRead:%{public}d, dbType:%{public}d",
217             SqliteUtils::Anonymous(table).c_str(), isReadOnly_, config_.GetDBType());
218         return E_NOT_SUPPORT;
219     }
220     auto connection = connectionPool_->AcquireConnection(false);
221     if (connection == nullptr) {
222         LOG_ERROR("db is busy. table:%{public}s", SqliteUtils::Anonymous(table).c_str());
223         return E_DATABASE_BUSY;
224     }
225     int errCode = connection->CleanDirtyData(table, cursor);
226     return errCode;
227 }
228 
GetLogTableName(const std::string & tableName)229 std::string RdbStoreImpl::GetLogTableName(const std::string &tableName)
230 {
231     return DistributedDB::RelationalStoreManager::GetDistributedLogTableName(tableName);
232 }
233 
QuerySharingResource(const AbsRdbPredicates & predicates,const Fields & columns)234 std::pair<int32_t, std::shared_ptr<ResultSet>> RdbStoreImpl::QuerySharingResource(const AbsRdbPredicates &predicates,
235     const Fields &columns)
236 {
237     if (config_.GetDBType() == DB_VECTOR) {
238         return { E_NOT_SUPPORT, nullptr };
239     }
240     auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
241     if (errCode != E_OK) {
242         return { errCode, nullptr };
243     }
244     auto [status, resultSet] =
245         service->QuerySharingResource(syncerParam_, predicates.GetDistributedPredicates(), columns);
246     if (status != E_OK) {
247         return { status, nullptr };
248     }
249     return { status, resultSet };
250 }
251 
RemoteQuery(const std::string & device,const AbsRdbPredicates & predicates,const Fields & columns,int & errCode)252 std::shared_ptr<ResultSet> RdbStoreImpl::RemoteQuery(const std::string &device, const AbsRdbPredicates &predicates,
253     const Fields &columns, int &errCode)
254 {
255     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
256     if (config_.GetDBType() == DB_VECTOR) {
257         return nullptr;
258     }
259     std::vector<std::string> selectionArgs = predicates.GetWhereArgs();
260     std::string sql = SqliteSqlBuilder::BuildQueryString(predicates, columns);
261     auto [err, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
262     if (err == E_NOT_SUPPORT) {
263         errCode = err;
264         return nullptr;
265     }
266     if (err != E_OK) {
267         LOG_ERROR("RdbStoreImpl::RemoteQuery get service failed");
268         errCode = err;
269         return nullptr;
270     }
271     auto [status, resultSet] = service->RemoteQuery(syncerParam_, device, sql, selectionArgs);
272     errCode = status;
273     return resultSet;
274 }
275 
NotifyDataChange()276 void RdbStoreImpl::NotifyDataChange()
277 {
278     int errCode = RegisterDataChangeCallback();
279     if (errCode != E_OK) {
280         LOG_ERROR("RegisterDataChangeCallback is failed, err is %{public}d.", errCode);
281     }
282     DistributedRdb::RdbChangedData rdbChangedData;
283     if (delayNotifier_ != nullptr) {
284         delayNotifier_->UpdateNotify(rdbChangedData, true);
285     }
286 }
287 
SetDistributedTables(const std::vector<std::string> & tables,int32_t type,const DistributedRdb::DistributedConfig & distributedConfig)288 int RdbStoreImpl::SetDistributedTables(const std::vector<std::string> &tables, int32_t type,
289                                        const DistributedRdb::DistributedConfig &distributedConfig)
290 {
291     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
292     if (config_.GetDBType() == DB_VECTOR || isReadOnly_) {
293         return E_NOT_SUPPORT;
294     }
295     if (tables.empty()) {
296         LOG_WARN("The distributed tables to be set is empty.");
297         return E_OK;
298     }
299     auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
300     if (errCode != E_OK) {
301         return errCode;
302     }
303     int32_t errorCode = service->SetDistributedTables(syncerParam_, tables, distributedConfig.references,
304         distributedConfig.isRebuild, type);
305     if (errorCode != E_OK) {
306         LOG_ERROR("Fail to set distributed tables, error=%{public}d", errorCode);
307         return errorCode;
308     }
309     if (type != DistributedRdb::DISTRIBUTED_CLOUD) {
310         return E_OK;
311     }
312     auto conn = connectionPool_->AcquireConnection(false);
313     if (conn != nullptr) {
314         auto strategy = conn->GenerateExchangeStrategy(slaveStatus_);
315         if (strategy == ExchangeStrategy::BACKUP) {
316             (void)conn->Backup({}, {}, false, slaveStatus_);
317         }
318     }
319     {
320         std::unique_lock<decltype(rwMutex_)> lock(rwMutex_);
321         if (distributedConfig.autoSync) {
322             cloudInfo_->AddTables(tables);
323         } else {
324             cloudInfo_->RmvTables(tables);
325             return E_OK;
326         }
327     }
328     auto isRebuilt = RebuiltType::NONE;
329     GetRebuilt(isRebuilt);
330     if (isRebuilt == RebuiltType::REBUILT) {
331         DoCloudSync("");
332     }
333     return E_OK;
334 }
335 
ObtainDistributedTableName(const std::string & device,const std::string & table,int & errCode)336 std::string RdbStoreImpl::ObtainDistributedTableName(const std::string &device, const std::string &table, int &errCode)
337 {
338     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
339     if (config_.GetDBType() == DB_VECTOR) {
340         return "";
341     }
342     std::string uuid;
343     DeviceManagerAdaptor::RdbDeviceManagerAdaptor &deviceManager =
344         DeviceManagerAdaptor::RdbDeviceManagerAdaptor::GetInstance(syncerParam_.bundleName_);
345     errCode = deviceManager.GetEncryptedUuidByNetworkId(device, uuid);
346     if (errCode != E_OK) {
347         LOG_ERROR("GetUuid is failed.");
348         return "";
349     }
350 
351     auto translateCall = [uuid](const std::string &oriDevId, const DistributedDB::StoreInfo &info) {
352         return uuid;
353     };
354     DistributedDB::RuntimeConfig::SetTranslateToDeviceIdCallback(translateCall);
355 
356     return DistributedDB::RelationalStoreManager::GetDistributedTableName(uuid, table);
357 }
358 
Sync(const SyncOption & option,const AbsRdbPredicates & predicate,const AsyncBrief & callback)359 int RdbStoreImpl::Sync(const SyncOption &option, const AbsRdbPredicates &predicate, const AsyncBrief &callback)
360 {
361     if (config_.GetDBType() == DB_VECTOR) {
362         return E_NOT_SUPPORT;
363     }
364     return Sync(option, predicate, [callback](Details &&details) {
365         Briefs briefs;
366         for (auto &[key, value] : details) {
367             briefs.insert_or_assign(key, value.code);
368         }
369         if (callback != nullptr) {
370             callback(briefs);
371         }
372     });
373 }
374 
Sync(const SyncOption & option,const std::vector<std::string> & tables,const AsyncDetail & async)375 int RdbStoreImpl::Sync(const SyncOption &option, const std::vector<std::string> &tables, const AsyncDetail &async)
376 {
377     return Sync(option, AbsRdbPredicates(tables), async);
378 }
379 
Sync(const SyncOption & option,const AbsRdbPredicates & predicate,const AsyncDetail & async)380 int RdbStoreImpl::Sync(const SyncOption &option, const AbsRdbPredicates &predicate, const AsyncDetail &async)
381 {
382     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
383     DistributedRdb::RdbService::Option rdbOption;
384     rdbOption.mode = option.mode;
385     rdbOption.isAsync = !option.isBlock;
386     RdbRadar ret(Scene::SCENE_SYNC, __FUNCTION__, config_.GetBundleName());
387     ret = InnerSync(syncerParam_, rdbOption, predicate.GetDistributedPredicates(), async);
388     return ret;
389 }
390 
InnerSync(const RdbParam & param,const Options & option,const Memo & predicates,const AsyncDetail & async)391 int RdbStoreImpl::InnerSync(const RdbParam &param, const Options &option, const Memo &predicates,
392     const AsyncDetail &async)
393 {
394     auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(param);
395     if (errCode == E_NOT_SUPPORT) {
396         return errCode;
397     }
398     if (errCode != E_OK) {
399         LOG_ERROR("GetRdbService is failed, err is %{public}d, bundleName is %{public}s.", errCode,
400             param.bundleName_.c_str());
401         return errCode;
402     }
403     errCode = service->Sync(param, option, predicates, async);
404     if (errCode != E_OK) {
405         LOG_ERROR("Sync is failed, err is %{public}d.", errCode);
406         return errCode;
407     }
408     return E_OK;
409 }
410 
GetUri(const std::string & event)411 Uri RdbStoreImpl::GetUri(const std::string &event)
412 {
413     std::string rdbUri;
414     if (config_.GetDataGroupId().empty()) {
415         rdbUri = SCHEME_RDB + config_.GetBundleName() + "/" + path_ + "/" + event;
416     } else {
417         rdbUri = SCHEME_RDB + config_.GetDataGroupId() + "/" + path_ + "/" + event;
418     }
419     return Uri(rdbUri);
420 }
421 
SubscribeLocal(const SubscribeOption & option,RdbStoreObserver * observer)422 int RdbStoreImpl::SubscribeLocal(const SubscribeOption& option, RdbStoreObserver *observer)
423 {
424     std::lock_guard<std::mutex> lock(mutex_);
425     localObservers_.try_emplace(option.event);
426     auto &list = localObservers_.find(option.event)->second;
427     for (auto it = list.begin(); it != list.end(); it++) {
428         if ((*it)->getObserver() == observer) {
429             LOG_ERROR("duplicate subscribe.");
430             return E_OK;
431         }
432     }
433 
434     localObservers_[option.event].push_back(std::make_shared<RdbStoreLocalObserver>(observer));
435     return E_OK;
436 }
437 
SubscribeLocalShared(const SubscribeOption & option,RdbStoreObserver * observer)438 int RdbStoreImpl::SubscribeLocalShared(const SubscribeOption& option, RdbStoreObserver *observer)
439 {
440     std::lock_guard<std::mutex> lock(mutex_);
441     localSharedObservers_.try_emplace(option.event);
442     auto &list = localSharedObservers_.find(option.event)->second;
443     for (auto it = list.begin(); it != list.end(); it++) {
444         if ((*it)->getObserver() == observer) {
445             LOG_ERROR("duplicate subscribe.");
446             return E_OK;
447         }
448     }
449 
450     auto client = OHOS::AAFwk::DataObsMgrClient::GetInstance();
451     if (client == nullptr) {
452         LOG_ERROR("Failed to get DataObsMgrClient.");
453         return E_GET_DATAOBSMGRCLIENT_FAIL;
454     }
455     sptr<RdbStoreLocalSharedObserver> localSharedObserver(new (std::nothrow) RdbStoreLocalSharedObserver(observer));
456     int32_t err = client->RegisterObserver(GetUri(option.event), localSharedObserver);
457     if (err != 0) {
458         LOG_ERROR("Subscribe failed.");
459         return err;
460     }
461     localSharedObservers_[option.event].push_back(std::move(localSharedObserver));
462     return E_OK;
463 }
464 
SubscribeLocalDetail(const SubscribeOption & option,const std::shared_ptr<RdbStoreObserver> & observer)465 int32_t RdbStoreImpl::SubscribeLocalDetail(const SubscribeOption &option,
466                                            const std::shared_ptr<RdbStoreObserver> &observer)
467 {
468     auto connection = connectionPool_->AcquireConnection(false);
469     if (connection == nullptr) {
470         return E_DATABASE_BUSY;
471     }
472     int32_t errCode = connection->Subscribe(option.event, observer);
473     if (errCode != E_OK) {
474         LOG_ERROR("subscribe local detail observer failed. db name:%{public}s errCode:%{public}" PRId32,
475             SqliteUtils::Anonymous(config_.GetName()).c_str(), errCode);
476     }
477     return errCode;
478 }
479 
SubscribeRemote(const SubscribeOption & option,RdbStoreObserver * observer)480 int RdbStoreImpl::SubscribeRemote(const SubscribeOption& option, RdbStoreObserver *observer)
481 {
482     auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
483     if (errCode != E_OK) {
484         return errCode;
485     }
486     return service->Subscribe(syncerParam_, option, observer);
487 }
488 
Subscribe(const SubscribeOption & option,RdbStoreObserver * observer)489 int RdbStoreImpl::Subscribe(const SubscribeOption &option, RdbStoreObserver *observer)
490 {
491     if (config_.GetDBType() == DB_VECTOR) {
492         return E_NOT_SUPPORT;
493     }
494     if (option.mode == SubscribeMode::LOCAL) {
495         return SubscribeLocal(option, observer);
496     }
497     if (option.mode == SubscribeMode::LOCAL_SHARED) {
498         return SubscribeLocalShared(option, observer);
499     }
500     return SubscribeRemote(option, observer);
501 }
502 
UnSubscribeLocal(const SubscribeOption & option,RdbStoreObserver * observer)503 int RdbStoreImpl::UnSubscribeLocal(const SubscribeOption& option, RdbStoreObserver *observer)
504 {
505     std::lock_guard<std::mutex> lock(mutex_);
506     auto obs = localObservers_.find(option.event);
507     if (obs == localObservers_.end()) {
508         return E_OK;
509     }
510 
511     auto &list = obs->second;
512     for (auto it = list.begin(); it != list.end(); it++) {
513         if ((*it)->getObserver() == observer) {
514             it = list.erase(it);
515             break;
516         }
517     }
518 
519     if (list.empty()) {
520         localObservers_.erase(option.event);
521     }
522     return E_OK;
523 }
524 
UnSubscribeLocalAll(const SubscribeOption & option)525 int RdbStoreImpl::UnSubscribeLocalAll(const SubscribeOption& option)
526 {
527     std::lock_guard<std::mutex> lock(mutex_);
528     auto obs = localObservers_.find(option.event);
529     if (obs == localObservers_.end()) {
530         return E_OK;
531     }
532 
533     localObservers_.erase(option.event);
534     return E_OK;
535 }
536 
UnSubscribeLocalShared(const SubscribeOption & option,RdbStoreObserver * observer)537 int RdbStoreImpl::UnSubscribeLocalShared(const SubscribeOption& option, RdbStoreObserver *observer)
538 {
539     std::lock_guard<std::mutex> lock(mutex_);
540     auto obs = localSharedObservers_.find(option.event);
541     if (obs == localSharedObservers_.end()) {
542         return E_OK;
543     }
544 
545     auto client = OHOS::AAFwk::DataObsMgrClient::GetInstance();
546     if (client == nullptr) {
547         LOG_ERROR("Failed to get DataObsMgrClient.");
548         return E_GET_DATAOBSMGRCLIENT_FAIL;
549     }
550 
551     auto &list = obs->second;
552     for (auto it = list.begin(); it != list.end(); it++) {
553         if ((*it)->getObserver() == observer) {
554             int32_t err = client->UnregisterObserver(GetUri(option.event), *it);
555             if (err != 0) {
556                 LOG_ERROR("UnSubscribeLocalShared failed.");
557                 return err;
558             }
559             list.erase(it);
560             break;
561         }
562     }
563     if (list.empty()) {
564         localSharedObservers_.erase(option.event);
565     }
566     return E_OK;
567 }
568 
UnSubscribeLocalSharedAll(const SubscribeOption & option)569 int RdbStoreImpl::UnSubscribeLocalSharedAll(const SubscribeOption& option)
570 {
571     std::lock_guard<std::mutex> lock(mutex_);
572     auto obs = localSharedObservers_.find(option.event);
573     if (obs == localSharedObservers_.end()) {
574         return E_OK;
575     }
576 
577     auto client = OHOS::AAFwk::DataObsMgrClient::GetInstance();
578     if (client == nullptr) {
579         LOG_ERROR("Failed to get DataObsMgrClient.");
580         return E_GET_DATAOBSMGRCLIENT_FAIL;
581     }
582 
583     auto &list = obs->second;
584     auto it = list.begin();
585     while (it != list.end()) {
586         int32_t err = client->UnregisterObserver(GetUri(option.event), *it);
587         if (err != 0) {
588             LOG_ERROR("UnSubscribe failed.");
589             return err;
590         }
591         it = list.erase(it);
592     }
593 
594     localSharedObservers_.erase(option.event);
595     return E_OK;
596 }
597 
UnsubscribeLocalDetail(const SubscribeOption & option,const std::shared_ptr<RdbStoreObserver> & observer)598 int32_t RdbStoreImpl::UnsubscribeLocalDetail(const SubscribeOption& option,
599                                              const std::shared_ptr<RdbStoreObserver> &observer)
600 {
601     auto connection = connectionPool_->AcquireConnection(false);
602     if (connection == nullptr) {
603         return E_DATABASE_BUSY;
604     }
605     int32_t errCode = connection->Unsubscribe(option.event, observer);
606     if (errCode != E_OK) {
607         LOG_ERROR("unsubscribe local detail observer failed. db name:%{public}s errCode:%{public}" PRId32,
608             SqliteUtils::Anonymous(config_.GetName()).c_str(), errCode);
609     }
610     return errCode;
611 }
612 
UnSubscribeRemote(const SubscribeOption & option,RdbStoreObserver * observer)613 int RdbStoreImpl::UnSubscribeRemote(const SubscribeOption& option, RdbStoreObserver *observer)
614 {
615     auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
616     if (errCode != E_OK) {
617         return errCode;
618     }
619     return service->UnSubscribe(syncerParam_, option, observer);
620 }
621 
UnSubscribe(const SubscribeOption & option,RdbStoreObserver * observer)622 int RdbStoreImpl::UnSubscribe(const SubscribeOption &option, RdbStoreObserver *observer)
623 {
624     if (config_.GetDBType() == DB_VECTOR) {
625         return E_NOT_SUPPORT;
626     }
627     if (option.mode == SubscribeMode::LOCAL && observer) {
628         return UnSubscribeLocal(option, observer);
629     } else if (option.mode == SubscribeMode::LOCAL && !observer) {
630         return UnSubscribeLocalAll(option);
631     } else if (option.mode == SubscribeMode::LOCAL_SHARED && observer) {
632         return UnSubscribeLocalShared(option, observer);
633     } else if (option.mode == SubscribeMode::LOCAL_SHARED && !observer) {
634         return UnSubscribeLocalSharedAll(option);
635     }
636     return UnSubscribeRemote(option, observer);
637 }
638 
SubscribeObserver(const SubscribeOption & option,const std::shared_ptr<RdbStoreObserver> & observer)639 int RdbStoreImpl::SubscribeObserver(const SubscribeOption& option, const std::shared_ptr<RdbStoreObserver> &observer)
640 {
641     if (config_.GetDBType() == DB_VECTOR) {
642         return E_NOT_SUPPORT;
643     }
644     return SubscribeLocalDetail(option, observer);
645 }
646 
UnsubscribeObserver(const SubscribeOption & option,const std::shared_ptr<RdbStoreObserver> & observer)647 int RdbStoreImpl::UnsubscribeObserver(const SubscribeOption& option, const std::shared_ptr<RdbStoreObserver> &observer)
648 {
649     if (config_.GetDBType() == DB_VECTOR) {
650         return E_NOT_SUPPORT;
651     }
652     return UnsubscribeLocalDetail(option, observer);
653 }
654 
Notify(const std::string & event)655 int RdbStoreImpl::Notify(const std::string &event)
656 {
657     if (config_.GetDBType() == DB_VECTOR) {
658         return E_NOT_SUPPORT;
659     }
660     auto client = OHOS::AAFwk::DataObsMgrClient::GetInstance();
661     if (client == nullptr) {
662         LOG_ERROR("Failed to get DataObsMgrClient.");
663         return E_GET_DATAOBSMGRCLIENT_FAIL;
664     }
665     int32_t err = client->NotifyChange(GetUri(event));
666     if (err != 0) {
667         LOG_ERROR("Notify failed.");
668     }
669 
670     std::lock_guard<std::mutex> lock(mutex_);
671     auto obs = localObservers_.find(event);
672     if (obs != localObservers_.end()) {
673         auto &list = obs->second;
674         for (auto &it : list) {
675             it->OnChange();
676         }
677     }
678     return E_OK;
679 }
680 
SetSearchable(bool isSearchable)681 int RdbStoreImpl::SetSearchable(bool isSearchable)
682 {
683     auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
684     if (errCode != E_OK || service == nullptr) {
685         LOG_ERROR("GetRdbService is failed, err is %{public}d.", errCode);
686         return errCode;
687     }
688     return service->SetSearchable(syncerParam_, isSearchable);
689 }
690 
RegisterAutoSyncCallback(std::shared_ptr<DetailProgressObserver> observer)691 int RdbStoreImpl::RegisterAutoSyncCallback(std::shared_ptr<DetailProgressObserver> observer)
692 {
693     if (config_.GetDBType() == DB_VECTOR) {
694         return E_NOT_SUPPORT;
695     }
696     auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
697     if (errCode != E_OK) {
698         return errCode;
699     }
700     return service->RegisterAutoSyncCallback(syncerParam_, observer);
701 }
702 
UnregisterAutoSyncCallback(std::shared_ptr<DetailProgressObserver> observer)703 int RdbStoreImpl::UnregisterAutoSyncCallback(std::shared_ptr<DetailProgressObserver> observer)
704 {
705     if (config_.GetDBType() == DB_VECTOR) {
706         return E_NOT_SUPPORT;
707     }
708     auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
709     if (errCode != E_OK) {
710         return errCode;
711     }
712     return service->UnregisterAutoSyncCallback(syncerParam_, observer);
713 }
714 
InitDelayNotifier()715 void RdbStoreImpl::InitDelayNotifier()
716 {
717     if (delayNotifier_ != nullptr) {
718         return;
719     }
720     delayNotifier_ = std::make_shared<DelayNotify>();
721     if (delayNotifier_ == nullptr) {
722         LOG_ERROR("Init delay notifier failed.");
723         return;
724     }
725     delayNotifier_->SetExecutorPool(TaskExecutor::GetInstance().GetExecutor());
726     delayNotifier_->SetTask([param = syncerParam_]
727         (const DistributedRdb::RdbChangedData& rdbChangedData, const RdbNotifyConfig& rdbNotifyConfig) -> int {
728         auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(param);
729         if (errCode == E_NOT_SUPPORT) {
730             return errCode;
731         }
732         if (errCode != E_OK || service == nullptr) {
733             LOG_ERROR("GetRdbService is failed, err is %{public}d.", errCode);
734             return errCode;
735         }
736         return service->NotifyDataChange(param, rdbChangedData, rdbNotifyConfig);
737     });
738 }
739 
RegisterDataChangeCallback()740 int RdbStoreImpl::RegisterDataChangeCallback()
741 {
742     if (!config_.IsSearchable()) {
743         return E_OK;
744     }
745 
746     if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
747         return E_NOT_SUPPORT;
748     }
749     InitDelayNotifier();
750     auto callBack = [delayNotifier = delayNotifier_](const std::set<std::string> &tables) {
751         DistributedRdb::RdbChangedData rdbChangedData;
752         for (const auto& table : tables) {
753             rdbChangedData.tableData[table].isTrackedDataChange = true;
754         }
755         if (delayNotifier != nullptr) {
756             delayNotifier->UpdateNotify(rdbChangedData);
757         }
758     };
759     auto connection = connectionPool_->AcquireConnection(false);
760     if (connection == nullptr) {
761         return E_DATABASE_BUSY;
762     }
763     return connection->SubscribeTableChanges(callBack);
764 }
765 
GetHashKeyForLockRow(const AbsRdbPredicates & predicates,std::vector<std::vector<uint8_t>> & hashKeys)766 int RdbStoreImpl::GetHashKeyForLockRow(const AbsRdbPredicates &predicates, std::vector<std::vector<uint8_t>> &hashKeys)
767 {
768     std::string table = predicates.GetTableName();
769     if (table.empty()) {
770         return E_EMPTY_TABLE_NAME;
771     }
772     auto logTable = GetLogTableName(table);
773     std::string sql;
774     sql.append("SELECT ").append(logTable).append(".hash_key ").append("FROM ").append(logTable);
775     sql.append(" INNER JOIN ").append(table).append(" ON ");
776     sql.append(table).append(".ROWID = ").append(logTable).append(".data_key");
777     auto whereClause = predicates.GetWhereClause();
778     if (!whereClause.empty()) {
779         SqliteUtils::Replace(whereClause, SqliteUtils::REP, logTable + ".");
780         sql.append(" WHERE ").append(whereClause);
781     }
782 
783     auto result = QuerySql(sql, predicates.GetBindArgs());
784     if (result == nullptr) {
785         return E_ERROR;
786     }
787     int count = 0;
788     if (result->GetRowCount(count) != E_OK) {
789         return E_NO_ROW_IN_QUERY;
790     }
791 
792     if (count <= 0) {
793         return E_NO_ROW_IN_QUERY;
794     }
795     while (result->GoToNextRow() == E_OK) {
796         std::vector<uint8_t> hashKey;
797         if (result->GetBlob(0, hashKey) != E_OK) {
798             return E_ERROR;
799         }
800         hashKeys.push_back(std::move(hashKey));
801     }
802     return E_OK;
803 }
804 
ModifyLockStatus(const AbsRdbPredicates & predicates,bool isLock)805 int RdbStoreImpl::ModifyLockStatus(const AbsRdbPredicates &predicates, bool isLock)
806 {
807     std::vector<std::vector<uint8_t>> hashKeys;
808     int ret = GetHashKeyForLockRow(predicates, hashKeys);
809     if (ret != E_OK) {
810         LOG_ERROR("GetHashKeyForLockRow failed, err is %{public}d.", ret);
811         return ret;
812     }
813     auto [err, statement] = GetStatement(GlobalExpr::PRAGMA_VERSION);
814     if (statement == nullptr || err != E_OK) {
815         return err;
816     }
817     int errCode = statement->ModifyLockStatus(predicates.GetTableName(), hashKeys, isLock);
818     if (errCode == E_WAIT_COMPENSATED_SYNC) {
819         LOG_DEBUG("Start compensation sync.");
820         DistributedRdb::RdbService::Option option = { DistributedRdb::TIME_FIRST, 0, true, true, true };
821         auto memo = AbsRdbPredicates(predicates.GetTableName()).GetDistributedPredicates();
822         InnerSync(syncerParam_, option, memo, nullptr);
823         return E_OK;
824     }
825     if (errCode != E_OK) {
826         LOG_ERROR("ModifyLockStatus failed, err is %{public}d.", errCode);
827     }
828     return errCode;
829 }
830 
LockCloudContainer()831 std::pair<int32_t, uint32_t> RdbStoreImpl::LockCloudContainer()
832 {
833     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
834     RdbRadar ret(Scene::SCENE_SYNC, __FUNCTION__, config_.GetBundleName());
835     auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
836     if (errCode == E_NOT_SUPPORT) {
837         LOG_ERROR("not support");
838         return { errCode, 0 };
839     }
840     if (errCode != E_OK) {
841         LOG_ERROR("GetRdbService is failed, err is %{public}d, bundleName is %{public}s.", errCode,
842             syncerParam_.bundleName_.c_str());
843         return { errCode, 0 };
844     }
845     auto result = service->LockCloudContainer(syncerParam_);
846     if (result.first != E_OK) {
847         LOG_ERROR("LockCloudContainer failed, err is %{public}d.", result.first);
848     }
849     return result;
850 }
851 
UnlockCloudContainer()852 int32_t RdbStoreImpl::UnlockCloudContainer()
853 {
854     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
855     RdbRadar ret(Scene::SCENE_SYNC, __FUNCTION__, config_.GetBundleName());
856     auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
857     if (errCode == E_NOT_SUPPORT) {
858         LOG_ERROR("not support");
859         return errCode;
860     }
861     if (errCode != E_OK) {
862         LOG_ERROR("GetRdbService is failed, err is %{public}d, bundleName is %{public}s.", errCode,
863             syncerParam_.bundleName_.c_str());
864         return errCode;
865     }
866     errCode = service->UnlockCloudContainer(syncerParam_);
867     if (errCode != E_OK) {
868         LOG_ERROR("UnlockCloudContainer failed, err is %{public}d.", errCode);
869     }
870     return errCode;
871 }
872 #endif
873 
RdbStoreImpl(const RdbStoreConfig & config)874 RdbStoreImpl::RdbStoreImpl(const RdbStoreConfig &config)
875     : isMemoryRdb_(config.IsMemoryRdb()), config_(config), name_(config.GetName()),
876       fileType_(config.GetDatabaseFileType())
877 {
878     path_ = (config.GetRoleType() == VISITOR) ? config.GetVisitorDir() : config.GetPath();
879     isReadOnly_ = config.IsReadOnly() || config.GetRoleType() == VISITOR;
880 }
881 
/**
 * Builds a store object and opens its connection pool.
 *
 * When pool creation fails with E_SQLITE_CORRUPT or E_INVALID_SECRET_KEY on a writable store,
 * the constructor attempts a rebuild through ConnectionPool::HandleDataCorruption. On any
 * remaining failure connectionPool_ is left null and errCode carries the error.
 *
 * @param config  Configuration copied into the store.
 * @param errCode Receives the status reported by the connection pool creation/rebuild.
 */
RdbStoreImpl::RdbStoreImpl(const RdbStoreConfig &config, int &errCode)
    : isMemoryRdb_(config.IsMemoryRdb()), config_(config), name_(config.GetName()),
      fileType_(config.GetDatabaseFileType())
{
    // Visitor-role stores are always read-only and live in the visitor directory.
    isReadOnly_ = config.IsReadOnly() || config.GetRoleType() == VISITOR;
    path_ = (config.GetRoleType() == VISITOR) ? config.GetVisitorDir() : config.GetPath();
    // True when the path was not accessible before the pool was created.
    bool created = access(path_.c_str(), F_OK) != 0;
    connectionPool_ = ConnectionPool::Create(config_, errCode);
    if (connectionPool_ == nullptr && (errCode == E_SQLITE_CORRUPT || errCode == E_INVALID_SECRET_KEY) &&
        !isReadOnly_) {
        LOG_ERROR("database corrupt, errCode:0x%{public}x, rebuild database %{public}s", errCode,
            SqliteUtils::Anonymous(name_).c_str());
#if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
        // Disable the service for this store while it is rebuilt; re-enabled below.
        RdbParam param;
        param.bundleName_ = config_.GetBundleName();
        param.storeName_ = config_.GetName();
        auto [err, service] = RdbMgr::GetInstance().GetRdbService(param);
        if (service != nullptr) {
            service->Disable(param);
        }
#endif
        config_.SetIter(0);
        if (config_.IsEncrypt()) {
            auto key = config_.GetEncryptKey();
            RdbSecurityManager::GetInstance().RestoreKeyFile(path_, key);
            // Overwrite the local key copy with zeros once it is no longer needed.
            key.assign(key.size(), 0);
        }
        std::tie(rebuild_, connectionPool_) = ConnectionPool::HandleDataCorruption(config_, errCode);
        // After the rebuild attempt the store is treated as newly created.
        created = true;
#if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
        if (service != nullptr) {
            service->Enable(param);
        }
#endif
    }
    if (connectionPool_ == nullptr || errCode != E_OK) {
        // Drop any pool that was returned together with an error code.
        connectionPool_ = nullptr;
        LOG_ERROR("Create connPool failed, err is %{public}d, path:%{public}s", errCode,
            SqliteUtils::Anonymous(path_).c_str());
        return;
    }
    InitSyncerParam(config_, created);
    InnerOpen();
}
926 
~RdbStoreImpl()927 RdbStoreImpl::~RdbStoreImpl()
928 {
929     connectionPool_ = nullptr;
930     trxConnMap_ = {};
931     for (auto &trans : transactions_) {
932         auto realTrans = trans.lock();
933         if (realTrans) {
934             (void)realTrans->Close();
935         }
936     }
937     transactions_ = {};
938 }
939 
/**
 * Returns the configuration held by this store.
 *
 * @return Reference to the internal config_ member (not a copy).
 */
const RdbStoreConfig &RdbStoreImpl::GetConfig()
{
    return config_;
}
944 
Insert(const std::string & table,const Row & row,Resolution resolution)945 std::pair<int, int64_t> RdbStoreImpl::Insert(const std::string &table, const Row &row, Resolution resolution)
946 {
947     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
948     if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
949         return { E_NOT_SUPPORT, -1 };
950     }
951     if (table.empty()) {
952         return { E_EMPTY_TABLE_NAME, -1 };
953     }
954 
955     if (row.IsEmpty()) {
956         return { E_EMPTY_VALUES_BUCKET, -1 };
957     }
958 
959     auto conflictClause = SqliteUtils::GetConflictClause(static_cast<int>(resolution));
960     if (conflictClause == nullptr) {
961         return { E_INVALID_CONFLICT_FLAG, -1 };
962     }
963     SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
964     std::string sql;
965     sql.append("INSERT").append(conflictClause).append(" INTO ").append(table).append("(");
966     size_t bindArgsSize = row.values_.size();
967     std::vector<ValueObject> bindArgs;
968     bindArgs.reserve(bindArgsSize);
969     const char *split = "";
970     for (const auto &[key, val] : row.values_) {
971         sql.append(split).append(key);
972         if (val.GetType() == ValueObject::TYPE_ASSETS && resolution == ConflictResolution::ON_CONFLICT_REPLACE) {
973             return { E_INVALID_ARGS, -1 };
974         }
975         SqliteSqlBuilder::UpdateAssetStatus(val, AssetValue::STATUS_INSERT);
976         bindArgs.push_back(val); // columnValue
977         split = ",";
978     }
979 
980     sql.append(") VALUES (");
981     if (bindArgsSize > 0) {
982         sql.append(SqliteSqlBuilder::GetSqlArgs(bindArgsSize));
983     }
984 
985     sql.append(")");
986     int64_t rowid = -1;
987     auto errCode = ExecuteForLastInsertedRowId(rowid, sql, bindArgs);
988     if (errCode == E_OK) {
989         DoCloudSync(table);
990     }
991 
992     return { errCode, rowid };
993 }
994 
BatchInsert(const std::string & table,const ValuesBuckets & rows)995 std::pair<int, int64_t> RdbStoreImpl::BatchInsert(const std::string &table, const ValuesBuckets &rows)
996 {
997     if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
998         return { E_NOT_SUPPORT, -1 };
999     }
1000 
1001     if (rows.RowSize() == 0) {
1002         return { E_OK, 0 };
1003     }
1004 
1005     SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
1006     auto connection = connectionPool_->AcquireConnection(false);
1007     if (connection == nullptr) {
1008         return { E_DATABASE_BUSY, -1 };
1009     }
1010 
1011     auto executeSqlArgs = SqliteSqlBuilder::GenerateSqls(table, rows, connection->GetMaxVariable());
1012     if (executeSqlArgs.empty()) {
1013         LOG_ERROR("empty, table=%{public}s, values:%{public}zu, max number:%{public}d.", table.c_str(), rows.RowSize(),
1014             connection->GetMaxVariable());
1015         return { E_INVALID_ARGS, -1 };
1016     }
1017 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
1018     PauseDelayNotify pauseDelayNotify(delayNotifier_);
1019 #endif
1020     for (const auto &[sql, bindArgs] : executeSqlArgs) {
1021         auto [errCode, statement] = GetStatement(sql, connection);
1022         if (statement == nullptr) {
1023             LOG_ERROR("statement is nullptr, errCode:0x%{public}x, args:%{public}zu, table:%{public}s, sql:%{public}s",
1024                 errCode, bindArgs.size(), table.c_str(), sql.c_str());
1025             return { E_OK, -1 };
1026         }
1027         for (const auto &args : bindArgs) {
1028             auto errCode = statement->Execute(args);
1029             if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
1030                 connectionPool_->Dump(true, "BATCH");
1031                 return { errCode, -1 };
1032             }
1033             if (errCode != E_OK) {
1034                 LOG_ERROR("failed, errCode:%{public}d,args:%{public}zu,table:%{public}s,sql:%{public}s", errCode,
1035                     bindArgs.size(), table.c_str(), sql.c_str());
1036                 return { E_OK, -1 };
1037             }
1038         }
1039     }
1040     connection = nullptr;
1041     DoCloudSync(table);
1042     return { E_OK, int64_t(rows.RowSize()) };
1043 }
1044 
Update(const std::string & table,const Row & row,const std::string & where,const Values & args,Resolution resolution)1045 std::pair<int, int> RdbStoreImpl::Update(const std::string &table, const Row &row, const std::string &where,
1046     const Values &args, Resolution resolution)
1047 {
1048     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
1049     if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
1050         return { E_NOT_SUPPORT, -1 };
1051     }
1052     if (table.empty()) {
1053         return { E_EMPTY_TABLE_NAME, -1 };
1054     }
1055 
1056     if (row.IsEmpty()) {
1057         return { E_EMPTY_VALUES_BUCKET, -1 };
1058     }
1059 
1060     auto clause = SqliteUtils::GetConflictClause(static_cast<int>(resolution));
1061     if (clause == nullptr) {
1062         return { E_INVALID_CONFLICT_FLAG, -1 };
1063     }
1064     SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
1065     std::string sql;
1066     sql.append("UPDATE").append(clause).append(" ").append(table).append(" SET ");
1067     std::vector<ValueObject> tmpBindArgs;
1068     size_t tmpBindSize = row.values_.size() + args.size();
1069     tmpBindArgs.reserve(tmpBindSize);
1070     const char *split = "";
1071     for (auto &[key, val] : row.values_) {
1072         sql.append(split);
1073         if (val.GetType() == ValueObject::TYPE_ASSETS) {
1074             sql.append(key).append("=merge_assets(").append(key).append(", ?)"); // columnName
1075         } else if (val.GetType() == ValueObject::TYPE_ASSET) {
1076             sql.append(key).append("=merge_asset(").append(key).append(", ?)"); // columnName
1077         } else {
1078             sql.append(key).append("=?"); // columnName
1079         }
1080         tmpBindArgs.push_back(val); // columnValue
1081         split = ",";
1082     }
1083 
1084     if (!where.empty()) {
1085         sql.append(" WHERE ").append(where);
1086     }
1087 
1088     tmpBindArgs.insert(tmpBindArgs.end(), args.begin(), args.end());
1089 
1090     int64_t changes = 0;
1091     auto errCode = ExecuteForChangedRowCount(changes, sql, tmpBindArgs);
1092     if (errCode == E_OK) {
1093         DoCloudSync(table);
1094     }
1095     return { errCode, int32_t(changes) };
1096 }
1097 
Delete(int & deletedRows,const std::string & table,const std::string & whereClause,const Values & args)1098 int RdbStoreImpl::Delete(int &deletedRows, const std::string &table, const std::string &whereClause, const Values &args)
1099 {
1100     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
1101     if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
1102         return E_NOT_SUPPORT;
1103     }
1104     if (table.empty()) {
1105         return E_EMPTY_TABLE_NAME;
1106     }
1107 
1108     SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
1109     std::string sql;
1110     sql.append("DELETE FROM ").append(table);
1111     if (!whereClause.empty()) {
1112         sql.append(" WHERE ").append(whereClause);
1113     }
1114     int64_t changes = 0;
1115     auto errCode = ExecuteForChangedRowCount(changes, sql, args);
1116     if (errCode != E_OK) {
1117         return errCode;
1118     }
1119     deletedRows = changes;
1120     DoCloudSync(table);
1121     return E_OK;
1122 }
1123 
QuerySql(const std::string & sql,const Values & bindArgs)1124 std::shared_ptr<AbsSharedResultSet> RdbStoreImpl::QuerySql(const std::string &sql, const Values &bindArgs)
1125 {
1126     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
1127     if (config_.GetDBType() == DB_VECTOR) {
1128         return nullptr;
1129     }
1130     SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
1131 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
1132     auto start = std::chrono::steady_clock::now();
1133     return std::make_shared<SqliteSharedResultSet>(start, connectionPool_->AcquireRef(true), sql, bindArgs, path_);
1134 #else
1135     (void)sql;
1136     (void)bindArgs;
1137     return nullptr;
1138 #endif
1139 }
1140 
QueryByStep(const std::string & sql,const Values & args,bool preCount)1141 std::shared_ptr<ResultSet> RdbStoreImpl::QueryByStep(const std::string &sql, const Values &args, bool preCount)
1142 {
1143     SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
1144     auto start = std::chrono::steady_clock::now();
1145 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
1146     return std::make_shared<StepResultSet>(start, connectionPool_->AcquireRef(true), sql, args, preCount);
1147 #else
1148     return std::make_shared<StepResultSet>(start, connectionPool_->AcquireRef(true), sql, args, false);
1149 #endif
1150 }
1151 
Count(int64_t & outValue,const AbsRdbPredicates & predicates)1152 int RdbStoreImpl::Count(int64_t &outValue, const AbsRdbPredicates &predicates)
1153 {
1154     if (config_.GetDBType() == DB_VECTOR) {
1155         return E_NOT_SUPPORT;
1156     }
1157     std::string sql = SqliteSqlBuilder::BuildCountString(predicates);
1158     return ExecuteAndGetLong(outValue, sql, predicates.GetBindArgs());
1159 }
1160 
/**
 * Executes a single SQL statement that produces no result set for the caller.
 *
 * After a DDL statement the schema version is re-read; when it has grown, vSchema_ is updated
 * and the pool's readers are restarted. After a successful UPDATE or INSERT a cloud sync is
 * triggered.
 *
 * @param sql  Statement text.
 * @param args Bind arguments.
 * @return E_NOT_SUPPORT for vector or read-only stores; the CheckAttach or BeginExecuteSql
 *         error when either fails; the execution error; or, for DDL that bumps the schema
 *         version, the result of RestartReaders. E_OK otherwise.
 */
int RdbStoreImpl::ExecuteSql(const std::string &sql, const Values &args)
{
    DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
    if (config_.GetDBType() == DB_VECTOR || isReadOnly_) {
        return E_NOT_SUPPORT;
    }
    int ret = CheckAttach(sql);
    if (ret != E_OK) {
        return ret;
    }
    SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
    auto [errCode, statement] = BeginExecuteSql(sql);
    if (statement == nullptr) {
        return errCode;
    }
    errCode = statement->Execute(args);
    if (errCode != E_OK) {
        LOG_ERROR("failed,error:0x%{public}x sql:%{public}s.", errCode, sql.c_str());
        // Lock/busy failures additionally dump the connection pool state.
        if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
            connectionPool_->Dump(true, "EXECUTE");
        }
        return errCode;
    }
    int sqlType = SqliteUtils::GetSqlStatementType(sql);
    if (sqlType == SqliteUtils::STATEMENT_DDL) {
        // Reuse the same statement object to read the schema version after the DDL.
        // The results of Prepare() and the `err` half of ExecuteForValue() are not checked here.
        statement->Reset();
        statement->Prepare("PRAGMA schema_version");
        auto [err, version] = statement->ExecuteForValue();
        // The statement is dropped before the readers are (possibly) restarted below.
        statement = nullptr;
        if (vSchema_ < static_cast<int64_t>(version)) {
            LOG_INFO("db:%{public}s exe DDL schema<%{public}" PRIi64 "->%{public}" PRIi64 "> sql:%{public}s.",
                SqliteUtils::Anonymous(name_).c_str(), vSchema_, static_cast<int64_t>(version), sql.c_str());
            vSchema_ = version;
            errCode = connectionPool_->RestartReaders();
        }
    }
    // Drop the statement (already null on the DDL path) before triggering the cloud sync.
    statement = nullptr;
    if (errCode == E_OK && (sqlType == SqliteUtils::STATEMENT_UPDATE || sqlType == SqliteUtils::STATEMENT_INSERT)) {
        DoCloudSync("");
    }
    return errCode;
}
1203 
Execute(const std::string & sql,const Values & args,int64_t trxId)1204 std::pair<int32_t, ValueObject> RdbStoreImpl::Execute(const std::string &sql, const Values &args, int64_t trxId)
1205 {
1206     ValueObject object;
1207     if (isReadOnly_) {
1208         return { E_NOT_SUPPORT, object };
1209     }
1210 
1211     SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
1212     int sqlType = SqliteUtils::GetSqlStatementType(sql);
1213     if (!SqliteUtils::IsSupportSqlForExecute(sqlType)) {
1214         LOG_ERROR("Not support the sqlType: %{public}d, sql: %{public}s", sqlType, sql.c_str());
1215         return { E_NOT_SUPPORT_THE_SQL, object };
1216     }
1217 
1218     if (config_.IsVector() && trxId > 0) {
1219         return { ExecuteByTrxId(sql, trxId, false, args), ValueObject() };
1220     }
1221 
1222     auto connect = connectionPool_->AcquireConnection(false);
1223     if (connect == nullptr) {
1224         return { E_DATABASE_BUSY, object };
1225     }
1226 
1227     auto [errCode, statement] = GetStatement(sql, connect);
1228     if (errCode != E_OK) {
1229         return { errCode, object };
1230     }
1231 
1232     errCode = statement->Execute(args);
1233     if (errCode != E_OK) {
1234         LOG_ERROR("failed,error:0x%{public}x sql:%{public}s.", errCode, sql.c_str());
1235         if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
1236             connectionPool_->Dump(true, "EXECUTE");
1237         }
1238         return { errCode, object };
1239     }
1240 
1241     if (config_.IsVector()) {
1242         return { errCode, object };
1243     }
1244 
1245     return HandleDifferentSqlTypes(statement, sql, object, sqlType);
1246 }
1247 
HandleDifferentSqlTypes(std::shared_ptr<Statement> statement,const std::string & sql,const ValueObject & object,int sqlType)1248 std::pair<int32_t, ValueObject> RdbStoreImpl::HandleDifferentSqlTypes(std::shared_ptr<Statement> statement,
1249     const std::string &sql, const ValueObject &object, int sqlType)
1250 {
1251     int32_t errCode = E_OK;
1252     if (sqlType == SqliteUtils::STATEMENT_INSERT) {
1253         int64_t outValue = statement->Changes() > 0 ? statement->LastInsertRowId() : -1;
1254         return { errCode, ValueObject(outValue) };
1255     }
1256 
1257     if (sqlType == SqliteUtils::STATEMENT_UPDATE) {
1258         int outValue = statement->Changes();
1259         return { errCode, ValueObject(outValue) };
1260     }
1261 
1262     if (sqlType == SqliteUtils::STATEMENT_PRAGMA) {
1263         if (statement->GetColumnCount() == 1) {
1264             return statement->GetColumn(0);
1265         }
1266 
1267         if (statement->GetColumnCount() > 1) {
1268             LOG_ERROR("Not support the sql:%{public}s, column count more than 1", sql.c_str());
1269             return { E_NOT_SUPPORT_THE_SQL, object };
1270         }
1271     }
1272 
1273     if (sqlType == SqliteUtils::STATEMENT_DDL) {
1274         statement->Reset();
1275         statement->Prepare("PRAGMA schema_version");
1276         auto [err, version] = statement->ExecuteForValue();
1277         if (vSchema_ < static_cast<int64_t>(version)) {
1278             LOG_INFO("db:%{public}s exe DDL schema<%{public}" PRIi64 "->%{public}" PRIi64 "> sql:%{public}s.",
1279                      SqliteUtils::Anonymous(name_).c_str(), vSchema_, static_cast<int64_t>(version), sql.c_str());
1280             vSchema_ = version;
1281             errCode = connectionPool_->RestartReaders();
1282         }
1283     }
1284     return { errCode, object };
1285 }
1286 
ExecuteAndGetLong(int64_t & outValue,const std::string & sql,const Values & args)1287 int RdbStoreImpl::ExecuteAndGetLong(int64_t &outValue, const std::string &sql, const Values &args)
1288 {
1289     if (config_.GetDBType() == DB_VECTOR) {
1290         return E_NOT_SUPPORT;
1291     }
1292     auto [errCode, statement] = BeginExecuteSql(sql);
1293     if (statement == nullptr) {
1294         return errCode;
1295     }
1296     auto [err, object] = statement->ExecuteForValue(args);
1297     if (err != E_OK) {
1298         LOG_ERROR("failed, sql %{public}s,  ERROR is %{public}d.", sql.c_str(), err);
1299     }
1300     outValue = object;
1301     return err;
1302 }
1303 
ExecuteAndGetString(std::string & outValue,const std::string & sql,const Values & args)1304 int RdbStoreImpl::ExecuteAndGetString(std::string &outValue, const std::string &sql, const Values &args)
1305 {
1306     if (config_.GetDBType() == DB_VECTOR) {
1307         return E_NOT_SUPPORT;
1308     }
1309     auto [errCode, statement] = BeginExecuteSql(sql);
1310     if (statement == nullptr) {
1311         return errCode;
1312     }
1313     ValueObject object;
1314     std::tie(errCode, object) = statement->ExecuteForValue(args);
1315     if (errCode != E_OK) {
1316         LOG_ERROR("failed, sql %{public}s,  ERROR is %{public}d.", sql.c_str(), errCode);
1317     }
1318     outValue = static_cast<std::string>(object);
1319     return errCode;
1320 }
1321 
ExecuteForLastInsertedRowId(int64_t & outValue,const std::string & sql,const Values & args)1322 int RdbStoreImpl::ExecuteForLastInsertedRowId(int64_t &outValue, const std::string &sql, const Values &args)
1323 {
1324     if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
1325         return E_NOT_SUPPORT;
1326     }
1327     auto begin = std::chrono::steady_clock::now();
1328     auto [errCode, statement] = GetStatement(sql, false);
1329     if (statement == nullptr) {
1330         return errCode;
1331     }
1332     auto beginExec = std::chrono::steady_clock::now();
1333     errCode = statement->Execute(args);
1334     if (errCode != E_OK) {
1335         if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
1336             connectionPool_->Dump(true, "INSERT");
1337         }
1338         return errCode;
1339     }
1340     auto beginResult = std::chrono::steady_clock::now();
1341     outValue = statement->Changes() > 0 ? statement->LastInsertRowId() : -1;
1342     auto allEnd = std::chrono::steady_clock::now();
1343     int64_t totalCostTime = std::chrono::duration_cast<std::chrono::milliseconds>(begin - allEnd).count();
1344     if (totalCostTime >= TIME_OUT) {
1345         int64_t prepareCost =
1346             std::chrono::duration_cast<std::chrono::milliseconds>(beginExec - begin).count();
1347         int64_t execCost =
1348             std::chrono::duration_cast<std::chrono::milliseconds>(beginExec - beginResult).count();
1349         int64_t resultCost = std::chrono::duration_cast<std::chrono::milliseconds>(allEnd - beginResult).count();
1350         LOG_WARN("total[%{public}" PRId64 "] stmt[%{public}" PRId64 "] exec[%{public}" PRId64
1351                  "] result[%{public}" PRId64 "] "
1352                  "sql[%{public}s]",
1353             totalCostTime, prepareCost, execCost, resultCost, SqliteUtils::Anonymous(sql).c_str());
1354     }
1355     return E_OK;
1356 }
1357 
ExecuteForChangedRowCount(int64_t & outValue,const std::string & sql,const Values & args)1358 int RdbStoreImpl::ExecuteForChangedRowCount(int64_t &outValue, const std::string &sql, const Values &args)
1359 {
1360     if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
1361         return E_NOT_SUPPORT;
1362     }
1363     auto [errCode, statement] = GetStatement(sql, false);
1364     if (statement == nullptr) {
1365         return errCode;
1366     }
1367     errCode = statement->Execute(args);
1368     if (errCode != E_OK) {
1369         if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
1370             connectionPool_->Dump(true, "UPG DEL");
1371         }
1372         return errCode;
1373     }
1374     outValue = statement->Changes();
1375     return E_OK;
1376 }
1377 
GetDataBasePath(const std::string & databasePath,std::string & backupFilePath)1378 int RdbStoreImpl::GetDataBasePath(const std::string &databasePath, std::string &backupFilePath)
1379 {
1380     if (databasePath.empty()) {
1381         return E_INVALID_FILE_PATH;
1382     }
1383 
1384     if (ISFILE(databasePath)) {
1385         backupFilePath = ExtractFilePath(path_) + databasePath;
1386     } else {
1387         // 2 represents two characters starting from the len - 2 position
1388         if (!PathToRealPath(ExtractFilePath(databasePath), backupFilePath) || databasePath.back() == '/' ||
1389             databasePath.substr(databasePath.length() - 2, 2) == "\\") {
1390             LOG_ERROR("Invalid databasePath.");
1391             return E_INVALID_FILE_PATH;
1392         }
1393         backupFilePath = databasePath;
1394     }
1395 
1396     if (backupFilePath == path_) {
1397         LOG_ERROR("The backupPath and path should not be same.");
1398         return E_INVALID_FILE_PATH;
1399     }
1400 
1401     LOG_INFO("databasePath is %{public}s.", SqliteUtils::Anonymous(backupFilePath).c_str());
1402     return E_OK;
1403 }
1404 
GetSlaveName(const std::string & path,std::string & backupFilePath)1405 int RdbStoreImpl::GetSlaveName(const std::string &path, std::string &backupFilePath)
1406 {
1407     std::string suffix(".db");
1408     std::string slaveSuffix("_slave.db");
1409     auto pos = path.find(suffix);
1410     if (pos == std::string::npos) {
1411         backupFilePath = path + slaveSuffix;
1412     } else {
1413         backupFilePath = std::string(path, 0, pos) + slaveSuffix;
1414     }
1415     return E_OK;
1416 }
1417 
/**
 * Backs up this database into the file named by databasePath.
 *
 * When TryGetMasterSlaveBackupPath accepts the request, the backup goes straight to
 * InnerBackup. Otherwise the target path is resolved with GetDataBasePath and the backup is
 * done under the BACKUP_RESTORE key-file lock: an existing backup file is parked as
 * "<target>.tmp", the new backup is written, and the parked file is either restored (on
 * failure) or deleted (on success).
 *
 * @param databasePath Backup file name or path.
 * @param encryptKey   Key forwarded to InnerBackup.
 * @return E_NOT_SUPPORT for read-only stores; the GetDataBasePath error; E_ERROR when stale
 *         files cannot be removed or the old backup cannot be renamed; otherwise the result
 *         of InnerBackup.
 */
int RdbStoreImpl::Backup(const std::string &databasePath, const std::vector<uint8_t> &encryptKey)
{
    LOG_INFO("Backup db: %{public}s.", SqliteUtils::Anonymous(config_.GetName()).c_str());
    if (isReadOnly_) {
        return E_NOT_SUPPORT;
    }
    std::string backupFilePath;
    if (TryGetMasterSlaveBackupPath(databasePath, backupFilePath)) {
        return InnerBackup(backupFilePath, encryptKey);
    }

    int ret = GetDataBasePath(databasePath, backupFilePath);
    if (ret != E_OK) {
        return ret;
    }

    // Locked manually; every return path below must call Unlock().
    RdbSecurityManager::KeyFiles keyFiles(path_ + BACKUP_RESTORE);
    keyFiles.Lock();

    // Removes the backup file and its -shm/-wal companions; all three deletions are always
    // attempted and the result is true only when every one of them succeeded.
    auto deleteDirtyFiles = [&backupFilePath] {
        auto res = SqliteUtils::DeleteFile(backupFilePath);
        res = SqliteUtils::DeleteFile(backupFilePath + "-shm") && res;
        res = SqliteUtils::DeleteFile(backupFilePath + "-wal") && res;
        return res;
    };

    // A -wal file next to the target is cleaned up together with the target before starting.
    auto walFile = backupFilePath + "-wal";
    if (access(walFile.c_str(), F_OK) == E_OK) {
        if (!deleteDirtyFiles()) {
            keyFiles.Unlock();
            return E_ERROR;
        }
    }
    std::string tempPath = backupFilePath + ".tmp";
    if (access(tempPath.c_str(), F_OK) == E_OK) {
        // A parked copy already exists: keep it and drop the current target file.
        SqliteUtils::DeleteFile(backupFilePath);
    } else {
        // Park the existing backup (if any) so it can be restored when the new backup fails.
        if (access(backupFilePath.c_str(), F_OK) == E_OK && !SqliteUtils::RenameFile(backupFilePath, tempPath)) {
            LOG_ERROR("rename backup file failed, path:%{public}s, errno:%{public}d",
                SqliteUtils::Anonymous(backupFilePath).c_str(), errno);
            keyFiles.Unlock();
            return E_ERROR;
        }
    }
    ret = InnerBackup(backupFilePath, encryptKey);
    // A failed backup, or one that left a -wal file behind, is rolled back to the parked copy.
    if (ret != E_OK || access(walFile.c_str(), F_OK) == E_OK) {
        if (deleteDirtyFiles()) {
            SqliteUtils::RenameFile(tempPath, backupFilePath);
        }
    } else {
        SqliteUtils::DeleteFile(tempPath);
    }
    keyFiles.Unlock();
    return ret;
}
1476 
CreateBackupBindArgs(const std::string & databasePath,const std::vector<uint8_t> & destEncryptKey)1477 std::vector<ValueObject> RdbStoreImpl::CreateBackupBindArgs(const std::string &databasePath,
1478     const std::vector<uint8_t> &destEncryptKey)
1479 {
1480     std::vector<ValueObject> bindArgs;
1481     bindArgs.emplace_back(databasePath);
1482     if (!destEncryptKey.empty() && !config_.IsEncrypt()) {
1483         bindArgs.emplace_back(destEncryptKey);
1484     } else if (config_.IsEncrypt()) {
1485         std::vector<uint8_t> key = config_.GetEncryptKey();
1486         bindArgs.emplace_back(key);
1487         key.assign(key.size(), 0);
1488     } else {
1489         bindArgs.emplace_back("");
1490     }
1491     return bindArgs;
1492 }
1493 
1494 /**
1495  * Backup a database from a specified encrypted or unencrypted database file.
1496  */
InnerBackup(const std::string & databasePath,const std::vector<uint8_t> & destEncryptKey)1497 int RdbStoreImpl::InnerBackup(const std::string &databasePath, const std::vector<uint8_t> &destEncryptKey)
1498 {
1499     if (isReadOnly_) {
1500         return E_NOT_SUPPORT;
1501     }
1502 
1503     if (config_.GetDBType() == DB_VECTOR) {
1504         if (config_.IsEncrypt()) {
1505             return E_NOT_SUPPORT;
1506         }
1507 
1508         auto conn = connectionPool_->AcquireConnection(false);
1509         if (conn == nullptr) {
1510             return E_BASE;
1511         }
1512 
1513         return conn->Backup(databasePath, {}, false, slaveStatus_);
1514     }
1515 
1516     if (config_.GetHaMode() != HAMode::SINGLE && SqliteUtils::IsSlaveDbName(databasePath)) {
1517         auto conn = connectionPool_->AcquireConnection(false);
1518         return conn == nullptr ? E_BASE : conn->Backup(databasePath, {}, false, slaveStatus_);
1519     }
1520 
1521     auto [result, conn] = CreateWritableConn();
1522     if (result != E_OK) {
1523         return result;
1524     }
1525 
1526     if (config_.IsEncrypt()) {
1527         result = SetDefaultEncryptAlgo(conn, config_);
1528         if (result != E_OK) {
1529             return result;
1530         }
1531     }
1532 
1533     std::vector<ValueObject> bindArgs = CreateBackupBindArgs(databasePath, destEncryptKey);
1534     auto [errCode, statement] = conn->CreateStatement(GlobalExpr::ATTACH_BACKUP_SQL, conn);
1535     errCode = statement->Execute(bindArgs);
1536     if (errCode != E_OK) {
1537         return errCode;
1538     }
1539     errCode = statement->Prepare(GlobalExpr::PRAGMA_BACKUP_JOUR_MODE_WAL);
1540     errCode = statement->Execute();
1541     if (errCode != E_OK) {
1542         return errCode;
1543     }
1544     errCode = statement->Prepare(GlobalExpr::EXPORT_SQL);
1545     int ret = statement->Execute();
1546     errCode = statement->Prepare(GlobalExpr::DETACH_BACKUP_SQL);
1547     int res = statement->Execute();
1548     return (res == E_OK) ? ret : res;
1549 }
1550 
BeginExecuteSql(const std::string & sql)1551 std::pair<int32_t, RdbStoreImpl::Stmt> RdbStoreImpl::BeginExecuteSql(const std::string& sql)
1552 {
1553     int type = SqliteUtils::GetSqlStatementType(sql);
1554     if (SqliteUtils::IsSpecial(type)) {
1555         return { E_NOT_SUPPORT, nullptr };
1556     }
1557 
1558     bool assumeReadOnly = SqliteUtils::IsSqlReadOnly(type);
1559     auto conn = connectionPool_->AcquireConnection(assumeReadOnly);
1560     if (conn == nullptr) {
1561         return { E_DATABASE_BUSY, nullptr };
1562     }
1563 
1564     auto [errCode, statement] = conn->CreateStatement(sql, conn);
1565     if (statement == nullptr) {
1566         return { errCode, nullptr };
1567     }
1568 
1569     if (statement->ReadOnly() && conn->IsWriter()) {
1570         statement = nullptr;
1571         conn = nullptr;
1572         return GetStatement(sql, true);
1573     }
1574 
1575     return { errCode, statement };
1576 }
1577 
IsHoldingConnection()1578 bool RdbStoreImpl::IsHoldingConnection()
1579 {
1580     return connectionPool_ != nullptr;
1581 }
1582 
SetDefaultEncryptSql(const std::shared_ptr<Statement> & statement,std::string sql,const RdbStoreConfig & config)1583 int RdbStoreImpl::SetDefaultEncryptSql(
1584     const std::shared_ptr<Statement> &statement, std::string sql, const RdbStoreConfig &config)
1585 {
1586     auto errCode = statement->Prepare(sql);
1587     if (errCode != E_OK) {
1588         LOG_ERROR("Prepare failed: %{public}s, %{public}d, %{public}d, %{public}d, %{public}d, %{public}u",
1589             SqliteUtils::Anonymous(config.GetName()).c_str(), config.GetCryptoParam().iterNum,
1590             config.GetCryptoParam().encryptAlgo, config.GetCryptoParam().hmacAlgo, config.GetCryptoParam().kdfAlgo,
1591             config.GetCryptoParam().cryptoPageSize);
1592         return errCode;
1593     }
1594     errCode = statement->Execute();
1595     if (errCode != E_OK) {
1596         LOG_ERROR("Execute failed: %{public}s, %{public}d, %{public}d, %{public}d, %{public}d, %{public}u",
1597             SqliteUtils::Anonymous(config.GetName()).c_str(), config.GetCryptoParam().iterNum,
1598             config.GetCryptoParam().encryptAlgo, config.GetCryptoParam().hmacAlgo, config.GetCryptoParam().kdfAlgo,
1599             config.GetCryptoParam().cryptoPageSize);
1600         return errCode;
1601     }
1602     return E_OK;
1603 }
1604 
SetDefaultEncryptAlgo(const ConnectionPool::SharedConn & conn,const RdbStoreConfig & config)1605 int RdbStoreImpl::SetDefaultEncryptAlgo(const ConnectionPool::SharedConn &conn, const RdbStoreConfig &config)
1606 {
1607     if (conn == nullptr) {
1608         return E_DATABASE_BUSY;
1609     }
1610 
1611     if (!config.GetCryptoParam().IsValid()) {
1612         LOG_ERROR("Invalid crypto param, name:%{public}s", SqliteUtils::Anonymous(config.GetName()).c_str());
1613         return E_INVALID_ARGS;
1614     }
1615 
1616     std::string sql = std::string(GlobalExpr::CIPHER_DEFAULT_ATTACH_CIPHER_PREFIX) +
1617                       SqliteUtils::EncryptAlgoDescription(config.GetEncryptAlgo()) +
1618                       std::string(GlobalExpr::ALGO_SUFFIX);
1619     auto [errCode, statement] = conn->CreateStatement(sql, conn);
1620     errCode = SetDefaultEncryptSql(statement, sql, config);
1621     if (errCode != E_OK) {
1622         return errCode;
1623     }
1624 
1625     if (config.GetIter() > 0) {
1626         sql = std::string(GlobalExpr::CIPHER_DEFAULT_ATTACH_KDF_ITER_PREFIX) + std::to_string(config.GetIter());
1627         errCode = SetDefaultEncryptSql(statement, sql, config);
1628         if (errCode != E_OK) {
1629             return errCode;
1630         }
1631     }
1632 
1633     sql = std::string(GlobalExpr::CIPHER_DEFAULT_ATTACH_HMAC_ALGO_PREFIX) +
1634           SqliteUtils::HmacAlgoDescription(config.GetCryptoParam().hmacAlgo) + std::string(GlobalExpr::ALGO_SUFFIX);
1635     errCode = SetDefaultEncryptSql(statement, sql, config);
1636     if (errCode != E_OK) {
1637         return errCode;
1638     }
1639 
1640     sql = std::string(GlobalExpr::CIPHER_DEFAULT_ATTACH_KDF_ALGO_PREFIX) +
1641                       SqliteUtils::KdfAlgoDescription(config.GetCryptoParam().kdfAlgo) +
1642                       std::string(GlobalExpr::ALGO_SUFFIX);
1643     errCode = SetDefaultEncryptSql(statement, sql, config);
1644     if (errCode != E_OK) {
1645         return errCode;
1646     }
1647 
1648     sql = std::string(GlobalExpr::CIPHER_DEFAULT_ATTACH_PAGE_SIZE_PREFIX) +
1649                       std::to_string(config.GetCryptoParam().cryptoPageSize);
1650     return SetDefaultEncryptSql(statement, sql, config);
1651 }
1652 
AttachInner(const RdbStoreConfig & config,const std::string & attachName,const std::string & dbPath,const std::vector<uint8_t> & key,int32_t waitTime)1653 int RdbStoreImpl::AttachInner(const RdbStoreConfig &config, const std::string &attachName, const std::string &dbPath,
1654     const std::vector<uint8_t> &key, int32_t waitTime)
1655 {
1656     auto [conn, readers] = connectionPool_->AcquireAll(waitTime);
1657     if (conn == nullptr) {
1658         return E_DATABASE_BUSY;
1659     }
1660 
1661     if (config_.GetStorageMode() != StorageMode::MODE_MEMORY &&
1662         conn->GetJournalMode() == static_cast<int32_t>(JournalMode::MODE_WAL)) {
1663         // close first to prevent the connection from being put back.
1664         connectionPool_->CloseAllConnections();
1665         conn = nullptr;
1666         readers.clear();
1667         auto [err, newConn] = connectionPool_->DisableWal();
1668         if (err != E_OK) {
1669             return err;
1670         }
1671         conn = newConn;
1672     }
1673     std::vector<ValueObject> bindArgs;
1674     bindArgs.emplace_back(ValueObject(dbPath));
1675     bindArgs.emplace_back(ValueObject(attachName));
1676     if (!key.empty()) {
1677         auto ret = SetDefaultEncryptAlgo(conn, config);
1678         if (ret != E_OK) {
1679             return ret;
1680         }
1681         bindArgs.emplace_back(ValueObject(key));
1682         auto [errCode, statement] = conn->CreateStatement(GlobalExpr::ATTACH_WITH_KEY_SQL, conn);
1683         if (statement == nullptr || errCode != E_OK) {
1684             LOG_ERROR("Attach get statement failed, code is %{public}d", errCode);
1685             return E_ERROR;
1686         }
1687         return statement->Execute(bindArgs);
1688     }
1689 
1690     auto [errCode, statement] = conn->CreateStatement(GlobalExpr::ATTACH_SQL, conn);
1691     if (statement == nullptr || errCode != E_OK) {
1692         LOG_ERROR("Attach get statement failed, code is %{public}d", errCode);
1693         return errCode;
1694     }
1695     return statement->Execute(bindArgs);
1696 }
1697 
1698 /**
1699  * Attaches a database.
1700  */
Attach(const RdbStoreConfig & config,const std::string & attachName,int32_t waitTime)1701 std::pair<int32_t, int32_t> RdbStoreImpl::Attach(
1702     const RdbStoreConfig &config, const std::string &attachName, int32_t waitTime)
1703 {
1704     if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR) || config_.GetHaMode() != HAMode::SINGLE) {
1705         return { E_NOT_SUPPORT, 0 };
1706     }
1707     std::string dbPath;
1708     int err = SqliteGlobalConfig::GetDbPath(config, dbPath);
1709     if (err != E_OK || access(dbPath.c_str(), F_OK) != E_OK) {
1710         return { E_INVALID_FILE_PATH, 0 };
1711     }
1712 
1713     // encrypted databases are not supported to attach a non encrypted database.
1714     if (!config.IsEncrypt() && config_.IsEncrypt()) {
1715         return { E_NOT_SUPPORT, 0 };
1716     }
1717 
1718     if (attachedInfo_.Contains(attachName)) {
1719         return { E_ATTACHED_DATABASE_EXIST, 0 };
1720     }
1721 
1722     std::vector<uint8_t> key;
1723     config.Initialize();
1724     if (config.IsEncrypt()) {
1725         key = config.GetEncryptKey();
1726     }
1727     err = AttachInner(config, attachName, dbPath, key, waitTime);
1728     key.assign(key.size(), 0);
1729     if (err == E_SQLITE_ERROR) {
1730         // only when attachName is already in use, SQLITE-ERROR will be reported here.
1731         return { E_ATTACHED_DATABASE_EXIST, 0 };
1732     } else if (err != E_OK) {
1733         LOG_ERROR("failed, errCode[%{public}d] fileName[%{public}s] attachName[%{public}s] attach fileName"
1734                   "[%{public}s]",
1735             err, SqliteUtils::Anonymous(config_.GetName()).c_str(), attachName.c_str(),
1736             SqliteUtils::Anonymous(config.GetName()).c_str());
1737         return { err, 0 };
1738     }
1739     if (!attachedInfo_.Insert(attachName, dbPath)) {
1740         return { E_ATTACHED_DATABASE_EXIST, 0 };
1741     }
1742     return { E_OK, attachedInfo_.Size() };
1743 }
1744 
/**
 * Detaches a previously attached database.
 *
 * Not supported for read-only or vector stores. Detaching a name that is not recorded in
 * attachedInfo_ is treated as success. When the last attached database is removed, all pooled
 * connections are closed and WAL is enabled again.
 *
 * @param attachName Alias of the attached database.
 * @param waitTime Wait time passed to ConnectionPool::AcquireAll.
 * @return {E_OK, number of databases still attached} while others remain attached; {result of
 *         EnableWal, 0} after the last one is detached; {error code, 0} on failure.
 */
std::pair<int32_t, int32_t> RdbStoreImpl::Detach(const std::string &attachName, int32_t waitTime)
{
    if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
        return { E_NOT_SUPPORT, 0 };
    }
    // nothing to do when the alias was never attached through this store
    if (!attachedInfo_.Contains(attachName)) {
        return { E_OK, attachedInfo_.Size() };
    }

    // the writer and every reader are taken before the DETACH is executed
    auto [connection, readers] = connectionPool_->AcquireAll(waitTime);
    if (connection == nullptr) {
        return { E_DATABASE_BUSY, 0 };
    }
    std::vector<ValueObject> bindArgs;
    bindArgs.push_back(ValueObject(attachName));

    auto [errCode, statement] = connection->CreateStatement(GlobalExpr::DETACH_SQL, connection);
    if (statement == nullptr || errCode != E_OK) {
        LOG_ERROR("Detach get statement failed, errCode %{public}d", errCode);
        return { errCode, 0 };
    }
    errCode = statement->Execute(bindArgs);
    if (errCode != E_OK) {
        LOG_ERROR("failed, errCode[%{public}d] fileName[%{public}s] attachName[%{public}s] attach", errCode,
            SqliteUtils::Anonymous(config_.GetName()).c_str(), attachName.c_str());
        return { errCode, 0 };
    }

    attachedInfo_.Erase(attachName);
    // other databases are still attached: keep the current journal mode
    if (!attachedInfo_.Empty()) {
        return { E_OK, attachedInfo_.Size() };
    }
    // last database detached: drop the statement and every connection before re-enabling WAL
    statement = nullptr;
    // close first to prevent the connection from being put back.
    connectionPool_->CloseAllConnections();
    connection = nullptr;
    readers.clear();
    errCode = connectionPool_->EnableWal();
    return { errCode, 0 };
}
1785 
1786 /**
1787  * Obtains the database version.
1788  */
GetVersion(int & version)1789 int RdbStoreImpl::GetVersion(int &version)
1790 {
1791     auto [errCode, statement] = GetStatement(GlobalExpr::PRAGMA_VERSION, isReadOnly_);
1792     if (statement == nullptr) {
1793         return errCode;
1794     }
1795     ValueObject value;
1796     std::tie(errCode, value) = statement->ExecuteForValue();
1797     auto val = std::get_if<int64_t>(&value.value);
1798     if (val != nullptr) {
1799         version = static_cast<int>(*val);
1800     }
1801     return errCode;
1802 }
1803 
1804 /**
1805  * Sets the version of a new database.
1806  */
SetVersion(int version)1807 int RdbStoreImpl::SetVersion(int version)
1808 {
1809     if (isReadOnly_) {
1810         return E_NOT_SUPPORT;
1811     }
1812     std::string sql = std::string(GlobalExpr::PRAGMA_VERSION) + " = " + std::to_string(version);
1813     auto [errCode, statement] = GetStatement(sql);
1814     if (statement == nullptr) {
1815         return errCode;
1816     }
1817     return statement->Execute();
1818 }
1819 /**
1820  * Begins a transaction in EXCLUSIVE mode.
1821  */
BeginTransaction()1822 int RdbStoreImpl::BeginTransaction()
1823 {
1824     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
1825     std::lock_guard<std::mutex> lockGuard(connectionPool_->GetTransactionStackMutex());
1826     if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
1827         return E_NOT_SUPPORT;
1828     }
1829     // size + 1 means the number of transactions in process
1830     size_t transactionId = connectionPool_->GetTransactionStack().size() + 1;
1831     BaseTransaction transaction(connectionPool_->GetTransactionStack().size());
1832     auto [errCode, statement] = GetStatement(transaction.GetTransactionStr());
1833     if (statement == nullptr) {
1834         return errCode;
1835     }
1836     errCode = statement->Execute();
1837     if (errCode != E_OK) {
1838         if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
1839             connectionPool_->Dump(true, "BEGIN");
1840         }
1841         LOG_ERROR("transaction id: %{public}zu, storeName: %{public}s, errCode: %{public}d",
1842             transactionId, SqliteUtils::Anonymous(name_).c_str(), errCode);
1843         return errCode;
1844     }
1845     connectionPool_->SetInTransaction(true);
1846     connectionPool_->GetTransactionStack().push(transaction);
1847     // 1 means the number of transactions in process
1848     if (transactionId > 1) {
1849         LOG_WARN("transaction id: %{public}zu, storeName: %{public}s, errCode: %{public}d",
1850             transactionId, SqliteUtils::Anonymous(name_).c_str(), errCode);
1851     }
1852 
1853     return E_OK;
1854 }
1855 
BeginTrans()1856 std::pair<int, int64_t> RdbStoreImpl::BeginTrans()
1857 {
1858     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
1859     if (!config_.IsVector() || isReadOnly_) {
1860         return {E_NOT_SUPPORT, 0};
1861     }
1862 
1863     int64_t tmpTrxId = 0;
1864     auto [errCode, connection] = connectionPool_->CreateTransConn(false);
1865     if (connection == nullptr) {
1866         LOG_ERROR("Get null connection, storeName: %{public}s errCode:0x%{public}x.",
1867             SqliteUtils::Anonymous(name_).c_str(), errCode);
1868         return {errCode, 0};
1869     }
1870     tmpTrxId = newTrxId_.fetch_add(1);
1871     trxConnMap_.Insert(tmpTrxId, connection);
1872     errCode = ExecuteByTrxId(BEGIN_TRANSACTION_SQL, tmpTrxId);
1873     if (errCode != E_OK) {
1874         trxConnMap_.Erase(tmpTrxId);
1875     }
1876     return {errCode, tmpTrxId};
1877 }
1878 
1879 /**
1880 * Begins a transaction in EXCLUSIVE mode.
1881 */
RollBack()1882 int RdbStoreImpl::RollBack()
1883 {
1884     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
1885     std::lock_guard<std::mutex> lockGuard(connectionPool_->GetTransactionStackMutex());
1886     if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
1887         return E_NOT_SUPPORT;
1888     }
1889     size_t transactionId = connectionPool_->GetTransactionStack().size();
1890 
1891     if (connectionPool_->GetTransactionStack().empty()) {
1892         LOG_ERROR("transaction id: %{public}zu, storeName: %{public}s", transactionId,
1893             SqliteUtils::Anonymous(name_).c_str());
1894         return E_NO_TRANSACTION_IN_SESSION;
1895     }
1896     BaseTransaction transaction = connectionPool_->GetTransactionStack().top();
1897     connectionPool_->GetTransactionStack().pop();
1898     if (transaction.GetType() != TransType::ROLLBACK_SELF && !connectionPool_->GetTransactionStack().empty()) {
1899         connectionPool_->GetTransactionStack().top().SetChildFailure(true);
1900     }
1901     auto [errCode, statement] = GetStatement(transaction.GetRollbackStr());
1902     if (statement == nullptr) {
1903         if (errCode == E_DATABASE_BUSY) {
1904             Reportor::Report(Reportor::Create(config_, errCode, "ErrorType: RollBusy"));
1905         }
1906         // size + 1 means the number of transactions in process
1907         LOG_ERROR("transaction id: %{public}zu, storeName: %{public}s", transactionId + 1,
1908             SqliteUtils::Anonymous(name_).c_str());
1909         return E_DATABASE_BUSY;
1910     }
1911     errCode = statement->Execute();
1912     if (errCode != E_OK) {
1913         if (errCode == E_SQLITE_BUSY || errCode == E_SQLITE_LOCKED) {
1914             Reportor::Report(Reportor::Create(config_, errCode, "ErrorType: RollBusy"));
1915         }
1916         LOG_ERROR("failed, id: %{public}zu, storeName: %{public}s, errCode: %{public}d",
1917             transactionId, SqliteUtils::Anonymous(name_).c_str(), errCode);
1918         return errCode;
1919     }
1920     if (connectionPool_->GetTransactionStack().empty()) {
1921         connectionPool_->SetInTransaction(false);
1922     }
1923     // 1 means the number of transactions in process
1924     if (transactionId > 1) {
1925         LOG_WARN("transaction id: %{public}zu, storeName: %{public}s, errCode: %{public}d",
1926             transactionId, SqliteUtils::Anonymous(name_).c_str(), errCode);
1927     }
1928     return E_OK;
1929 }
1930 
ExecuteByTrxId(const std::string & sql,int64_t trxId,bool closeConnAfterExecute,const std::vector<ValueObject> & bindArgs)1931 int RdbStoreImpl::ExecuteByTrxId(const std::string &sql, int64_t trxId, bool closeConnAfterExecute,
1932     const std::vector<ValueObject> &bindArgs)
1933 {
1934     if ((!config_.IsVector()) || isReadOnly_) {
1935         return E_NOT_SUPPORT;
1936     }
1937     if (trxId == 0) {
1938         return E_INVALID_ARGS;
1939     }
1940 
1941     if (!trxConnMap_.Contains(trxId)) {
1942         LOG_ERROR("trxId hasn't appeared before %{public}" PRIu64, trxId);
1943         return E_INVALID_ARGS;
1944     }
1945     auto time = static_cast<uint64_t>(duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count());
1946     auto result = trxConnMap_.Find(trxId);
1947     auto connection = result.second;
1948     if (connection == nullptr) {
1949         LOG_ERROR("Get null connection, storeName: %{public}s time:%{public}" PRIu64 ".",
1950             SqliteUtils::Anonymous(name_).c_str(), time);
1951         return E_ERROR;
1952     }
1953     auto [ret, statement] = GetStatement(sql, connection);
1954     if (ret != E_OK) {
1955         return ret;
1956     }
1957     ret = statement->Execute(bindArgs);
1958     if (ret != E_OK) {
1959         LOG_ERROR("transaction id: %{public}" PRIu64 ", storeName: %{public}s, errCode: %{public}d" PRIu64, trxId,
1960             SqliteUtils::Anonymous(name_).c_str(), ret);
1961         trxConnMap_.Erase(trxId);
1962         return ret;
1963     }
1964     if (closeConnAfterExecute) {
1965         trxConnMap_.Erase(trxId);
1966     }
1967     return E_OK;
1968 }
1969 
RollBack(int64_t trxId)1970 int RdbStoreImpl::RollBack(int64_t trxId)
1971 {
1972     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
1973     return ExecuteByTrxId(ROLLBACK_TRANSACTION_SQL, trxId, true);
1974 }
1975 
1976 /**
1977 * Begins a transaction in EXCLUSIVE mode.
1978 */
Commit()1979 int RdbStoreImpl::Commit()
1980 {
1981     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
1982     std::lock_guard<std::mutex> lockGuard(connectionPool_->GetTransactionStackMutex());
1983     if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
1984         return E_NOT_SUPPORT;
1985     }
1986     size_t transactionId = connectionPool_->GetTransactionStack().size();
1987 
1988     if (connectionPool_->GetTransactionStack().empty()) {
1989         return E_OK;
1990     }
1991     BaseTransaction transaction = connectionPool_->GetTransactionStack().top();
1992     std::string sqlStr = transaction.GetCommitStr();
1993     if (sqlStr.size() <= 1) {
1994         LOG_WARN("id: %{public}zu, storeName: %{public}s, sql: %{public}s",
1995             transactionId, SqliteUtils::Anonymous(name_).c_str(), sqlStr.c_str());
1996         connectionPool_->GetTransactionStack().pop();
1997         return E_OK;
1998     }
1999     auto [errCode, statement] = GetStatement(sqlStr);
2000     if (statement == nullptr) {
2001         if (errCode == E_DATABASE_BUSY || errCode == E_SQLITE_BUSY || E_SQLITE_LOCKED) {
2002             Reportor::Report(Reportor::Create(config_, E_DATABASE_BUSY, "ErrorType: Busy"));
2003         }
2004         LOG_ERROR("id: %{public}zu, storeName: %{public}s, statement error", transactionId,
2005             SqliteUtils::Anonymous(name_).c_str());
2006         return E_DATABASE_BUSY;
2007     }
2008     errCode = statement->Execute();
2009     if (errCode != E_OK) {
2010         if (errCode == E_SQLITE_BUSY || errCode == E_SQLITE_LOCKED) {
2011             Reportor::Report(Reportor::Create(config_, errCode, "ErrorType: CommitBusy"));
2012         }
2013         LOG_ERROR("failed, id: %{public}zu, storeName: %{public}s, errCode: %{public}d",
2014             transactionId, SqliteUtils::Anonymous(name_).c_str(), errCode);
2015         return errCode;
2016     }
2017     connectionPool_->SetInTransaction(false);
2018     // 1 means the number of transactions in process
2019     if (transactionId > 1) {
2020         LOG_WARN("id: %{public}zu, storeName: %{public}s, errCode: %{public}d", transactionId,
2021             SqliteUtils::Anonymous(name_).c_str(), errCode);
2022     }
2023     connectionPool_->GetTransactionStack().pop();
2024     return E_OK;
2025 }
2026 
Commit(int64_t trxId)2027 int RdbStoreImpl::Commit(int64_t trxId)
2028 {
2029     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
2030     return ExecuteByTrxId(COMMIT_TRANSACTION_SQL, trxId, true);
2031 }
2032 
IsInTransaction()2033 bool RdbStoreImpl::IsInTransaction()
2034 {
2035     if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
2036         return false;
2037     }
2038     return connectionPool_->IsInTransaction();
2039 }
2040 
CheckAttach(const std::string & sql)2041 int RdbStoreImpl::CheckAttach(const std::string &sql)
2042 {
2043     size_t index = sql.find_first_not_of(' ');
2044     if (index == std::string::npos) {
2045         return E_OK;
2046     }
2047 
2048     /* The first 3 characters can determine the type */
2049     std::string sqlType = sql.substr(index, 3);
2050     sqlType = SqliteUtils::StrToUpper(sqlType);
2051     if (sqlType != "ATT") {
2052         return E_OK;
2053     }
2054 
2055     auto [errCode, statement] = GetStatement(GlobalExpr::PRAGMA_JOUR_MODE_EXP);
2056     if (statement == nullptr) {
2057         return errCode;
2058     }
2059 
2060     errCode = statement->Execute();
2061     if (errCode != E_OK) {
2062         LOG_ERROR("RdbStoreImpl CheckAttach fail to get journal mode : %{public}d", errCode);
2063         return errCode;
2064     }
2065     auto [errorCode, valueObject] = statement->GetColumn(0);
2066     if (errorCode != E_OK) {
2067         LOG_ERROR("RdbStoreImpl CheckAttach fail to get journal mode : %{public}d", errorCode);
2068         return errorCode;
2069     }
2070     auto journal = std::get_if<std::string>(&valueObject.value);
2071     auto journalMode = SqliteUtils::StrToUpper((journal == nullptr) ? "" : *journal);
2072     if (journalMode == RdbStoreConfig::DB_DEFAULT_JOURNAL_MODE) {
2073         LOG_ERROR("RdbStoreImpl attach is not supported in WAL mode");
2074         return E_NOT_SUPPORTED_ATTACH_IN_WAL_MODE;
2075     }
2076 
2077     return E_OK;
2078 }
2079 
IsOpen() const2080 bool RdbStoreImpl::IsOpen() const
2081 {
2082     return isOpen_;
2083 }
2084 
GetPath()2085 std::string RdbStoreImpl::GetPath()
2086 {
2087     return path_;
2088 }
2089 
IsReadOnly() const2090 bool RdbStoreImpl::IsReadOnly() const
2091 {
2092     return isReadOnly_;
2093 }
2094 
IsMemoryRdb() const2095 bool RdbStoreImpl::IsMemoryRdb() const
2096 {
2097     return isMemoryRdb_;
2098 }
2099 
GetName()2100 std::string RdbStoreImpl::GetName()
2101 {
2102     return name_;
2103 }
2104 
DoCloudSync(const std::string & table)2105 void RdbStoreImpl::DoCloudSync(const std::string &table)
2106 {
2107 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
2108     auto needSync = cloudInfo_->Change(table);
2109     if (!needSync) {
2110         return;
2111     }
2112     auto pool = TaskExecutor::GetInstance().GetExecutor();
2113     if (pool == nullptr) {
2114         return;
2115     }
2116     auto interval =
2117         std::chrono::duration_cast<std::chrono::steady_clock::duration>(std::chrono::milliseconds(INTERVAL));
2118     pool->Schedule(interval, [cloudInfo = std::weak_ptr<CloudTables>(cloudInfo_), param = syncerParam_]() {
2119         auto changeInfo = cloudInfo.lock();
2120         if (changeInfo == nullptr) {
2121             return ;
2122         }
2123         auto tables = changeInfo->Steal();
2124         if (tables.empty()) {
2125             return;
2126         }
2127         DistributedRdb::RdbService::Option option = { DistributedRdb::TIME_FIRST, 0, true, true };
2128         auto memo = AbsRdbPredicates(std::vector<std::string>(tables.begin(), tables.end())).GetDistributedPredicates();
2129         InnerSync(param, option, memo, nullptr);
2130     });
2131 #endif
2132 }
GetFileType()2133 std::string RdbStoreImpl::GetFileType()
2134 {
2135     return fileType_;
2136 }
2137 
2138 /**
2139  * Sets the database locale.
2140  */
ConfigLocale(const std::string & localeStr)2141 int RdbStoreImpl::ConfigLocale(const std::string &localeStr)
2142 {
2143     if (!isOpen_) {
2144         LOG_ERROR("The connection pool has been closed.");
2145         return E_ERROR;
2146     }
2147 
2148     if (connectionPool_ == nullptr) {
2149         LOG_ERROR("connectionPool_ is null.");
2150         return E_ERROR;
2151     }
2152     return connectionPool_->ConfigLocale(localeStr);
2153 }
2154 
GetDestPath(const std::string & backupPath,std::string & destPath)2155 int RdbStoreImpl::GetDestPath(const std::string &backupPath, std::string &destPath)
2156 {
2157     int ret = GetDataBasePath(backupPath, destPath);
2158     if (ret != E_OK) {
2159         return ret;
2160     }
2161     std::string tempPath = destPath + ".tmp";
2162     if (access(tempPath.c_str(), F_OK) == E_OK) {
2163         destPath = tempPath;
2164     } else {
2165         auto walFile = destPath + "-wal";
2166         if (access(walFile.c_str(), F_OK) == E_OK) {
2167             return E_ERROR;
2168         }
2169     }
2170 
2171     if (access(destPath.c_str(), F_OK) != E_OK) {
2172         LOG_ERROR("The backupFilePath does not exists.");
2173         return E_INVALID_FILE_PATH;
2174     }
2175     return E_OK;
2176 }
2177 
/**
 * Restores the database from a backup file.
 *
 * Not supported for read-only stores; fails with E_ERROR when the store is not open or has no
 * connection pool. The source is resolved through TryGetMasterSlaveBackupPath and, if that
 * does not apply, through GetDestPath. The file swap itself is done by
 * ConnectionPool::ChangeDbFileForRestore while the backup/restore key-file lock is held.
 *
 * @param backupPath Backup location passed to the path resolution helpers.
 * @param newKey Key forwarded to ChangeDbFileForRestore.
 * @return The path resolution error, or the result of ChangeDbFileForRestore.
 */
int RdbStoreImpl::Restore(const std::string &backupPath, const std::vector<uint8_t> &newKey)
{
    LOG_INFO("Restore db: %{public}s.", SqliteUtils::Anonymous(config_.GetName()).c_str());
    if (isReadOnly_) {
        return E_NOT_SUPPORT;
    }

    if (!isOpen_ || connectionPool_ == nullptr) {
        LOG_ERROR("The pool is: %{public}d, pool is null: %{public}d", isOpen_, connectionPool_ == nullptr);
        return E_ERROR;
    }

    // the lock is released manually on every exit path below
    RdbSecurityManager::KeyFiles keyFiles(path_ + BACKUP_RESTORE);
    keyFiles.Lock();
    std::string destPath;
    bool isOK = TryGetMasterSlaveBackupPath(backupPath, destPath, true);
    if (!isOK) {
        int ret = GetDestPath(backupPath, destPath);
        if (ret != E_OK) {
            keyFiles.Unlock();
            return ret;
        }
    }
#if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
    // the service is disabled for the duration of the file swap and re-enabled afterwards
    auto [err, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
    if (service != nullptr) {
        service->Disable(syncerParam_);
    }
#endif
    // sampled before the swap; only used for the restore report on success
    bool corrupt = Reportor::IsReportCorruptedFault(path_);
    int errCode = connectionPool_->ChangeDbFileForRestore(path_, destPath, newKey, slaveStatus_);
    keyFiles.Unlock();
#if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
    SecurityPolicy::SetSecurityLabel(config_);
    if (service != nullptr) {
        service->Enable(syncerParam_);
        if (errCode == E_OK) {
            // pass a copy of the syncer parameters carrying freshly collected info to the service
            auto syncerParam = syncerParam_;
            syncerParam.infos_ = Connection::Collect(config_);
            service->AfterOpen(syncerParam);
            NotifyDataChange();
        }
    }
#endif
    if (errCode == E_OK) {
        Reportor::ReportRestore(Reportor::Create(config_, E_OK), corrupt);
        rebuild_ = RebuiltType::NONE;
    }
    // called regardless of the restore result
    DoCloudSync("");
    return errCode;
}
2229 
CreateWritableConn()2230 std::pair<int32_t, std::shared_ptr<Connection>> RdbStoreImpl::CreateWritableConn()
2231 {
2232     auto config  = config_;
2233     config.SetHaMode(HAMode::SINGLE);
2234     auto [result, conn] = Connection::Create(config, true);
2235     if (result != E_OK || conn == nullptr) {
2236         LOG_ERROR("create connection failed, err:%{public}d", result);
2237         return { result, nullptr };
2238     }
2239     return { E_OK, conn };
2240 }
2241 
GetStatement(const std::string & sql,std::shared_ptr<Connection> conn) const2242 std::pair<int32_t, std::shared_ptr<Statement>> RdbStoreImpl::GetStatement(
2243     const std::string &sql, std::shared_ptr<Connection> conn) const
2244 {
2245     if (conn == nullptr) {
2246         return { E_DATABASE_BUSY, nullptr };
2247     }
2248     return conn->CreateStatement(sql, conn);
2249 }
2250 
GetStatement(const std::string & sql,bool read) const2251 std::pair<int32_t, std::shared_ptr<Statement>> RdbStoreImpl::GetStatement(const std::string& sql, bool read) const
2252 {
2253     auto conn = connectionPool_->AcquireConnection(read);
2254     if (conn == nullptr) {
2255         return { E_DATABASE_BUSY, nullptr };
2256     }
2257     return conn->CreateStatement(sql, conn);
2258 }
2259 
GetRebuilt(RebuiltType & rebuilt)2260 int RdbStoreImpl::GetRebuilt(RebuiltType &rebuilt)
2261 {
2262     rebuilt = static_cast<RebuiltType>(rebuild_);
2263     return E_OK;
2264 }
2265 
InterruptBackup()2266 int RdbStoreImpl::InterruptBackup()
2267 {
2268     if (config_.GetHaMode() != HAMode::MANUAL_TRIGGER) {
2269         return E_NOT_SUPPORT;
2270     }
2271     if (slaveStatus_ == SlaveStatus::BACKING_UP) {
2272         slaveStatus_ = SlaveStatus::BACKUP_INTERRUPT;
2273         return E_OK;
2274     }
2275     return E_INVALID_INTERRUPT;
2276 }
2277 
GetBackupStatus() const2278 int32_t RdbStoreImpl::GetBackupStatus() const
2279 {
2280     if (config_.GetHaMode() != HAMode::MANUAL_TRIGGER && config_.GetHaMode() != HAMode::MAIN_REPLICA) {
2281         return SlaveStatus::UNDEFINED;
2282     }
2283     return slaveStatus_;
2284 }
2285 
TryGetMasterSlaveBackupPath(const std::string & srcPath,std::string & destPath,bool isRestore)2286 bool RdbStoreImpl::TryGetMasterSlaveBackupPath(const std::string &srcPath, std::string &destPath, bool isRestore)
2287 {
2288     if (!srcPath.empty() || config_.GetHaMode() == HAMode::SINGLE || config_.GetDBType() != DB_SQLITE) {
2289         return false;
2290     }
2291     int ret = GetSlaveName(config_.GetPath(), destPath);
2292     if (ret != E_OK) {
2293         destPath = {};
2294         return false;
2295     }
2296     if (isRestore && access(destPath.c_str(), F_OK) != 0) {
2297         LOG_WARN("The backup path can not access: %{public}s", SqliteUtils::Anonymous(destPath).c_str());
2298         return false;
2299     }
2300     return true;
2301 }
2302 
IsSlaveDiffFromMaster() const2303 bool RdbStoreImpl::IsSlaveDiffFromMaster() const
2304 {
2305     std::string failureFlagFile = config_.GetPath() + "-slaveFailure";
2306     std::string slaveDbPath = SqliteUtils::GetSlavePath(config_.GetPath());
2307     return access(failureFlagFile.c_str(), F_OK) == 0 || access(slaveDbPath.c_str(), F_OK) != 0;
2308 }
2309 
ExchangeSlaverToMaster()2310 int32_t RdbStoreImpl::ExchangeSlaverToMaster()
2311 {
2312     if (isReadOnly_) {
2313         return E_OK;
2314     }
2315     auto conn = connectionPool_->AcquireConnection(false);
2316     if (conn == nullptr) {
2317         return E_DATABASE_BUSY;
2318     }
2319     auto strategy = conn->GenerateExchangeStrategy(slaveStatus_);
2320     if (strategy != ExchangeStrategy::NOT_HANDLE) {
2321         LOG_WARN("exchange st:%{public}d, %{public}s,", strategy, SqliteUtils::Anonymous(config_.GetName()).c_str());
2322     }
2323     int ret = E_OK;
2324     if (strategy == ExchangeStrategy::RESTORE) {
2325         conn = nullptr;
2326         // disable is required before restore
2327         ret = Restore({}, {});
2328     } else if (strategy == ExchangeStrategy::BACKUP) {
2329         // async backup
2330         ret = conn->Backup({}, {}, true, slaveStatus_);
2331     }
2332     return ret;
2333 }
2334 
GetDbType() const2335 int32_t RdbStoreImpl::GetDbType() const
2336 {
2337     return config_.GetDBType();
2338 }
2339 
CreateTransaction(int32_t type)2340 std::pair<int32_t, std::shared_ptr<Transaction>> RdbStoreImpl::CreateTransaction(int32_t type)
2341 {
2342     if (isReadOnly_) {
2343         return { E_NOT_SUPPORT, nullptr};
2344     }
2345 
2346     auto [errCode, conn] = connectionPool_->CreateTransConn();
2347     if (conn == nullptr) {
2348         return { errCode, nullptr };
2349     }
2350     std::shared_ptr<Transaction> trans;
2351     std::tie(errCode, trans) = Transaction::Create(type, conn, config_.GetName());
2352     if (trans == nullptr) {
2353         if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
2354             connectionPool_->Dump(true, "TRANS");
2355         }
2356         return { errCode, nullptr };
2357     }
2358 
2359     std::lock_guard<decltype(mutex_)> guard(mutex_);
2360     for (auto it = transactions_.begin(); it != transactions_.end();) {
2361         if (it->expired()) {
2362             it = transactions_.erase(it);
2363         } else {
2364             it++;
2365         }
2366     }
2367     transactions_.push_back(trans);
2368     return { errCode, trans };
2369 }
2370 
AddTables(const std::vector<std::string> & tables)2371 int32_t RdbStoreImpl::CloudTables::AddTables(const std::vector<std::string> &tables)
2372 {
2373     std::lock_guard<std::mutex> lock(mutex_);
2374     for (auto &table : tables) {
2375         tables_.insert(table);
2376     }
2377     return E_OK;
2378 }
2379 
RmvTables(const std::vector<std::string> & tables)2380 int32_t RdbStoreImpl::CloudTables::RmvTables(const std::vector<std::string> &tables)
2381 {
2382     std::lock_guard<std::mutex> lock(mutex_);
2383     for (auto &table : tables) {
2384         tables_.erase(table);
2385     }
2386     return E_OK;
2387 }
2388 
Change(const std::string & table)2389 bool RdbStoreImpl::CloudTables::Change(const std::string &table)
2390 {
2391     bool needSync = false;
2392     {
2393         std::lock_guard<std::mutex> lock(mutex_);
2394         if (tables_.empty() || (!table.empty() && tables_.find(table) == tables_.end())) {
2395             return needSync;
2396         }
2397         // from empty, then need schedule the cloud sync, others only wait the schedule execute.
2398         needSync = changes_.empty();
2399         if (!table.empty()) {
2400             changes_.insert(table);
2401         } else {
2402             changes_.insert(tables_.begin(), tables_.end());
2403         }
2404     }
2405     return needSync;
2406 }
2407 
Steal()2408 std::set<std::string> RdbStoreImpl::CloudTables::Steal()
2409 {
2410     std::set<std::string> result;
2411     {
2412         std::lock_guard<std::mutex> lock(mutex_);
2413         result = std::move(changes_);
2414     }
2415     return result;
2416 }
2417 } // namespace OHOS::NativeRdb