1 /*
2  * Copyright (c) 2021-2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "mission/distributed_sched_mission_manager.h"
17 
18 #include <chrono>
19 #include <sys/time.h>
20 #include <unistd.h>
21 
22 #include "datetime_ex.h"
23 #include "ipc_skeleton.h"
24 #include "iservice_registry.h"
25 #include "nlohmann/json.hpp"
26 #include "string_ex.h"
27 #include "system_ability_definition.h"
28 
29 #include "distributed_sched_adapter.h"
30 #include "distributed_sched_utils.h"
31 #include "dtbschedmgr_device_info_storage.h"
32 #include "dtbschedmgr_log.h"
33 #include "mission/mission_changed_notify.h"
34 #include "mission/mission_constant.h"
35 #include "mission/mission_info_converter.h"
36 #include "mission/snapshot_converter.h"
37 
38 namespace OHOS {
39 namespace DistributedSchedule {
namespace {
// Tag string for this translation unit (presumably consumed by the HILOG* macros — TODO confirm).
const std::string TAG{"DistributedSchedMissionManager"};
// Number of entries cachedSnapshotInfos_ may hold before EnqueueCachedSnapshotInfo evicts one.
constexpr size_t MAX_CACHED_ITEM{10};
// Retry budget and delay; not referenced in the visible part of this file
// (presumably used by the sync retry path, delay presumably in ms — TODO confirm).
constexpr int32_t MAX_RETRY_TIMES{15};
constexpr int32_t RETRY_DELAYED{2000};
constexpr int32_t GET_FOREGROUND_SNAPSHOT_DELAY_TIME{800}; // ms
// Task-name prefix of the storage cleanup task; the device uuid is appended to it.
const std::string DELETE_DATA_STORAGE{"DeleteDataStorage"};
// Delay applied when the storage cleanup task is posted.
constexpr int32_t DELETE_DATA_STORAGE_DELAYED{60000}; // ms
// NOTE(review): identifier is spelled "INVAILD"; kept as-is because it may be referenced elsewhere.
const std::string INVAILD_LOCAL_DEVICE_ID{"-1"};
}
namespace Mission {
// Number of local missions requested when a remote device starts syncing from this device.
constexpr int32_t GET_MAX_MISSIONS{20};
} // Mission
53 using namespace std::chrono;
54 using namespace Constants::Mission;
55 using namespace OHOS::DistributedKv;
56 
// Singleton plumbing for DistributedSchedMissionManager. The GetInstance() accessor used by
// ListenerDeathRecipient::OnRemoteDied below is presumably generated here — TODO confirm
// against the macro definition.
IMPLEMENT_SINGLE_INSTANCE(DistributedSchedMissionManager);
58 
Init()59 void DistributedSchedMissionManager::Init()
60 {
61     listenerDeath_ = new ListenerDeathRecipient();
62     remoteDmsRecipient_ = new RemoteDmsDeathRecipient();
63     auto runner = AppExecFwk::EventRunner::Create("MissionManagerHandler");
64     missionHandler_ = std::make_shared<AppExecFwk::EventHandler>(runner);
65     auto updateRunner = AppExecFwk::EventRunner::Create("UpdateHandler");
66     updateHandler_ = std::make_shared<AppExecFwk::EventHandler>(updateRunner);
67     missonChangeListener_ = new DistributedMissionChangeListener();
68     auto missionChangeRunner = AppExecFwk::EventRunner::Create("DistributedMissionChange");
69     missionChangeHandler_ = std::make_shared<AppExecFwk::EventHandler>(missionChangeRunner);
70 }
71 
GetMissionInfos(const std::string & deviceId,int32_t numMissions,std::vector<AAFwk::MissionInfo> & missionInfoSet)72 int32_t DistributedSchedMissionManager::GetMissionInfos(const std::string& deviceId,
73     int32_t numMissions, std::vector<AAFwk::MissionInfo>& missionInfoSet)
74 {
75     HILOGI("start!");
76     if (!IsDeviceIdValidated(deviceId)) {
77         return INVALID_PARAMETERS_ERR;
78     }
79     if (numMissions <= 0) {
80         HILOGE("numMissions is illegal! numMissions:%{public}d", numMissions);
81         return INVALID_PARAMETERS_ERR;
82     }
83     std::vector<DstbMissionInfo> dstbMissionInfoSet;
84     int32_t ret = FetchCachedRemoteMissions(deviceId, numMissions, dstbMissionInfoSet);
85     if (ret != ERR_OK) {
86         HILOGE("FetchCachedRemoteMissions failed, ret = %{public}d", ret);
87         return ret;
88     }
89     return MissionInfoConverter::ConvertToMissionInfos(dstbMissionInfoSet, missionInfoSet);
90 }
91 
GetRemoteDms(const std::string & deviceId)92 sptr<IDistributedSched> DistributedSchedMissionManager::GetRemoteDms(const std::string& deviceId)
93 {
94     if (deviceId.empty()) {
95         HILOGE("GetRemoteDms remoteDeviceId is empty");
96         return nullptr;
97     }
98     int64_t begin = GetTickCount();
99     {
100         std::lock_guard<std::mutex> autoLock(remoteDmsLock_);
101         auto iter = remoteDmsMap_.find(deviceId);
102         if (iter != remoteDmsMap_.end()) {
103             auto object = iter->second;
104             if (object != nullptr) {
105                 HILOGI("[PerformanceTest] GetRemoteDms from cache spend %{public}" PRId64 " ms",
106                     GetTickCount() - begin);
107                 return object;
108             }
109         }
110     }
111     HILOGD("GetRemoteDms connect deviceid is %s", GetAnonymStr(deviceId).c_str());
112     auto samgr = SystemAbilityManagerClient::GetInstance().GetSystemAbilityManager();
113     if (samgr == nullptr) {
114         HILOGE("GetRemoteDms failed to connect to systemAbilityMgr!");
115         return nullptr;
116     }
117     auto object = samgr->CheckSystemAbility(DISTRIBUTED_SCHED_SA_ID, deviceId);
118     if (object == nullptr) {
119         HILOGE("GetRemoteDms failed to get dms for remote device: %{public}s!", GetAnonymStr(deviceId).c_str());
120         return nullptr;
121     }
122     auto ret = object->AddDeathRecipient(remoteDmsRecipient_);
123     HILOGD("GetRemoteDms AddDeathRecipient ret : %{public}d", ret);
124     sptr<IDistributedSched> remoteDmsObj = iface_cast<IDistributedSched>(object);
125     {
126         std::lock_guard<std::mutex> autoLock(remoteDmsLock_);
127         auto iter = remoteDmsMap_.find(deviceId);
128         if (iter != remoteDmsMap_.end()) {
129             iter->second->AsObject()->RemoveDeathRecipient(remoteDmsRecipient_);
130         }
131         remoteDmsMap_[deviceId] = remoteDmsObj;
132     }
133     HILOGI("[PerformanceTest] GetRemoteDms spend %{public}" PRId64 " ms", GetTickCount() - begin);
134     return remoteDmsObj;
135 }
136 
IsDeviceIdValidated(const std::string & deviceId)137 bool DistributedSchedMissionManager::IsDeviceIdValidated(const std::string& deviceId)
138 {
139     if (deviceId.empty()) {
140         HILOGE("IsDeviceIdValidated deviceId is empty!");
141         return false;
142     }
143     if (DtbschedmgrDeviceInfoStorage::GetInstance().GetDeviceInfoById(deviceId) == nullptr) {
144         HILOGW("IsDeviceIdValidated device offline.");
145         return false;
146     }
147 
148     return true;
149 }
150 
NotifyRemoteDied(const wptr<IRemoteObject> & remote)151 void DistributedSchedMissionManager::NotifyRemoteDied(const wptr<IRemoteObject>& remote)
152 {
153     if (distributedDataStorage_ == nullptr) {
154         HILOGE("DistributedDataStorage null!");
155         return;
156     }
157     distributedDataStorage_->NotifyRemoteDied(remote);
158 }
159 
InitDataStorage()160 int32_t DistributedSchedMissionManager::InitDataStorage()
161 {
162     if (distributedDataStorage_ == nullptr) {
163         distributedDataStorage_ = std::make_shared<DistributedDataStorage>();
164     }
165     if (!distributedDataStorage_->Init()) {
166         HILOGE("InitDataStorage DistributedDataStorage init failed!");
167         return ERR_NULL_OBJECT;
168     }
169     return ERR_NONE;
170 }
171 
StopDataStorage()172 int32_t DistributedSchedMissionManager::StopDataStorage()
173 {
174     if (distributedDataStorage_ == nullptr) {
175         HILOGE("StopDataStorage DistributedDataStorage null!");
176         return ERR_NULL_OBJECT;
177     }
178     if (!distributedDataStorage_->Stop()) {
179         HILOGE("StopDataStorage DistributedDataStorage stop failed!");
180         return ERR_NULL_OBJECT;
181     }
182     return ERR_NONE;
183 }
184 
StoreSnapshotInfo(const std::string & deviceId,int32_t missionId,const uint8_t * byteStream,size_t len)185 int32_t DistributedSchedMissionManager::StoreSnapshotInfo(const std::string& deviceId, int32_t missionId,
186     const uint8_t* byteStream, size_t len)
187 {
188     if (distributedDataStorage_ == nullptr) {
189         HILOGE("StoreSnapshotInfo DistributedDataStorage null!");
190         return ERR_NULL_OBJECT;
191     }
192     if (!distributedDataStorage_->Insert(deviceId, missionId, byteStream, len)) {
193         HILOGE("StoreSnapshotInfo DistributedDataStorage insert failed!");
194         return INVALID_PARAMETERS_ERR;
195     }
196     return ERR_NONE;
197 }
198 
RemoveSnapshotInfo(const std::string & deviceId,int32_t missionId)199 int32_t DistributedSchedMissionManager::RemoveSnapshotInfo(const std::string& deviceId, int32_t missionId)
200 {
201     if (distributedDataStorage_ == nullptr) {
202         HILOGE("RemoveSnapshotInfo DistributedDataStorage null!");
203         return ERR_NULL_OBJECT;
204     }
205     if (!distributedDataStorage_->Delete(deviceId, missionId)) {
206         HILOGE("RemoveSnapshotInfo DistributedDataStorage delete failed!");
207         return INVALID_PARAMETERS_ERR;
208     }
209     return ERR_NONE;
210 }
211 
GetRemoteMissionSnapshotInfo(const std::string & networkId,int32_t missionId,std::unique_ptr<AAFwk::MissionSnapshot> & missionSnapshot)212 int32_t DistributedSchedMissionManager::GetRemoteMissionSnapshotInfo(const std::string& networkId, int32_t missionId,
213     std::unique_ptr<AAFwk::MissionSnapshot>& missionSnapshot)
214 {
215     std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(networkId);
216     if (uuid.empty()) {
217         HILOGE("uuid is empty!");
218         return INVALID_PARAMETERS_ERR;
219     }
220     std::unique_ptr<Snapshot> snapshotPtr = DequeueCachedSnapshotInfo(uuid, missionId);
221     if (snapshotPtr != nullptr) {
222         HILOGI("Get snapshot from cache success, uuid: %{public}s, missionId: %{public}d.",
223             GetAnonymStr(uuid).c_str(), missionId);
224         SnapshotConverter::ConvertToMissionSnapshot(*snapshotPtr, missionSnapshot);
225         return ERR_NONE;
226     }
227     if (distributedDataStorage_ == nullptr) {
228         HILOGE("DistributedDataStorage is null!");
229         return ERR_NULL_OBJECT;
230     }
231     DistributedKv::Value value;
232     bool ret = distributedDataStorage_->Query(networkId, missionId, value);
233     if (!ret) {
234         HILOGE("DistributedDataStorage query failed!");
235         return INVALID_PARAMETERS_ERR;
236     }
237     snapshotPtr = Snapshot::Create(value.Data());
238     if (snapshotPtr == nullptr) {
239         HILOGE("snapshot create failed!");
240         return ERR_NULL_OBJECT;
241     }
242     HILOGI("Get snapshot from DistributedDB success, uuid: %{public}s, missionId: %{public}d.",
243         GetAnonymStr(uuid).c_str(), missionId);
244     SnapshotConverter::ConvertToMissionSnapshot(*snapshotPtr, missionSnapshot);
245     return ERR_NONE;
246 }
247 
DeviceOnlineNotify(const std::string & networkId)248 void DistributedSchedMissionManager::DeviceOnlineNotify(const std::string& networkId)
249 {
250     if (networkId.empty()) {
251         HILOGW("DeviceOnlineNotify networkId empty!");
252         return;
253     }
254 
255     std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(networkId);
256     if (missionHandler_ != nullptr) {
257         HILOGI("DeviceOnlineNotify RemoveTask");
258         missionHandler_->RemoveTask(DELETE_DATA_STORAGE + uuid);
259     }
260 }
261 
DeviceOfflineNotify(const std::string & networkId)262 void DistributedSchedMissionManager::DeviceOfflineNotify(const std::string& networkId)
263 {
264     if (networkId.empty()) {
265         HILOGW("DeviceOfflineNotify networkId empty!");
266         return;
267     }
268     StopSyncMissionsFromRemote(networkId);
269     CleanMissionResources(networkId);
270     {
271         std::lock_guard<std::mutex> autoLock(remoteDmsLock_);
272         auto iter = remoteDmsMap_.find(networkId);
273         if (iter != remoteDmsMap_.end()) {
274             iter->second->AsObject()->RemoveDeathRecipient(remoteDmsRecipient_);
275             remoteDmsMap_.erase(iter);
276         }
277     }
278     HILOGI("DeviceOfflineNotify erase value for networkId: %{public}s.", GetAnonymStr(networkId).c_str());
279 }
280 
DeleteDataStorage(const std::string & deviceId,bool isDelayed)281 void DistributedSchedMissionManager::DeleteDataStorage(const std::string& deviceId, bool isDelayed)
282 {
283     if (distributedDataStorage_ == nullptr) {
284         HILOGE("DeleteDataStorage DistributedDataStorage null!");
285         return;
286     }
287     std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(deviceId);
288     auto callback = [this, uuid, deviceId]() {
289         if (!distributedDataStorage_->FuzzyDelete(deviceId)) {
290             HILOGE("DeleteDataStorage storage delete failed!");
291         } else {
292             HILOGI("DeleteDataStorage storage delete successfully!");
293         }
294     };
295     if (isDelayed) {
296         if (missionHandler_ != nullptr) {
297             HILOGI("DeleteDataStorage PostTask");
298             missionHandler_->PostTask(callback, DELETE_DATA_STORAGE + uuid, DELETE_DATA_STORAGE_DELAYED);
299         }
300     } else {
301         if (missionHandler_ != nullptr) {
302             HILOGI("DeleteDataStorage RemoveTask");
303             missionHandler_->RemoveTask(DELETE_DATA_STORAGE + uuid);
304         }
305         callback();
306     }
307 }
308 
RegisterMissionListener(const std::u16string & devId,const sptr<IRemoteObject> & listener)309 int32_t DistributedSchedMissionManager::RegisterMissionListener(const std::u16string& devId,
310     const sptr<IRemoteObject>& listener)
311 {
312     std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(Str16ToStr8(devId));
313     if (uuid.empty()) {
314         HILOGE("uuid is empty!");
315         return INVALID_PARAMETERS_ERR;
316     }
317     if (missionHandler_ != nullptr) {
318         HILOGI("RemoveTask");
319         missionHandler_->RemoveTask(DELETE_DATA_STORAGE + uuid);
320     }
321     if (listener == nullptr) {
322         return INVALID_PARAMETERS_ERR;
323     }
324     std::string localDeviceId;
325     std::string remoteDeviceId = Str16ToStr8(devId);
326     if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDeviceId)
327         || localDeviceId == remoteDeviceId) {
328         HILOGE("check deviceId failed!");
329         return INVALID_PARAMETERS_ERR;
330     }
331     {
332         std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
333         auto& listenerInfo = listenDeviceMap_[devId];
334         if (!listenerInfo.Emplace(listener)) {
335             HILOGW("RegisterSyncListener listener has already inserted!");
336             return ERR_NONE;
337         }
338         bool ret = listener->AddDeathRecipient(listenerDeath_);
339         if (!ret) {
340             HILOGW("RegisterSyncListener AddDeathRecipient failed!");
341         }
342         if (listenerInfo.Size() > 1) {
343             HILOGI("RegisterMissionListener not notify remote DMS!");
344             return ERR_NONE;
345         }
346     }
347     return ERR_NONE;
348 }
349 
StartSyncRemoteMissions(const std::string & dstDevId,const std::string & localDevId)350 int32_t DistributedSchedMissionManager::StartSyncRemoteMissions(const std::string& dstDevId,
351     const std::string& localDevId)
352 {
353     std::u16string devId = Str8ToStr16(dstDevId);
354     {
355         std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
356         auto iterItem = listenDeviceMap_.find(devId);
357         if (iterItem == listenDeviceMap_.end()) {
358             return ERR_NONE;
359         }
360         bool callFlag = iterItem->second.called;
361         if (callFlag) {
362             HILOGI("StartSyncRemoteMissions already called!");
363             return ERR_NONE;
364         }
365     }
366     sptr<IDistributedSched> remoteDms = GetRemoteDms(dstDevId);
367     if (remoteDms == nullptr) {
368         HILOGE("get remoteDms failed!");
369         RetryStartSyncRemoteMissions(dstDevId, localDevId, 0);
370         return GET_REMOTE_DMS_FAIL;
371     }
372     int32_t ret = StartSyncRemoteMissions(dstDevId, remoteDms);
373     if (ret == ERR_NONE) {
374         std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
375         auto iterItem = listenDeviceMap_.find(devId);
376         if (iterItem != listenDeviceMap_.end()) {
377             iterItem->second.called = true;
378         }
379     }
380     return ret;
381 }
382 
StartSyncRemoteMissions(const std::string & dstDevId,const sptr<IDistributedSched> & remoteDms)383 int32_t DistributedSchedMissionManager::StartSyncRemoteMissions(const std::string& dstDevId,
384     const sptr<IDistributedSched>& remoteDms)
385 {
386     if (remoteDms == nullptr) {
387         HILOGE("remoteDms is null");
388         return INVALID_PARAMETERS_ERR;
389     }
390     std::vector<DstbMissionInfo> missionInfos;
391     CallerInfo callerInfo;
392     if (!GenerateCallerInfo(callerInfo)) {
393         return GET_LOCAL_DEVICE_ERR;
394     }
395     int64_t begin = GetTickCount();
396     int32_t ret = remoteDms->StartSyncMissionsFromRemote(callerInfo, missionInfos);
397     HILOGI("[PerformanceTest] StartSyncMissionsFromRemote ret:%{public}d, spend %{public}" PRId64 " ms",
398         ret, GetTickCount() - begin);
399     if (ret == ERR_NONE) {
400         RebornMissionCache(dstDevId, missionInfos);
401     }
402     return ret;
403 }
404 
UnRegisterMissionListener(const std::u16string & devId,const sptr<IRemoteObject> & listener)405 int32_t DistributedSchedMissionManager::UnRegisterMissionListener(const std::u16string& devId,
406     const sptr<IRemoteObject>& listener)
407 {
408     if (listener == nullptr) {
409         return INVALID_PARAMETERS_ERR;
410     }
411     if (!IsDeviceIdValidated(Str16ToStr8(devId))) {
412         return INVALID_PARAMETERS_ERR;
413     }
414     std::string localDeviceId;
415     if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDeviceId)
416         || localDeviceId == Str16ToStr8(devId)) {
417         HILOGE("check deviceId fail");
418         return INVALID_PARAMETERS_ERR;
419     }
420     {
421         std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
422         auto iterItem = listenDeviceMap_.find(devId);
423         if (iterItem == listenDeviceMap_.end()) {
424             return ERR_NONE;
425         }
426         auto& listenerInfo = iterItem->second;
427         auto ret = listenerInfo.Find(listener);
428         if (!ret) {
429             HILOGI("listener not registered!");
430             return ERR_NONE;
431         }
432         listener->RemoveDeathRecipient(listenerDeath_);
433         listenerInfo.Erase(listener);
434         if (!listenerInfo.Empty()) {
435             return ERR_NONE;
436         }
437         listenDeviceMap_.erase(iterItem);
438     }
439     return ERR_NONE;
440 }
441 
CleanMissionResources(const std::string & networkId)442 void DistributedSchedMissionManager::CleanMissionResources(const std::string& networkId)
443 {
444     {
445         std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
446         auto iterDevice = listenDeviceMap_.find(Str8ToStr16(networkId));
447         if (iterDevice == listenDeviceMap_.end()) {
448             return;
449         }
450         auto& listenerInfo = iterDevice->second;
451         for (sptr<IRemoteObject> listener : listenerInfo.listenerSet) {
452             if (listener != nullptr) {
453                 listener->RemoveDeathRecipient(listenerDeath_);
454             }
455         }
456         listenDeviceMap_.erase(iterDevice);
457     }
458     StopSyncRemoteMissions(networkId, true);
459 }
460 
StopSyncRemoteMissions(const std::string & dstDevId,bool offline,bool exit)461 int32_t DistributedSchedMissionManager::StopSyncRemoteMissions(const std::string& dstDevId,
462     bool offline, bool exit)
463 {
464     CleanMissionCache(dstDevId);
465     DeleteCachedSnapshotInfo(dstDevId);
466     DeleteDataStorage(dstDevId, true);
467 
468     if (offline) {
469         return ERR_NONE;
470     }
471     sptr<IDistributedSched> remoteDms = GetRemoteDms(dstDevId);
472     if (remoteDms == nullptr) {
473         HILOGE("DMS get remoteDms failed");
474         return GET_REMOTE_DMS_FAIL;
475     }
476 
477     CallerInfo callerInfo;
478     if (!GenerateCallerInfo(callerInfo)) {
479         return GET_LOCAL_DEVICE_ERR;
480     }
481     int64_t begin = GetTickCount();
482     int32_t ret = remoteDms->StopSyncMissionsFromRemote(callerInfo);
483     HILOGI("[PerformanceTest] ret: %{public}d, spend %{public}" PRId64 " ms", ret, GetTickCount() - begin);
484     return ret;
485 }
486 
StartSyncRemoteMissions(const std::string & dstDevId,bool fixConflict,int64_t tag)487 int32_t DistributedSchedMissionManager::StartSyncRemoteMissions(const std::string& dstDevId, bool fixConflict,
488     int64_t tag)
489 {
490     std::string localDeviceId;
491     if (!IsDeviceIdValidated(dstDevId)) {
492         return INVALID_PARAMETERS_ERR;
493     }
494     if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDeviceId)
495         || (dstDevId == localDeviceId)) {
496         HILOGE("check deviceId fail");
497         return INVALID_PARAMETERS_ERR;
498     }
499     HILOGI("begin, dstDevId is %{public}s, local deviceId is %{public}s",
500         GetAnonymStr(dstDevId).c_str(), GetAnonymStr(localDeviceId).c_str());
501     auto ret = StartSyncRemoteMissions(dstDevId, localDeviceId);
502     if (ret != ERR_NONE) {
503         HILOGE("StartSyncRemoteMissions failed, %{public}d", ret);
504         return ret;
505     }
506     return ERR_NONE;
507 }
508 
StartSyncMissionsFromRemote(const CallerInfo & callerInfo,std::vector<DstbMissionInfo> & missionInfoSet)509 int32_t DistributedSchedMissionManager::StartSyncMissionsFromRemote(const CallerInfo& callerInfo,
510     std::vector<DstbMissionInfo>& missionInfoSet)
511 {
512     auto deviceId = callerInfo.sourceDeviceId;
513     HILOGD("remote version is %{public}d!", callerInfo.dmsVersion);
514     {
515         std::lock_guard<std::mutex> autoLock(remoteSyncDeviceLock_);
516         remoteSyncDeviceSet_.emplace(deviceId);
517     }
518     int32_t result = DistributedSchedAdapter::GetInstance().GetLocalMissionInfos(Mission::GET_MAX_MISSIONS,
519         missionInfoSet);
520     auto func = [this, missionInfoSet]() {
521         HILOGD("RegisterMissionListener called.");
522         if (!isRegMissionChange_) {
523             int32_t ret = DistributedSchedAdapter::GetInstance().RegisterMissionListener(missonChangeListener_);
524             if (ret == ERR_OK) {
525                 isRegMissionChange_ = true;
526             }
527             InitAllSnapshots(missionInfoSet);
528         }
529     };
530     if (missionHandler_ != nullptr && !missionHandler_->PostTask(func)) {
531         HILOGE("post RegisterMissionListener and InitAllSnapshots Task failed");
532     }
533     return result;
534 }
535 
StopSyncMissionsFromRemote(const std::string & networkId)536 void DistributedSchedMissionManager::StopSyncMissionsFromRemote(const std::string& networkId)
537 {
538     HILOGD(" %{private}s!", GetAnonymStr(networkId).c_str());
539     {
540         std::lock_guard<std::mutex> autoLock(remoteSyncDeviceLock_);
541         remoteSyncDeviceSet_.erase(networkId);
542         if (remoteSyncDeviceSet_.empty()) {
543             auto func = [this]() {
544                 int32_t ret = DistributedSchedAdapter::GetInstance().UnRegisterMissionListener(missonChangeListener_);
545                 if (ret == ERR_OK) {
546                     isRegMissionChange_ = false;
547                 }
548             };
549             if (missionHandler_ != nullptr && !missionHandler_->PostTask(func)) {
550                 HILOGE("post UnRegisterMissionListener Task failed");
551             }
552         }
553     }
554 }
555 
NeedSyncDevice(const std::string & deviceId)556 bool DistributedSchedMissionManager::NeedSyncDevice(const std::string& deviceId)
557 {
558     if (deviceId.empty()) {
559         HILOGD("deviceId empty!");
560         return false;
561     }
562     std::lock_guard<std::mutex> autoLock(remoteSyncDeviceLock_);
563     if (remoteSyncDeviceSet_.count(deviceId) == 0) {
564         return false;
565     }
566     return true;
567 }
568 
HasSyncListener(const std::string & networkId)569 bool DistributedSchedMissionManager::HasSyncListener(const std::string& networkId)
570 {
571     std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
572     auto iter = listenDeviceMap_.find(Str8ToStr16(networkId));
573     if (iter != listenDeviceMap_.end()) {
574         return iter->second.called;
575     }
576     return false;
577 }
578 
NotifySnapshotChanged(const std::string & networkId,int32_t missionId)579 void DistributedSchedMissionManager::NotifySnapshotChanged(const std::string& networkId, int32_t missionId)
580 {
581     std::u16string u16DevId = Str8ToStr16(networkId);
582     std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
583     auto iter = listenDeviceMap_.find(u16DevId);
584     if (iter == listenDeviceMap_.end()) {
585         return;
586     }
587     auto& listenerInfo = iter->second;
588     for (auto& listener : listenerInfo.listenerSet) {
589         MissionChangedNotify::NotifySnapshot(listener, u16DevId, missionId);
590     }
591 }
592 
OnRemoteDied(const wptr<IRemoteObject> & remote)593 void DistributedSchedMissionManager::OnRemoteDied(const wptr<IRemoteObject>& remote)
594 {
595     HILOGD("OnRemoteDied!");
596     sptr<IRemoteObject> listener = remote.promote();
597     if (listener == nullptr) {
598         HILOGW("listener is null");
599         return;
600     }
601     auto remoteDiedFunc = [this, listener]() {
602         OnMissionListenerDied(listener);
603     };
604     if (missionHandler_ != nullptr) {
605         missionHandler_->PostTask(remoteDiedFunc);
606     }
607 }
608 
OnRemoteDied(const wptr<IRemoteObject> & remote)609 void DistributedSchedMissionManager::ListenerDeathRecipient::OnRemoteDied(const wptr<IRemoteObject>& remote)
610 {
611     DistributedSchedMissionManager::GetInstance().OnRemoteDied(remote);
612 }
613 
EnqueueCachedSnapshotInfo(const std::string & deviceId,int32_t missionId,std::unique_ptr<Snapshot> snapshot)614 void DistributedSchedMissionManager::EnqueueCachedSnapshotInfo(const std::string& deviceId, int32_t missionId,
615     std::unique_ptr<Snapshot> snapshot)
616 {
617     if (deviceId.empty() || snapshot == nullptr) {
618         HILOGW("EnqueueCachedSnapshotInfo invalid input param!");
619         return;
620     }
621     std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
622     std::string keyInfo = GenerateKeyInfo(deviceId, missionId);
623     auto iter = cachedSnapshotInfos_.find(keyInfo);
624     if (iter != cachedSnapshotInfos_.end()) {
625         if (iter->second == nullptr) {
626             HILOGE("snapshotInfo is null");
627             return;
628         }
629         if (snapshot->GetCreatedTime() < iter->second->GetCreatedTime()) {
630             return;
631         }
632     }
633 
634     if (cachedSnapshotInfos_.size() == MAX_CACHED_ITEM) {
635         int64_t oldest = -1;
636         auto iterOldest = cachedSnapshotInfos_.end();
637         for (auto iterItem = cachedSnapshotInfos_.begin(); iterItem != cachedSnapshotInfos_.end(); ++iterItem) {
638             if (iterItem->second == nullptr) {
639                 HILOGE("snapshotInfo is null");
640                 continue;
641             }
642             if (oldest == -1 || iterItem->second->GetLastAccessTime() < oldest) {
643                 oldest = iterItem->second->GetLastAccessTime();
644                 iterOldest = iterItem;
645             }
646         }
647         if (iterOldest != cachedSnapshotInfos_.end()) {
648             cachedSnapshotInfos_.erase(iterOldest);
649         }
650     }
651     cachedSnapshotInfos_[keyInfo] = std::move(snapshot);
652 }
653 
DequeueCachedSnapshotInfo(const std::string & deviceId,int32_t missionId)654 std::unique_ptr<Snapshot> DistributedSchedMissionManager::DequeueCachedSnapshotInfo(const std::string& deviceId,
655     int32_t missionId)
656 {
657     if (deviceId.empty()) {
658         HILOGW("DequeueCachedSnapshotInfo invalid input param!");
659         return nullptr;
660     }
661     std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
662     auto iter = cachedSnapshotInfos_.find(GenerateKeyInfo(deviceId, missionId));
663     if (iter != cachedSnapshotInfos_.end()) {
664         std::unique_ptr<Snapshot> snapshot = std::move(iter->second);
665         if (snapshot == nullptr) {
666             HILOGE("snapshot is null");
667             return nullptr;
668         }
669         snapshot->UpdateLastAccessTime(GetTickCount());
670         iter->second = nullptr;
671         cachedSnapshotInfos_.erase(iter);
672         return snapshot;
673     }
674     return nullptr;
675 }
676 
DeleteCachedSnapshotInfo(const std::string & networkId)677 void DistributedSchedMissionManager::DeleteCachedSnapshotInfo(const std::string& networkId)
678 {
679     if (networkId.empty()) {
680         HILOGW("networkId empty!");
681         return;
682     }
683     std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(networkId);
684     if (uuid.empty()) {
685         HILOGW("uuid empty!");
686         return;
687     }
688     std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
689     auto iter = cachedSnapshotInfos_.begin();
690     while (iter != cachedSnapshotInfos_.end()) {
691         if (iter->first.find(uuid) != std::string::npos) {
692             iter = cachedSnapshotInfos_.erase(iter);
693         } else {
694             ++iter;
695         }
696     }
697 }
698 
FetchCachedRemoteMissions(const std::string & srcId,int32_t numMissions,std::vector<DstbMissionInfo> & missionInfoSet)699 int32_t DistributedSchedMissionManager::FetchCachedRemoteMissions(const std::string& srcId, int32_t numMissions,
700     std::vector<DstbMissionInfo>& missionInfoSet)
701 {
702     std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(srcId);
703     if (uuid.empty()) {
704         HILOGE("uuid empty!");
705         return INVALID_PARAMETERS_ERR;
706     }
707     std::lock_guard<std::mutex> autoLock(remoteMissionInfosLock_);
708     auto iter = deviceMissionInfos_.find(uuid);
709     if (iter == deviceMissionInfos_.end()) {
710         HILOGE("can not find uuid, deviceId: %{public}s!", GetAnonymStr(srcId).c_str());
711         return ERR_NULL_OBJECT;
712     }
713 
714     // get at most numMissions missions
715     int32_t actualNums = static_cast<int32_t>((iter->second).size());
716     if (actualNums < 0) {
717         HILOGE("invalid size!");
718         return ERR_NULL_OBJECT;
719     }
720     missionInfoSet.assign((iter->second).begin(),
721         (actualNums > numMissions) ? (iter->second).begin() + numMissions : (iter->second).end());
722     return ERR_NONE;
723 }
724 
RebornMissionCache(const std::string & deviceId,const std::vector<DstbMissionInfo> & missionInfoSet)725 void DistributedSchedMissionManager::RebornMissionCache(const std::string& deviceId,
726     const std::vector<DstbMissionInfo>& missionInfoSet)
727 {
728     HILOGI("start! deviceId is %{public}s.", GetAnonymStr(deviceId).c_str());
729     std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(deviceId);
730     if (uuid.empty()) {
731         HILOGE("uuid empty!");
732         return;
733     }
734     {
735         std::lock_guard<std::mutex> autoLock(remoteMissionInfosLock_);
736         deviceMissionInfos_[uuid] = missionInfoSet;
737     }
738     HILOGI("RebornMissionCache end!");
739 }
740 
CleanMissionCache(const std::string & deviceId)741 void DistributedSchedMissionManager::CleanMissionCache(const std::string& deviceId)
742 {
743     HILOGI("CleanMissionCache start! deviceId is %{public}s.", GetAnonymStr(deviceId).c_str());
744     std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(deviceId);
745     if (uuid.empty()) {
746         HILOGE("CleanMissionCache uuid empty!");
747         return;
748     }
749     {
750         std::lock_guard<std::mutex> autoLock(remoteMissionInfosLock_);
751         deviceMissionInfos_.erase(uuid);
752     }
753     HILOGI("CleanMissionCache end!");
754 }
755 
NotifyMissionsChangedFromRemote(const CallerInfo & callerInfo,const std::vector<DstbMissionInfo> & missionInfoSet)756 int32_t DistributedSchedMissionManager::NotifyMissionsChangedFromRemote(const CallerInfo& callerInfo,
757     const std::vector<DstbMissionInfo>& missionInfoSet)
758 {
759     HILOGI("NotifyMissionsChangedFromRemote version is %{public}d!", callerInfo.dmsVersion);
760     std::u16string u16DevId = Str8ToStr16(callerInfo.sourceDeviceId);
761     RebornMissionCache(callerInfo.sourceDeviceId, missionInfoSet);
762     {
763         HILOGI("NotifyMissionsChangedFromRemote notify mission start!");
764         std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
765         auto iter = listenDeviceMap_.find(u16DevId);
766         if (iter == listenDeviceMap_.end()) {
767             HILOGE("NotifyMissionsChangedFromRemote notify mission no listener!");
768             return INVALID_PARAMETERS_ERR;
769         }
770         auto& listenerSet = iter->second.listenerSet;
771         auto notifyChanged = [listenerSet, u16DevId] () {
772             for (const auto& listener : listenerSet) {
773                 MissionChangedNotify::NotifyMissionsChanged(listener, u16DevId);
774             }
775         };
776         if (missionHandler_ != nullptr) {
777             missionHandler_->PostTask(notifyChanged);
778             HILOGI("NotifyMissionsChangedFromRemote end!");
779             return ERR_NONE;
780         }
781     }
782     return INVALID_PARAMETERS_ERR;
783 }
784 
NotifyLocalMissionsChanged()785 void DistributedSchedMissionManager::NotifyLocalMissionsChanged()
786 {
787     auto func = [this]() {
788         HILOGI("NotifyLocalMissionsChanged");
789         std::vector<DstbMissionInfo> missionInfos;
790         int32_t ret = DistributedSchedAdapter::GetInstance().GetLocalMissionInfos(Mission::GET_MAX_MISSIONS,
791             missionInfos);
792         if (ret == ERR_OK) {
793             int32_t result = NotifyMissionsChangedToRemote(missionInfos);
794             HILOGI("NotifyMissionsChangedToRemote result = %{public}d", result);
795         }
796     };
797     if (missionChangeHandler_ == nullptr) {
798         HILOGE("missionChangeHandler_ is null");
799         return;
800     }
801     if (!missionChangeHandler_->PostTask(func)) {
802         HILOGE("postTask failed");
803     }
804 }
805 
NotifyMissionSnapshotCreated(int32_t missionId)806 void DistributedSchedMissionManager::NotifyMissionSnapshotCreated(int32_t missionId)
807 {
808     auto func = [this, missionId]() {
809         HILOGD("called.");
810         ErrCode errCode = MissionSnapshotChanged(missionId);
811         if (errCode != ERR_OK) {
812             HILOGE("mission snapshot changed failed, missionId=%{public}d, errCode=%{public}d", missionId, errCode);
813         }
814     };
815     if (missionChangeHandler_ == nullptr) {
816         HILOGE("missionChangeHandler_ is null");
817         return;
818     }
819     if (!missionChangeHandler_->PostTask(func, GET_FOREGROUND_SNAPSHOT_DELAY_TIME)) {
820         HILOGE("post MissionSnapshotChanged delay Task failed");
821     }
822 }
823 
NotifyMissionSnapshotChanged(int32_t missionId)824 void DistributedSchedMissionManager::NotifyMissionSnapshotChanged(int32_t missionId)
825 {
826     auto func = [this, missionId]() {
827         HILOGD("called.");
828         ErrCode errCode = MissionSnapshotChanged(missionId);
829         if (errCode != ERR_OK) {
830             HILOGE("mission snapshot changed failed, missionId=%{public}d, errCode=%{public}d", missionId, errCode);
831         }
832     };
833     if (missionChangeHandler_ == nullptr) {
834         HILOGE("missionChangeHandler_ is null");
835         return;
836     }
837     if (!missionChangeHandler_->PostTask(func)) {
838         HILOGE("post MissionSnapshotChanged Task failed");
839     }
840 }
841 
NotifyMissionSnapshotDestroyed(int32_t missionId)842 void DistributedSchedMissionManager::NotifyMissionSnapshotDestroyed(int32_t missionId)
843 {
844     auto func = [this, missionId]() {
845         HILOGD("called.");
846         ErrCode errCode = MissionSnapshotDestroyed(missionId);
847         if (errCode != ERR_OK) {
848             HILOGE("mission snapshot removed failed, missionId=%{public}d, errCode=%{public}d", missionId, errCode);
849         }
850     };
851     if (missionChangeHandler_ == nullptr) {
852         HILOGE("missionChangeHandler_ is null");
853         return;
854     }
855     if (!missionChangeHandler_->PostTask(func)) {
856         HILOGE("post MissionSnapshotDestroyed Task failed");
857     }
858 }
859 
NotifyMissionsChangedToRemote(const std::vector<DstbMissionInfo> & missionInfoSet)860 int32_t DistributedSchedMissionManager::NotifyMissionsChangedToRemote(
861     const std::vector<DstbMissionInfo> &missionInfoSet)
862 {
863     CallerInfo callerInfo;
864     if (!GenerateCallerInfo(callerInfo)) {
865         return GET_LOCAL_DEVICE_ERR;
866     }
867     std::lock_guard<std::mutex> autoLock(remoteSyncDeviceLock_);
868     for (const auto& destDeviceId : remoteSyncDeviceSet_) {
869         auto handler = FetchDeviceHandler(destDeviceId);
870         if (handler == nullptr) {
871             HILOGE("NotifyMissionsChangedToRemote fetch handler failed!");
872             continue;
873         }
874         auto callback = [destDeviceId, missionInfoSet, callerInfo, this] () {
875             NotifyMissionsChangedToRemoteInner(destDeviceId, missionInfoSet, callerInfo);
876         };
877         if (!handler->PostTask(callback)) {
878             HILOGE("NotifyMissionsChangedToRemote PostTask failed!");
879             return ERR_NULL_OBJECT;
880         }
881     }
882 
883     return ERR_NONE;
884 }
885 
NotifyMissionsChangedToRemoteInner(const std::string & deviceId,const std::vector<DstbMissionInfo> & missionInfoSet,const CallerInfo & callerInfo)886 void DistributedSchedMissionManager::NotifyMissionsChangedToRemoteInner(const std::string& deviceId,
887     const std::vector<DstbMissionInfo>& missionInfoSet, const CallerInfo& callerInfo)
888 {
889     sptr<IDistributedSched> remoteDms = GetRemoteDms(deviceId);
890     if (remoteDms == nullptr) {
891         HILOGE("NotifyMissionsChangedToRemote DMS get remoteDms failed");
892         return;
893     }
894     int64_t begin = GetTickCount();
895     int32_t result = remoteDms->NotifyMissionsChangedFromRemote(missionInfoSet, callerInfo);
896     HILOGI("[PerformanceTest] NotifyMissionsChangedFromRemote ret:%{public}d, spend %{public}" PRId64 " ms",
897         result, GetTickCount() - begin);
898 }
899 
GenerateCallerInfo(CallerInfo & callerInfo)900 bool DistributedSchedMissionManager::GenerateCallerInfo(CallerInfo& callerInfo)
901 {
902     std::string localUuid;
903     if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localUuid)) {
904         HILOGE("get local uuid failed!");
905         return false;
906     }
907     callerInfo.uid = IPCSkeleton::GetCallingUid();
908     callerInfo.pid = IPCSkeleton::GetCallingRealPid();
909     callerInfo.callerType = CALLER_TYPE_HARMONY;
910     callerInfo.sourceDeviceId = localUuid;
911     callerInfo.dmsVersion = VERSION;
912     return true;
913 }
914 
FetchDeviceHandler(const std::string & deviceId)915 std::shared_ptr<AppExecFwk::EventHandler> DistributedSchedMissionManager::FetchDeviceHandler(
916     const std::string& deviceId)
917 {
918     if (!IsDeviceIdValidated(deviceId)) {
919         HILOGW("FetchDeviceHandler device:%{public}s offline.", GetAnonymStr(deviceId).c_str());
920         return nullptr;
921     }
922 
923     std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(deviceId);
924     if (uuid.empty()) {
925         HILOGE("FetchDeviceHandler uuid empty!");
926         return nullptr;
927     }
928 
929     auto iter = deviceHandle_.find(uuid);
930     if (iter != deviceHandle_.end()) {
931         return iter->second;
932     }
933 
934     auto anonyUuid = GetAnonymStr(uuid);
935     auto runner = AppExecFwk::EventRunner::Create(anonyUuid + "_MissionN");
936     auto handler = std::make_shared<AppExecFwk::EventHandler>(runner);
937     deviceHandle_.emplace(uuid, handler);
938     return handler;
939 }
940 
OnRemoteDied(const wptr<IRemoteObject> & remote)941 void DistributedSchedMissionManager::RemoteDmsDeathRecipient::OnRemoteDied(const wptr<IRemoteObject>& remote)
942 {
943     HILOGI("OnRemoteDied received died notify!");
944     DistributedSchedMissionManager::GetInstance().OnRemoteDmsDied(remote);
945 }
946 
OnRemoteDmsDied(const wptr<IRemoteObject> & remote)947 void DistributedSchedMissionManager::OnRemoteDmsDied(const wptr<IRemoteObject>& remote)
948 {
949     sptr<IRemoteObject> diedRemoted = remote.promote();
950     if (diedRemoted == nullptr) {
951         HILOGW("OnRemoteDmsDied promote failed!");
952         return;
953     }
954     HILOGD("delete diedRemoted");
955     auto remoteDmsDiedFunc = [this, diedRemoted]() {
956         OnRemoteDmsDied(diedRemoted);
957     };
958     if (missionHandler_ != nullptr) {
959         missionHandler_->PostTask(remoteDmsDiedFunc);
960     }
961 }
962 
RetryStartSyncRemoteMissions(const std::string & dstDeviceId,const std::string & localDevId,int32_t retryTimes)963 void DistributedSchedMissionManager::RetryStartSyncRemoteMissions(const std::string& dstDeviceId,
964     const std::string& localDevId, int32_t retryTimes)
965 {
966     auto retryFunc = [this, dstDeviceId, localDevId, retryTimes]() {
967         bool ret = HasSyncListener(dstDeviceId);
968         if (!ret) {
969             return;
970         }
971         sptr<IDistributedSched> remoteDms = GetRemoteDms(dstDeviceId);
972         if (remoteDms == nullptr) {
973             HILOGI("RetryStartSyncRemoteMissions DMS get remoteDms failed");
974             RetryStartSyncRemoteMissions(dstDeviceId, localDevId, retryTimes + 1);
975             return;
976         }
977         int32_t errNo = StartSyncRemoteMissions(dstDeviceId, remoteDms);
978         HILOGI("RetryStartSyncRemoteMissions result:%{public}d", errNo);
979     };
980     if (missionHandler_ != nullptr && retryTimes < MAX_RETRY_TIMES) {
981         missionHandler_->PostTask(retryFunc, RETRY_DELAYED);
982     }
983 }
984 
OnMissionListenerDied(const sptr<IRemoteObject> & remote)985 void DistributedSchedMissionManager::OnMissionListenerDied(const sptr<IRemoteObject>& remote)
986 {
987     HILOGI("OnMissionListenerDied");
988     std::set<std::string> deviceSet;
989     {
990         std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
991         auto iterItem = listenDeviceMap_.begin();
992         while (iterItem != listenDeviceMap_.end()) {
993             auto& listenerInfo = iterItem->second;
994             auto ret = listenerInfo.Find(remote);
995             if (!ret) {
996                 ++iterItem;
997                 continue;
998             }
999             if (remote != nullptr) {
1000                 remote->RemoveDeathRecipient(listenerDeath_);
1001             }
1002             listenerInfo.Erase(remote);
1003             if (listenerInfo.Empty()) {
1004                 if (listenerInfo.called) {
1005                     deviceSet.emplace(Str16ToStr8(iterItem->first));
1006                 }
1007                 iterItem = listenDeviceMap_.erase(iterItem);
1008             } else {
1009                 ++iterItem;
1010             }
1011         }
1012     }
1013     for (auto& devId : deviceSet) {
1014         StopSyncRemoteMissions(devId, false);
1015     }
1016 }
1017 
OnRemoteDmsDied(const sptr<IRemoteObject> & remote)1018 void DistributedSchedMissionManager::OnRemoteDmsDied(const sptr<IRemoteObject>& remote)
1019 {
1020     HILOGI("OnRemoteDmsDied");
1021     std::string devId;
1022     {
1023         std::lock_guard<std::mutex> autoLock(remoteDmsLock_);
1024         for (auto iter = remoteDmsMap_.begin(); iter != remoteDmsMap_.end(); ++iter) {
1025             if (iter->second->AsObject() == remote && iter->second->AsObject() != nullptr) {
1026                 iter->second->AsObject()->RemoveDeathRecipient(remoteDmsRecipient_);
1027                 devId = iter->first;
1028                 remoteDmsMap_.erase(iter);
1029                 break;
1030             }
1031         }
1032     }
1033     if (devId.empty()) {
1034         return;
1035     }
1036     bool ret = HasSyncListener(devId);
1037     if (ret) {
1038         std::string localDeviceId;
1039         if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDeviceId)) {
1040             return;
1041         }
1042         RetryStartSyncRemoteMissions(devId, localDeviceId, 0);
1043     }
1044 }
1045 
NotifyDmsProxyProcessDied()1046 void DistributedSchedMissionManager::NotifyDmsProxyProcessDied()
1047 {
1048     HILOGI("NotifyDmsProxyProcessDied!");
1049     if (!isRegMissionChange_) {
1050         return;
1051     }
1052     RetryRegisterMissionChange(0);
1053 }
1054 
RetryRegisterMissionChange(int32_t retryTimes)1055 void DistributedSchedMissionManager::RetryRegisterMissionChange(int32_t retryTimes)
1056 {
1057     auto remoteDiedFunc = [this, retryTimes]() {
1058         HILOGI("RetryRegisterMissionChange retryTimes:%{public}d begin", retryTimes);
1059         if (!isRegMissionChange_) {
1060             return;
1061         }
1062         int32_t ret = DistributedSchedAdapter::GetInstance().RegisterMissionListener(missonChangeListener_);
1063         if (ret == ERR_NULL_OBJECT) {
1064             RetryRegisterMissionChange(retryTimes + 1);
1065             HILOGI("RetryRegisterMissionChange dmsproxy null, retry!");
1066             return;
1067         }
1068         HILOGI("RetryRegisterMissionChange result:%{public}d", ret);
1069     };
1070     if (missionHandler_ != nullptr && retryTimes < MAX_RETRY_TIMES) {
1071         missionHandler_->PostTask(remoteDiedFunc, RETRY_DELAYED);
1072     }
1073 }
1074 
InitAllSnapshots(const std::vector<DstbMissionInfo> & missionInfoSet)1075 void DistributedSchedMissionManager::InitAllSnapshots(const std::vector<DstbMissionInfo>& missionInfoSet)
1076 {
1077     for (auto iter = missionInfoSet.begin(); iter != missionInfoSet.end(); iter++) {
1078         ErrCode errCode = MissionSnapshotChanged(iter->id);
1079         if (errCode != ERR_OK) {
1080             HILOGE("mission snapshot changed failed, missionId=%{public}d, errCode=%{public}d", iter->id, errCode);
1081         }
1082     }
1083 }
1084 
MissionSnapshotChanged(int32_t missionId)1085 int32_t DistributedSchedMissionManager::MissionSnapshotChanged(int32_t missionId)
1086 {
1087     std::string networkId;
1088     if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(networkId)) {
1089         HILOGE("get local networkId failed!");
1090         return INVALID_PARAMETERS_ERR;
1091     }
1092     AAFwk::MissionSnapshot missionSnapshot;
1093     ErrCode errCode = DistributedSchedAdapter::GetInstance()
1094         .GetLocalMissionSnapshotInfo(networkId, missionId, missionSnapshot);
1095     if (errCode != ERR_OK) {
1096         HILOGE("get local mission snapshot failed, missionId=%{public}d, errCode=%{public}d", missionId, errCode);
1097         return errCode;
1098     }
1099     Snapshot snapshot;
1100     SnapshotConverter::ConvertToSnapshot(missionSnapshot, snapshot);
1101     MessageParcel data;
1102     errCode = MissionSnapshotSequence(snapshot, data);
1103     if (errCode != ERR_OK) {
1104         HILOGE("mission snapshot sequence failed, errCode=%{public}d", errCode);
1105         return errCode;
1106     }
1107     size_t len = data.GetReadableBytes();
1108     const uint8_t* byteStream = data.ReadBuffer(len);
1109     errCode = StoreSnapshotInfo(networkId, missionId, byteStream, len);
1110     return errCode;
1111 }
1112 
MissionSnapshotDestroyed(int32_t missionId)1113 int32_t DistributedSchedMissionManager::MissionSnapshotDestroyed(int32_t missionId)
1114 {
1115     std::string networkId;
1116     if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(networkId)) {
1117         HILOGE("get local networkId failed!");
1118         return INVALID_PARAMETERS_ERR;
1119     }
1120     ErrCode errCode = RemoveSnapshotInfo(networkId, missionId);
1121     return errCode;
1122 }
1123 
MissionSnapshotSequence(const Snapshot & snapshot,MessageParcel & data)1124 int32_t DistributedSchedMissionManager::MissionSnapshotSequence(const Snapshot& snapshot, MessageParcel& data)
1125 {
1126     bool ret = snapshot.WriteSnapshotInfo(data);
1127     if (!ret) {
1128         HILOGE("WriteSnapshotInfo failed!");
1129         return ERR_FLATTEN_OBJECT;
1130     }
1131     ret = snapshot.WritePixelMap(data);
1132     if (!ret) {
1133         HILOGE("WritePixelMap failed!");
1134         return ERR_FLATTEN_OBJECT;
1135     }
1136     return ERR_OK;
1137 }
1138 
OnDnetDied()1139 void DistributedSchedMissionManager::OnDnetDied()
1140 {
1141     auto dnetDiedFunc = [this]() {
1142         HILOGI("OnDnetDied");
1143         std::lock_guard<std::mutex> autoLock(remoteSyncDeviceLock_);
1144         if (!isRegMissionChange_) {
1145             return;
1146         }
1147         remoteSyncDeviceSet_.clear();
1148         DistributedSchedAdapter::GetInstance().UnRegisterMissionListener(missonChangeListener_);
1149         isRegMissionChange_ = false;
1150     };
1151     if (missionHandler_ != nullptr) {
1152         missionHandler_->PostTask(dnetDiedFunc);
1153     }
1154 }
1155 } // namespace DistributedSchedule
1156 } // namespace OHOS
1157