Up until recently, hyper was my favorite Rust
HTTP framework. It’s low-level, but that gives you a lot of control over what
happens.
Here’s what a sample hyper application would look like:
Shell session
$ cargo new nostalgia
Created binary (application) `nostalgia` package
Shell session
$ cd nostalgia
$ cargo add hyper@0.14 --features "http1 tcp server"
Updating 'https://github.com/rust-lang/crates.io-index' index
Adding hyper v0.14 to dependencies with features: ["http1", "tcp", "server"]
$ cargo add tokio@1 --features "full"
Updating 'https://github.com/rust-lang/crates.io-index' index
Adding tokio v1 to dependencies with features: ["full"]
Rust code
use std::{
convert::Infallible,
future::{ready, Ready},
task::{Context, Poll},
};
use hyper::{server::conn::AddrStream, service::Service, Body, Request, Response, Server};
#[tokio::main]
async fn main() {
Server::bind(&([127, 0, 0, 1], 1025).into())
.serve(MyServiceFactory)
.await
.unwrap();
}
struct MyServiceFactory;
impl Service<&AddrStream> for MyServiceFactory {
type Response = MyService;
type Error = Infallible;
type Future = Ready<Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Ok(()).into()
}
fn call(&mut self, req: &AddrStream) -> Self::Future {
println!("Accepted connection from {}", req.remote_addr());
ready(Ok(MyService))
}
}
struct MyService;
impl Service<Request<Body>> for MyService {
type Response = Response<Body>;
type Error = Infallible;
type Future = Ready<Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Ok(()).into()
}
fn call(&mut self, req: Request<Body>) -> Self::Future {
println!("Handling {req:?}");
ready(Ok(Response::builder().body("Hello World!\n".into()).unwrap()))
}
}
Shell session
$ cargo run
Compiling nostalgia v0.1.0 (/home/amos/bearcove/nostalgia)
Finished dev [unoptimized + debuginfo] target(s) in 1.20s
Running `target/debug/nostalgia`
(server runs peacefully in the background)
Aaaand half the readers have closed the page already.
What? Let me show how it works at least!
See, you can curl it, and it works:
Shell session
$ curl 0:1025
Hello World!
Wait a minute hold on – what kind of address is that?
Well… omitted octets in IPv4 addresses are filled with zeroes, so 127.1 is 127.0.0.1, for example…
So… 0 is 0.0.0.0? Isn’t that the address we listen on when we want to accept
traffic from all network interfaces? How does that work?
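Here’s a quick way to convince yourself, assuming the server from above is
still running: on Linux, at least, connecting to 0.0.0.0 gets treated as
connecting to the local host (other platforms may refuse it outright). Note
that the “0” shorthand itself is curl’s doing – Rust’s address parser wants
the full dotted quad:
Rust code
use std::net::TcpStream;

fn main() -> std::io::Result<()> {
    // On Linux, connecting to 0.0.0.0 (INADDR_ANY) behaves like
    // connecting to the local host, so this reaches our listener
    // bound on 127.0.0.1:1025.
    let stream = TcpStream::connect("0.0.0.0:1025")?;
    println!("connected, peer is {}", stream.peer_addr()?);
    Ok(())
}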
For completeness, here’s what it shows in the terminal pane where the server is
running:
Shell session
Accepted connection from 127.0.0.1:50408
Handling Request { method: GET, uri: /, version: HTTP/1.1, headers: {"host": "0.0.0.0:1025", "user-agent": "curl/7.79.1", "accept": "*/*"}, body: Body(Empty) }
Okay, cool. That’s an ungodly amount of code though – I could do the same in,
like, ten lines of G-
Ahhh but that’s not the point! The point is that the design of hyper is
delightfully perceptible to the naked eye! It’s 90% the
tower Service trait and 10%, huh… waves hands
HTTP stuff.
Yeah, you know! Easy. Text-based protocol, couple headers, some chunking if
needed, the occasional trailer, cheerio good day sir.
Oh yeah, lil’ bit of binary, adaptive windows, header compression, add
multiplexing to taste. Nothing too hard there. Really it’s mostly just the
Service trait, look at it.
Eh that’s all rustls or OpenSSL or some fork
thereof, but who cares, LOOK AT THAT TRAIT:
Rust code
struct MyServiceFactory;
impl Service<&AddrStream> for MyServiceFactory {
type Response = MyService;
type Error = Infallible;
type Future = Ready<Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Ok(()).into()
}
fn call(&mut self, req: &AddrStream) -> Self::Future {
println!("Accepted connection from {}", req.remote_addr());
ready(Ok(MyService))
}
}
Isn’t it beautiful?
What… what am I supposed to see?
Backpressure, bear! The cornerstone of a ~nutritious breakfast~ stable
application server.
See, before spawning any future onto the executor, it asks us if we’re ready!
With poll_ready. And if we’re not, it… well, it waits until we are.
That means we can control how many concurrent connections there can be to our
service!
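For reference, here’s the whole trait – this is tower’s definition, and hyper
0.14’s own Service trait has the same shape:
Rust code
use std::{
    future::Future,
    task::{Context, Poll},
};

pub trait Service<Request> {
    type Response;
    type Error;
    /// The future `call` returns.
    type Future: Future<Output = Result<Self::Response, Self::Error>>;

    /// Returns `Ready` when the service can accept a request.
    /// Returning `Pending` is how a service applies backpressure.
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;

    /// Should only be called once `poll_ready` has returned `Ready(Ok(()))`.
    fn call(&mut self, req: Request) -> Self::Future;
}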
What would that look like?
Ah jeeze bear, that’s not really the point of the article, but if you insist, I
suppose we could, uh… use a semaphore maybe?
But first we should probably keep track of how many connections we have
concurrently…
Maybe we keep an Arc<AtomicU64> in MyServiceFactory, which we increment
when we accept a connection, and decrement when we drop a MyService?
Rust code
#[tokio::main]
async fn main() {
Server::bind(&([127, 0, 0, 1], 1025).into())
// 👇 previously was a unit struct (just `MyServiceFactory`)
.serve(MyServiceFactory::default())
.await
.unwrap();
}
// 👇 Now holding an atomically-reference-counted atomic counter
#[derive(Default)]
struct MyServiceFactory {
num_connected: Arc<AtomicU64>,
}
impl Service<&AddrStream> for MyServiceFactory {
// (cut: everything except for call)
fn call(&mut self, req: &AddrStream) -> Self::Future {
let prev = self.num_connected.fetch_add(1, Ordering::SeqCst);
println!(
"⬆️ {} connections (accepted {})",
prev + 1,
req.remote_addr()
);
ready(Ok(MyService {
num_connected: self.num_connected.clone(),
}))
}
}
// 👇 Now also holding a counter
struct MyService {
num_connected: Arc<AtomicU64>,
}
impl Drop for MyService {
fn drop(&mut self) {
let prev = self.num_connected.fetch_sub(1, Ordering::SeqCst);
println!("⬇️ {} connections (dropped)", prev - 1);
}
}
impl Service<Request<Body>> for MyService {
// (cut: everything except call)
fn call(&mut self, req: Request<Body>) -> Self::Future {
// 👇 made these logs a little nicer
println!("{} {}", req.method(), req.uri());
// otherwise the same
ready(Ok(Response::builder()
.body("Hello World!n".into())
.unwrap()))
}
}
And now, a single curl request results in these logs:
Shell session
$ cargo run --quiet
⬆️ 1 connections (accepted 127.0.0.1:50416)
GET /
⬇️ 0 connections (dropped)
But we can also make requests by hand:
Shell session
$ socat - TCP4:0:1025
GET / HTTP/1.1
> HTTP/1.1 200 OK
> content-length: 13
> date: Sat, 02 Apr 2022 23:59:58 GMT
>
> Hello World!
GET /ahAH HTTP/1.1
> HTTP/1.1 200 OK
> content-length: 13
> date: Sun, 03 Apr 2022 00:00:04 GMT
>
> Hello World!
(Note: I’ve prefixed the response lines with > myself – the “GET” lines,
each followed by two newlines, are what I typed.)
And from the server’s perspective, this looks like this:
Shell session
$ cargo run --quiet
⬆️ 1 connections (accepted 127.0.0.1:50420)
GET /
GET /ahAH
⬇️ 0 connections (dropped)
Two requests! From the same connection! Who needs h2/h3/quic? Huh?
I don’t feel like doing enough typing to test our limiter though… how about we
use a load testing tool? Like oha?
Shell session
$ oha http://127.0.0.1:1025
(a lot of output ensues)
Status code distribution:
[200] 200 responses
That generated… a /lot/ of output on the server side. But I’ve redirected it
to a file, because I’m a forward-thinking young lad.
Like so:
Shell session
$ cargo run --release --quiet | tee /tmp/server-log.txt
(server output is printed as normal, but is also logged to the file)
Which means I can now easily count how many connections we had at our peakest of
peaks:
Shell session
$ cat /tmp/server-log.txt | grep '⬆' | cut -d ' ' -f 2 | sort -n | tail -1
50
You may think that’s a useless use of cat, but actually, cat is union now, so
think twice about giving its work away to someone else.
50! What a suspicious value, almost as if it was oha’s default…
Shell session
$ oha --help
(cut)
-c
Number of workers to run concurrently. You may should increase limit to number of open
files for larger `-c`. [default: 50]
Mh yep!
Okay but we’re trying to limit it. Let’s use… a semaphore!
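(Quick refresher, in case semaphores aren’t part of your regular rotation – a
minimal sketch of how tokio’s Semaphore behaves: it hands out permits up to a
limit, and dropping a permit is what gives it back.)
Rust code
use std::sync::Arc;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    let sem = Arc::new(Semaphore::new(2)); // start out with 2 permits
    let p1 = sem.clone().acquire_owned().await.unwrap();
    let _p2 = sem.clone().acquire_owned().await.unwrap();
    assert_eq!(sem.available_permits(), 0);

    // a third acquire_owned() would wait here... until a permit drops:
    drop(p1);
    assert_eq!(sem.available_permits(), 1);
}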
Shell session
$ cargo add tokio-util@0.7
Updating 'https://github.com/rust-lang/crates.io-index' index
Adding tokio-util v0.7 to dependencies
Rust code
struct MyServiceFactory {
num_connected: Arc<AtomicU64>,
semaphore: PollSemaphore,
}
impl Default for MyServiceFactory {
fn default() -> Self {
Self {
num_connected: Default::default(),
semaphore: PollSemaphore::new(Arc::new(Semaphore::new(5))),
}
}
}
There! I’ve set a limit of 5 permits, and so now all we have to do is… try to
acquire a permit from the semaphore in poll_ready, I suppose!
Shell session
$ cargo add futures
Updating 'https://github.com/rust-lang/crates.io-index' index
Adding futures v0.3.21 to dependencies
Rust code
impl Service<&AddrStream> for MyServiceFactory {
// (cut: all except for poll_ready)
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let permit = futures::ready!(self.semaphore.poll_acquire(cx)).unwrap();
Ok(()).into()
}
}
There! That handy little ready! macro lets us extract the successful type of a
Poll, and we unwrap that Option because, well, we never close the semaphore.
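(If you haven’t run into it before: ready! is just sugar for an early return.
Roughly, futures::ready!(expr) expands to this:)
Rust code
match expr {
    // ready? unwrap the value and keep going
    Poll::Ready(value) => value,
    // not ready? then we're not ready either: bail out early
    Poll::Pending => return Poll::Pending,
}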
But uh… where do we store that permit? We’re not creating a MyService from
poll_ready, merely checking for readiness!
Okay we’ll need one more field:
Rust code
struct MyServiceFactory {
num_connected: Arc<AtomicU64>,
semaphore: PollSemaphore,
// 👇 new!
permit: Option<OwnedSemaphorePermit>,
}
Adjusting the impl Default is left as an exercise to the reader.
And now, in poll_ready, we simply try to acquire a permit if we don’t have one
already.
Rust code
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if self.permit.is_none() {
self.permit = Some(futures::ready!(self.semaphore.poll_acquire(cx)).unwrap());
}
Ok(()).into()
}
And in call, well, we just take it! And explode loudly in case someone hasn’t
called poll_ready before:
Rust code
fn call(&mut self, req: &AddrStream) -> Self::Future {
let permit = self.permit.take().expect(
"you didn't drive me to readiness did you? you know that's a tower crime right?",
);
let prev = self.num_connected.fetch_add(1, Ordering::SeqCst);
println!(
"⬆️ {} connections (accepted {})",
prev + 1,
req.remote_addr()
);
ready(Ok(MyService {
num_connected: self.num_connected.clone(),
permit,
}))
}
Of course we need somewhere to store that permit:
Rust code
struct MyService {
num_connected: Arc<AtomicU64>,
permit: OwnedSemaphorePermit,
}
rustc will complain that this is dead code, but it’s very much not – holding
that type is proof that we’re allowed to run within the concurrency limits we’ve
set for ourselves.
We… no, you can fix that by renaming it to _permit. I won’t.
So now, let’s try our oha test again!
Shell session
$ cargo run --release --quiet | tee /tmp/server-log.txt
# in another pane:
$ oha http://127.0.0.1:1025
# after stopping the server:
$ cat /tmp/server-log.txt | grep '⬆' | cut -d ' ' -f 2 | sort -n | tail -1
5
Wonderful! How often do things work exactly as expected? Not so often, I tell
you what.
Oh, and we can actually remove our atomic counter, because semaphores do their
own counting.
Which makes our complete program this:
Rust code
use std::{
convert::Infallible,
future::{ready, Ready},
sync::Arc,
task::{Context, Poll},
};
use hyper::{server::conn::AddrStream, service::Service, Body, Request, Response, Server};
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio_util::sync::PollSemaphore;
#[tokio::main]
async fn main() {
Server::bind(&([127, 0, 0, 1], 1025).into())
.serve(MyServiceFactory::default())
.await
.unwrap();
}
const MAX_CONNS: usize = 5;
struct MyServiceFactory {
semaphore: PollSemaphore,
permit: Option<OwnedSemaphorePermit>,
}
impl Default for MyServiceFactory {
fn default() -> Self {
Self {
semaphore: PollSemaphore::new(Arc::new(Semaphore::new(MAX_CONNS))),
permit: None,
}
}
}
impl Service<&AddrStream> for MyServiceFactory {
type Response = MyService;
type Error = Infallible;
type Future = Ready<Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if self.permit.is_none() {
self.permit = Some(futures::ready!(self.semaphore.poll_acquire(cx)).unwrap());
}
Ok(()).into()
}
fn call(&mut self, _req: &AddrStream) -> Self::Future {
let permit = self.permit.take().expect(
"you didn't drive me to readiness did you? you know that's a tower crime right?",
);
println!(
"⬆️ {} connections",
MAX_CONNS - self.semaphore.available_permits()
);
ready(Ok(MyService { _permit: permit }))
}
}
struct MyService {
_permit: OwnedSemaphorePermit,
}
impl Service<Request<Body>> for MyService {
type Response = Response<Body>;
type Error = Infallible;
type Future = Ready<Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Ok(()).into()
}
fn call(&mut self, req: Request<Body>) -> Self::Future {
println!("{} {}", req.method(), req.uri());
ready(Ok(Response::builder()
.body("Hello World!n".into())
.unwrap()))
}
}
That’s not an async server though
I mean… yes it is an async server, but I see what you mean – we just
immediately reply with “Hello World!”, we’re not even pretending to think
about it a little bit.
So let’s pretend. Now our Future type cannot be Ready anymore. We could make
a custom future, like so:
Rust code
struct PretendFuture {
sleep: Sleep,
response: Option<Response<Body>>,
}
impl Future for PretendFuture {
type Output = Result<Response<Body>, Infallible>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Safety: we never move `sleep` out of `self`, so pin-projecting to
// that field is sound. (This is exactly the boilerplate that
// pin-project-lite automates, as we'll see in a moment.)
futures::ready!(
unsafe { self.as_mut().map_unchecked_mut(|this| &mut this.sleep) }.poll(cx)
);
// `response` is a regular (Unpin) field, we just take it out
Ok(unsafe { self.get_unchecked_mut() }.response.take().unwrap()).into()
}
}
Which we could then use as our Future type in MyService:
Rust code
impl Service<Request<Body>> for MyService {
type Response = Response<Body>;
type Error = Infallible;
type Future = PretendFuture;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Ok(()).into()
}
fn call(&mut self, req: Request<Body>) -> Self::Future {
println!("{} {}", req.method(), req.uri());
PretendFuture {
sleep: tokio::time::sleep(Duration::from_millis(250)),
response: Some(Response::builder().body("Hello World!\n".into()).unwrap()),
}
}
}
And now our latency has SHOT UP, from this:
Shell session
$ oha etc.
(cut)
Latency distribution:
10% in 0.0001 secs
25% in 0.0001 secs
50% in 0.0002 secs
75% in 0.0015 secs
90% in 0.0091 secs
95% in 0.0095 secs
99% in 0.0103 secs
To this:
Shell session
$ oha etc.
(cut)
Latency distribution:
10% in 0.2514 secs
25% in 0.2521 secs
50% in 0.2522 secs
75% in 0.2528 secs
90% in 9.3291 secs
95% in 9.8342 secs
99% in 10.0861 secs
Wait, why are some requests taking 10 seconds?
Concurrency limits! We only let 5 connections in at a time, and each response
takes 250 milliseconds, so we serve roughly 5 / 0.25 = 20 requests per second –
draining oha’s 200 requests takes about 10 seconds, and the unluckiest requests
spend almost all of that time waiting in line. If we raise MAX_CONNS to 50, the
p99 falls to 256 milliseconds.
Now we have something that looks, more or less, like a real-world application.
From the outside at least.
But before we move on, and more importantly, before the unsafe police rains
down on me, let’s use pin-project-lite to get rid of those gnarly
map_unchecked_mut and get_unchecked_mut:
Shell session
$ cargo add pin-project-lite
Updating 'https://github.com/rust-lang/crates.io-index' index
Adding pin-project-lite v0.2.8 to dependencies
Rust code
pin_project_lite::pin_project! {
struct PretendFuture {
// `Sleep` is !Unpin, so it needs a pinned (structural) projection
#[pin]
sleep: Sleep,
response: Option<Response<Body>>,
}
}
impl Future for PretendFuture {
type Output = Result<Response<Body>, Infallible>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
futures::ready!(this.sleep.poll(cx));
Ok(this.response.take().unwrap()).into()
}
}
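Of course, if we were okay with one allocation per request, we could’ve dodged
both the custom future and the unsafe entirely by boxing an async block – a
sketch of the alternative, not what we’ll be doing here:
Rust code
use std::{convert::Infallible, future::Future, pin::Pin, time::Duration};

impl Service<Request<Body>> for MyService {
    type Response = Response<Body>;
    type Error = Infallible;
    // boxing gives us a nameable future type, at the cost of a heap allocation
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Ok(()).into()
    }

    fn call(&mut self, req: Request<Body>) -> Self::Future {
        println!("{} {}", req.method(), req.uri());
        Box::pin(async move {
            tokio::time::sleep(Duration::from_millis(250)).await;
            Ok(Response::builder().body("Hello World!\n".into()).unwrap())
        })
    }
}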