1use core::slice;
2use std::{
3 pin::Pin,
4 task::{Context, Poll},
5};
6
7use pin_project_lite::pin_project;
8
9pin_project! {
10 #[derive(Debug)]
11 pub(crate) struct FuturesIo<T> {
12 #[pin]
13 inner: T,
14 }
15}
16
17impl<T> FuturesIo<T> {
18 pub(crate) fn new(inner: T) -> Self {
26 Self { inner }
27 }
28}
29
30impl<T> hyper::rt::Write for FuturesIo<T>
31where
32 T: futures::AsyncWrite + Unpin,
33{
34 fn poll_write(
35 self: Pin<&mut Self>,
36 cx: &mut Context<'_>,
37 buf: &[u8],
38 ) -> Poll<Result<usize, std::io::Error>> {
39 self.project().inner.poll_write(cx, buf)
40 }
41
42 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
43 self.project().inner.poll_flush(cx)
44 }
45
46 fn poll_shutdown(
47 self: Pin<&mut Self>,
48 cx: &mut Context<'_>,
49 ) -> Poll<Result<(), std::io::Error>> {
50 self.project().inner.poll_close(cx)
51 }
52
53 fn poll_write_vectored(
54 self: Pin<&mut Self>,
55 cx: &mut Context<'_>,
56 bufs: &[std::io::IoSlice<'_>],
57 ) -> Poll<Result<usize, std::io::Error>> {
58 self.project().inner.poll_write_vectored(cx, bufs)
59 }
60}
61
62impl<T> hyper::rt::Read for FuturesIo<T>
64where
65 T: futures::AsyncRead + Unpin,
66{
67 fn poll_read(
68 self: Pin<&mut Self>,
69 cx: &mut Context<'_>,
70 mut buf: hyper::rt::ReadBufCursor<'_>,
71 ) -> Poll<Result<(), std::io::Error>> {
72 let buf_slice = unsafe {
75 slice::from_raw_parts_mut(buf.as_mut().as_mut_ptr() as *mut u8, buf.as_mut().len())
76 };
77
78 let n = match futures::AsyncRead::poll_read(self.project().inner, cx, buf_slice) {
79 Poll::Ready(Ok(n)) => n,
80 other => return other.map_ok(|_| ()),
81 };
82
83 unsafe {
84 buf.advance(n);
85 }
86 Poll::Ready(Ok(()))
87 }
88}