summaryrefslogtreecommitdiff
path: root/engine/src/work_queue.rs
blob: 494d2b51677df14527bbc74b55bc99d7b578c6a6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
use std::borrow::Cow;
use std::marker::PhantomData;
use std::panic::catch_unwind;
use std::sync::mpsc::{channel as mpsc_channel, Sender as MpscSender};
use std::sync::{Arc, OnceLock};
use std::thread::{Builder as ThreadBuilder, JoinHandle as ThreadJoinHandle};

/// A single unit of work: a plain function pointer together with the value
/// that is handed to it when the work is executed.
pub struct Work<UserData>
where
    UserData: Send + Sync + 'static,
{
    /// Function invoked with [`Work::user_data`] as its only argument.
    pub func: fn(UserData),

    /// Value that is moved into [`Work::func`] when the work runs.
    pub user_data: UserData,
}

/// Handle to a queue of [`Work`] items that are executed on a dedicated
/// thread.
#[derive(Debug)]
pub struct WorkQueue<UserData>
where
    UserData: Send + Sync + 'static,
{
    /// Sending half of the channel the work queue thread receives work from.
    work_sender: MpscSender<Work<UserData>>,

    /// Write-once slot for the message of a panic caught on the work queue
    /// thread. Shared with that thread, hence the `Arc`.
    thread_panic: Arc<OnceLock<Box<str>>>,

    /// Join handle of the work queue thread. Only stored, never read.
    _thread: ThreadJoinHandle<()>,

    /// Zero-sized marker for the `UserData` type parameter.
    _pd: PhantomData<UserData>,
}

impl<UserData: Send + Sync + 'static> WorkQueue<UserData>
{
    pub fn new(name: &str) -> Self
    {
        let (work_sender, work_receiver) = mpsc_channel::<Work<UserData>>();

        let thread_panic = Arc::new(OnceLock::new());

        let thread_panic_b = thread_panic.clone();

        Self {
            work_sender,
            thread_panic: thread_panic,
            _thread: ThreadBuilder::new()
                .name(name.to_string())
                .spawn(move || {
                    if let Err(panic_err) = catch_unwind(|| {
                        while let Ok(work) = work_receiver.recv() {
                            (work.func)(work.user_data);
                        }
                    }) {
                        let panic_message: Cow<'static, str> =
                            if let Some(panic_message) =
                                panic_err.downcast_ref::<&'static str>()
                            {
                                (*panic_message).into()
                            } else if let Some(panic_message) =
                                panic_err.downcast_ref::<String>()
                            {
                                panic_message.clone().into()
                            } else {
                                "(unknown panic payload type)".into()
                            };

                        let _ = thread_panic_b
                            .set(panic_message.into_owned().into_boxed_str());
                    }
                })
                .expect("Failed to create work queue thread"),
            _pd: PhantomData,
        }
    }

    pub fn get_thread_panic(&self) -> Option<&str>
    {
        self.thread_panic.get().map(|thread_panic| &**thread_panic)
    }

    pub fn add_work(&self, work: Work<UserData>)
    {
        if self.work_sender.send(work).is_err() {
            tracing::error!("Cannot add work to work queue. Work queue thread is dead");
        }
    }
}