1 /* 2 * Copyright (c) 2021-2024 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef STREAM_SOCKET_H 17 #define STREAM_SOCKET_H 18 19 #include <condition_variable> 20 #include <map> 21 #include <mutex> 22 #include <queue> 23 #include <securec.h> 24 #include <utility> 25 26 #include "client_trans_stream.h" 27 #include "i_stream.h" 28 #include "session.h" 29 #include "stream_common.h" 30 31 namespace Communication { 32 namespace SoftBus { 33 class IStreamSocketListener { 34 public: 35 IStreamSocketListener() = default; 36 virtual ~IStreamSocketListener() = default; 37 38 virtual void OnStreamReceived(std::unique_ptr<IStream> stream) = 0; 39 virtual void OnStreamStatus(int status) = 0; 40 virtual int OnStreamHdrReceived(std::unique_ptr<char[]> header, int size) = 0; 41 virtual void OnQosEvent(int32_t eventId, int32_t tvCount, const QosTv *tvList) const = 0; 42 virtual void OnFrameStats(const StreamSendStats *data) = 0; 43 virtual void OnRippleStats(const TrafficStats *data) = 0; 44 }; 45 46 class IStreamSocket { 47 public: IStreamSocket()48 IStreamSocket() 49 { 50 listenFd_ = -1; 51 streamFd_ = -1; 52 epollFd_ = -1; 53 isStreamRecv_ = false; 54 streamType_ = INVALID; 55 isBlocked_ = false; 56 } ~IStreamSocket()57 virtual ~IStreamSocket() 58 { 59 if (sessionKey_.first != nullptr) { 60 (void)memset_s(sessionKey_.first, sessionKey_.second, 0, sessionKey_.second); 61 delete [] sessionKey_.first; 62 } 63 sessionKey_.first = nullptr; 64 } 65 66 virtual bool CreateClient(IpAndPort &local, int streamType, 67 std::pair<uint8_t*, uint32_t> sessionKey) = 0; // socket + bind 68 virtual bool CreateClient(IpAndPort &local, const IpAndPort &remote, int streamType, 69 std::pair<uint8_t*, uint32_t> sessionKey) = 0; 70 virtual bool CreateServer(IpAndPort &local, int streamType, std::pair<uint8_t*, uint32_t> sessionKey) = 0; 71 72 virtual void DestroyStreamSocket() = 0; 73 74 virtual bool Connect(const IpAndPort &remote) = 0; 75 virtual bool Send(std::unique_ptr<IStream> stream) = 0; 76 77 virtual bool SetOption(int type, const StreamAttr &value) = 0; 78 virtual int32_t SetMultiLayer(const void *para) = 0; 79 virtual StreamAttr GetOption(int type) const = 0; 80 81 virtual bool SetStreamListener(std::shared_ptr<IStreamSocketListener> receiver) = 0; 82 83 protected: 84 static constexpr int MAX_EPOLL_NUM = 100; 85 static constexpr int MAX_CONNECTION_VALUE = 100; 86 static constexpr int FRAME_HEADER_LEN = 4; 87 static constexpr int BYTE_TO_BIT = 8; 88 static constexpr int INT_TO_BYTE = 0xff; 89 static constexpr int IPTOS_LOWDELAY = 0XBC; 90 static constexpr int DEFAULT_UDP_BUFFER_SIZE = 512 * 1024; 91 static constexpr int DEFAULT_UDP_BUFFER_RCV_SIZE = 1024 * 1024; 92 static constexpr int STREAM_BUFFER_THRESHOLD = 5; 93 94 virtual int CreateAndBindSocket(IpAndPort &local, bool isServer) = 0; 95 virtual bool Accept() = 0; 96 97 virtual int EpollTimeout(int fd, int timeout) = 0; 98 virtual int SetSocketEpollMode(int fd) = 0; 99 virtual std::unique_ptr<char[]> RecvStream(int dataLength) = 0; TakeStream()100 virtual std::unique_ptr<IStream> TakeStream() 101 { 102 std::unique_lock<std::mutex> lock(streamReceiveLock_); 103 while (isStreamRecv_) { 104 if (!streamReceiveBuffer_.empty()) { 105 auto item = std::move(streamReceiveBuffer_.front()); 106 streamReceiveBuffer_.pop(); 107 return item; 108 } 109 streamReceiveCv_.wait(lock); 110 } 111 return nullptr; 112 } 113 PutStream(std::unique_ptr<IStream> stream)114 virtual void PutStream(std::unique_ptr<IStream> stream) 115 { 116 std::lock_guard<std::mutex> lock(streamReceiveLock_); 117 if (isStreamRecv_) { 118 streamReceiveBuffer_.push(std::move(stream)); 119 streamReceiveCv_.notify_all(); 120 } 121 } 122 GetStreamNum()123 virtual int GetStreamNum() 124 { 125 std::lock_guard<std::mutex> lock(streamReceiveLock_); 126 return streamReceiveBuffer_.size(); 127 } 128 QuitStreamBuffer()129 virtual void QuitStreamBuffer() 130 { 131 std::lock_guard<std::mutex> lock(streamReceiveLock_); 132 isStreamRecv_ = false; 133 streamReceiveCv_.notify_all(); 134 } 135 136 int listenFd_; 137 int streamFd_; 138 int epollFd_; 139 IpAndPort localIpPort_ {}; 140 IpAndPort remoteIpPort_ {}; 141 bool isStreamRecv_; 142 std::shared_ptr<IStreamSocketListener> streamReceiver_ = nullptr; 143 std::queue<std::unique_ptr<IStream>> streamReceiveBuffer_; 144 std::mutex streamReceiveLock_; 145 std::condition_variable streamReceiveCv_; 146 int streamType_ = INVALID; 147 bool isBlocked_; 148 std::pair<uint8_t*, uint32_t> sessionKey_ = std::make_pair(nullptr, 0); 149 }; 150 } // namespace SoftBus 151 } // namespace Communication 152 153 #endif //STREAM_SOCKET_H