1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 // http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13
14 //! Bounded channel
15
16 pub(crate) mod array;
17
18 use std::task::{Context, Poll};
19
20 use crate::futures::poll_fn;
21 use crate::sync::error::{RecvError, SendError, TryRecvError, TrySendError};
22 use crate::sync::mpsc::bounded::array::Array;
23 use crate::sync::mpsc::{channel, Container, Rx, Tx};
24 cfg_time!(
25 use crate::sync::error::{RecvTimeoutError, SendTimeoutError};
26 use crate::sync::mpsc::bounded::array::SendPosition;
27 use crate::time::timeout;
28 use std::time::Duration;
29 );
30
31 /// The sender of bounded channel.
32 /// A [`BoundedSender`] and [`BoundedReceiver`] handle pair are created by the
33 /// [`bounded_channel`] function.
34 ///
35 /// # Examples
36 ///
37 /// ```
38 /// use ylong_runtime::sync::mpsc::bounded::bounded_channel;
39 /// async fn io_func() {
40 /// let (tx, mut rx) = bounded_channel(1);
41 /// let tx2 = tx.clone();
42 /// let handle = ylong_runtime::spawn(async move {
43 /// assert!(tx.send(1).await.is_ok());
44 /// assert!(!tx.is_closed());
45 /// assert!(tx.is_same(&tx2));
46 /// });
47 /// let handle2 = ylong_runtime::spawn(async move {
48 /// assert_eq!(rx.recv().await, Ok(1));
49 /// });
50 /// let _ = ylong_runtime::block_on(handle);
51 /// let _ = ylong_runtime::block_on(handle2);
52 /// }
53 /// ```
54 pub struct BoundedSender<T> {
55 channel: Tx<Array<T>>,
56 }
57
58 impl<T> Clone for BoundedSender<T> {
clone(&self) -> Self59 fn clone(&self) -> Self {
60 BoundedSender {
61 channel: self.channel.clone(),
62 }
63 }
64 }
65
66 /// The receiver of bounded channel.
67 /// A [`BoundedSender`] and [`BoundedReceiver`] handle pair are created by the
68 /// [`bounded_channel`] function.
69 ///
70 /// # Examples
71 ///
72 /// ```
73 /// use ylong_runtime::sync::mpsc::bounded::bounded_channel;
74 /// async fn io_func() {
75 /// let (tx, mut rx) = bounded_channel(1);
76 /// assert!(rx.try_recv().is_err());
77 /// let handle = ylong_runtime::spawn(async move {
78 /// assert!(tx.send(1).await.is_ok());
79 /// });
80 /// let handle2 = ylong_runtime::spawn(async move {
81 /// assert_eq!(rx.len(), 1);
82 /// assert_eq!(rx.recv().await, Ok(1));
83 /// });
84 /// let _ = ylong_runtime::block_on(handle);
85 /// let _ = ylong_runtime::block_on(handle2);
86 /// }
87 /// ```
88 pub struct BoundedReceiver<T> {
89 channel: Rx<Array<T>>,
90 }
91
92 /// Creates a new mpsc channel, and returns the `Sender` and `Receiver` handle
93 /// pair.
94 ///
95 /// The channel is bounded with the passed in capacity.
96 ///
97 /// # Panics
98 ///
99 /// Panics if the new capacity is initialized to zero.
100 ///
101 /// # Examples
102 ///
103 /// ```
104 /// use ylong_runtime::sync::mpsc::bounded::bounded_channel;
105 /// async fn io_func() {
106 /// let (tx, mut rx) = bounded_channel(1);
107 /// let handle = ylong_runtime::spawn(async move {
108 /// assert_eq!(rx.recv().await, Ok(1));
109 /// });
110 /// let handle2 = ylong_runtime::spawn(async move {
111 /// assert!(tx.send(1).await.is_ok());
112 /// });
113 /// let _ = ylong_runtime::block_on(handle);
114 /// let _ = ylong_runtime::block_on(handle2);
115 /// }
116 /// ```
bounded_channel<T>(number: usize) -> (BoundedSender<T>, BoundedReceiver<T>)117 pub fn bounded_channel<T>(number: usize) -> (BoundedSender<T>, BoundedReceiver<T>) {
118 let array = Array::new(number);
119 let (tx, rx) = channel(array);
120 (BoundedSender::new(tx), BoundedReceiver::new(rx))
121 }
122
123 impl<T> BoundedSender<T> {
new(channel: Tx<Array<T>>) -> BoundedSender<T>124 fn new(channel: Tx<Array<T>>) -> BoundedSender<T> {
125 BoundedSender { channel }
126 }
127
128 /// Attempts to send a value to the associated [`BoundedReceiver`].
129 ///
130 /// If the receiver has been closed or the channel is full, this method will
131 /// return an error containing sent value.
132 ///
133 /// # Return value
134 /// * `Ok(T)` if receiving a value successfully.
135 /// * `Err(TrySendError::Full(T))` if the buffer of channel is full.
136 /// * `Err(TrySendError::Closed(T))` if all senders have been dropped.
137 ///
138 /// # Examples
139 ///
140 /// ```
141 /// use ylong_runtime::sync::error::TryRecvError;
142 /// use ylong_runtime::sync::mpsc::bounded::bounded_channel;
143 /// let (tx, mut rx) = bounded_channel(1);
144 /// match rx.try_recv() {
145 /// Err(TryRecvError::Empty) => {}
146 /// _ => panic!("This won't happen"),
147 /// }
148 /// tx.try_send(1).unwrap();
149 /// match rx.try_recv() {
150 /// Ok(_) => {}
151 /// _ => panic!("This won't happen"),
152 /// }
153 /// ```
try_send(&self, value: T) -> Result<(), TrySendError<T>>154 pub fn try_send(&self, value: T) -> Result<(), TrySendError<T>> {
155 self.channel.try_send(value)
156 }
157
158 /// Sends a value to the associated receiver
159 ///
160 /// If the receiver has been closed, this method will return an error
161 /// containing the sent value.
162 ///
163 /// # Return value
164 /// * `Ok()` if sending a value successfully.
165 /// * `Err(SendError::Closed(T))` if receiver has been dropped or closed.
166 ///
167 /// # Examples
168 ///
169 /// ```
170 /// use ylong_runtime::sync::mpsc::bounded::bounded_channel;
171 /// async fn io_func() {
172 /// let (tx, mut rx) = bounded_channel(1);
173 /// let handle = ylong_runtime::spawn(async move {
174 /// assert_eq!(rx.recv().await, Ok(1));
175 /// });
176 /// let handle2 = ylong_runtime::spawn(async move {
177 /// assert!(tx.send(1).await.is_ok());
178 /// });
179 /// let _ = ylong_runtime::block_on(handle);
180 /// let _ = ylong_runtime::block_on(handle2);
181 /// }
182 /// ```
send(&self, value: T) -> Result<(), SendError<T>>183 pub async fn send(&self, value: T) -> Result<(), SendError<T>> {
184 self.channel.send(value).await
185 }
186
187 /// Attempts to send a value to the associated receiver in a limited amount
188 /// of time.
189 ///
190 /// If the receiver has been closed or the time limit has been passed, this
191 /// method will return an error containing the sent value.
192 ///
193 /// # Return value
194 /// * `Ok()` if sending a value successfully.
195 /// * `Err(SendError::Closed(T))` if receiver has been dropped or closed.
196 /// * `Err(SendError::TimeOut(T))` if time limit has been passed.
197 ///
198 /// # Examples
199 ///
200 /// ```
201 /// use std::time::Duration;
202 ///
203 /// use ylong_runtime::sync::mpsc::bounded::bounded_channel;
204 /// async fn io_func() {
205 /// let (tx, mut rx) = bounded_channel(1);
206 /// let handle = ylong_runtime::spawn(async move {
207 /// assert_eq!(rx.recv().await, Ok(1));
208 /// });
209 /// let handle2 = ylong_runtime::spawn(async move {
210 /// assert!(tx.send_timeout(1, Duration::from_millis(10)).await.is_ok());
211 /// });
212 /// let _ = ylong_runtime::block_on(handle);
213 /// let _ = ylong_runtime::block_on(handle2);
214 /// }
215 /// ```
216 #[cfg(feature = "time")]
send_timeout(&self, value: T, time: Duration) -> Result<(), SendTimeoutError<T>>217 pub async fn send_timeout(&self, value: T, time: Duration) -> Result<(), SendTimeoutError<T>> {
218 match timeout(time, self.channel.get_position()).await {
219 Ok(res) => match res {
220 SendPosition::Pos(index) => {
221 self.channel.write(index, value);
222 Ok(())
223 }
224 SendPosition::Closed => Err(SendTimeoutError::Closed(value)),
225 SendPosition::Full => unreachable!(),
226 },
227 Err(_) => Err(SendTimeoutError::TimeOut(value)),
228 }
229 }
230
231 /// Checks whether the channel is closed. If so, the sender could not
232 /// send values anymore. It returns true after the [`BoundedReceiver`] is
233 /// dropped or the [`close`] method gets called.
234 ///
235 /// [`close`]: BoundedReceiver::close
236 ///
237 /// # Examples
238 ///
239 /// ```
240 /// use ylong_runtime::sync::mpsc::bounded::bounded_channel;
241 /// let (tx, rx) = bounded_channel::<isize>(1);
242 /// assert!(!tx.is_closed());
243 /// drop(rx);
244 /// assert!(tx.is_closed());
245 /// ```
is_closed(&self) -> bool246 pub fn is_closed(&self) -> bool {
247 self.channel.is_close()
248 }
249
250 /// Checks whether the sender and another sender belong to the same channel.
251 ///
252 /// # Examples
253 ///
254 /// ```
255 /// use ylong_runtime::sync::mpsc::bounded::bounded_channel;
256 /// let (tx, rx) = bounded_channel::<isize>(1);
257 /// let tx2 = tx.clone();
258 /// assert!(tx.is_same(&tx2));
259 /// ```
is_same(&self, other: &Self) -> bool260 pub fn is_same(&self, other: &Self) -> bool {
261 self.channel.is_same(&other.channel)
262 }
263
264 /// Gets the capacity of the channel.
265 ///
266 /// # Examples
267 ///
268 /// ```
269 /// use ylong_runtime::sync::mpsc::bounded::bounded_channel;
270 /// let (tx, rx) = bounded_channel::<isize>(5);
271 /// assert_eq!(tx.capacity(), 5);
272 /// ```
capacity(&self) -> usize273 pub fn capacity(&self) -> usize {
274 self.channel.capacity()
275 }
276
277 /// Gets the number of values in the channel.
278 ///
279 /// # Examples
280 ///
281 /// ```
282 /// use ylong_runtime::sync::mpsc::bounded::bounded_channel;
283 /// let (tx, rx) = bounded_channel(5);
284 /// assert_eq!(tx.len(), 0);
285 /// tx.try_send(1).unwrap();
286 /// assert_eq!(tx.len(), 1);
287 /// ```
len(&self) -> usize288 pub fn len(&self) -> usize {
289 self.channel.len()
290 }
291
292 /// Returns `true` if the channel contains no elements.
293 ///
294 /// # Examples
295 ///
296 /// ```
297 /// use ylong_runtime::sync::mpsc::bounded::bounded_channel;
298 /// let (tx, rx) = bounded_channel(5);
299 /// assert!(tx.is_empty());
300 /// tx.try_send(1).unwrap();
301 /// assert!(!tx.is_empty());
302 /// ```
is_empty(&self) -> bool303 pub fn is_empty(&self) -> bool {
304 self.len() == 0
305 }
306 }
307
308 impl<T> Drop for BoundedSender<T> {
drop(&mut self)309 fn drop(&mut self) {
310 self.channel.close();
311 }
312 }
313
314 impl<T> BoundedReceiver<T> {
new(channel: Rx<Array<T>>) -> BoundedReceiver<T>315 fn new(channel: Rx<Array<T>>) -> BoundedReceiver<T> {
316 BoundedReceiver { channel }
317 }
318
319 /// Get the number of values in the channel.
320 ///
321 /// # Examples
322 ///
323 /// ```
324 /// use ylong_runtime::sync::mpsc::bounded::bounded_channel;
325 /// let (tx, mut rx) = bounded_channel(3);
326 /// tx.try_send(1).unwrap();
327 /// tx.try_send(2).unwrap();
328 /// assert_eq!(rx.len(), 2);
329 /// ```
len(&self) -> usize330 pub fn len(&self) -> usize {
331 self.channel.len()
332 }
333
334 /// Returns `true` if the channel contains no elements.
335 ///
336 /// # Examples
337 ///
338 /// ```
339 /// use ylong_runtime::sync::mpsc::bounded::bounded_channel;
340 /// let (tx, rx) = bounded_channel(5);
341 /// assert!(rx.is_empty());
342 /// tx.try_send(1).unwrap();
343 /// assert!(!rx.is_empty());
344 /// ```
is_empty(&self) -> bool345 pub fn is_empty(&self) -> bool {
346 self.len() == 0
347 }
348
349 /// Attempts to receive a value from the associated [`BoundedSender`].
350 ///
351 /// # Return value
352 /// * `Ok(T)` if receiving a value successfully.
353 /// * `Err(TryRecvError::Empty)` if no value has been sent yet.
354 /// * `Err(TryRecvError::Closed)` if all senders have been dropped.
355 ///
356 /// # Examples
357 ///
358 /// ```
359 /// use ylong_runtime::sync::error::TryRecvError;
360 /// use ylong_runtime::sync::mpsc::bounded::bounded_channel;
361 /// let (tx, mut rx) = bounded_channel(1);
362 /// match rx.try_recv() {
363 /// Err(TryRecvError::Empty) => {}
364 /// _ => panic!("This won't happen"),
365 /// }
366 /// tx.try_send(1).unwrap();
367 /// match rx.try_recv() {
368 /// Ok(_) => {}
369 /// _ => panic!("This won't happen"),
370 /// }
371 /// drop(tx);
372 /// match rx.try_recv() {
373 /// Err(TryRecvError::Closed) => {}
374 /// _ => panic!("This won't happen"),
375 /// }
376 /// ```
try_recv(&mut self) -> Result<T, TryRecvError>377 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
378 self.channel.try_recv()
379 }
380
381 /// Polls to receive a value from the associated [`BoundedSender`].
382 ///
383 /// When the sender has not yet sent a message, calling this method will
384 /// return pending, and the waker from the Context will receive a
385 /// wakeup when the message arrives or when the channel is closed. Multiple
386 /// calls to this method, only the waker from the last call will receive a
387 /// wakeup.
388 ///
389 /// # Return value
390 /// * `Poll::Pending` if no messages in the channel now, but the channel is
391 /// not closed.
392 /// * `Poll::Ready(Ok(T))` if receiving a value successfully.
393 /// * `Poll::Ready(Err(RecvError))` in the following situations: 1. All
394 /// senders have been dropped or the channel is closed. 2. No messages
395 /// remaining.
396 ///
397 /// # Examples
398 ///
399 /// ```
400 /// use ylong_runtime::futures::poll_fn;
401 /// use ylong_runtime::sync::mpsc::bounded::bounded_channel;
402 /// async fn io_func() {
403 /// let (tx, mut rx) = bounded_channel(1);
404 /// let handle = ylong_runtime::spawn(async move {
405 /// let msg = poll_fn(|cx| rx.poll_recv(cx)).await;
406 /// assert_eq!(msg, Ok(1));
407 /// });
408 /// tx.try_send(1).unwrap();
409 /// let _ = ylong_runtime::block_on(handle);
410 /// }
411 /// ```
poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>>412 pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
413 self.channel.poll_recv(cx)
414 }
415
416 /// Receives a value from the associated [`BoundedSender`].
417 ///
418 /// The `receiver` can still receive all sent messages in the channel after
419 /// the channel is closed.
420 ///
421 /// # Return value
422 /// * `Ok(T)` if receiving a value successfully.
423 /// * `Err(RecvError)` in the following situations: 1. All senders have been
424 /// dropped or the channel is closed. 2. No messages remaining.
425 ///
426 /// # Examples
427 ///
428 /// ```
429 /// use ylong_runtime::sync::mpsc::bounded::bounded_channel;
430 /// async fn io_func() {
431 /// let (tx, mut rx) = bounded_channel(1);
432 /// let handle = ylong_runtime::spawn(async move {
433 /// assert_eq!(rx.recv().await, Ok(1));
434 /// });
435 /// tx.try_send(1).unwrap();
436 /// let _ = ylong_runtime::block_on(handle);
437 /// }
438 /// ```
recv(&mut self) -> Result<T, RecvError>439 pub async fn recv(&mut self) -> Result<T, RecvError> {
440 poll_fn(|cx| self.channel.poll_recv(cx)).await
441 }
442
443 /// Attempts to receive a value from the associated [`BoundedSender`] in a
444 /// limited amount of time.
445 ///
446 /// The `receiver` can still receive all sent messages in the channel after
447 /// the channel is closed.
448 ///
449 /// # Return value
450 /// * `Ok(T)` if receiving a value successfully.
451 /// * `Err(RecvTimeoutError::Closed)` if all senders have been dropped.
452 /// * `Err(RecvTimeoutError::TimeOut)` if time limit has been passed.
453 ///
454 /// # Examples
455 ///
456 /// ```
457 /// use std::time::Duration;
458 ///
459 /// use ylong_runtime::sync::mpsc::bounded::bounded_channel;
460 /// async fn io_func() {
461 /// let (tx, mut rx) = bounded_channel(1);
462 /// let handle = ylong_runtime::spawn(async move {
463 /// tx.try_send(1).unwrap();
464 /// assert_eq!(rx.recv_timeout(Duration::from_millis(10)).await, Ok(1));
465 /// });
466 /// let _ = ylong_runtime::block_on(handle);
467 /// }
468 /// ```
469 #[cfg(feature = "time")]
recv_timeout(&mut self, time: Duration) -> Result<T, RecvTimeoutError>470 pub async fn recv_timeout(&mut self, time: Duration) -> Result<T, RecvTimeoutError> {
471 match timeout(time, self.recv()).await {
472 Ok(res) => res.map_err(|_| RecvTimeoutError::Closed),
473 Err(_) => Err(RecvTimeoutError::Timeout),
474 }
475 }
476
477 /// Closes the channel, prevents the `Sender` from sending more values.
478 ///
479 /// The `Sender` will fail to call [`send`] or [`try_send`] after the
480 /// `Receiver` called `close`. It will do nothing if the channel is
481 /// already closed.
482 ///
483 /// [`send`]: BoundedSender::send
484 /// [`try_send`]: BoundedSender::try_send
485 ///
486 /// # Examples
487 /// ```
488 /// use ylong_runtime::sync::mpsc::bounded::bounded_channel;
489 /// async fn io_func() {
490 /// let (tx, mut rx) = bounded_channel(1);
491 /// assert!(!tx.is_closed());
492 ///
493 /// rx.close();
494 ///
495 /// assert!(tx.is_closed());
496 /// assert!(tx.try_send("no receive").is_err());
497 /// }
498 /// ```
499 ///
500 /// Receives a value sent **before** calling `close`
501 ///
502 /// ```
503 /// use ylong_runtime::sync::mpsc::bounded::bounded_channel;
504 /// async fn io_func() {
505 /// let (tx, mut rx) = bounded_channel(1);
506 /// assert!(tx.try_send("Hello").is_ok());
507 ///
508 /// rx.close();
509 ///
510 /// let msg = rx.try_recv().unwrap();
511 /// assert_eq!(msg, "Hello");
512 /// }
513 /// ```
close(&mut self)514 pub fn close(&mut self) {
515 self.channel.close();
516 }
517 }
518
519 impl<T> Drop for BoundedReceiver<T> {
drop(&mut self)520 fn drop(&mut self) {
521 self.channel.close();
522 }
523 }
524