1 // Copyright (c) 2023 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 use std::mem::take; 15 use std::sync::{Arc, Mutex}; 16 17 #[cfg(feature = "http2")] 18 use ylong_http::request::uri::Scheme; 19 use ylong_http::request::uri::Uri; 20 21 use crate::async_impl::connector::ConnInfo; 22 use crate::async_impl::Connector; 23 use crate::error::HttpClientError; 24 use crate::runtime::{AsyncRead, AsyncWrite}; 25 #[cfg(feature = "http2")] 26 use crate::util::config::H2Config; 27 use crate::util::config::{HttpConfig, HttpVersion}; 28 use crate::util::dispatcher::{Conn, ConnDispatcher, Dispatcher}; 29 use crate::util::pool::{Pool, PoolKey}; 30 31 pub(crate) struct ConnPool<C, S> { 32 pool: Pool<PoolKey, Conns<S>>, 33 connector: Arc<C>, 34 config: HttpConfig, 35 } 36 37 impl<C: Connector> ConnPool<C, C::Stream> { new(config: HttpConfig, connector: C) -> Self38 pub(crate) fn new(config: HttpConfig, connector: C) -> Self { 39 Self { 40 pool: Pool::new(), 41 connector: Arc::new(connector), 42 config, 43 } 44 } 45 connect_to(&self, uri: &Uri) -> Result<Conn<C::Stream>, HttpClientError>46 pub(crate) async fn connect_to(&self, uri: &Uri) -> Result<Conn<C::Stream>, HttpClientError> { 47 let key = PoolKey::new( 48 uri.scheme().unwrap().clone(), 49 uri.authority().unwrap().clone(), 50 ); 51 52 self.pool 53 .get(key, Conns::new) 54 .conn(self.config.clone(), self.connector.clone(), uri) 55 .await 56 } 57 } 58 59 pub(crate) struct Conns<S> { 60 list: Arc<Mutex<Vec<ConnDispatcher<S>>>>, 61 #[cfg(feature = "http2")] 62 h2_conn: Arc<crate::runtime::AsyncMutex<Vec<ConnDispatcher<S>>>>, 63 } 64 65 impl<S> Conns<S> { new() -> Self66 fn new() -> Self { 67 Self { 68 list: Arc::new(Mutex::new(Vec::new())), 69 70 #[cfg(feature = "http2")] 71 h2_conn: Arc::new(crate::runtime::AsyncMutex::new(Vec::with_capacity(1))), 72 } 73 } 74 } 75 76 impl<S> Clone for Conns<S> { clone(&self) -> Self77 fn clone(&self) -> Self { 78 Self { 79 list: self.list.clone(), 80 81 #[cfg(feature = "http2")] 82 h2_conn: self.h2_conn.clone(), 83 } 84 } 85 } 86 87 impl<S: AsyncRead + AsyncWrite + ConnInfo + Unpin + Send + Sync + 'static> Conns<S> { conn<C>( &mut self, config: HttpConfig, connector: Arc<C>, url: &Uri, ) -> Result<Conn<S>, HttpClientError> where C: Connector<Stream = S>,88 async fn conn<C>( 89 &mut self, 90 config: HttpConfig, 91 connector: Arc<C>, 92 url: &Uri, 93 ) -> Result<Conn<S>, HttpClientError> 94 where 95 C: Connector<Stream = S>, 96 { 97 match config.version { 98 #[cfg(feature = "http2")] 99 HttpVersion::Http2 => self.conn_h2(connector, url, config.http2_config).await, 100 #[cfg(feature = "http1_1")] 101 HttpVersion::Http1 => self.conn_h1(connector, url).await, 102 HttpVersion::Negotiate => { 103 #[cfg(all(feature = "http1_1", not(feature = "http2")))] 104 return self.conn_h1(connector, url).await; 105 106 #[cfg(all(feature = "http2", feature = "http1_1"))] 107 return self 108 .conn_negotiate(connector, url, config.http2_config) 109 .await; 110 } 111 } 112 } 113 conn_h1<C>(&self, connector: Arc<C>, url: &Uri) -> Result<Conn<S>, HttpClientError> where C: Connector<Stream = S>,114 async fn conn_h1<C>(&self, connector: Arc<C>, url: &Uri) -> Result<Conn<S>, HttpClientError> 115 where 116 C: Connector<Stream = S>, 117 { 118 if let Some(conn) = self.exist_h1_conn() { 119 return Ok(conn); 120 } 121 let dispatcher = ConnDispatcher::http1(connector.connect(url).await?); 122 Ok(self.dispatch_h1_conn(dispatcher)) 123 } 124 125 #[cfg(feature = "http2")] conn_h2<C>( &self, connector: Arc<C>, url: &Uri, config: H2Config, ) -> Result<Conn<S>, HttpClientError> where C: Connector<Stream = S>,126 async fn conn_h2<C>( 127 &self, 128 connector: Arc<C>, 129 url: &Uri, 130 config: H2Config, 131 ) -> Result<Conn<S>, HttpClientError> 132 where 133 C: Connector<Stream = S>, 134 { 135 // The lock `h2_occupation` is used to prevent multiple coroutines from sending 136 // Requests at the same time under concurrent conditions, 137 // resulting in the creation of multiple tcp connections 138 let mut lock = self.h2_conn.lock().await; 139 140 if let Some(conn) = Self::exist_h2_conn(&mut lock) { 141 return Ok(conn); 142 } 143 let stream = connector.connect(url).await?; 144 let details = stream.conn_detail(); 145 let tls = if let Some(scheme) = url.scheme() { 146 *scheme == Scheme::HTTPS 147 } else { 148 false 149 }; 150 match details.alpn() { 151 None if tls => return err_from_msg!(Connect, "The peer does not support http/2."), 152 Some(protocol) if protocol != b"h2" => { 153 return err_from_msg!(Connect, "Alpn negotiate a wrong protocol version.") 154 } 155 _ => {} 156 } 157 158 Ok(Self::dispatch_h2_conn(config, stream, &mut lock)) 159 } 160 161 #[cfg(all(feature = "http2", feature = "http1_1"))] conn_negotiate<C>( &self, connector: Arc<C>, url: &Uri, config: H2Config, ) -> Result<Conn<S>, HttpClientError> where C: Connector<Stream = S>,162 async fn conn_negotiate<C>( 163 &self, 164 connector: Arc<C>, 165 url: &Uri, 166 config: H2Config, 167 ) -> Result<Conn<S>, HttpClientError> 168 where 169 C: Connector<Stream = S>, 170 { 171 match *url.scheme().unwrap() { 172 Scheme::HTTPS => { 173 let mut lock = self.h2_conn.lock().await; 174 175 if let Some(conn) = Self::exist_h2_conn(&mut lock) { 176 return Ok(conn); 177 } 178 179 if let Some(conn) = self.exist_h1_conn() { 180 return Ok(conn); 181 } 182 183 let stream = connector.connect(url).await?; 184 let details = stream.conn_detail(); 185 186 let protocol = if let Some(bytes) = details.alpn() { 187 bytes 188 } else { 189 let dispatcher = ConnDispatcher::http1(stream); 190 return Ok(self.dispatch_h1_conn(dispatcher)); 191 }; 192 193 if protocol == b"http/1.1" { 194 let dispatcher = ConnDispatcher::http1(stream); 195 Ok(self.dispatch_h1_conn(dispatcher)) 196 } else if protocol == b"h2" { 197 Ok(Self::dispatch_h2_conn(config, stream, &mut lock)) 198 } else { 199 err_from_msg!(Connect, "Alpn negotiate a wrong protocol version.") 200 } 201 } 202 Scheme::HTTP => self.conn_h1(connector, url).await, 203 } 204 } 205 dispatch_h1_conn(&self, dispatcher: ConnDispatcher<S>) -> Conn<S>206 fn dispatch_h1_conn(&self, dispatcher: ConnDispatcher<S>) -> Conn<S> { 207 // We must be able to get the `Conn` here. 208 let conn = dispatcher.dispatch().unwrap(); 209 let mut list = self.list.lock().unwrap(); 210 list.push(dispatcher); 211 212 conn 213 } 214 215 #[cfg(feature = "http2")] dispatch_h2_conn( config: H2Config, stream: S, lock: &mut crate::runtime::MutexGuard<Vec<ConnDispatcher<S>>>, ) -> Conn<S>216 fn dispatch_h2_conn( 217 config: H2Config, 218 stream: S, 219 lock: &mut crate::runtime::MutexGuard<Vec<ConnDispatcher<S>>>, 220 ) -> Conn<S> { 221 let dispatcher = ConnDispatcher::http2(config, stream); 222 let conn = dispatcher.dispatch().unwrap(); 223 lock.push(dispatcher); 224 conn 225 } 226 exist_h1_conn(&self) -> Option<Conn<S>>227 fn exist_h1_conn(&self) -> Option<Conn<S>> { 228 let mut list = self.list.lock().unwrap(); 229 let mut conn = None; 230 let curr = take(&mut *list); 231 // TODO Distinguish between http2 connections and http1 connections. 232 for dispatcher in curr.into_iter() { 233 // Discard invalid dispatchers. 234 if dispatcher.is_shutdown() { 235 continue; 236 } 237 if conn.is_none() { 238 conn = dispatcher.dispatch(); 239 } 240 list.push(dispatcher); 241 } 242 conn 243 } 244 245 #[cfg(feature = "http2")] exist_h2_conn( lock: &mut crate::runtime::MutexGuard<Vec<ConnDispatcher<S>>>, ) -> Option<Conn<S>>246 fn exist_h2_conn( 247 lock: &mut crate::runtime::MutexGuard<Vec<ConnDispatcher<S>>>, 248 ) -> Option<Conn<S>> { 249 if let Some(dispatcher) = lock.pop() { 250 if !dispatcher.is_shutdown() { 251 if let Some(conn) = dispatcher.dispatch() { 252 lock.push(dispatcher); 253 return Some(conn); 254 } 255 } 256 } 257 None 258 } 259 } 260