1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "storage_engine.h"
17
18 #include <algorithm>
19
20 #include "db_common.h"
21 #include "db_errno.h"
22 #include "log_print.h"
23
24 namespace DistributedDB {
25 const int StorageEngine::MAX_WAIT_TIME = 30;
26 const int StorageEngine::MAX_WRITE_SIZE = 1;
27 const int StorageEngine::MAX_READ_SIZE = 16;
28
StorageEngine()29 StorageEngine::StorageEngine()
30 : isUpdated_(false),
31 isMigrating_(false),
32 commitNotifyFunc_(nullptr),
33 schemaChangedFunc_(nullptr),
34 isSchemaChanged_(false),
35 isInitialized_(false),
36 perm_(OperatePerm::NORMAL_PERM),
37 operateAbort_(false),
38 isExistConnection_(false),
39 engineState_(EngineState::INVALID)
40 {}
41
~StorageEngine()42 StorageEngine::~StorageEngine()
43 {
44 CloseExecutor();
45 }
46
CloseAllExecutor()47 void StorageEngine::CloseAllExecutor()
48 {
49 CloseExecutor();
50 }
51
InitAllReadWriteExecutor()52 int StorageEngine::InitAllReadWriteExecutor()
53 {
54 return InitReadWriteExecutors();
55 }
56
GetOption()57 OpenDbProperties StorageEngine::GetOption()
58 {
59 return option_;
60 }
61
InitReadWriteExecutors()62 int StorageEngine::InitReadWriteExecutors()
63 {
64 int errCode = E_OK;
65 std::scoped_lock initLock(writeMutex_, readMutex_);
66 // only for create the database avoid the minimum number is 0.
67 StorageExecutor *handle = nullptr;
68 if (engineAttr_.minReadNum == 0 && engineAttr_.minWriteNum == 0) {
69 errCode = CreateNewExecutor(true, handle);
70 if (errCode != E_OK) {
71 return errCode;
72 }
73
74 if (handle != nullptr) {
75 delete handle;
76 handle = nullptr;
77 }
78 }
79
80 for (uint32_t i = 0; i < engineAttr_.minWriteNum; i++) {
81 handle = nullptr;
82 errCode = CreateNewExecutor(true, handle);
83 if (errCode != E_OK) {
84 return errCode;
85 }
86 AddStorageExecutor(handle, false);
87 }
88
89 for (uint32_t i = 0; i < engineAttr_.minReadNum; i++) {
90 handle = nullptr;
91 errCode = CreateNewExecutor(false, handle);
92 if (errCode != E_OK) {
93 return errCode;
94 }
95 AddStorageExecutor(handle, false);
96 }
97 return E_OK;
98 }
99
100
Init()101 int StorageEngine::Init()
102 {
103 if (isInitialized_.load()) {
104 LOGD("Storage engine has been initialized!");
105 return E_OK;
106 }
107
108 int errCode = InitReadWriteExecutors();
109 if (errCode == E_OK) {
110 isInitialized_.store(true);
111 initCondition_.notify_all();
112 return E_OK;
113 } else if (errCode == -E_EKEYREVOKED) {
114 // Assumed file system has classification function, can only get one write handle
115 std::unique_lock<std::mutex> lock(writeMutex_);
116 if (!writeIdleList_.empty() || !writeUsingList_.empty()) {
117 isInitialized_.store(true);
118 initCondition_.notify_all();
119 return E_OK;
120 }
121 }
122 initCondition_.notify_all();
123 Release();
124 return errCode;
125 }
126
ReInit()127 int StorageEngine::ReInit()
128 {
129 return E_OK;
130 }
131
FindExecutor(bool writable,OperatePerm perm,int & errCode,bool isExternal,int waitTime)132 StorageExecutor *StorageEngine::FindExecutor(bool writable, OperatePerm perm, int &errCode, bool isExternal,
133 int waitTime)
134 {
135 if (GetEngineState() == EngineState::ENGINE_BUSY) {
136 LOGI("Storage engine is busy!");
137 errCode = -E_BUSY;
138 return nullptr;
139 }
140
141 {
142 std::unique_lock<std::mutex> lock(initMutex_);
143 bool result = initCondition_.wait_for(lock, std::chrono::seconds(waitTime), [this]() {
144 return isInitialized_.load();
145 });
146 if (!result || !isInitialized_.load()) {
147 LOGE("Storage engine is not initialized");
148 errCode = -E_BUSY; // Usually in reinitialize engine, return BUSY
149 return nullptr;
150 }
151 }
152
153 if (writable) {
154 return FindWriteExecutor(perm, errCode, waitTime, isExternal);
155 }
156
157 return FindReadExecutor(perm, errCode, waitTime, isExternal);
158 }
159
FindWriteExecutor(OperatePerm perm,int & errCode,int waitTime,bool isExternal)160 StorageExecutor *StorageEngine::FindWriteExecutor(OperatePerm perm, int &errCode, int waitTime, bool isExternal)
161 {
162 LOGD("[FindWriteExecutor]Finding WriteExecutor");
163 std::unique_lock<std::mutex> lock(writeMutex_);
164 errCode = -E_BUSY;
165 if (perm_ == OperatePerm::DISABLE_PERM || perm_ != perm) {
166 LOGI("Not permitted to get the executor[%u]", static_cast<unsigned>(perm_));
167 return nullptr;
168 }
169 std::list<StorageExecutor *> &writeUsingList = isExternal ? externalWriteUsingList_ : writeUsingList_;
170 std::list<StorageExecutor *> &writeIdleList = isExternal ? externalWriteIdleList_ : writeIdleList_;
171 if (waitTime <= 0) { // non-blocking.
172 if (writeUsingList.empty() &&
173 writeIdleList.size() + writeUsingList.size() == engineAttr_.maxWriteNum) {
174 return nullptr;
175 }
176 return FetchStorageExecutor(true, writeIdleList, writeUsingList, errCode, isExternal);
177 }
178 // Not prohibited and there is an available handle
179 bool result = writeCondition_.wait_for(lock, std::chrono::seconds(waitTime),
180 [this, &perm, &writeUsingList, &writeIdleList]() {
181 return (perm_ == OperatePerm::NORMAL_PERM || perm_ == perm) && (!writeIdleList.empty() ||
182 (writeIdleList.size() + writeUsingList.size() < engineAttr_.maxWriteNum) ||
183 operateAbort_);
184 });
185 if (operateAbort_) {
186 LOGI("Abort write executor and executor and busy for operate!");
187 return nullptr;
188 }
189 if (!result) {
190 LOGI("Get write handle result[%d], permissType[%u], operType[%u], write[%zu-%zu-%" PRIu32 "]", result,
191 static_cast<unsigned>(perm_), static_cast<unsigned>(perm), writeIdleList.size(), writeUsingList.size(),
192 engineAttr_.maxWriteNum);
193 return nullptr;
194 }
195 return FetchStorageExecutor(true, writeIdleList, writeUsingList, errCode, isExternal);
196 }
197
FindReadExecutor(OperatePerm perm,int & errCode,int waitTime,bool isExternal)198 StorageExecutor *StorageEngine::FindReadExecutor(OperatePerm perm, int &errCode, int waitTime, bool isExternal)
199 {
200 std::unique_lock<std::mutex> lock(readMutex_);
201 errCode = -E_BUSY;
202 if (perm_ == OperatePerm::DISABLE_PERM || perm_ != perm) {
203 LOGI("Not permitted to get the executor[%u]", static_cast<unsigned>(perm_));
204 return nullptr;
205 }
206
207 std::list<StorageExecutor *> &readUsingList = isExternal ? externalReadUsingList_ : readUsingList_;
208 std::list<StorageExecutor *> &readIdleList = isExternal ? externalReadIdleList_ : readIdleList_;
209 if (waitTime <= 0) { // non-blocking.
210 if (readIdleList.empty() &&
211 readIdleList.size() + readUsingList.size() == engineAttr_.maxReadNum) {
212 return nullptr;
213 }
214 return FetchStorageExecutor(false, readIdleList, readUsingList, errCode, isExternal);
215 }
216
217 // Not prohibited and there is an available handle
218 uint32_t maxReadHandleNum = isExternal ? 1 : engineAttr_.maxReadNum;
219 bool result = readCondition_.wait_for(lock, std::chrono::seconds(waitTime),
220 [this, &perm, &readUsingList, &readIdleList, &maxReadHandleNum]() {
221 return (perm_ == OperatePerm::NORMAL_PERM || perm_ == perm) &&
222 (!readIdleList.empty() || (readIdleList.size() + readUsingList.size() < maxReadHandleNum) ||
223 operateAbort_);
224 });
225 if (operateAbort_) {
226 LOGI("Abort find read executor and busy for operate!");
227 return nullptr;
228 }
229 if (!result) {
230 LOGI("Get read handle result[%d], permissType[%u], operType[%u], read[%zu-%zu-%" PRIu32 "]", result,
231 static_cast<unsigned>(perm_), static_cast<unsigned>(perm), readIdleList.size(), readUsingList.size(),
232 engineAttr_.maxReadNum);
233 return nullptr;
234 }
235 return FetchStorageExecutor(false, readIdleList, readUsingList, errCode, isExternal);
236 }
237
Recycle(StorageExecutor * & handle,bool isExternal)238 void StorageEngine::Recycle(StorageExecutor *&handle, bool isExternal)
239 {
240 if (handle == nullptr) {
241 return;
242 }
243 LOGD("Recycle executor[%d] for id[%.6s]", handle->GetWritable(), hashIdentifier_.c_str());
244 std::list<StorageExecutor *> &writeUsingList = isExternal ? externalWriteUsingList_ : writeUsingList_;
245 std::list<StorageExecutor *> &writeIdleList = isExternal ? externalWriteIdleList_ : writeIdleList_;
246 std::list<StorageExecutor *> &readUsingList = isExternal ? externalReadUsingList_ : readUsingList_;
247 std::list<StorageExecutor *> &readIdleList = isExternal ? externalReadIdleList_ : readIdleList_;
248 if (handle->GetWritable()) {
249 std::unique_lock<std::mutex> lock(writeMutex_);
250 auto iter = std::find(writeUsingList.begin(), writeUsingList.end(), handle);
251 if (iter != writeUsingList.end()) {
252 writeUsingList.remove(handle);
253 if (!writeIdleList.empty()) {
254 delete handle;
255 handle = nullptr;
256 return;
257 }
258 handle->Reset();
259 writeIdleList.push_back(handle);
260 writeCondition_.notify_one();
261 idleCondition_.notify_all();
262 }
263 } else {
264 std::unique_lock<std::mutex> lock(readMutex_);
265 auto iter = std::find(readUsingList.begin(), readUsingList.end(), handle);
266 if (iter != readUsingList.end()) {
267 readUsingList.remove(handle);
268 if (!readIdleList.empty()) {
269 delete handle;
270 handle = nullptr;
271 return;
272 }
273 handle->Reset();
274 readIdleList.push_back(handle);
275 readCondition_.notify_one();
276 }
277 }
278 handle = nullptr;
279 }
280
ClearCorruptedFlag()281 void StorageEngine::ClearCorruptedFlag()
282 {
283 return;
284 }
285
IsEngineCorrupted() const286 bool StorageEngine::IsEngineCorrupted() const
287 {
288 return false;
289 }
290
Release()291 void StorageEngine::Release()
292 {
293 CloseExecutor();
294 isInitialized_.store(false);
295 isUpdated_ = false;
296 ClearCorruptedFlag();
297 SetEngineState(EngineState::INVALID);
298 }
299
TryToDisable(bool isNeedCheckAll,OperatePerm disableType)300 int StorageEngine::TryToDisable(bool isNeedCheckAll, OperatePerm disableType)
301 {
302 if (engineState_ != EngineState::MAINDB && engineState_ != EngineState::INVALID) {
303 LOGE("Not support disable handle when cacheDB existed! state = [%d]", engineState_);
304 return(engineState_ == EngineState::CACHEDB) ? -E_NOT_SUPPORT : -E_BUSY;
305 }
306
307 std::lock(writeMutex_, readMutex_);
308 std::lock_guard<std::mutex> writeLock(writeMutex_, std::adopt_lock);
309 std::lock_guard<std::mutex> readLock(readMutex_, std::adopt_lock);
310
311 if (!isNeedCheckAll) {
312 goto END;
313 }
314
315 if (!writeUsingList_.empty() || !readUsingList_.empty() || !externalWriteUsingList_.empty() ||
316 !externalReadUsingList_.empty()) {
317 LOGE("Database handle used");
318 return -E_BUSY;
319 }
320 END:
321 if (perm_ == OperatePerm::NORMAL_PERM) {
322 LOGI("database is disable for re-build:%d", static_cast<int>(disableType));
323 perm_ = disableType;
324 writeCondition_.notify_all();
325 readCondition_.notify_all();
326 }
327 return E_OK;
328 }
329
Enable(OperatePerm enableType)330 void StorageEngine::Enable(OperatePerm enableType)
331 {
332 std::lock(writeMutex_, readMutex_);
333 std::lock_guard<std::mutex> writeLock(writeMutex_, std::adopt_lock);
334 std::lock_guard<std::mutex> readLock(readMutex_, std::adopt_lock);
335 if (perm_ == enableType) {
336 LOGI("Re-enable the database");
337 perm_ = OperatePerm::NORMAL_PERM;
338 writeCondition_.notify_all();
339 readCondition_.notify_all();
340 }
341 }
342
Abort(OperatePerm enableType)343 void StorageEngine::Abort(OperatePerm enableType)
344 {
345 std::lock(writeMutex_, readMutex_);
346 std::lock_guard<std::mutex> writeLock(writeMutex_, std::adopt_lock);
347 std::lock_guard<std::mutex> readLock(readMutex_, std::adopt_lock);
348 if (perm_ == enableType) {
349 LOGI("Abort the handle occupy, release all!");
350 perm_ = OperatePerm::NORMAL_PERM;
351 operateAbort_ = true;
352
353 writeCondition_.notify_all();
354 readCondition_.notify_all();
355 }
356 }
357
IsNeedTobeReleased() const358 bool StorageEngine::IsNeedTobeReleased() const
359 {
360 EngineState engineState = GetEngineState();
361 return ((engineState == EngineState::MAINDB) || (engineState == EngineState::INVALID));
362 }
363
GetIdentifier() const364 const std::string &StorageEngine::GetIdentifier() const
365 {
366 return identifier_;
367 }
368
GetEngineState() const369 EngineState StorageEngine::GetEngineState() const
370 {
371 return engineState_;
372 }
373
SetEngineState(EngineState state)374 void StorageEngine::SetEngineState(EngineState state)
375 {
376 LOGI("Storage engine state to [%d]!", state);
377 engineState_ = state;
378 }
379
ExecuteMigrate()380 int StorageEngine::ExecuteMigrate()
381 {
382 LOGW("Migration is not supported!");
383 return -E_NOT_SUPPORT;
384 }
385
SetNotifiedCallback(const std::function<void (int,KvDBCommitNotifyFilterAbleData *)> & callback)386 void StorageEngine::SetNotifiedCallback(const std::function<void(int, KvDBCommitNotifyFilterAbleData *)> &callback)
387 {
388 std::unique_lock<std::shared_mutex> lock(notifyMutex_);
389 commitNotifyFunc_ = callback;
390 }
391
SetConnectionFlag(bool isExisted)392 void StorageEngine::SetConnectionFlag(bool isExisted)
393 {
394 return isExistConnection_.store(isExisted);
395 }
396
IsExistConnection() const397 bool StorageEngine::IsExistConnection() const
398 {
399 return isExistConnection_.load();
400 }
401
CheckEngineOption(const KvDBProperties & kvdbOption) const402 int StorageEngine::CheckEngineOption(const KvDBProperties &kvdbOption) const
403 {
404 return E_OK;
405 }
406
AddStorageExecutor(StorageExecutor * handle,bool isExternal)407 void StorageEngine::AddStorageExecutor(StorageExecutor *handle, bool isExternal)
408 {
409 if (handle == nullptr) {
410 return;
411 }
412
413 std::list<StorageExecutor *> &writeIdleList = isExternal ? externalWriteIdleList_ : writeIdleList_;
414 std::list<StorageExecutor *> &readIdleList = isExternal ? externalReadIdleList_ : readIdleList_;
415 if (handle->GetWritable()) {
416 writeIdleList.push_back(handle);
417 } else {
418 readIdleList.push_back(handle);
419 }
420 }
421
ClearHandleList(std::list<StorageExecutor * > & handleList)422 void ClearHandleList(std::list<StorageExecutor *> &handleList)
423 {
424 for (auto &item : handleList) {
425 if (item != nullptr) {
426 delete item;
427 item = nullptr;
428 }
429 }
430 handleList.clear();
431 }
432
CloseExecutor()433 void StorageEngine::CloseExecutor()
434 {
435 {
436 std::lock_guard<std::mutex> lock(writeMutex_);
437 ClearHandleList(writeIdleList_);
438 ClearHandleList(externalWriteIdleList_);
439 }
440
441 {
442 std::lock_guard<std::mutex> lock(readMutex_);
443 ClearHandleList(readIdleList_);
444 ClearHandleList(externalReadIdleList_);
445 }
446 }
447
FetchStorageExecutor(bool isWrite,std::list<StorageExecutor * > & idleList,std::list<StorageExecutor * > & usingList,int & errCode,bool isExternal)448 StorageExecutor *StorageEngine::FetchStorageExecutor(bool isWrite, std::list<StorageExecutor *> &idleList,
449 std::list<StorageExecutor *> &usingList, int &errCode, bool isExternal)
450 {
451 if (idleList.empty()) {
452 StorageExecutor *handle = nullptr;
453 errCode = CreateNewExecutor(isWrite, handle);
454 if ((errCode != E_OK) || (handle == nullptr)) {
455 if (errCode != -E_EKEYREVOKED) {
456 return nullptr;
457 }
458 LOGE("Key revoked status, couldn't create the new executor");
459 if (!usingList.empty()) {
460 LOGE("Can't create new executor for revoked");
461 errCode = -E_BUSY;
462 }
463 return nullptr;
464 }
465
466 AddStorageExecutor(handle, isExternal);
467 }
468 auto item = idleList.front();
469 usingList.push_back(item);
470 idleList.remove(item);
471 LOGD("Get executor[%d] from [%.3s]", isWrite, hashIdentifier_.c_str());
472 errCode = E_OK;
473 return item;
474 }
475
CheckEngineAttr(const StorageEngineAttr & poolSize)476 bool StorageEngine::CheckEngineAttr(const StorageEngineAttr &poolSize)
477 {
478 return (poolSize.maxReadNum > MAX_READ_SIZE ||
479 poolSize.maxWriteNum > MAX_WRITE_SIZE ||
480 poolSize.minReadNum > poolSize.maxReadNum ||
481 poolSize.minWriteNum > poolSize.maxWriteNum);
482 }
483
IsMigrating() const484 bool StorageEngine::IsMigrating() const
485 {
486 return isMigrating_.load();
487 }
488
WaitWriteHandleIdle()489 void StorageEngine::WaitWriteHandleIdle()
490 {
491 std::unique_lock<std::mutex> autoLock(idleMutex_);
492 LOGD("Wait wHandle release id[%s]. write[%zu-%zu-%" PRIu32 "]", hashIdentifier_.c_str(),
493 writeIdleList_.size(), writeUsingList_.size(), engineAttr_.maxWriteNum);
494 idleCondition_.wait(autoLock, [this]() {
495 return writeUsingList_.empty();
496 });
497 LOGD("Wait wHandle release finish id[%s]. write[%zu-%zu-%" PRIu32 "]",
498 hashIdentifier_.c_str(), writeIdleList_.size(), writeUsingList_.size(), engineAttr_.maxWriteNum);
499 }
500
IncreaseCacheRecordVersion()501 void StorageEngine::IncreaseCacheRecordVersion()
502 {
503 return;
504 }
505
GetCacheRecordVersion() const506 uint64_t StorageEngine::GetCacheRecordVersion() const
507 {
508 return 0;
509 }
510
GetAndIncreaseCacheRecordVersion()511 uint64_t StorageEngine::GetAndIncreaseCacheRecordVersion()
512 {
513 return 0;
514 }
515
SetSchemaChangedCallback(const std::function<int (void)> & callback)516 void StorageEngine::SetSchemaChangedCallback(const std::function<int(void)> &callback)
517 {
518 std::unique_lock<std::shared_mutex> lock(schemaChangedMutex_);
519 schemaChangedFunc_ = callback;
520 }
521 }
522