Skip to content

Commit 0fa921d

Browse files
committed
Remove closing property in h2 server (not used)
1 parent 927b5d8 commit 0fa921d

File tree

1 file changed

+75
-92
lines changed

1 file changed

+75
-92
lines changed

rama-http-core/src/proto/h2/server.rs

Lines changed: 75 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,6 @@ enum State<T> {
102102
struct Serving<T> {
103103
ping: Option<(ping::Recorder, ping::Ponger)>,
104104
conn: Connection<T, SendBuf<Bytes>>,
105-
closing: Option<crate::Error>,
106105
date_header: bool,
107106
}
108107

@@ -166,9 +165,7 @@ where
166165
self.close_pending = true;
167166
}
168167
State::Serving(ref mut srv) => {
169-
if srv.closing.is_none() {
170-
srv.conn.graceful_shutdown();
171-
}
168+
srv.conn.graceful_shutdown();
172169
}
173170
}
174171
}
@@ -204,13 +201,12 @@ where
204201
State::Serving(Serving {
205202
ping,
206203
conn,
207-
closing: None,
208204
date_header: me.date_header,
209205
})
210206
}
211207
State::Serving(ref mut srv) => {
212208
// graceful_shutdown was called before handshaking finished,
213-
if me.close_pending && srv.closing.is_none() {
209+
if me.close_pending {
214210
srv.conn.graceful_shutdown();
215211
}
216212
ready!(srv.poll_server(cx, &mut me.service, &me.exec))?;
@@ -236,95 +232,82 @@ where
236232
where
237233
S: HttpService<IncomingBody>,
238234
{
239-
if let Some(closing) = self.closing.take() {
240-
match self.conn.poll_closed(cx) {
241-
Poll::Ready(Ok(())) => (),
242-
Poll::Ready(Err(err)) => return Poll::Ready(Err(crate::Error::new_h2(err))),
243-
Poll::Pending => {
244-
self.closing = Some(closing);
245-
return Poll::Pending;
246-
}
247-
}
248-
249-
Poll::Ready(Err(closing))
250-
} else {
251-
loop {
252-
self.poll_ping(cx);
253-
254-
match ready!(self.conn.poll_accept(cx)) {
255-
Some(Ok((req, mut respond))) => {
256-
trace!("incoming request");
257-
let content_length = headers::content_length_parse_all(req.headers());
258-
let ping = self
259-
.ping
260-
.as_ref()
261-
.map(|ping| ping.0.clone())
262-
.unwrap_or_else(ping::disabled);
263-
264-
// Record the headers received
265-
ping.record_non_data();
266-
267-
let is_connect = req.method() == Method::CONNECT;
268-
let (mut parts, stream) = req.into_parts();
269-
let (req, connect_parts) = if !is_connect {
270-
(
271-
Request::from_parts(
272-
parts,
273-
IncomingBody::h2(stream, content_length.into(), ping),
274-
),
275-
None,
276-
)
277-
} else {
278-
if content_length.is_some_and(|len| len != 0) {
279-
warn!("h2 connect request with non-zero body not supported");
280-
respond.send_reset(crate::h2::Reason::INTERNAL_ERROR);
281-
return Poll::Ready(Ok(()));
282-
}
283-
let (pending, upgrade) = upgrade::pending();
284-
parts.extensions.insert(upgrade);
285-
(
286-
Request::from_parts(parts, IncomingBody::empty()),
287-
Some(ConnectParts {
288-
pending,
289-
ping,
290-
recv_stream: stream,
291-
}),
292-
)
293-
};
294-
295-
let serve_span = trace_root_span!(
296-
"h2::stream",
297-
otel.kind = "server",
298-
http.request.method = %req.method().as_str(),
299-
url.full = %req.uri(),
300-
url.path = %req.uri().path(),
301-
url.query = req.uri().query().unwrap_or_default(),
302-
url.scheme = %req.uri().scheme().map(|s| s.as_str()).unwrap_or_default(),
303-
network.protocol.name = "http",
304-
network.protocol.version = version_as_protocol_version(req.version()),
305-
);
306-
307-
let fut = H2Stream::new(
308-
service.serve_http(req),
309-
connect_parts,
310-
respond,
311-
self.date_header,
312-
);
313-
314-
exec.spawn_task(fut.instrument(serve_span));
315-
}
316-
Some(Err(e)) => {
317-
return Poll::Ready(Err(crate::Error::new_h2(e)));
318-
}
319-
None => {
320-
// no more incoming streams...
321-
if let Some((ref ping, _)) = self.ping {
322-
ping.ensure_not_timed_out()?;
235+
loop {
236+
self.poll_ping(cx);
237+
238+
match ready!(self.conn.poll_accept(cx)) {
239+
Some(Ok((req, mut respond))) => {
240+
trace!("incoming request");
241+
let content_length = headers::content_length_parse_all(req.headers());
242+
let ping = self
243+
.ping
244+
.as_ref()
245+
.map(|ping| ping.0.clone())
246+
.unwrap_or_else(ping::disabled);
247+
248+
// Record the headers received
249+
ping.record_non_data();
250+
251+
let is_connect = req.method() == Method::CONNECT;
252+
let (mut parts, stream) = req.into_parts();
253+
let (req, connect_parts) = if !is_connect {
254+
(
255+
Request::from_parts(
256+
parts,
257+
IncomingBody::h2(stream, content_length.into(), ping),
258+
),
259+
None,
260+
)
261+
} else {
262+
if content_length.is_some_and(|len| len != 0) {
263+
warn!("h2 connect request with non-zero body not supported");
264+
respond.send_reset(crate::h2::Reason::INTERNAL_ERROR);
265+
return Poll::Ready(Ok(()));
323266
}
267+
let (pending, upgrade) = upgrade::pending();
268+
parts.extensions.insert(upgrade);
269+
(
270+
Request::from_parts(parts, IncomingBody::empty()),
271+
Some(ConnectParts {
272+
pending,
273+
ping,
274+
recv_stream: stream,
275+
}),
276+
)
277+
};
324278

325-
trace!("incoming connection complete");
326-
return Poll::Ready(Ok(()));
279+
let serve_span = trace_root_span!(
280+
"h2::stream",
281+
otel.kind = "server",
282+
http.request.method = %req.method().as_str(),
283+
url.full = %req.uri(),
284+
url.path = %req.uri().path(),
285+
url.query = req.uri().query().unwrap_or_default(),
286+
url.scheme = %req.uri().scheme().map(|s| s.as_str()).unwrap_or_default(),
287+
network.protocol.name = "http",
288+
network.protocol.version = version_as_protocol_version(req.version()),
289+
);
290+
291+
let fut = H2Stream::new(
292+
service.serve_http(req),
293+
connect_parts,
294+
respond,
295+
self.date_header,
296+
);
297+
298+
exec.spawn_task(fut.instrument(serve_span));
299+
}
300+
Some(Err(e)) => {
301+
return Poll::Ready(Err(crate::Error::new_h2(e)));
302+
}
303+
None => {
304+
// no more incoming streams...
305+
if let Some((ref ping, _)) = self.ping {
306+
ping.ensure_not_timed_out()?;
327307
}
308+
309+
trace!("incoming connection complete");
310+
return Poll::Ready(Ok(()));
328311
}
329312
}
330313
}

0 commit comments

Comments
 (0)