teratail header banner
teratail header banner
質問するログイン新規登録

回答編集履歴

2

コードの main() 関数内を修正して let now = enqueue(&sender, task).await の形にしました

2020/12/17 05:48

投稿

tatsuya6502
tatsuya6502

スコア2055

answer CHANGED
@@ -110,10 +110,11 @@
110
110
  loop {
111
111
  sleep(Duration::from_secs(1));
112
112
  let task = Task::new(|| Box::new(SystemTime::now()));
113
+ // tokioの非同期ランタイムでasyncブロックを実行する
114
+ rt.block_on(async {
113
- let future = enqueue(&sender, task);
115
+ let now = enqueue(&sender, task).await;
114
- // 非同期ランタイムでfutureを実行する。実行が終わるまでブロックされる
115
- let result = rt.block_on(future);
116
- println!("result has been received: {:?}", result);
116
+ println!("result has been received: {:?}", now);
117
+ });
117
118
  }
118
119
  }
119
120
 

1

実装例のコードを追加しました

2020/12/17 05:48

投稿

tatsuya6502
tatsuya6502

スコア2055

answer CHANGED
@@ -21,4 +21,114 @@
21
21
 
22
22
  その型と処理オブジェクトの間では`Arc<Mutex<_>>`のようなものでデータを共有するようにして、処理が終わるときに`Waker::wake()`を呼ぶようにします。
23
23
 
24
- そして上の例と同じように`poll()`メソッドを実装すれば完成です。
24
+ そして上の例と同じように`poll()`メソッドを実装すれば完成です。
25
+
26
+ **追記**
27
+
28
+ ご提示のコードに上の内容を実装してみました。
29
+
30
+ > `Future`を実装した型を定義して、`new()`で処理をジョブキューに投入します。
31
+
32
+ このコードでは`TaskFuture`という型を定義しましたが、それの`new()`で処理(`Task`)をジョブキューに投入するのではなく、`enqueue()`関数内で行うようにしました。
33
+
34
+ ```rust
35
+ // Cargo.tomlに以下を追加する
36
+ // [dependencies]
37
+ // tokio = { version = "0.3", features = ["rt", "rt-multi-thread"] }
38
+
39
+ use std::{
40
+ fmt::Debug,
41
+ future::Future,
42
+ pin::Pin,
43
+ sync::mpsc::{channel, Receiver, Sender},
44
+ sync::{Arc, Mutex},
45
+ task::{Context, Poll, Waker},
46
+ thread::{sleep, spawn},
47
+ time::{Duration, SystemTime},
48
+ };
49
+ use tokio::runtime::Runtime;
50
+
51
+ struct Task {
52
+ exec: Box<dyn Send + FnMut() -> Box<dyn Debug + Send>>,
53
+ shared_state: Arc<Mutex<SharedState>>,
54
+ }
55
+
56
+ impl Task {
57
+ fn new<E>(exec: E) -> Self
58
+ where
59
+ E: Send + 'static + FnMut() -> Box<dyn Debug + Send>,
60
+ {
61
+ Self {
62
+ exec: Box::new(exec),
63
+ shared_state: Arc::new(Mutex::new(SharedState {
64
+ result: None,
65
+ waker: None,
66
+ })),
67
+ }
68
+ }
69
+
70
+ fn execute(&mut self) {
71
+ let result = (self.exec)();
72
+ println!("task has been executed with: {:?}", result);
73
+ let mut shared_state = self.shared_state.lock().unwrap();
74
+ shared_state.result = Some(result);
75
+ if let Some(waker) = shared_state.waker.take() {
76
+ waker.wake()
77
+ }
78
+ }
79
+ }
80
+
81
+ struct TaskFuture {
82
+ shared_state: Arc<Mutex<SharedState>>,
83
+ }
84
+
85
+ impl Future for TaskFuture {
86
+ type Output = Box<dyn Debug>;
87
+
88
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
89
+ let mut shared_state = self.shared_state.lock().unwrap();
90
+ if let Some(result) = shared_state.result.take() {
91
+ Poll::Ready(result)
92
+ } else {
93
+ shared_state.waker = Some(cx.waker().clone());
94
+ Poll::Pending
95
+ }
96
+ }
97
+ }
98
+
99
+ struct SharedState {
100
+ result: Option<Box<dyn Debug + Send>>,
101
+ waker: Option<Waker>,
102
+ }
103
+
104
+ fn main() {
105
+ let (sender, receiver) = channel::<Task>();
106
+ spawn(move || event_loop(receiver));
107
+
108
+ let rt = Runtime::new().unwrap();
109
+
110
+ loop {
111
+ sleep(Duration::from_secs(1));
112
+ let task = Task::new(|| Box::new(SystemTime::now()));
113
+ let future = enqueue(&sender, task);
114
+ // 非同期ランタイムでfutureを実行する。実行が終わるまでブロックされる
115
+ let result = rt.block_on(future);
116
+ println!("result has been received: {:?}", result);
117
+ }
118
+ }
119
+
120
+ fn enqueue(sender: &Sender<Task>, task: Task) -> TaskFuture {
121
+ let future = TaskFuture {
122
+ shared_state: Arc::clone(&task.shared_state),
123
+ };
124
+ sender.send(task).unwrap();
125
+ future
126
+ }
127
+
128
+ fn event_loop(receiver: Receiver<Task>) {
129
+ loop {
130
+ let mut task = receiver.recv().unwrap();
131
+ task.execute();
132
+ }
133
+ }
134
+ ```