1 /*
2 * Copyright (c) 2021-2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "mission/distributed_sched_mission_manager.h"
17
18 #include <chrono>
19 #include <sys/time.h>
20 #include <unistd.h>
21
22 #include "datetime_ex.h"
23 #include "ipc_skeleton.h"
24 #include "iservice_registry.h"
25 #include "nlohmann/json.hpp"
26 #include "string_ex.h"
27 #include "system_ability_definition.h"
28
29 #include "distributed_sched_adapter.h"
30 #include "distributed_sched_utils.h"
31 #include "dtbschedmgr_device_info_storage.h"
32 #include "dtbschedmgr_log.h"
33 #include "mission/mission_changed_notify.h"
34 #include "mission/mission_constant.h"
35 #include "mission/mission_info_converter.h"
36 #include "mission/snapshot_converter.h"
37
38 namespace OHOS {
39 namespace DistributedSchedule {
namespace {
// File-local constants for the mission manager.
// NOTE(review): TAG is presumably consumed by the HILOG* macros (defined elsewhere) — confirm.
const std::string TAG = "DistributedSchedMissionManager";
// Size of the snapshot cache at which EnqueueCachedSnapshotInfo evicts the least recently accessed entry.
constexpr size_t MAX_CACHED_ITEM = 10;
// Not referenced in the visible part of this file; presumably bounds sync retries — TODO confirm.
constexpr int32_t MAX_RETRY_TIMES = 15;
// Not referenced in the visible part of this file; presumably the delay between retries — TODO confirm unit.
constexpr int32_t RETRY_DELAYED = 2000;
// Passed to PostTask when scheduling the snapshot-created notification.
constexpr int32_t GET_FOREGROUND_SNAPSHOT_DELAY_TIME = 800; // ms
// Prefix of the handler task name; the device uuid is appended to form the full name.
const std::string DELETE_DATA_STORAGE = "DeleteDataStorage";
// Delay applied to the delayed storage-deletion task.
constexpr int32_t DELETE_DATA_STORAGE_DELAYED = 60000; // ms
// Not referenced in the visible part of this file; note the identifier is spelled "INVAILD".
const std::string INVAILD_LOCAL_DEVICE_ID = "-1";
}
namespace Mission {
// Count passed to DistributedSchedAdapter::GetLocalMissionInfos when collecting local missions.
constexpr int32_t GET_MAX_MISSIONS = 20;
} // Mission
using namespace std::chrono;
using namespace Constants::Mission;
using namespace OHOS::DistributedKv;

// Singleton boilerplate for the manager; the macro is defined outside this file.
IMPLEMENT_SINGLE_INSTANCE(DistributedSchedMissionManager);
58
Init()59 void DistributedSchedMissionManager::Init()
60 {
61 listenerDeath_ = new ListenerDeathRecipient();
62 remoteDmsRecipient_ = new RemoteDmsDeathRecipient();
63 auto runner = AppExecFwk::EventRunner::Create("MissionManagerHandler");
64 missionHandler_ = std::make_shared<AppExecFwk::EventHandler>(runner);
65 auto updateRunner = AppExecFwk::EventRunner::Create("UpdateHandler");
66 updateHandler_ = std::make_shared<AppExecFwk::EventHandler>(updateRunner);
67 missonChangeListener_ = new DistributedMissionChangeListener();
68 auto missionChangeRunner = AppExecFwk::EventRunner::Create("DistributedMissionChange");
69 missionChangeHandler_ = std::make_shared<AppExecFwk::EventHandler>(missionChangeRunner);
70 }
71
GetMissionInfos(const std::string & deviceId,int32_t numMissions,std::vector<AAFwk::MissionInfo> & missionInfoSet)72 int32_t DistributedSchedMissionManager::GetMissionInfos(const std::string& deviceId,
73 int32_t numMissions, std::vector<AAFwk::MissionInfo>& missionInfoSet)
74 {
75 HILOGI("start!");
76 if (!IsDeviceIdValidated(deviceId)) {
77 return INVALID_PARAMETERS_ERR;
78 }
79 if (numMissions <= 0) {
80 HILOGE("numMissions is illegal! numMissions:%{public}d", numMissions);
81 return INVALID_PARAMETERS_ERR;
82 }
83 std::vector<DstbMissionInfo> dstbMissionInfoSet;
84 int32_t ret = FetchCachedRemoteMissions(deviceId, numMissions, dstbMissionInfoSet);
85 if (ret != ERR_OK) {
86 HILOGE("FetchCachedRemoteMissions failed, ret = %{public}d", ret);
87 return ret;
88 }
89 return MissionInfoConverter::ConvertToMissionInfos(dstbMissionInfoSet, missionInfoSet);
90 }
91
GetRemoteDms(const std::string & deviceId)92 sptr<IDistributedSched> DistributedSchedMissionManager::GetRemoteDms(const std::string& deviceId)
93 {
94 if (deviceId.empty()) {
95 HILOGE("GetRemoteDms remoteDeviceId is empty");
96 return nullptr;
97 }
98 int64_t begin = GetTickCount();
99 {
100 std::lock_guard<std::mutex> autoLock(remoteDmsLock_);
101 auto iter = remoteDmsMap_.find(deviceId);
102 if (iter != remoteDmsMap_.end()) {
103 auto object = iter->second;
104 if (object != nullptr) {
105 HILOGI("[PerformanceTest] GetRemoteDms from cache spend %{public}" PRId64 " ms",
106 GetTickCount() - begin);
107 return object;
108 }
109 }
110 }
111 HILOGD("GetRemoteDms connect deviceid is %s", GetAnonymStr(deviceId).c_str());
112 auto samgr = SystemAbilityManagerClient::GetInstance().GetSystemAbilityManager();
113 if (samgr == nullptr) {
114 HILOGE("GetRemoteDms failed to connect to systemAbilityMgr!");
115 return nullptr;
116 }
117 auto object = samgr->CheckSystemAbility(DISTRIBUTED_SCHED_SA_ID, deviceId);
118 if (object == nullptr) {
119 HILOGE("GetRemoteDms failed to get dms for remote device: %{public}s!", GetAnonymStr(deviceId).c_str());
120 return nullptr;
121 }
122 auto ret = object->AddDeathRecipient(remoteDmsRecipient_);
123 HILOGD("GetRemoteDms AddDeathRecipient ret : %{public}d", ret);
124 sptr<IDistributedSched> remoteDmsObj = iface_cast<IDistributedSched>(object);
125 {
126 std::lock_guard<std::mutex> autoLock(remoteDmsLock_);
127 auto iter = remoteDmsMap_.find(deviceId);
128 if (iter != remoteDmsMap_.end()) {
129 iter->second->AsObject()->RemoveDeathRecipient(remoteDmsRecipient_);
130 }
131 remoteDmsMap_[deviceId] = remoteDmsObj;
132 }
133 HILOGI("[PerformanceTest] GetRemoteDms spend %{public}" PRId64 " ms", GetTickCount() - begin);
134 return remoteDmsObj;
135 }
136
IsDeviceIdValidated(const std::string & deviceId)137 bool DistributedSchedMissionManager::IsDeviceIdValidated(const std::string& deviceId)
138 {
139 if (deviceId.empty()) {
140 HILOGE("IsDeviceIdValidated deviceId is empty!");
141 return false;
142 }
143 if (DtbschedmgrDeviceInfoStorage::GetInstance().GetDeviceInfoById(deviceId) == nullptr) {
144 HILOGW("IsDeviceIdValidated device offline.");
145 return false;
146 }
147
148 return true;
149 }
150
NotifyRemoteDied(const wptr<IRemoteObject> & remote)151 void DistributedSchedMissionManager::NotifyRemoteDied(const wptr<IRemoteObject>& remote)
152 {
153 if (distributedDataStorage_ == nullptr) {
154 HILOGE("DistributedDataStorage null!");
155 return;
156 }
157 distributedDataStorage_->NotifyRemoteDied(remote);
158 }
159
InitDataStorage()160 int32_t DistributedSchedMissionManager::InitDataStorage()
161 {
162 if (distributedDataStorage_ == nullptr) {
163 distributedDataStorage_ = std::make_shared<DistributedDataStorage>();
164 }
165 if (!distributedDataStorage_->Init()) {
166 HILOGE("InitDataStorage DistributedDataStorage init failed!");
167 return ERR_NULL_OBJECT;
168 }
169 return ERR_NONE;
170 }
171
StopDataStorage()172 int32_t DistributedSchedMissionManager::StopDataStorage()
173 {
174 if (distributedDataStorage_ == nullptr) {
175 HILOGE("StopDataStorage DistributedDataStorage null!");
176 return ERR_NULL_OBJECT;
177 }
178 if (!distributedDataStorage_->Stop()) {
179 HILOGE("StopDataStorage DistributedDataStorage stop failed!");
180 return ERR_NULL_OBJECT;
181 }
182 return ERR_NONE;
183 }
184
StoreSnapshotInfo(const std::string & deviceId,int32_t missionId,const uint8_t * byteStream,size_t len)185 int32_t DistributedSchedMissionManager::StoreSnapshotInfo(const std::string& deviceId, int32_t missionId,
186 const uint8_t* byteStream, size_t len)
187 {
188 if (distributedDataStorage_ == nullptr) {
189 HILOGE("StoreSnapshotInfo DistributedDataStorage null!");
190 return ERR_NULL_OBJECT;
191 }
192 if (!distributedDataStorage_->Insert(deviceId, missionId, byteStream, len)) {
193 HILOGE("StoreSnapshotInfo DistributedDataStorage insert failed!");
194 return INVALID_PARAMETERS_ERR;
195 }
196 return ERR_NONE;
197 }
198
RemoveSnapshotInfo(const std::string & deviceId,int32_t missionId)199 int32_t DistributedSchedMissionManager::RemoveSnapshotInfo(const std::string& deviceId, int32_t missionId)
200 {
201 if (distributedDataStorage_ == nullptr) {
202 HILOGE("RemoveSnapshotInfo DistributedDataStorage null!");
203 return ERR_NULL_OBJECT;
204 }
205 if (!distributedDataStorage_->Delete(deviceId, missionId)) {
206 HILOGE("RemoveSnapshotInfo DistributedDataStorage delete failed!");
207 return INVALID_PARAMETERS_ERR;
208 }
209 return ERR_NONE;
210 }
211
GetRemoteMissionSnapshotInfo(const std::string & networkId,int32_t missionId,std::unique_ptr<AAFwk::MissionSnapshot> & missionSnapshot)212 int32_t DistributedSchedMissionManager::GetRemoteMissionSnapshotInfo(const std::string& networkId, int32_t missionId,
213 std::unique_ptr<AAFwk::MissionSnapshot>& missionSnapshot)
214 {
215 std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(networkId);
216 if (uuid.empty()) {
217 HILOGE("uuid is empty!");
218 return INVALID_PARAMETERS_ERR;
219 }
220 std::unique_ptr<Snapshot> snapshotPtr = DequeueCachedSnapshotInfo(uuid, missionId);
221 if (snapshotPtr != nullptr) {
222 HILOGI("Get snapshot from cache success, uuid: %{public}s, missionId: %{public}d.",
223 GetAnonymStr(uuid).c_str(), missionId);
224 SnapshotConverter::ConvertToMissionSnapshot(*snapshotPtr, missionSnapshot);
225 return ERR_NONE;
226 }
227 if (distributedDataStorage_ == nullptr) {
228 HILOGE("DistributedDataStorage is null!");
229 return ERR_NULL_OBJECT;
230 }
231 DistributedKv::Value value;
232 bool ret = distributedDataStorage_->Query(networkId, missionId, value);
233 if (!ret) {
234 HILOGE("DistributedDataStorage query failed!");
235 return INVALID_PARAMETERS_ERR;
236 }
237 snapshotPtr = Snapshot::Create(value.Data());
238 if (snapshotPtr == nullptr) {
239 HILOGE("snapshot create failed!");
240 return ERR_NULL_OBJECT;
241 }
242 HILOGI("Get snapshot from DistributedDB success, uuid: %{public}s, missionId: %{public}d.",
243 GetAnonymStr(uuid).c_str(), missionId);
244 SnapshotConverter::ConvertToMissionSnapshot(*snapshotPtr, missionSnapshot);
245 return ERR_NONE;
246 }
247
DeviceOnlineNotify(const std::string & networkId)248 void DistributedSchedMissionManager::DeviceOnlineNotify(const std::string& networkId)
249 {
250 if (networkId.empty()) {
251 HILOGW("DeviceOnlineNotify networkId empty!");
252 return;
253 }
254
255 std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(networkId);
256 if (missionHandler_ != nullptr) {
257 HILOGI("DeviceOnlineNotify RemoveTask");
258 missionHandler_->RemoveTask(DELETE_DATA_STORAGE + uuid);
259 }
260 }
261
DeviceOfflineNotify(const std::string & networkId)262 void DistributedSchedMissionManager::DeviceOfflineNotify(const std::string& networkId)
263 {
264 if (networkId.empty()) {
265 HILOGW("DeviceOfflineNotify networkId empty!");
266 return;
267 }
268 StopSyncMissionsFromRemote(networkId);
269 CleanMissionResources(networkId);
270 {
271 std::lock_guard<std::mutex> autoLock(remoteDmsLock_);
272 auto iter = remoteDmsMap_.find(networkId);
273 if (iter != remoteDmsMap_.end()) {
274 iter->second->AsObject()->RemoveDeathRecipient(remoteDmsRecipient_);
275 remoteDmsMap_.erase(iter);
276 }
277 }
278 HILOGI("DeviceOfflineNotify erase value for networkId: %{public}s.", GetAnonymStr(networkId).c_str());
279 }
280
DeleteDataStorage(const std::string & deviceId,bool isDelayed)281 void DistributedSchedMissionManager::DeleteDataStorage(const std::string& deviceId, bool isDelayed)
282 {
283 if (distributedDataStorage_ == nullptr) {
284 HILOGE("DeleteDataStorage DistributedDataStorage null!");
285 return;
286 }
287 std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(deviceId);
288 auto callback = [this, uuid, deviceId]() {
289 if (!distributedDataStorage_->FuzzyDelete(deviceId)) {
290 HILOGE("DeleteDataStorage storage delete failed!");
291 } else {
292 HILOGI("DeleteDataStorage storage delete successfully!");
293 }
294 };
295 if (isDelayed) {
296 if (missionHandler_ != nullptr) {
297 HILOGI("DeleteDataStorage PostTask");
298 missionHandler_->PostTask(callback, DELETE_DATA_STORAGE + uuid, DELETE_DATA_STORAGE_DELAYED);
299 }
300 } else {
301 if (missionHandler_ != nullptr) {
302 HILOGI("DeleteDataStorage RemoveTask");
303 missionHandler_->RemoveTask(DELETE_DATA_STORAGE + uuid);
304 }
305 callback();
306 }
307 }
308
RegisterMissionListener(const std::u16string & devId,const sptr<IRemoteObject> & listener)309 int32_t DistributedSchedMissionManager::RegisterMissionListener(const std::u16string& devId,
310 const sptr<IRemoteObject>& listener)
311 {
312 std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(Str16ToStr8(devId));
313 if (uuid.empty()) {
314 HILOGE("uuid is empty!");
315 return INVALID_PARAMETERS_ERR;
316 }
317 if (missionHandler_ != nullptr) {
318 HILOGI("RemoveTask");
319 missionHandler_->RemoveTask(DELETE_DATA_STORAGE + uuid);
320 }
321 if (listener == nullptr) {
322 return INVALID_PARAMETERS_ERR;
323 }
324 std::string localDeviceId;
325 std::string remoteDeviceId = Str16ToStr8(devId);
326 if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDeviceId)
327 || localDeviceId == remoteDeviceId) {
328 HILOGE("check deviceId failed!");
329 return INVALID_PARAMETERS_ERR;
330 }
331 {
332 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
333 auto& listenerInfo = listenDeviceMap_[devId];
334 if (!listenerInfo.Emplace(listener)) {
335 HILOGW("RegisterSyncListener listener has already inserted!");
336 return ERR_NONE;
337 }
338 bool ret = listener->AddDeathRecipient(listenerDeath_);
339 if (!ret) {
340 HILOGW("RegisterSyncListener AddDeathRecipient failed!");
341 }
342 if (listenerInfo.Size() > 1) {
343 HILOGI("RegisterMissionListener not notify remote DMS!");
344 return ERR_NONE;
345 }
346 }
347 return ERR_NONE;
348 }
349
StartSyncRemoteMissions(const std::string & dstDevId,const std::string & localDevId)350 int32_t DistributedSchedMissionManager::StartSyncRemoteMissions(const std::string& dstDevId,
351 const std::string& localDevId)
352 {
353 std::u16string devId = Str8ToStr16(dstDevId);
354 {
355 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
356 auto iterItem = listenDeviceMap_.find(devId);
357 if (iterItem == listenDeviceMap_.end()) {
358 return ERR_NONE;
359 }
360 bool callFlag = iterItem->second.called;
361 if (callFlag) {
362 HILOGI("StartSyncRemoteMissions already called!");
363 return ERR_NONE;
364 }
365 }
366 sptr<IDistributedSched> remoteDms = GetRemoteDms(dstDevId);
367 if (remoteDms == nullptr) {
368 HILOGE("get remoteDms failed!");
369 RetryStartSyncRemoteMissions(dstDevId, localDevId, 0);
370 return GET_REMOTE_DMS_FAIL;
371 }
372 int32_t ret = StartSyncRemoteMissions(dstDevId, remoteDms);
373 if (ret == ERR_NONE) {
374 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
375 auto iterItem = listenDeviceMap_.find(devId);
376 if (iterItem != listenDeviceMap_.end()) {
377 iterItem->second.called = true;
378 }
379 }
380 return ret;
381 }
382
StartSyncRemoteMissions(const std::string & dstDevId,const sptr<IDistributedSched> & remoteDms)383 int32_t DistributedSchedMissionManager::StartSyncRemoteMissions(const std::string& dstDevId,
384 const sptr<IDistributedSched>& remoteDms)
385 {
386 if (remoteDms == nullptr) {
387 HILOGE("remoteDms is null");
388 return INVALID_PARAMETERS_ERR;
389 }
390 std::vector<DstbMissionInfo> missionInfos;
391 CallerInfo callerInfo;
392 if (!GenerateCallerInfo(callerInfo)) {
393 return GET_LOCAL_DEVICE_ERR;
394 }
395 int64_t begin = GetTickCount();
396 int32_t ret = remoteDms->StartSyncMissionsFromRemote(callerInfo, missionInfos);
397 HILOGI("[PerformanceTest] StartSyncMissionsFromRemote ret:%{public}d, spend %{public}" PRId64 " ms",
398 ret, GetTickCount() - begin);
399 if (ret == ERR_NONE) {
400 RebornMissionCache(dstDevId, missionInfos);
401 }
402 return ret;
403 }
404
UnRegisterMissionListener(const std::u16string & devId,const sptr<IRemoteObject> & listener)405 int32_t DistributedSchedMissionManager::UnRegisterMissionListener(const std::u16string& devId,
406 const sptr<IRemoteObject>& listener)
407 {
408 if (listener == nullptr) {
409 return INVALID_PARAMETERS_ERR;
410 }
411 if (!IsDeviceIdValidated(Str16ToStr8(devId))) {
412 return INVALID_PARAMETERS_ERR;
413 }
414 std::string localDeviceId;
415 if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDeviceId)
416 || localDeviceId == Str16ToStr8(devId)) {
417 HILOGE("check deviceId fail");
418 return INVALID_PARAMETERS_ERR;
419 }
420 {
421 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
422 auto iterItem = listenDeviceMap_.find(devId);
423 if (iterItem == listenDeviceMap_.end()) {
424 return ERR_NONE;
425 }
426 auto& listenerInfo = iterItem->second;
427 auto ret = listenerInfo.Find(listener);
428 if (!ret) {
429 HILOGI("listener not registered!");
430 return ERR_NONE;
431 }
432 listener->RemoveDeathRecipient(listenerDeath_);
433 listenerInfo.Erase(listener);
434 if (!listenerInfo.Empty()) {
435 return ERR_NONE;
436 }
437 listenDeviceMap_.erase(iterItem);
438 }
439 return ERR_NONE;
440 }
441
CleanMissionResources(const std::string & networkId)442 void DistributedSchedMissionManager::CleanMissionResources(const std::string& networkId)
443 {
444 {
445 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
446 auto iterDevice = listenDeviceMap_.find(Str8ToStr16(networkId));
447 if (iterDevice == listenDeviceMap_.end()) {
448 return;
449 }
450 auto& listenerInfo = iterDevice->second;
451 for (sptr<IRemoteObject> listener : listenerInfo.listenerSet) {
452 if (listener != nullptr) {
453 listener->RemoveDeathRecipient(listenerDeath_);
454 }
455 }
456 listenDeviceMap_.erase(iterDevice);
457 }
458 StopSyncRemoteMissions(networkId, true);
459 }
460
StopSyncRemoteMissions(const std::string & dstDevId,bool offline,bool exit)461 int32_t DistributedSchedMissionManager::StopSyncRemoteMissions(const std::string& dstDevId,
462 bool offline, bool exit)
463 {
464 CleanMissionCache(dstDevId);
465 DeleteCachedSnapshotInfo(dstDevId);
466 DeleteDataStorage(dstDevId, true);
467
468 if (offline) {
469 return ERR_NONE;
470 }
471 sptr<IDistributedSched> remoteDms = GetRemoteDms(dstDevId);
472 if (remoteDms == nullptr) {
473 HILOGE("DMS get remoteDms failed");
474 return GET_REMOTE_DMS_FAIL;
475 }
476
477 CallerInfo callerInfo;
478 if (!GenerateCallerInfo(callerInfo)) {
479 return GET_LOCAL_DEVICE_ERR;
480 }
481 int64_t begin = GetTickCount();
482 int32_t ret = remoteDms->StopSyncMissionsFromRemote(callerInfo);
483 HILOGI("[PerformanceTest] ret: %{public}d, spend %{public}" PRId64 " ms", ret, GetTickCount() - begin);
484 return ret;
485 }
486
StartSyncRemoteMissions(const std::string & dstDevId,bool fixConflict,int64_t tag)487 int32_t DistributedSchedMissionManager::StartSyncRemoteMissions(const std::string& dstDevId, bool fixConflict,
488 int64_t tag)
489 {
490 std::string localDeviceId;
491 if (!IsDeviceIdValidated(dstDevId)) {
492 return INVALID_PARAMETERS_ERR;
493 }
494 if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDeviceId)
495 || (dstDevId == localDeviceId)) {
496 HILOGE("check deviceId fail");
497 return INVALID_PARAMETERS_ERR;
498 }
499 HILOGI("begin, dstDevId is %{public}s, local deviceId is %{public}s",
500 GetAnonymStr(dstDevId).c_str(), GetAnonymStr(localDeviceId).c_str());
501 auto ret = StartSyncRemoteMissions(dstDevId, localDeviceId);
502 if (ret != ERR_NONE) {
503 HILOGE("StartSyncRemoteMissions failed, %{public}d", ret);
504 return ret;
505 }
506 return ERR_NONE;
507 }
508
StartSyncMissionsFromRemote(const CallerInfo & callerInfo,std::vector<DstbMissionInfo> & missionInfoSet)509 int32_t DistributedSchedMissionManager::StartSyncMissionsFromRemote(const CallerInfo& callerInfo,
510 std::vector<DstbMissionInfo>& missionInfoSet)
511 {
512 auto deviceId = callerInfo.sourceDeviceId;
513 HILOGD("remote version is %{public}d!", callerInfo.dmsVersion);
514 {
515 std::lock_guard<std::mutex> autoLock(remoteSyncDeviceLock_);
516 remoteSyncDeviceSet_.emplace(deviceId);
517 }
518 int32_t result = DistributedSchedAdapter::GetInstance().GetLocalMissionInfos(Mission::GET_MAX_MISSIONS,
519 missionInfoSet);
520 auto func = [this, missionInfoSet]() {
521 HILOGD("RegisterMissionListener called.");
522 if (!isRegMissionChange_) {
523 int32_t ret = DistributedSchedAdapter::GetInstance().RegisterMissionListener(missonChangeListener_);
524 if (ret == ERR_OK) {
525 isRegMissionChange_ = true;
526 }
527 InitAllSnapshots(missionInfoSet);
528 }
529 };
530 if (missionHandler_ != nullptr && !missionHandler_->PostTask(func)) {
531 HILOGE("post RegisterMissionListener and InitAllSnapshots Task failed");
532 }
533 return result;
534 }
535
StopSyncMissionsFromRemote(const std::string & networkId)536 void DistributedSchedMissionManager::StopSyncMissionsFromRemote(const std::string& networkId)
537 {
538 HILOGD(" %{private}s!", GetAnonymStr(networkId).c_str());
539 {
540 std::lock_guard<std::mutex> autoLock(remoteSyncDeviceLock_);
541 remoteSyncDeviceSet_.erase(networkId);
542 if (remoteSyncDeviceSet_.empty()) {
543 auto func = [this]() {
544 int32_t ret = DistributedSchedAdapter::GetInstance().UnRegisterMissionListener(missonChangeListener_);
545 if (ret == ERR_OK) {
546 isRegMissionChange_ = false;
547 }
548 };
549 if (missionHandler_ != nullptr && !missionHandler_->PostTask(func)) {
550 HILOGE("post UnRegisterMissionListener Task failed");
551 }
552 }
553 }
554 }
555
NeedSyncDevice(const std::string & deviceId)556 bool DistributedSchedMissionManager::NeedSyncDevice(const std::string& deviceId)
557 {
558 if (deviceId.empty()) {
559 HILOGD("deviceId empty!");
560 return false;
561 }
562 std::lock_guard<std::mutex> autoLock(remoteSyncDeviceLock_);
563 if (remoteSyncDeviceSet_.count(deviceId) == 0) {
564 return false;
565 }
566 return true;
567 }
568
HasSyncListener(const std::string & networkId)569 bool DistributedSchedMissionManager::HasSyncListener(const std::string& networkId)
570 {
571 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
572 auto iter = listenDeviceMap_.find(Str8ToStr16(networkId));
573 if (iter != listenDeviceMap_.end()) {
574 return iter->second.called;
575 }
576 return false;
577 }
578
NotifySnapshotChanged(const std::string & networkId,int32_t missionId)579 void DistributedSchedMissionManager::NotifySnapshotChanged(const std::string& networkId, int32_t missionId)
580 {
581 std::u16string u16DevId = Str8ToStr16(networkId);
582 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
583 auto iter = listenDeviceMap_.find(u16DevId);
584 if (iter == listenDeviceMap_.end()) {
585 return;
586 }
587 auto& listenerInfo = iter->second;
588 for (auto& listener : listenerInfo.listenerSet) {
589 MissionChangedNotify::NotifySnapshot(listener, u16DevId, missionId);
590 }
591 }
592
OnRemoteDied(const wptr<IRemoteObject> & remote)593 void DistributedSchedMissionManager::OnRemoteDied(const wptr<IRemoteObject>& remote)
594 {
595 HILOGD("OnRemoteDied!");
596 sptr<IRemoteObject> listener = remote.promote();
597 if (listener == nullptr) {
598 HILOGW("listener is null");
599 return;
600 }
601 auto remoteDiedFunc = [this, listener]() {
602 OnMissionListenerDied(listener);
603 };
604 if (missionHandler_ != nullptr) {
605 missionHandler_->PostTask(remoteDiedFunc);
606 }
607 }
608
OnRemoteDied(const wptr<IRemoteObject> & remote)609 void DistributedSchedMissionManager::ListenerDeathRecipient::OnRemoteDied(const wptr<IRemoteObject>& remote)
610 {
611 DistributedSchedMissionManager::GetInstance().OnRemoteDied(remote);
612 }
613
EnqueueCachedSnapshotInfo(const std::string & deviceId,int32_t missionId,std::unique_ptr<Snapshot> snapshot)614 void DistributedSchedMissionManager::EnqueueCachedSnapshotInfo(const std::string& deviceId, int32_t missionId,
615 std::unique_ptr<Snapshot> snapshot)
616 {
617 if (deviceId.empty() || snapshot == nullptr) {
618 HILOGW("EnqueueCachedSnapshotInfo invalid input param!");
619 return;
620 }
621 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
622 std::string keyInfo = GenerateKeyInfo(deviceId, missionId);
623 auto iter = cachedSnapshotInfos_.find(keyInfo);
624 if (iter != cachedSnapshotInfos_.end()) {
625 if (iter->second == nullptr) {
626 HILOGE("snapshotInfo is null");
627 return;
628 }
629 if (snapshot->GetCreatedTime() < iter->second->GetCreatedTime()) {
630 return;
631 }
632 }
633
634 if (cachedSnapshotInfos_.size() == MAX_CACHED_ITEM) {
635 int64_t oldest = -1;
636 auto iterOldest = cachedSnapshotInfos_.end();
637 for (auto iterItem = cachedSnapshotInfos_.begin(); iterItem != cachedSnapshotInfos_.end(); ++iterItem) {
638 if (iterItem->second == nullptr) {
639 HILOGE("snapshotInfo is null");
640 continue;
641 }
642 if (oldest == -1 || iterItem->second->GetLastAccessTime() < oldest) {
643 oldest = iterItem->second->GetLastAccessTime();
644 iterOldest = iterItem;
645 }
646 }
647 if (iterOldest != cachedSnapshotInfos_.end()) {
648 cachedSnapshotInfos_.erase(iterOldest);
649 }
650 }
651 cachedSnapshotInfos_[keyInfo] = std::move(snapshot);
652 }
653
DequeueCachedSnapshotInfo(const std::string & deviceId,int32_t missionId)654 std::unique_ptr<Snapshot> DistributedSchedMissionManager::DequeueCachedSnapshotInfo(const std::string& deviceId,
655 int32_t missionId)
656 {
657 if (deviceId.empty()) {
658 HILOGW("DequeueCachedSnapshotInfo invalid input param!");
659 return nullptr;
660 }
661 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
662 auto iter = cachedSnapshotInfos_.find(GenerateKeyInfo(deviceId, missionId));
663 if (iter != cachedSnapshotInfos_.end()) {
664 std::unique_ptr<Snapshot> snapshot = std::move(iter->second);
665 if (snapshot == nullptr) {
666 HILOGE("snapshot is null");
667 return nullptr;
668 }
669 snapshot->UpdateLastAccessTime(GetTickCount());
670 iter->second = nullptr;
671 cachedSnapshotInfos_.erase(iter);
672 return snapshot;
673 }
674 return nullptr;
675 }
676
DeleteCachedSnapshotInfo(const std::string & networkId)677 void DistributedSchedMissionManager::DeleteCachedSnapshotInfo(const std::string& networkId)
678 {
679 if (networkId.empty()) {
680 HILOGW("networkId empty!");
681 return;
682 }
683 std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(networkId);
684 if (uuid.empty()) {
685 HILOGW("uuid empty!");
686 return;
687 }
688 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
689 auto iter = cachedSnapshotInfos_.begin();
690 while (iter != cachedSnapshotInfos_.end()) {
691 if (iter->first.find(uuid) != std::string::npos) {
692 iter = cachedSnapshotInfos_.erase(iter);
693 } else {
694 ++iter;
695 }
696 }
697 }
698
FetchCachedRemoteMissions(const std::string & srcId,int32_t numMissions,std::vector<DstbMissionInfo> & missionInfoSet)699 int32_t DistributedSchedMissionManager::FetchCachedRemoteMissions(const std::string& srcId, int32_t numMissions,
700 std::vector<DstbMissionInfo>& missionInfoSet)
701 {
702 std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(srcId);
703 if (uuid.empty()) {
704 HILOGE("uuid empty!");
705 return INVALID_PARAMETERS_ERR;
706 }
707 std::lock_guard<std::mutex> autoLock(remoteMissionInfosLock_);
708 auto iter = deviceMissionInfos_.find(uuid);
709 if (iter == deviceMissionInfos_.end()) {
710 HILOGE("can not find uuid, deviceId: %{public}s!", GetAnonymStr(srcId).c_str());
711 return ERR_NULL_OBJECT;
712 }
713
714 // get at most numMissions missions
715 int32_t actualNums = static_cast<int32_t>((iter->second).size());
716 if (actualNums < 0) {
717 HILOGE("invalid size!");
718 return ERR_NULL_OBJECT;
719 }
720 missionInfoSet.assign((iter->second).begin(),
721 (actualNums > numMissions) ? (iter->second).begin() + numMissions : (iter->second).end());
722 return ERR_NONE;
723 }
724
RebornMissionCache(const std::string & deviceId,const std::vector<DstbMissionInfo> & missionInfoSet)725 void DistributedSchedMissionManager::RebornMissionCache(const std::string& deviceId,
726 const std::vector<DstbMissionInfo>& missionInfoSet)
727 {
728 HILOGI("start! deviceId is %{public}s.", GetAnonymStr(deviceId).c_str());
729 std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(deviceId);
730 if (uuid.empty()) {
731 HILOGE("uuid empty!");
732 return;
733 }
734 {
735 std::lock_guard<std::mutex> autoLock(remoteMissionInfosLock_);
736 deviceMissionInfos_[uuid] = missionInfoSet;
737 }
738 HILOGI("RebornMissionCache end!");
739 }
740
CleanMissionCache(const std::string & deviceId)741 void DistributedSchedMissionManager::CleanMissionCache(const std::string& deviceId)
742 {
743 HILOGI("CleanMissionCache start! deviceId is %{public}s.", GetAnonymStr(deviceId).c_str());
744 std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(deviceId);
745 if (uuid.empty()) {
746 HILOGE("CleanMissionCache uuid empty!");
747 return;
748 }
749 {
750 std::lock_guard<std::mutex> autoLock(remoteMissionInfosLock_);
751 deviceMissionInfos_.erase(uuid);
752 }
753 HILOGI("CleanMissionCache end!");
754 }
755
NotifyMissionsChangedFromRemote(const CallerInfo & callerInfo,const std::vector<DstbMissionInfo> & missionInfoSet)756 int32_t DistributedSchedMissionManager::NotifyMissionsChangedFromRemote(const CallerInfo& callerInfo,
757 const std::vector<DstbMissionInfo>& missionInfoSet)
758 {
759 HILOGI("NotifyMissionsChangedFromRemote version is %{public}d!", callerInfo.dmsVersion);
760 std::u16string u16DevId = Str8ToStr16(callerInfo.sourceDeviceId);
761 RebornMissionCache(callerInfo.sourceDeviceId, missionInfoSet);
762 {
763 HILOGI("NotifyMissionsChangedFromRemote notify mission start!");
764 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
765 auto iter = listenDeviceMap_.find(u16DevId);
766 if (iter == listenDeviceMap_.end()) {
767 HILOGE("NotifyMissionsChangedFromRemote notify mission no listener!");
768 return INVALID_PARAMETERS_ERR;
769 }
770 auto& listenerSet = iter->second.listenerSet;
771 auto notifyChanged = [listenerSet, u16DevId] () {
772 for (const auto& listener : listenerSet) {
773 MissionChangedNotify::NotifyMissionsChanged(listener, u16DevId);
774 }
775 };
776 if (missionHandler_ != nullptr) {
777 missionHandler_->PostTask(notifyChanged);
778 HILOGI("NotifyMissionsChangedFromRemote end!");
779 return ERR_NONE;
780 }
781 }
782 return INVALID_PARAMETERS_ERR;
783 }
784
NotifyLocalMissionsChanged()785 void DistributedSchedMissionManager::NotifyLocalMissionsChanged()
786 {
787 auto func = [this]() {
788 HILOGI("NotifyLocalMissionsChanged");
789 std::vector<DstbMissionInfo> missionInfos;
790 int32_t ret = DistributedSchedAdapter::GetInstance().GetLocalMissionInfos(Mission::GET_MAX_MISSIONS,
791 missionInfos);
792 if (ret == ERR_OK) {
793 int32_t result = NotifyMissionsChangedToRemote(missionInfos);
794 HILOGI("NotifyMissionsChangedToRemote result = %{public}d", result);
795 }
796 };
797 if (missionChangeHandler_ == nullptr) {
798 HILOGE("missionChangeHandler_ is null");
799 return;
800 }
801 if (!missionChangeHandler_->PostTask(func)) {
802 HILOGE("postTask failed");
803 }
804 }
805
NotifyMissionSnapshotCreated(int32_t missionId)806 void DistributedSchedMissionManager::NotifyMissionSnapshotCreated(int32_t missionId)
807 {
808 auto func = [this, missionId]() {
809 HILOGD("called.");
810 ErrCode errCode = MissionSnapshotChanged(missionId);
811 if (errCode != ERR_OK) {
812 HILOGE("mission snapshot changed failed, missionId=%{public}d, errCode=%{public}d", missionId, errCode);
813 }
814 };
815 if (missionChangeHandler_ == nullptr) {
816 HILOGE("missionChangeHandler_ is null");
817 return;
818 }
819 if (!missionChangeHandler_->PostTask(func, GET_FOREGROUND_SNAPSHOT_DELAY_TIME)) {
820 HILOGE("post MissionSnapshotChanged delay Task failed");
821 }
822 }
823
NotifyMissionSnapshotChanged(int32_t missionId)824 void DistributedSchedMissionManager::NotifyMissionSnapshotChanged(int32_t missionId)
825 {
826 auto func = [this, missionId]() {
827 HILOGD("called.");
828 ErrCode errCode = MissionSnapshotChanged(missionId);
829 if (errCode != ERR_OK) {
830 HILOGE("mission snapshot changed failed, missionId=%{public}d, errCode=%{public}d", missionId, errCode);
831 }
832 };
833 if (missionChangeHandler_ == nullptr) {
834 HILOGE("missionChangeHandler_ is null");
835 return;
836 }
837 if (!missionChangeHandler_->PostTask(func)) {
838 HILOGE("post MissionSnapshotChanged Task failed");
839 }
840 }
841
NotifyMissionSnapshotDestroyed(int32_t missionId)842 void DistributedSchedMissionManager::NotifyMissionSnapshotDestroyed(int32_t missionId)
843 {
844 auto func = [this, missionId]() {
845 HILOGD("called.");
846 ErrCode errCode = MissionSnapshotDestroyed(missionId);
847 if (errCode != ERR_OK) {
848 HILOGE("mission snapshot removed failed, missionId=%{public}d, errCode=%{public}d", missionId, errCode);
849 }
850 };
851 if (missionChangeHandler_ == nullptr) {
852 HILOGE("missionChangeHandler_ is null");
853 return;
854 }
855 if (!missionChangeHandler_->PostTask(func)) {
856 HILOGE("post MissionSnapshotDestroyed Task failed");
857 }
858 }
859
NotifyMissionsChangedToRemote(const std::vector<DstbMissionInfo> & missionInfoSet)860 int32_t DistributedSchedMissionManager::NotifyMissionsChangedToRemote(
861 const std::vector<DstbMissionInfo> &missionInfoSet)
862 {
863 CallerInfo callerInfo;
864 if (!GenerateCallerInfo(callerInfo)) {
865 return GET_LOCAL_DEVICE_ERR;
866 }
867 std::lock_guard<std::mutex> autoLock(remoteSyncDeviceLock_);
868 for (const auto& destDeviceId : remoteSyncDeviceSet_) {
869 auto handler = FetchDeviceHandler(destDeviceId);
870 if (handler == nullptr) {
871 HILOGE("NotifyMissionsChangedToRemote fetch handler failed!");
872 continue;
873 }
874 auto callback = [destDeviceId, missionInfoSet, callerInfo, this] () {
875 NotifyMissionsChangedToRemoteInner(destDeviceId, missionInfoSet, callerInfo);
876 };
877 if (!handler->PostTask(callback)) {
878 HILOGE("NotifyMissionsChangedToRemote PostTask failed!");
879 return ERR_NULL_OBJECT;
880 }
881 }
882
883 return ERR_NONE;
884 }
885
NotifyMissionsChangedToRemoteInner(const std::string & deviceId,const std::vector<DstbMissionInfo> & missionInfoSet,const CallerInfo & callerInfo)886 void DistributedSchedMissionManager::NotifyMissionsChangedToRemoteInner(const std::string& deviceId,
887 const std::vector<DstbMissionInfo>& missionInfoSet, const CallerInfo& callerInfo)
888 {
889 sptr<IDistributedSched> remoteDms = GetRemoteDms(deviceId);
890 if (remoteDms == nullptr) {
891 HILOGE("NotifyMissionsChangedToRemote DMS get remoteDms failed");
892 return;
893 }
894 int64_t begin = GetTickCount();
895 int32_t result = remoteDms->NotifyMissionsChangedFromRemote(missionInfoSet, callerInfo);
896 HILOGI("[PerformanceTest] NotifyMissionsChangedFromRemote ret:%{public}d, spend %{public}" PRId64 " ms",
897 result, GetTickCount() - begin);
898 }
899
GenerateCallerInfo(CallerInfo & callerInfo)900 bool DistributedSchedMissionManager::GenerateCallerInfo(CallerInfo& callerInfo)
901 {
902 std::string localUuid;
903 if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localUuid)) {
904 HILOGE("get local uuid failed!");
905 return false;
906 }
907 callerInfo.uid = IPCSkeleton::GetCallingUid();
908 callerInfo.pid = IPCSkeleton::GetCallingRealPid();
909 callerInfo.callerType = CALLER_TYPE_HARMONY;
910 callerInfo.sourceDeviceId = localUuid;
911 callerInfo.dmsVersion = VERSION;
912 return true;
913 }
914
FetchDeviceHandler(const std::string & deviceId)915 std::shared_ptr<AppExecFwk::EventHandler> DistributedSchedMissionManager::FetchDeviceHandler(
916 const std::string& deviceId)
917 {
918 if (!IsDeviceIdValidated(deviceId)) {
919 HILOGW("FetchDeviceHandler device:%{public}s offline.", GetAnonymStr(deviceId).c_str());
920 return nullptr;
921 }
922
923 std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(deviceId);
924 if (uuid.empty()) {
925 HILOGE("FetchDeviceHandler uuid empty!");
926 return nullptr;
927 }
928
929 auto iter = deviceHandle_.find(uuid);
930 if (iter != deviceHandle_.end()) {
931 return iter->second;
932 }
933
934 auto anonyUuid = GetAnonymStr(uuid);
935 auto runner = AppExecFwk::EventRunner::Create(anonyUuid + "_MissionN");
936 auto handler = std::make_shared<AppExecFwk::EventHandler>(runner);
937 deviceHandle_.emplace(uuid, handler);
938 return handler;
939 }
940
OnRemoteDied(const wptr<IRemoteObject> & remote)941 void DistributedSchedMissionManager::RemoteDmsDeathRecipient::OnRemoteDied(const wptr<IRemoteObject>& remote)
942 {
943 HILOGI("OnRemoteDied received died notify!");
944 DistributedSchedMissionManager::GetInstance().OnRemoteDmsDied(remote);
945 }
946
OnRemoteDmsDied(const wptr<IRemoteObject> & remote)947 void DistributedSchedMissionManager::OnRemoteDmsDied(const wptr<IRemoteObject>& remote)
948 {
949 sptr<IRemoteObject> diedRemoted = remote.promote();
950 if (diedRemoted == nullptr) {
951 HILOGW("OnRemoteDmsDied promote failed!");
952 return;
953 }
954 HILOGD("delete diedRemoted");
955 auto remoteDmsDiedFunc = [this, diedRemoted]() {
956 OnRemoteDmsDied(diedRemoted);
957 };
958 if (missionHandler_ != nullptr) {
959 missionHandler_->PostTask(remoteDmsDiedFunc);
960 }
961 }
962
RetryStartSyncRemoteMissions(const std::string & dstDeviceId,const std::string & localDevId,int32_t retryTimes)963 void DistributedSchedMissionManager::RetryStartSyncRemoteMissions(const std::string& dstDeviceId,
964 const std::string& localDevId, int32_t retryTimes)
965 {
966 auto retryFunc = [this, dstDeviceId, localDevId, retryTimes]() {
967 bool ret = HasSyncListener(dstDeviceId);
968 if (!ret) {
969 return;
970 }
971 sptr<IDistributedSched> remoteDms = GetRemoteDms(dstDeviceId);
972 if (remoteDms == nullptr) {
973 HILOGI("RetryStartSyncRemoteMissions DMS get remoteDms failed");
974 RetryStartSyncRemoteMissions(dstDeviceId, localDevId, retryTimes + 1);
975 return;
976 }
977 int32_t errNo = StartSyncRemoteMissions(dstDeviceId, remoteDms);
978 HILOGI("RetryStartSyncRemoteMissions result:%{public}d", errNo);
979 };
980 if (missionHandler_ != nullptr && retryTimes < MAX_RETRY_TIMES) {
981 missionHandler_->PostTask(retryFunc, RETRY_DELAYED);
982 }
983 }
984
OnMissionListenerDied(const sptr<IRemoteObject> & remote)985 void DistributedSchedMissionManager::OnMissionListenerDied(const sptr<IRemoteObject>& remote)
986 {
987 HILOGI("OnMissionListenerDied");
988 std::set<std::string> deviceSet;
989 {
990 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
991 auto iterItem = listenDeviceMap_.begin();
992 while (iterItem != listenDeviceMap_.end()) {
993 auto& listenerInfo = iterItem->second;
994 auto ret = listenerInfo.Find(remote);
995 if (!ret) {
996 ++iterItem;
997 continue;
998 }
999 if (remote != nullptr) {
1000 remote->RemoveDeathRecipient(listenerDeath_);
1001 }
1002 listenerInfo.Erase(remote);
1003 if (listenerInfo.Empty()) {
1004 if (listenerInfo.called) {
1005 deviceSet.emplace(Str16ToStr8(iterItem->first));
1006 }
1007 iterItem = listenDeviceMap_.erase(iterItem);
1008 } else {
1009 ++iterItem;
1010 }
1011 }
1012 }
1013 for (auto& devId : deviceSet) {
1014 StopSyncRemoteMissions(devId, false);
1015 }
1016 }
1017
OnRemoteDmsDied(const sptr<IRemoteObject> & remote)1018 void DistributedSchedMissionManager::OnRemoteDmsDied(const sptr<IRemoteObject>& remote)
1019 {
1020 HILOGI("OnRemoteDmsDied");
1021 std::string devId;
1022 {
1023 std::lock_guard<std::mutex> autoLock(remoteDmsLock_);
1024 for (auto iter = remoteDmsMap_.begin(); iter != remoteDmsMap_.end(); ++iter) {
1025 if (iter->second->AsObject() == remote && iter->second->AsObject() != nullptr) {
1026 iter->second->AsObject()->RemoveDeathRecipient(remoteDmsRecipient_);
1027 devId = iter->first;
1028 remoteDmsMap_.erase(iter);
1029 break;
1030 }
1031 }
1032 }
1033 if (devId.empty()) {
1034 return;
1035 }
1036 bool ret = HasSyncListener(devId);
1037 if (ret) {
1038 std::string localDeviceId;
1039 if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDeviceId)) {
1040 return;
1041 }
1042 RetryStartSyncRemoteMissions(devId, localDeviceId, 0);
1043 }
1044 }
1045
NotifyDmsProxyProcessDied()1046 void DistributedSchedMissionManager::NotifyDmsProxyProcessDied()
1047 {
1048 HILOGI("NotifyDmsProxyProcessDied!");
1049 if (!isRegMissionChange_) {
1050 return;
1051 }
1052 RetryRegisterMissionChange(0);
1053 }
1054
RetryRegisterMissionChange(int32_t retryTimes)1055 void DistributedSchedMissionManager::RetryRegisterMissionChange(int32_t retryTimes)
1056 {
1057 auto remoteDiedFunc = [this, retryTimes]() {
1058 HILOGI("RetryRegisterMissionChange retryTimes:%{public}d begin", retryTimes);
1059 if (!isRegMissionChange_) {
1060 return;
1061 }
1062 int32_t ret = DistributedSchedAdapter::GetInstance().RegisterMissionListener(missonChangeListener_);
1063 if (ret == ERR_NULL_OBJECT) {
1064 RetryRegisterMissionChange(retryTimes + 1);
1065 HILOGI("RetryRegisterMissionChange dmsproxy null, retry!");
1066 return;
1067 }
1068 HILOGI("RetryRegisterMissionChange result:%{public}d", ret);
1069 };
1070 if (missionHandler_ != nullptr && retryTimes < MAX_RETRY_TIMES) {
1071 missionHandler_->PostTask(remoteDiedFunc, RETRY_DELAYED);
1072 }
1073 }
1074
InitAllSnapshots(const std::vector<DstbMissionInfo> & missionInfoSet)1075 void DistributedSchedMissionManager::InitAllSnapshots(const std::vector<DstbMissionInfo>& missionInfoSet)
1076 {
1077 for (auto iter = missionInfoSet.begin(); iter != missionInfoSet.end(); iter++) {
1078 ErrCode errCode = MissionSnapshotChanged(iter->id);
1079 if (errCode != ERR_OK) {
1080 HILOGE("mission snapshot changed failed, missionId=%{public}d, errCode=%{public}d", iter->id, errCode);
1081 }
1082 }
1083 }
1084
MissionSnapshotChanged(int32_t missionId)1085 int32_t DistributedSchedMissionManager::MissionSnapshotChanged(int32_t missionId)
1086 {
1087 std::string networkId;
1088 if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(networkId)) {
1089 HILOGE("get local networkId failed!");
1090 return INVALID_PARAMETERS_ERR;
1091 }
1092 AAFwk::MissionSnapshot missionSnapshot;
1093 ErrCode errCode = DistributedSchedAdapter::GetInstance()
1094 .GetLocalMissionSnapshotInfo(networkId, missionId, missionSnapshot);
1095 if (errCode != ERR_OK) {
1096 HILOGE("get local mission snapshot failed, missionId=%{public}d, errCode=%{public}d", missionId, errCode);
1097 return errCode;
1098 }
1099 Snapshot snapshot;
1100 SnapshotConverter::ConvertToSnapshot(missionSnapshot, snapshot);
1101 MessageParcel data;
1102 errCode = MissionSnapshotSequence(snapshot, data);
1103 if (errCode != ERR_OK) {
1104 HILOGE("mission snapshot sequence failed, errCode=%{public}d", errCode);
1105 return errCode;
1106 }
1107 size_t len = data.GetReadableBytes();
1108 const uint8_t* byteStream = data.ReadBuffer(len);
1109 errCode = StoreSnapshotInfo(networkId, missionId, byteStream, len);
1110 return errCode;
1111 }
1112
MissionSnapshotDestroyed(int32_t missionId)1113 int32_t DistributedSchedMissionManager::MissionSnapshotDestroyed(int32_t missionId)
1114 {
1115 std::string networkId;
1116 if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(networkId)) {
1117 HILOGE("get local networkId failed!");
1118 return INVALID_PARAMETERS_ERR;
1119 }
1120 ErrCode errCode = RemoveSnapshotInfo(networkId, missionId);
1121 return errCode;
1122 }
1123
MissionSnapshotSequence(const Snapshot & snapshot,MessageParcel & data)1124 int32_t DistributedSchedMissionManager::MissionSnapshotSequence(const Snapshot& snapshot, MessageParcel& data)
1125 {
1126 bool ret = snapshot.WriteSnapshotInfo(data);
1127 if (!ret) {
1128 HILOGE("WriteSnapshotInfo failed!");
1129 return ERR_FLATTEN_OBJECT;
1130 }
1131 ret = snapshot.WritePixelMap(data);
1132 if (!ret) {
1133 HILOGE("WritePixelMap failed!");
1134 return ERR_FLATTEN_OBJECT;
1135 }
1136 return ERR_OK;
1137 }
1138
OnDnetDied()1139 void DistributedSchedMissionManager::OnDnetDied()
1140 {
1141 auto dnetDiedFunc = [this]() {
1142 HILOGI("OnDnetDied");
1143 std::lock_guard<std::mutex> autoLock(remoteSyncDeviceLock_);
1144 if (!isRegMissionChange_) {
1145 return;
1146 }
1147 remoteSyncDeviceSet_.clear();
1148 DistributedSchedAdapter::GetInstance().UnRegisterMissionListener(missonChangeListener_);
1149 isRegMissionChange_ = false;
1150 };
1151 if (missionHandler_ != nullptr) {
1152 missionHandler_->PostTask(dnetDiedFunc);
1153 }
1154 }
1155 } // namespace DistributedSchedule
1156 } // namespace OHOS
1157