1 /*
2  * Copyright (c) 2021-2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "mission/distributed_data_storage.h"
17 
18 #include <thread>
19 #include <unistd.h>
20 
21 #include "datetime_ex.h"
22 #include "ipc_object_proxy.h"
23 #include "ipc_skeleton.h"
24 #include "iservice_registry.h"
25 #include "system_ability_definition.h"
26 
27 #include "distributed_sched_utils.h"
28 #include "dtbschedmgr_device_info_storage.h"
29 #include "dtbschedmgr_log.h"
30 #include "mission/distributed_sched_mission_manager.h"
31 
32 using namespace std;
33 using namespace OHOS::DistributedKv;
34 
35 namespace OHOS {
36 namespace DistributedSchedule {
37 namespace {
38 const string TAG = "DistributedDataStorage";
39 const string APP_ID = "DistributedSchedule";
40 const string STORE_ID = "SnapshotInfoDataStorage";
41 const string KVDB_PATH = "/data/service/el1/public/database/DistributedSchedule";
42 constexpr int32_t RETRY_TIMES_WAIT_KV_DATA = 30;
43 constexpr int32_t RETRY_TIMES_GET_KVSTORE = 5;
44 }
45 
DistributedDataStorage()46 DistributedDataStorage::DistributedDataStorage()
47 {
48     appId_.appId = APP_ID;
49     storeId_.storeId = STORE_ID;
50 }
51 
Init()52 bool DistributedDataStorage::Init()
53 {
54     HILOGD("begin.");
55     if (kvStoreDeathRecipient_ == nullptr) {
56         kvStoreDeathRecipient_ = sptr<IRemoteObject::DeathRecipient>(new KvStoreDeathRecipient());
57     }
58     if (dmsDataStorageHandler_ == nullptr) {
59         shared_ptr<AppExecFwk::EventRunner> runner = AppExecFwk::EventRunner::Create("dmsDataStorageHandler");
60         dmsDataStorageHandler_ = make_shared<AppExecFwk::EventHandler>(runner);
61     }
62     int32_t ret = InitKvDataService();
63     if (!ret) {
64         HILOGE("InitKvDataService failed!");
65         return false;
66     }
67     return true;
68 }
69 
InitKvDataService()70 bool DistributedDataStorage::InitKvDataService()
71 {
72     auto waitTask = [this]() {
73         if (!WaitKvDataService()) {
74             HILOGE("get kvDataService failed!");
75             return;
76         }
77         InitDistributedDataStorage();
78         distributedDataChangeListener_ = make_unique<DistributedDataChangeListener>();
79         SubscribeDistributedDataStorage();
80     };
81     if (!dmsDataStorageHandler_->PostTask(waitTask)) {
82         HILOGE("post task failed!");
83         return false;
84     }
85     return true;
86 }
87 
WaitKvDataService()88 bool DistributedDataStorage::WaitKvDataService()
89 {
90     auto samgrProxy = SystemAbilityManagerClient::GetInstance().GetSystemAbilityManager();
91     if (samgrProxy == nullptr) {
92         HILOGE("get samgrProxy failed!");
93         return false;
94     }
95     int32_t retryTimes = RETRY_TIMES_WAIT_KV_DATA;
96     do {
97         auto kvDataSvr = samgrProxy->CheckSystemAbility(DISTRIBUTED_KV_DATA_SERVICE_ABILITY_ID);
98         if (kvDataSvr != nullptr) {
99             IPCObjectProxy* proxy = reinterpret_cast<IPCObjectProxy*>(kvDataSvr.GetRefPtr());
100             if (proxy != nullptr && !proxy->IsObjectDead()) {
101                 HILOGI("get service success!");
102                 proxy->AddDeathRecipient(kvStoreDeathRecipient_);
103                 return true;
104             }
105         }
106         HILOGD("waiting for service...");
107         this_thread::sleep_for(1s);
108         if (--retryTimes <= 0) {
109             HILOGE("waiting service timeout(30)s.");
110             return false;
111         }
112     } while (true);
113     return false;
114 }
115 
InitDistributedDataStorage()116 void DistributedDataStorage::InitDistributedDataStorage()
117 {
118     int64_t begin = GetTickCount();
119     unique_lock<shared_mutex> writeLock(initLock_);
120     bool result = TryGetKvStore();
121     int64_t end = GetTickCount();
122     HILOGI("TryGetKvStore %{public}s, spend %{public}" PRId64 " ms", result ? "success" : "failed", end - begin);
123 }
124 
TryGetKvStore()125 bool DistributedDataStorage::TryGetKvStore()
126 {
127     int32_t retryTimes = 0;
128     while (retryTimes < RETRY_TIMES_GET_KVSTORE) {
129         if (GetKvStore() == Status::SUCCESS && kvStorePtr_ != nullptr) {
130             return true;
131         }
132         HILOGD("retry get kvstore...");
133         this_thread::sleep_for(500ms);
134         retryTimes++;
135     }
136     if (kvStorePtr_ == nullptr) {
137         return false;
138     }
139     return true;
140 }
141 
GetKvStore()142 Status DistributedDataStorage::GetKvStore()
143 {
144     Options options = {
145         .createIfMissing = true,
146         .encrypt = false,
147         .autoSync = false,
148         .securityLevel = DistributedKv::SecurityLevel::S2,
149         .area = 1,
150         .kvStoreType = KvStoreType::SINGLE_VERSION,
151         .baseDir = KVDB_PATH
152     };
153     Status status = dataManager_.GetSingleKvStore(options, appId_, storeId_, kvStorePtr_);
154     if (status != Status::SUCCESS) {
155         HILOGE("GetSingleKvStore failed, status = %{public}d.", status);
156     }
157     HILOGI("GetSingleKvStore success!");
158     return status;
159 }
160 
SubscribeDistributedDataStorage()161 void DistributedDataStorage::SubscribeDistributedDataStorage()
162 {
163     int64_t begin = GetTickCount();
164     shared_lock<shared_mutex> readLock(initLock_);
165     if (kvStorePtr_ == nullptr) {
166         HILOGW("kvStorePtr is null!");
167         return;
168     }
169     SubscribeType subscribeType = SubscribeType::SUBSCRIBE_TYPE_REMOTE;
170     if (distributedDataChangeListener_ != nullptr) {
171         HILOGD("SubscribeKvStore start.");
172         Status status = kvStorePtr_->SubscribeKvStore(subscribeType, move(distributedDataChangeListener_));
173         HILOGD("[PerformanceTest] SubscribeKvStore spend %{public}" PRId64 " ms", GetTickCount() - begin);
174         if (status != Status::SUCCESS) {
175             HILOGE("SubscribeKvStore failed! status = %{public}d.", status);
176             return;
177         }
178     }
179 }
180 
NotifyRemoteDied(const wptr<IRemoteObject> & remote)181 void DistributedDataStorage::NotifyRemoteDied(const wptr<IRemoteObject>& remote)
182 {
183     HILOGD("begin.");
184     if (kvStoreDeathRecipient_ != nullptr) {
185         remote->RemoveDeathRecipient(kvStoreDeathRecipient_);
186     }
187 }
188 
Stop()189 bool DistributedDataStorage::Stop()
190 {
191     HILOGD("begin.");
192     dmsDataStorageHandler_ = nullptr;
193     bool ret = UninitDistributedDataStorage();
194     if (!ret) {
195         HILOGE("UninitDistributedDataStorage failed!");
196         return false;
197     }
198     HILOGD("Stop success!");
199     return true;
200 }
201 
UninitDistributedDataStorage()202 bool DistributedDataStorage::UninitDistributedDataStorage()
203 {
204     int64_t begin = GetTickCount();
205     Status status;
206     if (distributedDataChangeListener_ != nullptr && kvStorePtr_ != nullptr) {
207         SubscribeType subscribeType = SubscribeType::SUBSCRIBE_TYPE_REMOTE;
208         status = kvStorePtr_->UnSubscribeKvStore(subscribeType, move(distributedDataChangeListener_));
209         HILOGI("[PerformanceTest] UnSubscribeKvStore spend %{public}" PRId64 " ms", GetTickCount() - begin);
210         if (status != Status::SUCCESS) {
211             HILOGE("UnSubscribeKvStore failed! status = %{public}d.", status);
212             return false;
213         }
214         distributedDataChangeListener_ = nullptr;
215     }
216     if (kvStorePtr_ != nullptr) {
217         status = dataManager_.CloseKvStore(appId_, storeId_);
218         if (status != Status::SUCCESS) {
219             HILOGE("CloseKvStore failed! status = %{public}d.", status);
220             return false;
221         }
222         kvStorePtr_ = nullptr;
223     }
224     status = dataManager_.DeleteKvStore(appId_, storeId_, KVDB_PATH);
225     if (status != Status::SUCCESS) {
226         HILOGE("DeleteKvStore failed! status = %{public}d.", status);
227         return false;
228     }
229     return true;
230 }
231 
Insert(const string & networkId,int32_t missionId,const uint8_t * byteStream,size_t len)232 bool DistributedDataStorage::Insert(const string& networkId, int32_t missionId,
233     const uint8_t* byteStream, size_t len)
234 {
235     if (networkId.empty()) {
236         HILOGW("networkId is empty!");
237         return false;
238     }
239     if (missionId < 0) {
240         HILOGW("missionId is invalid!");
241         return false;
242     }
243     string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(networkId);
244     if (uuid.empty()) {
245         HILOGW("uuid is empty!");
246         return false;
247     }
248     {
249         unique_lock<shared_mutex> writeLock(initLock_);
250         bool ret = InsertInnerLocked(uuid, missionId, byteStream, len);
251         if (!ret) {
252             HILOGE("Insert fail, uuid: %{public}s, missionId: %{public}d.", GetAnonymStr(uuid).c_str(), missionId);
253             return false;
254         }
255     }
256     HILOGI("Insert success, uuid: %{public}s, missionId: %{public}d.", GetAnonymStr(uuid).c_str(), missionId);
257     return true;
258 }
259 
InsertInnerLocked(const string & uuid,int32_t missionId,const uint8_t * byteStream,size_t len)260 bool DistributedDataStorage::InsertInnerLocked(const string& uuid, int32_t missionId,
261     const uint8_t* byteStream, size_t len)
262 {
263     HILOGD("called.");
264     int64_t begin = GetTickCount();
265     if (kvStorePtr_ == nullptr) {
266         HILOGW("kvStorePtr is null!");
267         return false;
268     }
269     Key key;
270     Value value;
271     GenerateKey(uuid, missionId, key);
272     GenerateValue(byteStream, len, value);
273     auto status = kvStorePtr_->Put(key, value);
274     HILOGI("[PerformanceTest] Put Snapshot spend %{public}" PRId64 " ms", GetTickCount() - begin);
275     if (status != Status::SUCCESS) {
276         HILOGE("kvStorePtr Put failed! status = %{public}d.", status);
277         return false;
278     }
279     return true;
280 }
281 
Delete(const string & networkId,int32_t missionId)282 bool DistributedDataStorage::Delete(const string& networkId, int32_t missionId)
283 {
284     if (networkId.empty()) {
285         HILOGW("networkId is empty!");
286         return false;
287     }
288     if (missionId < 0) {
289         HILOGW("missionId is invalid!");
290         return false;
291     }
292     string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(networkId);
293     if (uuid.empty()) {
294         HILOGW("uuid is empty!");
295         return false;
296     }
297     {
298         unique_lock<shared_mutex> writeLock(initLock_);
299         bool ret = DeleteInnerLocked(uuid, missionId);
300         if (!ret) {
301             HILOGE("Delete fail, uuid: %{public}s, missionId: %{public}d.", GetAnonymStr(uuid).c_str(), missionId);
302             return false;
303         }
304     }
305     HILOGI("Delete success, uuid: %{public}s, missionId: %{public}d.", GetAnonymStr(uuid).c_str(), missionId);
306     return true;
307 }
308 
DeleteInnerLocked(const string & uuid,int32_t missionId)309 bool DistributedDataStorage::DeleteInnerLocked(const string& uuid, int32_t missionId)
310 {
311     HILOGD("called.");
312     int64_t begin = GetTickCount();
313     if (kvStorePtr_ == nullptr) {
314         HILOGW("kvStorePtr is null!");
315         return false;
316     }
317     Key key;
318     GenerateKey(uuid, missionId, key);
319     auto status = kvStorePtr_->Delete(key);
320     HILOGI("[PerformanceTest] Delete Snapshot spend %{public}" PRId64 " ms", GetTickCount() - begin);
321     if (status != Status::SUCCESS) {
322         HILOGE("kvStorePtr Delete failed! status = %{public}d.", status);
323         return false;
324     }
325     return true;
326 }
327 
FuzzyDelete(const string & networkId)328 bool DistributedDataStorage::FuzzyDelete(const string& networkId)
329 {
330     if (networkId.empty()) {
331         HILOGW("networkId is empty!");
332         return false;
333     }
334     {
335         unique_lock<shared_mutex> writeLock(initLock_);
336         bool ret = FuzzyDeleteInnerLocked(networkId);
337         if (!ret) {
338             HILOGW("FuzzyDelete networkId: %{public}s fail.", GetAnonymStr(networkId).c_str());
339             return false;
340         }
341     }
342     HILOGI("FuzzyDelete networkId: %{public}s success.", GetAnonymStr(networkId).c_str());
343     return true;
344 }
345 
FuzzyDeleteInnerLocked(const string & networkId)346 bool DistributedDataStorage::FuzzyDeleteInnerLocked(const string& networkId)
347 {
348     HILOGD("called.");
349     int64_t begin = GetTickCount();
350     if (kvStorePtr_ == nullptr) {
351         HILOGW("kvStorePtr is null!");
352         return false;
353     }
354     auto status = kvStorePtr_->RemoveDeviceData(networkId);
355     HILOGI("[PerformanceTest] RemoveDeviceData Snapshot spend %{public}" PRId64 " ms", GetTickCount() - begin);
356     if (status != Status::SUCCESS) {
357         HILOGE("kvStorePtr RemoveDeviceData failed! status = %{public}d.", status);
358         return false;
359     }
360     return true;
361 }
362 
Query(const string & networkId,int32_t missionId,Value & value) const363 bool DistributedDataStorage::Query(const string& networkId, int32_t missionId, Value& value) const
364 {
365     if (networkId.empty()) {
366         HILOGW("networkId is empty!");
367         return false;
368     }
369     if (missionId < 0) {
370         HILOGW("missionId is invalid!");
371         return false;
372     }
373     {
374         shared_lock<shared_mutex> readLock(initLock_);
375         bool ret = QueryInnerLocked(networkId, missionId, value);
376         if (!ret) {
377             HILOGE("Query networkId: %{public}s, missionId: %{public}d fail.",
378                 GetAnonymStr(networkId).c_str(), missionId);
379             return false;
380         }
381     }
382     HILOGI("Query networkId: %{public}s, missionId: %{public}d success.", GetAnonymStr(networkId).c_str(), missionId);
383     return true;
384 }
385 
QueryInnerLocked(const string & networkId,int32_t missionId,Value & value) const386 bool DistributedDataStorage::QueryInnerLocked(const string& networkId, int32_t missionId, Value& value) const
387 {
388     HILOGD("called.");
389     int64_t begin = GetTickCount();
390     if (kvStorePtr_ == nullptr) {
391         HILOGW("kvStorePtr is null!");
392         return false;
393     }
394     string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(networkId);
395     if (uuid.empty()) {
396         HILOGW("uuid is empty!");
397         return false;
398     }
399     Key key;
400     GenerateKey(uuid, missionId, key);
401     std::promise<OHOS::DistributedKv::Status> resultStatusSignal;
402     kvStorePtr_->Get(key, networkId,
403         [&value, &resultStatusSignal](Status innerStatus, Value innerValue) {
404             HILOGI("The get, result = %{public}d", innerStatus);
405             if (innerStatus == Status::SUCCESS) {
406                 value = innerValue;
407             }
408             resultStatusSignal.set_value(innerStatus);
409         });
410     Status status = GetResultSatus(resultStatusSignal);
411     HILOGI("[PerformanceTest] Get Snapshot spend %{public}" PRId64 " ms", GetTickCount() - begin);
412     if (status != Status::SUCCESS) {
413         HILOGE("kvStorePtr Get failed! status = %{public}d.", status);
414         return false;
415     }
416     return true;
417 }
418 
GetResultSatus(std::promise<OHOS::DistributedKv::Status> & resultStatusSignal) const419 Status DistributedDataStorage::GetResultSatus(std::promise<OHOS::DistributedKv::Status> &resultStatusSignal) const
420 {
421     auto future = resultStatusSignal.get_future();
422     if (future.wait_for(std::chrono::seconds(waittingTime_)) == std::future_status::ready) {
423         Status status = future.get();
424         return status;
425     }
426     return Status::ERROR;
427 }
428 
GenerateKey(const string & uuid,int32_t missionId,Key & key)429 void DistributedDataStorage::GenerateKey(const string& uuid, int32_t missionId, Key& key)
430 {
431     string keyString;
432     keyString.append(uuid).append("_").append(to_string(missionId));
433     key = keyString;
434 }
435 
GenerateValue(const uint8_t * byteStream,size_t len,Value & value)436 void DistributedDataStorage::GenerateValue(const uint8_t* byteStream, size_t len, Value& value)
437 {
438     Value valueString((char *)byteStream, len);
439     value = valueString;
440 }
441 } // DistributedSchedule
442 } // namespace OHOS