1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "dsched_transport_softbus_adapter.h"
17
18 #include "distributed_sched_utils.h"
19 #include "dsched_all_connect_manager.h"
20 #include "dsched_continue_manager.h"
21 #include "dtbschedmgr_device_info_storage.h"
22 #include "dtbschedmgr_log.h"
23 #include "softbus_bus_center.h"
24 #include "softbus_common.h"
25 #include "softbus_error_code.h"
26 #include "token_setproc.h"
27
28 namespace OHOS {
29 namespace DistributedSchedule {
30 namespace {
31 const std::string TAG = "DSchedTransportSoftbusAdapter";
32 constexpr int32_t INVALID_SESSION_ID = -1;
33 }
34
35 IMPLEMENT_SINGLE_INSTANCE(DSchedTransportSoftbusAdapter);
36
37 static QosTV g_qosInfo[] = {
38 { .qos = QOS_TYPE_MIN_BW, .value = DSCHED_QOS_TYPE_MIN_BW },
39 { .qos = QOS_TYPE_MAX_LATENCY, .value = DSCHED_QOS_TYPE_MAX_LATENCY },
40 { .qos = QOS_TYPE_MIN_LATENCY, .value = DSCHED_QOS_TYPE_MIN_LATENCY }
41 };
42 static uint32_t g_QosTV_Param_Index = static_cast<uint32_t>(sizeof(g_qosInfo) / sizeof(QosTV));
43
OnBind(int32_t socket,PeerSocketInfo info)44 static void OnBind(int32_t socket, PeerSocketInfo info)
45 {
46 std::string peerDeviceId(info.networkId);
47 DSchedTransportSoftbusAdapter::GetInstance().OnBind(socket, peerDeviceId);
48 }
49
OnShutdown(int32_t socket,ShutdownReason reason)50 static void OnShutdown(int32_t socket, ShutdownReason reason)
51 {
52 DSchedTransportSoftbusAdapter::GetInstance().OnShutdown(socket, false);
53 }
54
OnBytes(int32_t socket,const void * data,uint32_t dataLen)55 static void OnBytes(int32_t socket, const void *data, uint32_t dataLen)
56 {
57 DSchedTransportSoftbusAdapter::GetInstance().OnBytes(socket, data, dataLen);
58 }
59
60 ISocketListener iSocketListener = {
61 .OnBind = OnBind,
62 .OnShutdown = OnShutdown,
63 .OnBytes = OnBytes
64 };
65
DSchedTransportSoftbusAdapter()66 DSchedTransportSoftbusAdapter::DSchedTransportSoftbusAdapter()
67 {
68 }
69
~DSchedTransportSoftbusAdapter()70 DSchedTransportSoftbusAdapter::~DSchedTransportSoftbusAdapter()
71 {
72 }
73
InitChannel()74 int32_t DSchedTransportSoftbusAdapter::InitChannel()
75 {
76 HILOGI("start");
77 int32_t ret = ERR_OK;
78 #ifdef DMSFWK_ALL_CONNECT_MGR
79 ret = DSchedAllConnectManager::GetInstance().InitAllConnectManager();
80 if (ret != ERR_OK) {
81 HILOGE("Init all connect manager fail, ret: %{public}d.", ret);
82 }
83 #endif
84
85 serverSocket_ = CreateServerSocket();
86 if (serverSocket_ <= 0) {
87 HILOGE("create socket failed, ret: %{public}d", serverSocket_);
88 return serverSocket_;
89 }
90
91 ret = Listen(serverSocket_, g_qosInfo, g_QosTV_Param_Index, &iSocketListener);
92 if (ret != ERR_OK) {
93 HILOGE("service listen failed, ret: %{public}d", ret);
94 return ret;
95 }
96 HILOGI("end");
97 return ERR_OK;
98 }
99
CreateServerSocket()100 int32_t DSchedTransportSoftbusAdapter::CreateServerSocket()
101 {
102 HILOGI("start");
103 localSessionName_ = SOCKET_DMS_SESSION_NAME;
104 SocketInfo info = {
105 .name = const_cast<char*>(localSessionName_.c_str()),
106 .pkgName = const_cast<char*>(SOCKET_DMS_PKG_NAME.c_str()),
107 .dataType = DATA_TYPE_BYTES
108 };
109 int32_t socket = Socket(info);
110 HILOGI("finish, socket session id: %{public}d", socket);
111 return socket;
112 }
113
ConnectDevice(const std::string & peerDeviceId,int32_t & sessionId)114 int32_t DSchedTransportSoftbusAdapter::ConnectDevice(const std::string &peerDeviceId, int32_t &sessionId)
115 {
116 HILOGI("try to connect peer: %{public}s.", GetAnonymStr(peerDeviceId).c_str());
117 {
118 std::lock_guard<std::mutex> sessionLock(sessionMutex_);
119 if (!sessions_.empty()) {
120 for (auto iter = sessions_.begin(); iter != sessions_.end(); iter++) {
121 if (iter->second != nullptr && peerDeviceId == iter->second->GetPeerDeviceId()) {
122 HILOGI("peer device already connected");
123 iter->second->OnConnect();
124 sessionId = iter->first;
125 #ifdef DMSFWK_ALL_CONNECT_MGR
126 DSchedContinueManager::GetInstance().NotifyAllConnectDecision(peerDeviceId, true);
127 #endif
128 return ERR_OK;
129 }
130 }
131 }
132 }
133
134 int32_t ret = ERR_OK;
135 #ifdef DMSFWK_ALL_CONNECT_MGR
136 ServiceCollaborationManager_ResourceRequestInfoSets reqInfoSets;
137 DSchedAllConnectManager::GetInstance().GetResourceRequest(reqInfoSets);
138 ret = DSchedAllConnectManager::GetInstance().ApplyAdvanceResource(peerDeviceId, reqInfoSets);
139 if (ret != ERR_OK) {
140 HILOGE("Apply advance resource fail, ret: %{public}d.", ret);
141 sessionId = INVALID_SESSION_ID;
142 DSchedContinueManager::GetInstance().NotifyAllConnectDecision(peerDeviceId, false);
143 return ret;
144 }
145 DSchedContinueManager::GetInstance().NotifyAllConnectDecision(peerDeviceId, true);
146
147 ret = DSchedAllConnectManager::GetInstance().PublishServiceState(peerDeviceId, "", SCM_PREPARE);
148 if (ret != ERR_OK) {
149 HILOGE("Publish prepare state fail, ret %{public}d, peerDeviceId %{public}s.",
150 ret, GetAnonymStr(peerDeviceId).c_str());
151 }
152 #endif
153
154 ret = AddNewPeerSession(peerDeviceId, sessionId);
155 if (ret != ERR_OK || sessionId <= 0) {
156 HILOGE("Add new peer connect session fail, ret: %{public}d, sessionId: %{public}d.", ret, sessionId);
157 }
158 return ret;
159 }
160
AddNewPeerSession(const std::string & peerDeviceId,int32_t & sessionId)161 int32_t DSchedTransportSoftbusAdapter::AddNewPeerSession(const std::string &peerDeviceId, int32_t &sessionId)
162 {
163 int32_t ret = ERR_OK;
164 sessionId = CreateClientSocket(peerDeviceId);
165 if (sessionId <= 0) {
166 HILOGE("create socket failed, sessionId: %{public}d.", sessionId);
167 #ifdef DMSFWK_ALL_CONNECT_MGR
168 ret = DSchedAllConnectManager::GetInstance().PublishServiceState(peerDeviceId, "", SCM_IDLE);
169 if (ret != ERR_OK) {
170 HILOGE("Publish idle state fail, ret %{public}d, peerDeviceId %{public}s, sessionId %{public}d.",
171 ret, GetAnonymStr(peerDeviceId).c_str(), sessionId);
172 }
173 #endif
174 return REMOTE_DEVICE_BIND_ABILITY_ERR;
175 }
176
177 ret = SetFirstCallerTokenID(callingTokenId_);
178 HILOGD("SetFirstCallerTokenID callingTokenId: %{public}s, ret: %{public}d",
179 GetAnonymStr(std::to_string(callingTokenId_)).c_str(), ret);
180 callingTokenId_ = 0;
181
182 do {
183 HILOGI("bind begin");
184 ret = Bind(sessionId, g_qosInfo, g_QosTV_Param_Index, &iSocketListener);
185 HILOGI("bind end");
186 if (ret != ERR_OK) {
187 HILOGE("client bind failed, ret: %{public}d", ret);
188 break;
189 }
190
191 ret = CreateSessionRecord(sessionId, peerDeviceId, false);
192 if (ret != ERR_OK) {
193 HILOGE("Client create session record fail, ret %{public}d, peerDeviceId %{public}s, sessionId %{public}d.",
194 ret, GetAnonymStr(peerDeviceId).c_str(), sessionId);
195 break;
196 }
197 } while (false);
198
199 if (ret != ERR_OK) {
200 ShutdownSession(peerDeviceId, sessionId);
201 sessionId = INVALID_SESSION_ID;
202 }
203 return ret;
204 }
205
CreateClientSocket(const std::string & peerDeviceId)206 int32_t DSchedTransportSoftbusAdapter::CreateClientSocket(const std::string &peerDeviceId)
207 {
208 HILOGI("start");
209 SocketInfo info = {
210 .name = const_cast<char*>(SOCKET_DMS_SESSION_NAME.c_str()),
211 .peerName = const_cast<char*>(SOCKET_DMS_SESSION_NAME.c_str()),
212 .peerNetworkId = const_cast<char*>(peerDeviceId.c_str()),
213 .pkgName = const_cast<char*>(SOCKET_DMS_PKG_NAME.c_str()),
214 .dataType = DATA_TYPE_BYTES
215 };
216 int32_t sessionId = Socket(info);
217 HILOGI("finish, socket session id: %{public}d", sessionId);
218 return sessionId;
219 }
220
CreateSessionRecord(int32_t sessionId,const std::string & peerDeviceId,bool isServer)221 int32_t DSchedTransportSoftbusAdapter::CreateSessionRecord(int32_t sessionId, const std::string &peerDeviceId,
222 bool isServer)
223 {
224 std::string localDeviceId;
225 if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDeviceId)) {
226 HILOGE("GetLocalDeviceId failed");
227 ShutdownSession(peerDeviceId, sessionId);
228 return GET_LOCAL_DEVICE_ERR;
229 }
230 {
231 std::lock_guard<std::mutex> sessionLock(sessionMutex_);
232 std::string sessionName = SOCKET_DMS_SESSION_NAME;
233 SessionInfo info = { sessionId, localDeviceId, peerDeviceId, sessionName, isServer };
234 auto session = std::make_shared<DSchedSoftbusSession>(info);
235 sessions_[sessionId] = session;
236 }
237
238 #ifdef DMSFWK_ALL_CONNECT_MGR
239 int32_t ret = DSchedAllConnectManager::GetInstance().PublishServiceState(peerDeviceId, "", SCM_CONNECTED);
240 if (ret != ERR_OK) {
241 HILOGE("Publish connected state fail, ret %{public}d, peerDeviceId %{public}s, sessionId %{public}d.",
242 ret, GetAnonymStr(peerDeviceId).c_str(), sessionId);
243 }
244 #endif
245 return ERR_OK;
246 }
247
DisconnectDevice(const std::string & peerDeviceId)248 void DSchedTransportSoftbusAdapter::DisconnectDevice(const std::string &peerDeviceId)
249 {
250 HILOGI("try to disconnect peer: %{public}s.", GetAnonymStr(peerDeviceId).c_str());
251 int32_t sessionId = 0;
252 std::lock_guard<std::mutex> sessionLock(sessionMutex_);
253 for (auto iter = sessions_.begin(); iter != sessions_.end(); iter++) {
254 if (iter->second != nullptr && peerDeviceId == iter->second->GetPeerDeviceId()) {
255 sessionId = iter->first;
256 break;
257 }
258 }
259 if (sessionId != 0 && sessions_[sessionId] != nullptr && sessions_[sessionId]->OnDisconnect()) {
260 HILOGI("peer %{public}s shutdown, socket sessionId: %{public}d.",
261 GetAnonymStr(sessions_[sessionId]->GetPeerDeviceId()).c_str(), sessionId);
262 ShutdownSession(peerDeviceId, sessionId);
263 sessions_.erase(sessionId);
264 NotifyListenersSessionShutdown(sessionId, true);
265 }
266 HILOGI("finish, socket session id: %{public}d", sessionId);
267 return;
268 }
269
ShutdownSession(const std::string & peerDeviceId,int32_t sessionId)270 void DSchedTransportSoftbusAdapter::ShutdownSession(const std::string &peerDeviceId, int32_t sessionId)
271 {
272 Shutdown(sessionId);
273 #ifdef DMSFWK_ALL_CONNECT_MGR
274 int32_t ret = DSchedAllConnectManager::GetInstance().PublishServiceState(peerDeviceId, "", SCM_IDLE);
275 if (ret != ERR_OK) {
276 HILOGE("Publish idle state fail, ret %{public}d, peerDeviceId %{public}s, sessionId %{public}d.",
277 ret, GetAnonymStr(peerDeviceId).c_str(), sessionId);
278 }
279 #endif
280 }
281
GetSessionIdByDeviceId(const std::string & peerDeviceId,int32_t & sessionId)282 bool DSchedTransportSoftbusAdapter::GetSessionIdByDeviceId(const std::string &peerDeviceId, int32_t &sessionId)
283 {
284 std::lock_guard<std::mutex> sessionLock(sessionMutex_);
285 for (auto iter = sessions_.begin(); iter != sessions_.end(); iter++) {
286 if (iter->second != nullptr && peerDeviceId == iter->second->GetPeerDeviceId()) {
287 sessionId = iter->first;
288 return true;
289 }
290 }
291 return false;
292 }
293
OnBind(int32_t sessionId,const std::string & peerDeviceId)294 void DSchedTransportSoftbusAdapter::OnBind(int32_t sessionId, const std::string &peerDeviceId)
295 {
296 int32_t ret = CreateSessionRecord(sessionId, peerDeviceId, true);
297 if (ret != ERR_OK) {
298 HILOGE("Service create session record fail, ret %{public}d, peerDeviceId %{public}s, sessionId %{public}d.",
299 ret, GetAnonymStr(peerDeviceId).c_str(), sessionId);
300 }
301 }
302
OnShutdown(int32_t sessionId,bool isSelfcalled)303 void DSchedTransportSoftbusAdapter::OnShutdown(int32_t sessionId, bool isSelfcalled)
304 {
305 {
306 std::lock_guard<std::mutex> sessionLock(sessionMutex_);
307 if (sessions_.empty() || sessions_.count(sessionId) == 0 || sessions_[sessionId] == nullptr) {
308 HILOGE("error, invalid sessionId %{public}d", sessionId);
309 return;
310 }
311 std::string peerDeviceId = sessions_[sessionId]->GetPeerDeviceId();
312 HILOGI("peerDeviceId: %{public}s shutdown, socket sessionId: %{public}d.",
313 GetAnonymStr(peerDeviceId).c_str(), sessionId);
314 ShutdownSession(peerDeviceId, sessionId);
315 sessions_.erase(sessionId);
316 }
317 NotifyListenersSessionShutdown(sessionId, isSelfcalled);
318 }
319
NotifyListenersSessionShutdown(int32_t sessionId,bool isSelfcalled)320 void DSchedTransportSoftbusAdapter::NotifyListenersSessionShutdown(int32_t sessionId, bool isSelfcalled)
321 {
322 std::lock_guard<std::mutex> listenerMapLock(listenerMutex_);
323 if (listeners_.empty()) {
324 HILOGE("no listener has registered");
325 return;
326 }
327 for (auto iterItem = listeners_.begin(); iterItem != listeners_.end(); iterItem++) {
328 std::vector<std::shared_ptr<IDataListener>> objs = iterItem->second;
329 for (auto iter : objs) {
330 iter->OnShutdown(sessionId, isSelfcalled);
331 }
332 }
333 return;
334 }
335
ReleaseChannel()336 int32_t DSchedTransportSoftbusAdapter::ReleaseChannel()
337 {
338 HILOGI("start");
339 {
340 std::lock_guard<std::mutex> sessionLock(sessionMutex_);
341 for (auto iter = sessions_.begin(); iter != sessions_.end(); iter++) {
342 std::string peerDeviceId = (iter->second != nullptr) ? iter->second->GetPeerDeviceId() : "";
343 HILOGI("shutdown client: %{public}s, socket sessionId: %{public}d.",
344 GetAnonymStr(peerDeviceId).c_str(), iter->first);
345 ShutdownSession(peerDeviceId, iter->first);
346 }
347 sessions_.clear();
348 }
349 HILOGI("shutdown server, socket session id: %{public}d", serverSocket_);
350 Shutdown(serverSocket_);
351 serverSocket_ = 0;
352
353 #ifdef DMSFWK_ALL_CONNECT_MGR
354 int32_t ret = DSchedAllConnectManager::GetInstance().UninitAllConnectManager();
355 if (ret != ERR_OK) {
356 HILOGE("Uninit all connect manager fail, ret: %{public}d.", ret);
357 }
358 #endif
359 return ERR_OK;
360 }
361
SendData(int32_t sessionId,int32_t dataType,std::shared_ptr<DSchedDataBuffer> dataBuffer)362 int32_t DSchedTransportSoftbusAdapter::SendData(int32_t sessionId, int32_t dataType,
363 std::shared_ptr<DSchedDataBuffer> dataBuffer)
364 {
365 std::lock_guard<std::mutex> sessionLock(sessionMutex_);
366 if (!sessions_.count(sessionId) || sessions_[sessionId] == nullptr) {
367 HILOGE("error, invalid session id %{public}d", sessionId);
368 return INVALID_SESSION_ID;
369 }
370 return sessions_[sessionId]->SendData(dataBuffer, dataType);
371 }
372
SendBytesBySoftbus(int32_t sessionId,std::shared_ptr<DSchedDataBuffer> dataBuffer)373 int32_t DSchedTransportSoftbusAdapter::SendBytesBySoftbus(int32_t sessionId,
374 std::shared_ptr<DSchedDataBuffer> dataBuffer)
375 {
376 if (dataBuffer != nullptr) {
377 return SendBytes(sessionId, dataBuffer->Data(), dataBuffer->Size());
378 } else {
379 HILOGE("dataBuffer is nullptr");
380 return INVALID_PARAMETERS_ERR;
381 }
382 }
383
OnBytes(int32_t sessionId,const void * data,uint32_t dataLen)384 void DSchedTransportSoftbusAdapter::OnBytes(int32_t sessionId, const void *data, uint32_t dataLen)
385 {
386 if (dataLen == 0 || dataLen > DSCHED_MAX_RECV_DATA_LEN || data == nullptr) {
387 HILOGE("error, dataLen: %{public}d, session id: %{public}d", dataLen, sessionId);
388 return;
389 }
390 HILOGD("start, sessionId: %{public}d", sessionId);
391 {
392 std::lock_guard<std::mutex> sessionLock(sessionMutex_);
393 if (!sessions_.count(sessionId) || sessions_[sessionId] == nullptr) {
394 HILOGE("invalid session id %{public}d", sessionId);
395 return;
396 }
397 std::shared_ptr<DSchedDataBuffer> buffer = std::make_shared<DSchedDataBuffer>(dataLen);
398 int32_t ret = memcpy_s(buffer->Data(), buffer->Capacity(), data, dataLen);
399 if (ret != ERR_OK) {
400 HILOGE("memcpy_s failed ret: %{public}d", ret);
401 return;
402 }
403 sessions_[sessionId]->OnBytesReceived(buffer);
404 }
405 HILOGD("end, session id: %{public}d", sessionId);
406 return;
407 }
408
OnDataReady(int32_t sessionId,std::shared_ptr<DSchedDataBuffer> dataBuffer,uint32_t dataType)409 void DSchedTransportSoftbusAdapter::OnDataReady(int32_t sessionId, std::shared_ptr<DSchedDataBuffer> dataBuffer,
410 uint32_t dataType)
411 {
412 std::lock_guard<std::mutex> listenerMapLock(listenerMutex_);
413 if (listeners_.empty()) {
414 HILOGE("no listener has registered");
415 return;
416 }
417 auto iterItem = listeners_.find(dataType);
418 if (iterItem == listeners_.end()) {
419 HILOGE("get iterItem failed from listeners_, type %{public}d, sessionId: %{public}d", dataType, sessionId);
420 return;
421 }
422 std::vector<std::shared_ptr<IDataListener>> objs = iterItem->second;
423 for (auto iter : objs) {
424 iter->OnDataRecv(sessionId, dataBuffer);
425 }
426 return;
427 }
428
RegisterListener(int32_t serviceType,std::shared_ptr<IDataListener> listener)429 void DSchedTransportSoftbusAdapter::RegisterListener(int32_t serviceType, std::shared_ptr<IDataListener> listener)
430 {
431 HILOGI("start, service type: %{public}d", serviceType);
432 if (listener == nullptr) {
433 HILOGE("listener is null, type: %{public}d", serviceType);
434 return;
435 }
436 std::lock_guard<std::mutex> listenerMapLock(listenerMutex_);
437 if (listeners_.empty() || listeners_.find(serviceType) == listeners_.end()) {
438 HILOGD("service type %{public}d does not exist in the listeners, adding", serviceType);
439 std::vector<std::shared_ptr<IDataListener>> newListeners;
440 newListeners.emplace_back(listener);
441 listeners_[serviceType] = newListeners;
442 HILOGI("listener register success");
443 return;
444 }
445 auto iterItem = listeners_.find(serviceType);
446 for (auto iter : iterItem->second) {
447 if (iter == listener) {
448 HILOGI("listener already registed");
449 return;
450 }
451 }
452 iterItem->second.emplace_back(listener);
453 HILOGI("listener register success");
454 return;
455 }
456
UnregisterListener(int32_t serviceType,std::shared_ptr<IDataListener> listener)457 void DSchedTransportSoftbusAdapter::UnregisterListener(int32_t serviceType, std::shared_ptr<IDataListener> listener)
458 {
459 HILOGI("start, service type: %{public}d", serviceType);
460 if (listener == nullptr) {
461 HILOGE("listener is null, type: %{public}d", serviceType);
462 return;
463 }
464 std::lock_guard<std::mutex> listenerMapLock(listenerMutex_);
465 if (listeners_.empty() || listeners_.find(serviceType) == listeners_.end()) {
466 HILOGD("service type %{public}d does not exist in the listeners, ignore", serviceType);
467 return;
468 }
469 auto typeListeners = listeners_.find(serviceType);
470 for (size_t i = 0; i < typeListeners->second.size(); i++) {
471 if (typeListeners->second[i] == listener) {
472 typeListeners->second.erase(typeListeners->second.begin() + i);
473 if (typeListeners->second.empty()) {
474 listeners_.erase(typeListeners);
475 }
476 break;
477 }
478 }
479 HILOGI("listener unregister success");
480 return;
481 }
482
SetCallingTokenId(int32_t callingTokenId)483 void DSchedTransportSoftbusAdapter::SetCallingTokenId(int32_t callingTokenId)
484 {
485 callingTokenId_ = callingTokenId;
486 }
487 } // namespace DistributedSchedule
488 } // namespace OHOS
489