1use std::{
7 collections::VecDeque,
8 pin::Pin,
9 sync::{Arc, Mutex, MutexGuard, PoisonError},
10 task::{Context, Poll, Waker},
11};
12
13use futures::{AsyncRead, AsyncWrite, Future};
14use js_sys::{Promise, Uint8Array};
15use wasm_bindgen::prelude::*;
16use wasm_bindgen_futures::JsFuture;
17
// Bindings for a JavaScript-side I/O object. Each method is imported with
// `method, catch`: it is invoked on the JS object passed as `this`, and a JS
// exception thrown by the call is returned as `Err(JsValue)`.
#[wasm_bindgen]
extern "C" {
    /// Handle to the JavaScript I/O object; its TypeScript type is declared
    /// as `IoChannel`.
    #[wasm_bindgen(typescript_type = "IoChannel")]
    pub type JsIo;

    /// Calls `read()` on the JS object and returns the `Promise` it produced.
    ///
    /// `JsIoAdapter::poll_read` interprets the resolved value: `null`,
    /// `undefined` or an empty byte array mean end-of-stream; anything else
    /// is passed to `Uint8Array::new` and read as bytes.
    #[wasm_bindgen(method, catch)]
    pub fn read(this: &JsIo) -> Result<Promise, JsValue>;

    /// Calls `write(data)` on the JS object and returns the `Promise` it
    /// produced.
    #[wasm_bindgen(method, catch)]
    pub fn write(this: &JsIo, data: &Uint8Array) -> Result<Promise, JsValue>;

    /// Calls `close()` on the JS object and returns the `Promise` it
    /// produced.
    #[wasm_bindgen(method, catch)]
    pub fn close(this: &JsIo) -> Result<Promise, JsValue>;
}
46
/// Mutable bookkeeping used by the read, write and close paths of
/// `JsIoAdapter`.
struct AdapterState {
    /// Bytes received from a JS read that did not fit into the caller's
    /// buffer; they are handed out first on later `poll_read` calls.
    read_buffer: VecDeque<u8>,
    /// Set once a JS read resolved to `null`, `undefined` or an empty array;
    /// later reads then report `Ok(0)`.
    eof: bool,
    /// Future for the JS `read()` promise that is currently in flight, if any.
    pending_read: Option<JsFuture>,
    /// Waker of the task that last reached the "need more data" path of
    /// `poll_read`.
    // NOTE(review): no visible code reads this field back or wakes it —
    // confirm whether it is still needed.
    read_waker: Option<Waker>,
    /// Set after the JS `close()` call returned without throwing; writes are
    /// rejected with `BrokenPipe` from then on.
    closed: bool,
    /// Message of a failed read (synchronous throw or rejected promise). Once
    /// set, both `poll_read` and `poll_write` fail with this message.
    error: Option<String>,
}
62
/// Adapter that drives a JavaScript `JsIo` object through the `futures`
/// `AsyncRead` and `AsyncWrite` traits.
pub(crate) struct JsIoAdapter {
    /// JS object whose `read`/`write`/`close` methods are invoked.
    inner: JsIo,
    /// Read buffer, in-flight read and status flags, kept behind a mutex.
    state: Arc<Mutex<AdapterState>>,
}
71
72impl JsIoAdapter {
73 fn lock_state(&self) -> std::io::Result<MutexGuard<'_, AdapterState>> {
74 self.state.lock().map_err(|e: PoisonError<_>| {
75 std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
76 })
77 }
78
79 pub(crate) fn new(js_io: JsIo) -> Self {
81 Self {
82 inner: js_io,
83 state: Arc::new(Mutex::new(AdapterState {
84 read_buffer: VecDeque::new(),
85 eof: false,
86 pending_read: None,
87 read_waker: None,
88 closed: false,
89 error: None,
90 })),
91 }
92 }
93}
94
95impl AsyncRead for JsIoAdapter {
96 fn poll_read(
97 self: Pin<&mut Self>,
98 cx: &mut Context<'_>,
99 buf: &mut [u8],
100 ) -> Poll<std::io::Result<usize>> {
101 let this = self.get_mut();
102 let mut state = match this.lock_state() {
103 Ok(guard) => guard,
104 Err(e) => return Poll::Ready(Err(e)),
105 };
106
107 if let Some(ref err) = state.error {
109 return Poll::Ready(Err(std::io::Error::new(
110 std::io::ErrorKind::Other,
111 err.clone(),
112 )));
113 }
114
115 if !state.read_buffer.is_empty() {
117 let to_read = std::cmp::min(buf.len(), state.read_buffer.len());
118 for (i, byte) in state.read_buffer.drain(..to_read).enumerate() {
119 buf[i] = byte;
120 }
121 return Poll::Ready(Ok(to_read));
122 }
123
124 if state.eof {
126 return Poll::Ready(Ok(0));
127 }
128
129 state.read_waker = Some(cx.waker().clone());
131
132 if state.pending_read.is_none() {
134 match this.inner.read() {
135 Ok(promise) => {
136 state.pending_read = Some(JsFuture::from(promise));
137 }
138 Err(e) => {
139 let err_msg = format!("read error: {:?}", e);
140 state.error = Some(err_msg.clone());
141 return Poll::Ready(Err(std::io::Error::new(
142 std::io::ErrorKind::Other,
143 err_msg,
144 )));
145 }
146 }
147 }
148
149 if let Some(ref mut future) = state.pending_read {
151 let future = unsafe { Pin::new_unchecked(future) };
153 match future.poll(cx) {
154 Poll::Ready(Ok(value)) => {
155 state.pending_read = None;
156
157 if value.is_null() || value.is_undefined() {
159 tracing::warn!("JsIo read returned null/undefined (EOF)");
160 state.eof = true;
161 return Poll::Ready(Ok(0));
162 }
163
164 let array = Uint8Array::new(&value);
166 let bytes = array.to_vec();
167
168 if bytes.is_empty() {
169 tracing::warn!("JsIo read returned empty array (EOF)");
170 state.eof = true;
171 return Poll::Ready(Ok(0));
172 }
173
174 let to_read = std::cmp::min(buf.len(), bytes.len());
176 buf[..to_read].copy_from_slice(&bytes[..to_read]);
177
178 if bytes.len() > to_read {
180 state.read_buffer.extend(&bytes[to_read..]);
181 }
182
183 Poll::Ready(Ok(to_read))
184 }
185 Poll::Ready(Err(e)) => {
186 state.pending_read = None;
187 let err_msg = format!("read error: {:?}", e);
188 state.error = Some(err_msg.clone());
189 Poll::Ready(Err(std::io::Error::new(std::io::ErrorKind::Other, err_msg)))
190 }
191 Poll::Pending => Poll::Pending,
192 }
193 } else {
194 Poll::Pending
195 }
196 }
197}
198
199impl AsyncWrite for JsIoAdapter {
200 fn poll_write(
201 self: Pin<&mut Self>,
202 _cx: &mut Context<'_>,
203 buf: &[u8],
204 ) -> Poll<std::io::Result<usize>> {
205 let this = self.get_mut();
206 let state = match this.lock_state() {
207 Ok(guard) => guard,
208 Err(e) => return Poll::Ready(Err(e)),
209 };
210
211 if let Some(ref err) = state.error {
213 return Poll::Ready(Err(std::io::Error::new(
214 std::io::ErrorKind::Other,
215 err.clone(),
216 )));
217 }
218
219 if state.closed {
220 return Poll::Ready(Err(std::io::Error::new(
221 std::io::ErrorKind::BrokenPipe,
222 "stream closed",
223 )));
224 }
225
226 let array = Uint8Array::from(buf);
228
229 match this.inner.write(&array) {
232 Ok(_promise) => {
233 Poll::Ready(Ok(buf.len()))
235 }
236 Err(e) => {
237 let err_msg = format!("write error: {:?}", e);
238 tracing::error!("JsIo write failed: {}", err_msg);
239 Poll::Ready(Err(std::io::Error::new(std::io::ErrorKind::Other, err_msg)))
240 }
241 }
242 }
243
244 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
245 Poll::Ready(Ok(()))
247 }
248
249 fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
250 let this = self.get_mut();
251 let mut state = match this.lock_state() {
252 Ok(guard) => guard,
253 Err(e) => return Poll::Ready(Err(e)),
254 };
255
256 if state.closed {
257 return Poll::Ready(Ok(()));
258 }
259
260 match this.inner.close() {
262 Ok(_promise) => {
263 state.closed = true;
264 Poll::Ready(Ok(()))
265 }
266 Err(e) => {
267 let err_msg = format!("close error: {:?}", e);
268 Poll::Ready(Err(std::io::Error::new(std::io::ErrorKind::Other, err_msg)))
269 }
270 }
271 }
272}
273
// SAFETY: NOTE(review): this asserts that a `JsIoAdapter` may be moved to
// another thread, which the compiler cannot verify. It is presumably needed
// because the wrapped JS handle and `JsFuture` are not `Send` on their own,
// and presumably relies on the adapter only ever being used on the thread
// that owns the JS object (e.g. a single-threaded wasm32 target) — confirm
// before using this type anywhere real threads exist.
unsafe impl Send for JsIoAdapter {}