1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "storage_engine_manager.h"
17 #include "log_print.h"
18 #include "db_errno.h"
19 #include "runtime_context.h"
20 #include "rd_single_ver_storage_engine.h"
21 #include "sqlite_single_ver_storage_engine.h"
22 
23 namespace DistributedDB {
24 volatile bool StorageEngineManager::isRegLockStatusListener_ = false;
25 std::mutex StorageEngineManager::instanceLock_;
26 std::atomic<StorageEngineManager *> StorageEngineManager::instance_{nullptr};
27 std::mutex StorageEngineManager::storageEnginesLock_;
28 
29 namespace {
GetIdentifier(const KvDBProperties & property)30     std::string GetIdentifier(const KvDBProperties &property)
31     {
32         return property.GetStringProp(KvDBProperties::IDENTIFIER_DATA, "");
33     }
34 
GetDatabaseType(const KvDBProperties & property)35     int GetDatabaseType(const KvDBProperties &property)
36     {
37         return property.GetIntProp(KvDBProperties::DATABASE_TYPE, KvDBProperties::LOCAL_TYPE_SQLITE);
38     }
39 
IsSingleVerType(int databaseType)40     int IsSingleVerType(int databaseType)
41     {
42         return databaseType == KvDBProperties::SINGLE_VER_TYPE_SQLITE ||
43             databaseType == KvDBProperties::SINGLE_VER_TYPE_RD_KERNAL;
44     }
45 }
46 
StorageEngineManager()47 StorageEngineManager::StorageEngineManager() : lockStatusListener_(nullptr)
48 {}
49 
~StorageEngineManager()50 StorageEngineManager::~StorageEngineManager()
51 {
52     if (lockStatusListener_ != nullptr) {
53         lockStatusListener_->Drop(true);
54     }
55 }
56 
GetStorageEngine(const KvDBProperties & property,int & errCode)57 StorageEngine *StorageEngineManager::GetStorageEngine(const KvDBProperties &property, int &errCode)
58 {
59     StorageEngineManager *manager = GetInstance();
60     if (manager == nullptr) {
61         LOGE("[StorageEngineManager] GetInstance failed");
62         errCode = -E_OUT_OF_MEMORY;
63         return nullptr;
64     }
65     std::string identifier = GetIdentifier(property);
66     manager->EnterGetEngineProcess(identifier);
67     auto storageEngine = manager->FindStorageEngine(identifier);
68     if (storageEngine == nullptr) {
69         storageEngine = manager->CreateStorageEngine(property, errCode);
70         if (errCode == E_OK) {
71             manager->InsertStorageEngine(identifier, storageEngine);
72         }
73     } else {
74         errCode = storageEngine->CheckEngineOption(property);
75         if (errCode != E_OK) {
76             LOGE("kvdb property mismatch engine option! errCode = [%d]", errCode);
77             storageEngine = nullptr;
78         }
79     }
80 
81     manager->ExitGetEngineProcess(identifier);
82     return storageEngine;
83 }
84 
ReleaseStorageEngine(StorageEngine * storageEngine)85 int StorageEngineManager::ReleaseStorageEngine(StorageEngine *storageEngine)
86 {
87     if (storageEngine == nullptr) {
88         LOGE("[StorageEngineManager] The engine to be released is nullptr");
89         return -E_INVALID_ARGS;
90     }
91 
92     // Clear commit notify callback function.
93     storageEngine->SetNotifiedCallback(nullptr);
94 
95     // If the cacheDB is valid, the storageEngine is not released to prevent the cache mechanism failed
96     bool isRelease = storageEngine->IsNeedTobeReleased();
97     if (!isRelease) {
98         LOGW("[StorageEngineManager] storageEngine do not need to be released.");
99         return E_OK;
100     }
101 
102     StorageEngineManager *manager = GetInstance();
103     if (manager == nullptr) {
104         LOGE("[StorageEngineManager] Release GetInstance failed");
105         return -E_OUT_OF_MEMORY;
106     }
107 
108     LOGD("[StorageEngineManager] storageEngine to be released.");
109     return manager->ReleaseEngine(storageEngine);
110 }
111 
ForceReleaseStorageEngine(const std::string & identifier)112 int StorageEngineManager::ForceReleaseStorageEngine(const std::string &identifier)
113 {
114     StorageEngineManager *manager = GetInstance();
115     if (manager == nullptr) {
116         LOGE("[StorageEngineManager] Force release GetInstance failed");
117         return -E_OUT_OF_MEMORY;
118     }
119 
120     LOGD("[StorageEngineManager] Force release engine.");
121     manager->ReleaseResources(identifier);
122     return E_OK;
123 }
124 
ExecuteMigration(StorageEngine * storageEngine)125 int StorageEngineManager::ExecuteMigration(StorageEngine *storageEngine)
126 {
127     if (storageEngine == nullptr) {
128         LOGE("storage engine is nullptr can not execute migration!");
129         return -E_INVALID_ARGS;
130     }
131     if (storageEngine->IsExistConnection()) {
132         return storageEngine->ExecuteMigrate();
133     }
134     LOGI("connection is not existed, not need execute migration!");
135     return -E_INVALID_DB;
136 }
137 
GetInstance()138 StorageEngineManager *StorageEngineManager::GetInstance()
139 {
140     // For Double-Checked Locking, we need check instance_ twice
141     if (instance_ == nullptr) {
142         std::lock_guard<std::mutex> lockGuard(instanceLock_);
143         if (instance_ == nullptr) {
144             instance_ = new (std::nothrow) StorageEngineManager();
145             if (instance_ == nullptr) {
146                 LOGE("[StorageEngineManager] Failed to alloc the engine manager!");
147                 return nullptr;
148             }
149         }
150     }
151 
152     if (!isRegLockStatusListener_) {
153         std::lock_guard<std::mutex> mgrLock(storageEnginesLock_);
154         if (!isRegLockStatusListener_) {
155             int errCode = (instance_.load())->RegisterLockStatusListener();
156             if (errCode == E_OK) {
157                 isRegLockStatusListener_ = true;
158                 LOGW("[StorageEngineManager] Register lock status listener.");
159             }
160         }
161     }
162     return instance_;
163 }
164 
RegisterLockStatusListener()165 int StorageEngineManager::RegisterLockStatusListener()
166 {
167     int errCode = E_OK;
168     lockStatusListener_ = RuntimeContext::GetInstance()->RegisterLockStatusLister(
169         [this](void *lockStatus) {
170             if (lockStatus == nullptr) {
171                 return;
172             }
173             bool isLocked = *static_cast<bool *>(lockStatus);
174             LOGD("[StorageEngineManager] Lock status to %d", isLocked);
175             if (isLocked) {
176                 return;
177             }
178             int taskErrCode = RuntimeContext::GetInstance()->ScheduleTask(
179                 [this, isLocked] { LockStatusNotifier(isLocked); });
180             if (taskErrCode != E_OK) {
181                 LOGE("[StorageEngineManager] LockStatusNotifier ScheduleTask failed : %d", taskErrCode);
182             }
183         }, errCode);
184     return errCode;
185 }
186 
LockStatusNotifier(bool isAccessControlled)187 void StorageEngineManager::LockStatusNotifier(bool isAccessControlled)
188 {
189     (void)isAccessControlled;
190     std::lock_guard<std::mutex> lockGuard(storageEnginesLock_);
191     StorageEngine *storageEngine = nullptr;
192     for (const auto &item : storageEngines_) {
193         storageEngine = item.second;
194         LOGD("Begin to migrate for lock status change");
195         (void)ExecuteMigration(storageEngine);
196     }
197 }
198 
CreateSingleVerStorageEngine(int databaseType)199 StorageEngine *CreateSingleVerStorageEngine(int databaseType)
200 {
201     if (databaseType == KvDBProperties::SINGLE_VER_TYPE_SQLITE) {
202         return new (std::nothrow) SQLiteSingleVerStorageEngine();
203     } else {
204         return new (std::nothrow) RdSingleVerStorageEngine();
205     }
206     return nullptr;
207 }
208 
CreateStorageEngine(const KvDBProperties & property,int & errCode)209 StorageEngine *StorageEngineManager::CreateStorageEngine(const KvDBProperties &property, int &errCode)
210 {
211     int databaseType = GetDatabaseType(property);
212     if (!IsSingleVerType(databaseType)) {
213         LOGE("[StorageEngineManager] Database type error : %d", databaseType);
214         errCode = -E_NOT_SUPPORT;
215         return nullptr;
216     }
217 
218     auto storageEngine = CreateSingleVerStorageEngine(databaseType);
219     if (storageEngine == nullptr) {
220         LOGE("[StorageEngineManager] Create storage engine failed");
221         errCode = -E_OUT_OF_MEMORY;
222         return nullptr;
223     }
224     errCode = E_OK;
225     return storageEngine;
226 }
227 
FindStorageEngine(const std::string & identifier)228 StorageEngine *StorageEngineManager::FindStorageEngine(const std::string &identifier)
229 {
230     std::lock_guard<std::mutex> lockGuard(storageEnginesLock_);
231     auto iter = storageEngines_.find(identifier);
232     if (iter != storageEngines_.end()) {
233         auto storageEngine = iter->second;
234         if (storageEngine == nullptr) {
235             LOGE("[StorageEngineManager] storageEngine in cache is nullptr");
236             storageEngines_.erase(identifier);
237             return nullptr;
238         }
239 
240         return storageEngine;
241     }
242 
243     return nullptr;
244 }
245 
InsertStorageEngine(const std::string & identifier,StorageEngine * & storageEngine)246 void StorageEngineManager::InsertStorageEngine(const std::string &identifier, StorageEngine *&storageEngine)
247 {
248     std::lock_guard<std::mutex> lockGuard(storageEnginesLock_);
249     storageEngines_.insert(std::pair<std::string, StorageEngine *>(identifier, storageEngine));
250 }
251 
EraseStorageEngine(const std::string & identifier)252 void StorageEngineManager::EraseStorageEngine(const std::string &identifier)
253 {
254     std::lock_guard<std::mutex> lockGuard(storageEnginesLock_);
255     storageEngines_.erase(identifier);
256 }
257 
ReleaseResources(const std::string & identifier)258 void StorageEngineManager::ReleaseResources(const std::string &identifier)
259 {
260     StorageEngine *storageEngine = nullptr;
261 
262     {
263         std::lock_guard<std::mutex> lockGuard(storageEnginesLock_);
264         auto iter = storageEngines_.find(identifier);
265         if (iter != storageEngines_.end()) {
266             storageEngine = iter->second;
267             storageEngines_.erase(identifier);
268         }
269     }
270 
271     if (storageEngine != nullptr) {
272         LOGI("[StorageEngineManager] Release storage engine");
273         RefObject::KillAndDecObjRef(storageEngine);
274     }
275 }
276 
ReleaseEngine(StorageEngine * releaseEngine)277 int StorageEngineManager::ReleaseEngine(StorageEngine *releaseEngine)
278 {
279     const std::string identifier = releaseEngine->GetIdentifier();
280     StorageEngine *cacheEngine = nullptr;
281 
282     {
283         std::lock_guard<std::mutex> lockGuard(storageEnginesLock_);
284         auto iter = storageEngines_.find(identifier);
285         if (iter != storageEngines_.end()) {
286             cacheEngine = iter->second;
287             storageEngines_.erase(identifier);
288         }
289     }
290 
291     if (cacheEngine == nullptr) {
292         LOGE("[StorageEngineManager] cache engine is null");
293         return -E_ALREADY_RELEASE;
294     }
295     if (cacheEngine != releaseEngine) {
296         LOGE("[StorageEngineManager] cache engine is not equal the input engine");
297         return -E_INVALID_ARGS;
298     }
299 
300     RefObject::KillAndDecObjRef(releaseEngine);
301     return E_OK;
302 }
303 
EnterGetEngineProcess(const std::string & identifier)304 void StorageEngineManager::EnterGetEngineProcess(const std::string &identifier)
305 {
306     std::unique_lock<std::mutex> lock(getEngineMutex_);
307     getEngineCondition_.wait(lock, [this, &identifier]() {
308         return this->getEngineSet_.count(identifier) == 0;
309     });
310     (void)getEngineSet_.insert(identifier);
311 }
312 
ExitGetEngineProcess(const std::string & identifier)313 void StorageEngineManager::ExitGetEngineProcess(const std::string &identifier)
314 {
315     std::unique_lock<std::mutex> lock(getEngineMutex_);
316     (void)getEngineSet_.erase(identifier);
317     getEngineCondition_.notify_all();
318 }
319 } // namespace DistributedDB
320