質問をすることでしか得られない、回答やアドバイスがある。

15分調べてもわからないことは、質問しよう!

新規登録して質問してみよう
ただいま回答率
85.48%
非同期処理

非同期処理とは一部のコードを別々のスレッドで実行させる手法です。アプリケーションのパフォーマンスを向上させる目的でこの手法を用います。

Rust

Rustは、MoFoが支援するプログラミング言語。高速性を維持しつつも、メモリ管理を安全に行うことが可能な言語です。同じコンパイル言語であるC言語やC++では困難だったマルチスレッドを実装しやすく、並行性という点においても優れています。

Q&A

解決済

2回答

2554閲覧

Rust:複数スレッドで発生する非同期処理をあとでまとめてawaitしたい

bitt

総合スコア3

非同期処理

非同期処理とは一部のコードを別々のスレッドで実行させる手法です。アプリケーションのパフォーマンスを向上させる目的でこの手法を用います。

Rust

Rustは、MoFoが支援するプログラミング言語。高速性を維持しつつも、メモリ管理を安全に行うことが可能な言語です。同じコンパイル言語であるC言語やC++では困難だったマルチスレッドを実装しやすく、並行性という点においても優れています。

0グッド

0クリップ

投稿2021/04/19 04:22

rustの非同期処理についてです。javascriptでいうPromise.allをrustでもやろうと思い、以下のコードを書きました。

Rust

1#[tokio::main] 2async fn main() { 3 let mut process_list = vec![]; 4 5 let process_a = tokio::spawn(async { 6 //処理 7 }); 8 process_list.push(process_a); 9 10 //色々処理 11 12 let process_b = tokio::spawn(async { 13 //処理 14 }); 15 process_list.push(process_b); 16 17 //色々処理 18 19 for process in process_list { 20 process.await.unwrap(); 21 } 22}

このコードは上手くいってるのですが、process_listを複数スレッドで共有しようと思い、以下のようにprocess_listをArc<Mutex<Vec<tokio::task::JoinHandle<()>>>>としてみたのですがうまくいきません。

Rust

