1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #define LOG_TAG "DBAdaptor"
17 #include "db_delegate.h"
18
19 #include "kv_delegate.h"
20 #include "log_print.h"
21 #include "rdb_delegate.h"
22 namespace OHOS::DataShare {
23 ExecutorPool::TaskId DBDelegate::taskId_ = ExecutorPool::INVALID_TASK_ID;
24 ConcurrentMap<uint32_t, std::map<std::string, std::shared_ptr<DBDelegate::Entity>>> DBDelegate::stores_ = {};
25 std::shared_ptr<ExecutorPool> DBDelegate::executor_ = nullptr;
Create(DistributedData::StoreMetaData & metaData,const std::string & extUri,const std::string & backup)26 std::shared_ptr<DBDelegate> DBDelegate::Create(DistributedData::StoreMetaData &metaData,
27 const std::string &extUri, const std::string &backup)
28 {
29 if (metaData.tokenId == 0) {
30 return std::make_shared<RdbDelegate>(metaData, NO_CHANGE_VERSION, true, extUri, backup);
31 }
32 std::shared_ptr<DBDelegate> store;
33 stores_.Compute(metaData.tokenId,
34 [&metaData, &store, extUri, &backup](auto &, std::map<std::string, std::shared_ptr<Entity>> &stores) -> bool {
35 auto it = stores.find(metaData.storeId);
36 if (it != stores.end()) {
37 store = it->second->store_;
38 it->second->time_ = std::chrono::steady_clock::now() + std::chrono::seconds(INTERVAL);
39 return !stores.empty();
40 }
41 store = std::make_shared<RdbDelegate>(metaData, NO_CHANGE_VERSION, true, extUri, backup);
42 if (store->IsInvalid()) {
43 store = nullptr;
44 ZLOGE("creator failed, storeName: %{public}s", metaData.GetStoreAlias().c_str());
45 return false;
46 }
47 auto entity = std::make_shared<Entity>(store);
48 stores.emplace(metaData.storeId, entity);
49 StartTimer();
50 return !stores.empty();
51 });
52 return store;
53 }
54
SetExecutorPool(std::shared_ptr<ExecutorPool> executor)55 void DBDelegate::SetExecutorPool(std::shared_ptr<ExecutorPool> executor)
56 {
57 executor_ = std::move(executor);
58 }
59
GarbageCollect()60 void DBDelegate::GarbageCollect()
61 {
62 stores_.EraseIf([](auto &, std::map<std::string, std::shared_ptr<Entity>> &stores) {
63 auto current = std::chrono::steady_clock::now();
64 for (auto it = stores.begin(); it != stores.end();) {
65 // if the store is BUSY we wait more INTERVAL minutes again
66 if (it->second->time_ < current) {
67 it = stores.erase(it);
68 } else {
69 ++it;
70 }
71 }
72 return stores.empty();
73 });
74 }
75
StartTimer()76 void DBDelegate::StartTimer()
77 {
78 if (executor_ == nullptr || taskId_ != Executor::INVALID_TASK_ID) {
79 return;
80 }
81 taskId_ = executor_->Schedule(
82 []() {
83 GarbageCollect();
84 stores_.DoActionIfEmpty([]() {
85 if (executor_ == nullptr || taskId_ == Executor::INVALID_TASK_ID) {
86 return;
87 }
88 executor_->Remove(taskId_);
89 ZLOGD("remove timer, taskId: %{public}" PRIu64, taskId_);
90 taskId_ = Executor::INVALID_TASK_ID;
91 });
92 },
93 std::chrono::seconds(INTERVAL), std::chrono::seconds(INTERVAL));
94 ZLOGD("start timer, taskId: %{public}" PRIu64, taskId_);
95 }
96
Entity(std::shared_ptr<DBDelegate> store)97 DBDelegate::Entity::Entity(std::shared_ptr<DBDelegate> store)
98 {
99 store_ = std::move(store);
100 time_ = std::chrono::steady_clock::now() + std::chrono::seconds(INTERVAL);
101 }
102
EraseStoreCache(const int32_t tokenId)103 void DBDelegate::EraseStoreCache(const int32_t tokenId)
104 {
105 stores_.Erase(tokenId);
106 }
107
GetInstance(bool reInit,const std::string & dir,const std::shared_ptr<ExecutorPool> & executors)108 std::shared_ptr<KvDBDelegate> KvDBDelegate::GetInstance(
109 bool reInit, const std::string &dir, const std::shared_ptr<ExecutorPool> &executors)
110 {
111 static std::shared_ptr<KvDBDelegate> delegate = nullptr;
112 static std::mutex mutex;
113 std::lock_guard<decltype(mutex)> lock(mutex);
114 if ((delegate == nullptr || reInit) && executors != nullptr) {
115 delegate = std::make_shared<KvDelegate>(dir, executors);
116 }
117 return delegate;
118 }
119
Marshal(DistributedData::Serializable::json & node) const120 bool Id::Marshal(DistributedData::Serializable::json &node) const
121 {
122 auto ret = false;
123 if (userId == INVALID_USER) {
124 ret = SetValue(node[GET_NAME(_id)], _id);
125 } else {
126 ret = SetValue(node[GET_NAME(_id)], _id + "_" + std::to_string(userId));
127 }
128 return ret;
129 }
130
Unmarshal(const DistributedData::Serializable::json & node)131 bool Id::Unmarshal(const DistributedData::Serializable::json &node)
132 {
133 return false;
134 }
135
Id(const std::string & id,const int32_t userId)136 Id::Id(const std::string &id, const int32_t userId) : _id(id), userId(userId) {}
137
VersionData(int version)138 VersionData::VersionData(int version) : version(version) {}
139
Unmarshal(const DistributedData::Serializable::json & node)140 bool VersionData::Unmarshal(const DistributedData::Serializable::json &node)
141 {
142 return GetValue(node, GET_NAME(version), version);
143 }
144
Marshal(DistributedData::Serializable::json & node) const145 bool VersionData::Marshal(DistributedData::Serializable::json &node) const
146 {
147 return SetValue(node[GET_NAME(version)], version);
148 }
149
GetId() const150 const std::string &KvData::GetId() const
151 {
152 return id;
153 }
154
KvData(const Id & id)155 KvData::KvData(const Id &id) : id(DistributedData::Serializable::Marshall(id)) {}
156 } // namespace OHOS::DataShare