1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "kv_adapter.h"
16
17 #include <cinttypes>
18 #include <mutex>
19 #include <unistd.h>
20
21 #include "cJSON.h"
22 #include "datetime_ex.h"
23 #include "string_ex.h"
24
25 #include "dm_anonymous.h"
26 #include "dm_constants.h"
27 #include "dm_log.h"
28 #include "ffrt.h"
29
30 namespace OHOS {
31 namespace DistributedHardware {
32 using namespace OHOS::DistributedKv;
33 namespace {
34 const std::string APP_ID = "distributed_device_manager_service";
35 const std::string STORE_ID = "dm_kv_store";
36 const std::string DATABASE_DIR = "/data/service/el1/public/database/distributed_device_manager_service";
37 const std::string KV_REINIT_THREAD = "reinit_kv_store";
38 constexpr uint32_t MAX_BATCH_SIZE = 128;
39 constexpr int32_t MAX_STRING_LEN = 4096;
40 constexpr int32_t MAX_INIT_RETRY_TIMES = 20;
41 constexpr int32_t INIT_RETRY_SLEEP_INTERVAL = 200 * 1000; // 200ms
42 }
43
Init()44 int32_t KVAdapter::Init()
45 {
46 LOGI("Init local DB, dataType: %{public}d", static_cast<int32_t>(dataType_));
47 if (isInited_.load()) {
48 LOGI("Local DB already inited.");
49 return DM_OK;
50 }
51 this->appId_.appId = APP_ID;
52 this->storeId_.storeId = STORE_ID;
53 std::lock_guard<std::mutex> lock(kvAdapterMutex_);
54 int32_t tryTimes = MAX_INIT_RETRY_TIMES;
55 while (tryTimes > 0) {
56 DistributedKv::Status status = GetLocalKvStorePtr();
57 if (status == DistributedKv::Status::SUCCESS && kvStorePtr_) {
58 LOGI("Init KvStorePtr Success");
59 RegisterKvStoreDeathListener();
60 isInited_.store(true);
61 return DM_OK;
62 }
63 LOGE("CheckKvStore, left times: %{public}d, status: %{public}d", tryTimes, status);
64 if (status == DistributedKv::Status::STORE_META_CHANGED ||
65 status == DistributedKv::Status::SECURITY_LEVEL_ERROR ||
66 status == DistributedKv::Status::CRYPT_ERROR) {
67 LOGE("init db error, remove and rebuild it");
68 DeleteKvStore();
69 }
70 usleep(INIT_RETRY_SLEEP_INTERVAL);
71 tryTimes--;
72 }
73 CHECK_NULL_RETURN(kvStorePtr_, ERR_DM_INIT_FAILED);
74 isInited_.store(true);
75 return DM_OK;
76 }
77
UnInit()78 void KVAdapter::UnInit()
79 {
80 LOGI("KVAdapter Uninted");
81 if (isInited_.load()) {
82 std::lock_guard<std::mutex> lock(kvAdapterMutex_);
83 CHECK_NULL_VOID(kvStorePtr_);
84 UnregisterKvStoreDeathListener();
85 kvStorePtr_.reset();
86 isInited_.store(false);
87 }
88 }
89
ReInit()90 int32_t KVAdapter::ReInit()
91 {
92 LOGI("KVAdapter ReInit");
93 UnInit();
94 return Init();
95 }
96
Put(const std::string & key,const std::string & value)97 int32_t KVAdapter::Put(const std::string &key, const std::string &value)
98 {
99 if (key.empty() || key.size() > MAX_STRING_LEN || value.empty() || value.size() > MAX_STRING_LEN) {
100 LOGE("Param is invalid!");
101 return ERR_DM_FAILED;
102 }
103 DistributedKv::Status status;
104 {
105 std::lock_guard<std::mutex> lock(kvAdapterMutex_);
106 CHECK_NULL_RETURN(kvStorePtr_, ERR_DM_POINT_NULL);
107
108 DistributedKv::Key kvKey(key);
109 DistributedKv::Value kvValue(value);
110 status = kvStorePtr_->Put(kvKey, kvValue);
111 }
112 if (status != DistributedKv::Status::SUCCESS) {
113 LOGE("Put kv to db failed, ret: %{public}d", status);
114 return ERR_DM_FAILED;
115 }
116 return DM_OK;
117 }
118
Get(const std::string & key,std::string & value)119 int32_t KVAdapter::Get(const std::string &key, std::string &value)
120 {
121 LOGI("Get data by key: %{public}s", GetAnonyString(key).c_str());
122 DistributedKv::Key kvKey(key);
123 DistributedKv::Value kvValue;
124 DistributedKv::Status status;
125 {
126 std::lock_guard<std::mutex> lock(kvAdapterMutex_);
127 CHECK_NULL_RETURN(kvStorePtr_, ERR_DM_POINT_NULL);
128 status = kvStorePtr_->Get(kvKey, kvValue);
129 }
130 if (status != DistributedKv::Status::SUCCESS) {
131 LOGE("Get data from kv failed, key: %{public}s", GetAnonyString(key).c_str());
132 return ERR_DM_FAILED;
133 }
134 value = kvValue.ToString();
135 return DM_OK;
136 }
137
OnRemoteDied()138 void KVAdapter::OnRemoteDied()
139 {
140 LOGI("OnRemoteDied, recover db begin");
141 auto reInitTask = [this]() {
142 LOGI("ReInit, storeId:%{public}s", storeId_.storeId.c_str());
143 ReInit();
144 };
145 ffrt::submit(reInitTask);
146 }
147
GetLocalKvStorePtr()148 DistributedKv::Status KVAdapter::GetLocalKvStorePtr()
149 {
150 DistributedKv::Options options = {
151 .createIfMissing = true,
152 .encrypt = false,
153 .autoSync = false,
154 .securityLevel = DistributedKv::SecurityLevel::S1,
155 .area = DistributedKv::EL1,
156 .kvStoreType = DistributedKv::KvStoreType::SINGLE_VERSION,
157 .baseDir = DATABASE_DIR
158 };
159 DistributedKv::Status status = kvDataMgr_.GetSingleKvStore(options, appId_, storeId_, kvStorePtr_);
160 return status;
161 }
162
RegisterKvStoreDeathListener()163 void KVAdapter::RegisterKvStoreDeathListener()
164 {
165 LOGI("Register syncCompleted listener");
166 kvDataMgr_.RegisterKvStoreServiceDeathRecipient(shared_from_this());
167 }
168
UnregisterKvStoreDeathListener()169 void KVAdapter::UnregisterKvStoreDeathListener()
170 {
171 LOGI("UnRegister death listener");
172 kvDataMgr_.UnRegisterKvStoreServiceDeathRecipient(shared_from_this());
173 }
174
DeleteKvStore()175 int32_t KVAdapter::DeleteKvStore()
176 {
177 LOGI("Delete KvStore!");
178 kvDataMgr_.CloseKvStore(appId_, storeId_);
179 kvDataMgr_.DeleteKvStore(appId_, storeId_, DATABASE_DIR);
180 return DM_OK;
181 }
182
DeleteByAppId(const std::string & appId,const std::string & prefix)183 int32_t KVAdapter::DeleteByAppId(const std::string &appId, const std::string &prefix)
184 {
185 if (appId.empty()) {
186 LOGE("appId is empty");
187 return ERR_DM_FAILED;
188 }
189 std::vector<DistributedKv::Entry> localEntries;
190 {
191 std::lock_guard<std::mutex> lock(kvAdapterMutex_);
192 if (kvStorePtr_ == nullptr) {
193 LOGE("kvStoragePtr_ is null");
194 return ERR_DM_POINT_NULL;
195 }
196 if (kvStorePtr_->GetEntries(prefix + appId, localEntries) != DistributedKv::Status::SUCCESS) {
197 LOGE("Get entrys from DB failed.");
198 return ERR_DM_FAILED;
199 }
200 }
201 std::vector<std::string> delKeys;
202 for (const auto &entry : localEntries) {
203 delKeys.emplace_back(entry.key.ToString());
204 DmKVValue kvValue;
205 ConvertJsonToDmKVValue(entry.value.ToString(), kvValue);
206 delKeys.emplace_back(prefix + kvValue.anoyDeviceId);
207 }
208 return DeleteBatch(delKeys);
209 }
210
DeleteBatch(const std::vector<std::string> & keys)211 int32_t KVAdapter::DeleteBatch(const std::vector<std::string> &keys)
212 {
213 if (keys.empty()) {
214 LOGE("keys size(%{public}zu) is invalid!", keys.size());
215 return ERR_DM_FAILED;
216 }
217 uint32_t keysSize = static_cast<uint32_t>(keys.size());
218 std::vector<std::vector<DistributedKv::Key>> delKeyBatches;
219 for (uint32_t i = 0; i < keysSize; i += MAX_BATCH_SIZE) {
220 uint32_t end = (i + MAX_BATCH_SIZE) > keysSize ? keysSize : (i + MAX_BATCH_SIZE);
221 auto batch = std::vector<std::string>(keys.begin() + i, keys.begin() + end);
222 std::vector<DistributedKv::Key> delKeys;
223 for (auto item : batch) {
224 DistributedKv::Key key(item);
225 delKeys.emplace_back(key);
226 }
227 delKeyBatches.emplace_back(delKeys);
228 }
229
230 {
231 std::lock_guard<std::mutex> lock(kvAdapterMutex_);
232 if (kvStorePtr_ == nullptr) {
233 LOGE("kvStorePtr is nullptr!");
234 return ERR_DM_POINT_NULL;
235 }
236 for (auto delKeys : delKeyBatches) {
237 DistributedKv::Status status = kvStorePtr_->DeleteBatch(delKeys);
238 if (status != DistributedKv::Status::SUCCESS) {
239 LOGE("DeleteBatch failed!");
240 return ERR_DM_FAILED;
241 }
242 }
243 }
244 return DM_OK;
245 }
246 } // namespace DistributedHardware
247 } // namespace OHOS
248