1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
|
use std::borrow::Cow;
use std::marker::PhantomData;
use std::panic::catch_unwind;
use std::sync::mpsc::{channel as mpsc_channel, Sender as MpscSender};
use std::sync::{Arc, OnceLock};
use std::thread::{Builder as ThreadBuilder, JoinHandle as ThreadJoinHandle};
pub struct Work<UserData: Send + Sync + 'static>
{
pub func: fn(UserData),
pub user_data: UserData,
}
#[derive(Debug)]
pub struct WorkQueue<UserData: Send + Sync + 'static>
{
work_sender: MpscSender<Work<UserData>>,
thread_panic: Arc<OnceLock<Box<str>>>,
_thread: ThreadJoinHandle<()>,
_pd: PhantomData<UserData>,
}
impl<UserData: Send + Sync + 'static> WorkQueue<UserData>
{
pub fn new(name: &str) -> Self
{
let (work_sender, work_receiver) = mpsc_channel::<Work<UserData>>();
let thread_panic = Arc::new(OnceLock::new());
let thread_panic_b = thread_panic.clone();
Self {
work_sender,
thread_panic: thread_panic,
_thread: ThreadBuilder::new()
.name(name.to_string())
.spawn(move || {
if let Err(panic_err) = catch_unwind(|| {
while let Ok(work) = work_receiver.recv() {
(work.func)(work.user_data);
}
}) {
let panic_message: Cow<'static, str> =
if let Some(panic_message) =
panic_err.downcast_ref::<&'static str>()
{
(*panic_message).into()
} else if let Some(panic_message) =
panic_err.downcast_ref::<String>()
{
panic_message.clone().into()
} else {
"(unknown panic payload type)".into()
};
let _ = thread_panic_b
.set(panic_message.into_owned().into_boxed_str());
}
})
.expect("Failed to create work queue thread"),
_pd: PhantomData,
}
}
pub fn get_thread_panic(&self) -> Option<&str>
{
self.thread_panic.get().map(|thread_panic| &**thread_panic)
}
pub fn add_work(&self, work: Work<UserData>)
{
if self.work_sender.send(work).is_err() {
tracing::error!("Cannot add work to work queue. Work queue thread is dead");
}
}
}
|