回答編集履歴

2

コードの main() 関数内を修正して let now = enqueue(&sender, task).await の形にしました

2020/12/17 05:48

投稿

tatsuya6502
tatsuya6502

スコア2035

test CHANGED
@@ -222,13 +222,15 @@
222
222
 
223
223
  let task = Task::new(|| Box::new(SystemTime::now()));
224
224
 
225
+ // tokioの非同期ランタイムでasyncブロックを実行する
226
+
227
+ rt.block_on(async {
228
+
225
- let future = enqueue(&sender, task);
229
+ let now = enqueue(&sender, task).await;
226
-
227
- // 非同期ランタイムでfutureを実行する。実行が終わるまでブロックされる
230
+
228
-
229
- let result = rt.block_on(future);
230
-
231
- println!("result has been received: {:?}", result);
231
+ println!("result has been received: {:?}", now);
232
+
233
+ });
232
234
 
233
235
  }
234
236
 

1

実装例のコードを追加しました

2020/12/17 05:48

投稿

tatsuya6502
tatsuya6502

スコア2035

test CHANGED
@@ -45,3 +45,223 @@
45
45
 
46
46
 
47
47
  そして上の例と同じように`poll()`メソッドを実装すれば完成です。
48
+
49
+
50
+
51
+ **追記**
52
+
53
+
54
+
55
+ ご提示のコードに上の内容を実装してみました。
56
+
57
+
58
+
59
+ > `Future`を実装した型を定義して、`new()`で処理をジョブキューに投入します。
60
+
61
+
62
+
63
+ このコードでは`TaskFuture`という型を定義しましたが、それの`new()`で処理(`Task`)をジョブキューに投入するのではなく、`enqueue()`関数内で行うようにしました。
64
+
65
+
66
+
67
+ ```rust
68
+
69
+ // Cargo.tomlに以下を追加する
70
+
71
+ // [dependencies]
72
+
73
+ // tokio = { version = "0.3", features = ["rt", "rt-multi-thread"] }
74
+
75
+
76
+
77
+ use std::{
78
+
79
+ fmt::Debug,
80
+
81
+ future::Future,
82
+
83
+ pin::Pin,
84
+
85
+ sync::mpsc::{channel, Receiver, Sender},
86
+
87
+ sync::{Arc, Mutex},
88
+
89
+ task::{Context, Poll, Waker},
90
+
91
+ thread::{sleep, spawn},
92
+
93
+ time::{Duration, SystemTime},
94
+
95
+ };
96
+
97
+ use tokio::runtime::Runtime;
98
+
99
+
100
+
101
+ struct Task {
102
+
103
+ exec: Box<dyn Send + FnMut() -> Box<dyn Debug + Send>>,
104
+
105
+ shared_state: Arc<Mutex<SharedState>>,
106
+
107
+ }
108
+
109
+
110
+
111
+ impl Task {
112
+
113
+ fn new<E>(exec: E) -> Self
114
+
115
+ where
116
+
117
+ E: Send + 'static + FnMut() -> Box<dyn Debug + Send>,
118
+
119
+ {
120
+
121
+ Self {
122
+
123
+ exec: Box::new(exec),
124
+
125
+ shared_state: Arc::new(Mutex::new(SharedState {
126
+
127
+ result: None,
128
+
129
+ waker: None,
130
+
131
+ })),
132
+
133
+ }
134
+
135
+ }
136
+
137
+
138
+
139
+ fn execute(&mut self) {
140
+
141
+ let result = (self.exec)();
142
+
143
+ println!("task has been executed with: {:?}", result);
144
+
145
+ let mut shared_state = self.shared_state.lock().unwrap();
146
+
147
+ shared_state.result = Some(result);
148
+
149
+ if let Some(waker) = shared_state.waker.take() {
150
+
151
+ waker.wake()
152
+
153
+ }
154
+
155
+ }
156
+
157
+ }
158
+
159
+
160
+
161
+ struct TaskFuture {
162
+
163
+ shared_state: Arc<Mutex<SharedState>>,
164
+
165
+ }
166
+
167
+
168
+
169
+ impl Future for TaskFuture {
170
+
171
+ type Output = Box<dyn Debug>;
172
+
173
+
174
+
175
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
176
+
177
+ let mut shared_state = self.shared_state.lock().unwrap();
178
+
179
+ if let Some(result) = shared_state.result.take() {
180
+
181
+ Poll::Ready(result)
182
+
183
+ } else {
184
+
185
+ shared_state.waker = Some(cx.waker().clone());
186
+
187
+ Poll::Pending
188
+
189
+ }
190
+
191
+ }
192
+
193
+ }
194
+
195
+
196
+
197
+ struct SharedState {
198
+
199
+ result: Option<Box<dyn Debug + Send>>,
200
+
201
+ waker: Option<Waker>,
202
+
203
+ }
204
+
205
+
206
+
207
+ fn main() {
208
+
209
+ let (sender, receiver) = channel::<Task>();
210
+
211
+ spawn(move || event_loop(receiver));
212
+
213
+
214
+
215
+ let rt = Runtime::new().unwrap();
216
+
217
+
218
+
219
+ loop {
220
+
221
+ sleep(Duration::from_secs(1));
222
+
223
+ let task = Task::new(|| Box::new(SystemTime::now()));
224
+
225
+ let future = enqueue(&sender, task);
226
+
227
+ // 非同期ランタイムでfutureを実行する。実行が終わるまでブロックされる
228
+
229
+ let result = rt.block_on(future);
230
+
231
+ println!("result has been received: {:?}", result);
232
+
233
+ }
234
+
235
+ }
236
+
237
+
238
+
239
+ fn enqueue(sender: &Sender<Task>, task: Task) -> TaskFuture {
240
+
241
+ let future = TaskFuture {
242
+
243
+ shared_state: Arc::clone(&task.shared_state),
244
+
245
+ };
246
+
247
+ sender.send(task).unwrap();
248
+
249
+ future
250
+
251
+ }
252
+
253
+
254
+
255
+ fn event_loop(receiver: Receiver<Task>) {
256
+
257
+ loop {
258
+
259
+ let mut task = receiver.recv().unwrap();
260
+
261
+ task.execute();
262
+
263
+ }
264
+
265
+ }
266
+
267
+ ```