質問をすることでしか得られない、回答やアドバイスがある。

15分調べてもわからないことは、質問しよう!

新規登録して質問してみよう
ただいま回答率
85.47%
並列処理

複数の計算が同時に実行される手法

非同期処理

非同期処理とは一部のコードを別々のスレッドで実行させる手法です。アプリケーションのパフォーマンスを向上させる目的でこの手法を用います。

Rust

Rustは、MoFoが支援するプログラミング言語。高速性を維持しつつも、メモリ管理を安全に行うことが可能な言語です。同じコンパイル言語であるC言語やC++では困難だったマルチスレッドを実装しやすく、並行性という点においても優れています。

Q&A

解決済

1回答

1537閲覧

Rust: 別のスレッドでの処理結果を Future にするには?

退会済みユーザー

退会済みユーザー

総合スコア0

並列処理

複数の計算が同時に実行される手法

非同期処理

非同期処理とは一部のコードを別々のスレッドで実行させる手法です。アプリケーションのパフォーマンスを向上させる目的でこの手法を用います。

Rust

Rustは、MoFoが支援するプログラミング言語。高速性を維持しつつも、メモリ管理を安全に行うことが可能な言語です。同じコンパイル言語であるC言語やC++では困難だったマルチスレッドを実装しやすく、並行性という点においても優れています。

0グッド

1クリップ

投稿2020/12/16 17:24

前提・実現したいこと

非同期処理の結果を Future で返す方法に付いて教えて下さい。

  1. 2 つのスレッド Main と Worker が存在し、ジョブキューを共有しています。つまり channel の Sender, Receiver で通信しています。
  2. Worker はジョブキューを監視して処理が来たらキューから取り出し、Worker スレッド内でそれを実行します。
  3. Main スレッドから呼ばれるある関数は、処理をジョブキューに投入して非同期で Worker で実行させ、関数自体は即座に Future を返します。

単に非同期処理を行うだけであれば async を使うことができますが、この質問の背景では別の非同期処理と組み合わせているためこのような構成になっています (具体的には epoll()select() を使うケースのように Worker スレッドがイベントループしている想定です)。

発生している問題・エラーメッセージ

後述のソースコードの enqueue() 関数の返値を Future<R> にして let result = exec();result を非同期で取得するにはどうしたら良いでしょうか? 最終的には以下のように記述できるようにしたいと思っています。

rust

1let now = euqueue(&sender, move || SystemTime::now()).await;

該当のソースコード

rust

1use std::sync::mpsc::{channel, Receiver, Sender}; 2use std::thread::{sleep, spawn}; 3use std::time::{Duration, SystemTime}; 4use std::fmt::Debug; 5 6type TASK = Box<dyn FnMut() + Send>; 7 8fn main() { 9 let (sender, receiver) = channel::<TASK>(); 10 spawn(move || event_loop(receiver)); 11 loop { 12 sleep(Duration::from_secs(1)); 13 euqueue(&sender, move || SystemTime::now()); 14 } 15} 16 17fn euqueue<R, F>(sender: &Sender<TASK>, mut exec: F) where F: FnMut() -> R + Send, F: 'static, R:Debug { 18 sender.send(Box::new(move || { 19 let result = exec(); // How to return this result as Future for function enqueue()? 20 println!("task has been executed with: {:?}", result); 21 })).unwrap(); 22} 23 24fn event_loop(receiver: Receiver<TASK>) { 25 loop { 26 let mut exec = receiver.recv().unwrap(); 27 exec(); 28 } 29}

補足情報(FW/ツールのバージョンなど)

> rustc --version rustc 1.48.0 (7eac88abb 2020-11-16)

気になる質問をクリップする

クリップした質問は、後からいつでもMYページで確認できます。

またクリップした質問に回答があった際、通知やメールを受け取ることができます。

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

guest

回答1

0

ベストアンサー

Futureトレイトを実装した型を定義して、それを返すことで実現できそうです。

Rust公式ドキュメントのひとつである「Asynchronous Programming in Rust」に載っている例が参考になると思います。

その例ではTimerFutureという型を定義してFutureトレイトを実装します。TimerFutureは指定した時間が経過すると終了するFutureで、内部では別スレッドを起動してthread::sleep()で時間を計ります。

以下のように動作します。

  1. TimerFuture::new(Duration)を呼ぶとTimerFutureのインスタンスが作られ、スレッドが起動される。TimerFutureインスタンスと、起動したスレッド上で実行されるクロージャとの間では、Arc<Mutex<SharedState>>というデータを共有することで連携できるようになっている
  2. TimerFutureのインスタンスを非同期ランタイムで実行すると、非同期ランタイムはTimerFutureインスタンスのFuture::poll()メソッドを呼ぶ
  3. TimerFutureインスタンスは、まだ指定時間が経過してないなら、poll()で渡されたWakerというオブジェクトをSharedStateに保存してからPoll::Pendingを返す
  4. TimerFutureのインスタンスが起動したスレッドでは、指定時間が経過したら(thread::sleep()が終わったら)SharedStateにセットされたWakerwake()メソッドを呼ぶ
  5. 非同期ランタイムはwake()が呼ばれたことでTimerFuturepoll()を再度呼ぶようスケジューリングする
  6. 非同期ランタイムはTimerFutureインスタンスのpoll()メソッドを呼ぶ
  7. TimerFutureインスタンスは指定時間が経過したので、Poll::Ready(計算結果の値)を返す。(この例では計算結果自体は意味を持たないので()値を返している)
  8. 非同期ランタイムはTimerFutureインスタンスの実行を終了する

