追記
より短い再現コードを作りました。
https://github.com/techno-tanoC/progress_test
ダウンロードが途中で止まり、 pg.to_item
の結果がずっと表示されるようになります。
前提・実現したいこと
以前、 Rust - tokio::sync::Mutex を使った型に AsyncWrite を実装したい|teratail にて AsyncWrite の実装方法を教えていただいたのですが、実際に使ってみると問題が発生しました。
REST API で操作できるファイルダウンローダを作っています。大まかな動作は以下のようなものです。
- URL を含む情報を POST する
- reqwest で与えられた URL からデータを
Stream
として取得 - File とダウンロードしたバイト数を保持する型 Progress に
tokio::io::copy
でStream
を流し込む - ダウンロードしている最中に GET を行うと Progress が保持しているバイト数を取得する
Stream
が終わったらProgress
が持つ File を別のディレクトリにコピーする
リポジトリ: techno-tanoC/azusa
発生している問題
フロントエンドから進捗を確認しているとダウンロードが止まることがあります。
ダウンロードが止まることがある · Issue #34 · techno-tanoC/azusa という issue で現象の調査を行いました。その結果、 Progress::to_item
を呼ぶと問題が発生するようなのですが、原因の検討がつきません。
ダウンロードが止まった後も Progress::to_item
は動いているので Mutex が駆動していないというよりは AsyncWrite が駆動していない印象を受けます。
当該のソースコード
Progress の to_item
や poll_write
辺りが怪しいと思っているのですが、どこが悪いのかハッキリとは分かっていません。現象からして Progress
内に問題がある可能性が高いと思います。
rust
1use futures::future::FutureExt; 2use std::fmt; 3use std::io::SeekFrom; 4use std::pin::Pin; 5use std::sync::Arc; 6use std::task::{Poll, Context}; 7use tokio::io::{AsyncSeek, Result, ErrorKind}; 8use tokio::prelude::*; 9use tokio::sync::Mutex; 10 11use crate::item::Item; 12 13struct ProgressInner<T> { 14 name: String, 15 total: u64, 16 size: u64, 17 canceled: bool, 18 buf: T, 19} 20 21impl<T> fmt::Debug for ProgressInner<T> { 22 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 23 f.debug_struct("ProgressInner") 24 .field("name", &self.name) 25 .field("total", &self.total) 26 .field("size", &self.size) 27 .field("canceled", &self.canceled) 28 .finish() 29 } 30} 31 32pub struct Progress<T> { 33 inner: Arc<Mutex<ProgressInner<T>>>, 34} 35 36impl<T> std::clone::Clone for Progress<T> { 37 fn clone(&self) -> Self { 38 Progress { inner: self.inner.clone() } 39 } 40} 41 42impl<T> Progress<T> { 43 pub fn new(name: impl ToString, buf: T) -> Self { 44 let inner = Arc::new(Mutex::new(ProgressInner { 45 name: name.to_string(), 46 total: 0, 47 size: 0, 48 canceled: false, 49 buf, 50 })); 51 Progress { inner } 52 } 53 54 pub async fn set_total(&mut self, total: u64) { 55 self.inner.lock().await.total = total; 56 } 57 58 pub async fn cancel(&mut self) { 59 self.inner.lock().await.canceled = true 60 } 61 62 pub async fn to_item(&self, id: impl ToString) -> Item { 63 let pg = self.inner.lock().await; 64 Item { 65 id: id.to_string(), 66 name: pg.name.clone(), 67 total: pg.total, 68 size: pg.size, 69 canceled: pg.canceled, 70 } 71 } 72} 73 74impl<T: AsyncRead + Unpin + Send> AsyncRead for Progress<T> { 75 fn poll_read( 76 self: Pin<&mut Self>, 77 cx: &mut Context, 78 buf: &mut [u8] 79 ) -> Poll<Result<usize>> { 80 match self.inner.lock().boxed().as_mut().poll(cx) { 81 Poll::Ready(mut s) => { 82 Pin::new(&mut s.buf).poll_read(cx, buf) 83 }, 84 Poll::Pending => { 85 Poll::Pending 86 } 87 } 88 } 89} 90 91impl<T: AsyncWrite + Unpin + Send> AsyncWrite for Progress<T> { 92 fn poll_write( 93 self: Pin<&mut Self>, 94 cx: &mut Context, 95 buf: &[u8] 96 ) -> Poll<Result<usize>> { 97 match self.inner.lock().boxed().as_mut().poll(cx) { 98 Poll::Ready(mut s) => { 99 if s.canceled { 100 Poll::Ready(Err(io::Error::new(ErrorKind::Interrupted, "canceled"))) 101 } else { 102 let poll = Pin::new(&mut s.buf).poll_write(cx, buf); 103 if let Poll::Ready(Ok(n)) = poll { 104 s.size += n as u64; 105 } 106 poll 107 } 108 }, 109 Poll::Pending => { 110 Poll::Pending 111 } 112 } 113 } 114 115 fn poll_flush( 116 self: Pin<&mut Self>, 117 cx: &mut Context 118 ) -> Poll<Result<()>> { 119 match self.inner.lock().boxed().as_mut().poll(cx) { 120 Poll::Ready(mut s) => { 121 Pin::new(&mut s.buf).poll_flush(cx) 122 }, 123 Poll::Pending => { 124 Poll::Pending 125 } 126 } 127 } 128 129 fn poll_shutdown( 130 self: Pin<&mut Self>, 131 cx: &mut Context 132 ) -> Poll<Result<()>> { 133 match self.inner.lock().boxed().as_mut().poll(cx) { 134 Poll::Ready(mut s) => { 135 Pin::new(&mut s.buf).poll_shutdown(cx) 136 }, 137 Poll::Pending => { 138 Poll::Pending 139 } 140 } 141 } 142} 143 144impl<T: AsyncSeek + Unpin + Send> AsyncSeek for Progress<T> { 145 fn start_seek( 146 self: Pin<&mut Self>, 147 cx: &mut Context, 148 position: SeekFrom 149 ) -> Poll<Result<()>> { 150 match self.inner.lock().boxed().as_mut().poll(cx) { 151 Poll::Ready(mut s) => { 152 Pin::new(&mut s.buf).start_seek(cx, position) 153 }, 154 Poll::Pending => { 155 Poll::Pending 156 } 157 } 158 } 159 160 fn poll_complete( 161 self: Pin<&mut Self>, 162 cx: &mut Context 163 ) -> Poll<Result<u64>> { 164 match self.inner.lock().boxed().as_mut().poll(cx) { 165 Poll::Ready(mut s) => { 166 Pin::new(&mut s.buf).poll_complete(cx) 167 }, 168 Poll::Pending => { 169 Poll::Pending 170 } 171 } 172 } 173}
試したこと
基本的に issue に書いてあります。
環境
rustc: 1.43.0
tokio: 0.2.20
回答3件
あなたの回答
tips
プレビュー
バッドをするには、ログインかつ
こちらの条件を満たす必要があります。
2020/05/10 10:47