質問をすることでしか得られない、回答やアドバイスがある。

15分調べてもわからないことは、質問しよう!

新規登録して質問してみよう
ただいま回答率
85.35%
Rust

Rustは、MoFoが支援するプログラミング言語。高速性を維持しつつも、メモリ管理を安全に行うことが可能な言語です。同じコンパイル言語であるC言語やC++では困難だったマルチスレッドを実装しやすく、並行性という点においても優れています。

Q&A

解決済

1回答

1230閲覧

tokio::sync::Mutex を使った型に AsyncWrite を実装したい

techno-tanoC

総合スコア24

Rust

Rustは、MoFoが支援するプログラミング言語。高速性を維持しつつも、メモリ管理を安全に行うことが可能な言語です。同じコンパイル言語であるC言語やC++では困難だったマルチスレッドを実装しやすく、並行性という点においても優れています。

0グッド

0クリップ

投稿2020/03/16 13:31

編集2020/03/16 16:19

前提・実現したいこと

マルチスレッドで共有できる書き込まれたバイト数を保持しながら書き込みを行う型を実装したいです。そこで struct Progress<T>(std::sync::Arc<tokio::sync::Mutex<(T, usize)>>) のような型に AsyncWrite を実装すると上手く抽象化できると考えました。

あるタスクで書き込みを行い、他のタスクから進捗を見る感じです。

tokio::io::AsyncWrite - Rust
tokio::sync::Mutex - Rust
std::sync::Mutex - Rust

発生している問題

tokio::sync::Mutex はロックを取得するのに mutex.lock().await する必要がありますが、 awaitAsyncWrite::poll_write の中では使えないので困っています。上手く実装を与えることはできないのでしょうか?(追記した try_lock を使った実装は適切なのでしょうか? )

後述の通り、代わりに std::sync::Mutex を用いた実装はできたので tokio::sync::Mutex が使えない場合はこちらを使おうと思うのですが、その場合のパフォーマンスへの影響はどのようなものでしょうか?

上記2点についてご教示いただけると幸いです。

該当のソースコード

rust

1use std::pin::Pin; 2use std::sync::Arc; 3use std::task::{Poll, Context}; 4use tokio::io::{AsyncWrite, Error}; 5use tokio::sync::Mutex; 6 7#[derive(Debug)] 8struct ProgressInner<T> { 9 inner: T, 10 size: usize, 11} 12 13#[derive(Debug)] 14struct Progress<T>(Arc<Mutex<ProgressInner<T>>>); 15 16impl<T: AsyncWrite + Unpin> AsyncWrite for Progress<T> { 17 fn poll_write( 18 self: Pin<&mut Self>, 19 cx: &mut Context, 20 buf: &[u8] 21 ) -> Poll<Result<usize, Error>> { 22 todo!() 23 } 24 25 fn poll_flush( 26 self: Pin<&mut Self>, 27 cx: &mut Context, 28 ) -> Poll<Result<(), Error>> { 29 todo!() 30 } 31 32 fn poll_shutdown( 33 self: Pin<&mut Self>, 34 cx: &mut Context 35 ) -> Poll<Result<(), Error>> { 36 todo!() 37 } 38}

試したこと

tokio::sync::Mutex ではなく、 std::sync::Mutex を使ったものは実装できました。

rust

1use std::pin::Pin; 2use std::sync::{Arc, Mutex}; 3use std::task::{Poll, Context}; 4use tokio::io::{AsyncWrite, Error}; 5 6#[derive(Debug)] 7struct ProgressInner<T> { 8 inner: T, 9 size: usize, 10} 11 12#[derive(Debug)] 13struct Progress<T>(Arc<Mutex<ProgressInner<T>>>); 14 15impl<T: AsyncWrite + Unpin> AsyncWrite for Progress<T> { 16 fn poll_write( 17 self: Pin<&mut Self>, 18 cx: &mut Context, 19 buf: &[u8] 20 ) -> Poll<Result<usize, Error>> { 21 let mut s = self.0.lock().unwrap(); 22 let poll = Pin::new(&mut s.inner).poll_write(cx, buf); 23 if let Poll::Ready(Ok(n)) = poll { 24 s.size += n; 25 } 26 poll 27 } 28 29 fn poll_flush( 30 self: Pin<&mut Self>, 31 cx: &mut Context, 32 ) -> Poll<Result<(), Error>> { 33 let mut s = self.0.lock().unwrap(); 34 Pin::new(&mut s.inner).poll_flush(cx) 35 } 36 37 fn poll_shutdown( 38 self: Pin<&mut Self>, 39 cx: &mut Context 40 ) -> Poll<Result<(), Error>> { 41 let mut s = self.0.lock().unwrap(); 42 Pin::new(&mut s.inner).poll_shutdown(cx) 43 } 44}

追記 2020-03-17 01:20

tokio::sync::Mutex::try_lock() を使うことで実装できました(これが適切な方針なのかは自信が無いのでアドバイスお願いします)

rust