この例と同じようにFutureを実装した型を定義して、new()で処理をジョブキューに投入します。

その型と処理オブジェクトの間ではArc<Mutex<_>>のようなものでデータを共有するようにして、処理が終わるときにWaker::wake()を呼ぶようにします。

そして上の例と同じようにpoll()メソッドを実装すれば完成です。

追記

ご提示のコードに上の内容を実装してみました。

Futureを実装した型を定義して、new()で処理をジョブキューに投入します。

このコードではTaskFutureという型を定義しましたが、それのnew()で処理(Task)をジョブキューに投入するのではなく、enqueue()関数内で行うようにしました。

rust

1// Cargo.tomlに以下を追加する 2// [dependencies] 3// tokio = { version = "0.3", features = ["rt", "rt-multi-thread"] } 4 5use std::{ 6 fmt::Debug, 7 future::Future, 8 pin::Pin, 9 sync::mpsc::{channel, Receiver, Sender}, 10 sync::{Arc, Mutex}, 11 task::{Context, Poll, Waker}, 12 thread::{sleep, spawn}, 13 time::{Duration, SystemTime}, 14}; 15use tokio::runtime::Runtime; 16 17struct Task { 18 exec: Box<dyn Send + FnMut() -> Box<dyn Debug + Send>>, 19 shared_state: Arc<Mutex<SharedState>>, 20} 21 22impl Task { 23 fn new<E>(exec: E) -> Self 24 where 25 E: Send + 'static + FnMut() -> Box<dyn Debug + Send>, 26 { 27 Self { 28 exec: Box::new(exec), 29 shared_state: Arc::new(Mutex::new(SharedState { 30 result: None, 31 waker: None, 32 })), 33 } 34 } 35 36 fn execute(&mut self) { 37 let result = (self.exec)(); 38 println!("task has been executed with: {:?}", result); 39 let mut shared_state = self.shared_state.lock().unwrap(); 40 shared_state.result = Some(result); 41 if let Some(waker) = shared_state.waker.take() { 42 waker.wake() 43 } 44 } 45} 46 47struct TaskFuture { 48 shared_state: Arc<Mutex<SharedState>>, 49} 50 51impl Future for TaskFuture { 52 type Output = Box<dyn Debug>; 53 54 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 55 let mut shared_state = self.shared_state.lock().unwrap(); 56 if let Some(result) = shared_state.result.take() { 57 Poll::Ready(result) 58 } else { 59 shared_state.waker = Some(cx.waker().clone()); 60 Poll::Pending 61 } 62 } 63} 64 65struct SharedState { 66 result: Option<Box<dyn Debug + Send>>, 67 waker: Option<Waker>, 68} 69 70fn main() { 71 let (sender, receiver) = channel::<Task>(); 72 spawn(move || event_loop(receiver)); 73 74 let rt = Runtime::new().unwrap(); 75 76 loop { 77 sleep(Duration::from_secs(1)); 78 let task = Task::new(|| Box::new(SystemTime::now())); 79 // tokioの非同期ランタイムでasyncブロックを実行する 80 rt.block_on(async { 81 let now = enqueue(&sender, task).await; 82 println!("result has been received: {:?}", now); 83 }); 84 } 85} 86 87fn enqueue(sender: &Sender<Task>, task: Task) -> TaskFuture { 88 let future = TaskFuture { 89 shared_state: Arc::clone(&task.shared_state), 90 }; 91 sender.send(task).unwrap(); 92 future 93} 94 95fn event_loop(receiver: Receiver<Task>) { 96 loop { 97 let mut task = receiver.recv().unwrap(); 98 task.execute(); 99 } 100}

投稿2020/12/17 04:11

編集2020/12/17 05:48
tatsuya6502

総合スコア2035

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

退会済みユーザー

退会済みユーザー

2020/12/20 13:12 編集

イベントループ時のパターンとして理解しやすいソースとともに詳しい解説をありがとうございます。 この回答を見ていくつか調べましたが、Rust の Future は Future そのものから実装しないといけないんですね。Scala の Future や Java の CompletableFuture、その他の言語の同等機能に慣れていたので、スレッド間で Future/Promise を共有して Worker が set すれば Main 側で get できるようになる使い方をイメージしていました。
guest

あなたの回答

tips

太字

斜体

打ち消し線

見出し

引用テキストの挿入

コードの挿入

リンクの挿入

リストの挿入

番号リストの挿入

表の挿入

水平線の挿入

プレビュー

15分調べてもわからないことは
teratailで質問しよう!

ただいまの回答率
85.47%

質問をまとめることで
思考を整理して素早く解決

テンプレート機能で
簡単に質問をまとめる

質問する

関連した質問