1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "dsched_continue_manager.h"
17
18 #include <chrono>
19 #include <sys/prctl.h>
20
21 #include "cJSON.h"
22
23 #include "continue_scene_session_handler.h"
24 #include "dfx/distributed_radar.h"
25 #include "distributed_sched_utils.h"
26 #include "dsched_transport_softbus_adapter.h"
27 #include "dtbschedmgr_device_info_storage.h"
28 #include "dtbschedmgr_log.h"
29 #include "mission/distributed_bm_storage.h"
30 #include "mission/dms_continue_send_manager.h"
31 #include "mission/dms_continue_recv_manager.h"
32
33 namespace OHOS {
34 namespace DistributedSchedule {
namespace {
// Log tag for this translation unit (presumably picked up by the HILOG* macros — confirm in dtbschedmgr_log.h).
const std::string TAG{"DSchedContinueManager"};
// Name assigned to the event thread through prctl(PR_SET_NAME) in StartEvent().
const std::string DSCHED_CONTINUE_MANAGER{"dsched_continue_manager"};
// Not referenced anywhere in the visible part of this file.
const std::string CONTINUE_TIMEOUT_TASK{"continue_timeout_task"};
}  // namespace
40
41 IMPLEMENT_SINGLE_INSTANCE(DSchedContinueManager);
42
DSchedContinueManager()43 DSchedContinueManager::DSchedContinueManager()
44 {
45 }
46
~DSchedContinueManager()47 DSchedContinueManager::~DSchedContinueManager()
48 {
49 HILOGI("DSchedContinueManager delete");
50 UnInit();
51 }
52
Init()53 void DSchedContinueManager::Init()
54 {
55 HILOGI("Init DSchedContinueManager start");
56 if (eventHandler_ != nullptr) {
57 HILOGI("DSchedContinueManager already inited, end.");
58 return;
59 }
60 DSchedTransportSoftbusAdapter::GetInstance().InitChannel();
61 softbusListener_ = std::make_shared<DSchedContinueManager::SoftbusListener>();
62 DSchedTransportSoftbusAdapter::GetInstance().RegisterListener(SERVICE_TYPE_CONTINUE, softbusListener_);
63 eventThread_ = std::thread(&DSchedContinueManager::StartEvent, this);
64 std::unique_lock<std::mutex> lock(eventMutex_);
65 eventCon_.wait(lock, [this] {
66 return eventHandler_ != nullptr;
67 });
68 HILOGI("Init DSchedContinueManager end");
69 }
70
StartEvent()71 void DSchedContinueManager::StartEvent()
72 {
73 HILOGI("StartEvent start");
74 prctl(PR_SET_NAME, DSCHED_CONTINUE_MANAGER.c_str());
75 auto runner = AppExecFwk::EventRunner::Create(false);
76 {
77 std::lock_guard<std::mutex> lock(eventMutex_);
78 eventHandler_ = std::make_shared<OHOS::AppExecFwk::EventHandler>(runner);
79 }
80 eventCon_.notify_one();
81 runner->Run();
82 HILOGI("StartEvent end");
83 }
84
UnInit()85 void DSchedContinueManager::UnInit()
86 {
87 HILOGI("UnInit start");
88 DSchedTransportSoftbusAdapter::GetInstance().UnregisterListener(SERVICE_TYPE_CONTINUE, softbusListener_);
89 DSchedTransportSoftbusAdapter::GetInstance().ReleaseChannel();
90 continues_.clear();
91 cntSink_ = 0;
92 cntSource_ = 0;
93
94 if (eventHandler_ != nullptr) {
95 eventHandler_->GetEventRunner()->Stop();
96 eventThread_.join();
97 eventHandler_ = nullptr;
98 } else {
99 HILOGE("eventHandler_ is nullptr");
100 }
101 HILOGI("UnInit end");
102 }
103
NotifyAllConnectDecision(std::string peerDeviceId,bool isSupport)104 void DSchedContinueManager::NotifyAllConnectDecision(std::string peerDeviceId, bool isSupport)
105 {
106 HILOGI("Notify all connect decision, peerDeviceId %{public}s, isSupport %{public}d.",
107 GetAnonymStr(peerDeviceId).c_str(), isSupport);
108 #ifdef DMSFWK_ALL_CONNECT_MGR
109 std::lock_guard<std::mutex> decisionLock(connectDecisionMutex_);
110 peerConnectDecision_[peerDeviceId] = isSupport;
111 connectDecisionCond_.notify_all();
112 #endif
113 }
114
ContinueMission(const std::string & srcDeviceId,const std::string & dstDeviceId,int32_t missionId,const sptr<IRemoteObject> & callback,const OHOS::AAFwk::WantParams & wantParams)115 int32_t DSchedContinueManager::ContinueMission(const std::string& srcDeviceId, const std::string& dstDeviceId,
116 int32_t missionId, const sptr<IRemoteObject>& callback, const OHOS::AAFwk::WantParams& wantParams)
117 {
118 if (srcDeviceId.empty() || dstDeviceId.empty() || callback == nullptr) {
119 HILOGE("srcDeviceId or dstDeviceId or callback is null!");
120 return INVALID_PARAMETERS_ERR;
121 }
122
123 std::string localDevId;
124 if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDevId)) {
125 HILOGE("get local deviceId failed!");
126 return INVALID_PARAMETERS_ERR;
127 }
128 if (localDevId != srcDeviceId && localDevId != dstDeviceId) {
129 HILOGE("Input srcDevId: %{public}s or dstDevId: %{public}s must be locDevId: %{public}s.",
130 GetAnonymStr(srcDeviceId).c_str(), GetAnonymStr(dstDeviceId).c_str(), GetAnonymStr(localDevId).c_str());
131 return OPERATION_DEVICE_NOT_INITIATOR_OR_TARGET;
132 }
133 if (DtbschedmgrDeviceInfoStorage::GetInstance().GetDeviceInfoById(
134 localDevId == srcDeviceId ? dstDeviceId : srcDeviceId) == nullptr) {
135 HILOGE("GetDeviceInfoById fail, locDevId: %{public}s, srcDevId: %{public}s, dstDevId: %{public}s.",
136 GetAnonymStr(localDevId).c_str(), GetAnonymStr(srcDeviceId).c_str(), GetAnonymStr(dstDeviceId).c_str());
137 return INVALID_REMOTE_PARAMETERS_ERR;
138 }
139
140 auto func = [this, srcDeviceId, dstDeviceId, missionId, callback, wantParams]() {
141 HandleContinueMission(srcDeviceId, dstDeviceId, missionId, callback, wantParams);
142 };
143 if (eventHandler_ == nullptr) {
144 HILOGE("eventHandler_ is nullptr");
145 return INVALID_PARAMETERS_ERR;
146 }
147 eventHandler_->PostTask(func);
148 return ERR_OK;
149 }
150
HandleContinueMission(const std::string & srcDeviceId,const std::string & dstDeviceId,int32_t missionId,const sptr<IRemoteObject> & callback,const OHOS::AAFwk::WantParams & wantParams)151 void DSchedContinueManager::HandleContinueMission(const std::string& srcDeviceId, const std::string& dstDeviceId,
152 int32_t missionId, const sptr<IRemoteObject>& callback, const OHOS::AAFwk::WantParams& wantParams)
153 {
154 HILOGI("start, srcDeviceId: %{public}s. dstDeviceId: %{public}s. missionId: %{public}d.",
155 GetAnonymStr(srcDeviceId).c_str(), GetAnonymStr(dstDeviceId).c_str(), missionId);
156
157 if (srcDeviceId.empty() || dstDeviceId.empty() || callback == nullptr) {
158 HILOGE("srcDeviceId or dstDeviceId or callback is null!");
159 return;
160 }
161
162 std::string localDevId;
163 if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDevId)) {
164 HILOGE("get local deviceId failed!");
165 return;
166 }
167 DSchedContinueInfo info = DSchedContinueInfo(srcDeviceId, dstDeviceId, missionId);
168
169 AAFwk::MissionInfo missionInfo;
170 if (AAFwk::AbilityManagerClient::GetInstance()->GetMissionInfo("", missionId, missionInfo) == ERR_OK
171 && srcDeviceId == localDevId) {
172 info.sourceBundleName_ = missionInfo.want.GetBundle();
173 info.sinkBundleName_ = missionInfo.want.GetBundle();
174 }
175
176 HandleContinueMissionWithBundleName(info, callback, wantParams);
177 return;
178 }
179
ContinueMission(const DSchedContinueInfo & continueInfo,const sptr<IRemoteObject> & callback,const OHOS::AAFwk::WantParams & wantParams)180 int32_t DSchedContinueManager::ContinueMission(const DSchedContinueInfo& continueInfo,
181 const sptr<IRemoteObject>& callback, const OHOS::AAFwk::WantParams& wantParams)
182 {
183 std::string srcDeviceId = continueInfo.sourceDeviceId_;
184 std::string dstDeviceId = continueInfo.sinkDeviceId_;
185
186 if (srcDeviceId.empty() || dstDeviceId.empty() || callback == nullptr) {
187 HILOGE("srcDeviceId or dstDeviceId or callback is null!");
188 return INVALID_PARAMETERS_ERR;
189 }
190
191 std::string localDevId;
192 if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDevId)) {
193 HILOGE("get local deviceId failed!");
194 return INVALID_PARAMETERS_ERR;
195 }
196 if (localDevId != srcDeviceId && localDevId != dstDeviceId) {
197 HILOGE("Input srcDevId: %{public}s or dstDevId: %{public}s must be locDevId: %{public}s.",
198 GetAnonymStr(srcDeviceId).c_str(), GetAnonymStr(dstDeviceId).c_str(), GetAnonymStr(localDevId).c_str());
199 return OPERATION_DEVICE_NOT_INITIATOR_OR_TARGET;
200 }
201 if (DtbschedmgrDeviceInfoStorage::GetInstance().GetDeviceInfoById(
202 localDevId == srcDeviceId ? dstDeviceId : srcDeviceId) == nullptr) {
203 HILOGE("GetDeviceInfoById fail, locDevId: %{public}s, srcDevId: %{public}s, dstDevId: %{public}s.",
204 GetAnonymStr(localDevId).c_str(), GetAnonymStr(srcDeviceId).c_str(), GetAnonymStr(dstDeviceId).c_str());
205 return INVALID_REMOTE_PARAMETERS_ERR;
206 }
207
208 #ifdef SUPPORT_DISTRIBUTED_MISSION_MANAGER
209 if (localDevId == srcDeviceId) {
210 int32_t missionId = -1;
211 int32_t ret = DMSContinueSendMgr::GetInstance().GetMissionIdByBundleName(
212 continueInfo.sinkBundleName_, missionId);
213 if (ret != ERR_OK) {
214 HILOGE("get missionId fail, ret %{public}d.", ret);
215 return INVALID_PARAMETERS_ERR;
216 }
217 }
218 #endif
219
220 auto func = [this, continueInfo, callback, wantParams]() {
221 HandleContinueMission(continueInfo, callback, wantParams);
222 };
223 if (eventHandler_ == nullptr) {
224 HILOGE("eventHandler_ is nullptr");
225 return INVALID_PARAMETERS_ERR;
226 }
227 eventHandler_->PostTask(func);
228 return ERR_OK;
229 }
230
HandleContinueMission(const DSchedContinueInfo & continueInfo,const sptr<IRemoteObject> & callback,const OHOS::AAFwk::WantParams & wantParams)231 void DSchedContinueManager::HandleContinueMission(const DSchedContinueInfo& continueInfo,
232 const sptr<IRemoteObject>& callback, const OHOS::AAFwk::WantParams& wantParams)
233 {
234 std::string srcDeviceId = continueInfo.sourceDeviceId_;
235 std::string dstDeviceId = continueInfo.sinkDeviceId_;
236 std::string srcBundleName = continueInfo.sourceBundleName_;
237 std::string bundleName = continueInfo.sinkBundleName_;
238 std::string continueType = continueInfo.continueType_;
239 HILOGI("start, srcDeviceId: %{public}s. dstDeviceId: %{public}s. bundleName: %{public}s."
240 " continueType: %{public}s.", GetAnonymStr(srcDeviceId).c_str(), GetAnonymStr(dstDeviceId).c_str(),
241 bundleName.c_str(), continueType.c_str());
242
243 if (srcDeviceId.empty() || dstDeviceId.empty() || callback == nullptr) {
244 HILOGE("srcDeviceId or dstDeviceId or callback is null!");
245 return;
246 }
247
248 DSchedContinueInfo info = DSchedContinueInfo(srcDeviceId, srcBundleName, dstDeviceId, bundleName, continueType);
249 HandleContinueMissionWithBundleName(info, callback, wantParams);
250 return;
251 }
252
GetFirstBundleName(DSchedContinueInfo & info,std::string & firstBundleName,std::string bundleName,std::string deviceId)253 bool DSchedContinueManager::GetFirstBundleName(DSchedContinueInfo &info, std::string &firstBundleName,
254 std::string bundleName, std::string deviceId)
255 {
256 uint16_t bundleNameId;
257 DmsBundleInfo distributedBundleInfo;
258 DmsBmStorage::GetInstance()->GetBundleNameId(bundleName, bundleNameId);
259 bool result = DmsBmStorage::GetInstance()->GetDistributedBundleInfo(deviceId,
260 bundleNameId, distributedBundleInfo);
261 if (!result) {
262 HILOGE("GetDistributedBundleInfo faild");
263 return false;
264 }
265 std::vector<DmsAbilityInfo> dmsAbilityInfos = distributedBundleInfo.dmsAbilityInfos;
266 for (DmsAbilityInfo &ability: dmsAbilityInfos) {
267 std::vector<std::string> abilityContinueTypes = ability.continueType;
268 for (std::string &ability_continue_type: abilityContinueTypes) {
269 if (ability_continue_type == info.continueType_ && !ability.continueBundleName.empty()) {
270 firstBundleName = *ability.continueBundleName.begin();
271 return true;
272 }
273 }
274 }
275 HILOGE("can not get abilicy info or continue bundle names is empty for continue type:%{public}s",
276 info.continueType_.c_str());
277 return false;
278 }
279
HandleContinueMissionWithBundleName(DSchedContinueInfo & info,const sptr<IRemoteObject> & callback,const OHOS::AAFwk::WantParams & wantParams)280 void DSchedContinueManager::HandleContinueMissionWithBundleName(DSchedContinueInfo &info,
281 const sptr<IRemoteObject> &callback, const OHOS::AAFwk::WantParams &wantParams)
282 {
283 int32_t direction = CONTINUE_SINK;
284 int32_t ret = CheckContinuationLimit(info.sourceDeviceId_, info.sinkDeviceId_, direction);
285 if (ret != ERR_OK) {
286 HILOGE("CheckContinuationLimit failed, ret: %{public}d", ret);
287 return;
288 }
289 int32_t subType = CONTINUE_PUSH;
290 if (direction == CONTINUE_SOURCE) {
291 cntSource_++;
292 } else {
293 cntSink_++;
294 subType = CONTINUE_PULL;
295 if (info.sourceBundleName_.empty()) {
296 HILOGW("current sub type is continue pull; but can not get source bundle name from recv cache.");
297 std::string firstBundleNamme;
298 std::string bundleName = info.sinkBundleName_;
299 std::string deviceId = info.sinkDeviceId_;
300 if (GetFirstBundleName(info, firstBundleNamme, bundleName, deviceId)) {
301 info.sourceBundleName_ = firstBundleNamme;
302 }
303 }
304 }
305
306 {
307 std::lock_guard<std::mutex> continueLock(continueMutex_);
308 if (!continues_.empty() && continues_.find(info) != continues_.end()) {
309 HILOGE("a same continue task is already in progress.");
310 return;
311 }
312 auto newContinue = std::make_shared<DSchedContinue>(subType, direction, callback, info);
313 newContinue->Init();
314 continues_.insert(std::make_pair(info, newContinue));
315 #ifdef DMSFWK_ALL_CONNECT_MGR
316 {
317 std::unique_lock<std::mutex> decisionLock(connectDecisionMutex_);
318 std::string peerDeviceId = direction == CONTINUE_SOURCE ? info.sinkDeviceId_ : info.sourceDeviceId_;
319 if (peerConnectDecision_.find(peerDeviceId) != peerConnectDecision_.end()) {
320 peerConnectDecision_.erase(peerDeviceId);
321 }
322 }
323 #endif
324 newContinue->OnContinueMission(wantParams);
325 }
326 WaitAllConnectDecision(direction, info, CONTINUE_TIMEOUT);
327 HILOGI("end, subType: %{public}d dirction: %{public}d, continue info: %{public}s",
328 subType, direction, info.toString().c_str());
329 }
330
WaitAllConnectDecision(int32_t direction,const DSchedContinueInfo & info,int32_t timeout)331 void DSchedContinueManager::WaitAllConnectDecision(int32_t direction, const DSchedContinueInfo &info, int32_t timeout)
332 {
333 #ifdef DMSFWK_ALL_CONNECT_MGR
334 std::string peerDeviceId = direction == CONTINUE_SOURCE ? info.sinkDeviceId_ : info.sourceDeviceId_;
335 {
336 std::unique_lock<std::mutex> decisionLock(connectDecisionMutex_);
337 connectDecisionCond_.wait_for(decisionLock, std::chrono::seconds(CONNECT_DECISION_WAIT_S),
338 [this, peerDeviceId]() {
339 return peerConnectDecision_.find(peerDeviceId) != peerConnectDecision_.end() &&
340 peerConnectDecision_.at(peerDeviceId).load();
341 });
342
343 if (peerConnectDecision_.find(peerDeviceId) == peerConnectDecision_.end()) {
344 HILOGE("Not find peerDeviceId %{public}s in peerConnectDecision.", GetAnonymStr(peerDeviceId).c_str());
345 SetTimeOut(info, 0);
346 return;
347 }
348 if (!peerConnectDecision_.at(peerDeviceId).load()) {
349 HILOGE("All connect manager refuse bind to PeerDeviceId %{public}s.", GetAnonymStr(peerDeviceId).c_str());
350 peerConnectDecision_.erase(peerDeviceId);
351 SetTimeOut(info, 0);
352 return;
353 }
354 peerConnectDecision_.erase(peerDeviceId);
355 }
356 #endif
357 SetTimeOut(info, timeout);
358 }
359
SetTimeOut(const DSchedContinueInfo & info,int32_t timeout)360 void DSchedContinueManager::SetTimeOut(const DSchedContinueInfo &info, int32_t timeout)
361 {
362 auto func = [this, info]() {
363 if (continues_.empty() || continues_.count(info) == 0) {
364 HILOGE("continue not exist.");
365 return;
366 }
367 HILOGE("continue timeout! info: %{public}s", info.toString().c_str());
368 auto dsContinue = continues_[info];
369 if (dsContinue != nullptr) {
370 dsContinue->OnContinueEnd(CONTINUE_ABILITY_TIMEOUT_ERR);
371 }
372 };
373 if (eventHandler_ == nullptr) {
374 HILOGE("eventHandler_ is nullptr");
375 return;
376 }
377 timeout > 0 ? eventHandler_->PostTask(func, info.ToStringIgnoreMissionId(), timeout) :
378 eventHandler_->PostTask(func);
379 }
380
StartContinuation(const OHOS::AAFwk::Want & want,int32_t missionId,int32_t callerUid,int32_t status,uint32_t accessToken)381 int32_t DSchedContinueManager::StartContinuation(const OHOS::AAFwk::Want& want, int32_t missionId,
382 int32_t callerUid, int32_t status, uint32_t accessToken)
383 {
384 std::string dstDeviceId = want.GetElement().GetDeviceID();
385 if (DtbschedmgrDeviceInfoStorage::GetInstance().GetDeviceInfoById(dstDeviceId) == nullptr) {
386 HILOGE("GetDeviceInfoById fail, dstDevId: %{public}s.", GetAnonymStr(dstDeviceId).c_str());
387 return INVALID_REMOTE_PARAMETERS_ERR;
388 }
389 if (GetDSchedContinueByWant(want, missionId) == nullptr) {
390 HILOGE("GetDSchedContinueByWant fail, dstDevId: %{public}s, missionId: %{public}d.",
391 GetAnonymStr(dstDeviceId).c_str(), missionId);
392 return INVALID_REMOTE_PARAMETERS_ERR;
393 }
394
395 auto func = [this, want, missionId, callerUid, status, accessToken]() {
396 HandleStartContinuation(want, missionId, callerUid, status, accessToken);
397 };
398 if (eventHandler_ == nullptr) {
399 HILOGE("eventHandler_ is nullptr");
400 return INVALID_PARAMETERS_ERR;
401 }
402 eventHandler_->PostTask(func);
403 return ERR_OK;
404 }
405
HandleStartContinuation(const OHOS::AAFwk::Want & want,int32_t missionId,int32_t callerUid,int32_t status,uint32_t accessToken)406 void DSchedContinueManager::HandleStartContinuation(const OHOS::AAFwk::Want& want, int32_t missionId,
407 int32_t callerUid, int32_t status, uint32_t accessToken)
408 {
409 HILOGI("begin");
410 auto dContinue = GetDSchedContinueByWant(want, missionId);
411 if (dContinue != nullptr) {
412 dContinue->OnStartContinuation(want, callerUid, status, accessToken);
413 } else {
414 DmsRadar::GetInstance().SaveDataDmsRemoteWant("HandleStartContinuation", INVALID_PARAMETERS_ERR);
415 }
416 HILOGI("end");
417 return;
418 }
419
GetDSchedContinueByWant(const OHOS::AAFwk::Want & want,int32_t missionId)420 std::shared_ptr<DSchedContinue> DSchedContinueManager::GetDSchedContinueByWant(
421 const OHOS::AAFwk::Want& want, int32_t missionId)
422 {
423 std::string srcDeviceId;
424 if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(srcDeviceId)) {
425 DmsRadar::GetInstance().SaveDataDmsRemoteWant("GetDSchedContinueByWant", GET_LOCAL_DEVICE_ERR);
426 HILOGE("get local deviceId failed!");
427 return nullptr;
428 }
429 std::string dstDeviceId = want.GetElement().GetDeviceID();
430 std::string bundleName = want.GetElement().GetBundleName();
431 auto info = DSchedContinueInfo(srcDeviceId, bundleName, dstDeviceId, bundleName, "");
432
433 HILOGI("continue info: %{public}s.", info.toString().c_str());
434 {
435 std::lock_guard<std::mutex> continueLock(continueMutex_);
436 if (continues_.empty()) {
437 HILOGE("continue info doesn't match an existing continuation.");
438 return nullptr;
439 }
440 for (auto iter = continues_.begin(); iter != continues_.end(); iter++) {
441 if (iter->second == nullptr) {
442 continue;
443 }
444 DSchedContinueInfo continueInfo = iter->second->GetContinueInfo();
445 if (srcDeviceId == continueInfo.sourceDeviceId_
446 && bundleName == continueInfo.sourceBundleName_
447 && dstDeviceId == continueInfo.sinkDeviceId_) {
448 return iter->second;
449 }
450 }
451 }
452 HILOGE("missionId doesn't match the existing continuation, continueInfo: %{public}s.",
453 info.toString().c_str());
454 return nullptr;
455 }
456
NotifyCompleteContinuation(const std::u16string & devId,int32_t sessionId,bool isSuccess,const std::string & callerBundleName)457 int32_t DSchedContinueManager::NotifyCompleteContinuation(const std::u16string& devId, int32_t sessionId,
458 bool isSuccess, const std::string &callerBundleName)
459 {
460 auto func = [this, devId, sessionId, isSuccess, callerBundleName]() {
461 HandleNotifyCompleteContinuation(devId, sessionId, isSuccess, callerBundleName);
462 };
463 if (eventHandler_ == nullptr) {
464 HILOGE("eventHandler_ is nullptr");
465 return INVALID_PARAMETERS_ERR;
466 }
467 eventHandler_->PostTask(func);
468 return ERR_OK;
469 }
470
HandleNotifyCompleteContinuation(const std::u16string & devId,int32_t missionId,bool isSuccess,const std::string & callerBundleName)471 void DSchedContinueManager::HandleNotifyCompleteContinuation(const std::u16string& devId, int32_t missionId,
472 bool isSuccess, const std::string &callerBundleName)
473 {
474 HILOGI("begin, isSuccess %{public}d", isSuccess);
475 auto dContinue = GetDSchedContinueByDevId(devId, missionId);
476 if (dContinue != nullptr) {
477 if (dContinue->GetContinueInfo().sinkBundleName_ != callerBundleName) {
478 HILOGE("callerBundleName doesn't match the existing continuation");
479 return;
480 }
481 dContinue->OnNotifyComplete(missionId, isSuccess);
482 HILOGI("end, continue info: %{public}s.", dContinue->GetContinueInfo().toString().c_str());
483 }
484 return;
485 }
486
GetDSchedContinueByDevId(const std::u16string & devId,int32_t missionId)487 std::shared_ptr<DSchedContinue> DSchedContinueManager::GetDSchedContinueByDevId(
488 const std::u16string& devId, int32_t missionId)
489 {
490 std::string deviceId = Str16ToStr8(devId);
491 HILOGI("begin, deviceId %{public}s, missionId %{public}d", GetAnonymStr(deviceId).c_str(), missionId);
492 {
493 std::lock_guard<std::mutex> continueLock(continueMutex_);
494 if (continues_.empty()) {
495 HILOGE("No continuation in progress.");
496 return nullptr;
497 }
498 for (auto iter = continues_.begin(); iter != continues_.end(); iter++) {
499 if (iter->second != nullptr && deviceId == iter->second->GetContinueInfo().sourceDeviceId_) {
500 return iter->second;
501 }
502 }
503 }
504 HILOGE("source deviceId doesn't match an existing continuation.");
505 return nullptr;
506 }
507
NotifyTerminateContinuation(const int32_t missionId)508 void DSchedContinueManager::NotifyTerminateContinuation(const int32_t missionId)
509 {
510 HILOGI("begin, missionId %{public}d", missionId);
511 {
512 std::lock_guard<std::mutex> continueLock(continueMutex_);
513 if (continues_.empty()) {
514 HILOGW("No continuation in progress.");
515 return;
516 }
517
518 ContinueLaunchMissionInfo missionInfo;
519 int32_t ret = DMSContinueSendMgr::GetInstance().GetContinueLaunchMissionInfo(missionId, missionInfo);
520 if (ret != ERR_OK) {
521 HILOGE("get continueLaunchMissionInfo failed, missionId %{public}d", missionId);
522 return;
523 }
524 HILOGI("alive missionInfo bundleName is %{public}s, abilityName is %{public}s",
525 missionInfo.bundleName.c_str(), missionInfo.abilityName.c_str());
526 for (auto iter = continues_.begin(); iter != continues_.end(); iter++) {
527 if (iter->second == nullptr) {
528 break;
529 }
530
531 auto continueInfo = iter->second->GetContinueInfo();
532 HILOGI("continueInfo bundleName is %{public}s, abilityName is %{public}s",
533 continueInfo.sinkBundleName_.c_str(), continueInfo.sinkAbilityName_.c_str());
534 if (missionInfo.bundleName == continueInfo.sinkBundleName_
535 && missionInfo.abilityName == continueInfo.sinkAbilityName_) {
536 HILOGE("Excute onContinueEnd");
537 iter->second->OnContinueEnd(CONTINUE_SINK_ABILITY_TERMINATED);
538 return;
539 }
540 }
541 }
542 HILOGW("doesn't match an existing continuation.");
543 }
544
OnContinueEnd(const DSchedContinueInfo & info)545 int32_t DSchedContinueManager::OnContinueEnd(const DSchedContinueInfo& info)
546 {
547 auto func = [this, info]() {
548 HandleContinueEnd(info);
549 };
550 if (eventHandler_ == nullptr) {
551 HILOGE("eventHandler_ is nullptr");
552 return INVALID_PARAMETERS_ERR;
553 }
554 eventHandler_->PostTask(func);
555 return ERR_OK;
556 }
557
HandleContinueEnd(const DSchedContinueInfo & info)558 void DSchedContinueManager::HandleContinueEnd(const DSchedContinueInfo& info)
559 {
560 HILOGI("begin, continue info: %{public}s.", info.toString().c_str());
561 std::lock_guard<std::mutex> continueLock(continueMutex_);
562 if (continues_.empty() || continues_.find(info) == continues_.end()) {
563 HILOGE("continue info doesn't match any existing continuation.");
564 return;
565 }
566 RemoveTimeout(info);
567 continues_.erase(info);
568 ContinueSceneSessionHandler::GetInstance().ClearContinueSessionId();
569
570 std::string localDevId;
571 if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDevId)) {
572 HILOGE("get local deviceId failed!");
573 return;
574 }
575
576 if (info.sinkDeviceId_ == localDevId) {
577 cntSink_--;
578 } else if (info.sourceDeviceId_ == localDevId) {
579 cntSource_--;
580 }
581 HILOGI("end.");
582 }
583
RemoveTimeout(const DSchedContinueInfo & info)584 void DSchedContinueManager::RemoveTimeout(const DSchedContinueInfo& info)
585 {
586 if (eventHandler_ == nullptr) {
587 HILOGE("eventHandler_ is nullptr");
588 return;
589 }
590 eventHandler_->RemoveTask(info.ToStringIgnoreMissionId());
591 }
592
OnDataRecv(int32_t sessionId,std::shared_ptr<DSchedDataBuffer> dataBuffer)593 void DSchedContinueManager::OnDataRecv(int32_t sessionId, std::shared_ptr<DSchedDataBuffer> dataBuffer)
594 {
595 auto func = [this, sessionId, dataBuffer]() {
596 HandleDataRecv(sessionId, dataBuffer);
597 };
598 if (eventHandler_ == nullptr) {
599 HILOGE("eventHandler_ is nullptr");
600 return;
601 }
602 eventHandler_->PostTask(func);
603 }
604
HandleDataRecv(int32_t sessionId,std::shared_ptr<DSchedDataBuffer> dataBuffer)605 void DSchedContinueManager::HandleDataRecv(int32_t sessionId, std::shared_ptr<DSchedDataBuffer> dataBuffer)
606 {
607 HILOGI("start, sessionId: %{public}d.", sessionId);
608 if (dataBuffer == nullptr) {
609 HILOGE("dataBuffer is null.");
610 return;
611 }
612 uint8_t *data = dataBuffer->Data();
613 std::string jsonStr(reinterpret_cast<const char *>(data), dataBuffer->Capacity());
614 cJSON *rootValue = cJSON_Parse(jsonStr.c_str());
615 if (rootValue == nullptr) {
616 HILOGE("Parse jsonStr error.");
617 return;
618 }
619 cJSON *baseCmd = cJSON_GetObjectItemCaseSensitive(rootValue, "BaseCmd");
620 if (baseCmd == nullptr || !cJSON_IsString(baseCmd) || (baseCmd->valuestring == nullptr)) {
621 cJSON_Delete(rootValue);
622 HILOGE("Parse base cmd error.");
623 return;
624 }
625
626 cJSON *cmdValue = cJSON_Parse(baseCmd->valuestring);
627 cJSON_Delete(rootValue);
628 if (cmdValue == nullptr) {
629 HILOGE("Parse cmd value error.");
630 return;
631 }
632
633 cJSON *comvalue = cJSON_GetObjectItemCaseSensitive(cmdValue, "Command");
634 if (comvalue == nullptr || !cJSON_IsNumber(comvalue)) {
635 cJSON_Delete(cmdValue);
636 HILOGE("parse command failed");
637 return;
638 }
639 int32_t command = comvalue->valueint;
640 cJSON_Delete(cmdValue);
641 NotifyContinueDataRecv(sessionId, command, jsonStr, dataBuffer);
642 HILOGI("end, sessionId: %{public}d.", sessionId);
643 }
644
NotifyContinueDataRecv(int32_t sessionId,int32_t command,const std::string & jsonStr,std::shared_ptr<DSchedDataBuffer> dataBuffer)645 void DSchedContinueManager::NotifyContinueDataRecv(int32_t sessionId, int32_t command, const std::string& jsonStr,
646 std::shared_ptr<DSchedDataBuffer> dataBuffer)
647 {
648 HILOGI("start, parsed cmd %{public}d, sessionId: %{public}d.", command, sessionId);
649 std::lock_guard<std::mutex> continueLock(continueMutex_);
650 if (!continues_.empty()) {
651 for (auto iter = continues_.begin(); iter != continues_.end(); iter++) {
652 if (iter->second != nullptr && sessionId == iter->second->GetSessionId()) {
653 HILOGI("sessionId %{public}d exist.", sessionId);
654 iter->second->OnDataRecv(command, dataBuffer);
655 return;
656 }
657 }
658 }
659
660 if (command == DSCHED_CONTINUE_CMD_START) {
661 HILOGI("recv start cmd, sessionId: %{public}d.", sessionId);
662 auto startCmd = std::make_shared<DSchedContinueStartCmd>();
663 int32_t ret = startCmd->Unmarshal(jsonStr);
664 if (ret != ERR_OK) {
665 HILOGE("Unmarshal start cmd failed, ret: %{public}d", ret);
666 return;
667 }
668 int32_t direction = CONTINUE_SINK;
669 ret = CheckContinuationLimit(startCmd->srcDeviceId_, startCmd->dstDeviceId_, direction);
670 if (ret != ERR_OK) {
671 DmsRadar::GetInstance().SaveDataDmsRemoteWant("NotifyContinueDataRecv", ret);
672 HILOGE("CheckContinuationSubType failed, ret: %{public}d", ret);
673 return;
674 }
675
676 auto newContinue = std::make_shared<DSchedContinue>(startCmd, sessionId);
677 newContinue->Init();
678 continues_.insert(std::make_pair(newContinue->GetContinueInfo(), newContinue));
679
680 newContinue->OnStartCmd(startCmd->appVersion_);
681 HILOGI("end, continue info: %{public}s.", newContinue->GetContinueInfo().toString().c_str());
682 return;
683 }
684 HILOGE("No matching session to handle cmd! sessionId: %{public}d, recv cmd %{public}d.", sessionId, command);
685 return;
686 }
687
CheckContinuationLimit(const std::string & srcDeviceId,const std::string & dstDeviceId,int32_t & direction)688 int32_t DSchedContinueManager::CheckContinuationLimit(const std::string& srcDeviceId,
689 const std::string& dstDeviceId, int32_t &direction)
690 {
691 std::string localDevId;
692 if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDevId)) {
693 HILOGE("get local deviceId failed!");
694 return GET_LOCAL_DEVICE_ERR;
695 }
696
697 direction = CONTINUE_SINK;
698 if (dstDeviceId == localDevId) {
699 if (cntSink_.load() >= MAX_CONCURRENT_SINK) {
700 HILOGE("can't deal more than %{public}d pull requests at the same time.", cntSink_.load());
701 return CONTINUE_ALREADY_IN_PROGRESS;
702 }
703 if (DtbschedmgrDeviceInfoStorage::GetInstance().GetDeviceInfoById(srcDeviceId) == nullptr) {
704 HILOGE("Irrecognized source device!");
705 return INVALID_PARAMETERS_ERR;
706 }
707 } else if (srcDeviceId == localDevId) {
708 if (cntSource_.load() >= MAX_CONCURRENT_SOURCE) {
709 HILOGE("can't deal more than %{public}d push requests at the same time.", cntSource_.load());
710 return CONTINUE_ALREADY_IN_PROGRESS;
711 }
712 if (DtbschedmgrDeviceInfoStorage::GetInstance().GetDeviceInfoById(dstDeviceId) == nullptr) {
713 HILOGE("Irrecognized destination device!");
714 return INVALID_PARAMETERS_ERR;
715 }
716 direction = CONTINUE_SOURCE;
717 } else {
718 HILOGE("source or target device must be local!");
719 return OPERATION_DEVICE_NOT_INITIATOR_OR_TARGET;
720 }
721 return ERR_OK;
722 }
723
GetContinueInfo(std::string & srcDeviceId,std::string & dstDeviceId)724 int32_t DSchedContinueManager::GetContinueInfo(std::string &srcDeviceId, std::string &dstDeviceId)
725 {
726 HILOGI("called");
727 std::lock_guard<std::mutex> continueLock(continueMutex_);
728 if (continues_.empty()) {
729 HILOGW("No continuation in progress.");
730 return ERR_OK;
731 }
732 auto dsContinue = continues_.begin()->second;
733 if (dsContinue == nullptr) {
734 HILOGE("dContinue is null");
735 return INVALID_PARAMETERS_ERR;
736 }
737 dstDeviceId = dsContinue->GetContinueInfo().sinkDeviceId_;
738 srcDeviceId = dsContinue->GetContinueInfo().sourceDeviceId_;
739 return ERR_OK;
740 }
741
OnShutdown(int32_t socket,bool isSelfCalled)742 void DSchedContinueManager::OnShutdown(int32_t socket, bool isSelfCalled)
743 {
744 if (isSelfCalled) {
745 HILOGW("called, shutdown by local, sessionId: %{public}d", socket);
746 return;
747 }
748 HILOGW("called, sessionId: %{public}d, isSelfCalled %{public}d", socket, isSelfCalled);
749 auto func = [this, socket]() {
750 std::lock_guard<std::mutex> continueLock(continueMutex_);
751 if (continues_.empty()) {
752 return;
753 }
754 for (auto iter = continues_.begin(); iter != continues_.end(); iter++) {
755 if (iter->second != nullptr && socket == iter->second->GetSessionId()) {
756 iter->second->OnContinueEnd(CONTINUE_SESSION_SHUTDOWN);
757 }
758 }
759 };
760 if (eventHandler_ == nullptr) {
761 HILOGE("eventHandler_ is nullptr");
762 return;
763 }
764 eventHandler_->PostTask(func);
765 return;
766 }
767
OnBind(int32_t socket,PeerSocketInfo info)768 void DSchedContinueManager::SoftbusListener::OnBind(int32_t socket, PeerSocketInfo info)
769 {
770 }
771
OnShutdown(int32_t socket,bool isSelfCalled)772 void DSchedContinueManager::SoftbusListener::OnShutdown(int32_t socket, bool isSelfCalled)
773 {
774 DSchedContinueManager::GetInstance().OnShutdown(socket, isSelfCalled);
775 }
776
OnDataRecv(int32_t socket,std::shared_ptr<DSchedDataBuffer> dataBuffer)777 void DSchedContinueManager::SoftbusListener::OnDataRecv(int32_t socket, std::shared_ptr<DSchedDataBuffer> dataBuffer)
778 {
779 DSchedContinueManager::GetInstance().OnDataRecv(socket, dataBuffer);
780 }
781 } // namespace DistributedSchedule
782 } // namespace OHOS
783