1impl<T: AsyncWrite + Unpin> AsyncWrite for Progress<T> { 2 fn poll_write( 3 self: Pin<&mut Self>, 4 cx: &mut Context, 5 buf: &[u8] 6 ) -> Poll<Result<usize, Error>> { 7 match self.0.try_lock() { 8 Ok(mut s) => { 9 let poll = Pin::new(&mut s.inner).poll_write(cx, buf); 10 if let Poll::Ready(Ok(n)) = poll { 11 s.size += n; 12 } 13 poll 14 }, 15 Err(_e) => { 16 Poll::Pending 17 } 18 } 19 } 20 21 // poll_flush, poll_shutdown 略 22}

補足情報(FW/ツールのバージョンなど)

rustc 1.41.1
tokio 0.2.13

気になる質問をクリップする

クリップした質問は、後からいつでもMYページで確認できます。

またクリップした質問に回答があった際、通知やメールを受け取ることができます。

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

guest

回答1

0

ベストアンサー

tokio::sync::Mutex はロックを取得するのに mutex.lock().await する必要がありますが、 awaitAsyncWrite::poll_write の中では使えないので困っています。上手く実装を与えることはできないのでしょうか?

async fnの戻り値型がTのとき、その関数を呼ぶとimpl Future<Output = T>の値が返されます。ですから.awaitが使えない状況でも、Futurepoll()メソッドを呼ぶことでFutureを駆動できます。

poll_write()の実装を以下のようにしたところコンパイルは通りました。ただし、私はFutureトレイトやAsyncWriteトレイトを実装するのは不慣れですので、これで正しく動くという自信はありません。動かないようでしたら調べますので教えてください。

rust

1use futures::future::FutureExt; // Cargo.toml: futures = "0.3" 2 3// トレイト境界にSendを追加(FutureExtトレイトのboxed()メソッドが要求するため) 4impl<'a, T: AsyncWrite + Unpin + Send> AsyncWrite for Progress<T> { 5 fn poll_write( 6 self: Pin<&mut Self>, 7 cx: &mut Context, 8 buf: &[u8], 9 ) -> Poll<Result<usize, Error>> { 10 // Mutexのlock()メソッドから返されたFutureを駆動するためにpoll() 11 // メソッドを呼ぶ。poll()のレシーバーはPin<&mut self>。 12 // lock()はimpl Future<Output = MutexGuard<'_, T>>を返すので 13 // FutureExtトレイトのboxed()でPin<Box<dyn Future<Output = ..>>に 14 // 変換し、as_mut()でPin<&mut dyn Future<Output = ..>>に変換すればよい。 15 match self.0.lock().boxed().as_mut().poll(cx) { 16 Poll::Pending => Poll::Pending, 17 Poll::Ready(mut s) => { 18 let poll = Pin::new(&mut s.inner).poll_write(cx, buf); 19 if let Poll::Ready(Ok(n)) = poll { 20 s.size += n; 21 } 22 poll 23 } 24 } 25 }

後述の通り、代わりに std::sync::Mutex を用いた実装はできたので tokio::sync::Mutex が使えない場合はこちらを使おうと思うのですが、その場合のパフォーマンスへの影響はどのようなものでしょうか?

tokioのRuntimeのようなマルチスレッドな非同期ランタイムでは、非同期タスクに対応していないMutexは使えないようです。こちら の説明によると、デッドロックを引き起こす恐れがあるそうです。

(訂正)参照先の説明をもう一度読んだところ、デッドロックを引き起こす可能性があるのは、複数の.awaitにまたがってロックを使用するときだけだとわかりました。今回の使いかたは、それには該当しないはずです。ただデッドロックは起こらなくても、stdのMutexlock()はブロッキングな操作ですので、非同期タスク内から呼ぶと非同期ランタイムのexecutorスレッドの実行を一時的にブロックします。ですから性能面の影響は多少あるかもしれません。それほど長い時間はブロックしなさそうなので、無視できる範囲かもしれませんが。

追記

「追記 2020-03-17 01:20」についてですが、tokio::sync::Mutex::try_lock()だとダメそうな気がします。

Mutexのロックに失敗してpoll_write()からPoll::Pendingを返したとします。その後、ロックできる状態になったら、非同期ランタイムから再度poll_write()を呼んでもらわないといけません。そのためには前回のpoll_write()で引数にとったContext内にあるWakerwake()メソッドを呼ぶことでランタイムに通知する必要があります。

質問の追記部分にあるコードですと、wake()メソッドを呼ぶことがないので、ランタイムがpoll_write()を呼んでくれなくなってしまい、非同期タスクが前に進みません。

回答に書いたコードではlock()が返したFuturepoll()Contextを引数として渡しています。こうすることで、ロックできる状態になったときに、tokioのMutexの実装側からwake()を呼んでもらえます。

Wakerの働きについては公式のAsync Bookの この章 が参考になると思います。

投稿2020/03/16 17:26

編集2020/03/16 23:03
tatsuya6502

総合スコア2046

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

techno-tanoC

2020/03/17 14:26

回答ありがとうございます!動作を確認出来次第再度報告させていただきます。
techno-tanoC

2020/03/20 11:56

動作確認したところ問題なく動作しました。 改めてありがとうございました! future, async/await について学習を進めようと思います。
tatsuya6502

2020/03/20 12:04

ご報告ありがとうございました。動いてよかったです。
tatsuya6502

2020/05/04 01:20

ご連絡ありがとうざいます。to_item と poll_write を中心にコードを眺めてみたのですが、どこが悪いのかわかりませんでした。少し時間ができたら実際に動作させてデバッグしてみようと思います。
guest

あなたの回答

tips

太字

斜体

打ち消し線

見出し

引用テキストの挿入

コードの挿入

リンクの挿入

リストの挿入

番号リストの挿入

表の挿入

水平線の挿入

プレビュー

15分調べてもわからないことは
teratailで質問しよう!

ただいまの回答率
85.35%

質問をまとめることで
思考を整理して素早く解決

テンプレート機能で
簡単に質問をまとめる

質問する

関連した質問