1 /* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef STORAGE_ENGINE_H 17 #define STORAGE_ENGINE_H 18 19 #include <condition_variable> 20 #include <list> 21 #include <mutex> 22 #include <shared_mutex> 23 24 #include "db_types.h" 25 #include "macro_utils.h" 26 #include "sqlite_utils.h" 27 #include "storage_executor.h" 28 #include "kvdb_commit_notify_filterable_data.h" 29 30 namespace DistributedDB { 31 struct StorageEngineAttr { 32 uint32_t minWriteNum = 1; 33 uint32_t maxWriteNum = 1; 34 uint32_t minReadNum = 1; 35 uint32_t maxReadNum = 1; 36 }; 37 38 class StorageEngine : public RefObject { 39 public: 40 StorageEngine(); 41 ~StorageEngine() override; 42 43 // Delete the copy and assign constructors 44 DISABLE_COPY_ASSIGN_MOVE(StorageEngine); 45 46 int Init(); 47 48 virtual int ReInit(); 49 50 StorageExecutor *FindExecutor(bool writable, OperatePerm perm, int &errCode, bool isExternal = false, 51 int waitTime = MAX_WAIT_TIME); 52 53 void Recycle(StorageExecutor *&handle, bool isExternal = false); 54 55 virtual bool IsEngineCorrupted() const; 56 57 void Release(); 58 59 int TryToDisable(bool isNeedCheckAll, OperatePerm disableType = OperatePerm::DISABLE_PERM); 60 61 void Enable(OperatePerm enableType = OperatePerm::NORMAL_PERM); 62 63 void Abort(OperatePerm enableType = OperatePerm::NORMAL_PERM); 64 65 virtual bool IsNeedTobeReleased() const; 66 67 virtual const std::string &GetIdentifier() const; 68 69 EngineState GetEngineState() const; 70 71 void SetEngineState(EngineState state); 72 73 virtual int ExecuteMigrate(); 74 75 virtual void SetNotifiedCallback(const std::function<void(int, KvDBCommitNotifyFilterAbleData *)> &callback); 76 77 void SetConnectionFlag(bool isExisted); 78 79 bool IsExistConnection() const; 80 81 virtual int CheckEngineOption(const KvDBProperties &kvdbOption) const; 82 83 virtual bool IsMigrating() const; 84 85 void WaitWriteHandleIdle(); 86 87 virtual void IncreaseCacheRecordVersion(); 88 virtual uint64_t GetCacheRecordVersion() const; 89 virtual uint64_t GetAndIncreaseCacheRecordVersion(); 90 91 virtual void SetSchemaChangedCallback(const std::function<int(void)> &callback); 92 93 void CloseAllExecutor(); 94 95 int InitAllReadWriteExecutor(); 96 97 OpenDbProperties GetOption(); 98 99 protected: 100 virtual int CreateNewExecutor(bool isWrite, StorageExecutor *&handle) = 0; 101 102 void CloseExecutor(); 103 104 virtual void AddStorageExecutor(StorageExecutor *handle, bool isExternal); 105 106 static bool CheckEngineAttr(const StorageEngineAttr &poolSize); 107 108 int InitReadWriteExecutors(); 109 110 OpenDbProperties option_; 111 112 StorageEngineAttr engineAttr_; 113 bool isUpdated_; 114 std::atomic<bool> isMigrating_; 115 std::string identifier_; 116 std::string hashIdentifier_; 117 118 // Mutex for commitNotifyFunc_. 119 mutable std::shared_mutex notifyMutex_; 120 121 // Callback function for commit notify. 122 std::function<void(int, KvDBCommitNotifyFilterAbleData *)> commitNotifyFunc_; 123 124 // Mutex for schemaChangedFunc_. 125 mutable std::shared_mutex schemaChangedMutex_; 126 127 // Callback function for schema changed. 128 std::function<int(void)> schemaChangedFunc_; 129 130 bool isSchemaChanged_; 131 132 private: 133 StorageExecutor *FetchStorageExecutor(bool isWrite, std::list<StorageExecutor *> &idleList, 134 std::list<StorageExecutor *> &usingList, int &errCode, bool isExternal = false); 135 136 StorageExecutor *FindWriteExecutor(OperatePerm perm, int &errCode, int waitTime, bool isExternal = false); 137 StorageExecutor *FindReadExecutor(OperatePerm perm, int &errCode, int waitTime, bool isExternal = false); 138 139 virtual void ClearCorruptedFlag(); 140 141 static const int MAX_WAIT_TIME; 142 static const int MAX_WRITE_SIZE; 143 static const int MAX_READ_SIZE; 144 145 std::mutex initMutex_; 146 std::condition_variable initCondition_; 147 std::atomic<bool> isInitialized_; 148 OperatePerm perm_; 149 bool operateAbort_; 150 151 std::mutex readMutex_; 152 std::mutex writeMutex_; 153 std::condition_variable writeCondition_; 154 std::condition_variable readCondition_; 155 std::list<StorageExecutor *> writeUsingList_; 156 std::list<StorageExecutor *> writeIdleList_; 157 std::list<StorageExecutor *> readUsingList_; 158 std::list<StorageExecutor *> readIdleList_; 159 std::list<StorageExecutor *> externalWriteUsingList_; 160 std::list<StorageExecutor *> externalWriteIdleList_; 161 std::list<StorageExecutor *> externalReadUsingList_; 162 std::list<StorageExecutor *> externalReadIdleList_; 163 std::atomic<bool> isExistConnection_; 164 165 std::mutex idleMutex_; 166 std::condition_variable idleCondition_; 167 168 EngineState engineState_; 169 }; 170 } // namespace DistributedDB 171 #endif // STORAGE_ENGINE_H 172