1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define LOG_TAG "ConnectionPool"
16 #include "connection_pool.h"
17 
18 #include <base_transaction.h>
19 #include <condition_variable>
20 #include <iterator>
21 #include <mutex>
22 #include <sstream>
23 #include <vector>
24 
25 #include "logger.h"
26 #include "connection.h"
27 #include "rdb_common.h"
28 #include "rdb_errno.h"
29 #include "rdb_fault_hiview_reporter.h"
30 #include "rdb_sql_statistic.h"
31 #include "sqlite_global_config.h"
32 #include "sqlite_utils.h"
33 
34 namespace OHOS {
35 namespace NativeRdb {
36 using namespace OHOS::Rdb;
37 using namespace std::chrono;
38 using Conn = Connection;
39 using ConnPool = ConnectionPool;
40 using SharedConn = std::shared_ptr<Connection>;
41 using SharedConns = std::vector<std::shared_ptr<Connection>>;
42 using SqlStatistic = DistributedRdb::SqlStatistic;
43 using Reportor = RdbFaultHiViewReporter;
44 constexpr int32_t TRANSACTION_TIMEOUT(2);
45 
Create(const RdbStoreConfig & config,int & errCode)46 std::shared_ptr<ConnPool> ConnPool::Create(const RdbStoreConfig &config, int &errCode)
47 {
48     std::shared_ptr<ConnPool> pool(new (std::nothrow) ConnPool(config));
49     if (pool == nullptr) {
50         LOG_ERROR("ConnPool::Create new failed, pool is nullptr.");
51         errCode = E_ERROR;
52         return nullptr;
53     }
54     std::shared_ptr<Connection> conn;
55     for (uint32_t retry = 0; retry < ITERS_COUNT; ++retry) {
56         std::tie(errCode, conn) = pool->Init();
57         if (errCode != E_SQLITE_CORRUPT) {
58             break;
59         }
60         config.SetIter(ITER_V1);
61     }
62     std::string dbPath;
63     (void)SqliteGlobalConfig::GetDbPath(config, dbPath);
64     LOG_INFO("code:%{public}d app:%{public}s path:[%{public}s] "
65              "cfg:[%{public}d,%{public}d,%{public}d,%{public}d,%{public}d,%{public}d,%{public}d]"
66              "%{public}s",
67         errCode, config.GetBundleName().c_str(), SqliteUtils::Anonymous(dbPath).c_str(), config.GetDBType(),
68         config.GetHaMode(), config.IsEncrypt(), config.GetArea(), config.GetSecurityLevel(), config.GetRoleType(),
69         config.IsReadOnly(),
70         Reportor::FormatBrief(Connection::Collect(config), SqliteUtils::Anonymous(config.GetName())).c_str());
71     return errCode == E_OK ? pool : nullptr;
72 }
73 
HandleDataCorruption(const RdbStoreConfig & storeConfig,int & errCode)74 std::pair<RebuiltType, std::shared_ptr<ConnectionPool>> ConnPool::HandleDataCorruption
75     (const RdbStoreConfig &storeConfig, int &errCode)
76 {
77     std::pair<RebuiltType, std::shared_ptr<ConnectionPool>> result;
78     auto &[rebuiltType, pool] = result;
79 
80     int repairErrCode = Connection::Repair(storeConfig);
81     if (repairErrCode == E_OK) {
82         rebuiltType = RebuiltType::REPAIRED;
83     } else if (storeConfig.GetAllowRebuild()) {
84         Connection::Delete(storeConfig);
85         rebuiltType = RebuiltType::REBUILT;
86     } else if (storeConfig.IsEncrypt() && errCode == E_INVALID_SECRET_KEY) {
87         return result;
88     } else {
89         errCode = E_SQLITE_CORRUPT;
90         return result;
91     }
92     pool = Create(storeConfig, errCode);
93     if (errCode != E_OK) {
94         LOG_WARN("failed, type %{public}d db %{public}s encrypt %{public}d error %{public}d, errno",
95             static_cast<uint32_t>(rebuiltType), SqliteUtils::Anonymous(storeConfig.GetName()).c_str(),
96             storeConfig.IsEncrypt(), errCode, errno);
97     } else {
98         Reportor::ReportRestore(Reportor::Create(storeConfig, E_OK, "RestoreType:Rebuild"), false);
99     }
100 
101     return result;
102 }
103 
ConnectionPool(const RdbStoreConfig & storeConfig)104 ConnPool::ConnectionPool(const RdbStoreConfig &storeConfig)
105     : config_(storeConfig), attachConfig_(storeConfig), writers_(), readers_(), transactionStack_(),
106       transactionUsed_(false)
107 {
108     attachConfig_.SetJournalMode(JournalMode::MODE_TRUNCATE);
109 }
110 
Init(bool isAttach,bool needWriter)111 std::pair<int32_t, std::shared_ptr<Connection>> ConnPool::Init(bool isAttach, bool needWriter)
112 {
113     const RdbStoreConfig &config = isAttach ? attachConfig_ : config_;
114     std::pair<int32_t, std::shared_ptr<Connection>> result;
115     auto &[errCode, conn] = result;
116     errCode = config.Initialize();
117     if (errCode != E_OK) {
118         return result;
119     }
120 
121     if (config.GetRoleType() == OWNER && !config.IsReadOnly()) {
122         // write connect count is 1
123         std::shared_ptr<ConnPool::ConnNode> node;
124         std::tie(errCode, node) = writers_.Initialize(
125             [this, isAttach]() {
126                 const RdbStoreConfig &config = isAttach ? attachConfig_ : config_;
127                 return Connection::Create(config, true);
128             },
129             1, config.GetWriteTime(), true, needWriter);
130         conn = Convert2AutoConn(node);
131         if (errCode != E_OK) {
132             return result;
133         }
134     }
135 
136     maxReader_ = GetMaxReaders(config);
137     // max read connect count is 64
138     if (maxReader_ > 64) {
139         return { E_ARGS_READ_CON_OVERLOAD, nullptr };
140     }
141     auto [ret, node] = readers_.Initialize(
142         [this, isAttach]() {
143             const RdbStoreConfig &config = isAttach ? attachConfig_ : config_;
144             return Connection::Create(config, false);
145         },
146         maxReader_, config.GetReadTime(), maxReader_ == 0);
147     errCode = ret;
148     return result;
149 }
150 
~ConnectionPool()151 ConnPool::~ConnectionPool()
152 {
153     CloseAllConnections();
154 }
155 
GetMaxReaders(const RdbStoreConfig & config)156 int32_t ConnPool::GetMaxReaders(const RdbStoreConfig &config)
157 {
158     if (config.GetStorageMode() != StorageMode::MODE_MEMORY &&
159         config.GetJournalMode() == RdbStoreConfig::GetJournalModeValue(JournalMode::MODE_WAL)) {
160         return config.GetReadConSize();
161     } else {
162         return 0;
163     }
164 }
165 
Convert2AutoConn(std::shared_ptr<ConnNode> node,bool isTrans)166 std::shared_ptr<Connection> ConnPool::Convert2AutoConn(std::shared_ptr<ConnNode> node, bool isTrans)
167 {
168     if (node == nullptr) {
169         return nullptr;
170     }
171 
172     auto conn = node->GetConnect();
173     if (conn == nullptr) {
174         return nullptr;
175     }
176     if (isTrans) {
177         transCount_++;
178     }
179 
180     return std::shared_ptr<Connection>(conn.get(), [pool = weak_from_this(), node, isTrans](auto *) mutable {
181         auto realPool = pool.lock();
182         if (realPool == nullptr) {
183             return;
184         }
185         realPool->ReleaseNode(node, !isTrans);
186         if (isTrans) {
187             realPool->transCount_--;
188         }
189         node = nullptr;
190     });
191 }
192 
CloseAllConnections()193 void ConnPool::CloseAllConnections()
194 {
195     writers_.Clear();
196     readers_.Clear();
197 }
198 
IsInTransaction()199 bool ConnPool::IsInTransaction()
200 {
201     return isInTransaction_.load();
202 }
203 
SetInTransaction(bool isInTransaction)204 void ConnPool::SetInTransaction(bool isInTransaction)
205 {
206     isInTransaction_.store(isInTransaction);
207 }
208 
CreateTransConn(bool limited)209 std::pair<int32_t, std::shared_ptr<Connection>> ConnPool::CreateTransConn(bool limited)
210 {
211     if (transCount_ >= MAX_TRANS && limited) {
212         writers_.Dump("NO TRANS", transCount_ + isInTransaction_);
213         return { E_DATABASE_BUSY, nullptr };
214     }
215     auto [errCode, node] = writers_.Create();
216     return { errCode, Convert2AutoConn(node, true) };
217 }
218 
AcquireConnection(bool isReadOnly)219 std::shared_ptr<Conn> ConnPool::AcquireConnection(bool isReadOnly)
220 {
221     SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_WAIT);
222     return Acquire(isReadOnly);
223 }
224 
AcquireAll(int32_t time)225 std::pair<SharedConn, SharedConns> ConnPool::AcquireAll(int32_t time)
226 {
227     SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_WAIT);
228     using namespace std::chrono;
229     std::pair<SharedConn, SharedConns> result;
230     auto &[writer, readers] = result;
231     auto interval = duration_cast<milliseconds>(seconds(time));
232     auto start = steady_clock::now();
233     auto writerNodes = writers_.AcquireAll(interval);
234     if (writerNodes.empty()) {
235         return {};
236     }
237     writer = Convert2AutoConn(writerNodes.front());
238 
239     auto usedTime = duration_cast<milliseconds>(steady_clock::now() - start);
240     if (writer == nullptr || usedTime >= interval) {
241         return {};
242     }
243 
244     if (maxReader_ == 0) {
245         return result;
246     }
247 
248     readers_.Disable();
249     auto nodes = readers_.AcquireAll(interval - usedTime);
250     if (nodes.empty()) {
251         readers_.Enable();
252         return {};
253     }
254 
255     for (auto node : nodes) {
256         auto conn = Convert2AutoConn(node);
257         if (conn == nullptr) {
258             continue;
259         }
260         readers.push_back(conn);
261     }
262     return result;
263 }
264 
Acquire(bool isReadOnly,std::chrono::milliseconds ms)265 std::shared_ptr<Conn> ConnPool::Acquire(bool isReadOnly, std::chrono::milliseconds ms)
266 {
267     Container *container = (isReadOnly && maxReader_ != 0) ? &readers_ : &writers_;
268     auto node = container->Acquire(ms);
269     if (node == nullptr) {
270         const char *header = (isReadOnly && maxReader_ != 0) ? "readers_" : "writers_";
271         container->Dump(header, transCount_ + isInTransaction_);
272         return nullptr;
273     }
274     return Convert2AutoConn(node);
275 }
276 
AcquireRef(bool isReadOnly,std::chrono::milliseconds ms)277 SharedConn ConnPool::AcquireRef(bool isReadOnly, std::chrono::milliseconds ms)
278 {
279     SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_WAIT);
280     if (maxReader_ != 0) {
281         return Acquire(isReadOnly, ms);
282     }
283     auto node = writers_.Acquire(ms);
284     if (node == nullptr) {
285         writers_.Dump("writers_", transCount_ + isInTransaction_);
286         return nullptr;
287     }
288     auto conn = node->connect_;
289     writers_.Release(node);
290     return std::shared_ptr<Connection>(conn.get(), [pool = weak_from_this(), conn](Connection *) {
291         auto realPool = pool.lock();
292         if (realPool == nullptr) {
293             return;
294         }
295         realPool->writers_.cond_.notify_all();
296     });
297 }
298 
ReleaseNode(std::shared_ptr<ConnNode> node,bool reuse)299 void ConnPool::ReleaseNode(std::shared_ptr<ConnNode> node,  bool reuse)
300 {
301     if (node == nullptr) {
302         return;
303     }
304     auto now = steady_clock::now();
305     auto timeout = now > (failedTime_.load() + minutes(CHECK_POINT_INTERVAL)) || now < failedTime_.load() ||
306                    failedTime_.load() == steady_clock::time_point();
307     auto transCount = transCount_ + isInTransaction_;
308     auto remainCount = reuse ? transCount : transCount - 1;
309     auto errCode = node->Unused(remainCount, timeout);
310     if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
311         writers_.Dump("WAL writers_", transCount);
312         readers_.Dump("WAL readers_", transCount);
313     }
314 
315     if (node->IsWriter() && (errCode != E_INNER_WARNING && errCode != E_NOT_SUPPORT)) {
316         failedTime_ = errCode != E_OK ? now : steady_clock::time_point();
317     }
318 
319     auto &container = node->IsWriter() ? writers_ : readers_;
320     if (reuse) {
321         container.Release(node);
322     } else {
323         container.Drop(node);
324     }
325 }
326 
AcquireTransaction()327 int ConnPool::AcquireTransaction()
328 {
329     std::unique_lock<std::mutex> lock(transMutex_);
330     if (transCondition_.wait_for(lock, std::chrono::seconds(TRANSACTION_TIMEOUT), [this] {
331             return !transactionUsed_;
332         })) {
333         transactionUsed_ = true;
334         return E_OK;
335     }
336     LOG_WARN("transactionUsed_ is %{public}d", transactionUsed_);
337     return E_DATABASE_BUSY;
338 }
339 
ReleaseTransaction()340 void ConnPool::ReleaseTransaction()
341 {
342     {
343         std::unique_lock<std::mutex> lock(transMutex_);
344         transactionUsed_ = false;
345     }
346     transCondition_.notify_one();
347 }
348 
RestartReaders()349 int ConnPool::RestartReaders()
350 {
351     readers_.Clear();
352     auto [errCode, node] = readers_.Initialize(
353         [this]() {
354             return Connection::Create(config_, false);
355         },
356         maxReader_, config_.GetReadTime(), maxReader_ == 0);
357     return errCode;
358 }
359 
360 /**
361  * The database locale.
362  */
ConfigLocale(const std::string & localeStr)363 int ConnPool::ConfigLocale(const std::string &localeStr)
364 {
365     auto errCode = readers_.ConfigLocale(localeStr);
366     if (errCode != E_OK) {
367         return errCode;
368     }
369     return writers_.ConfigLocale(localeStr);
370 }
371 
372 /**
373  * Rename the backed up database.
374  */
ChangeDbFileForRestore(const std::string & newPath,const std::string & backupPath,const std::vector<uint8_t> & newKey,SlaveStatus & slaveStatus)375 int ConnPool::ChangeDbFileForRestore(const std::string &newPath, const std::string &backupPath,
376     const std::vector<uint8_t> &newKey, SlaveStatus &slaveStatus)
377 {
378     if (!writers_.IsFull() || config_.GetPath() == backupPath || newPath == backupPath) {
379         LOG_ERROR("Connection pool is busy now!");
380         return E_ERROR;
381     }
382     if (config_.GetDBType() == DB_VECTOR) {
383         CloseAllConnections();
384         auto [retVal, connection] = CreateTransConn();
385 
386         if (connection == nullptr) {
387             LOG_ERROR("Get null connection.");
388             return retVal;
389         }
390         retVal = connection->Restore(backupPath, {}, slaveStatus);
391         if (retVal != E_OK) {
392             LOG_ERROR("RdDbRestore error.");
393             return retVal;
394         }
395         CloseAllConnections();
396         auto [errCode, node] = Init();
397         return errCode;
398     }
399     return RestoreByDbSqliteType(newPath, backupPath, slaveStatus);
400 }
401 
RestoreByDbSqliteType(const std::string & newPath,const std::string & backupPath,SlaveStatus & slaveStatus)402 int ConnPool::RestoreByDbSqliteType(const std::string &newPath, const std::string &backupPath, SlaveStatus &slaveStatus)
403 {
404     if (SqliteUtils::IsSlaveDbName(backupPath) && config_.GetHaMode() != HAMode::SINGLE) {
405         auto connection = AcquireConnection(false);
406         if (connection == nullptr) {
407             return E_DATABASE_BUSY;
408         }
409         return connection->Restore(backupPath, {}, slaveStatus);
410     }
411 
412     return RestoreMasterDb(newPath, backupPath);
413 }
414 
RestoreMasterDb(const std::string & newPath,const std::string & backupPath)415 int ConnPool::RestoreMasterDb(const std::string &newPath, const std::string &backupPath)
416 {
417     if (!CheckIntegrity(backupPath)) {
418         LOG_ERROR("backup file is corrupted, %{public}s", SqliteUtils::Anonymous(backupPath).c_str());
419         return E_SQLITE_CORRUPT;
420     }
421     SqliteUtils::DeleteFile(backupPath + "-shm");
422     SqliteUtils::DeleteFile(backupPath + "-wal");
423 
424     CloseAllConnections();
425     Connection::Delete(config_);
426 
427     if (config_.GetPath() != newPath) {
428         RdbStoreConfig config(newPath);
429         config.SetPath(newPath);
430         Connection::Delete(config);
431     }
432 
433     bool copyRet = SqliteUtils::CopyFile(backupPath, newPath);
434     int32_t errCode = E_OK;
435     std::shared_ptr<Connection> pool;
436     for (uint32_t retry = 0; retry < ITERS_COUNT; ++retry) {
437         std::tie(errCode, pool) = Init();
438         if (errCode == E_OK) {
439             break;
440         }
441         if (errCode != E_SQLITE_CORRUPT || !config_.IsEncrypt()) {
442             break;
443         }
444         config_.SetIter(ITER_V1);
445     }
446     if (errCode != E_OK) {
447         CloseAllConnections();
448         Connection::Delete(config_);
449         std::tie(errCode, pool) = Init();
450         LOG_WARN("restore failed! rebuild res:%{public}d, path:%{public}s.", errCode,
451             SqliteUtils::Anonymous(backupPath).c_str());
452     }
453     return copyRet ? errCode : E_ERROR;
454 }
455 
GetTransactionStack()456 std::stack<BaseTransaction> &ConnPool::GetTransactionStack()
457 {
458     return transactionStack_;
459 }
460 
GetTransactionStackMutex()461 std::mutex &ConnPool::GetTransactionStackMutex()
462 {
463     return transactionStackMutex_;
464 }
465 
DisableWal()466 std::pair<int, std::shared_ptr<Conn>> ConnPool::DisableWal()
467 {
468     return Init(true, true);
469 }
470 
EnableWal()471 int ConnPool::EnableWal()
472 {
473     auto [errCode, node] = Init();
474     return errCode;
475 }
476 
Dump(bool isWriter,const char * header)477 int32_t ConnectionPool::Dump(bool isWriter, const char *header)
478 {
479     Container *container = (isWriter || maxReader_ == 0) ? &writers_ : &readers_;
480     container->Dump(header, transCount_ + isInTransaction_);
481     return E_OK;
482 }
483 
ConnNode(std::shared_ptr<Conn> conn)484 ConnPool::ConnNode::ConnNode(std::shared_ptr<Conn> conn) : connect_(std::move(conn))
485 {
486 }
487 
GetConnect()488 std::shared_ptr<Conn> ConnPool::ConnNode::GetConnect()
489 {
490     tid_ = gettid();
491     time_ = steady_clock::now();
492     return connect_;
493 }
494 
GetUsingTime() const495 int64_t ConnPool::ConnNode::GetUsingTime() const
496 {
497     auto time = steady_clock::now() - time_;
498     return duration_cast<milliseconds>(time).count();
499 }
500 
Unused(int32_t count,bool timeout)501 int32_t ConnPool::ConnNode::Unused(int32_t count, bool timeout)
502 {
503     time_ = steady_clock::now();
504     if (connect_ == nullptr) {
505         return E_OK;
506     }
507 
508     connect_->ClearCache();
509     int32_t errCode = E_INNER_WARNING;
510     if (count <= 0) {
511         errCode = connect_->TryCheckPoint(timeout);
512     }
513 
514     time_ = steady_clock::now();
515     if (!connect_->IsWriter()) {
516         tid_ = 0;
517     }
518     return errCode;
519 }
520 
IsWriter() const521 bool ConnPool::ConnNode::IsWriter() const
522 {
523     if (connect_ != nullptr) {
524         return connect_->IsWriter();
525     }
526     return false;
527 }
528 
Initialize(Creator creator,int32_t max,int32_t timeout,bool disable,bool acquire)529 std::pair<int32_t, std::shared_ptr<ConnPool::ConnNode>> ConnPool::Container::Initialize(Creator creator, int32_t max,
530     int32_t timeout, bool disable, bool acquire)
531 {
532     std::shared_ptr<ConnNode> connNode = nullptr;
533     {
534         std::unique_lock<decltype(mutex_)> lock(mutex_);
535         disable_ = disable;
536         max_ = max;
537         creator_ = creator;
538         timeout_ = std::chrono::seconds(timeout);
539         for (int i = 0; i < max; ++i) {
540             auto errCode = ExtendNode();
541             if (errCode != E_OK) {
542                 nodes_.clear();
543                 details_.clear();
544                 return { errCode, nullptr };
545             }
546         }
547 
548         if (acquire && count_ > 0) {
549             connNode = nodes_.back();
550             nodes_.pop_back();
551             count_--;
552         }
553     }
554     cond_.notify_all();
555     return { E_OK, connNode };
556 }
557 
ConfigLocale(const std::string & locale)558 int32_t ConnPool::Container::ConfigLocale(const std::string &locale)
559 {
560     std::unique_lock<decltype(mutex_)> lock(mutex_);
561     if (total_ != count_) {
562         return E_DATABASE_BUSY;
563     }
564     for (auto it = details_.begin(); it != details_.end();) {
565         auto conn = it->lock();
566         if (conn == nullptr || conn->connect_ == nullptr) {
567             it = details_.erase(it);
568             continue;
569         }
570         conn->connect_->ConfigLocale(locale);
571     }
572     return E_OK;
573 }
574 
Acquire(std::chrono::milliseconds milliS)575 std::shared_ptr<ConnPool::ConnNode> ConnPool::Container::Acquire(std::chrono::milliseconds milliS)
576 {
577     auto interval = (milliS == INVALID_TIME) ? timeout_ : milliS;
578     std::unique_lock<decltype(mutex_)> lock(mutex_);
579     if (max_ == 0) {
580         return nullptr;
581     }
582     auto waiter = [this]() -> bool {
583         if (count_ > 0) {
584             return true;
585         }
586 
587         if (disable_) {
588             return false;
589         }
590         return ExtendNode() == E_OK;
591     };
592     if (cond_.wait_for(lock, interval, waiter)) {
593         if (nodes_.empty()) {
594             LOG_ERROR(
595                 "nodes is empty.count %{public}d max %{public}d total %{public}d left %{public}d right%{public}d",
596                 count_, max_, total_, left_, right_);
597             count_ = 0;
598             return nullptr;
599         }
600         auto node = nodes_.back();
601         nodes_.pop_back();
602         count_--;
603         return node;
604     }
605     return nullptr;
606 }
607 
Create()608 std::pair<int32_t, std::shared_ptr<ConnPool::ConnNode>> ConnPool::Container::Create()
609 {
610     if (creator_ == nullptr) {
611         return { E_NOT_SUPPORT, nullptr };
612     }
613 
614     auto [errCode, conn] = creator_();
615     if (conn == nullptr) {
616         return { errCode, nullptr };
617     }
618 
619     auto node = std::make_shared<ConnNode>(conn);
620     if (node == nullptr) {
621         return { E_ERROR, nullptr };
622     }
623     node->id_ = MIN_TRANS_ID + trans_;
624     conn->SetId(node->id_);
625     details_.push_back(node);
626     trans_++;
627     return { E_OK, node };
628 }
629 
ExtendNode()630 int32_t ConnPool::Container::ExtendNode()
631 {
632     if (creator_ == nullptr) {
633         return E_ERROR;
634     }
635     auto [errCode, conn] = creator_();
636     if (conn == nullptr) {
637         return errCode;
638     }
639     auto node = std::make_shared<ConnNode>(conn);
640     node->id_ = right_++;
641     conn->SetId(node->id_);
642     nodes_.push_back(node);
643     details_.push_back(node);
644     count_++;
645     total_++;
646     return E_OK;
647 }
648 
AcquireAll(std::chrono::milliseconds milliS)649 std::list<std::shared_ptr<ConnPool::ConnNode>> ConnPool::Container::AcquireAll(std::chrono::milliseconds milliS)
650 {
651     std::list<std::shared_ptr<ConnNode>> nodes;
652     int32_t count = 0;
653     auto interval = (milliS == INVALID_TIME) ? timeout_ : milliS;
654     auto time = std::chrono::steady_clock::now() + interval;
655     std::unique_lock<decltype(mutex_)> lock(mutex_);
656     while (count < total_ && cond_.wait_until(lock, time, [this]() {
657         return count_ > 0;
658     })) {
659         nodes.merge(std::move(nodes_));
660         nodes_.clear();
661         count += count_;
662         count_ = 0;
663     }
664 
665     if (count != total_) {
666         count_ = count;
667         nodes_ = std::move(nodes);
668         nodes.clear();
669         return nodes;
670     }
671     auto func = [](const std::list<std::shared_ptr<ConnNode>> &nodes) -> bool {
672         for (auto &node : nodes) {
673             if (node->connect_ == nullptr) {
674                 continue;
675             }
676             if (node->connect_.use_count() != 1) {
677                 return false;
678             }
679         }
680         return true;
681     };
682     bool failed = false;
683     while (failed = !func(nodes), failed && cond_.wait_until(lock, time) != std::cv_status::timeout) {
684     }
685     if (failed) {
686         count_ = count;
687         nodes_ = std::move(nodes);
688         nodes.clear();
689     }
690     return nodes;
691 }
692 
Disable()693 void ConnPool::Container::Disable()
694 {
695     disable_ = true;
696     cond_.notify_one();
697 }
698 
Enable()699 void ConnPool::Container::Enable()
700 {
701     disable_ = false;
702     cond_.notify_one();
703 }
704 
Release(std::shared_ptr<ConnNode> node)705 int32_t ConnPool::Container::Release(std::shared_ptr<ConnNode> node)
706 {
707     {
708         std::unique_lock<decltype(mutex_)> lock(mutex_);
709         if (node->id_ < left_ || node->id_ >= right_) {
710             return E_OK;
711         }
712         if (count_ == max_) {
713             total_ = total_ > count_ ? total_ - 1 : count_;
714             RelDetails(node);
715         } else {
716             nodes_.push_front(node);
717             count_++;
718         }
719     }
720     cond_.notify_one();
721     return E_OK;
722 }
723 
Drop(std::shared_ptr<ConnNode> node)724 int32_t ConnectionPool::Container::Drop(std::shared_ptr<ConnNode> node)
725 {
726     {
727         std::unique_lock<decltype(mutex_)> lock(mutex_);
728         RelDetails(node);
729     }
730     cond_.notify_one();
731     return E_OK;
732 }
733 
RelDetails(std::shared_ptr<ConnNode> node)734 int32_t ConnectionPool::Container::RelDetails(std::shared_ptr<ConnNode> node)
735 {
736     for (auto it = details_.begin(); it != details_.end();) {
737         auto detailNode = it->lock();
738         if (detailNode == nullptr || detailNode->id_ == node->id_) {
739             it = details_.erase(it);
740         } else {
741             it++;
742         }
743     }
744     return E_OK;
745 }
746 
CheckIntegrity(const std::string & dbPath)747 bool ConnectionPool::CheckIntegrity(const std::string &dbPath)
748 {
749     RdbStoreConfig config(config_);
750     config.SetPath(dbPath);
751     config.SetIntegrityCheck(IntegrityCheck::FULL);
752     config.SetHaMode(HAMode::SINGLE);
753     for (uint32_t retry = 0; retry < ITERS_COUNT; ++retry) {
754         auto [ret, connection] = Connection::Create(config, true);
755         if (ret == E_OK) {
756             return true;
757         }
758         if (ret != E_SQLITE_CORRUPT || !config.IsEncrypt()) {
759             break;
760         }
761         config.SetIter(ITER_V1);
762     }
763     return false;
764 }
765 
Clear()766 int32_t ConnPool::Container::Clear()
767 {
768     std::list<std::shared_ptr<ConnNode>> nodes;
769     std::list<std::weak_ptr<ConnNode>> details;
770     {
771         std::unique_lock<decltype(mutex_)> lock(mutex_);
772         nodes = std::move(nodes_);
773         details = std::move(details_);
774         disable_ = true;
775         total_ = 0;
776         count_ = 0;
777         if (right_ > MAX_RIGHT) {
778             right_ = 0;
779         }
780         left_ = right_;
781         creator_ = nullptr;
782     }
783     nodes.clear();
784     details.clear();
785     return 0;
786 }
787 
IsFull()788 bool ConnPool::Container::IsFull()
789 {
790     std::unique_lock<decltype(mutex_)> lock(mutex_);
791     return total_ == count_;
792 }
793 
Dump(const char * header,int32_t count)794 int32_t ConnPool::Container::Dump(const char *header, int32_t count)
795 {
796     std::string info;
797     std::vector<std::shared_ptr<ConnNode>> details;
798     std::string title = "B_M_T_C[" + std::to_string(count) + "," + std::to_string(max_) + "," +
799                         std::to_string(total_) + "," + std::to_string(count_) + "]";
800     {
801         std::unique_lock<decltype(mutex_)> lock(mutex_);
802         details.reserve(details_.size());
803         for (auto &detail : details_) {
804             auto node = detail.lock();
805             if (node == nullptr) {
806                 continue;
807             }
808             details.push_back(node);
809         }
810     }
811 
812     for (auto &node : details) {
813         info.append("<")
814             .append(std::to_string(node->id_))
815             .append(",")
816             .append(std::to_string(node->tid_))
817             .append(",")
818             .append(std::to_string(node->GetUsingTime()))
819             .append(">");
820         // 256 represent that limit to info length
821         if (info.size() > 256) {
822             LOG_WARN("%{public}s %{public}s:%{public}s", header, title.c_str(), info.c_str());
823             info.clear();
824         }
825     }
826     LOG_WARN("%{public}s %{public}s:%{public}s", header, title.c_str(), info.c_str());
827     return 0;
828 }
829 } // namespace NativeRdb
830 } // namespace OHOS
831