1 /*
2  * Copyright (c) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "dsched_continue_manager.h"
17 
18 #include <chrono>
19 #include <sys/prctl.h>
20 
21 #include "cJSON.h"
22 
23 #include "continue_scene_session_handler.h"
24 #include "dfx/distributed_radar.h"
25 #include "distributed_sched_utils.h"
26 #include "dsched_transport_softbus_adapter.h"
27 #include "dtbschedmgr_device_info_storage.h"
28 #include "dtbschedmgr_log.h"
29 #include "mission/distributed_bm_storage.h"
30 #include "mission/dms_continue_send_manager.h"
31 #include "mission/dms_continue_recv_manager.h"
32 
33 namespace OHOS {
34 namespace DistributedSchedule {
namespace {
// Log tag for this translation unit (presumably picked up by the HILOG* macros — their definition is not visible here).
const std::string TAG = "DSchedContinueManager";
// Name assigned to the event thread through prctl(PR_SET_NAME) in StartEvent().
// NOTE(review): Linux keeps at most 15 characters of a thread name, so this value is truncated.
const std::string DSCHED_CONTINUE_MANAGER = "dsched_continue_manager";
// Not referenced in the visible part of this file. NOTE(review): confirm whether it is still needed.
const std::string CONTINUE_TIMEOUT_TASK = "continue_timeout_task";
}

// Project-provided singleton boilerplate (presumably defines GetInstance(); confirm in the macro's header).
IMPLEMENT_SINGLE_INSTANCE(DSchedContinueManager);
42 
DSchedContinueManager()43 DSchedContinueManager::DSchedContinueManager()
44 {
45 }
46 
/**
 * Destructor: logs the teardown and delegates all cleanup to UnInit(), which unregisters the softbus
 * listener, drops the tracked continuations and stops/joins the event thread.
 */
