1 /*
2  * Copyright (c) 2023-2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef CHANNEL_H
17 #define CHANNEL_H
18 
19 #include <condition_variable>
20 #include <deque>
21 #include <memory>
22 #include <mutex>
23 #include <type_traits>
24 
25 namespace OHOS {
26 namespace Msdp {
27 namespace DeviceStatus {
28 
29 template<typename Event>
30 class Channel {
31     static_assert(std::is_enum_v<Event> || std::is_integral_v<Event> ||
32                   (std::is_class_v<Event> &&
33                    std::is_default_constructible_v<Event> &&
34                    std::is_copy_constructible_v<Event>));
35 
36 public:
37     enum ChannelError {
38         NO_ERROR = 0,
39         QUEUE_IS_FULL = -1,
40         NO_CHANNEL = -2,
41         INACTIVE_CHANNEL = -3,
42     };
43 
44     class Sender final {
45         friend class Channel<Event>;
46 
47     public:
48         Sender() = default;
49         ~Sender() = default;
50 
Sender(const Sender & other)51         Sender(const Sender &other)
52             : channel_(other.channel_)
53         {}
54 
Sender(Sender && other)55         Sender(Sender &&other)
56             : channel_(other.channel_)
57         {
58             other.channel_ = nullptr;
59         }
60 
61         Sender& operator=(const Sender &other)
62         {
63             channel_ = other.channel_;
64             return *this;
65         }
66 
67         Sender& operator=(Sender &&other)
68         {
69             channel_ = other.channel_;
70             other.channel_ = nullptr;
71             return *this;
72         }
73 
Send(const Event & event)74         int32_t Send(const Event &event)
75         {
76             if (channel_ == nullptr) {
77                 return ChannelError::NO_CHANNEL;
78             }
79             return channel_->Send(event);
80         }
81 
82     private:
Sender(std::shared_ptr<Channel<Event>> channel)83         Sender(std::shared_ptr<Channel<Event>> channel)
84             : channel_(channel)
85         {}
86 
87         std::shared_ptr<Channel<Event>> channel_ { nullptr };
88     };
89 
90     class Receiver final {
91         friend class Channel<Event>;
92 
93     public:
94         Receiver() = default;
95         ~Receiver() = default;
96 
Receiver(const Receiver & other)97         Receiver(const Receiver &other)
98             : channel_(other.channel_)
99         {}
100 
Receiver(Receiver && other)101         Receiver(Receiver &&other)
102             : channel_(other.channel_)
103         {
104             other.channel_ = nullptr;
105         }
106 
107         Receiver& operator=(const Receiver &other)
108         {
109             channel_ = other.channel_;
110             return *this;
111         }
112 
113         Receiver& operator=(Receiver &&other)
114         {
115             channel_ = other.channel_;
116             other.channel_ = nullptr;
117             return *this;
118         }
119 
Enable()120         void Enable()
121         {
122             if (channel_ != nullptr) {
123                 channel_->Enable();
124             }
125         }
126 
Disable()127         void Disable()
128         {
129             if (channel_ != nullptr) {
130                 channel_->Disable();
131             }
132         }
133 
Peek()134         Event Peek()
135         {
136             return (channel_ != nullptr ? channel_->Peek() : Event());
137         }
138 
Pop()139         void Pop()
140         {
141             if (channel_ != nullptr) {
142                 channel_->Pop();
143             }
144         }
145 
Receive()146         Event Receive()
147         {
148             return (channel_ != nullptr ? channel_->Receive() : Event());
149         }
150 
151     private:
Receiver(std::shared_ptr<Channel<Event>> channel)152         Receiver(std::shared_ptr<Channel<Event>> channel)
153             : channel_(channel)
154         {}
155 
156         std::shared_ptr<Channel<Event>> channel_ { nullptr };
157     };
158 
159     Channel() = default;
160     ~Channel() = default;
161 
162     static std::pair<Sender, Receiver> OpenChannel();
163 
164 private:
165     void Enable();
166     void Disable();
167     int32_t Send(const Event &event);
168     Event Peek();
169     void Pop();
170     Event Receive();
171 
172     static inline constexpr size_t QUEUE_CAPACITY { 1024 };
173 
174     std::mutex lock_;
175     bool isActive_ { false };
176     std::condition_variable empty_;
177     std::deque<Event> queue_;
178 };
179 
180 template<typename Event>
OpenChannel()181 std::pair<typename Channel<Event>::Sender, typename Channel<Event>::Receiver> Channel<Event>::OpenChannel()
182 {
183     std::shared_ptr<Channel<Event>> channel = std::make_shared<Channel<Event>>();
184     return std::make_pair(Channel<Event>::Sender(channel), Channel<Event>::Receiver(channel));
185 }
186 
187 template<typename Event>
Enable()188 void Channel<Event>::Enable()
189 {
190     std::unique_lock<std::mutex> lock(lock_);
191     isActive_ = true;
192 }
193 
194 template<typename Event>
Disable()195 void Channel<Event>::Disable()
196 {
197     std::unique_lock<std::mutex> lock(lock_);
198     isActive_ = false;
199     queue_.clear();
200 }
201 
202 template<typename Event>
Send(const Event & event)203 int32_t Channel<Event>::Send(const Event &event)
204 {
205     std::unique_lock<std::mutex> lock(lock_);
206     if (!isActive_) {
207         return ChannelError::INACTIVE_CHANNEL;
208     }
209     if (queue_.size() >= QUEUE_CAPACITY) {
210         return ChannelError::QUEUE_IS_FULL;
211     }
212     bool needNotify = queue_.empty();
213     queue_.push_back(event);
214     if (needNotify) {
215         empty_.notify_all();
216     }
217     return ChannelError::NO_ERROR;
218 }
219 
220 template<typename Event>
Peek()221 Event Channel<Event>::Peek()
222 {
223     std::unique_lock<std::mutex> lock(lock_);
224     if (queue_.empty()) {
225         empty_.wait(lock, [this] {
226             return !queue_.empty();
227         });
228     }
229     return queue_.front();
230 }
231 
232 template<typename Event>
Pop()233 void Channel<Event>::Pop()
234 {
235     std::unique_lock<std::mutex> lock(lock_);
236     if (queue_.empty()) {
237         empty_.wait(lock, [this] {
238             return !queue_.empty();
239         });
240     }
241     queue_.pop_front();
242 }
243 
244 template<typename Event>
Receive()245 Event Channel<Event>::Receive()
246 {
247     std::unique_lock<std::mutex> lock(lock_);
248     if (queue_.empty()) {
249         empty_.wait(lock, [this] {
250             return !queue_.empty();
251         });
252     }
253     Event event = queue_.front();
254     queue_.pop_front();
255     return event;
256 }
257 } // namespace DeviceStatus
258 } // namespace Msdp
259 } // namespace OHOS
260 #endif // CHANNEL_H