質問をすることでしか得られない、回答やアドバイスがある。

15分調べてもわからないことは、質問しよう!

新規登録して質問してみよう
ただいま回答率
85.35%
Rust

Rustは、MoFoが支援するプログラミング言語。高速性を維持しつつも、メモリ管理を安全に行うことが可能な言語です。同じコンパイル言語であるC言語やC++では困難だったマルチスレッドを実装しやすく、並行性という点においても優れています。

Q&A

解決済

3回答

1850閲覧

tokio::io::AsyncWriteが駆動しない(?)ことがある

techno-tanoC

総合スコア24

Rust

Rustは、MoFoが支援するプログラミング言語。高速性を維持しつつも、メモリ管理を安全に行うことが可能な言語です。同じコンパイル言語であるC言語やC++では困難だったマルチスレッドを実装しやすく、並行性という点においても優れています。

0グッド

1クリップ

投稿2020/05/03 12:02

編集2020/05/04 13:56

追記

より短い再現コードを作りました。

https://github.com/techno-tanoC/progress_test

ダウンロードが途中で止まり、 pg.to_item の結果がずっと表示されるようになります。

イメージ説明

前提・実現したいこと

以前、 Rust - tokio::sync::Mutex を使った型に AsyncWrite を実装したい|teratail にて AsyncWrite の実装方法を教えていただいたのですが、実際に使ってみると問題が発生しました。

REST API で操作できるファイルダウンローダを作っています。大まかな動作は以下のようなものです。

  1. URL を含む情報を POST する
  2. reqwest で与えられた URL からデータを Stream として取得
  3. File とダウンロードしたバイト数を保持する型 Progresstokio::io::copyStream を流し込む
  4. ダウンロードしている最中に GET を行うと Progress が保持しているバイト数を取得する
  5. Stream が終わったら Progress が持つ File を別のディレクトリにコピーする

リポジトリ: techno-tanoC/azusa

発生している問題

フロントエンドから進捗を確認しているとダウンロードが止まることがあります。

ダウンロードが止まることがある · Issue #34 · techno-tanoC/azusa という issue で現象の調査を行いました。その結果、 Progress::to_item を呼ぶと問題が発生するようなのですが、原因の検討がつきません。

イメージ説明

ダウンロードが止まった後も Progress::to_item は動いているので Mutex が駆動していないというよりは AsyncWrite が駆動していない印象を受けます。

当該のソースコード

Progressto_itempoll_write 辺りが怪しいと思っているのですが、どこが悪いのかハッキリとは分かっていません。現象からして Progress 内に問題がある可能性が高いと思います。

rust

