1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "storage_engine_manager.h"
17 #include "log_print.h"
18 #include "db_errno.h"
19 #include "runtime_context.h"
20 #include "rd_single_ver_storage_engine.h"
21 #include "sqlite_single_ver_storage_engine.h"
22
23 namespace DistributedDB {
24 volatile bool StorageEngineManager::isRegLockStatusListener_ = false;
25 std::mutex StorageEngineManager::instanceLock_;
26 std::atomic<StorageEngineManager *> StorageEngineManager::instance_{nullptr};
27 std::mutex StorageEngineManager::storageEnginesLock_;
28
29 namespace {
GetIdentifier(const KvDBProperties & property)30 std::string GetIdentifier(const KvDBProperties &property)
31 {
32 return property.GetStringProp(KvDBProperties::IDENTIFIER_DATA, "");
33 }
34
GetDatabaseType(const KvDBProperties & property)35 int GetDatabaseType(const KvDBProperties &property)
36 {
37 return property.GetIntProp(KvDBProperties::DATABASE_TYPE, KvDBProperties::LOCAL_TYPE_SQLITE);
38 }
39
IsSingleVerType(int databaseType)40 int IsSingleVerType(int databaseType)
41 {
42 return databaseType == KvDBProperties::SINGLE_VER_TYPE_SQLITE ||
43 databaseType == KvDBProperties::SINGLE_VER_TYPE_RD_KERNAL;
44 }
45 }
46
StorageEngineManager()47 StorageEngineManager::StorageEngineManager() : lockStatusListener_(nullptr)
48 {}
49
~StorageEngineManager()50 StorageEngineManager::~StorageEngineManager()
51 {
52 if (lockStatusListener_ != nullptr) {
53 lockStatusListener_->Drop(true);
54 }
55 }
56
GetStorageEngine(const KvDBProperties & property,int & errCode)57 StorageEngine *StorageEngineManager::GetStorageEngine(const KvDBProperties &property, int &errCode)
58 {
59 StorageEngineManager *manager = GetInstance();
60 if (manager == nullptr) {
61 LOGE("[StorageEngineManager] GetInstance failed");
62 errCode = -E_OUT_OF_MEMORY;
63 return nullptr;
64 }
65 std::string identifier = GetIdentifier(property);
66 manager->EnterGetEngineProcess(identifier);
67 auto storageEngine = manager->FindStorageEngine(identifier);
68 if (storageEngine == nullptr) {
69 storageEngine = manager->CreateStorageEngine(property, errCode);
70 if (errCode == E_OK) {
71 manager->InsertStorageEngine(identifier, storageEngine);
72 }
73 } else {
74 errCode = storageEngine->CheckEngineOption(property);
75 if (errCode != E_OK) {
76 LOGE("kvdb property mismatch engine option! errCode = [%d]", errCode);
77 storageEngine = nullptr;
78 }
79 }
80
81 manager->ExitGetEngineProcess(identifier);
82 return storageEngine;
83 }
84
ReleaseStorageEngine(StorageEngine * storageEngine)85 int StorageEngineManager::ReleaseStorageEngine(StorageEngine *storageEngine)
86 {
87 if (storageEngine == nullptr) {
88 LOGE("[StorageEngineManager] The engine to be released is nullptr");
89 return -E_INVALID_ARGS;
90 }
91
92 // Clear commit notify callback function.
93 storageEngine->SetNotifiedCallback(nullptr);
94
95 // If the cacheDB is valid, the storageEngine is not released to prevent the cache mechanism failed
96 bool isRelease = storageEngine->IsNeedTobeReleased();
97 if (!isRelease) {
98 LOGW("[StorageEngineManager] storageEngine do not need to be released.");
99 return E_OK;
100 }
101
102 StorageEngineManager *manager = GetInstance();
103 if (manager == nullptr) {
104 LOGE("[StorageEngineManager] Release GetInstance failed");
105 return -E_OUT_OF_MEMORY;
106 }
107
108 LOGD("[StorageEngineManager] storageEngine to be released.");
109 return manager->ReleaseEngine(storageEngine);
110 }
111
ForceReleaseStorageEngine(const std::string & identifier)112 int StorageEngineManager::ForceReleaseStorageEngine(const std::string &identifier)
113 {
114 StorageEngineManager *manager = GetInstance();
115 if (manager == nullptr) {
116 LOGE("[StorageEngineManager] Force release GetInstance failed");
117 return -E_OUT_OF_MEMORY;
118 }
119
120 LOGD("[StorageEngineManager] Force release engine.");
121 manager->ReleaseResources(identifier);
122 return E_OK;
123 }
124
ExecuteMigration(StorageEngine * storageEngine)125 int StorageEngineManager::ExecuteMigration(StorageEngine *storageEngine)
126 {
127 if (storageEngine == nullptr) {
128 LOGE("storage engine is nullptr can not execute migration!");
129 return -E_INVALID_ARGS;
130 }
131 if (storageEngine->IsExistConnection()) {
132 return storageEngine->ExecuteMigrate();
133 }
134 LOGI("connection is not existed, not need execute migration!");
135 return -E_INVALID_DB;
136 }
137
GetInstance()138 StorageEngineManager *StorageEngineManager::GetInstance()
139 {
140 // For Double-Checked Locking, we need check instance_ twice
141 if (instance_ == nullptr) {
142 std::lock_guard<std::mutex> lockGuard(instanceLock_);
143 if (instance_ == nullptr) {
144 instance_ = new (std::nothrow) StorageEngineManager();
145 if (instance_ == nullptr) {
146 LOGE("[StorageEngineManager] Failed to alloc the engine manager!");
147 return nullptr;
148 }
149 }
150 }
151
152 if (!isRegLockStatusListener_) {
153 std::lock_guard<std::mutex> mgrLock(storageEnginesLock_);
154 if (!isRegLockStatusListener_) {
155 int errCode = (instance_.load())->RegisterLockStatusListener();
156 if (errCode == E_OK) {
157 isRegLockStatusListener_ = true;
158 LOGW("[StorageEngineManager] Register lock status listener.");
159 }
160 }
161 }
162 return instance_;
163 }
164
RegisterLockStatusListener()165 int StorageEngineManager::RegisterLockStatusListener()
166 {
167 int errCode = E_OK;
168 lockStatusListener_ = RuntimeContext::GetInstance()->RegisterLockStatusLister(
169 [this](void *lockStatus) {
170 if (lockStatus == nullptr) {
171 return;
172 }
173 bool isLocked = *static_cast<bool *>(lockStatus);
174 LOGD("[StorageEngineManager] Lock status to %d", isLocked);
175 if (isLocked) {
176 return;
177 }
178 int taskErrCode = RuntimeContext::GetInstance()->ScheduleTask(
179 [this, isLocked] { LockStatusNotifier(isLocked); });
180 if (taskErrCode != E_OK) {
181 LOGE("[StorageEngineManager] LockStatusNotifier ScheduleTask failed : %d", taskErrCode);
182 }
183 }, errCode);
184 return errCode;
185 }
186
LockStatusNotifier(bool isAccessControlled)187 void StorageEngineManager::LockStatusNotifier(bool isAccessControlled)
188 {
189 (void)isAccessControlled;
190 std::lock_guard<std::mutex> lockGuard(storageEnginesLock_);
191 StorageEngine *storageEngine = nullptr;
192 for (const auto &item : storageEngines_) {
193 storageEngine = item.second;
194 LOGD("Begin to migrate for lock status change");
195 (void)ExecuteMigration(storageEngine);
196 }
197 }
198
CreateSingleVerStorageEngine(int databaseType)199 StorageEngine *CreateSingleVerStorageEngine(int databaseType)
200 {
201 if (databaseType == KvDBProperties::SINGLE_VER_TYPE_SQLITE) {
202 return new (std::nothrow) SQLiteSingleVerStorageEngine();
203 } else {
204 return new (std::nothrow) RdSingleVerStorageEngine();
205 }
206 return nullptr;
207 }
208
CreateStorageEngine(const KvDBProperties & property,int & errCode)209 StorageEngine *StorageEngineManager::CreateStorageEngine(const KvDBProperties &property, int &errCode)
210 {
211 int databaseType = GetDatabaseType(property);
212 if (!IsSingleVerType(databaseType)) {
213 LOGE("[StorageEngineManager] Database type error : %d", databaseType);
214 errCode = -E_NOT_SUPPORT;
215 return nullptr;
216 }
217
218 auto storageEngine = CreateSingleVerStorageEngine(databaseType);
219 if (storageEngine == nullptr) {
220 LOGE("[StorageEngineManager] Create storage engine failed");
221 errCode = -E_OUT_OF_MEMORY;
222 return nullptr;
223 }
224 errCode = E_OK;
225 return storageEngine;
226 }
227
FindStorageEngine(const std::string & identifier)228 StorageEngine *StorageEngineManager::FindStorageEngine(const std::string &identifier)
229 {
230 std::lock_guard<std::mutex> lockGuard(storageEnginesLock_);
231 auto iter = storageEngines_.find(identifier);
232 if (iter != storageEngines_.end()) {
233 auto storageEngine = iter->second;
234 if (storageEngine == nullptr) {
235 LOGE("[StorageEngineManager] storageEngine in cache is nullptr");
236 storageEngines_.erase(identifier);
237 return nullptr;
238 }
239
240 return storageEngine;
241 }
242
243 return nullptr;
244 }
245
InsertStorageEngine(const std::string & identifier,StorageEngine * & storageEngine)246 void StorageEngineManager::InsertStorageEngine(const std::string &identifier, StorageEngine *&storageEngine)
247 {
248 std::lock_guard<std::mutex> lockGuard(storageEnginesLock_);
249 storageEngines_.insert(std::pair<std::string, StorageEngine *>(identifier, storageEngine));
250 }
251
EraseStorageEngine(const std::string & identifier)252 void StorageEngineManager::EraseStorageEngine(const std::string &identifier)
253 {
254 std::lock_guard<std::mutex> lockGuard(storageEnginesLock_);
255 storageEngines_.erase(identifier);
256 }
257
ReleaseResources(const std::string & identifier)258 void StorageEngineManager::ReleaseResources(const std::string &identifier)
259 {
260 StorageEngine *storageEngine = nullptr;
261
262 {
263 std::lock_guard<std::mutex> lockGuard(storageEnginesLock_);
264 auto iter = storageEngines_.find(identifier);
265 if (iter != storageEngines_.end()) {
266 storageEngine = iter->second;
267 storageEngines_.erase(identifier);
268 }
269 }
270
271 if (storageEngine != nullptr) {
272 LOGI("[StorageEngineManager] Release storage engine");
273 RefObject::KillAndDecObjRef(storageEngine);
274 }
275 }
276
ReleaseEngine(StorageEngine * releaseEngine)277 int StorageEngineManager::ReleaseEngine(StorageEngine *releaseEngine)
278 {
279 const std::string identifier = releaseEngine->GetIdentifier();
280 StorageEngine *cacheEngine = nullptr;
281
282 {
283 std::lock_guard<std::mutex> lockGuard(storageEnginesLock_);
284 auto iter = storageEngines_.find(identifier);
285 if (iter != storageEngines_.end()) {
286 cacheEngine = iter->second;
287 storageEngines_.erase(identifier);
288 }
289 }
290
291 if (cacheEngine == nullptr) {
292 LOGE("[StorageEngineManager] cache engine is null");
293 return -E_ALREADY_RELEASE;
294 }
295 if (cacheEngine != releaseEngine) {
296 LOGE("[StorageEngineManager] cache engine is not equal the input engine");
297 return -E_INVALID_ARGS;
298 }
299
300 RefObject::KillAndDecObjRef(releaseEngine);
301 return E_OK;
302 }
303
EnterGetEngineProcess(const std::string & identifier)304 void StorageEngineManager::EnterGetEngineProcess(const std::string &identifier)
305 {
306 std::unique_lock<std::mutex> lock(getEngineMutex_);
307 getEngineCondition_.wait(lock, [this, &identifier]() {
308 return this->getEngineSet_.count(identifier) == 0;
309 });
310 (void)getEngineSet_.insert(identifier);
311 }
312
ExitGetEngineProcess(const std::string & identifier)313 void StorageEngineManager::ExitGetEngineProcess(const std::string &identifier)
314 {
315 std::unique_lock<std::mutex> lock(getEngineMutex_);
316 (void)getEngineSet_.erase(identifier);
317 getEngineCondition_.notify_all();
318 }
319 } // namespace DistributedDB
320