1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 // http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13
14 //! Message passing style communication
15
16 pub mod bounded;
17 pub mod unbounded;
18
19 pub use bounded::{bounded_channel, BoundedReceiver, BoundedSender};
20 pub use unbounded::{unbounded_channel, UnboundedReceiver, UnboundedSender};
21
22 pub(crate) trait Container {
close(&self)23 fn close(&self);
24
is_close(&self) -> bool25 fn is_close(&self) -> bool;
26
len(&self) -> usize27 fn len(&self) -> usize;
28 }
29
30 use std::ops::Deref;
31 use std::sync::atomic::AtomicUsize;
32 use std::sync::atomic::Ordering::{AcqRel, Relaxed};
33 use std::sync::Arc;
34
35 pub(crate) struct Channel<C: Container> {
36 chan: C,
37 tx_cnt: AtomicUsize,
38 }
39
40 impl<C: Container> Channel<C> {
new(chan: C) -> Channel<C>41 fn new(chan: C) -> Channel<C> {
42 Channel {
43 chan,
44 tx_cnt: AtomicUsize::new(1),
45 }
46 }
47 }
48
channel<C: Container>(chan: C) -> (Tx<C>, Rx<C>)49 pub(crate) fn channel<C: Container>(chan: C) -> (Tx<C>, Rx<C>) {
50 let channel = Arc::new(Channel::new(chan));
51 (Tx::new(channel.clone()), Rx::new(channel))
52 }
53
54 pub(crate) struct Tx<C: Container> {
55 inner: Arc<Channel<C>>,
56 }
57
58 impl<C: Container> Clone for Tx<C> {
clone(&self) -> Self59 fn clone(&self) -> Self {
60 self.inner.tx_cnt.fetch_add(1, Relaxed);
61 Tx {
62 inner: self.inner.clone(),
63 }
64 }
65 }
66
67 impl<C: Container> Tx<C> {
new(channel: Arc<Channel<C>>) -> Tx<C>68 fn new(channel: Arc<Channel<C>>) -> Tx<C> {
69 Tx { inner: channel }
70 }
71
is_same(&self, other: &Self) -> bool72 pub(crate) fn is_same(&self, other: &Self) -> bool {
73 Arc::ptr_eq(&self.inner, &other.inner)
74 }
75
close(&self)76 pub(crate) fn close(&self) {
77 if self.inner.tx_cnt.fetch_sub(1, AcqRel) == 1 {
78 self.inner.chan.close();
79 }
80 }
81 }
82
83 impl<C: Container> Deref for Tx<C> {
84 type Target = C;
85
deref(&self) -> &Self::Target86 fn deref(&self) -> &Self::Target {
87 &self.inner.chan
88 }
89 }
90
91 pub(crate) struct Rx<C: Container> {
92 inner: Arc<Channel<C>>,
93 }
94
95 impl<C: Container> Rx<C> {
new(channel: Arc<Channel<C>>) -> Rx<C>96 fn new(channel: Arc<Channel<C>>) -> Rx<C> {
97 Rx { inner: channel }
98 }
99 }
100
101 impl<C: Container> Deref for Rx<C> {
102 type Target = C;
103
deref(&self) -> &Self::Target104 fn deref(&self) -> &Self::Target {
105 &self.inner.chan
106 }
107 }
108