1 /*
2  * Copyright (c) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "dsched_transport_softbus_adapter.h"
17 
18 #include "distributed_sched_utils.h"
19 #include "dsched_all_connect_manager.h"
20 #include "dsched_continue_manager.h"
21 #include "dtbschedmgr_device_info_storage.h"
22 #include "dtbschedmgr_log.h"
23 #include "softbus_bus_center.h"
24 #include "softbus_common.h"
25 #include "softbus_error_code.h"
26 #include "token_setproc.h"
27 
28 namespace OHOS {
29 namespace DistributedSchedule {
30 namespace {
31 const std::string TAG = "DSchedTransportSoftbusAdapter";
32 constexpr int32_t INVALID_SESSION_ID = -1;
33 }
34 
35 IMPLEMENT_SINGLE_INSTANCE(DSchedTransportSoftbusAdapter);
36 
37 static QosTV g_qosInfo[] = {
38     { .qos = QOS_TYPE_MIN_BW, .value = DSCHED_QOS_TYPE_MIN_BW },
39     { .qos = QOS_TYPE_MAX_LATENCY, .value = DSCHED_QOS_TYPE_MAX_LATENCY },
40     { .qos = QOS_TYPE_MIN_LATENCY, .value = DSCHED_QOS_TYPE_MIN_LATENCY }
41 };
42 static uint32_t g_QosTV_Param_Index = static_cast<uint32_t>(sizeof(g_qosInfo) / sizeof(QosTV));
43 
OnBind(int32_t socket,PeerSocketInfo info)44 static void OnBind(int32_t socket, PeerSocketInfo info)
45 {
46     std::string peerDeviceId(info.networkId);
47     DSchedTransportSoftbusAdapter::GetInstance().OnBind(socket, peerDeviceId);
48 }
49 
OnShutdown(int32_t socket,ShutdownReason reason)50 static void OnShutdown(int32_t socket, ShutdownReason reason)
51 {
52     DSchedTransportSoftbusAdapter::GetInstance().OnShutdown(socket, false);
53 }
54 
OnBytes(int32_t socket,const void * data,uint32_t dataLen)55 static void OnBytes(int32_t socket, const void *data, uint32_t dataLen)
56 {
57     DSchedTransportSoftbusAdapter::GetInstance().OnBytes(socket, data, dataLen);
58 }
59 
60 ISocketListener iSocketListener = {
61     .OnBind = OnBind,
62     .OnShutdown = OnShutdown,
63     .OnBytes = OnBytes
64 };
65 
DSchedTransportSoftbusAdapter()66 DSchedTransportSoftbusAdapter::DSchedTransportSoftbusAdapter()
67 {
68 }
69 
~DSchedTransportSoftbusAdapter()70 DSchedTransportSoftbusAdapter::~DSchedTransportSoftbusAdapter()
71 {
72 }
73 
InitChannel()74 int32_t DSchedTransportSoftbusAdapter::InitChannel()
75 {
76     HILOGI("start");
77     int32_t ret = ERR_OK;
78 #ifdef DMSFWK_ALL_CONNECT_MGR
79     ret = DSchedAllConnectManager::GetInstance().InitAllConnectManager();
80     if (ret != ERR_OK) {
81         HILOGE("Init all connect manager fail, ret: %{public}d.", ret);
82     }
83 #endif
84 
85     serverSocket_ = CreateServerSocket();
86     if (serverSocket_ <= 0) {
87         HILOGE("create socket failed, ret: %{public}d", serverSocket_);
88         return serverSocket_;
89     }
90 
91     ret = Listen(serverSocket_, g_qosInfo, g_QosTV_Param_Index, &iSocketListener);
92     if (ret != ERR_OK) {
93         HILOGE("service listen failed, ret: %{public}d", ret);
94         return ret;
95     }
96     HILOGI("end");
97     return ERR_OK;
98 }
99 
CreateServerSocket()100 int32_t DSchedTransportSoftbusAdapter::CreateServerSocket()
101 {
102     HILOGI("start");
103     localSessionName_ = SOCKET_DMS_SESSION_NAME;
104     SocketInfo info = {
105         .name = const_cast<char*>(localSessionName_.c_str()),
106         .pkgName = const_cast<char*>(SOCKET_DMS_PKG_NAME.c_str()),
107         .dataType = DATA_TYPE_BYTES
108     };
109     int32_t socket = Socket(info);
110     HILOGI("finish, socket session id: %{public}d", socket);
111     return socket;
112 }
113 
ConnectDevice(const std::string & peerDeviceId,int32_t & sessionId)114 int32_t DSchedTransportSoftbusAdapter::ConnectDevice(const std::string &peerDeviceId, int32_t &sessionId)
115 {
116     HILOGI("try to connect peer: %{public}s.", GetAnonymStr(peerDeviceId).c_str());
117     {
118         std::lock_guard<std::mutex> sessionLock(sessionMutex_);
119         if (!sessions_.empty()) {
120             for (auto iter = sessions_.begin(); iter != sessions_.end(); iter++) {
121                 if (iter->second != nullptr && peerDeviceId == iter->second->GetPeerDeviceId()) {
122                     HILOGI("peer device already connected");
123                     iter->second->OnConnect();
124                     sessionId = iter->first;
125 #ifdef DMSFWK_ALL_CONNECT_MGR
126                     DSchedContinueManager::GetInstance().NotifyAllConnectDecision(peerDeviceId, true);
127 #endif
128                     return ERR_OK;
129                 }
130             }
131         }
132     }
133 
134     int32_t ret = ERR_OK;
135 #ifdef DMSFWK_ALL_CONNECT_MGR
136     ServiceCollaborationManager_ResourceRequestInfoSets reqInfoSets;
137     DSchedAllConnectManager::GetInstance().GetResourceRequest(reqInfoSets);
138     ret = DSchedAllConnectManager::GetInstance().ApplyAdvanceResource(peerDeviceId, reqInfoSets);
139     if (ret != ERR_OK) {
140         HILOGE("Apply advance resource fail, ret: %{public}d.", ret);
141         sessionId = INVALID_SESSION_ID;
142         DSchedContinueManager::GetInstance().NotifyAllConnectDecision(peerDeviceId, false);
143         return ret;
144     }
145     DSchedContinueManager::GetInstance().NotifyAllConnectDecision(peerDeviceId, true);
146 
147     ret = DSchedAllConnectManager::GetInstance().PublishServiceState(peerDeviceId, "", SCM_PREPARE);
148     if (ret != ERR_OK) {
149         HILOGE("Publish prepare state fail, ret %{public}d, peerDeviceId %{public}s.",
150             ret, GetAnonymStr(peerDeviceId).c_str());
151     }
152 #endif
153 
154     ret = AddNewPeerSession(peerDeviceId, sessionId);
155     if (ret != ERR_OK || sessionId <= 0) {
156         HILOGE("Add new peer connect session fail, ret: %{public}d, sessionId: %{public}d.", ret, sessionId);
157     }
158     return ret;
159 }
160 
AddNewPeerSession(const std::string & peerDeviceId,int32_t & sessionId)161 int32_t DSchedTransportSoftbusAdapter::AddNewPeerSession(const std::string &peerDeviceId, int32_t &sessionId)
162 {
163     int32_t ret = ERR_OK;
164     sessionId = CreateClientSocket(peerDeviceId);
165     if (sessionId <= 0) {
166         HILOGE("create socket failed, sessionId: %{public}d.", sessionId);
167 #ifdef DMSFWK_ALL_CONNECT_MGR
168         ret = DSchedAllConnectManager::GetInstance().PublishServiceState(peerDeviceId, "", SCM_IDLE);
169         if (ret != ERR_OK) {
170             HILOGE("Publish idle state fail, ret %{public}d, peerDeviceId %{public}s, sessionId %{public}d.",
171                 ret, GetAnonymStr(peerDeviceId).c_str(), sessionId);
172         }
173 #endif
174         return REMOTE_DEVICE_BIND_ABILITY_ERR;
175     }
176 
177     ret = SetFirstCallerTokenID(callingTokenId_);
178     HILOGD("SetFirstCallerTokenID callingTokenId: %{public}s, ret: %{public}d",
179         GetAnonymStr(std::to_string(callingTokenId_)).c_str(), ret);
180     callingTokenId_ = 0;
181 
182     do {
183         HILOGI("bind begin");
184         ret = Bind(sessionId, g_qosInfo, g_QosTV_Param_Index, &iSocketListener);
185         HILOGI("bind end");
186         if (ret != ERR_OK) {
187             HILOGE("client bind failed, ret: %{public}d", ret);
188             break;
189         }
190 
191         ret = CreateSessionRecord(sessionId, peerDeviceId, false);
192         if (ret != ERR_OK) {
193             HILOGE("Client create session record fail, ret %{public}d, peerDeviceId %{public}s, sessionId %{public}d.",
194                 ret, GetAnonymStr(peerDeviceId).c_str(), sessionId);
195             break;
196         }
197     } while (false);
198 
199     if (ret != ERR_OK) {
200         ShutdownSession(peerDeviceId, sessionId);
201         sessionId = INVALID_SESSION_ID;
202     }
203     return ret;
204 }
205 
CreateClientSocket(const std::string & peerDeviceId)206 int32_t DSchedTransportSoftbusAdapter::CreateClientSocket(const std::string &peerDeviceId)
207 {
208     HILOGI("start");
209     SocketInfo info = {
210         .name = const_cast<char*>(SOCKET_DMS_SESSION_NAME.c_str()),
211         .peerName = const_cast<char*>(SOCKET_DMS_SESSION_NAME.c_str()),
212         .peerNetworkId = const_cast<char*>(peerDeviceId.c_str()),
213         .pkgName = const_cast<char*>(SOCKET_DMS_PKG_NAME.c_str()),
214         .dataType = DATA_TYPE_BYTES
215     };
216     int32_t sessionId = Socket(info);
217     HILOGI("finish, socket session id: %{public}d", sessionId);
218     return sessionId;
219 }
220 
CreateSessionRecord(int32_t sessionId,const std::string & peerDeviceId,bool isServer)221 int32_t DSchedTransportSoftbusAdapter::CreateSessionRecord(int32_t sessionId, const std::string &peerDeviceId,
222     bool isServer)
223 {
224     std::string localDeviceId;
225     if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDeviceId)) {
226         HILOGE("GetLocalDeviceId failed");
227         ShutdownSession(peerDeviceId, sessionId);
228         return GET_LOCAL_DEVICE_ERR;
229     }
230     {
231         std::lock_guard<std::mutex> sessionLock(sessionMutex_);
232         std::string sessionName = SOCKET_DMS_SESSION_NAME;
233         SessionInfo info = { sessionId, localDeviceId, peerDeviceId, sessionName, isServer };
234         auto session = std::make_shared<DSchedSoftbusSession>(info);
235         sessions_[sessionId] = session;
236     }
237 
238 #ifdef DMSFWK_ALL_CONNECT_MGR
239     int32_t ret = DSchedAllConnectManager::GetInstance().PublishServiceState(peerDeviceId, "", SCM_CONNECTED);
240     if (ret != ERR_OK) {
241         HILOGE("Publish connected state fail, ret %{public}d, peerDeviceId %{public}s, sessionId %{public}d.",
242             ret, GetAnonymStr(peerDeviceId).c_str(), sessionId);
243     }
244 #endif
245     return ERR_OK;
246 }
247 
DisconnectDevice(const std::string & peerDeviceId)248 void DSchedTransportSoftbusAdapter::DisconnectDevice(const std::string &peerDeviceId)
249 {
250     HILOGI("try to disconnect peer: %{public}s.", GetAnonymStr(peerDeviceId).c_str());
251     int32_t sessionId = 0;
252     std::lock_guard<std::mutex> sessionLock(sessionMutex_);
253     for (auto iter = sessions_.begin(); iter != sessions_.end(); iter++) {
254         if (iter->second != nullptr && peerDeviceId == iter->second->GetPeerDeviceId()) {
255             sessionId = iter->first;
256             break;
257         }
258     }
259     if (sessionId != 0 && sessions_[sessionId] != nullptr && sessions_[sessionId]->OnDisconnect()) {
260         HILOGI("peer %{public}s shutdown, socket sessionId: %{public}d.",
261             GetAnonymStr(sessions_[sessionId]->GetPeerDeviceId()).c_str(), sessionId);
262         ShutdownSession(peerDeviceId, sessionId);
263         sessions_.erase(sessionId);
264         NotifyListenersSessionShutdown(sessionId, true);
265     }
266     HILOGI("finish, socket session id: %{public}d", sessionId);
267     return;
268 }
269 
ShutdownSession(const std::string & peerDeviceId,int32_t sessionId)270 void DSchedTransportSoftbusAdapter::ShutdownSession(const std::string &peerDeviceId, int32_t sessionId)
271 {
272     Shutdown(sessionId);
273 #ifdef DMSFWK_ALL_CONNECT_MGR
274     int32_t ret = DSchedAllConnectManager::GetInstance().PublishServiceState(peerDeviceId, "", SCM_IDLE);
275     if (ret != ERR_OK) {
276         HILOGE("Publish idle state fail, ret %{public}d, peerDeviceId %{public}s, sessionId %{public}d.",
277             ret, GetAnonymStr(peerDeviceId).c_str(), sessionId);
278     }
279 #endif
280 }
281 
GetSessionIdByDeviceId(const std::string & peerDeviceId,int32_t & sessionId)282 bool DSchedTransportSoftbusAdapter::GetSessionIdByDeviceId(const std::string &peerDeviceId, int32_t &sessionId)
283 {
284     std::lock_guard<std::mutex> sessionLock(sessionMutex_);
285     for (auto iter = sessions_.begin(); iter != sessions_.end(); iter++) {
286         if (iter->second != nullptr && peerDeviceId == iter->second->GetPeerDeviceId()) {
287             sessionId = iter->first;
288             return true;
289         }
290     }
291     return false;
292 }
293 
OnBind(int32_t sessionId,const std::string & peerDeviceId)294 void DSchedTransportSoftbusAdapter::OnBind(int32_t sessionId, const std::string &peerDeviceId)
295 {
296     int32_t ret = CreateSessionRecord(sessionId, peerDeviceId, true);
297     if (ret != ERR_OK) {
298         HILOGE("Service create session record fail, ret %{public}d, peerDeviceId %{public}s, sessionId %{public}d.",
299             ret, GetAnonymStr(peerDeviceId).c_str(), sessionId);
300     }
301 }
302 
OnShutdown(int32_t sessionId,bool isSelfcalled)303 void DSchedTransportSoftbusAdapter::OnShutdown(int32_t sessionId, bool isSelfcalled)
304 {
305     {
306         std::lock_guard<std::mutex> sessionLock(sessionMutex_);
307         if (sessions_.empty() || sessions_.count(sessionId) == 0 || sessions_[sessionId] == nullptr) {
308             HILOGE("error, invalid sessionId %{public}d", sessionId);
309             return;
310         }
311         std::string peerDeviceId = sessions_[sessionId]->GetPeerDeviceId();
312         HILOGI("peerDeviceId: %{public}s shutdown, socket sessionId: %{public}d.",
313             GetAnonymStr(peerDeviceId).c_str(), sessionId);
314         ShutdownSession(peerDeviceId, sessionId);
315         sessions_.erase(sessionId);
316     }
317     NotifyListenersSessionShutdown(sessionId, isSelfcalled);
318 }
319 
NotifyListenersSessionShutdown(int32_t sessionId,bool isSelfcalled)320 void DSchedTransportSoftbusAdapter::NotifyListenersSessionShutdown(int32_t sessionId, bool isSelfcalled)
321 {
322     std::lock_guard<std::mutex> listenerMapLock(listenerMutex_);
323     if (listeners_.empty()) {
324         HILOGE("no listener has registered");
325         return;
326     }
327     for (auto iterItem = listeners_.begin(); iterItem != listeners_.end(); iterItem++) {
328         std::vector<std::shared_ptr<IDataListener>> objs = iterItem->second;
329         for (auto iter : objs) {
330             iter->OnShutdown(sessionId, isSelfcalled);
331         }
332     }
333     return;
334 }
335 
ReleaseChannel()336 int32_t DSchedTransportSoftbusAdapter::ReleaseChannel()
337 {
338     HILOGI("start");
339     {
340         std::lock_guard<std::mutex> sessionLock(sessionMutex_);
341         for (auto iter = sessions_.begin(); iter != sessions_.end(); iter++) {
342             std::string peerDeviceId = (iter->second != nullptr) ? iter->second->GetPeerDeviceId() : "";
343             HILOGI("shutdown client: %{public}s, socket sessionId: %{public}d.",
344                 GetAnonymStr(peerDeviceId).c_str(), iter->first);
345             ShutdownSession(peerDeviceId, iter->first);
346         }
347         sessions_.clear();
348     }
349     HILOGI("shutdown server, socket session id: %{public}d", serverSocket_);
350     Shutdown(serverSocket_);
351     serverSocket_ = 0;
352 
353 #ifdef DMSFWK_ALL_CONNECT_MGR
354     int32_t ret = DSchedAllConnectManager::GetInstance().UninitAllConnectManager();
355     if (ret != ERR_OK) {
356         HILOGE("Uninit all connect manager fail, ret: %{public}d.", ret);
357     }
358 #endif
359     return ERR_OK;
360 }
361 
SendData(int32_t sessionId,int32_t dataType,std::shared_ptr<DSchedDataBuffer> dataBuffer)362 int32_t DSchedTransportSoftbusAdapter::SendData(int32_t sessionId, int32_t dataType,
363     std::shared_ptr<DSchedDataBuffer> dataBuffer)
364 {
365     std::lock_guard<std::mutex> sessionLock(sessionMutex_);
366     if (!sessions_.count(sessionId) || sessions_[sessionId] == nullptr) {
367         HILOGE("error, invalid session id %{public}d", sessionId);
368         return INVALID_SESSION_ID;
369     }
370     return sessions_[sessionId]->SendData(dataBuffer, dataType);
371 }
372 
SendBytesBySoftbus(int32_t sessionId,std::shared_ptr<DSchedDataBuffer> dataBuffer)373 int32_t DSchedTransportSoftbusAdapter::SendBytesBySoftbus(int32_t sessionId,
374     std::shared_ptr<DSchedDataBuffer> dataBuffer)
375 {
376     if (dataBuffer != nullptr) {
377         return SendBytes(sessionId, dataBuffer->Data(), dataBuffer->Size());
378     } else {
379         HILOGE("dataBuffer is nullptr");
380         return INVALID_PARAMETERS_ERR;
381     }
382 }
383 
OnBytes(int32_t sessionId,const void * data,uint32_t dataLen)384 void DSchedTransportSoftbusAdapter::OnBytes(int32_t sessionId, const void *data, uint32_t dataLen)
385 {
386     if (dataLen == 0 || dataLen > DSCHED_MAX_RECV_DATA_LEN || data == nullptr) {
387         HILOGE("error, dataLen: %{public}d, session id: %{public}d", dataLen, sessionId);
388         return;
389     }
390     HILOGD("start, sessionId: %{public}d", sessionId);
391     {
392         std::lock_guard<std::mutex> sessionLock(sessionMutex_);
393         if (!sessions_.count(sessionId) || sessions_[sessionId] == nullptr) {
394             HILOGE("invalid session id %{public}d", sessionId);
395             return;
396         }
397         std::shared_ptr<DSchedDataBuffer> buffer = std::make_shared<DSchedDataBuffer>(dataLen);
398         int32_t ret = memcpy_s(buffer->Data(), buffer->Capacity(), data, dataLen);
399         if (ret != ERR_OK) {
400             HILOGE("memcpy_s failed ret: %{public}d", ret);
401             return;
402         }
403         sessions_[sessionId]->OnBytesReceived(buffer);
404     }
405     HILOGD("end, session id: %{public}d", sessionId);
406     return;
407 }
408 
OnDataReady(int32_t sessionId,std::shared_ptr<DSchedDataBuffer> dataBuffer,uint32_t dataType)409 void DSchedTransportSoftbusAdapter::OnDataReady(int32_t sessionId, std::shared_ptr<DSchedDataBuffer> dataBuffer,
410     uint32_t dataType)
411 {
412     std::lock_guard<std::mutex> listenerMapLock(listenerMutex_);
413     if (listeners_.empty()) {
414         HILOGE("no listener has registered");
415         return;
416     }
417     auto iterItem = listeners_.find(dataType);
418     if (iterItem == listeners_.end()) {
419         HILOGE("get iterItem failed from listeners_, type %{public}d, sessionId: %{public}d", dataType, sessionId);
420         return;
421     }
422     std::vector<std::shared_ptr<IDataListener>> objs = iterItem->second;
423     for (auto iter : objs) {
424         iter->OnDataRecv(sessionId, dataBuffer);
425     }
426     return;
427 }
428 
RegisterListener(int32_t serviceType,std::shared_ptr<IDataListener> listener)429 void DSchedTransportSoftbusAdapter::RegisterListener(int32_t serviceType, std::shared_ptr<IDataListener> listener)
430 {
431     HILOGI("start, service type: %{public}d", serviceType);
432     if (listener == nullptr) {
433         HILOGE("listener is null, type: %{public}d", serviceType);
434         return;
435     }
436     std::lock_guard<std::mutex> listenerMapLock(listenerMutex_);
437     if (listeners_.empty() || listeners_.find(serviceType) == listeners_.end()) {
438         HILOGD("service type %{public}d does not exist in the listeners, adding", serviceType);
439         std::vector<std::shared_ptr<IDataListener>> newListeners;
440         newListeners.emplace_back(listener);
441         listeners_[serviceType] = newListeners;
442         HILOGI("listener register success");
443         return;
444     }
445     auto iterItem = listeners_.find(serviceType);
446     for (auto iter : iterItem->second) {
447         if (iter == listener) {
448             HILOGI("listener already registed");
449             return;
450         }
451     }
452     iterItem->second.emplace_back(listener);
453     HILOGI("listener register success");
454     return;
455 }
456 
UnregisterListener(int32_t serviceType,std::shared_ptr<IDataListener> listener)457 void DSchedTransportSoftbusAdapter::UnregisterListener(int32_t serviceType, std::shared_ptr<IDataListener> listener)
458 {
459     HILOGI("start, service type: %{public}d", serviceType);
460     if (listener == nullptr) {
461         HILOGE("listener is null, type: %{public}d", serviceType);
462         return;
463     }
464     std::lock_guard<std::mutex> listenerMapLock(listenerMutex_);
465     if (listeners_.empty() || listeners_.find(serviceType) == listeners_.end()) {
466         HILOGD("service type %{public}d does not exist in the listeners, ignore", serviceType);
467         return;
468     }
469     auto typeListeners = listeners_.find(serviceType);
470     for (size_t i = 0; i < typeListeners->second.size(); i++) {
471         if (typeListeners->second[i] == listener) {
472             typeListeners->second.erase(typeListeners->second.begin() + i);
473             if (typeListeners->second.empty()) {
474                 listeners_.erase(typeListeners);
475             }
476             break;
477         }
478     }
479     HILOGI("listener unregister success");
480     return;
481 }
482 
SetCallingTokenId(int32_t callingTokenId)483 void DSchedTransportSoftbusAdapter::SetCallingTokenId(int32_t callingTokenId)
484 {
485     callingTokenId_ = callingTokenId;
486 }
487 }  // namespace DistributedSchedule
488 }  // namespace OHOS
489