1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 //! Bounded channel
15 
16 pub(crate) mod array;
17 
18 use std::task::{Context, Poll};
19 
20 use crate::futures::poll_fn;
21 use crate::sync::error::{RecvError, SendError, TryRecvError, TrySendError};
22 use crate::sync::mpsc::bounded::array::Array;
23 use crate::sync::mpsc::{channel, Container, Rx, Tx};
24 cfg_time!(
25     use crate::sync::error::{RecvTimeoutError, SendTimeoutError};
26     use crate::sync::mpsc::bounded::array::SendPosition;
27     use crate::time::timeout;
28     use std::time::Duration;
29 );
30 
31 /// The sender of bounded channel.
32 /// A [`BoundedSender`] and [`BoundedReceiver`] handle pair are created by the
33 /// [`bounded_channel`] function.
34 ///
35 /// # Examples
36 ///
37 /// ```
38 /// use ylong_runtime::sync::mpsc::bounded::bounded_channel;
39 /// async fn io_func() {
40 ///     let (tx, mut rx) = bounded_channel(1);
41 ///     let tx2 = tx.clone();
42 ///     let handle = ylong_runtime::spawn(async move {
43 ///         assert!(tx.send(1).await.is_ok());
44 ///         assert!(!tx.is_closed());
45 ///         assert!(tx.is_same(&tx2));
46 ///     });
47 ///     let handle2 = ylong_runtime::spawn(async move {
48 ///         assert_eq!(rx.recv().await, Ok(1));
49 ///     });
50 ///     let _ = ylong_runtime::block_on(handle);
51 ///     let _ = ylong_runtime::block_on(handle2);
52 /// }
53 /// ```
54 pub struct BoundedSender<T> {
55     channel: Tx<Array<T>>,
56 }
57 
58 impl<T> Clone for BoundedSender<T> {
clone(&self) -> Self59     fn clone(&self) -> Self {
60         BoundedSender {
61             channel: self.channel.clone(),
62         }
63     }
64 }
65 
66 /// The receiver of bounded channel.
67 /// A [`BoundedSender`] and [`BoundedReceiver`] handle pair are created by the
68 /// [`bounded_channel`] function.
69 ///
70 /// # Examples
71 ///
72 /// ```
73 /// use ylong_runtime::sync::mpsc::bounded::bounded_channel;
74 /// async fn io_func() {
75 ///     let (tx, mut rx) = bounded_channel(1);
76 ///     assert!(rx.try_recv().is_err());
77 ///     let handle = ylong_runtime::spawn(async move {
78 ///         assert!(tx.send(1).await.is_ok());
79 ///     });
80 ///     let handle2 = ylong_runtime::spawn(async move {
81 ///         assert_eq!(rx.len(), 1);
82 ///         assert_eq!(rx.recv().await, Ok(1));
83 ///     });
84 ///     let _ = ylong_runtime::block_on(handle);
85 ///     let _ = ylong_runtime::block_on(handle2);
86 /// }
87 /// ```
88 pub struct BoundedReceiver<T> {
89     channel: Rx<Array<T>>,
90 }
91 
92 /// Creates a new mpsc channel, and returns the `Sender` and `Receiver` handle
93 /// pair.
94 ///
95 /// The channel is bounded with the passed in capacity.
96 ///
97 /// # Panics
98 ///
99 /// Panics if the new capacity is initialized to zero.
100 ///
101 /// # Examples
102 ///
103 /// ```
104 /// use ylong_runtime::sync::mpsc::bounded::bounded_channel;
105 /// async fn io_func() {
106 ///     let (tx, mut rx) = bounded_channel(1);
107 ///     let handle = ylong_runtime::spawn(async move {
108 ///         assert_eq!(rx.recv().await, Ok(1));
109 ///     });
110 ///     let handle2 = ylong_runtime::spawn(async move {
111 ///         assert!(tx.send(1).await.is_ok());
112 ///     });
113 ///     let _ = ylong_runtime::block_on(handle);
114 ///     let _ = ylong_runtime::block_on(handle2);
115 /// }
116 /// ```
bounded_channel<T>(number: usize) -> (BoundedSender<T>, BoundedReceiver<T>)117 pub fn bounded_channel<T>(number: usize) -> (BoundedSender<T>, BoundedReceiver<T>) {
118     let array = Array::new(number);
119     let (tx, rx) = channel(array);
120     (BoundedSender::new(tx), BoundedReceiver::new(rx))
121 }
122 
123 impl<T> BoundedSender<T> {
new(channel: Tx<Array<T>>) -> BoundedSender<T>124     fn new(channel: Tx<Array<T>>) -> BoundedSender<T> {
125         BoundedSender { channel }
126     }
127 
128     /// Attempts to send a value to the associated [`BoundedReceiver`].
129     ///
130     /// If the receiver has been closed or the channel is full, this method will
131     /// return an error containing sent value.
132     ///
133     /// # Return value
134     /// * `Ok(T)` if receiving a value successfully.
135     /// * `Err(TrySendError::Full(T))` if the buffer of channel is full.
136     /// * `Err(TrySendError::Closed(T))` if all senders have been dropped.
137     ///
138     /// # Examples
139     ///
140     /// ```
141     /// use ylong_runtime::sync::error::TryRecvError;
142     /// use ylong_runtime::sync::mpsc::bounded::bounded_channel;
143     /// let (tx, mut rx) = bounded_channel(1);
144     /// match rx.try_recv() {
145     ///     Err(TryRecvError::Empty) => {}
146     ///     _ => panic!("This won't happen"),
147     /// }
148     /// tx.try_send(1).unwrap();
149     /// match rx.try_recv() {
150     ///     Ok(_) => {}
151     ///     _ => panic!("This won't happen"),
152     /// }
153     /// ```
try_send(&self, value: T) -> Result<(), TrySendError<T>>154     pub fn try_send(&self, value: T) -> Result<(), TrySendError<T>> {
155         self.channel.try_send(value)
156     }
157 
158     /// Sends a value to the associated receiver
159     ///
160     /// If the receiver has been closed, this method will return an error
161     /// containing the sent value.
162     ///
163     /// # Return value
164     /// * `Ok()` if sending a value successfully.
165     /// * `Err(SendError::Closed(T))` if receiver has been dropped or closed.
166     ///
167     /// # Examples
168     ///
169     /// ```
170     /// use ylong_runtime::sync::mpsc::bounded::bounded_channel;
171     /// async fn io_func() {
172     ///     let (tx, mut rx) = bounded_channel(1);
173     ///     let handle = ylong_runtime::spawn(async move {
174     ///         assert_eq!(rx.recv().await, Ok(1));
175     ///     });
176     ///     let handle2 = ylong_runtime::spawn(async move {
177     ///         assert!(tx.send(1).await.is_ok());
178     ///     });
179     ///     let _ = ylong_runtime::block_on(handle);
180     ///     let _ = ylong_runtime::block_on(handle2);
181     /// }
182     /// ```
send(&self, value: T) -> Result<(), SendError<T>>183     pub async fn send(&self, value: T) -> Result<(), SendError<T>> {
184         self.channel.send(value).await
185     }
186 
187     /// Attempts to send a value to the associated receiver in a limited amount
188     /// of time.
189     ///
190     /// If the receiver has been closed or the time limit has been passed, this
191     /// method will return an error containing the sent value.
192     ///
193     /// # Return value
194     /// * `Ok()` if sending a value successfully.
195     /// * `Err(SendError::Closed(T))` if receiver has been dropped or closed.
196     /// * `Err(SendError::TimeOut(T))` if time limit has been passed.
197     ///
198     /// # Examples
199     ///
200     /// ```
201     /// use std::time::Duration;
202     ///
203     /// use ylong_runtime::sync::mpsc::bounded::bounded_channel;
204     /// async fn io_func() {
205     ///     let (tx, mut rx) = bounded_channel(1);
206     ///     let handle = ylong_runtime::spawn(async move {
207     ///         assert_eq!(rx.recv().await, Ok(1));
208     ///     });
209     ///     let handle2 = ylong_runtime::spawn(async move {
210     ///         assert!(tx.send_timeout(1, Duration::from_millis(10)).await.is_ok());
211     ///     });
212     ///     let _ = ylong_runtime::block_on(handle);
213     ///     let _ = ylong_runtime::block_on(handle2);
214     /// }
215     /// ```
216     #[cfg(feature = "time")]
send_timeout(&self, value: T, time: Duration) -> Result<(), SendTimeoutError<T>>217     pub async fn send_timeout(&self, value: T, time: Duration) -> Result<(), SendTimeoutError<T>> {
218         match timeout(time, self.channel.get_position()).await {
219             Ok(res) => match res {
220                 SendPosition::Pos(index) => {
221                     self.channel.write(index, value);
222                     Ok(())
223                 }
224                 SendPosition::Closed => Err(SendTimeoutError::Closed(value)),
225                 SendPosition::Full => unreachable!(),
226             },
227             Err(_) => Err(SendTimeoutError::TimeOut(value)),
228         }
229     }
230 
231     /// Checks whether the channel is closed. If so, the sender could not
232     /// send values anymore. It returns true after the [`BoundedReceiver`] is
233     /// dropped or the [`close`] method gets called.
234     ///
235     /// [`close`]: BoundedReceiver::close
236     ///
237     /// # Examples
238     ///
239     /// ```
240     /// use ylong_runtime::sync::mpsc::bounded::bounded_channel;
241     /// let (tx, rx) = bounded_channel::<isize>(1);
242     /// assert!(!tx.is_closed());
243     /// drop(rx);
244     /// assert!(tx.is_closed());
245     /// ```
is_closed(&self) -> bool246     pub fn is_closed(&self) -> bool {
247         self.channel.is_close()
248     }
249 
250     /// Checks whether the sender and another sender belong to the same channel.
251     ///
252     /// # Examples
253     ///
254     /// ```
255     /// use ylong_runtime::sync::mpsc::bounded::bounded_channel;
256     /// let (tx, rx) = bounded_channel::<isize>(1);
257     /// let tx2 = tx.clone();
258     /// assert!(tx.is_same(&tx2));
259     /// ```
is_same(&self, other: &Self) -> bool260     pub fn is_same(&self, other: &Self) -> bool {
261         self.channel.is_same(&other.channel)
262     }
263 
264     /// Gets the capacity of the channel.
265     ///
266     /// # Examples
267     ///
268     /// ```
269     /// use ylong_runtime::sync::mpsc::bounded::bounded_channel;
270     /// let (tx, rx) = bounded_channel::<isize>(5);
271     /// assert_eq!(tx.capacity(), 5);
272     /// ```
capacity(&self) -> usize273     pub fn capacity(&self) -> usize {
274         self.channel.capacity()
275     }
276 
277     /// Gets the number of values in the channel.
278     ///
279     /// # Examples
280     ///
281     /// ```
282     /// use ylong_runtime::sync::mpsc::bounded::bounded_channel;
283     /// let (tx, rx) = bounded_channel(5);
284     /// assert_eq!(tx.len(), 0);
285     /// tx.try_send(1).unwrap();
286     /// assert_eq!(tx.len(), 1);
287     /// ```
len(&self) -> usize288     pub fn len(&self) -> usize {
289         self.channel.len()
290     }
291 
292     /// Returns `true` if the channel contains no elements.
293     ///
294     /// # Examples
295     ///
296     /// ```
297     /// use ylong_runtime::sync::mpsc::bounded::bounded_channel;
298     /// let (tx, rx) = bounded_channel(5);
299     /// assert!(tx.is_empty());
300     /// tx.try_send(1).unwrap();
301     /// assert!(!tx.is_empty());
302     /// ```
is_empty(&self) -> bool303     pub fn is_empty(&self) -> bool {
304         self.len() == 0
305     }
306 }
307 
308 impl<T> Drop for BoundedSender<T> {
drop(&mut self)309     fn drop(&mut self) {
310         self.channel.close();
311     }
312 }
313 
314 impl<T> BoundedReceiver<T> {
new(channel: Rx<Array<T>>) -> BoundedReceiver<T>315     fn new(channel: Rx<Array<T>>) -> BoundedReceiver<T> {
316         BoundedReceiver { channel }
317     }
318 
319     /// Get the number of values in the channel.
320     ///
321     /// # Examples
322     ///
323     /// ```
324     /// use ylong_runtime::sync::mpsc::bounded::bounded_channel;
325     /// let (tx, mut rx) = bounded_channel(3);
326     /// tx.try_send(1).unwrap();
327     /// tx.try_send(2).unwrap();
328     /// assert_eq!(rx.len(), 2);
329     /// ```
len(&self) -> usize330     pub fn len(&self) -> usize {
331         self.channel.len()
332     }
333 
334     /// Returns `true` if the channel contains no elements.
335     ///
336     /// # Examples
337     ///
338     /// ```
339     /// use ylong_runtime::sync::mpsc::bounded::bounded_channel;
340     /// let (tx, rx) = bounded_channel(5);
341     /// assert!(rx.is_empty());
342     /// tx.try_send(1).unwrap();
343     /// assert!(!rx.is_empty());
344     /// ```
is_empty(&self) -> bool345     pub fn is_empty(&self) -> bool {
346         self.len() == 0
347     }
348 
349     /// Attempts to receive a value from the associated [`BoundedSender`].
350     ///
351     /// # Return value
352     /// * `Ok(T)` if receiving a value successfully.
353     /// * `Err(TryRecvError::Empty)` if no value has been sent yet.
354     /// * `Err(TryRecvError::Closed)` if all senders have been dropped.
355     ///
356     /// # Examples
357     ///
358     /// ```
359     /// use ylong_runtime::sync::error::TryRecvError;
360     /// use ylong_runtime::sync::mpsc::bounded::bounded_channel;
361     /// let (tx, mut rx) = bounded_channel(1);
362     /// match rx.try_recv() {
363     ///     Err(TryRecvError::Empty) => {}
364     ///     _ => panic!("This won't happen"),
365     /// }
366     /// tx.try_send(1).unwrap();
367     /// match rx.try_recv() {
368     ///     Ok(_) => {}
369     ///     _ => panic!("This won't happen"),
370     /// }
371     /// drop(tx);
372     /// match rx.try_recv() {
373     ///     Err(TryRecvError::Closed) => {}
374     ///     _ => panic!("This won't happen"),
375     /// }
376     /// ```
try_recv(&mut self) -> Result<T, TryRecvError>377     pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
378         self.channel.try_recv()
379     }
380 
381     /// Polls to receive a value from the associated [`BoundedSender`].
382     ///
383     /// When the sender has not yet sent a message, calling this method will
384     /// return pending, and the waker from the Context will receive a
385     /// wakeup when the message arrives or when the channel is closed. Multiple
386     /// calls to this method, only the waker from the last call will receive a
387     /// wakeup.
388     ///
389     /// # Return value
390     /// * `Poll::Pending` if no messages in the channel now, but the channel is
391     ///   not closed.
392     /// * `Poll::Ready(Ok(T))` if receiving a value successfully.
393     /// * `Poll::Ready(Err(RecvError))` in the following situations: 1. All
394     ///   senders have been dropped or the channel is closed. 2. No messages
395     ///   remaining.
396     ///
397     /// # Examples
398     ///
399     /// ```
400     /// use ylong_runtime::futures::poll_fn;
401     /// use ylong_runtime::sync::mpsc::bounded::bounded_channel;
402     /// async fn io_func() {
403     ///     let (tx, mut rx) = bounded_channel(1);
404     ///     let handle = ylong_runtime::spawn(async move {
405     ///         let msg = poll_fn(|cx| rx.poll_recv(cx)).await;
406     ///         assert_eq!(msg, Ok(1));
407     ///     });
408     ///     tx.try_send(1).unwrap();
409     ///     let _ = ylong_runtime::block_on(handle);
410     /// }
411     /// ```
poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>>412     pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
413         self.channel.poll_recv(cx)
414     }
415 
416     /// Receives a value from the associated [`BoundedSender`].
417     ///
418     /// The `receiver` can still receive all sent messages in the channel after
419     /// the channel is closed.
420     ///
421     /// # Return value
422     /// * `Ok(T)` if receiving a value successfully.
423     /// * `Err(RecvError)` in the following situations: 1. All senders have been
424     ///   dropped or the channel is closed. 2. No messages remaining.
425     ///
426     /// # Examples
427     ///
428     /// ```
429     /// use ylong_runtime::sync::mpsc::bounded::bounded_channel;
430     /// async fn io_func() {
431     ///     let (tx, mut rx) = bounded_channel(1);
432     ///     let handle = ylong_runtime::spawn(async move {
433     ///         assert_eq!(rx.recv().await, Ok(1));
434     ///     });
435     ///     tx.try_send(1).unwrap();
436     ///     let _ = ylong_runtime::block_on(handle);
437     /// }
438     /// ```
recv(&mut self) -> Result<T, RecvError>439     pub async fn recv(&mut self) -> Result<T, RecvError> {
440         poll_fn(|cx| self.channel.poll_recv(cx)).await
441     }
442 
443     /// Attempts to receive a value from the associated [`BoundedSender`] in a
444     /// limited amount of time.
445     ///
446     /// The `receiver` can still receive all sent messages in the channel after
447     /// the channel is closed.
448     ///
449     /// # Return value
450     /// * `Ok(T)` if receiving a value successfully.
451     /// * `Err(RecvTimeoutError::Closed)` if all senders have been dropped.
452     /// * `Err(RecvTimeoutError::TimeOut)` if time limit has been passed.
453     ///
454     /// # Examples
455     ///
456     /// ```
457     /// use std::time::Duration;
458     ///
459     /// use ylong_runtime::sync::mpsc::bounded::bounded_channel;
460     /// async fn io_func() {
461     ///     let (tx, mut rx) = bounded_channel(1);
462     ///     let handle = ylong_runtime::spawn(async move {
463     ///         tx.try_send(1).unwrap();
464     ///         assert_eq!(rx.recv_timeout(Duration::from_millis(10)).await, Ok(1));
465     ///     });
466     ///     let _ = ylong_runtime::block_on(handle);
467     /// }
468     /// ```
469     #[cfg(feature = "time")]
recv_timeout(&mut self, time: Duration) -> Result<T, RecvTimeoutError>470     pub async fn recv_timeout(&mut self, time: Duration) -> Result<T, RecvTimeoutError> {
471         match timeout(time, self.recv()).await {
472             Ok(res) => res.map_err(|_| RecvTimeoutError::Closed),
473             Err(_) => Err(RecvTimeoutError::Timeout),
474         }
475     }
476 
477     /// Closes the channel, prevents the `Sender` from sending more values.
478     ///
479     /// The `Sender` will fail to call [`send`] or [`try_send`] after the
480     /// `Receiver` called `close`. It will do nothing if the channel is
481     /// already closed.
482     ///
483     /// [`send`]: BoundedSender::send
484     /// [`try_send`]: BoundedSender::try_send
485     ///
486     /// # Examples
487     /// ```
488     /// use ylong_runtime::sync::mpsc::bounded::bounded_channel;
489     /// async fn io_func() {
490     ///     let (tx, mut rx) = bounded_channel(1);
491     ///     assert!(!tx.is_closed());
492     ///
493     ///     rx.close();
494     ///
495     ///     assert!(tx.is_closed());
496     ///     assert!(tx.try_send("no receive").is_err());
497     /// }
498     /// ```
499     ///
500     /// Receives a value sent **before** calling `close`
501     ///
502     /// ```
503     /// use ylong_runtime::sync::mpsc::bounded::bounded_channel;
504     /// async fn io_func() {
505     ///     let (tx, mut rx) = bounded_channel(1);
506     ///     assert!(tx.try_send("Hello").is_ok());
507     ///
508     ///     rx.close();
509     ///
510     ///     let msg = rx.try_recv().unwrap();
511     ///     assert_eq!(msg, "Hello");
512     /// }
513     /// ```
close(&mut self)514     pub fn close(&mut self) {
515         self.channel.close();
516     }
517 }
518 
519 impl<T> Drop for BoundedReceiver<T> {
drop(&mut self)520     fn drop(&mut self) {
521         self.channel.close();
522     }
523 }
524