1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "storage_engine.h"
17 
18 #include <algorithm>
19 
20 #include "db_common.h"
21 #include "db_errno.h"
22 #include "log_print.h"
23 
24 namespace DistributedDB {
25 const int StorageEngine::MAX_WAIT_TIME = 30;
26 const int StorageEngine::MAX_WRITE_SIZE = 1;
27 const int StorageEngine::MAX_READ_SIZE = 16;
28 
StorageEngine()29 StorageEngine::StorageEngine()
30     : isUpdated_(false),
31       isMigrating_(false),
32       commitNotifyFunc_(nullptr),
33       schemaChangedFunc_(nullptr),
34       isSchemaChanged_(false),
35       isInitialized_(false),
36       perm_(OperatePerm::NORMAL_PERM),
37       operateAbort_(false),
38       isExistConnection_(false),
39       engineState_(EngineState::INVALID)
40 {}
41 
~StorageEngine()42 StorageEngine::~StorageEngine()
43 {
44     CloseExecutor();
45 }
46 
CloseAllExecutor()47 void StorageEngine::CloseAllExecutor()
48 {
49     CloseExecutor();
50 }
51 
InitAllReadWriteExecutor()52 int StorageEngine::InitAllReadWriteExecutor()
53 {
54     return InitReadWriteExecutors();
55 }
56 
GetOption()57 OpenDbProperties StorageEngine::GetOption()
58 {
59     return option_;
60 }
61 
InitReadWriteExecutors()62 int StorageEngine::InitReadWriteExecutors()
63 {
64     int errCode = E_OK;
65     std::scoped_lock initLock(writeMutex_, readMutex_);
66     // only for create the database avoid the minimum number is 0.
67     StorageExecutor *handle = nullptr;
68     if (engineAttr_.minReadNum == 0 && engineAttr_.minWriteNum == 0) {
69         errCode = CreateNewExecutor(true, handle);
70         if (errCode != E_OK) {
71             return errCode;
72         }
73 
74         if (handle != nullptr) {
75             delete handle;
76             handle = nullptr;
77         }
78     }
79 
80     for (uint32_t i = 0; i < engineAttr_.minWriteNum; i++) {
81         handle = nullptr;
82         errCode = CreateNewExecutor(true, handle);
83         if (errCode != E_OK) {
84             return errCode;
85         }
86         AddStorageExecutor(handle, false);
87     }
88 
89     for (uint32_t i = 0; i < engineAttr_.minReadNum; i++) {
90         handle = nullptr;
91         errCode = CreateNewExecutor(false, handle);
92         if (errCode != E_OK) {
93             return errCode;
94         }
95         AddStorageExecutor(handle, false);
96     }
97     return E_OK;
98 }
99 
100 
Init()101 int StorageEngine::Init()
102 {
103     if (isInitialized_.load()) {
104         LOGD("Storage engine has been initialized!");
105         return E_OK;
106     }
107 
108     int errCode = InitReadWriteExecutors();
109     if (errCode == E_OK) {
110         isInitialized_.store(true);
111         initCondition_.notify_all();
112         return E_OK;
113     } else if (errCode == -E_EKEYREVOKED) {
114         // Assumed file system has classification function, can only get one write handle
115         std::unique_lock<std::mutex> lock(writeMutex_);
116         if (!writeIdleList_.empty() || !writeUsingList_.empty()) {
117             isInitialized_.store(true);
118             initCondition_.notify_all();
119             return E_OK;
120         }
121     }
122     initCondition_.notify_all();
123     Release();
124     return errCode;
125 }
126 
ReInit()127 int StorageEngine::ReInit()
128 {
129     return E_OK;
130 }
131 
FindExecutor(bool writable,OperatePerm perm,int & errCode,bool isExternal,int waitTime)132 StorageExecutor *StorageEngine::FindExecutor(bool writable, OperatePerm perm, int &errCode, bool isExternal,
133     int waitTime)
134 {
135     if (GetEngineState() == EngineState::ENGINE_BUSY) {
136         LOGI("Storage engine is busy!");
137         errCode = -E_BUSY;
138         return nullptr;
139     }
140 
141     {
142         std::unique_lock<std::mutex> lock(initMutex_);
143         bool result = initCondition_.wait_for(lock, std::chrono::seconds(waitTime), [this]() {
144             return isInitialized_.load();
145         });
146         if (!result || !isInitialized_.load()) {
147             LOGE("Storage engine is not initialized");
148             errCode = -E_BUSY; // Usually in reinitialize engine, return BUSY
149             return nullptr;
150         }
151     }
152 
153     if (writable) {
154         return FindWriteExecutor(perm, errCode, waitTime, isExternal);
155     }
156 
157     return FindReadExecutor(perm, errCode, waitTime, isExternal);
158 }
159 
FindWriteExecutor(OperatePerm perm,int & errCode,int waitTime,bool isExternal)160 StorageExecutor *StorageEngine::FindWriteExecutor(OperatePerm perm, int &errCode, int waitTime, bool isExternal)
161 {
162     LOGD("[FindWriteExecutor]Finding WriteExecutor");
163     std::unique_lock<std::mutex> lock(writeMutex_);
164     errCode = -E_BUSY;
165     if (perm_ == OperatePerm::DISABLE_PERM || perm_ != perm) {
166         LOGI("Not permitted to get the executor[%u]", static_cast<unsigned>(perm_));
167         return nullptr;
168     }
169     std::list<StorageExecutor *> &writeUsingList = isExternal ? externalWriteUsingList_ : writeUsingList_;
170     std::list<StorageExecutor *> &writeIdleList = isExternal ?  externalWriteIdleList_ : writeIdleList_;
171     if (waitTime <= 0) { // non-blocking.
172         if (writeUsingList.empty() &&
173                 writeIdleList.size() + writeUsingList.size() == engineAttr_.maxWriteNum) {
174             return nullptr;
175         }
176         return FetchStorageExecutor(true, writeIdleList, writeUsingList, errCode, isExternal);
177     }
178     // Not prohibited and there is an available handle
179     bool result = writeCondition_.wait_for(lock, std::chrono::seconds(waitTime),
180         [this, &perm, &writeUsingList, &writeIdleList]() {
181             return (perm_ == OperatePerm::NORMAL_PERM || perm_ == perm) && (!writeIdleList.empty() ||
182                 (writeIdleList.size() + writeUsingList.size() < engineAttr_.maxWriteNum) ||
183                 operateAbort_);
184         });
185     if (operateAbort_) {
186         LOGI("Abort write executor and executor and busy for operate!");
187         return nullptr;
188     }
189     if (!result) {
190         LOGI("Get write handle result[%d], permissType[%u], operType[%u], write[%zu-%zu-%" PRIu32 "]", result,
191             static_cast<unsigned>(perm_), static_cast<unsigned>(perm), writeIdleList.size(), writeUsingList.size(),
192             engineAttr_.maxWriteNum);
193         return nullptr;
194     }
195     return FetchStorageExecutor(true, writeIdleList, writeUsingList, errCode, isExternal);
196 }
197 
FindReadExecutor(OperatePerm perm,int & errCode,int waitTime,bool isExternal)198 StorageExecutor *StorageEngine::FindReadExecutor(OperatePerm perm, int &errCode, int waitTime, bool isExternal)
199 {
200     std::unique_lock<std::mutex> lock(readMutex_);
201     errCode = -E_BUSY;
202     if (perm_ == OperatePerm::DISABLE_PERM || perm_ != perm) {
203         LOGI("Not permitted to get the executor[%u]", static_cast<unsigned>(perm_));
204         return nullptr;
205     }
206 
207     std::list<StorageExecutor *> &readUsingList = isExternal ? externalReadUsingList_ : readUsingList_;
208     std::list<StorageExecutor *> &readIdleList = isExternal ?  externalReadIdleList_ : readIdleList_;
209     if (waitTime <= 0) { // non-blocking.
210         if (readIdleList.empty() &&
211             readIdleList.size() + readUsingList.size() == engineAttr_.maxReadNum) {
212             return nullptr;
213         }
214         return FetchStorageExecutor(false, readIdleList, readUsingList, errCode, isExternal);
215     }
216 
217     // Not prohibited and there is an available handle
218     uint32_t maxReadHandleNum = isExternal ? 1 : engineAttr_.maxReadNum;
219     bool result = readCondition_.wait_for(lock, std::chrono::seconds(waitTime),
220         [this, &perm, &readUsingList, &readIdleList, &maxReadHandleNum]() {
221             return (perm_ == OperatePerm::NORMAL_PERM || perm_ == perm) &&
222                 (!readIdleList.empty() || (readIdleList.size() + readUsingList.size() < maxReadHandleNum) ||
223                 operateAbort_);
224         });
225     if (operateAbort_) {
226         LOGI("Abort find read executor and busy for operate!");
227         return nullptr;
228     }
229     if (!result) {
230         LOGI("Get read handle result[%d], permissType[%u], operType[%u], read[%zu-%zu-%" PRIu32 "]", result,
231             static_cast<unsigned>(perm_), static_cast<unsigned>(perm), readIdleList.size(), readUsingList.size(),
232             engineAttr_.maxReadNum);
233         return nullptr;
234     }
235     return FetchStorageExecutor(false, readIdleList, readUsingList, errCode, isExternal);
236 }
237 
Recycle(StorageExecutor * & handle,bool isExternal)238 void StorageEngine::Recycle(StorageExecutor *&handle, bool isExternal)
239 {
240     if (handle == nullptr) {
241         return;
242     }
243     LOGD("Recycle executor[%d] for id[%.6s]", handle->GetWritable(), hashIdentifier_.c_str());
244     std::list<StorageExecutor *> &writeUsingList = isExternal ? externalWriteUsingList_ : writeUsingList_;
245     std::list<StorageExecutor *> &writeIdleList = isExternal ?  externalWriteIdleList_ : writeIdleList_;
246     std::list<StorageExecutor *> &readUsingList = isExternal ? externalReadUsingList_ : readUsingList_;
247     std::list<StorageExecutor *> &readIdleList = isExternal ?  externalReadIdleList_ : readIdleList_;
248     if (handle->GetWritable()) {
249         std::unique_lock<std::mutex> lock(writeMutex_);
250         auto iter = std::find(writeUsingList.begin(), writeUsingList.end(), handle);
251         if (iter != writeUsingList.end()) {
252             writeUsingList.remove(handle);
253             if (!writeIdleList.empty()) {
254                 delete handle;
255                 handle = nullptr;
256                 return;
257             }
258             handle->Reset();
259             writeIdleList.push_back(handle);
260             writeCondition_.notify_one();
261             idleCondition_.notify_all();
262         }
263     } else {
264         std::unique_lock<std::mutex> lock(readMutex_);
265         auto iter = std::find(readUsingList.begin(), readUsingList.end(), handle);
266         if (iter != readUsingList.end()) {
267             readUsingList.remove(handle);
268             if (!readIdleList.empty()) {
269                 delete handle;
270                 handle = nullptr;
271                 return;
272             }
273             handle->Reset();
274             readIdleList.push_back(handle);
275             readCondition_.notify_one();
276         }
277     }
278     handle = nullptr;
279 }
280 
ClearCorruptedFlag()281 void StorageEngine::ClearCorruptedFlag()
282 {
283     return;
284 }
285 
IsEngineCorrupted() const286 bool StorageEngine::IsEngineCorrupted() const
287 {
288     return false;
289 }
290 
Release()291 void StorageEngine::Release()
292 {
293     CloseExecutor();
294     isInitialized_.store(false);
295     isUpdated_ = false;
296     ClearCorruptedFlag();
297     SetEngineState(EngineState::INVALID);
298 }
299 
TryToDisable(bool isNeedCheckAll,OperatePerm disableType)300 int StorageEngine::TryToDisable(bool isNeedCheckAll, OperatePerm disableType)
301 {
302     if (engineState_ != EngineState::MAINDB && engineState_ != EngineState::INVALID) {
303         LOGE("Not support disable handle when cacheDB existed! state = [%d]", engineState_);
304         return(engineState_ == EngineState::CACHEDB) ? -E_NOT_SUPPORT : -E_BUSY;
305     }
306 
307     std::lock(writeMutex_, readMutex_);
308     std::lock_guard<std::mutex> writeLock(writeMutex_, std::adopt_lock);
309     std::lock_guard<std::mutex> readLock(readMutex_, std::adopt_lock);
310 
311     if (!isNeedCheckAll) {
312         goto END;
313     }
314 
315     if (!writeUsingList_.empty() || !readUsingList_.empty() || !externalWriteUsingList_.empty() ||
316         !externalReadUsingList_.empty()) {
317         LOGE("Database handle used");
318         return -E_BUSY;
319     }
320 END:
321     if (perm_ == OperatePerm::NORMAL_PERM) {
322         LOGI("database is disable for re-build:%d", static_cast<int>(disableType));
323         perm_ = disableType;
324         writeCondition_.notify_all();
325         readCondition_.notify_all();
326     }
327     return E_OK;
328 }
329 
Enable(OperatePerm enableType)330 void StorageEngine::Enable(OperatePerm enableType)
331 {
332     std::lock(writeMutex_, readMutex_);
333     std::lock_guard<std::mutex> writeLock(writeMutex_, std::adopt_lock);
334     std::lock_guard<std::mutex> readLock(readMutex_, std::adopt_lock);
335     if (perm_ == enableType) {
336         LOGI("Re-enable the database");
337         perm_ = OperatePerm::NORMAL_PERM;
338         writeCondition_.notify_all();
339         readCondition_.notify_all();
340     }
341 }
342 
Abort(OperatePerm enableType)343 void StorageEngine::Abort(OperatePerm enableType)
344 {
345     std::lock(writeMutex_, readMutex_);
346     std::lock_guard<std::mutex> writeLock(writeMutex_, std::adopt_lock);
347     std::lock_guard<std::mutex> readLock(readMutex_, std::adopt_lock);
348     if (perm_ == enableType) {
349         LOGI("Abort the handle occupy, release all!");
350         perm_ = OperatePerm::NORMAL_PERM;
351         operateAbort_ = true;
352 
353         writeCondition_.notify_all();
354         readCondition_.notify_all();
355     }
356 }
357 
IsNeedTobeReleased() const358 bool StorageEngine::IsNeedTobeReleased() const
359 {
360     EngineState engineState = GetEngineState();
361     return ((engineState == EngineState::MAINDB) || (engineState == EngineState::INVALID));
362 }
363 
GetIdentifier() const364 const std::string &StorageEngine::GetIdentifier() const
365 {
366     return identifier_;
367 }
368 
GetEngineState() const369 EngineState StorageEngine::GetEngineState() const
370 {
371     return engineState_;
372 }
373 
SetEngineState(EngineState state)374 void StorageEngine::SetEngineState(EngineState state)
375 {
376     LOGI("Storage engine state to [%d]!", state);
377     engineState_ = state;
378 }
379 
ExecuteMigrate()380 int StorageEngine::ExecuteMigrate()
381 {
382     LOGW("Migration is not supported!");
383     return -E_NOT_SUPPORT;
384 }
385 
SetNotifiedCallback(const std::function<void (int,KvDBCommitNotifyFilterAbleData *)> & callback)386 void StorageEngine::SetNotifiedCallback(const std::function<void(int, KvDBCommitNotifyFilterAbleData *)> &callback)
387 {
388     std::unique_lock<std::shared_mutex> lock(notifyMutex_);
389     commitNotifyFunc_ = callback;
390 }
391 
SetConnectionFlag(bool isExisted)392 void StorageEngine::SetConnectionFlag(bool isExisted)
393 {
394     return isExistConnection_.store(isExisted);
395 }
396 
IsExistConnection() const397 bool StorageEngine::IsExistConnection() const
398 {
399     return isExistConnection_.load();
400 }
401 
CheckEngineOption(const KvDBProperties & kvdbOption) const402 int StorageEngine::CheckEngineOption(const KvDBProperties &kvdbOption) const
403 {
404     return E_OK;
405 }
406 
AddStorageExecutor(StorageExecutor * handle,bool isExternal)407 void StorageEngine::AddStorageExecutor(StorageExecutor *handle, bool isExternal)
408 {
409     if (handle == nullptr) {
410         return;
411     }
412 
413     std::list<StorageExecutor *> &writeIdleList = isExternal ?  externalWriteIdleList_ : writeIdleList_;
414     std::list<StorageExecutor *> &readIdleList = isExternal ?  externalReadIdleList_ : readIdleList_;
415     if (handle->GetWritable()) {
416         writeIdleList.push_back(handle);
417     } else {
418         readIdleList.push_back(handle);
419     }
420 }
421 
ClearHandleList(std::list<StorageExecutor * > & handleList)422 void ClearHandleList(std::list<StorageExecutor *> &handleList)
423 {
424     for (auto &item : handleList) {
425         if (item != nullptr) {
426             delete item;
427             item = nullptr;
428         }
429     }
430     handleList.clear();
431 }
432 
CloseExecutor()433 void StorageEngine::CloseExecutor()
434 {
435     {
436         std::lock_guard<std::mutex> lock(writeMutex_);
437         ClearHandleList(writeIdleList_);
438         ClearHandleList(externalWriteIdleList_);
439     }
440 
441     {
442         std::lock_guard<std::mutex> lock(readMutex_);
443         ClearHandleList(readIdleList_);
444         ClearHandleList(externalReadIdleList_);
445     }
446 }
447 
FetchStorageExecutor(bool isWrite,std::list<StorageExecutor * > & idleList,std::list<StorageExecutor * > & usingList,int & errCode,bool isExternal)448 StorageExecutor *StorageEngine::FetchStorageExecutor(bool isWrite, std::list<StorageExecutor *> &idleList,
449     std::list<StorageExecutor *> &usingList, int &errCode, bool isExternal)
450 {
451     if (idleList.empty()) {
452         StorageExecutor *handle = nullptr;
453         errCode = CreateNewExecutor(isWrite, handle);
454         if ((errCode != E_OK) || (handle == nullptr)) {
455             if (errCode != -E_EKEYREVOKED) {
456                 return nullptr;
457             }
458             LOGE("Key revoked status, couldn't create the new executor");
459             if (!usingList.empty()) {
460                 LOGE("Can't create new executor for revoked");
461                 errCode = -E_BUSY;
462             }
463             return nullptr;
464         }
465 
466         AddStorageExecutor(handle, isExternal);
467     }
468     auto item = idleList.front();
469     usingList.push_back(item);
470     idleList.remove(item);
471     LOGD("Get executor[%d] from [%.3s]", isWrite, hashIdentifier_.c_str());
472     errCode = E_OK;
473     return item;
474 }
475 
CheckEngineAttr(const StorageEngineAttr & poolSize)476 bool StorageEngine::CheckEngineAttr(const StorageEngineAttr &poolSize)
477 {
478     return (poolSize.maxReadNum > MAX_READ_SIZE ||
479             poolSize.maxWriteNum > MAX_WRITE_SIZE ||
480             poolSize.minReadNum > poolSize.maxReadNum ||
481             poolSize.minWriteNum > poolSize.maxWriteNum);
482 }
483 
IsMigrating() const484 bool StorageEngine::IsMigrating() const
485 {
486     return isMigrating_.load();
487 }
488 
WaitWriteHandleIdle()489 void StorageEngine::WaitWriteHandleIdle()
490 {
491     std::unique_lock<std::mutex> autoLock(idleMutex_);
492     LOGD("Wait wHandle release id[%s]. write[%zu-%zu-%" PRIu32 "]", hashIdentifier_.c_str(),
493         writeIdleList_.size(), writeUsingList_.size(), engineAttr_.maxWriteNum);
494     idleCondition_.wait(autoLock, [this]() {
495         return writeUsingList_.empty();
496     });
497     LOGD("Wait wHandle release finish id[%s]. write[%zu-%zu-%" PRIu32 "]",
498         hashIdentifier_.c_str(), writeIdleList_.size(), writeUsingList_.size(), engineAttr_.maxWriteNum);
499 }
500 
IncreaseCacheRecordVersion()501 void StorageEngine::IncreaseCacheRecordVersion()
502 {
503     return;
504 }
505 
GetCacheRecordVersion() const506 uint64_t StorageEngine::GetCacheRecordVersion() const
507 {
508     return 0;
509 }
510 
GetAndIncreaseCacheRecordVersion()511 uint64_t StorageEngine::GetAndIncreaseCacheRecordVersion()
512 {
513     return 0;
514 }
515 
SetSchemaChangedCallback(const std::function<int (void)> & callback)516 void StorageEngine::SetSchemaChangedCallback(const std::function<int(void)> &callback)
517 {
518     std::unique_lock<std::shared_mutex> lock(schemaChangedMutex_);
519     schemaChangedFunc_ = callback;
520 }
521 }
522