前提・実現したいこと
マルチスレッドで共有できる書き込まれたバイト数を保持しながら書き込みを行う型を実装したいです。そこで struct Progress<T>(std::sync::Arc<tokio::sync::Mutex<(T, usize)>>)
のような型に AsyncWrite
を実装すると上手く抽象化できると考えました。
あるタスクで書き込みを行い、他のタスクから進捗を見る感じです。
tokio::io::AsyncWrite - Rust
tokio::sync::Mutex - Rust
std::sync::Mutex - Rust
発生している問題
tokio::sync::Mutex
はロックを取得するのに mutex.lock().await
する必要がありますが、 await
は AsyncWrite::poll_write
の中では使えないので困っています。上手く実装を与えることはできないのでしょうか?(追記した try_lock
を使った実装は適切なのでしょうか? )
後述の通り、代わりに std::sync::Mutex
を用いた実装はできたので tokio::sync::Mutex
が使えない場合はこちらを使おうと思うのですが、その場合のパフォーマンスへの影響はどのようなものでしょうか?
上記2点についてご教示いただけると幸いです。
該当のソースコード
rust
1use std::pin::Pin; 2use std::sync::Arc; 3use std::task::{Poll, Context}; 4use tokio::io::{AsyncWrite, Error}; 5use tokio::sync::Mutex; 6 7#[derive(Debug)] 8struct ProgressInner<T> { 9 inner: T, 10 size: usize, 11} 12 13#[derive(Debug)] 14struct Progress<T>(Arc<Mutex<ProgressInner<T>>>); 15 16impl<T: AsyncWrite + Unpin> AsyncWrite for Progress<T> { 17 fn poll_write( 18 self: Pin<&mut Self>, 19 cx: &mut Context, 20 buf: &[u8] 21 ) -> Poll<Result<usize, Error>> { 22 todo!() 23 } 24 25 fn poll_flush( 26 self: Pin<&mut Self>, 27 cx: &mut Context, 28 ) -> Poll<Result<(), Error>> { 29 todo!() 30 } 31 32 fn poll_shutdown( 33 self: Pin<&mut Self>, 34 cx: &mut Context 35 ) -> Poll<Result<(), Error>> { 36 todo!() 37 } 38}
試したこと
tokio::sync::Mutex
ではなく、 std::sync::Mutex
を使ったものは実装できました。
rust
1use std::pin::Pin; 2use std::sync::{Arc, Mutex}; 3use std::task::{Poll, Context}; 4use tokio::io::{AsyncWrite, Error}; 5 6#[derive(Debug)] 7struct ProgressInner<T> { 8 inner: T, 9 size: usize, 10} 11 12#[derive(Debug)] 13struct Progress<T>(Arc<Mutex<ProgressInner<T>>>); 14 15impl<T: AsyncWrite + Unpin> AsyncWrite for Progress<T> { 16 fn poll_write( 17 self: Pin<&mut Self>, 18 cx: &mut Context, 19 buf: &[u8] 20 ) -> Poll<Result<usize, Error>> { 21 let mut s = self.0.lock().unwrap(); 22 let poll = Pin::new(&mut s.inner).poll_write(cx, buf); 23 if let Poll::Ready(Ok(n)) = poll { 24 s.size += n; 25 } 26 poll 27 } 28 29 fn poll_flush( 30 self: Pin<&mut Self>, 31 cx: &mut Context, 32 ) -> Poll<Result<(), Error>> { 33 let mut s = self.0.lock().unwrap(); 34 Pin::new(&mut s.inner).poll_flush(cx) 35 } 36 37 fn poll_shutdown( 38 self: Pin<&mut Self>, 39 cx: &mut Context 40 ) -> Poll<Result<(), Error>> { 41 let mut s = self.0.lock().unwrap(); 42 Pin::new(&mut s.inner).poll_shutdown(cx) 43 } 44}
追記 2020-03-17 01:20
tokio::sync::Mutex::try_lock()
を使うことで実装できました(これが適切な方針なのかは自信が無いのでアドバイスお願いします)
rust
1impl<T: AsyncWrite + Unpin> AsyncWrite for Progress<T> { 2 fn poll_write( 3 self: Pin<&mut Self>, 4 cx: &mut Context, 5 buf: &[u8] 6 ) -> Poll<Result<usize, Error>> { 7 match self.0.try_lock() { 8 Ok(mut s) => { 9 let poll = Pin::new(&mut s.inner).poll_write(cx, buf); 10 if let Poll::Ready(Ok(n)) = poll { 11 s.size += n; 12 } 13 poll 14 }, 15 Err(_e) => { 16 Poll::Pending 17 } 18 } 19 } 20 21 // poll_flush, poll_shutdown 略 22}
補足情報(FW/ツールのバージョンなど)
rustc 1.41.1
tokio 0.2.13
回答1件
あなたの回答
tips
プレビュー
バッドをするには、ログインかつ
こちらの条件を満たす必要があります。
2020/03/17 14:26
2020/03/20 11:56
2020/03/20 12:04
2020/05/03 14:37 編集
2020/05/04 01:20