1use futures::future::FutureExt; 2use std::fmt; 3use std::io::SeekFrom; 4use std::pin::Pin; 5use std::sync::Arc; 6use std::task::{Poll, Context}; 7use tokio::io::{AsyncSeek, Result, ErrorKind}; 8use tokio::prelude::*; 9use tokio::sync::Mutex; 10 11use crate::item::Item; 12 13struct ProgressInner<T> { 14 name: String, 15 total: u64, 16 size: u64, 17 canceled: bool, 18 buf: T, 19} 20 21impl<T> fmt::Debug for ProgressInner<T> { 22 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 23 f.debug_struct("ProgressInner") 24 .field("name", &self.name) 25 .field("total", &self.total) 26 .field("size", &self.size) 27 .field("canceled", &self.canceled) 28 .finish() 29 } 30} 31 32pub struct Progress<T> { 33 inner: Arc<Mutex<ProgressInner<T>>>, 34} 35 36impl<T> std::clone::Clone for Progress<T> { 37 fn clone(&self) -> Self { 38 Progress { inner: self.inner.clone() } 39 } 40} 41 42impl<T> Progress<T> { 43 pub fn new(name: impl ToString, buf: T) -> Self { 44 let inner = Arc::new(Mutex::new(ProgressInner { 45 name: name.to_string(), 46 total: 0, 47 size: 0, 48 canceled: false, 49 buf, 50 })); 51 Progress { inner } 52 } 53 54 pub async fn set_total(&mut self, total: u64) { 55 self.inner.lock().await.total = total; 56 } 57 58 pub async fn cancel(&mut self) { 59 self.inner.lock().await.canceled = true 60 } 61 62 pub async fn to_item(&self, id: impl ToString) -> Item { 63 let pg = self.inner.lock().await; 64 Item { 65 id: id.to_string(), 66 name: pg.name.clone(), 67 total: pg.total, 68 size: pg.size, 69 canceled: pg.canceled, 70 } 71 } 72} 73 74impl<T: AsyncRead + Unpin + Send> AsyncRead for Progress<T> { 75 fn poll_read( 76 self: Pin<&mut Self>, 77 cx: &mut Context, 78 buf: &mut [u8] 79 ) -> Poll<Result<usize>> { 80 match self.inner.lock().boxed().as_mut().poll(cx) { 81 Poll::Ready(mut s) => { 82 Pin::new(&mut s.buf).poll_read(cx, buf) 83 }, 84 Poll::Pending => { 85 Poll::Pending 86 } 87 } 88 } 89} 90 91impl<T: AsyncWrite + Unpin + Send> AsyncWrite for Progress<T> { 92 fn poll_write( 93 self: Pin<&mut Self>, 94 cx: &mut Context, 95 buf: &[u8] 96 ) -> Poll<Result<usize>> { 97 match self.inner.lock().boxed().as_mut().poll(cx) { 98 Poll::Ready(mut s) => { 99 if s.canceled { 100 Poll::Ready(Err(io::Error::new(ErrorKind::Interrupted, "canceled"))) 101 } else { 102 let poll = Pin::new(&mut s.buf).poll_write(cx, buf); 103 if let Poll::Ready(Ok(n)) = poll { 104 s.size += n as u64; 105 } 106 poll 107 } 108 }, 109 Poll::Pending => { 110 Poll::Pending 111 } 112 } 113 } 114 115 fn poll_flush( 116 self: Pin<&mut Self>, 117 cx: &mut Context 118 ) -> Poll<Result<()>> { 119 match self.inner.lock().boxed().as_mut().poll(cx) { 120 Poll::Ready(mut s) => { 121 Pin::new(&mut s.buf).poll_flush(cx) 122 }, 123 Poll::Pending => { 124 Poll::Pending 125 } 126 } 127 } 128 129 fn poll_shutdown( 130 self: Pin<&mut Self>, 131 cx: &mut Context 132 ) -> Poll<Result<()>> { 133 match self.inner.lock().boxed().as_mut().poll(cx) { 134 Poll::Ready(mut s) => { 135 Pin::new(&mut s.buf).poll_shutdown(cx) 136 }, 137 Poll::Pending => { 138 Poll::Pending 139 } 140 } 141 } 142} 143 144impl<T: AsyncSeek + Unpin + Send> AsyncSeek for Progress<T> { 145 fn start_seek( 146 self: Pin<&mut Self>, 147 cx: &mut Context, 148 position: SeekFrom 149 ) -> Poll<Result<()>> { 150 match self.inner.lock().boxed().as_mut().poll(cx) { 151 Poll::Ready(mut s) => { 152 Pin::new(&mut s.buf).start_seek(cx, position) 153 }, 154 Poll::Pending => { 155 Poll::Pending 156 } 157 } 158 } 159 160 fn poll_complete( 161 self: Pin<&mut Self>, 162 cx: &mut Context 163 ) -> Poll<Result<u64>> { 164 match self.inner.lock().boxed().as_mut().poll(cx) { 165 Poll::Ready(mut s) => { 166 Pin::new(&mut s.buf).poll_complete(cx) 167 }, 168 Poll::Pending => { 169 Poll::Pending 170 } 171 } 172 } 173}

試したこと

基本的に issue に書いてあります。

環境

rustc: 1.43.0
tokio: 0.2.20

