1use std::{
2 collections::HashMap,
3 pin::Pin,
4 sync::Arc,
5 task::{
6 Context,
7 Poll,
8 },
9};
10
11use async_trait::async_trait;
12use futures::Stream;
13use muxado::Error as MuxadoError;
14use thiserror::Error;
15use tokio::sync::mpsc::Receiver;
16
17use crate::{
18 config::{
19 HttpTunnelBuilder,
20 LabeledTunnelBuilder,
21 TcpTunnelBuilder,
22 TlsTunnelBuilder,
23 },
24 conn::{
25 ConnInner,
26 EdgeConn,
27 EndpointConn,
28 },
29 internals::raw_session::RpcError,
30 session::ConnectError,
31 Session,
32};
33
34#[derive(Error, Debug, Clone)]
36#[non_exhaustive]
37pub enum AcceptError {
38 #[error("transport error")]
40 Transport(#[from] MuxadoError),
41 #[error("reconnect error")]
43 Reconnect(#[from] Arc<ConnectError>),
44 #[error("listener closed: {message}{}", error_code.clone().map(|s| format!(", {s}")).unwrap_or_else(String::new))]
46 ListenerClosed {
47 message: String,
49 error_code: Option<String>,
51 },
52}
53
/// Plain-data description of a tunnel.
///
/// Cloned by [`TunnelInner::make_info`] so that tunnel information can be
/// handed out separately from the connection receiver.
#[derive(Clone)]
pub(crate) struct TunnelInnerInfo {
    /// Tunnel ID; this is the value passed to the session when closing.
    pub(crate) id: String,
    /// Protocol string, exposed through `proto()`.
    pub(crate) proto: String,
    /// URL string, exposed through `url()`.
    pub(crate) url: String,
    /// Label key/value pairs, exposed through `labels()`.
    pub(crate) labels: HashMap<String, String>,
    /// `forwards_to` string, exposed through `forwards_to()`.
    pub(crate) forwards_to: String,
    /// Metadata string, exposed through `metadata()`.
    pub(crate) metadata: String,
}
63
64pub(crate) struct TunnelInner {
65 pub(crate) info: TunnelInnerInfo,
66 pub(crate) incoming: Option<Receiver<Result<ConnInner, AcceptError>>>,
67
68 pub(crate) session: Session,
73}
74
75impl Drop for TunnelInner {
76 fn drop(&mut self) {
77 let id = self.id().to_string();
78 let sess = self.session.clone();
79 let rt = sess.runtime();
80 rt.spawn(async move { sess.close_tunnel(&id).await });
81 }
82}
83
84macro_rules! tunnel_trait {
88 ($($hyper_bound:tt)*) => {
89 pub trait Tunnel:
95 Stream<Item = Result<<Self as Tunnel>::Conn, AcceptError>>
96 + TunnelInfo
97 + TunnelCloser
98 $($hyper_bound)*
99 + Unpin
100 + Send
101 + 'static
102 {
103 type Conn: crate::Conn;
107 }
108
109 pub trait TunnelInfo {
111 fn id(&self) -> &str;
113 fn forwards_to(&self) -> &str;
118
119 fn metadata(&self) -> &str;
121 }
122
123 #[async_trait]
125 pub trait TunnelCloser {
126 async fn close(&mut self) -> Result<(), RpcError>;
135 }
136 }
137}
138
139tunnel_trait!();
140
/// Information available on endpoint-style tunnels.
///
/// In this file it is implemented for the tunnel types generated with the
/// `endpoint` marker.
pub trait EndpointInfo {
    /// Returns the tunnel's URL string.
    fn url(&self) -> &str;

    /// Returns the tunnel's protocol string.
    fn proto(&self) -> &str;
}
151
/// Information available on label-based (edge) tunnels.
///
/// In this file it is implemented for the tunnel types generated with the
/// `edge` marker.
pub trait EdgeInfo {
    /// Returns the tunnel's labels as a key/value map.
    fn labels(&self) -> &HashMap<String, String>;
}
159
160impl Stream for TunnelInner {
161 type Item = Result<ConnInner, AcceptError>;
162
163 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
164 self.incoming
165 .as_mut()
166 .expect("tunnel inner lacks a receiver")
167 .poll_recv(cx)
168 }
169}
170
171impl TunnelInner {
172 pub fn id(&self) -> &str {
174 &self.info.id
175 }
176
177 pub fn url(&self) -> &str {
180 &self.info.url
181 }
182
183 pub async fn close(&mut self) -> Result<(), RpcError> {
186 self.session.close_tunnel(self.id()).await?;
187 if let Some(r) = self.incoming.as_mut() {
188 r.close()
189 }
190 Ok(())
191 }
192
193 pub fn proto(&self) -> &str {
195 &self.info.proto
196 }
197
198 pub fn labels(&self) -> &HashMap<String, String> {
201 &self.info.labels
202 }
203
204 pub fn forwards_to(&self) -> &str {
206 &self.info.forwards_to
207 }
208
209 pub fn metadata(&self) -> &str {
211 &self.info.metadata
212 }
213
214 pub(crate) fn make_info(&self) -> TunnelInner {
217 TunnelInner {
218 info: self.info.clone(),
219 incoming: None,
220 session: self.session.clone(),
221 }
222 }
223}
224
// Generates a public tunnel wrapper around `TunnelInner`.
//
// Main arm arguments: optional outer attributes, the wrapper name, its
// builder type, the connection type it yields, and a comma-separated list of
// markers (`endpoint` / `edge`) selecting which extra info traits to
// implement. The two marker arms are only meant to be reached through the
// recursive call in the main arm.
macro_rules! make_tunnel_type {
    ($(#[$outer:meta])* $wrapper:ident, $builder:tt, $conn:tt, $($m:tt),*) => {
        $(#[$outer])*
        pub struct $wrapper {
            pub(crate) inner: TunnelInner,
        }

        impl $wrapper {
            /// Create a builder for this tunnel type from `session`.
            pub fn builder(session: Session) -> $builder {
                $builder::from(session)
            }

            /// Create a copy of this tunnel wrapping the receiver-less result
            /// of `TunnelInner::make_info`.
            pub(crate) fn make_info(&self) -> $wrapper {
                let info_only = self.inner.make_info();
                $wrapper { inner: info_only }
            }
        }

        impl Stream for $wrapper {
            type Item = Result<$conn, AcceptError>;

            /// Polls the inner tunnel and wraps each accepted connection in
            /// the wrapper's connection type; errors, end-of-stream and
            /// pending states pass through unchanged.
            fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
                let inner = Pin::new(&mut self.get_mut().inner);
                match inner.poll_next(cx) {
                    Poll::Ready(Some(Ok(conn))) => Poll::Ready(Some(Ok($conn { inner: conn }))),
                    Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))),
                    Poll::Ready(None) => Poll::Ready(None),
                    Poll::Pending => Poll::Pending,
                }
            }
        }

        impl Tunnel for $wrapper {
            type Conn = $conn;
        }

        impl TunnelInfo for $wrapper {
            fn id(&self) -> &str {
                self.inner.id()
            }

            fn forwards_to(&self) -> &str {
                self.inner.forwards_to()
            }

            fn metadata(&self) -> &str {
                self.inner.metadata()
            }
        }

        #[async_trait]
        impl TunnelCloser for $wrapper {
            async fn close(&mut self) -> Result<(), RpcError> {
                self.inner.close().await
            }
        }

        // One extra trait impl per marker.
        $(
            make_tunnel_type!($m; $wrapper);
        )*
    };
    // `endpoint` marker: expose the URL and protocol.
    (endpoint; $wrapper:ty) => {
        impl EndpointInfo for $wrapper {
            fn url(&self) -> &str {
                self.inner.url()
            }

            fn proto(&self) -> &str {
                self.inner.proto()
            }
        }
    };
    // `edge` marker: expose the labels.
    (edge; $wrapper:ty) => {
        impl EdgeInfo for $wrapper {
            fn labels(&self) -> &HashMap<String, String> {
                self.inner.labels()
            }
        }
    };
}
305
306make_tunnel_type! {
307 HttpTunnel, HttpTunnelBuilder, EndpointConn, endpoint
309}
310make_tunnel_type! {
311 TcpTunnel, TcpTunnelBuilder, EndpointConn, endpoint
313}
314make_tunnel_type! {
315 TlsTunnel, TlsTunnelBuilder, EndpointConn, endpoint
317}
318make_tunnel_type! {
319 LabeledTunnel, LabeledTunnelBuilder, EdgeConn, edge
321}