1 /*
2 * Copyright (c) 2021-2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "mission/distributed_data_storage.h"
17
18 #include <thread>
19 #include <unistd.h>
20
21 #include "datetime_ex.h"
22 #include "ipc_object_proxy.h"
23 #include "ipc_skeleton.h"
24 #include "iservice_registry.h"
25 #include "system_ability_definition.h"
26
27 #include "distributed_sched_utils.h"
28 #include "dtbschedmgr_device_info_storage.h"
29 #include "dtbschedmgr_log.h"
30 #include "mission/distributed_sched_mission_manager.h"
31
32 using namespace std;
33 using namespace OHOS::DistributedKv;
34
35 namespace OHOS {
36 namespace DistributedSchedule {
37 namespace {
38 const string TAG = "DistributedDataStorage";
39 const string APP_ID = "DistributedSchedule";
40 const string STORE_ID = "SnapshotInfoDataStorage";
41 const string KVDB_PATH = "/data/service/el1/public/database/DistributedSchedule";
42 constexpr int32_t RETRY_TIMES_WAIT_KV_DATA = 30;
43 constexpr int32_t RETRY_TIMES_GET_KVSTORE = 5;
44 }
45
DistributedDataStorage()46 DistributedDataStorage::DistributedDataStorage()
47 {
48 appId_.appId = APP_ID;
49 storeId_.storeId = STORE_ID;
50 }
51
Init()52 bool DistributedDataStorage::Init()
53 {
54 HILOGD("begin.");
55 if (kvStoreDeathRecipient_ == nullptr) {
56 kvStoreDeathRecipient_ = sptr<IRemoteObject::DeathRecipient>(new KvStoreDeathRecipient());
57 }
58 if (dmsDataStorageHandler_ == nullptr) {
59 shared_ptr<AppExecFwk::EventRunner> runner = AppExecFwk::EventRunner::Create("dmsDataStorageHandler");
60 dmsDataStorageHandler_ = make_shared<AppExecFwk::EventHandler>(runner);
61 }
62 int32_t ret = InitKvDataService();
63 if (!ret) {
64 HILOGE("InitKvDataService failed!");
65 return false;
66 }
67 return true;
68 }
69
InitKvDataService()70 bool DistributedDataStorage::InitKvDataService()
71 {
72 auto waitTask = [this]() {
73 if (!WaitKvDataService()) {
74 HILOGE("get kvDataService failed!");
75 return;
76 }
77 InitDistributedDataStorage();
78 distributedDataChangeListener_ = make_unique<DistributedDataChangeListener>();
79 SubscribeDistributedDataStorage();
80 };
81 if (!dmsDataStorageHandler_->PostTask(waitTask)) {
82 HILOGE("post task failed!");
83 return false;
84 }
85 return true;
86 }
87
WaitKvDataService()88 bool DistributedDataStorage::WaitKvDataService()
89 {
90 auto samgrProxy = SystemAbilityManagerClient::GetInstance().GetSystemAbilityManager();
91 if (samgrProxy == nullptr) {
92 HILOGE("get samgrProxy failed!");
93 return false;
94 }
95 int32_t retryTimes = RETRY_TIMES_WAIT_KV_DATA;
96 do {
97 auto kvDataSvr = samgrProxy->CheckSystemAbility(DISTRIBUTED_KV_DATA_SERVICE_ABILITY_ID);
98 if (kvDataSvr != nullptr) {
99 IPCObjectProxy* proxy = reinterpret_cast<IPCObjectProxy*>(kvDataSvr.GetRefPtr());
100 if (proxy != nullptr && !proxy->IsObjectDead()) {
101 HILOGI("get service success!");
102 proxy->AddDeathRecipient(kvStoreDeathRecipient_);
103 return true;
104 }
105 }
106 HILOGD("waiting for service...");
107 this_thread::sleep_for(1s);
108 if (--retryTimes <= 0) {
109 HILOGE("waiting service timeout(30)s.");
110 return false;
111 }
112 } while (true);
113 return false;
114 }
115
InitDistributedDataStorage()116 void DistributedDataStorage::InitDistributedDataStorage()
117 {
118 int64_t begin = GetTickCount();
119 unique_lock<shared_mutex> writeLock(initLock_);
120 bool result = TryGetKvStore();
121 int64_t end = GetTickCount();
122 HILOGI("TryGetKvStore %{public}s, spend %{public}" PRId64 " ms", result ? "success" : "failed", end - begin);
123 }
124
TryGetKvStore()125 bool DistributedDataStorage::TryGetKvStore()
126 {
127 int32_t retryTimes = 0;
128 while (retryTimes < RETRY_TIMES_GET_KVSTORE) {
129 if (GetKvStore() == Status::SUCCESS && kvStorePtr_ != nullptr) {
130 return true;
131 }
132 HILOGD("retry get kvstore...");
133 this_thread::sleep_for(500ms);
134 retryTimes++;
135 }
136 if (kvStorePtr_ == nullptr) {
137 return false;
138 }
139 return true;
140 }
141
GetKvStore()142 Status DistributedDataStorage::GetKvStore()
143 {
144 Options options = {
145 .createIfMissing = true,
146 .encrypt = false,
147 .autoSync = false,
148 .securityLevel = DistributedKv::SecurityLevel::S2,
149 .area = 1,
150 .kvStoreType = KvStoreType::SINGLE_VERSION,
151 .baseDir = KVDB_PATH
152 };
153 Status status = dataManager_.GetSingleKvStore(options, appId_, storeId_, kvStorePtr_);
154 if (status != Status::SUCCESS) {
155 HILOGE("GetSingleKvStore failed, status = %{public}d.", status);
156 }
157 HILOGI("GetSingleKvStore success!");
158 return status;
159 }
160
SubscribeDistributedDataStorage()161 void DistributedDataStorage::SubscribeDistributedDataStorage()
162 {
163 int64_t begin = GetTickCount();
164 shared_lock<shared_mutex> readLock(initLock_);
165 if (kvStorePtr_ == nullptr) {
166 HILOGW("kvStorePtr is null!");
167 return;
168 }
169 SubscribeType subscribeType = SubscribeType::SUBSCRIBE_TYPE_REMOTE;
170 if (distributedDataChangeListener_ != nullptr) {
171 HILOGD("SubscribeKvStore start.");
172 Status status = kvStorePtr_->SubscribeKvStore(subscribeType, move(distributedDataChangeListener_));
173 HILOGD("[PerformanceTest] SubscribeKvStore spend %{public}" PRId64 " ms", GetTickCount() - begin);
174 if (status != Status::SUCCESS) {
175 HILOGE("SubscribeKvStore failed! status = %{public}d.", status);
176 return;
177 }
178 }
179 }
180
NotifyRemoteDied(const wptr<IRemoteObject> & remote)181 void DistributedDataStorage::NotifyRemoteDied(const wptr<IRemoteObject>& remote)
182 {
183 HILOGD("begin.");
184 if (kvStoreDeathRecipient_ != nullptr) {
185 remote->RemoveDeathRecipient(kvStoreDeathRecipient_);
186 }
187 }
188
Stop()189 bool DistributedDataStorage::Stop()
190 {
191 HILOGD("begin.");
192 dmsDataStorageHandler_ = nullptr;
193 bool ret = UninitDistributedDataStorage();
194 if (!ret) {
195 HILOGE("UninitDistributedDataStorage failed!");
196 return false;
197 }
198 HILOGD("Stop success!");
199 return true;
200 }
201
UninitDistributedDataStorage()202 bool DistributedDataStorage::UninitDistributedDataStorage()
203 {
204 int64_t begin = GetTickCount();
205 Status status;
206 if (distributedDataChangeListener_ != nullptr && kvStorePtr_ != nullptr) {
207 SubscribeType subscribeType = SubscribeType::SUBSCRIBE_TYPE_REMOTE;
208 status = kvStorePtr_->UnSubscribeKvStore(subscribeType, move(distributedDataChangeListener_));
209 HILOGI("[PerformanceTest] UnSubscribeKvStore spend %{public}" PRId64 " ms", GetTickCount() - begin);
210 if (status != Status::SUCCESS) {
211 HILOGE("UnSubscribeKvStore failed! status = %{public}d.", status);
212 return false;
213 }
214 distributedDataChangeListener_ = nullptr;
215 }
216 if (kvStorePtr_ != nullptr) {
217 status = dataManager_.CloseKvStore(appId_, storeId_);
218 if (status != Status::SUCCESS) {
219 HILOGE("CloseKvStore failed! status = %{public}d.", status);
220 return false;
221 }
222 kvStorePtr_ = nullptr;
223 }
224 status = dataManager_.DeleteKvStore(appId_, storeId_, KVDB_PATH);
225 if (status != Status::SUCCESS) {
226 HILOGE("DeleteKvStore failed! status = %{public}d.", status);
227 return false;
228 }
229 return true;
230 }
231
Insert(const string & networkId,int32_t missionId,const uint8_t * byteStream,size_t len)232 bool DistributedDataStorage::Insert(const string& networkId, int32_t missionId,
233 const uint8_t* byteStream, size_t len)
234 {
235 if (networkId.empty()) {
236 HILOGW("networkId is empty!");
237 return false;
238 }
239 if (missionId < 0) {
240 HILOGW("missionId is invalid!");
241 return false;
242 }
243 string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(networkId);
244 if (uuid.empty()) {
245 HILOGW("uuid is empty!");
246 return false;
247 }
248 {
249 unique_lock<shared_mutex> writeLock(initLock_);
250 bool ret = InsertInnerLocked(uuid, missionId, byteStream, len);
251 if (!ret) {
252 HILOGE("Insert fail, uuid: %{public}s, missionId: %{public}d.", GetAnonymStr(uuid).c_str(), missionId);
253 return false;
254 }
255 }
256 HILOGI("Insert success, uuid: %{public}s, missionId: %{public}d.", GetAnonymStr(uuid).c_str(), missionId);
257 return true;
258 }
259
InsertInnerLocked(const string & uuid,int32_t missionId,const uint8_t * byteStream,size_t len)260 bool DistributedDataStorage::InsertInnerLocked(const string& uuid, int32_t missionId,
261 const uint8_t* byteStream, size_t len)
262 {
263 HILOGD("called.");
264 int64_t begin = GetTickCount();
265 if (kvStorePtr_ == nullptr) {
266 HILOGW("kvStorePtr is null!");
267 return false;
268 }
269 Key key;
270 Value value;
271 GenerateKey(uuid, missionId, key);
272 GenerateValue(byteStream, len, value);
273 auto status = kvStorePtr_->Put(key, value);
274 HILOGI("[PerformanceTest] Put Snapshot spend %{public}" PRId64 " ms", GetTickCount() - begin);
275 if (status != Status::SUCCESS) {
276 HILOGE("kvStorePtr Put failed! status = %{public}d.", status);
277 return false;
278 }
279 return true;
280 }
281
Delete(const string & networkId,int32_t missionId)282 bool DistributedDataStorage::Delete(const string& networkId, int32_t missionId)
283 {
284 if (networkId.empty()) {
285 HILOGW("networkId is empty!");
286 return false;
287 }
288 if (missionId < 0) {
289 HILOGW("missionId is invalid!");
290 return false;
291 }
292 string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(networkId);
293 if (uuid.empty()) {
294 HILOGW("uuid is empty!");
295 return false;
296 }
297 {
298 unique_lock<shared_mutex> writeLock(initLock_);
299 bool ret = DeleteInnerLocked(uuid, missionId);
300 if (!ret) {
301 HILOGE("Delete fail, uuid: %{public}s, missionId: %{public}d.", GetAnonymStr(uuid).c_str(), missionId);
302 return false;
303 }
304 }
305 HILOGI("Delete success, uuid: %{public}s, missionId: %{public}d.", GetAnonymStr(uuid).c_str(), missionId);
306 return true;
307 }
308
DeleteInnerLocked(const string & uuid,int32_t missionId)309 bool DistributedDataStorage::DeleteInnerLocked(const string& uuid, int32_t missionId)
310 {
311 HILOGD("called.");
312 int64_t begin = GetTickCount();
313 if (kvStorePtr_ == nullptr) {
314 HILOGW("kvStorePtr is null!");
315 return false;
316 }
317 Key key;
318 GenerateKey(uuid, missionId, key);
319 auto status = kvStorePtr_->Delete(key);
320 HILOGI("[PerformanceTest] Delete Snapshot spend %{public}" PRId64 " ms", GetTickCount() - begin);
321 if (status != Status::SUCCESS) {
322 HILOGE("kvStorePtr Delete failed! status = %{public}d.", status);
323 return false;
324 }
325 return true;
326 }
327
FuzzyDelete(const string & networkId)328 bool DistributedDataStorage::FuzzyDelete(const string& networkId)
329 {
330 if (networkId.empty()) {
331 HILOGW("networkId is empty!");
332 return false;
333 }
334 {
335 unique_lock<shared_mutex> writeLock(initLock_);
336 bool ret = FuzzyDeleteInnerLocked(networkId);
337 if (!ret) {
338 HILOGW("FuzzyDelete networkId: %{public}s fail.", GetAnonymStr(networkId).c_str());
339 return false;
340 }
341 }
342 HILOGI("FuzzyDelete networkId: %{public}s success.", GetAnonymStr(networkId).c_str());
343 return true;
344 }
345
FuzzyDeleteInnerLocked(const string & networkId)346 bool DistributedDataStorage::FuzzyDeleteInnerLocked(const string& networkId)
347 {
348 HILOGD("called.");
349 int64_t begin = GetTickCount();
350 if (kvStorePtr_ == nullptr) {
351 HILOGW("kvStorePtr is null!");
352 return false;
353 }
354 auto status = kvStorePtr_->RemoveDeviceData(networkId);
355 HILOGI("[PerformanceTest] RemoveDeviceData Snapshot spend %{public}" PRId64 " ms", GetTickCount() - begin);
356 if (status != Status::SUCCESS) {
357 HILOGE("kvStorePtr RemoveDeviceData failed! status = %{public}d.", status);
358 return false;
359 }
360 return true;
361 }
362
Query(const string & networkId,int32_t missionId,Value & value) const363 bool DistributedDataStorage::Query(const string& networkId, int32_t missionId, Value& value) const
364 {
365 if (networkId.empty()) {
366 HILOGW("networkId is empty!");
367 return false;
368 }
369 if (missionId < 0) {
370 HILOGW("missionId is invalid!");
371 return false;
372 }
373 {
374 shared_lock<shared_mutex> readLock(initLock_);
375 bool ret = QueryInnerLocked(networkId, missionId, value);
376 if (!ret) {
377 HILOGE("Query networkId: %{public}s, missionId: %{public}d fail.",
378 GetAnonymStr(networkId).c_str(), missionId);
379 return false;
380 }
381 }
382 HILOGI("Query networkId: %{public}s, missionId: %{public}d success.", GetAnonymStr(networkId).c_str(), missionId);
383 return true;
384 }
385
QueryInnerLocked(const string & networkId,int32_t missionId,Value & value) const386 bool DistributedDataStorage::QueryInnerLocked(const string& networkId, int32_t missionId, Value& value) const
387 {
388 HILOGD("called.");
389 int64_t begin = GetTickCount();
390 if (kvStorePtr_ == nullptr) {
391 HILOGW("kvStorePtr is null!");
392 return false;
393 }
394 string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(networkId);
395 if (uuid.empty()) {
396 HILOGW("uuid is empty!");
397 return false;
398 }
399 Key key;
400 GenerateKey(uuid, missionId, key);
401 std::promise<OHOS::DistributedKv::Status> resultStatusSignal;
402 kvStorePtr_->Get(key, networkId,
403 [&value, &resultStatusSignal](Status innerStatus, Value innerValue) {
404 HILOGI("The get, result = %{public}d", innerStatus);
405 if (innerStatus == Status::SUCCESS) {
406 value = innerValue;
407 }
408 resultStatusSignal.set_value(innerStatus);
409 });
410 Status status = GetResultSatus(resultStatusSignal);
411 HILOGI("[PerformanceTest] Get Snapshot spend %{public}" PRId64 " ms", GetTickCount() - begin);
412 if (status != Status::SUCCESS) {
413 HILOGE("kvStorePtr Get failed! status = %{public}d.", status);
414 return false;
415 }
416 return true;
417 }
418
GetResultSatus(std::promise<OHOS::DistributedKv::Status> & resultStatusSignal) const419 Status DistributedDataStorage::GetResultSatus(std::promise<OHOS::DistributedKv::Status> &resultStatusSignal) const
420 {
421 auto future = resultStatusSignal.get_future();
422 if (future.wait_for(std::chrono::seconds(waittingTime_)) == std::future_status::ready) {
423 Status status = future.get();
424 return status;
425 }
426 return Status::ERROR;
427 }
428
GenerateKey(const string & uuid,int32_t missionId,Key & key)429 void DistributedDataStorage::GenerateKey(const string& uuid, int32_t missionId, Key& key)
430 {
431 string keyString;
432 keyString.append(uuid).append("_").append(to_string(missionId));
433 key = keyString;
434 }
435
GenerateValue(const uint8_t * byteStream,size_t len,Value & value)436 void DistributedDataStorage::GenerateValue(const uint8_t* byteStream, size_t len, Value& value)
437 {
438 Value valueString((char *)byteStream, len);
439 value = valueString;
440 }
441 } // DistributedSchedule
442 } // namespace OHOS