1 /*
2  * Copyright (c) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "kv_adapter.h"
16 
17 #include <cinttypes>
18 #include <mutex>
19 #include <unistd.h>
20 
21 #include "cJSON.h"
22 #include "datetime_ex.h"
23 #include "string_ex.h"
24 
25 #include "dm_anonymous.h"
26 #include "dm_constants.h"
27 #include "dm_log.h"
28 #include "ffrt.h"
29 
30 namespace OHOS {
31 namespace DistributedHardware {
32 using namespace OHOS::DistributedKv;
namespace {
    // Application id and store id used to open the local key-value store (assigned in Init()).
    const std::string APP_ID{"distributed_device_manager_service"};
    const std::string STORE_ID{"dm_kv_store"};
    // Base directory handed to the KV data manager when the store is opened or deleted.
    const std::string DATABASE_DIR{"/data/service/el1/public/database/distributed_device_manager_service"};
    // Not referenced by the code visible in this file section.
    const std::string KV_REINIT_THREAD{"reinit_kv_store"};
    // Maximum number of keys sent to the store in a single DeleteBatch call.
    constexpr uint32_t MAX_BATCH_SIZE{128};
    // Maximum accepted length of a key or a value in Put().
    constexpr int32_t MAX_STRING_LEN{4096};
    // Number of attempts Init() makes to open the store before giving up.
    constexpr int32_t MAX_INIT_RETRY_TIMES{20};
    // Pause between two Init() attempts, in microseconds (argument of usleep).
    constexpr int32_t INIT_RETRY_SLEEP_INTERVAL{200 * 1000}; // 200ms
}
43 
Init()44 int32_t KVAdapter::Init()
45 {
46     LOGI("Init local DB, dataType: %{public}d", static_cast<int32_t>(dataType_));
47     if (isInited_.load()) {
48         LOGI("Local DB already inited.");
49         return DM_OK;
50     }
51     this->appId_.appId = APP_ID;
52     this->storeId_.storeId = STORE_ID;
53     std::lock_guard<std::mutex> lock(kvAdapterMutex_);
54     int32_t tryTimes = MAX_INIT_RETRY_TIMES;
55     while (tryTimes > 0) {
56         DistributedKv::Status status = GetLocalKvStorePtr();
57         if (status == DistributedKv::Status::SUCCESS && kvStorePtr_) {
58             LOGI("Init KvStorePtr Success");
59             RegisterKvStoreDeathListener();
60             isInited_.store(true);
61             return DM_OK;
62         }
63         LOGE("CheckKvStore, left times: %{public}d, status: %{public}d", tryTimes, status);
64         if (status == DistributedKv::Status::STORE_META_CHANGED ||
65             status == DistributedKv::Status::SECURITY_LEVEL_ERROR ||
66             status == DistributedKv::Status::CRYPT_ERROR) {
67             LOGE("init db error, remove and rebuild it");
68             DeleteKvStore();
69         }
70         usleep(INIT_RETRY_SLEEP_INTERVAL);
71         tryTimes--;
72     }
73     CHECK_NULL_RETURN(kvStorePtr_, ERR_DM_INIT_FAILED);
74     isInited_.store(true);
75     return DM_OK;
76 }
77 
UnInit()78 void KVAdapter::UnInit()
79 {
80     LOGI("KVAdapter Uninted");
81     if (isInited_.load()) {
82         std::lock_guard<std::mutex> lock(kvAdapterMutex_);
83         CHECK_NULL_VOID(kvStorePtr_);
84         UnregisterKvStoreDeathListener();
85         kvStorePtr_.reset();
86         isInited_.store(false);
87     }
88 }
89 
ReInit()90 int32_t KVAdapter::ReInit()
91 {
92     LOGI("KVAdapter ReInit");
93     UnInit();
94     return Init();
95 }
96 
Put(const std::string & key,const std::string & value)97 int32_t KVAdapter::Put(const std::string &key, const std::string &value)
98 {
99     if (key.empty() || key.size() > MAX_STRING_LEN || value.empty() || value.size() > MAX_STRING_LEN) {
100         LOGE("Param is invalid!");
101         return ERR_DM_FAILED;
102     }
103     DistributedKv::Status status;
104     {
105         std::lock_guard<std::mutex> lock(kvAdapterMutex_);
106         CHECK_NULL_RETURN(kvStorePtr_, ERR_DM_POINT_NULL);
107 
108         DistributedKv::Key kvKey(key);
109         DistributedKv::Value kvValue(value);
110         status = kvStorePtr_->Put(kvKey, kvValue);
111     }
112     if (status != DistributedKv::Status::SUCCESS) {
113         LOGE("Put kv to db failed, ret: %{public}d", status);
114         return ERR_DM_FAILED;
115     }
116     return DM_OK;
117 }
118 
Get(const std::string & key,std::string & value)119 int32_t KVAdapter::Get(const std::string &key, std::string &value)
120 {
121     LOGI("Get data by key: %{public}s", GetAnonyString(key).c_str());
122     DistributedKv::Key kvKey(key);
123     DistributedKv::Value kvValue;
124     DistributedKv::Status status;
125     {
126         std::lock_guard<std::mutex> lock(kvAdapterMutex_);
127         CHECK_NULL_RETURN(kvStorePtr_, ERR_DM_POINT_NULL);
128         status = kvStorePtr_->Get(kvKey, kvValue);
129     }
130     if (status != DistributedKv::Status::SUCCESS) {
131         LOGE("Get data from kv failed, key: %{public}s", GetAnonyString(key).c_str());
132         return ERR_DM_FAILED;
133     }
134     value = kvValue.ToString();
135     return DM_OK;
136 }
137 
OnRemoteDied()138 void KVAdapter::OnRemoteDied()
139 {
140     LOGI("OnRemoteDied, recover db begin");
141     auto reInitTask = [this]() {
142         LOGI("ReInit, storeId:%{public}s", storeId_.storeId.c_str());
143         ReInit();
144     };
145     ffrt::submit(reInitTask);
146 }
147 
GetLocalKvStorePtr()148 DistributedKv::Status KVAdapter::GetLocalKvStorePtr()
149 {
150     DistributedKv::Options options = {
151         .createIfMissing = true,
152         .encrypt = false,
153         .autoSync = false,
154         .securityLevel = DistributedKv::SecurityLevel::S1,
155         .area = DistributedKv::EL1,
156         .kvStoreType = DistributedKv::KvStoreType::SINGLE_VERSION,
157         .baseDir = DATABASE_DIR
158     };
159     DistributedKv::Status status = kvDataMgr_.GetSingleKvStore(options, appId_, storeId_, kvStorePtr_);
160     return status;
161 }
162 
RegisterKvStoreDeathListener()163 void KVAdapter::RegisterKvStoreDeathListener()
164 {
165     LOGI("Register syncCompleted listener");
166     kvDataMgr_.RegisterKvStoreServiceDeathRecipient(shared_from_this());
167 }
168 
UnregisterKvStoreDeathListener()169 void KVAdapter::UnregisterKvStoreDeathListener()
170 {
171     LOGI("UnRegister death listener");
172     kvDataMgr_.UnRegisterKvStoreServiceDeathRecipient(shared_from_this());
173 }
174 
DeleteKvStore()175 int32_t KVAdapter::DeleteKvStore()
176 {
177     LOGI("Delete KvStore!");
178     kvDataMgr_.CloseKvStore(appId_, storeId_);
179     kvDataMgr_.DeleteKvStore(appId_, storeId_, DATABASE_DIR);
180     return DM_OK;
181 }
182 
DeleteByAppId(const std::string & appId,const std::string & prefix)183 int32_t KVAdapter::DeleteByAppId(const std::string &appId, const std::string &prefix)
184 {
185     if (appId.empty()) {
186         LOGE("appId is empty");
187         return ERR_DM_FAILED;
188     }
189     std::vector<DistributedKv::Entry> localEntries;
190     {
191         std::lock_guard<std::mutex> lock(kvAdapterMutex_);
192         if (kvStorePtr_ == nullptr) {
193             LOGE("kvStoragePtr_ is null");
194             return ERR_DM_POINT_NULL;
195         }
196         if (kvStorePtr_->GetEntries(prefix + appId, localEntries) != DistributedKv::Status::SUCCESS) {
197             LOGE("Get entrys from DB failed.");
198             return ERR_DM_FAILED;
199         }
200     }
201     std::vector<std::string> delKeys;
202     for (const auto &entry : localEntries) {
203         delKeys.emplace_back(entry.key.ToString());
204         DmKVValue kvValue;
205         ConvertJsonToDmKVValue(entry.value.ToString(), kvValue);
206         delKeys.emplace_back(prefix + kvValue.anoyDeviceId);
207     }
208     return DeleteBatch(delKeys);
209 }
210 
DeleteBatch(const std::vector<std::string> & keys)211 int32_t KVAdapter::DeleteBatch(const std::vector<std::string> &keys)
212 {
213     if (keys.empty()) {
214         LOGE("keys size(%{public}zu) is invalid!", keys.size());
215         return ERR_DM_FAILED;
216     }
217     uint32_t keysSize = static_cast<uint32_t>(keys.size());
218     std::vector<std::vector<DistributedKv::Key>> delKeyBatches;
219     for (uint32_t i = 0; i < keysSize; i += MAX_BATCH_SIZE) {
220         uint32_t end = (i + MAX_BATCH_SIZE) > keysSize ? keysSize : (i + MAX_BATCH_SIZE);
221         auto batch = std::vector<std::string>(keys.begin() + i, keys.begin() + end);
222         std::vector<DistributedKv::Key> delKeys;
223         for (auto item : batch) {
224             DistributedKv::Key key(item);
225             delKeys.emplace_back(key);
226         }
227         delKeyBatches.emplace_back(delKeys);
228     }
229 
230     {
231         std::lock_guard<std::mutex> lock(kvAdapterMutex_);
232         if (kvStorePtr_ == nullptr) {
233             LOGE("kvStorePtr is nullptr!");
234             return ERR_DM_POINT_NULL;
235         }
236         for (auto delKeys : delKeyBatches) {
237             DistributedKv::Status status = kvStorePtr_->DeleteBatch(delKeys);
238             if (status != DistributedKv::Status::SUCCESS) {
239                 LOGE("DeleteBatch failed!");
240                 return ERR_DM_FAILED;
241             }
242         }
243     }
244     return DM_OK;
245 }
246 } // namespace DistributedHardware
247 } // namespace OHOS
248