気になる質問をクリップする

クリップした質問は、後からいつでもMYページで確認できます。

またクリップした質問に回答があった際、通知やメールを受け取ることができます。

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

guest

回答3

0

ベストアンサー

以前 ... AsyncWrite の実装方法を教えていただいたのですが、実際に使ってみると問題が発生しました。

以前の質問で実装方法を回答した者です。そのプログラムは正しく動作せず、ご迷惑をおかけしました。

問題の起こる原因は、すでにi-poperさんが回答されているように、メソッドから戻る際にMutexlock()から返されたFutureを破棄してしまっているためです。Futurepoll()Pendingが返ったときにこれを行うと、Futureの計算結果が不要と判断されてキャンセルされてしまいます。その後はwakerが呼ばれないので、AsyncWriteなどの処理が次に進まなくなっていたわけです。

直し方については、Rustの日本語コミュニティ(Slack rust-jp) にいる皆さんに教えてもらいました。Mutexlock()から返されたFutureを破棄せずにどこかに保存しておくことと、そのFutureから返されるMutexGuardも同様に保存することで、処理が先に進むようになります。それを反映したprogress_testのコードは fix-future-handlingブランチ にあります。

なお、これらのFutureMutexGuardですが、Mutexへの参照を持つためにライフタイムパラメータが付いています。今回のように複数回のAsyncWriteメソッドにまたがって生存させたい場合に、ライフタイムがうまく表現できません。そのため'staticを指定し、unsafeなコードで慎重に手動で管理する必要がありました。もしi-poperさんがご提案されているtokio::sync::watchによる方法などが使えるなら、そちらを使った方がいいかもしれません。この回答の最後でも別解としてAtomicU64を使用した方法を紹介します。

引き続きMutexを使用する場合

progress_testの変更点について簡単に説明します。まずロックの状態を管理するためのenumを作成し、FutureMutexGuardもそこに格納するようにしました。

rust

1// Mutexが返してくるFutureの型 2type MutexFuture<'a, T> = Box<dyn Future<Output = MutexGuard<'a, T>> + Send + Sync>; 3 4// LockStateはロックの状態を表す。Mutexが返すFutureとそのFutureが返すMutexGuardの 5// ライフタイムはうまく表現できないので'staticにする。LockStateのライフタイムの管理は 6// 手動で(慎重に)行う 7enum LockState<T: 'static> { 8 Released, 9 Acquiring(Pin<MutexFuture<'static, T>>), 10 Locked(MutexGuard<'static, T>), 11}

Progressではinnerに加えて、lock_stateも持たせます。

rust

1pub struct Progress<T: 'static> { 2 inner: Pin<Arc<Mutex<ProgressInner<T>>>>, 3 // ・lock_stateはAsyncRead、AsyncWrite、AsyncSeek関連のメソッド用なので 4 //  to_size()など他のメソッドでは使用しない 5 // ・lock_stateに格納されたFutureやMutexGuardはinnerのMutexを参照しているため 6 //  Progressのdrop時はinnerより前にdropする必要がある 7 // ・ManDrop(std::mem::ManuallyDropのエイリアス)で包むことで、dropの 8 //  タイミングを手動でコントロールする 9 lock_state: ManDrop<LockState<ProgressInner<T>>>, 10}

LockStateがdropされるタイミングを手動で制御するためにstd::mem::ManualDropで包みました。

clone()drop()の実装は以下の通りです。

rust