DSchedContinueManager::~DSchedContinueManager()
{
    HILOGI("DSchedContinueManager delete");
    UnInit();
}
52 
Init()53 void DSchedContinueManager::Init()
54 {
55     HILOGI("Init DSchedContinueManager start");
56     if (eventHandler_ != nullptr) {
57         HILOGI("DSchedContinueManager already inited, end.");
58         return;
59     }
60     DSchedTransportSoftbusAdapter::GetInstance().InitChannel();
61     softbusListener_ = std::make_shared<DSchedContinueManager::SoftbusListener>();
62     DSchedTransportSoftbusAdapter::GetInstance().RegisterListener(SERVICE_TYPE_CONTINUE, softbusListener_);
63     eventThread_ = std::thread(&DSchedContinueManager::StartEvent, this);
64     std::unique_lock<std::mutex> lock(eventMutex_);
65     eventCon_.wait(lock, [this] {
66         return eventHandler_ != nullptr;
67     });
68     HILOGI("Init DSchedContinueManager end");
69 }
70 
StartEvent()71 void DSchedContinueManager::StartEvent()
72 {
73     HILOGI("StartEvent start");
74     prctl(PR_SET_NAME, DSCHED_CONTINUE_MANAGER.c_str());
75     auto runner = AppExecFwk::EventRunner::Create(false);
76     {
77         std::lock_guard<std::mutex> lock(eventMutex_);
78         eventHandler_ = std::make_shared<OHOS::AppExecFwk::EventHandler>(runner);
79     }
80     eventCon_.notify_one();
81     runner->Run();
82     HILOGI("StartEvent end");
83 }
84 
UnInit()85 void DSchedContinueManager::UnInit()
86 {
87     HILOGI("UnInit start");
88     DSchedTransportSoftbusAdapter::GetInstance().UnregisterListener(SERVICE_TYPE_CONTINUE, softbusListener_);
89     DSchedTransportSoftbusAdapter::GetInstance().ReleaseChannel();
90     continues_.clear();
91     cntSink_ = 0;
92     cntSource_ = 0;
93 
94     if (eventHandler_ != nullptr) {
95         eventHandler_->GetEventRunner()->Stop();
96         eventThread_.join();
97         eventHandler_ = nullptr;
98     } else {
99         HILOGE("eventHandler_ is nullptr");
100     }
101     HILOGI("UnInit end");
102 }
103 
NotifyAllConnectDecision(std::string peerDeviceId,bool isSupport)104 void DSchedContinueManager::NotifyAllConnectDecision(std::string peerDeviceId, bool isSupport)
105 {
106     HILOGI("Notify all connect decision, peerDeviceId %{public}s, isSupport %{public}d.",
107         GetAnonymStr(peerDeviceId).c_str(), isSupport);
108 #ifdef DMSFWK_ALL_CONNECT_MGR
109     std::lock_guard<std::mutex> decisionLock(connectDecisionMutex_);
110     peerConnectDecision_[peerDeviceId] = isSupport;
111     connectDecisionCond_.notify_all();
112 #endif
113 }
114 
ContinueMission(const std::string & srcDeviceId,const std::string & dstDeviceId,int32_t missionId,const sptr<IRemoteObject> & callback,const OHOS::AAFwk::WantParams & wantParams)115 int32_t DSchedContinueManager::ContinueMission(const std::string& srcDeviceId, const std::string& dstDeviceId,
116     int32_t missionId, const sptr<IRemoteObject>& callback, const OHOS::AAFwk::WantParams& wantParams)
117 {
118     if (srcDeviceId.empty() || dstDeviceId.empty() || callback == nullptr) {
119         HILOGE("srcDeviceId or dstDeviceId or callback is null!");
120         return INVALID_PARAMETERS_ERR;
121     }
122 
123     std::string localDevId;
124     if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDevId)) {
125         HILOGE("get local deviceId failed!");
126         return INVALID_PARAMETERS_ERR;
127     }
128     if (localDevId != srcDeviceId && localDevId != dstDeviceId) {
129         HILOGE("Input srcDevId: %{public}s or dstDevId: %{public}s must be locDevId: %{public}s.",
130             GetAnonymStr(srcDeviceId).c_str(), GetAnonymStr(dstDeviceId).c_str(), GetAnonymStr(localDevId).c_str());
131         return OPERATION_DEVICE_NOT_INITIATOR_OR_TARGET;
132     }
133     if (DtbschedmgrDeviceInfoStorage::GetInstance().GetDeviceInfoById(
134         localDevId == srcDeviceId ? dstDeviceId : srcDeviceId) == nullptr) {
135         HILOGE("GetDeviceInfoById fail, locDevId: %{public}s, srcDevId: %{public}s, dstDevId: %{public}s.",
136             GetAnonymStr(localDevId).c_str(), GetAnonymStr(srcDeviceId).c_str(), GetAnonymStr(dstDeviceId).c_str());
137         return INVALID_REMOTE_PARAMETERS_ERR;
138     }
139 
140     auto func = [this, srcDeviceId, dstDeviceId, missionId, callback, wantParams]() {
141         HandleContinueMission(srcDeviceId, dstDeviceId, missionId, callback, wantParams);
142     };
143     if (eventHandler_ == nullptr) {
144         HILOGE("eventHandler_ is nullptr");
145         return INVALID_PARAMETERS_ERR;
146     }
147     eventHandler_->PostTask(func);
148     return ERR_OK;
149 }
150 
HandleContinueMission(const std::string & srcDeviceId,const std::string & dstDeviceId,int32_t missionId,const sptr<IRemoteObject> & callback,const OHOS::AAFwk::WantParams & wantParams)151 void DSchedContinueManager::HandleContinueMission(const std::string& srcDeviceId, const std::string& dstDeviceId,
152     int32_t missionId, const sptr<IRemoteObject>& callback, const OHOS::AAFwk::WantParams& wantParams)
153 {
154     HILOGI("start, srcDeviceId: %{public}s. dstDeviceId: %{public}s. missionId: %{public}d.",
155         GetAnonymStr(srcDeviceId).c_str(), GetAnonymStr(dstDeviceId).c_str(), missionId);
156 
157     if (srcDeviceId.empty() || dstDeviceId.empty() || callback == nullptr) {
158         HILOGE("srcDeviceId or dstDeviceId or callback is null!");
159         return;
160     }
161 
162     std::string localDevId;
163     if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDevId)) {
164         HILOGE("get local deviceId failed!");
165         return;
166     }
167     DSchedContinueInfo info = DSchedContinueInfo(srcDeviceId, dstDeviceId, missionId);
168 
169     AAFwk::MissionInfo missionInfo;
170     if (AAFwk::AbilityManagerClient::GetInstance()->GetMissionInfo("", missionId, missionInfo) == ERR_OK
171         && srcDeviceId == localDevId) {
172         info.sourceBundleName_ = missionInfo.want.GetBundle();
173         info.sinkBundleName_ = missionInfo.want.GetBundle();
174     }
175 
176     HandleContinueMissionWithBundleName(info, callback, wantParams);
177     return;
178 }
179 
ContinueMission(const DSchedContinueInfo & continueInfo,const sptr<IRemoteObject> & callback,const OHOS::AAFwk::WantParams & wantParams)180 int32_t DSchedContinueManager::ContinueMission(const DSchedContinueInfo& continueInfo,
181     const sptr<IRemoteObject>& callback, const OHOS::AAFwk::WantParams& wantParams)
182 {
183     std::string srcDeviceId = continueInfo.sourceDeviceId_;
184     std::string dstDeviceId = continueInfo.sinkDeviceId_;
185 
186     if (srcDeviceId.empty() || dstDeviceId.empty() || callback == nullptr) {
187         HILOGE("srcDeviceId or dstDeviceId or callback is null!");
188         return INVALID_PARAMETERS_ERR;
189     }
190 
191     std::string localDevId;
192     if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDevId)) {
193         HILOGE("get local deviceId failed!");
194         return INVALID_PARAMETERS_ERR;
195     }
196     if (localDevId != srcDeviceId && localDevId != dstDeviceId) {
197         HILOGE("Input srcDevId: %{public}s or dstDevId: %{public}s must be locDevId: %{public}s.",
198             GetAnonymStr(srcDeviceId).c_str(), GetAnonymStr(dstDeviceId).c_str(), GetAnonymStr(localDevId).c_str());
199         return OPERATION_DEVICE_NOT_INITIATOR_OR_TARGET;
200     }
201     if (DtbschedmgrDeviceInfoStorage::GetInstance().GetDeviceInfoById(
202         localDevId == srcDeviceId ? dstDeviceId : srcDeviceId) == nullptr) {
203         HILOGE("GetDeviceInfoById fail, locDevId: %{public}s, srcDevId: %{public}s, dstDevId: %{public}s.",
204             GetAnonymStr(localDevId).c_str(), GetAnonymStr(srcDeviceId).c_str(), GetAnonymStr(dstDeviceId).c_str());
205         return INVALID_REMOTE_PARAMETERS_ERR;
206     }
207 
208 #ifdef SUPPORT_DISTRIBUTED_MISSION_MANAGER
209     if (localDevId == srcDeviceId) {
210         int32_t missionId = -1;
211         int32_t ret = DMSContinueSendMgr::GetInstance().GetMissionIdByBundleName(
212             continueInfo.sinkBundleName_, missionId);
213         if (ret != ERR_OK) {
214             HILOGE("get missionId fail, ret %{public}d.", ret);
215             return INVALID_PARAMETERS_ERR;
216         }
217     }
218 #endif
219 
220     auto func = [this, continueInfo, callback, wantParams]() {
221         HandleContinueMission(continueInfo, callback, wantParams);
222     };
223     if (eventHandler_ == nullptr) {
224         HILOGE("eventHandler_ is nullptr");
225         return INVALID_PARAMETERS_ERR;
226     }
227     eventHandler_->PostTask(func);
228     return ERR_OK;
229 }
230 
HandleContinueMission(const DSchedContinueInfo & continueInfo,const sptr<IRemoteObject> & callback,const OHOS::AAFwk::WantParams & wantParams)231 void DSchedContinueManager::HandleContinueMission(const DSchedContinueInfo& continueInfo,
232     const sptr<IRemoteObject>& callback, const OHOS::AAFwk::WantParams& wantParams)
233 {
234     std::string srcDeviceId = continueInfo.sourceDeviceId_;
235     std::string dstDeviceId = continueInfo.sinkDeviceId_;
236     std::string srcBundleName = continueInfo.sourceBundleName_;
237     std::string bundleName = continueInfo.sinkBundleName_;
238     std::string continueType = continueInfo.continueType_;
239     HILOGI("start, srcDeviceId: %{public}s. dstDeviceId: %{public}s. bundleName: %{public}s."
240         " continueType: %{public}s.", GetAnonymStr(srcDeviceId).c_str(), GetAnonymStr(dstDeviceId).c_str(),
241         bundleName.c_str(), continueType.c_str());
242 
243     if (srcDeviceId.empty() || dstDeviceId.empty() || callback == nullptr) {
244         HILOGE("srcDeviceId or dstDeviceId or callback is null!");
245         return;
246     }
247 
248     DSchedContinueInfo info = DSchedContinueInfo(srcDeviceId, srcBundleName, dstDeviceId, bundleName, continueType);
249     HandleContinueMissionWithBundleName(info, callback, wantParams);
250     return;
251 }
252 
GetFirstBundleName(DSchedContinueInfo & info,std::string & firstBundleName,std::string bundleName,std::string deviceId)253 bool DSchedContinueManager::GetFirstBundleName(DSchedContinueInfo &info, std::string &firstBundleName,
254     std::string bundleName, std::string deviceId)
255 {
256     uint16_t bundleNameId;
257     DmsBundleInfo distributedBundleInfo;
258     DmsBmStorage::GetInstance()->GetBundleNameId(bundleName, bundleNameId);
259     bool result = DmsBmStorage::GetInstance()->GetDistributedBundleInfo(deviceId,
260         bundleNameId, distributedBundleInfo);
261     if (!result) {
262         HILOGE("GetDistributedBundleInfo faild");
263         return false;
264     }
265     std::vector<DmsAbilityInfo> dmsAbilityInfos = distributedBundleInfo.dmsAbilityInfos;
266     for (DmsAbilityInfo &ability: dmsAbilityInfos) {
267         std::vector<std::string> abilityContinueTypes = ability.continueType;
268         for (std::string &ability_continue_type: abilityContinueTypes) {
269             if (ability_continue_type == info.continueType_ && !ability.continueBundleName.empty()) {
270                 firstBundleName = *ability.continueBundleName.begin();
271                 return true;
272             }
273         }
274     }
275     HILOGE("can not get abilicy info or continue bundle names is empty for continue type:%{public}s",
276            info.continueType_.c_str());
277     return false;
278 }
279 
HandleContinueMissionWithBundleName(DSchedContinueInfo & info,const sptr<IRemoteObject> & callback,const OHOS::AAFwk::WantParams & wantParams)280 void DSchedContinueManager::HandleContinueMissionWithBundleName(DSchedContinueInfo &info,
281     const sptr<IRemoteObject> &callback, const OHOS::AAFwk::WantParams &wantParams)
282 {
283     int32_t direction = CONTINUE_SINK;
284     int32_t ret = CheckContinuationLimit(info.sourceDeviceId_, info.sinkDeviceId_, direction);
285     if (ret != ERR_OK) {
286         HILOGE("CheckContinuationLimit failed, ret: %{public}d", ret);
287         return;
288     }
289     int32_t subType = CONTINUE_PUSH;
290     if (direction == CONTINUE_SOURCE) {
291         cntSource_++;
292     } else {
293         cntSink_++;
294         subType = CONTINUE_PULL;
295         if (info.sourceBundleName_.empty()) {
296             HILOGW("current sub type is continue pull; but can not get source bundle name from recv cache.");
297             std::string firstBundleNamme;
298             std::string bundleName = info.sinkBundleName_;
299             std::string deviceId = info.sinkDeviceId_;
300             if (GetFirstBundleName(info, firstBundleNamme, bundleName, deviceId)) {
301                 info.sourceBundleName_ = firstBundleNamme;
302             }
303         }
304     }
305 
306     {
307         std::lock_guard<std::mutex> continueLock(continueMutex_);
308         if (!continues_.empty() && continues_.find(info) != continues_.end()) {
309             HILOGE("a same continue task is already in progress.");
310             return;
311         }
312         auto newContinue = std::make_shared<DSchedContinue>(subType, direction, callback, info);
313         newContinue->Init();
314         continues_.insert(std::make_pair(info, newContinue));
315 #ifdef DMSFWK_ALL_CONNECT_MGR
316         {
317             std::unique_lock<std::mutex> decisionLock(connectDecisionMutex_);
318             std::string peerDeviceId = direction == CONTINUE_SOURCE ? info.sinkDeviceId_ : info.sourceDeviceId_;
319             if (peerConnectDecision_.find(peerDeviceId) != peerConnectDecision_.end()) {
320                 peerConnectDecision_.erase(peerDeviceId);
321             }
322         }
323 #endif
324         newContinue->OnContinueMission(wantParams);
325     }
326     WaitAllConnectDecision(direction, info, CONTINUE_TIMEOUT);
327     HILOGI("end, subType: %{public}d dirction: %{public}d, continue info: %{public}s",
328         subType, direction, info.toString().c_str());
329 }
330 
WaitAllConnectDecision(int32_t direction,const DSchedContinueInfo & info,int32_t timeout)331 void DSchedContinueManager::WaitAllConnectDecision(int32_t direction, const DSchedContinueInfo &info, int32_t timeout)
332 {
333 #ifdef DMSFWK_ALL_CONNECT_MGR
334     std::string peerDeviceId = direction == CONTINUE_SOURCE ? info.sinkDeviceId_ : info.sourceDeviceId_;
335     {
336         std::unique_lock<std::mutex> decisionLock(connectDecisionMutex_);
337         connectDecisionCond_.wait_for(decisionLock, std::chrono::seconds(CONNECT_DECISION_WAIT_S),
338             [this, peerDeviceId]() {
339                 return peerConnectDecision_.find(peerDeviceId) != peerConnectDecision_.end() &&
340                     peerConnectDecision_.at(peerDeviceId).load();
341             });
342 
343         if (peerConnectDecision_.find(peerDeviceId) == peerConnectDecision_.end()) {
344             HILOGE("Not find peerDeviceId %{public}s in peerConnectDecision.", GetAnonymStr(peerDeviceId).c_str());
345             SetTimeOut(info, 0);
346             return;
347         }
348         if (!peerConnectDecision_.at(peerDeviceId).load()) {
349             HILOGE("All connect manager refuse bind to PeerDeviceId %{public}s.", GetAnonymStr(peerDeviceId).c_str());
350             peerConnectDecision_.erase(peerDeviceId);
351             SetTimeOut(info, 0);
352             return;
353         }
354         peerConnectDecision_.erase(peerDeviceId);
355     }
356 #endif
357     SetTimeOut(info, timeout);
358 }
359 
SetTimeOut(const DSchedContinueInfo & info,int32_t timeout)360 void DSchedContinueManager::SetTimeOut(const DSchedContinueInfo &info, int32_t timeout)
361 {
362     auto func = [this, info]() {
363         if (continues_.empty() || continues_.count(info) == 0) {
364             HILOGE("continue not exist.");
365             return;
366         }
367         HILOGE("continue timeout! info: %{public}s", info.toString().c_str());
368         auto dsContinue = continues_[info];
369         if (dsContinue != nullptr) {
370             dsContinue->OnContinueEnd(CONTINUE_ABILITY_TIMEOUT_ERR);
371         }
372     };
373     if (eventHandler_ == nullptr) {
374         HILOGE("eventHandler_ is nullptr");
375         return;
376     }
377     timeout > 0 ? eventHandler_->PostTask(func, info.ToStringIgnoreMissionId(), timeout) :
378         eventHandler_->PostTask(func);
379 }
380 
StartContinuation(const OHOS::AAFwk::Want & want,int32_t missionId,int32_t callerUid,int32_t status,uint32_t accessToken)381 int32_t DSchedContinueManager::StartContinuation(const OHOS::AAFwk::Want& want, int32_t missionId,
382     int32_t callerUid, int32_t status, uint32_t accessToken)
383 {
384     std::string dstDeviceId = want.GetElement().GetDeviceID();
385     if (DtbschedmgrDeviceInfoStorage::GetInstance().GetDeviceInfoById(dstDeviceId) == nullptr) {
386         HILOGE("GetDeviceInfoById fail, dstDevId: %{public}s.", GetAnonymStr(dstDeviceId).c_str());
387         return INVALID_REMOTE_PARAMETERS_ERR;
388     }
389     if (GetDSchedContinueByWant(want, missionId) == nullptr) {
390         HILOGE("GetDSchedContinueByWant fail, dstDevId: %{public}s, missionId: %{public}d.",
391             GetAnonymStr(dstDeviceId).c_str(), missionId);
392         return INVALID_REMOTE_PARAMETERS_ERR;
393     }
394 
395     auto func = [this, want, missionId, callerUid, status, accessToken]() {
396         HandleStartContinuation(want, missionId, callerUid, status, accessToken);
397     };
398     if (eventHandler_ == nullptr) {
399         HILOGE("eventHandler_ is nullptr");
400         return INVALID_PARAMETERS_ERR;
401     }
402     eventHandler_->PostTask(func);
403     return ERR_OK;
404 }
405 
HandleStartContinuation(const OHOS::AAFwk::Want & want,int32_t missionId,int32_t callerUid,int32_t status,uint32_t accessToken)406 void DSchedContinueManager::HandleStartContinuation(const OHOS::AAFwk::Want& want, int32_t missionId,
407     int32_t callerUid, int32_t status, uint32_t accessToken)
408 {
409     HILOGI("begin");
410     auto dContinue = GetDSchedContinueByWant(want, missionId);
411     if (dContinue != nullptr) {
412         dContinue->OnStartContinuation(want, callerUid, status, accessToken);
413     } else {
414         DmsRadar::GetInstance().SaveDataDmsRemoteWant("HandleStartContinuation", INVALID_PARAMETERS_ERR);
415     }
416     HILOGI("end");
417     return;
418 }
419 
GetDSchedContinueByWant(const OHOS::AAFwk::Want & want,int32_t missionId)420 std::shared_ptr<DSchedContinue> DSchedContinueManager::GetDSchedContinueByWant(
421     const OHOS::AAFwk::Want& want, int32_t missionId)
422 {
423     std::string srcDeviceId;
424     if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(srcDeviceId)) {
425         DmsRadar::GetInstance().SaveDataDmsRemoteWant("GetDSchedContinueByWant", GET_LOCAL_DEVICE_ERR);
426         HILOGE("get local deviceId failed!");
427         return nullptr;
428     }
429     std::string dstDeviceId = want.GetElement().GetDeviceID();
430     std::string bundleName = want.GetElement().GetBundleName();
431     auto info = DSchedContinueInfo(srcDeviceId, bundleName, dstDeviceId, bundleName, "");
432 
433     HILOGI("continue info: %{public}s.", info.toString().c_str());
434     {
435         std::lock_guard<std::mutex> continueLock(continueMutex_);
436         if (continues_.empty()) {
437             HILOGE("continue info doesn't match an existing continuation.");
438             return nullptr;
439         }
440         for (auto iter = continues_.begin(); iter != continues_.end(); iter++) {
441             if (iter->second == nullptr) {
442                 continue;
443             }
444             DSchedContinueInfo continueInfo = iter->second->GetContinueInfo();
445             if (srcDeviceId == continueInfo.sourceDeviceId_
446                 && bundleName == continueInfo.sourceBundleName_
447                 && dstDeviceId == continueInfo.sinkDeviceId_) {
448                 return iter->second;
449             }
450         }
451     }
452     HILOGE("missionId doesn't match the existing continuation, continueInfo: %{public}s.",
453         info.toString().c_str());
454     return nullptr;
455 }
456 
NotifyCompleteContinuation(const std::u16string & devId,int32_t sessionId,bool isSuccess,const std::string & callerBundleName)457 int32_t DSchedContinueManager::NotifyCompleteContinuation(const std::u16string& devId, int32_t sessionId,
458     bool isSuccess, const std::string &callerBundleName)
459 {
460     auto func = [this, devId, sessionId, isSuccess, callerBundleName]() {
461         HandleNotifyCompleteContinuation(devId, sessionId, isSuccess, callerBundleName);
462     };
463     if (eventHandler_ == nullptr) {
464         HILOGE("eventHandler_ is nullptr");
465         return INVALID_PARAMETERS_ERR;
466     }
467     eventHandler_->PostTask(func);
468     return ERR_OK;
469 }
470 
HandleNotifyCompleteContinuation(const std::u16string & devId,int32_t missionId,bool isSuccess,const std::string & callerBundleName)471 void DSchedContinueManager::HandleNotifyCompleteContinuation(const std::u16string& devId, int32_t missionId,
472     bool isSuccess, const std::string &callerBundleName)
473 {
474     HILOGI("begin, isSuccess %{public}d", isSuccess);
475     auto dContinue = GetDSchedContinueByDevId(devId, missionId);
476     if (dContinue != nullptr) {
477         if (dContinue->GetContinueInfo().sinkBundleName_ != callerBundleName) {
478             HILOGE("callerBundleName doesn't match the existing continuation");
479             return;
480         }
481         dContinue->OnNotifyComplete(missionId, isSuccess);
482         HILOGI("end, continue info: %{public}s.", dContinue->GetContinueInfo().toString().c_str());
483     }
484     return;
485 }
486 
GetDSchedContinueByDevId(const std::u16string & devId,int32_t missionId)487 std::shared_ptr<DSchedContinue> DSchedContinueManager::GetDSchedContinueByDevId(
488     const std::u16string& devId, int32_t missionId)
489 {
490     std::string deviceId = Str16ToStr8(devId);
491     HILOGI("begin, deviceId %{public}s, missionId %{public}d", GetAnonymStr(deviceId).c_str(), missionId);
492     {
493         std::lock_guard<std::mutex> continueLock(continueMutex_);
494         if (continues_.empty()) {
495             HILOGE("No continuation in progress.");
496             return nullptr;
497         }
498         for (auto iter = continues_.begin(); iter != continues_.end(); iter++) {
499             if (iter->second != nullptr && deviceId == iter->second->GetContinueInfo().sourceDeviceId_) {
500                 return iter->second;
501             }
502         }
503     }
504     HILOGE("source deviceId doesn't match an existing continuation.");
505     return nullptr;
506 }
507 
NotifyTerminateContinuation(const int32_t missionId)508 void DSchedContinueManager::NotifyTerminateContinuation(const int32_t missionId)
509 {
510     HILOGI("begin, missionId %{public}d", missionId);
511     {
512         std::lock_guard<std::mutex> continueLock(continueMutex_);
513         if (continues_.empty()) {
514             HILOGW("No continuation in progress.");
515             return;
516         }
517 
518         ContinueLaunchMissionInfo missionInfo;
519         int32_t ret = DMSContinueSendMgr::GetInstance().GetContinueLaunchMissionInfo(missionId, missionInfo);
520         if (ret != ERR_OK) {
521             HILOGE("get continueLaunchMissionInfo failed, missionId %{public}d", missionId);
522             return;
523         }
524         HILOGI("alive missionInfo bundleName is %{public}s, abilityName is %{public}s",
525             missionInfo.bundleName.c_str(), missionInfo.abilityName.c_str());
526         for (auto iter = continues_.begin(); iter != continues_.end(); iter++) {
527             if (iter->second == nullptr) {
528                 break;
529             }
530 
531             auto continueInfo = iter->second->GetContinueInfo();
532             HILOGI("continueInfo bundleName is %{public}s, abilityName is %{public}s",
533                 continueInfo.sinkBundleName_.c_str(), continueInfo.sinkAbilityName_.c_str());
534             if (missionInfo.bundleName == continueInfo.sinkBundleName_
535                 && missionInfo.abilityName == continueInfo.sinkAbilityName_) {
536                 HILOGE("Excute onContinueEnd");
537                 iter->second->OnContinueEnd(CONTINUE_SINK_ABILITY_TERMINATED);
538                 return;
539             }
540         }
541     }
542     HILOGW("doesn't match an existing continuation.");
543 }
544 
OnContinueEnd(const DSchedContinueInfo & info)545 int32_t DSchedContinueManager::OnContinueEnd(const DSchedContinueInfo& info)
546 {
547     auto func = [this, info]() {
548         HandleContinueEnd(info);
549     };
550     if (eventHandler_ == nullptr) {
551         HILOGE("eventHandler_ is nullptr");
552         return INVALID_PARAMETERS_ERR;
553     }
554     eventHandler_->PostTask(func);
555     return ERR_OK;
556 }
557 
HandleContinueEnd(const DSchedContinueInfo & info)558 void DSchedContinueManager::HandleContinueEnd(const DSchedContinueInfo& info)
559 {
560     HILOGI("begin, continue info: %{public}s.", info.toString().c_str());
561     std::lock_guard<std::mutex> continueLock(continueMutex_);
562     if (continues_.empty() || continues_.find(info) == continues_.end()) {
563         HILOGE("continue info doesn't match any existing continuation.");
564         return;
565     }
566     RemoveTimeout(info);
567     continues_.erase(info);
568     ContinueSceneSessionHandler::GetInstance().ClearContinueSessionId();
569 
570     std::string localDevId;
571     if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDevId)) {
572         HILOGE("get local deviceId failed!");
573         return;
574     }
575 
576     if (info.sinkDeviceId_ == localDevId) {
577         cntSink_--;
578     } else if (info.sourceDeviceId_ == localDevId) {
579         cntSource_--;
580     }
581     HILOGI("end.");
582 }
583 
RemoveTimeout(const DSchedContinueInfo & info)584 void DSchedContinueManager::RemoveTimeout(const DSchedContinueInfo& info)
585 {
586     if (eventHandler_ == nullptr) {
587         HILOGE("eventHandler_ is nullptr");
588         return;
589     }
590     eventHandler_->RemoveTask(info.ToStringIgnoreMissionId());
591 }
592 
OnDataRecv(int32_t sessionId,std::shared_ptr<DSchedDataBuffer> dataBuffer)593 void DSchedContinueManager::OnDataRecv(int32_t sessionId, std::shared_ptr<DSchedDataBuffer> dataBuffer)
594 {
595     auto func = [this, sessionId, dataBuffer]() {
596         HandleDataRecv(sessionId, dataBuffer);
597     };
598     if (eventHandler_ == nullptr) {
599         HILOGE("eventHandler_ is nullptr");
600         return;
601     }
602     eventHandler_->PostTask(func);
603 }
604 
HandleDataRecv(int32_t sessionId,std::shared_ptr<DSchedDataBuffer> dataBuffer)605 void DSchedContinueManager::HandleDataRecv(int32_t sessionId, std::shared_ptr<DSchedDataBuffer> dataBuffer)
606 {
607     HILOGI("start, sessionId: %{public}d.", sessionId);
608     if (dataBuffer == nullptr) {
609         HILOGE("dataBuffer is null.");
610         return;
611     }
612     uint8_t *data = dataBuffer->Data();
613     std::string jsonStr(reinterpret_cast<const char *>(data), dataBuffer->Capacity());
614     cJSON *rootValue = cJSON_Parse(jsonStr.c_str());
615     if (rootValue == nullptr) {
616         HILOGE("Parse jsonStr error.");
617         return;
618     }
619     cJSON *baseCmd = cJSON_GetObjectItemCaseSensitive(rootValue, "BaseCmd");
620     if (baseCmd == nullptr || !cJSON_IsString(baseCmd) || (baseCmd->valuestring == nullptr)) {
621         cJSON_Delete(rootValue);
622         HILOGE("Parse base cmd error.");
623         return;
624     }
625 
626     cJSON *cmdValue = cJSON_Parse(baseCmd->valuestring);
627     cJSON_Delete(rootValue);
628     if (cmdValue == nullptr) {
629         HILOGE("Parse cmd value error.");
630         return;
631     }
632 
633     cJSON *comvalue = cJSON_GetObjectItemCaseSensitive(cmdValue, "Command");
634     if (comvalue == nullptr || !cJSON_IsNumber(comvalue)) {
635         cJSON_Delete(cmdValue);
636         HILOGE("parse command failed");
637         return;
638     }
639     int32_t command = comvalue->valueint;
640     cJSON_Delete(cmdValue);
641     NotifyContinueDataRecv(sessionId, command, jsonStr, dataBuffer);
642     HILOGI("end, sessionId: %{public}d.", sessionId);
643 }
644 
NotifyContinueDataRecv(int32_t sessionId,int32_t command,const std::string & jsonStr,std::shared_ptr<DSchedDataBuffer> dataBuffer)645 void DSchedContinueManager::NotifyContinueDataRecv(int32_t sessionId, int32_t command, const std::string& jsonStr,
646     std::shared_ptr<DSchedDataBuffer> dataBuffer)
647 {
648     HILOGI("start, parsed cmd %{public}d, sessionId: %{public}d.", command, sessionId);
649     std::lock_guard<std::mutex> continueLock(continueMutex_);
650     if (!continues_.empty()) {
651         for (auto iter = continues_.begin(); iter != continues_.end(); iter++) {
652             if (iter->second != nullptr && sessionId == iter->second->GetSessionId()) {
653                 HILOGI("sessionId %{public}d exist.", sessionId);
654                 iter->second->OnDataRecv(command, dataBuffer);
655                 return;
656             }
657         }
658     }
659 
660     if (command == DSCHED_CONTINUE_CMD_START) {
661         HILOGI("recv start cmd, sessionId: %{public}d.", sessionId);
662         auto startCmd = std::make_shared<DSchedContinueStartCmd>();
663         int32_t ret = startCmd->Unmarshal(jsonStr);
664         if (ret != ERR_OK) {
665             HILOGE("Unmarshal start cmd failed, ret: %{public}d", ret);
666             return;
667         }
668         int32_t direction = CONTINUE_SINK;
669         ret = CheckContinuationLimit(startCmd->srcDeviceId_, startCmd->dstDeviceId_, direction);
670         if (ret != ERR_OK) {
671             DmsRadar::GetInstance().SaveDataDmsRemoteWant("NotifyContinueDataRecv", ret);
672             HILOGE("CheckContinuationSubType failed, ret: %{public}d", ret);
673             return;
674         }
675 
676         auto newContinue = std::make_shared<DSchedContinue>(startCmd, sessionId);
677         newContinue->Init();
678         continues_.insert(std::make_pair(newContinue->GetContinueInfo(), newContinue));
679 
680         newContinue->OnStartCmd(startCmd->appVersion_);
681         HILOGI("end, continue info: %{public}s.", newContinue->GetContinueInfo().toString().c_str());
682         return;
683     }
684     HILOGE("No matching session to handle cmd! sessionId: %{public}d, recv cmd %{public}d.", sessionId, command);
685     return;
686 }
687 
CheckContinuationLimit(const std::string & srcDeviceId,const std::string & dstDeviceId,int32_t & direction)688 int32_t DSchedContinueManager::CheckContinuationLimit(const std::string& srcDeviceId,
689     const std::string& dstDeviceId, int32_t &direction)
690 {
691     std::string localDevId;
692     if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDevId)) {
693         HILOGE("get local deviceId failed!");
694         return GET_LOCAL_DEVICE_ERR;
695     }
696 
697     direction = CONTINUE_SINK;
698     if (dstDeviceId == localDevId) {
699         if (cntSink_.load() >= MAX_CONCURRENT_SINK) {
700             HILOGE("can't deal more than %{public}d pull requests at the same time.", cntSink_.load());
701             return CONTINUE_ALREADY_IN_PROGRESS;
702         }
703         if (DtbschedmgrDeviceInfoStorage::GetInstance().GetDeviceInfoById(srcDeviceId) == nullptr) {
704             HILOGE("Irrecognized source device!");
705             return INVALID_PARAMETERS_ERR;
706         }
707     } else if (srcDeviceId == localDevId) {
708         if (cntSource_.load() >= MAX_CONCURRENT_SOURCE) {
709             HILOGE("can't deal more than %{public}d push requests at the same time.", cntSource_.load());
710             return CONTINUE_ALREADY_IN_PROGRESS;
711         }
712         if (DtbschedmgrDeviceInfoStorage::GetInstance().GetDeviceInfoById(dstDeviceId) == nullptr) {
713             HILOGE("Irrecognized destination device!");
714             return INVALID_PARAMETERS_ERR;
715         }
716         direction = CONTINUE_SOURCE;
717     } else {
718         HILOGE("source or target device must be local!");
719         return OPERATION_DEVICE_NOT_INITIATOR_OR_TARGET;
720     }
721     return ERR_OK;
722 }
723 
GetContinueInfo(std::string & srcDeviceId,std::string & dstDeviceId)724 int32_t DSchedContinueManager::GetContinueInfo(std::string &srcDeviceId, std::string &dstDeviceId)
725 {
726     HILOGI("called");
727     std::lock_guard<std::mutex> continueLock(continueMutex_);
728     if (continues_.empty()) {
729         HILOGW("No continuation in progress.");
730         return ERR_OK;
731     }
732     auto dsContinue = continues_.begin()->second;
733     if (dsContinue == nullptr) {
734         HILOGE("dContinue is null");
735         return INVALID_PARAMETERS_ERR;
736     }
737     dstDeviceId = dsContinue->GetContinueInfo().sinkDeviceId_;
738     srcDeviceId = dsContinue->GetContinueInfo().sourceDeviceId_;
739     return ERR_OK;
740 }
741 
OnShutdown(int32_t socket,bool isSelfCalled)742 void DSchedContinueManager::OnShutdown(int32_t socket, bool isSelfCalled)
743 {
744     if (isSelfCalled) {
745         HILOGW("called, shutdown by local, sessionId: %{public}d", socket);
746         return;
747     }
748     HILOGW("called, sessionId: %{public}d, isSelfCalled %{public}d", socket, isSelfCalled);
749     auto func = [this, socket]() {
750         std::lock_guard<std::mutex> continueLock(continueMutex_);
751         if (continues_.empty()) {
752             return;
753         }
754         for (auto iter = continues_.begin(); iter != continues_.end(); iter++) {
755             if (iter->second != nullptr && socket == iter->second->GetSessionId()) {
756                 iter->second->OnContinueEnd(CONTINUE_SESSION_SHUTDOWN);
757             }
758         }
759     };
760     if (eventHandler_ == nullptr) {
761         HILOGE("eventHandler_ is nullptr");
762         return;
763     }
764     eventHandler_->PostTask(func);
765     return;
766 }
767 
OnBind(int32_t socket,PeerSocketInfo info)768 void DSchedContinueManager::SoftbusListener::OnBind(int32_t socket, PeerSocketInfo info)
769 {
770 }
771 
OnShutdown(int32_t socket,bool isSelfCalled)772 void DSchedContinueManager::SoftbusListener::OnShutdown(int32_t socket, bool isSelfCalled)
773 {
774     DSchedContinueManager::GetInstance().OnShutdown(socket, isSelfCalled);
775 }
776 
OnDataRecv(int32_t socket,std::shared_ptr<DSchedDataBuffer> dataBuffer)777 void DSchedContinueManager::SoftbusListener::OnDataRecv(int32_t socket, std::shared_ptr<DSchedDataBuffer> dataBuffer)
778 {
779     DSchedContinueManager::GetInstance().OnDataRecv(socket, dataBuffer);
780 }
781 }  // namespace DistributedSchedule
782 }  // namespace OHOS
783