1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "app_pipe_mgr.h"
17
18 namespace OHOS {
19 namespace ObjectStore {
20 static const int MAX_TRANSFER_SIZE = 1024 * 1024 * 5;
StartWatchDataChange(const AppDataChangeListener * observer,const PipeInfo & pipeInfo)21 Status AppPipeMgr::StartWatchDataChange(const AppDataChangeListener *observer, const PipeInfo &pipeInfo)
22 {
23 LOG_INFO("begin");
24 if (observer == nullptr || pipeInfo.pipeId.empty()) {
25 LOG_ERROR("argument invalid");
26 return Status::INVALID_ARGUMENT;
27 }
28 std::lock_guard<std::mutex> lock(dataBusMapMutex_);
29 auto it = dataBusMap_.find(pipeInfo.pipeId);
30 if (it == dataBusMap_.end()) {
31 LOG_ERROR("pipeid not found");
32 return Status::ERROR;
33 }
34 LOG_INFO("end");
35 return it->second->StartWatchDataChange(observer, pipeInfo);
36 }
37
38 // stop DataChangeListener to watch data change;
StopWatchDataChange(const AppDataChangeListener * observer,const PipeInfo & pipeInfo)39 Status AppPipeMgr::StopWatchDataChange(const AppDataChangeListener *observer, const PipeInfo &pipeInfo)
40 {
41 LOG_INFO("begin");
42 if (observer == nullptr || pipeInfo.pipeId.empty()) {
43 LOG_ERROR("argument invalid");
44 return Status::INVALID_ARGUMENT;
45 }
46 std::lock_guard<std::mutex> lock(dataBusMapMutex_);
47 auto it = dataBusMap_.find(pipeInfo.pipeId);
48 if (it == dataBusMap_.end()) {
49 LOG_ERROR("pipeid not found");
50 return Status::ERROR;
51 }
52 LOG_INFO("end");
53 return it->second->StopWatchDataChange(observer, pipeInfo);
54 }
55
56 // Send data to other device, function will be called back after sent to notify send result.
SendData(const PipeInfo & pipeInfo,const DeviceId & deviceId,const DataInfo & dataInfo,uint32_t totalLength,const MessageInfo & info)57 Status AppPipeMgr::SendData(const PipeInfo &pipeInfo, const DeviceId &deviceId, const DataInfo &dataInfo,
58 uint32_t totalLength, const MessageInfo &info)
59 {
60 if (dataInfo.length > MAX_TRANSFER_SIZE || dataInfo.length <= 0 || dataInfo.data == nullptr
61 || pipeInfo.pipeId.empty() || deviceId.deviceId.empty()) {
62 LOG_WARN("Input is invalid, maxSize:%{public}d, current size:%{public}u", MAX_TRANSFER_SIZE, dataInfo.length);
63 return Status::ERROR;
64 }
65 LOG_DEBUG("pipeInfo:%{public}s ,size:%{public}u", pipeInfo.pipeId.c_str(), dataInfo.length);
66 std::shared_ptr<AppPipeHandler> appPipeHandler;
67 {
68 std::lock_guard<std::mutex> lock(dataBusMapMutex_);
69 auto it = dataBusMap_.find(pipeInfo.pipeId);
70 if (it == dataBusMap_.end()) {
71 LOG_WARN("pipeInfo:%{public}s not found", pipeInfo.pipeId.c_str());
72 return Status::KEY_NOT_FOUND;
73 }
74 appPipeHandler = it->second;
75 }
76 return appPipeHandler->SendData(pipeInfo, deviceId, dataInfo, totalLength, info);
77 }
78
79 // start server
Start(const PipeInfo & pipeInfo)80 Status AppPipeMgr::Start(const PipeInfo &pipeInfo)
81 {
82 if (pipeInfo.pipeId.empty()) {
83 LOG_WARN("Start Failed, pipeInfo is empty.");
84 return Status::INVALID_ARGUMENT;
85 }
86 std::lock_guard<std::mutex> lock(dataBusMapMutex_);
87 auto it = dataBusMap_.find(pipeInfo.pipeId);
88 if (it != dataBusMap_.end()) {
89 LOG_WARN("repeated start, pipeInfo:%{public}s.", pipeInfo.pipeId.c_str());
90 return Status::REPEATED_REGISTER;
91 }
92 LOG_DEBUG("Start pipeInfo:%{public}s ", pipeInfo.pipeId.c_str());
93 auto handler = std::make_shared<AppPipeHandler>(pipeInfo);
94 int ret = handler->CreateSessionServer(pipeInfo.pipeId);
95 if (ret != 0) {
96 LOG_WARN("Start pipeInfo:%{public}s, failed ret:%{public}d.", pipeInfo.pipeId.c_str(), ret);
97 return Status::ILLEGAL_STATE;
98 }
99
100 dataBusMap_.insert(std::pair<std::string, std::shared_ptr<AppPipeHandler>>(pipeInfo.pipeId, handler));
101 return Status::SUCCESS;
102 }
103
104 // stop server
Stop(const PipeInfo & pipeInfo)105 Status AppPipeMgr::Stop(const PipeInfo &pipeInfo)
106 {
107 std::shared_ptr<AppPipeHandler> appPipeHandler;
108 {
109 std::lock_guard<std::mutex> lock(dataBusMapMutex_);
110 auto it = dataBusMap_.find(pipeInfo.pipeId);
111 if (it == dataBusMap_.end()) {
112 LOG_WARN("pipeInfo:%{public}s not found", pipeInfo.pipeId.c_str());
113 return Status::KEY_NOT_FOUND;
114 }
115 appPipeHandler = it->second;
116 int ret = appPipeHandler->RemoveSessionServer(pipeInfo.pipeId);
117 if (ret != 0) {
118 LOG_WARN("Stop pipeInfo:%{public}s ret:%{public}d.", pipeInfo.pipeId.c_str(), ret);
119 return Status::ERROR;
120 }
121 dataBusMap_.erase(pipeInfo.pipeId);
122 return Status::SUCCESS;
123 }
124 return Status::KEY_NOT_FOUND;
125 }
126
IsSameStartedOnPeer(const struct PipeInfo & pipeInfo,const struct DeviceId & peer)127 bool AppPipeMgr::IsSameStartedOnPeer(const struct PipeInfo &pipeInfo, const struct DeviceId &peer)
128 {
129 LOG_INFO("start");
130 if (pipeInfo.pipeId.empty() || peer.deviceId.empty()) {
131 LOG_ERROR("pipeId or deviceId is empty. Return false.");
132 return false;
133 }
134 LOG_INFO("pipeInfo == [%{public}s]", pipeInfo.pipeId.c_str());
135 std::shared_ptr<AppPipeHandler> appPipeHandler;
136 {
137 std::lock_guard<std::mutex> lock(dataBusMapMutex_);
138 auto it = dataBusMap_.find(pipeInfo.pipeId);
139 if (it == dataBusMap_.end()) {
140 LOG_ERROR("pipeInfo:%{public}s not found. Return false.", pipeInfo.pipeId.c_str());
141 return false;
142 }
143 appPipeHandler = it->second;
144 }
145 return appPipeHandler->IsSameStartedOnPeer(pipeInfo, peer);
146 }
147 } // namespace ObjectStore
148 } // namespace OHOS
149