1impl<T> Clone for Progress<T> { 2 fn clone(&self) -> Self { 3 Progress { 4 inner: self.inner.clone(), 5 lock_state: ManDrop::new(LockState::Released), 6 } 7 } 8} 9 10impl<T> Drop for Progress<T> { 11 fn drop(&mut self) { 12 // 最初にlock_stateをdropする 13 unsafe { ManDrop::drop(&mut self.lock_state) }; 14 // それ以外のフィールドはこのメソッドから返った後に自動的にdropされる 15 } 16}

to_size()メソッドの実装は今まで通りです。lock_stateは触らず、innerMutexだけを使います。

rust

1impl<T: Unpin + Send> Progress<T> { 2 pub async fn to_size(&self) -> u64 { 3 let pg = self.inner.lock().await; 4 pg.size 5 } 6}

lock_stateの管理はlock_and_then()メソッドで行うことにしました。

rust

1impl<T: Unpin + Send> Progress<T> { 2 // ・lock_and_then()はAsyncRead、AsyncWrite、AsyncSeek関連のメソッドで共通に行われる処理を 3 //  抽象化したもの。ロックを取得後、引数にとったクロージャで非同期IO処理を実行し、最後にロックを解除する 4 // ・ロック取得時や非同期IO時にPoll::Pendingが返されたときは早期リターンする 5 fn lock_and_then<U, F>(&mut self, cx: &mut Context<'_>, f: F) -> Poll<Result<U>> 6 where 7 F: FnOnce(Pin<&mut T>, &mut Context<'_>, &mut u64) -> Poll<Result<U>>, 8 { 9 use LockState::*; 10 loop { 11 match &mut *self.lock_state { 12 Released => { 13 // Mutexのlock()が返すFutureとそのMutexGuardのライフタイムを'staticにするため 14 // Mutexの可変参照を可変の生ポインタに変換してからlock()を呼ぶ 15 let mutex = &self.inner as &Mutex<_> as *const Mutex<_>; 16 let fut = unsafe { Box::pin((*mutex).lock()) }; 17 self.set_lock_state(Acquiring(fut)); 18 } 19 Acquiring(fut) => match fut.as_mut().as_mut().poll(cx) { 20 Poll::Pending => return Poll::Pending, 21 Poll::Ready(guard) => self.set_lock_state(Locked(guard)), 22 }, 23 Locked(guard) => { 24 let ProgressInner { buf, size } = &mut **guard; 25 let pin = Pin::new(buf); 26 let poll = f(pin, cx, size); 27 if let Poll::Ready(..) = &poll { 28 self.set_lock_state(Released); 29 } 30 return poll; 31 } 32 }; 33 } 34 } 35 36 fn set_lock_state(&mut self, new: LockState<ProgressInner<T>>) { 37 let mut old = std::mem::replace(&mut self.lock_state, ManDrop::new(new)); 38 // lock_stateはManuallyDropなので、以前の値を手動でdropする 39 unsafe { ManDrop::drop(&mut old) } 40 } 41}

lock_stateReleasedのとき

  • Mutexlock()Futureを得る
  • その際、Futureのライフタイムを'staticにするために、Mutexの参照を生ポインタに変換してからlock()を呼ぶようにする
  • lock_statusAcquiringに変更して、ループをもう一度回す

lock_stateAcquiringのとき

  • Futurepoll()を呼んでロックを取得しようとする
  • Poll::Pendingが返されたならリターンする
  • Poll::Readyが返されたなら、lock_stateLockedに変更し、保存しておいたFutureは削除する。ループをもう一度回す

lock_stateLockedのとき

  • MutexGuardから取り出したProgressInnerなどを引数にして、クロージャを呼ぶ
  • クロージャは非同期IOの処理を実行する
  • Poll::Pendingが返されたならリターンする
  • Poll::Readyが返されたなら、lock_stateReleasedに変更し、保存しておいたMutexGuardを削除することでロックを解除する。その後リターンする

AsyncWriteなどからは、以下のようにlock_and_then()を使います。

rust

1impl<T: AsyncWrite + Unpin + Send> AsyncWrite for Progress<T> { 2 fn poll_write( 3 mut self: Pin<&mut Self>, 4 cx: &mut Context<'_>, 5 buf: &[u8], 6 ) -> Poll<Result<usize>> { 7 self.lock_and_then(cx, |pin, cx, size| { 8 let poll = pin.poll_write(cx, buf); 9 if let Poll::Ready(Ok(n)) = poll { 10 *size += n as u64; 11 } 12 poll 13 }) 14 } 15 16 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { 17 self.lock_and_then(cx, |pin, cx, _| pin.poll_flush(cx)) 18 }

AtomicU64を使用する場合

progress_testのコードは atomic-u64ブランチ にあります。

ProgressProgressInnerの役割を変更しました。

Progressは複数の非同期タスクから共有される進捗情報のみを持ちます。AtomicU64を使用しますのでMutexなどで守らなくてすみます。

rust

1// 非同期IOの進捗を表す型。複数の非同期タスクから共有されるのでAtomic系の型を使う 2pub struct Progress { 3 size: AtomicU64, 4}

AsyncWriteなどはProgressDecorator<T>が提供することにしました。こちらは一つの非同期タスクのみから使われることを前提にしており、複数の非同期タスクで共有することはできません。bufはこちらにあります。

rust

1// AsyncWriteなどを提供する型。一つの非同期タスクのみから使われることを前提にしている 2pub struct ProgressDecorator<T> { 3 buf: T, 4 progress: Arc<Progress>, 5}

ProgressDecoratorprogress()メソッドを呼ぶと、Arc<Progress>が得られる仕組みです。

rust

1impl<T> ProgressDecorator<T> { 2 pub fn progress(&self) -> Arc<Progress> { 3 Arc::clone(&self.progress) 4 } 5}

AsyncWritepoll_write()メソッドではProgressが持つ値(AtomicU64)を更新します。fetch_add()Ordering::Acquireを与えることで、カウントの正確さと実行効率の良さを両立させています。

rust

1impl<T: AsyncWrite + Unpin + Send> AsyncWrite for ProgressDecorator<T> { 2 fn poll_write( 3 mut self: Pin<&mut Self>, 4 cx: &mut Context<'_>, 5 buf: &[u8], 6 ) -> Poll<Result<usize>> { 7 let poll = Pin::new(&mut self.buf).poll_write(cx, buf); 8 if let Poll::Ready(Ok(n)) = poll { 9 // Acquire (Release-Acquire) は今回の用途には十分 10 // これに対応するマシン命令はx86系など多くのプロセッサに標準装備されており 11 // 効率よく実行できる 12 self.progress.size.fetch_add(n as u64, Ordering::Acquire); 13 } 14 poll 15 }

投稿2020/05/10 07:20

編集2020/05/11 02:37
tatsuya6502

総合スコア2046

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

techno-tanoC

2020/05/10 10:47

回答ありががとうございます。 迷惑だなんてとんでもないです。一人ではどうしようもなかったので以前の質問の回答が無ければ早々に挫折していたと思います。 Atomic を用いた実装も良さそうです。 i-poper さんの `tokio::sync::watch` を用いた実装とあわせて検討してみます。( Mutex の Future を保存する方法は正しく扱える自信が無いので一旦保留ということで… )
guest

0

i-poperさん、tatsuya6502 さんの回答にあるように、poll_write 内で取得したロックを Ready を返す前に解放してしまっているのが今回の現象が発生する原因だと考えられます。Progress::<T>::poll_write 開始時に取得したロックは、背後の I/O の使用を終えるまで(すなわち T::poll_writeReady を返すまで)生きている必要があり、それまでは MutexGuard のインスタンスを Progress 内で保持しておく必要があります。

ここでは、tatsuya6502 さんの tokio::sync::MutexGuard を保持する解法の別解として、Mutex 内で使用されている同期プリミティブである Semaphore を用いた方法を紹介します。MutexGuard を保持する場合とは異なり、unsafe なライフタイム消去を用いずに実装できるという利点があります。

まず、次のような排他ロック用のヘルパ型を用意します。本来であれば元コードとの差分が少なくなるよう Progress<T> の実装に組み込むべきですが、見通しを良くするため今回は別の型にしました。直感的に言うと、この型は Mutex<T>MutexGuard<'_, T> を組み合わせた役割を持ちます。

rust

1use futures::future::{BoxFuture, FutureExt as _}; 2use futures::task::{Context, Poll}; 3use std::cell::UnsafeCell; 4use std::io; 5use std::pin::Pin; 6use std::sync::Arc; 7use tokio::io::{AsyncRead, AsyncWrite, AsyncSeek}; 8use tokio::sync::{Semaphore, OwnedSemaphorePermit}; 9 10pub struct Lock<T> { 11 inner: Arc<LockInner<T>>, 12 lock_state: LockState, 13} 14 15struct LockInner<T> { 16 data: UnsafeCell<T>, 17 semaphore: Arc<Semaphore>, 18} 19 20enum LockState { 21 Released, 22 Acquiring(BoxFuture<'static, OwnedSemaphorePermit>), 23 Acquired(OwnedSemaphorePermit), 24} 25 26impl<T> Lock<T> { 27 pub fn new(data: T) -> Self { 28 Self { 29 inner: Arc::new(LockInner { 30 data: UnsafeCell::new(data), 31 semaphore: Arc::new(Semaphore::new(1)), // 排他ロックなので 1 32 }), 33 lock_state: LockState::Released, 34 } 35 } 36 37 /// リソースの排他ロックを獲得し、指定されたクロージャを実行する。 38 /// 獲得したロックは、f が Ready を返すまで保持される。 39 pub fn poll_with_lock<F, R>(self: Pin<&mut Self>, cx: &mut Context<'_>, f: F) 40 -> Poll<R> 41 where 42 F: FnOnce(Pin<&mut T>, &mut Context<'_>) -> Poll<R>, 43 { 44 let me = self.get_mut(); 45 46 ready!(self.poll_acquire_lock(cx)); 47 48 let data = unsafe { Pin::new_unchecked(&mut *self.inner.data.get()) }; 49 let result = ready!(f(data, cx)); 50 51 self.release_lock(); 52 53 Poll::Ready(result) 54 } 55 56 fn poll_acquire_lock(&mut self, cx: &mut Context<'_>) -> Poll<()> { 57 loop { 58 self.lock_state = match self.lock_state { 59 LockState::Released => { 60 // セマフォからパーミットを獲得する。 61 // future が返ってくるので、ここでは lock_state を変更して次の状態に遷移する 62 let semaphore = self.inner.semaphore.clone(); 63 let future = semaphore.acquire_owned().boxed(); 64 LockState::Acquring(future) 65 }, 66 67 LockState::Acquring(ref mut future) => { 68 // パーミット待ちの future を駆動する 69 let permit = ready!(future.poll(cx)); 70 LockState::Acquired(permit) 71 }, 72 73 // パーミットを獲得済みなら何もしない 74 LockState::Acquired(..) => break Poll::Ready(()), 75 }; 76 } 77 } 78 79 fn release_lock(&mut self) { 80 self.lock_state = LockState::Released; 81 } 82} 83 84impl<T> Clone for Lock<T> { 85 fn clone(&self) -> Self { 86 Self { 87 inner: self.inner.clone(), 88 lock_state: LockState::Released, 89 } 90 } 91} 92 93unsafe impl<T: Send> Send for Lock<T> {} 94unsafe impl<T: Send + Sync> Sync for Lock<T> {}

T が I/O 関連のトレイトを実装していたときに Lock<T> を I/O として使えるようにするよう、そのための実装を追加しておきます。

rust

1impl<T: AsyncRead> AsyncRead for Lock<T> { 2 fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, dst: &mut [u8]) 3 -> Poll<io::Result<usize>> 4 { 5 // poll_with_lock 経由で呼び出す 6 self.poll_with_lock(cx, |data, cx| data.poll_read(cx, dst)) 7 } 8} 9 10impl<T: AsyncWrite> AsyncWrite for Lock<T> { 11 // ... omit ... 12} 13 14impl<T: AsyncSeek> AsyncSeek for Lock<T> { 15 // ... omit ... 16}

ロック周りの実装を Lock<T> にまとめたので、Progress<T> 自体の実装は次のように簡略化することができます(pin-project クレートの説明はここでは省略します)。

rust

1use pin_project::pin_project; 2use futures::task::{Context, Poll}; 3use std::io; 4use std::pin::Pin; 5use tokio::io::{AsyncRead, AsyncWrite, AsyncSeek}; 6 7#[pin_project] 8struct Progress<T> { 9 #[pin] 10 io: T, 11 amount: u64, 12} 13 14impl<T: AsyncRead> AsyncRead for Progress<T> { ... } 15impl<T: AsyncSeek> AsyncSeek for Progress<T> { ... } 16 17impl<T: AsyncWrite> AsyncWrite for Progress<T> { 18 fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, src: &[u8]) 19 -> Poll<io::Result<usize>> 20 { 21 let me = self.project(); 22 let polled = me.io.poll_write(cx, src); 23 if let Poll::Ready(Ok(n)) = polled { 24 *me.amount += n as u64; 25 } 26 polled 27 } 28 ... 29}

あとは Lock::new(Progress::new(io::sink())) のように Lock<T> でラッピングしてあげることで使えるようになります。

投稿2020/05/10 13:48

ubnt-intrepid

総合スコア58

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

0

async/await辺りについて私も色々と調べているところで、まだ正確にはつかめてはいませんが、ある程度、目星がついたので回答します。

こちらのソースで言うと、progress.rsの97行目でMutexlock()を呼び出し、poll()でlockの獲得をしようとしていますが、これがPendingを返した場合に今回の現象が発生しているようです。

Pendingとなった場合、何らかのイベントで再度タスクを起こして(wake)もらわなければタスクは実行されません。
本来であれば他タスクが獲得しているMutexのlockが解除された時に起こされ、poll_write()が再度呼ばれるはずですが、呼ばれない状態になってしまっているようです。

progress.rsの97行目のlock().poll()の内部で作成されたtokio::sync::batch_semaphore::Acquireはlockの解除待ちリストにwakerを登録します。Acquiredrop時にその待ちリストからwakerを削除しています。
Acquireの寿命は長くてもpoll_write()の範囲内なので、他のタスクでlockが獲得されてから解除されるまでの間にpoll_write()Pendingで終わり、解除待ちリストから削除され起こされる契機がなくなってしまったのではないかと思います。

下手するとビジーウエイトになってしまうので正しい解ではありませんが、progress.rsの110行目の後ろに、

cx.waker().clone().wake();

と入れると途中で停止すること無く正常に動作しました。

[追記]
別案として、この用途であればtokio::sync::watchを使うのが良いかなと思いました。以下参照ください。
progress_test
フォーマットしてしまったので差分がわかりにくいですが、poll_write()でダウンロードの状態が変わったらwatch::Sender.broadcast()で通知しています。
ダウンロードキャンセルは、逆にUIスレッド側?からwatch::Sender.broadcast()で通知します。
poll_write()での参照はrecv()だとawaitが必要になってしまうので、watch::Receiver.borrow()で参照します。

投稿2020/05/08 14:53

編集2020/05/09 06:41
i-poper

総合スコア12

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

techno-tanoC

2020/05/10 10:39

回答ありがとうございます。 `Mutex::lock()` の `Future` が破棄されたことで poll されなくなってしまっていたのですね。言われてみれば理解できるのですが、気付けませんでした。 `tokio::sync::watch` を用いた実装、良さそうです。 tatsuya6502 さんの Atomic を使った実装とあわせて検討してみます。
guest

あなたの回答

tips

太字

斜体

打ち消し線

見出し

引用テキストの挿入

コードの挿入

リンクの挿入

リストの挿入

番号リストの挿入

表の挿入

水平線の挿入

プレビュー

15分調べてもわからないことは
teratailで質問しよう!

ただいまの回答率
85.35%

質問をまとめることで
思考を整理して素早く解決

テンプレート機能で
簡単に質問をまとめる

質問する

関連した質問