こんにちは。
まずマルチスレッド版のエラーについてです。これはロックの中身である Vec
をムーブ(元々あった Vec
を無効化)しようとしています。これだと共有している他のスレッドからもデータが触れなくなってしまうのでエラーになってしまっています。 Vec
はそのままに、 Vec
の中身だけ取り出すような書き方をすればコンパイルが通ります。
rust
1 use std :: sync :: { Arc , Mutex } ;
2
3 #[tokio::main]
4 async fn main ( ) {
5 let process_list = Arc :: new ( Mutex :: new ( vec! [ ] ) ) ;
6 let process_list_cloned = process_list . clone ( ) ;
7
8 // ロックはスコープの末尾まで有効なのでデッドロックを防ぐためにスコープを小さくする
9 {
10 let process_a = tokio :: spawn ( async {
11 //処理
12 } ) ;
13 let mut process_list_locked = process_list . lock ( ) . unwrap ( ) ;
14 process_list_locked . push ( process_a ) ;
15 //色々処理
16 }
17 {
18 let process_b = tokio :: spawn ( async {
19 //処理
20 } ) ;
21 let mut process_list_locked = process_list . lock ( ) . unwrap ( ) ;
22 process_list_locked . push ( process_b ) ;
23 //色々処理
24 }
25 let mut process_list_locked = process_list_cloned . lock ( ) . unwrap ( ) ;
26 // drainでVectの中身のデータのみ取り出す。 `..` はFull Rangeのリテラル
27 for process in process_list_locked . drain ( .. ) {
28 process . await . unwrap ( ) ;
29 }
30 }
31
この上で1, 2, 3に答えます。
1 やり方は間違えていないか
やりたいこと次第ですが、JavaScriptでいうPromise.allをやりたいなら間違えている可能性が高いです。 RustでPromise.allに対応するものはfuturesクレート のjoin_all です。for
式内で await
をするとfutureを1つ1つ順番に待ちますが、 join_all
を使うと同時に待ちます。
ちょっとコードを書いて実験してみましょう。タスクのあとに sleep
をつけて tokio::spawn(async {}).then(sleep)
とします。それを for
式で待つのと join_all
で待つのを比べてみます。
rust
1 use futures :: prelude :: * ;
2 use std :: time :: { Duration , Instant } ;
3 use tokio :: task :: JoinError ;
4
5 async fn sleep ( _ : Result < ( ) , JoinError > ) {
6 tokio :: time :: sleep ( Duration :: from_secs ( 1 ) ) . await
7 }
8
9 #[tokio::main]
10 async fn main ( ) {
11 println! ( "waiting with `for`" ) ;
12 let start = Instant :: now ( ) ;
13 {
14 let mut process_list = Vec :: new ( ) ;
15
16 let process_a = tokio :: spawn ( async { } ) . then ( sleep ) ;
17 process_list . push ( process_a ) ;
18 //色々処理
19 let process_b = tokio :: spawn ( async { } ) . then ( sleep ) ;
20 process_list . push ( process_b ) ;
21 //色々処理
22 for process in process_list {
23 process . await ;
24 }
25 }
26 let end = Instant :: now ( ) ;
27 println! ( "time elapsed: {}ms" , ( end - start ) . as_millis ( ) ) ;
28
29 println! ( "waiting with `join_all`" ) ;
30 let start = Instant :: now ( ) ;
31 {
32 let mut process_list = Vec :: new ( ) ;
33
34 let process_a = tokio :: spawn ( async { } ) . then ( sleep ) ;
35 process_list . push ( process_a ) ;
36 //色々処理
37 let process_b = tokio :: spawn ( async { } ) . then ( sleep ) ;
38 process_list . push ( process_b ) ;
39 //色々処理
40 futures :: future :: join_all ( process_list ) . await ;
41 }
42 let end = Instant :: now ( ) ;
43 println! ( "time elapsed: {}ms" , ( end - start ) . as_millis ( ) ) ;
44 }
これを走らせると、 for
式で待つ方は1つ目の sleep
が終わったあとに次の sleep
が走ってそれを待つので2秒かかります。一方 join_all
の方は2つの sleep
が同時に走って同時に待つので1秒で終わります。
shell
1 $ cargo run
2 Finished dev [ unoptimized + debuginfo ] target ( s ) in 0 .01s
3 Running ` target/debug/teratail-tokio `
4 waiting with ` for `
5 time elapsed: 2002ms
6 waiting with ` join_all `
7 time elapsed: 1001ms
なので十中八九 for
式で待つよりも join_all
で待つ方がやりたいことでしょう。ただし、今回のコードに限っていえば tokio::spawn
でタスクを裏で実行するようにしているので for
式で await
してもほぼ同じことになります。もしそれを分かって書いているのであれば意図通りですが、恐らくは意図せずたまたま動いた類いのコードじゃないかなと思います。
2 あっているのであれば、どのようにしたら動くか
上記のとおりPromise.allのようなことをしたいのであればお題の並行版のコードは以下のように書くのが正しいです。
rust
1 // 略
2
3 // drainでVectの中身のデータのみ取り出す。 `..` はFull Rangeのリテラル
4 futures :: future :: join_all ( process_list_locked . drain ( .. ) ) . await ;
もし、futureを1つ1つ await
したいというのであればロックを使わずにチャネル を使うこともできます。Vec
のように要素を入れ替えたり削除したりは難しいですが今回のように単純に push
して1つ1つ取り出す目的であれば Arc<Mutex<Vec<T>>>
よりも適しています。
rust
1 use std :: sync :: mpsc :: channel ;
2
3 #[tokio::main]
4 async fn main ( ) {
5 let ( sender , receiver ) = channel ( ) ;
6
7 let process_a = tokio :: spawn ( async {
8 //処理
9 } ) ;
10 sender . send ( process_a ) . unwrap ( ) ;
11 //色々処理
12
13 let process_b = tokio :: spawn ( async {
14 //処理
15 } ) ;
16 sender . send ( process_b ) . unwrap ( ) ;
17 //色々処理
18
19 // senderをdropすることでchannelを終わらせる
20 // そうしないと `for` 式が終わらない
21 drop ( sender ) ;
22
23 for process in receiver {
24 process . await . unwrap ( ) ;
25 }
26 }
以上、join_all
を使う方法と channel
を使う方法を紹介しました。 channel
に join_all
を使うこともできます。
3 自分がここで詰まらないために勉強しておくべきだったもの
最初の所有権の問題に関しては正直なところこれを勉強していれば防げたというものは私は思い付きません。「データを保持するデータ型はデータ型そのもののムーブと中身のムーブの区別をつける」という一般論を覚えてあとは慣れるくらいでしょうか。
for
式と join_all
の違いについては非同期の一般論とRust特有の挙動が絡みます。async book である程度カバーされているのでそちらを一読するとよいかもしれません。
チャネルについては公式ドキュメントで解説されています:メッセージ受け渡しを使ってスレッド間でデータを転送する - The Rust Programming Language 日本語版 。
Rustはドキュメントの数も量も多くて全て目を通してから書き始めるのは難しいかもしれませんが困ったときは目次だけでも見返してみるといいかもしれません。困ることがあってもこういう質問サイトで質問できるので行き詰まったときは気負わずいつでも質問して下さい。
バッドをするには、ログインかつ
こちらの条件を満たす必要があります。
2021/04/19 18:10