1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #define LOG_TAG "ConnectionPool"
16 #include "connection_pool.h"
17
18 #include <base_transaction.h>
19 #include <condition_variable>
20 #include <iterator>
21 #include <mutex>
22 #include <sstream>
23 #include <vector>
24
25 #include "logger.h"
26 #include "connection.h"
27 #include "rdb_common.h"
28 #include "rdb_errno.h"
29 #include "rdb_fault_hiview_reporter.h"
30 #include "rdb_sql_statistic.h"
31 #include "sqlite_global_config.h"
32 #include "sqlite_utils.h"
33
34 namespace OHOS {
35 namespace NativeRdb {
36 using namespace OHOS::Rdb;
37 using namespace std::chrono;
38 using Conn = Connection;
39 using ConnPool = ConnectionPool;
40 using SharedConn = std::shared_ptr<Connection>;
41 using SharedConns = std::vector<std::shared_ptr<Connection>>;
42 using SqlStatistic = DistributedRdb::SqlStatistic;
43 using Reportor = RdbFaultHiViewReporter;
44 constexpr int32_t TRANSACTION_TIMEOUT(2);
45
Create(const RdbStoreConfig & config,int & errCode)46 std::shared_ptr<ConnPool> ConnPool::Create(const RdbStoreConfig &config, int &errCode)
47 {
48 std::shared_ptr<ConnPool> pool(new (std::nothrow) ConnPool(config));
49 if (pool == nullptr) {
50 LOG_ERROR("ConnPool::Create new failed, pool is nullptr.");
51 errCode = E_ERROR;
52 return nullptr;
53 }
54 std::shared_ptr<Connection> conn;
55 for (uint32_t retry = 0; retry < ITERS_COUNT; ++retry) {
56 std::tie(errCode, conn) = pool->Init();
57 if (errCode != E_SQLITE_CORRUPT) {
58 break;
59 }
60 config.SetIter(ITER_V1);
61 }
62 std::string dbPath;
63 (void)SqliteGlobalConfig::GetDbPath(config, dbPath);
64 LOG_INFO("code:%{public}d app:%{public}s path:[%{public}s] "
65 "cfg:[%{public}d,%{public}d,%{public}d,%{public}d,%{public}d,%{public}d,%{public}d]"
66 "%{public}s",
67 errCode, config.GetBundleName().c_str(), SqliteUtils::Anonymous(dbPath).c_str(), config.GetDBType(),
68 config.GetHaMode(), config.IsEncrypt(), config.GetArea(), config.GetSecurityLevel(), config.GetRoleType(),
69 config.IsReadOnly(),
70 Reportor::FormatBrief(Connection::Collect(config), SqliteUtils::Anonymous(config.GetName())).c_str());
71 return errCode == E_OK ? pool : nullptr;
72 }
73
HandleDataCorruption(const RdbStoreConfig & storeConfig,int & errCode)74 std::pair<RebuiltType, std::shared_ptr<ConnectionPool>> ConnPool::HandleDataCorruption
75 (const RdbStoreConfig &storeConfig, int &errCode)
76 {
77 std::pair<RebuiltType, std::shared_ptr<ConnectionPool>> result;
78 auto &[rebuiltType, pool] = result;
79
80 int repairErrCode = Connection::Repair(storeConfig);
81 if (repairErrCode == E_OK) {
82 rebuiltType = RebuiltType::REPAIRED;
83 } else if (storeConfig.GetAllowRebuild()) {
84 Connection::Delete(storeConfig);
85 rebuiltType = RebuiltType::REBUILT;
86 } else if (storeConfig.IsEncrypt() && errCode == E_INVALID_SECRET_KEY) {
87 return result;
88 } else {
89 errCode = E_SQLITE_CORRUPT;
90 return result;
91 }
92 pool = Create(storeConfig, errCode);
93 if (errCode != E_OK) {
94 LOG_WARN("failed, type %{public}d db %{public}s encrypt %{public}d error %{public}d, errno",
95 static_cast<uint32_t>(rebuiltType), SqliteUtils::Anonymous(storeConfig.GetName()).c_str(),
96 storeConfig.IsEncrypt(), errCode, errno);
97 } else {
98 Reportor::ReportRestore(Reportor::Create(storeConfig, E_OK, "RestoreType:Rebuild"), false);
99 }
100
101 return result;
102 }
103
ConnectionPool(const RdbStoreConfig & storeConfig)104 ConnPool::ConnectionPool(const RdbStoreConfig &storeConfig)
105 : config_(storeConfig), attachConfig_(storeConfig), writers_(), readers_(), transactionStack_(),
106 transactionUsed_(false)
107 {
108 attachConfig_.SetJournalMode(JournalMode::MODE_TRUNCATE);
109 }
110
Init(bool isAttach,bool needWriter)111 std::pair<int32_t, std::shared_ptr<Connection>> ConnPool::Init(bool isAttach, bool needWriter)
112 {
113 const RdbStoreConfig &config = isAttach ? attachConfig_ : config_;
114 std::pair<int32_t, std::shared_ptr<Connection>> result;
115 auto &[errCode, conn] = result;
116 errCode = config.Initialize();
117 if (errCode != E_OK) {
118 return result;
119 }
120
121 if (config.GetRoleType() == OWNER && !config.IsReadOnly()) {
122 // write connect count is 1
123 std::shared_ptr<ConnPool::ConnNode> node;
124 std::tie(errCode, node) = writers_.Initialize(
125 [this, isAttach]() {
126 const RdbStoreConfig &config = isAttach ? attachConfig_ : config_;
127 return Connection::Create(config, true);
128 },
129 1, config.GetWriteTime(), true, needWriter);
130 conn = Convert2AutoConn(node);
131 if (errCode != E_OK) {
132 return result;
133 }
134 }
135
136 maxReader_ = GetMaxReaders(config);
137 // max read connect count is 64
138 if (maxReader_ > 64) {
139 return { E_ARGS_READ_CON_OVERLOAD, nullptr };
140 }
141 auto [ret, node] = readers_.Initialize(
142 [this, isAttach]() {
143 const RdbStoreConfig &config = isAttach ? attachConfig_ : config_;
144 return Connection::Create(config, false);
145 },
146 maxReader_, config.GetReadTime(), maxReader_ == 0);
147 errCode = ret;
148 return result;
149 }
150
~ConnectionPool()151 ConnPool::~ConnectionPool()
152 {
153 CloseAllConnections();
154 }
155
GetMaxReaders(const RdbStoreConfig & config)156 int32_t ConnPool::GetMaxReaders(const RdbStoreConfig &config)
157 {
158 if (config.GetStorageMode() != StorageMode::MODE_MEMORY &&
159 config.GetJournalMode() == RdbStoreConfig::GetJournalModeValue(JournalMode::MODE_WAL)) {
160 return config.GetReadConSize();
161 } else {
162 return 0;
163 }
164 }
165
Convert2AutoConn(std::shared_ptr<ConnNode> node,bool isTrans)166 std::shared_ptr<Connection> ConnPool::Convert2AutoConn(std::shared_ptr<ConnNode> node, bool isTrans)
167 {
168 if (node == nullptr) {
169 return nullptr;
170 }
171
172 auto conn = node->GetConnect();
173 if (conn == nullptr) {
174 return nullptr;
175 }
176 if (isTrans) {
177 transCount_++;
178 }
179
180 return std::shared_ptr<Connection>(conn.get(), [pool = weak_from_this(), node, isTrans](auto *) mutable {
181 auto realPool = pool.lock();
182 if (realPool == nullptr) {
183 return;
184 }
185 realPool->ReleaseNode(node, !isTrans);
186 if (isTrans) {
187 realPool->transCount_--;
188 }
189 node = nullptr;
190 });
191 }
192
CloseAllConnections()193 void ConnPool::CloseAllConnections()
194 {
195 writers_.Clear();
196 readers_.Clear();
197 }
198
IsInTransaction()199 bool ConnPool::IsInTransaction()
200 {
201 return isInTransaction_.load();
202 }
203
SetInTransaction(bool isInTransaction)204 void ConnPool::SetInTransaction(bool isInTransaction)
205 {
206 isInTransaction_.store(isInTransaction);
207 }
208
CreateTransConn(bool limited)209 std::pair<int32_t, std::shared_ptr<Connection>> ConnPool::CreateTransConn(bool limited)
210 {
211 if (transCount_ >= MAX_TRANS && limited) {
212 writers_.Dump("NO TRANS", transCount_ + isInTransaction_);
213 return { E_DATABASE_BUSY, nullptr };
214 }
215 auto [errCode, node] = writers_.Create();
216 return { errCode, Convert2AutoConn(node, true) };
217 }
218
AcquireConnection(bool isReadOnly)219 std::shared_ptr<Conn> ConnPool::AcquireConnection(bool isReadOnly)
220 {
221 SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_WAIT);
222 return Acquire(isReadOnly);
223 }
224
AcquireAll(int32_t time)225 std::pair<SharedConn, SharedConns> ConnPool::AcquireAll(int32_t time)
226 {
227 SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_WAIT);
228 using namespace std::chrono;
229 std::pair<SharedConn, SharedConns> result;
230 auto &[writer, readers] = result;
231 auto interval = duration_cast<milliseconds>(seconds(time));
232 auto start = steady_clock::now();
233 auto writerNodes = writers_.AcquireAll(interval);
234 if (writerNodes.empty()) {
235 return {};
236 }
237 writer = Convert2AutoConn(writerNodes.front());
238
239 auto usedTime = duration_cast<milliseconds>(steady_clock::now() - start);
240 if (writer == nullptr || usedTime >= interval) {
241 return {};
242 }
243
244 if (maxReader_ == 0) {
245 return result;
246 }
247
248 readers_.Disable();
249 auto nodes = readers_.AcquireAll(interval - usedTime);
250 if (nodes.empty()) {
251 readers_.Enable();
252 return {};
253 }
254
255 for (auto node : nodes) {
256 auto conn = Convert2AutoConn(node);
257 if (conn == nullptr) {
258 continue;
259 }
260 readers.push_back(conn);
261 }
262 return result;
263 }
264
Acquire(bool isReadOnly,std::chrono::milliseconds ms)265 std::shared_ptr<Conn> ConnPool::Acquire(bool isReadOnly, std::chrono::milliseconds ms)
266 {
267 Container *container = (isReadOnly && maxReader_ != 0) ? &readers_ : &writers_;
268 auto node = container->Acquire(ms);
269 if (node == nullptr) {
270 const char *header = (isReadOnly && maxReader_ != 0) ? "readers_" : "writers_";
271 container->Dump(header, transCount_ + isInTransaction_);
272 return nullptr;
273 }
274 return Convert2AutoConn(node);
275 }
276
AcquireRef(bool isReadOnly,std::chrono::milliseconds ms)277 SharedConn ConnPool::AcquireRef(bool isReadOnly, std::chrono::milliseconds ms)
278 {
279 SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_WAIT);
280 if (maxReader_ != 0) {
281 return Acquire(isReadOnly, ms);
282 }
283 auto node = writers_.Acquire(ms);
284 if (node == nullptr) {
285 writers_.Dump("writers_", transCount_ + isInTransaction_);
286 return nullptr;
287 }
288 auto conn = node->connect_;
289 writers_.Release(node);
290 return std::shared_ptr<Connection>(conn.get(), [pool = weak_from_this(), conn](Connection *) {
291 auto realPool = pool.lock();
292 if (realPool == nullptr) {
293 return;
294 }
295 realPool->writers_.cond_.notify_all();
296 });
297 }
298
ReleaseNode(std::shared_ptr<ConnNode> node,bool reuse)299 void ConnPool::ReleaseNode(std::shared_ptr<ConnNode> node, bool reuse)
300 {
301 if (node == nullptr) {
302 return;
303 }
304 auto now = steady_clock::now();
305 auto timeout = now > (failedTime_.load() + minutes(CHECK_POINT_INTERVAL)) || now < failedTime_.load() ||
306 failedTime_.load() == steady_clock::time_point();
307 auto transCount = transCount_ + isInTransaction_;
308 auto remainCount = reuse ? transCount : transCount - 1;
309 auto errCode = node->Unused(remainCount, timeout);
310 if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
311 writers_.Dump("WAL writers_", transCount);
312 readers_.Dump("WAL readers_", transCount);
313 }
314
315 if (node->IsWriter() && (errCode != E_INNER_WARNING && errCode != E_NOT_SUPPORT)) {
316 failedTime_ = errCode != E_OK ? now : steady_clock::time_point();
317 }
318
319 auto &container = node->IsWriter() ? writers_ : readers_;
320 if (reuse) {
321 container.Release(node);
322 } else {
323 container.Drop(node);
324 }
325 }
326
AcquireTransaction()327 int ConnPool::AcquireTransaction()
328 {
329 std::unique_lock<std::mutex> lock(transMutex_);
330 if (transCondition_.wait_for(lock, std::chrono::seconds(TRANSACTION_TIMEOUT), [this] {
331 return !transactionUsed_;
332 })) {
333 transactionUsed_ = true;
334 return E_OK;
335 }
336 LOG_WARN("transactionUsed_ is %{public}d", transactionUsed_);
337 return E_DATABASE_BUSY;
338 }
339
ReleaseTransaction()340 void ConnPool::ReleaseTransaction()
341 {
342 {
343 std::unique_lock<std::mutex> lock(transMutex_);
344 transactionUsed_ = false;
345 }
346 transCondition_.notify_one();
347 }
348
RestartReaders()349 int ConnPool::RestartReaders()
350 {
351 readers_.Clear();
352 auto [errCode, node] = readers_.Initialize(
353 [this]() {
354 return Connection::Create(config_, false);
355 },
356 maxReader_, config_.GetReadTime(), maxReader_ == 0);
357 return errCode;
358 }
359
360 /**
361 * The database locale.
362 */
ConfigLocale(const std::string & localeStr)363 int ConnPool::ConfigLocale(const std::string &localeStr)
364 {
365 auto errCode = readers_.ConfigLocale(localeStr);
366 if (errCode != E_OK) {
367 return errCode;
368 }
369 return writers_.ConfigLocale(localeStr);
370 }
371
372 /**
373 * Rename the backed up database.
374 */
ChangeDbFileForRestore(const std::string & newPath,const std::string & backupPath,const std::vector<uint8_t> & newKey,SlaveStatus & slaveStatus)375 int ConnPool::ChangeDbFileForRestore(const std::string &newPath, const std::string &backupPath,
376 const std::vector<uint8_t> &newKey, SlaveStatus &slaveStatus)
377 {
378 if (!writers_.IsFull() || config_.GetPath() == backupPath || newPath == backupPath) {
379 LOG_ERROR("Connection pool is busy now!");
380 return E_ERROR;
381 }
382 if (config_.GetDBType() == DB_VECTOR) {
383 CloseAllConnections();
384 auto [retVal, connection] = CreateTransConn();
385
386 if (connection == nullptr) {
387 LOG_ERROR("Get null connection.");
388 return retVal;
389 }
390 retVal = connection->Restore(backupPath, {}, slaveStatus);
391 if (retVal != E_OK) {
392 LOG_ERROR("RdDbRestore error.");
393 return retVal;
394 }
395 CloseAllConnections();
396 auto [errCode, node] = Init();
397 return errCode;
398 }
399 return RestoreByDbSqliteType(newPath, backupPath, slaveStatus);
400 }
401
RestoreByDbSqliteType(const std::string & newPath,const std::string & backupPath,SlaveStatus & slaveStatus)402 int ConnPool::RestoreByDbSqliteType(const std::string &newPath, const std::string &backupPath, SlaveStatus &slaveStatus)
403 {
404 if (SqliteUtils::IsSlaveDbName(backupPath) && config_.GetHaMode() != HAMode::SINGLE) {
405 auto connection = AcquireConnection(false);
406 if (connection == nullptr) {
407 return E_DATABASE_BUSY;
408 }
409 return connection->Restore(backupPath, {}, slaveStatus);
410 }
411
412 return RestoreMasterDb(newPath, backupPath);
413 }
414
RestoreMasterDb(const std::string & newPath,const std::string & backupPath)415 int ConnPool::RestoreMasterDb(const std::string &newPath, const std::string &backupPath)
416 {
417 if (!CheckIntegrity(backupPath)) {
418 LOG_ERROR("backup file is corrupted, %{public}s", SqliteUtils::Anonymous(backupPath).c_str());
419 return E_SQLITE_CORRUPT;
420 }
421 SqliteUtils::DeleteFile(backupPath + "-shm");
422 SqliteUtils::DeleteFile(backupPath + "-wal");
423
424 CloseAllConnections();
425 Connection::Delete(config_);
426
427 if (config_.GetPath() != newPath) {
428 RdbStoreConfig config(newPath);
429 config.SetPath(newPath);
430 Connection::Delete(config);
431 }
432
433 bool copyRet = SqliteUtils::CopyFile(backupPath, newPath);
434 int32_t errCode = E_OK;
435 std::shared_ptr<Connection> pool;
436 for (uint32_t retry = 0; retry < ITERS_COUNT; ++retry) {
437 std::tie(errCode, pool) = Init();
438 if (errCode == E_OK) {
439 break;
440 }
441 if (errCode != E_SQLITE_CORRUPT || !config_.IsEncrypt()) {
442 break;
443 }
444 config_.SetIter(ITER_V1);
445 }
446 if (errCode != E_OK) {
447 CloseAllConnections();
448 Connection::Delete(config_);
449 std::tie(errCode, pool) = Init();
450 LOG_WARN("restore failed! rebuild res:%{public}d, path:%{public}s.", errCode,
451 SqliteUtils::Anonymous(backupPath).c_str());
452 }
453 return copyRet ? errCode : E_ERROR;
454 }
455
GetTransactionStack()456 std::stack<BaseTransaction> &ConnPool::GetTransactionStack()
457 {
458 return transactionStack_;
459 }
460
GetTransactionStackMutex()461 std::mutex &ConnPool::GetTransactionStackMutex()
462 {
463 return transactionStackMutex_;
464 }
465
DisableWal()466 std::pair<int, std::shared_ptr<Conn>> ConnPool::DisableWal()
467 {
468 return Init(true, true);
469 }
470
EnableWal()471 int ConnPool::EnableWal()
472 {
473 auto [errCode, node] = Init();
474 return errCode;
475 }
476
Dump(bool isWriter,const char * header)477 int32_t ConnectionPool::Dump(bool isWriter, const char *header)
478 {
479 Container *container = (isWriter || maxReader_ == 0) ? &writers_ : &readers_;
480 container->Dump(header, transCount_ + isInTransaction_);
481 return E_OK;
482 }
483
ConnNode(std::shared_ptr<Conn> conn)484 ConnPool::ConnNode::ConnNode(std::shared_ptr<Conn> conn) : connect_(std::move(conn))
485 {
486 }
487
GetConnect()488 std::shared_ptr<Conn> ConnPool::ConnNode::GetConnect()
489 {
490 tid_ = gettid();
491 time_ = steady_clock::now();
492 return connect_;
493 }
494
GetUsingTime() const495 int64_t ConnPool::ConnNode::GetUsingTime() const
496 {
497 auto time = steady_clock::now() - time_;
498 return duration_cast<milliseconds>(time).count();
499 }
500
Unused(int32_t count,bool timeout)501 int32_t ConnPool::ConnNode::Unused(int32_t count, bool timeout)
502 {
503 time_ = steady_clock::now();
504 if (connect_ == nullptr) {
505 return E_OK;
506 }
507
508 connect_->ClearCache();
509 int32_t errCode = E_INNER_WARNING;
510 if (count <= 0) {
511 errCode = connect_->TryCheckPoint(timeout);
512 }
513
514 time_ = steady_clock::now();
515 if (!connect_->IsWriter()) {
516 tid_ = 0;
517 }
518 return errCode;
519 }
520
IsWriter() const521 bool ConnPool::ConnNode::IsWriter() const
522 {
523 if (connect_ != nullptr) {
524 return connect_->IsWriter();
525 }
526 return false;
527 }
528
Initialize(Creator creator,int32_t max,int32_t timeout,bool disable,bool acquire)529 std::pair<int32_t, std::shared_ptr<ConnPool::ConnNode>> ConnPool::Container::Initialize(Creator creator, int32_t max,
530 int32_t timeout, bool disable, bool acquire)
531 {
532 std::shared_ptr<ConnNode> connNode = nullptr;
533 {
534 std::unique_lock<decltype(mutex_)> lock(mutex_);
535 disable_ = disable;
536 max_ = max;
537 creator_ = creator;
538 timeout_ = std::chrono::seconds(timeout);
539 for (int i = 0; i < max; ++i) {
540 auto errCode = ExtendNode();
541 if (errCode != E_OK) {
542 nodes_.clear();
543 details_.clear();
544 return { errCode, nullptr };
545 }
546 }
547
548 if (acquire && count_ > 0) {
549 connNode = nodes_.back();
550 nodes_.pop_back();
551 count_--;
552 }
553 }
554 cond_.notify_all();
555 return { E_OK, connNode };
556 }
557
ConfigLocale(const std::string & locale)558 int32_t ConnPool::Container::ConfigLocale(const std::string &locale)
559 {
560 std::unique_lock<decltype(mutex_)> lock(mutex_);
561 if (total_ != count_) {
562 return E_DATABASE_BUSY;
563 }
564 for (auto it = details_.begin(); it != details_.end();) {
565 auto conn = it->lock();
566 if (conn == nullptr || conn->connect_ == nullptr) {
567 it = details_.erase(it);
568 continue;
569 }
570 conn->connect_->ConfigLocale(locale);
571 }
572 return E_OK;
573 }
574
Acquire(std::chrono::milliseconds milliS)575 std::shared_ptr<ConnPool::ConnNode> ConnPool::Container::Acquire(std::chrono::milliseconds milliS)
576 {
577 auto interval = (milliS == INVALID_TIME) ? timeout_ : milliS;
578 std::unique_lock<decltype(mutex_)> lock(mutex_);
579 if (max_ == 0) {
580 return nullptr;
581 }
582 auto waiter = [this]() -> bool {
583 if (count_ > 0) {
584 return true;
585 }
586
587 if (disable_) {
588 return false;
589 }
590 return ExtendNode() == E_OK;
591 };
592 if (cond_.wait_for(lock, interval, waiter)) {
593 if (nodes_.empty()) {
594 LOG_ERROR(
595 "nodes is empty.count %{public}d max %{public}d total %{public}d left %{public}d right%{public}d",
596 count_, max_, total_, left_, right_);
597 count_ = 0;
598 return nullptr;
599 }
600 auto node = nodes_.back();
601 nodes_.pop_back();
602 count_--;
603 return node;
604 }
605 return nullptr;
606 }
607
Create()608 std::pair<int32_t, std::shared_ptr<ConnPool::ConnNode>> ConnPool::Container::Create()
609 {
610 if (creator_ == nullptr) {
611 return { E_NOT_SUPPORT, nullptr };
612 }
613
614 auto [errCode, conn] = creator_();
615 if (conn == nullptr) {
616 return { errCode, nullptr };
617 }
618
619 auto node = std::make_shared<ConnNode>(conn);
620 if (node == nullptr) {
621 return { E_ERROR, nullptr };
622 }
623 node->id_ = MIN_TRANS_ID + trans_;
624 conn->SetId(node->id_);
625 details_.push_back(node);
626 trans_++;
627 return { E_OK, node };
628 }
629
ExtendNode()630 int32_t ConnPool::Container::ExtendNode()
631 {
632 if (creator_ == nullptr) {
633 return E_ERROR;
634 }
635 auto [errCode, conn] = creator_();
636 if (conn == nullptr) {
637 return errCode;
638 }
639 auto node = std::make_shared<ConnNode>(conn);
640 node->id_ = right_++;
641 conn->SetId(node->id_);
642 nodes_.push_back(node);
643 details_.push_back(node);
644 count_++;
645 total_++;
646 return E_OK;
647 }
648
AcquireAll(std::chrono::milliseconds milliS)649 std::list<std::shared_ptr<ConnPool::ConnNode>> ConnPool::Container::AcquireAll(std::chrono::milliseconds milliS)
650 {
651 std::list<std::shared_ptr<ConnNode>> nodes;
652 int32_t count = 0;
653 auto interval = (milliS == INVALID_TIME) ? timeout_ : milliS;
654 auto time = std::chrono::steady_clock::now() + interval;
655 std::unique_lock<decltype(mutex_)> lock(mutex_);
656 while (count < total_ && cond_.wait_until(lock, time, [this]() {
657 return count_ > 0;
658 })) {
659 nodes.merge(std::move(nodes_));
660 nodes_.clear();
661 count += count_;
662 count_ = 0;
663 }
664
665 if (count != total_) {
666 count_ = count;
667 nodes_ = std::move(nodes);
668 nodes.clear();
669 return nodes;
670 }
671 auto func = [](const std::list<std::shared_ptr<ConnNode>> &nodes) -> bool {
672 for (auto &node : nodes) {
673 if (node->connect_ == nullptr) {
674 continue;
675 }
676 if (node->connect_.use_count() != 1) {
677 return false;
678 }
679 }
680 return true;
681 };
682 bool failed = false;
683 while (failed = !func(nodes), failed && cond_.wait_until(lock, time) != std::cv_status::timeout) {
684 }
685 if (failed) {
686 count_ = count;
687 nodes_ = std::move(nodes);
688 nodes.clear();
689 }
690 return nodes;
691 }
692
Disable()693 void ConnPool::Container::Disable()
694 {
695 disable_ = true;
696 cond_.notify_one();
697 }
698
Enable()699 void ConnPool::Container::Enable()
700 {
701 disable_ = false;
702 cond_.notify_one();
703 }
704
Release(std::shared_ptr<ConnNode> node)705 int32_t ConnPool::Container::Release(std::shared_ptr<ConnNode> node)
706 {
707 {
708 std::unique_lock<decltype(mutex_)> lock(mutex_);
709 if (node->id_ < left_ || node->id_ >= right_) {
710 return E_OK;
711 }
712 if (count_ == max_) {
713 total_ = total_ > count_ ? total_ - 1 : count_;
714 RelDetails(node);
715 } else {
716 nodes_.push_front(node);
717 count_++;
718 }
719 }
720 cond_.notify_one();
721 return E_OK;
722 }
723
Drop(std::shared_ptr<ConnNode> node)724 int32_t ConnectionPool::Container::Drop(std::shared_ptr<ConnNode> node)
725 {
726 {
727 std::unique_lock<decltype(mutex_)> lock(mutex_);
728 RelDetails(node);
729 }
730 cond_.notify_one();
731 return E_OK;
732 }
733
RelDetails(std::shared_ptr<ConnNode> node)734 int32_t ConnectionPool::Container::RelDetails(std::shared_ptr<ConnNode> node)
735 {
736 for (auto it = details_.begin(); it != details_.end();) {
737 auto detailNode = it->lock();
738 if (detailNode == nullptr || detailNode->id_ == node->id_) {
739 it = details_.erase(it);
740 } else {
741 it++;
742 }
743 }
744 return E_OK;
745 }
746
CheckIntegrity(const std::string & dbPath)747 bool ConnectionPool::CheckIntegrity(const std::string &dbPath)
748 {
749 RdbStoreConfig config(config_);
750 config.SetPath(dbPath);
751 config.SetIntegrityCheck(IntegrityCheck::FULL);
752 config.SetHaMode(HAMode::SINGLE);
753 for (uint32_t retry = 0; retry < ITERS_COUNT; ++retry) {
754 auto [ret, connection] = Connection::Create(config, true);
755 if (ret == E_OK) {
756 return true;
757 }
758 if (ret != E_SQLITE_CORRUPT || !config.IsEncrypt()) {
759 break;
760 }
761 config.SetIter(ITER_V1);
762 }
763 return false;
764 }
765
Clear()766 int32_t ConnPool::Container::Clear()
767 {
768 std::list<std::shared_ptr<ConnNode>> nodes;
769 std::list<std::weak_ptr<ConnNode>> details;
770 {
771 std::unique_lock<decltype(mutex_)> lock(mutex_);
772 nodes = std::move(nodes_);
773 details = std::move(details_);
774 disable_ = true;
775 total_ = 0;
776 count_ = 0;
777 if (right_ > MAX_RIGHT) {
778 right_ = 0;
779 }
780 left_ = right_;
781 creator_ = nullptr;
782 }
783 nodes.clear();
784 details.clear();
785 return 0;
786 }
787
IsFull()788 bool ConnPool::Container::IsFull()
789 {
790 std::unique_lock<decltype(mutex_)> lock(mutex_);
791 return total_ == count_;
792 }
793
Dump(const char * header,int32_t count)794 int32_t ConnPool::Container::Dump(const char *header, int32_t count)
795 {
796 std::string info;
797 std::vector<std::shared_ptr<ConnNode>> details;
798 std::string title = "B_M_T_C[" + std::to_string(count) + "," + std::to_string(max_) + "," +
799 std::to_string(total_) + "," + std::to_string(count_) + "]";
800 {
801 std::unique_lock<decltype(mutex_)> lock(mutex_);
802 details.reserve(details_.size());
803 for (auto &detail : details_) {
804 auto node = detail.lock();
805 if (node == nullptr) {
806 continue;
807 }
808 details.push_back(node);
809 }
810 }
811
812 for (auto &node : details) {
813 info.append("<")
814 .append(std::to_string(node->id_))
815 .append(",")
816 .append(std::to_string(node->tid_))
817 .append(",")
818 .append(std::to_string(node->GetUsingTime()))
819 .append(">");
820 // 256 represent that limit to info length
821 if (info.size() > 256) {
822 LOG_WARN("%{public}s %{public}s:%{public}s", header, title.c_str(), info.c_str());
823 info.clear();
824 }
825 }
826 LOG_WARN("%{public}s %{public}s:%{public}s", header, title.c_str(), info.c_str());
827 return 0;
828 }
829 } // namespace NativeRdb
830 } // namespace OHOS
831