1use std::sync::{Arc, Mutex}; 2 3#[tokio::main] 4async fn main() { 5 let process_list = Arc::new(Mutex::new(vec![])); 6 let process_list_cloned = process_list.clone(); 7 8 9 let process_a = tokio::spawn(async { 10 //処理 11 }); 12 let mut process_list_locked = process_list_cloned.lock().unwrap(); 13 process_list_locked.push(process_a); 14 15 //色々処理 16 17 let process_b = tokio::spawn(async { 18 //処理 19 }); 20 let mut process_list_locked = process_list_cloned.lock().unwrap(); 21 process_list_locked.push(process_b); 22 23 //色々処理 24 25 let process_list_locked = process_list_cloned.lock().unwrap(); 26 for process in process_list_locked.into_iter() { 27 process.await.unwrap(); 28 } 29}
error[E0507]: cannot move out of dereference of `std::sync::MutexGuard<'_, Vec<tokio::task::JoinHandle<()>>>` --> src/main.rs:69:20 | 69 | for process in process_list_locked.into_iter() { | ^^^^^^^^^^^^^^^^^^^ move occurs because value has type `Vec<tokio::task::JoinHandle<()>>`, which does not implement the `Copy` trait

ここで質問なのですが、
1.そもそもやり方は間違えていないか(この方法が根本的に間違っていないか)
2.あっているのであれば、どのようにしたら動くか
3.自分がここで詰まらないために勉強しておくべきだったもの

上手く質問できていないかもしれませんがすみません。

気になる質問をクリップする

クリップした質問は、後からいつでもMYページで確認できます。

またクリップした質問に回答があった際、通知やメールを受け取ることができます。

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

guest

回答2

0

ベストアンサー

こんにちは。

まずマルチスレッド版のエラーについてです。これはロックの中身である Vec をムーブ(元々あった Vec を無効化)しようとしています。これだと共有している他のスレッドからもデータが触れなくなってしまうのでエラーになってしまっています。 Vec はそのままに、 Vec の中身だけ取り出すような書き方をすればコンパイルが通ります。

rust

1use std::sync::{Arc, Mutex}; 2 3#[tokio::main] 4async fn main() { 5 let process_list = Arc::new(Mutex::new(vec![])); 6 let process_list_cloned = process_list.clone(); 7 8 // ロックはスコープの末尾まで有効なのでデッドロックを防ぐためにスコープを小さくする 9 { 10 let process_a = tokio::spawn(async { 11 //処理 12 }); 13 let mut process_list_locked = process_list.lock().unwrap(); 14 process_list_locked.push(process_a); 15 //色々処理 16 } 17 { 18 let process_b = tokio::spawn(async { 19 //処理 20 }); 21 let mut process_list_locked = process_list.lock().unwrap(); 22 process_list_locked.push(process_b); 23 //色々処理 24 } 25 let mut process_list_locked = process_list_cloned.lock().unwrap(); 26 // drainでVectの中身のデータのみ取り出す。 `..` はFull Rangeのリテラル 27 for process in process_list_locked.drain(..) { 28 process.await.unwrap(); 29 } 30} 31

この上で1, 2, 3に答えます。

1 やり方は間違えていないか

やりたいこと次第ですが、JavaScriptでいうPromise.allをやりたいなら間違えている可能性が高いです。 RustでPromise.allに対応するものはfuturesクレートjoin_allです。for 式内で await をするとfutureを1つ1つ順番に待ちますが、 join_all を使うと同時に待ちます。

ちょっとコードを書いて実験してみましょう。タスクのあとに sleep をつけて tokio::spawn(async {}).then(sleep) とします。それを for 式で待つのと join_all で待つのを比べてみます。

rust

1use futures::prelude::*; 2use std::time::{Duration, Instant}; 3use tokio::task::JoinError; 4 5async fn sleep(_: Result<(), JoinError>) { 6 tokio::time::sleep(Duration::from_secs(1)).await 7} 8 9#[tokio::main] 10async fn main() { 11 println!("waiting with `for`"); 12 let start = Instant::now(); 13 { 14 let mut process_list = Vec::new(); 15 16 let process_a = tokio::spawn(async {}).then(sleep); 17 process_list.push(process_a); 18 //色々処理 19 let process_b = tokio::spawn(async {}).then(sleep); 20 process_list.push(process_b); 21 //色々処理 22 for process in process_list { 23 process.await; 24 } 25 } 26 let end = Instant::now(); 27 println!("time elapsed: {}ms", (end - start).as_millis()); 28 29 println!("waiting with `join_all`"); 30 let start = Instant::now(); 31 { 32 let mut process_list = Vec::new(); 33 34 let process_a = tokio::spawn(async {}).then(sleep); 35 process_list.push(process_a); 36 //色々処理 37 let process_b = tokio::spawn(async {}).then(sleep); 38 process_list.push(process_b); 39 //色々処理 40 futures::future::join_all(process_list).await; 41 } 42 let end = Instant::now(); 43 println!("time elapsed: {}ms", (end - start).as_millis()); 44}

これを走らせると、 for 式で待つ方は1つ目の sleep が終わったあとに次の sleep が走ってそれを待つので2秒かかります。一方 join_all の方は2つの sleep が同時に走って同時に待つので1秒で終わります。

shell

1$ cargo run 2 Finished dev [unoptimized + debuginfo] target(s) in 0.01s 3 Running `target/debug/teratail-tokio` 4waiting with `for` 5time elapsed: 2002ms 6waiting with `join_all` 7time elapsed: 1001ms

なので十中八九 for 式で待つよりも join_all で待つ方がやりたいことでしょう。ただし、今回のコードに限っていえば tokio::spawn でタスクを裏で実行するようにしているので for 式で await してもほぼ同じことになります。もしそれを分かって書いているのであれば意図通りですが、恐らくは意図せずたまたま動いた類いのコードじゃないかなと思います。

2 あっているのであれば、どのようにしたら動くか

上記のとおりPromise.allのようなことをしたいのであればお題の並行版のコードは以下のように書くのが正しいです。

rust

1 // 略 2 3 // drainでVectの中身のデータのみ取り出す。 `..` はFull Rangeのリテラル 4 futures::future::join_all(process_list_locked.drain(..)).await;

もし、futureを1つ1つ await したいというのであればロックを使わずにチャネルを使うこともできます。Vec のように要素を入れ替えたり削除したりは難しいですが今回のように単純に push して1つ1つ取り出す目的であれば Arc<Mutex<Vec<T>>> よりも適しています。

rust

1use std::sync::mpsc::channel; 2 3#[tokio::main] 4async fn main() { 5 let (sender, receiver) = channel(); 6 7 let process_a = tokio::spawn(async { 8 //処理 9 }); 10 sender.send(process_a).unwrap(); 11 //色々処理 12 13 let process_b = tokio::spawn(async { 14 //処理 15 }); 16 sender.send(process_b).unwrap(); 17 //色々処理 18 19 // senderをdropすることでchannelを終わらせる 20 // そうしないと `for` 式が終わらない 21 drop(sender); 22 23 for process in receiver { 24 process.await.unwrap(); 25 } 26}

以上、join_all を使う方法と channel を使う方法を紹介しました。 channeljoin_all を使うこともできます。

3 自分がここで詰まらないために勉強しておくべきだったもの

最初の所有権の問題に関しては正直なところこれを勉強していれば防げたというものは私は思い付きません。「データを保持するデータ型はデータ型そのもののムーブと中身のムーブの区別をつける」という一般論を覚えてあとは慣れるくらいでしょうか。

for 式と join_all の違いについては非同期の一般論とRust特有の挙動が絡みます。async book である程度カバーされているのでそちらを一読するとよいかもしれません。

チャネルについては公式ドキュメントで解説されています:メッセージ受け渡しを使ってスレッド間でデータを転送する - The Rust Programming Language 日本語版

Rustはドキュメントの数も量も多くて全て目を通してから書き始めるのは難しいかもしれませんが困ったときは目次だけでも見返してみるといいかもしれません。困ることがあってもこういう質問サイトで質問できるので行き詰まったときは気負わずいつでも質問して下さい。

投稿2021/04/19 15:15

blackenedgold

総合スコア468

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

bitt

2021/04/19 18:10

回答ありがとうございます! forでawaitやってるのは代用だったのですが、drainで中身を取り出しつつ、join_allでやることでやりたいことが実現できるんですね、ありがとうございます。 またこの目的ならchannelの方がきれいに書けるんですね、こちらで書き換えようと思います。
guest

0

1.そもそもやり方は間違えていないか(この方法が根本的に間違っていないか)

スレッド関連のコードが例示されていないので判断が難しいのですが、根本的に間違っていることはなさそうです。

2.あっているのであれば、どのようにしたら動くか

ご質問のコードには2点問題があります。まずエラーになっているところから解決しましょう。

console

1error[E0507]: cannot move out of dereference of `std::sync::MutexGuard<'_, Vec<tokio::task::JoinHandle<()>>>` 2 --> src/main.rs:69:20 3 | 469 | for process in process_list_locked.into_iter() { 5 | ^^^^^^^^^^^^^^^^^^^ move occurs because value has type `Vec<tokio::task::JoinHandle<()>>`, which does not implement the `Copy` trait

Vecinto_iter()Vec<_>の各要素の所有権を取ろうとしています。しかし、Mutexに入れたものは普通にアクセスしたのでは所有権が取れません。そのためにエラーになっています。

一方、Mutexに入れたものについて&&mutなら簡単に得られます。.await&mutを要求するので、それを渡してあげましょう。Veciter_mut()を使います。

rust

1 // &mut Vec<_>を得るためにはまずMutexGuardにmutが必要 2 let mut process_list_locked = process_list_cloned.lock().unwrap(); 3 // Iteratorのiter_mut()で、&mut Vec<_>から、その要素に対する&mutを得る 4 for process in process_list_locked.iter_mut() { 5 process.await.unwrap(); 6 }

これでエラーが解消します。

なお、futuresクレートjoin_allという関数 があって普段は便利なのですが今回のコードでは使えません。なぜならjoin_allは所有権を取るタイプのイテレーターが作れる型(IntoIteratorトレイトを実装した型)を引数に要求するからです。

もう一つの問題は実行時のデッドロックです。ご質問のコードはMutexのロックを計3回取得することになっていますが、前のロックが解除される前に次のロックを取得しようとしています。その場合にどういう動作をするのかはMutexの設計しだいなのですが、標準ライブラリーのMutexはデッドロックする仕様となっています。

Mutexのロックはそれに束縛されている変数がスコープを外れたときに解除されます。ですから以下のように{ }を使って変数のスコープを狭めてあげれば、適切なタイミングでロックが解除されるようになり、デッドロックが起こらなくなります。

rust

1#[tokio::main] 2async fn main() { 3 let process_list = Arc::new(Mutex::new(vec![])); 4 let process_list_cloned = process_list.clone(); 5 6 { 7 let process_a = tokio::spawn(async { 8 //処理 9 }); 10 let mut process_list_locked = process_list_cloned.lock().unwrap(); 11 process_list_locked.push(process_a); 12 } // ここでロックが解除される 13 14 //色々処理 15 16 { 17 let process_b = tokio::spawn(async { 18 //処理 19 }); 20 let mut process_list_locked = process_list_cloned.lock().unwrap(); 21 process_list_locked.push(process_b); 22 } // ここでロックが解除される 23 24 //色々処理 25 26 let mut process_list_locked = process_list_cloned.lock().unwrap(); 27 for process in process_list_locked.iter_mut() { 28 process.await.unwrap(); 29 }

3.自分がここで詰まらないために勉強しておくべきだったもの

答えるのが難しい質問ですね。具体的にどういう理由で詰まったのかによって違う答えになりそうです。

例えば以下のような感じでしょうか。

エラーメッセージの意味がわからない

  • → コンパイルエラーのドキュメントを読んでみる。Rust Compiler Error Index E0507

move outなど所有権まわりがわからない

  • → 公式ドキュメント(The Rust Programming Language)の所有権の章(和訳)あたりか、Rustの入門書の該当の章をじっくり読んでみる

Veciter_mutがあることを知らなかった

  • Vecリファレンス に何か使えそうなメソッドがないか眺める
  • iterのモジュールレベルのドキュメントを読んでみる。( 参照をイテレートする方法 なども書かれており、そこでiter_mutが出てくる)

投稿2021/04/19 14:17

編集2021/04/19 14:35
tatsuya6502

総合スコア2035

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

bitt

2021/04/19 17:47

回答ありがとうございます! なるほど、参照でも書き換え可能な&mutであればそのままawaitできるんですね、勉強になります。 mutexとかよく知らずに使っていたのですが、ロック状態は他が.lock()を呼び出すと自動で外れるのかと思ってたのですがそうではないのですね、勘違いしてました。 思ったより原因が多かったです…ドキュメントとか調べて読むのはよくても大量に読むのは苦手な人間なので、書いてみて止まったら調べるのを繰り返してるのですが、これからはちゃんと読むようにします。
guest

あなたの回答

tips

太字

斜体

打ち消し線

見出し

引用テキストの挿入

コードの挿入

リンクの挿入

リストの挿入

番号リストの挿入

表の挿入

水平線の挿入

プレビュー

15分調べてもわからないことは
teratailで質問しよう!

ただいまの回答率
85.48%

質問をまとめることで
思考を整理して素早く解決

テンプレート機能で
簡単に質問をまとめる

質問する

関連した質問