SDR / Rust

In the last year, I’ve been thinking about how GNU Radio and SDR runtimes in general could improve. I had some ideas in mind that I wanted to test with quick prototypes. But C++ and rapid prototyping don’t really go together, so I looked into Rust and created a minimalistic SDR runtime from scratch. Even though I’m still very bad at Rust, it was a lot of fun to implement.

One of the core ideas for the runtime was to experiment with async programming, which Rust supports through Futures. Hence the name FutureSDR. (Other languages might call similar constructs promises or co-routines.) The runtime uses futures and it is a future, in the sense that it’s a placeholder for something that is not ready yet :-)

FutureSDR is minimalistic and has far fewer features than GNU Radio. But, at the same time, it’s only ~5100 lines of code and has served me well for quickly testing ideas. Furthermore, I just implemented basic accelerator support and finally have the feeling that the current base design makes some sense.

Overall, I think it has reached a state where it might be useful for experimenting with SDR runtimes. So this post gives a bit of an overview and introduces the main ideas. In follow-up posts, I want to go into more detail on specific features, each with an associated git commit introducing the feature.

Note that the code examples might not compile exactly as-is (e.g., I leave some irrelevant function parameters out). Also, there is, at the moment, no proper error handling (i.e., unwrap() all over the place).

Rust

So why Rust? The main reason, as already mentioned, is that it is fun to use. But there are also more objective advantages:

  • well-suited for rapid prototyping (no headers, no fiddling with the build system, functional language features)
  • relatively easy to cross-compile
  • a system programming language with zero-cost abstractions and good performance
  • comes with a package manager that uses a central repository of decent libraries that use semantic versioning
  • easy interfacing of C libraries through its foreign function interface (FFI)
  • is designed with multi-threaded and concurrent applications in mind
  • best ecosystem of async libraries and executors

Some of these advantages might not look like a big deal. But considering, for example, how much time C++ projects can spend on CMake, packaging, library dependencies, availability of libraries on various distributions, cross-compiling, etc., this frees quite some developer time that can be used to do something meaningful.

Also, Rust has some nice language constructs (in particular enums in combination with match and destructuring) that make prototyping very simple. Let’s say we want a quick replacement for GNU Radio’s polymorphic types (PMTs).

pub enum PMT {
    Null,
    String(String),
    U32(u32),
    U64(u64),
    Double(f64),
    VecF32(Vec<f32>),
    Blob(Vec<u8>),
}

That’s it. And if we need more data types, they are just one line of code away. Using match and destructuring, this type is really ergonomic to use:

impl PMT {
    pub fn is_string(&self) -> bool {
        match self {
            PMT::String(_) => true,
            _ => false,
        }
    }

    pub fn to_string(&self) -> Option<String> {
        match self {
            PMT::String(s) => Some(s.clone()),
            _ => None,
        }
    }
}

So you might think “lol, the point about PMTs is that they can be serialized”, and that’s true. Here, Rust has a crate (a Rust package) that everybody seems to love: serde. It allows serializing and deserializing the enum in various formats by adding one line of code that derives the Serialize and Deserialize traits.

// Debug and PartialEq are derived as well, so that assert_eq! works in the test below
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub enum PMT {
    Null,
    ...
}

One of the supported formats is Flexbuffers, which is a schema-less version of Flatbuffers (i.e., the deserializer doesn’t have to know the schema in advance). GNU Radio currently considers Flatbuffers and Flexbuffers for PMTs. With Rust, you basically get it for free, together with all the other formats. A simple example to serialize and deserialize to Flexbuffers could look like this:

use serde::{Deserialize, Serialize};

#[test]
fn pmt_serde() {
    let p = PMT::Null;

    // serialize into a Flexbuffer
    let mut s = flexbuffers::FlexbufferSerializer::new();
    p.serialize(&mut s).unwrap();

    // deserialize it again and compare
    let r = flexbuffers::Reader::get_root(s.view()).unwrap();
    let p2 = PMT::deserialize(r).unwrap();

    assert_eq!(p, p2);
}
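
And since serde decouples the data structure from the format, switching formats is a one-liner. A quick sketch with serde_json, as an example of “all the other formats”:

// the same PMT, this time as JSON; serde's default representation
// tags the enum variant with its name
let json = serde_json::to_string(&PMT::U32(42)).unwrap();
assert_eq!(json, r#"{"U32":42}"#);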

Not too bad, hmm? :-)

Async Runtime and IO

The main difference between FutureSDR and other SDR runtimes is that it’s built around async. (You probably already came across async. Some buzzwords in that context: non-blocking, futures, promises, async/await, co-routines.) The primary motivation to use async is that it makes all problems with IO and blocking go away.

If you have followed GNU Radio development, you might know that there were many IO-related problems, in particular with blocking in work() (the central function of a block, where all DSP is implemented). Currently, GNU Radio starts one thread for each block in the flowgraph, so it doesn’t seem like a big deal if something blocks. But:

  • The runtime loses control. It cannot cancel the blocking call inside work() and cannot process control messages, for example, to initiate a shutdown.
  • It locks us into the one-thread-per-block approach, which might be suboptimal because:
    • many threads -> high synchronization overhead
    • many threads -> many context switches
    • threads are under the control of the operating system -> we have no control over when and for how long a thread runs, which also means that we have no chance to exploit CPU caches, because we have no way to tell Linux about GNU Radio and our flowgraph. It doesn’t know that Block A produces data that Block B reads and that it would, therefore, make sense to schedule B after A.

With that being said, it’s also true that operating system schedulers are actually pretty good. And the success of GNU Radio shows that this approach provides reasonable performance. At the same time, I think that this approach is a dead-end for further development of runtimes and SDR schedulers.

Maybe it becomes clearer if we consider some examples: I wasted a lot of time implementing variations of a periodic message source in GNU Radio. The problem is, I could either block in work(), waiting for the timer to produce the next message (causing all the problems mentioned above), or spawn another thread that is not under the control of the GNU Radio runtime, which notifies the block when the timer expires. The latter is what’s currently in use. We end up with two threads for a dead-simple task.

An async runtime basically means that the work() function and all message handlers are async functions. In this case, you could just do something like:

async fn work(&mut self) {
    // yield to the executor until the timer expires
    Timer::after(dur).await;
    self.post_by_name("out", self.message.clone());
}

When the timer is awaited, the task (i.e., a future that is run on an executor of an async runtime) gives control back to the task executor, which can go on to process another task. The executor, furthermore, has a reactor that takes care of all timers. And once our timer expires, it will put the task back into the queue of active tasks that the executor pulls from. In summary, we have an async runtime with several executor threads that process tasks. And when we await something, nothing is stalled; the executor will go on and do useful stuff. I think the example also shows that it’s kind of natural to wrap a block in a task and have an async function async fn run_block(b: Block) that corresponds to GNU Radio’s ThreadBodyWrapper and BlockExecutor.
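
To see this outside any SDR context, here is a minimal, self-contained smol example. It is just a toy to illustrate that awaiting suspends only the task, not the executor thread:

use std::time::Duration;

fn main() {
    smol::block_on(async {
        // this task yields to the executor while its timer is pending
        let slow = smol::spawn(async {
            smol::Timer::after(Duration::from_millis(100)).await;
            println!("timer task done");
        });
        // ...so the executor is free to run other tasks in the meantime
        let fast = smol::spawn(async {
            println!("this runs while the timer task is suspended");
        });
        fast.await;
        slow.await;
    });
}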

(Side note: In the context of async runtimes, you might have come across the term work-stealing scheduler, which (in one shape or form) is used by all modern executors. The work-stealing scheduler evolves pretty naturally: There is a global queue with tasks that are ready to run. To avoid lock contention when each executor hammers the global queue, there are also local queues per executor thread. If an executor runs out of tasks, it will get a bunch of new tasks from the global queue. And if this queue is also completely drained, it will try to steal tasks from other local queues. A sketch of this lookup order follows below.)
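
This lookup order is compact enough to write down. The following is essentially the example from the documentation of the crossbeam-deque crate, which provides the queues that such executors are built on:

use std::iter;

use crossbeam_deque::{Injector, Stealer, Worker};

// One scheduling step of an executor thread: try the thread-local queue,
// then refill from the global queue, then steal from sibling threads.
fn find_task<T>(local: &Worker<T>, global: &Injector<T>, stealers: &[Stealer<T>]) -> Option<T> {
    // pop a task from the local queue, if not empty
    local.pop().or_else(|| {
        // otherwise, we need to look for a task elsewhere
        iter::repeat_with(|| {
            // try stealing a batch of tasks from the global queue...
            global
                .steal_batch_and_pop(local)
                // ...or a task from one of the other threads
                .or_else(|| stealers.iter().map(|s| s.steal()).collect())
        })
        // loop while no task was stolen and a steal operation needs to be retried
        .find(|s| !s.is_retry())
        // extract the stolen task, if there is one
        .and_then(|s| s.success())
    })
}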

Anyway, let’s look at another example and consider a UDP source in a flowgraph. How would you do this in GNU Radio? There’s again a similar issue. Should we wait for the next packet in work() and block? Or do we spawn another thread for this simple-looking task? GNU Radio currently uses Boost ASIO, which spawns something like an async runtime that is not under the control of GNU Radio and copies all data between these domains. Here, we end up again with two threads and, in this case, an additional copy to bridge the Boost runtime and the GNU Radio runtime.

With an async work() function, you can just do something like:

async fn work(&mut self) {
    let out = self.output(0).slice::<u8>();

    // await the next packet; the executor is free to run other tasks meanwhile
    match self.socket.as_mut().unwrap().recv(out).await {
        Ok(n) => {
            self.output(0).produce(n).await;
        }
        Err(_) => {
            self.finished = true;
        }
    }
}

The recv() on the socket will either return immediately if there is a packet, or it will register a file descriptor with the reactor, telling it to only wake the task again if there is some data ready to read. So besides timers, the reactor is also responsible for monitoring file descriptors. The latter is done with epoll, kqueue, or IOCP on Linux, macOS, and Windows, respectively. The advantage of epoll over the poll or select system calls is that epoll manages a set of file descriptors in the kernel, and the syscall can refer to that set instead of communicating the whole array of file descriptors with every system call. With this, epoll can be used to monitor a large number of file descriptors without too much overhead.
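
In smol, this machinery is exposed through the Async wrapper type, which registers a file descriptor with the reactor. A small standalone sketch with a standard UDP socket:

use std::net::UdpSocket;

use smol::Async;

fn main() -> std::io::Result<()> {
    smol::block_on(async {
        // wrapping the socket registers its file descriptor with the reactor
        let socket = Async::<UdpSocket>::bind(([127, 0, 0, 1], 8000))?;

        // recv_from returns immediately if a packet is pending; otherwise the
        // task is suspended until the reactor reports the descriptor as readable
        let mut buf = [0u8; 2048];
        let (n, peer) = socket.recv_from(&mut buf).await?;
        println!("received {} bytes from {}", n, peer);
        Ok(())
    })
}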

This is already everything async does. It monitors file descriptors and timers. Files, sockets, and also USB devices (usbfs), … everything is a file. We’ll discuss it in more detail later, but also DMA transfers or the execution of kernels on the GPU can be mapped to a file. Also, note that strictly speaking, async doesn’t really work well for files at the moment, but this is addressed through the ongoing transition from epoll to io_uring.

At this point, some people might bite into their keyboards :-) I know, it might not be a good idea to also make SDR drivers or DMA transfers async. But that’s not an issue, since every runtime supports blocking tasks, which get their own thread. (More details later.)

I hope, at this point, the potential advantage of async is somewhat clear. Yet, applications like SDR runtimes are usually regarded as the anti-use-case for async. Typical use cases are, for example, server applications that serve a large number of independent client connections (i.e., many independent, IO-bound tasks that don’t require a lot of compute). With SDR, blocks have clear dependencies and might require a lot of CPU time. Also, just like Linux, the async runtime doesn’t know about SDR and the topology of our flowgraph.

This is again where Rust shines, as it allows us to bring our own executor. Compare this to JavaScript, Erlang, Go, etc. They all have the executor as part of the language. You won’t teach the Go runtime about SDR; goroutines will be scheduled in some arbitrary order that cannot be influenced by the user. With Rust, the runtime is just another crate (i.e., library/package). There are two very popular runtimes (Tokio and async-std) and some lesser-known ones, like Smol or Nuclei.

This project is, at least for now, based on Smol. Smol is a minimalistic runtime that is a collection of the fundamental parts required to build a runtime (a task, an executor, a thread pool for blocking tasks, a reactor that monitors IO and timers, etc.). I find it really amazing that the guy behind Smol managed to destructure a runtime, splitting it into components. This makes it easy to plug in different versions of components without having to worry about the rest. With other runtimes, this is more entangled and, therefore, more complex.

For an SDR runtime, we’re mainly interested in the executor, since we want to teach it about flowgraphs. The executor of the async runtime will make up the main part of our SDR scheduler. So overall: With GNU Radio, everything is a block and, hence, a thread. With FutureSDR, everything is a task. Or one could also see this as switching from a focus on parallelism to a focus on concurrency.

Scheduling

By default, Smol only uses one thread. So a simple single-threaded scheduler is:

for b in blocks.iter() {
    smol::spawn(run_block(b)).detach(); // detach, since dropping a Task cancels it
}

(Remember, run_block is the equivalent of GNU Radio’s BlockExecutor and ThreadBodyWrapper, i.e., a function that handles the input/output of the block and calls work().) With Smol, the number of executor threads that are spawned can be configured through the SMOL_THREADS environment variable. A scheduler that spawns executors corresponding to the number of CPU cores available to the process would be:

let n_cores = core_affinity::get_core_ids().unwrap().len();
std::env::set_var("SMOL_THREADS", n_cores.to_string());
for b in blocks.iter() {
    smol::spawn(run_block(b)).detach();
}

(Note that obviously these schedulers don’t exploit the known structure of the flowgraph. Also, smol::spawn and the configuration through the environment variable is just a demo to show how one can get started.)

Even though non-blocking might be the preferred mode in most cases, I think there will always be blocks that block. For example, all SDR drivers block at the moment. I’ll probably extend some drivers to support non-blocking read/write, but I’m not sure if this will make a big difference. Anyway, high-performance stuff will probably always use blocking calls or polling. Fortunately, this is no problem and integrates nicely, as all runtimes allow spawning blocking tasks, which are started in a separate thread. The actual scheduler would, therefore, look slightly different:

for b in blocks.iter() {
    if b.is_blocking() {
        // run the block on the blocking thread pool, i.e., in its own thread
        smol::spawn(smol::unblock(|| smol::block_on(run_block(b)))).detach();
    } else {
        smol::spawn(run_block(b)).detach();
    }
}

Obviously, this also makes a thread-per-block scheduler, similar to GNU Radio’s current implementation, trivial. Just treat every block as if it would block.

for b in blocks.iter() {
    smol::spawn(smol::unblock(|| smol::block_on(run_block(b)))).detach();
}

Another simple example would be an executor with a custom thread setup. This is a slight variation of what Smol does under the hood. It creates a global executor struct that is lazily initialized on first use. The executor spawns threads corresponding to the number of available CPUs (i.e., the number of online CPUs in the process’ CPU set) and pins the worker threads to these CPUs:

use std::future::Future;
use std::thread;

use async_executor::{Executor, Task};
use futures_lite::future;
use once_cell::sync::Lazy;

static GLOBAL: Lazy<Executor<'_>> = Lazy::new(|| {
    let core_ids = core_affinity::get_core_ids().unwrap();

    // spawn one worker thread per online CPU and pin it to that CPU
    for id in core_ids {
        thread::Builder::new()
            .name(format!("sdr-{}", id.id))
            .spawn(move || {
                core_affinity::set_for_current(id);
                async_io::block_on(GLOBAL.run(future::pending::<()>()));
            })
            .unwrap();
    }

    Executor::new()
});

fn my_spawn<T: Send + 'static>(future: impl Future<Output = T> + Send + 'static) -> Task<T> {
    GLOBAL.spawn(future)
}

I hope this gives an impression of how easy it can be to play with schedulers; something I always wanted to do with GNU Radio.

The next step is obviously to implement our own application-specific executor that knows about the flowgraph’s topology. We can, for example, run consecutive blocks in their natural order, such that a downstream block is executed right after the upstream block that wrote to the shared buffer. While Smol does a great job in decomposing the runtime and factoring out the executor, implementing a complete executor is still not trivial. The default one is about 410 lines of code that are not exactly easy to understand. We’ll later implement a simple executor that partitions the flowgraph, pins blocks to executor threads, and runs them round-robin over a topological sort. In the long run, it might make sense to create API wrappers that make it easier to implement custom SDR executors.
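
Just to make the round-robin idea concrete: the ordering such an executor would iterate over is a plain topological sort of the flowgraph. A sketch, assuming blocks are identified by indices and edges are (source, destination) pairs:

// Kahn's algorithm: order block ids such that every upstream block comes
// before all of its downstream blocks
fn topological_order(n_blocks: usize, edges: &[(usize, usize)]) -> Vec<usize> {
    let mut in_degree = vec![0usize; n_blocks];
    let mut adjacent = vec![Vec::new(); n_blocks];
    for &(src, dst) in edges {
        adjacent[src].push(dst);
        in_degree[dst] += 1;
    }

    // start with the sources of the flowgraph (no inputs)
    let mut ready: Vec<usize> = (0..n_blocks).filter(|&b| in_degree[b] == 0).collect();
    let mut order = Vec::with_capacity(n_blocks);

    while let Some(b) = ready.pop() {
        order.push(b);
        for &d in &adjacent[b] {
            in_degree[d] -= 1;
            if in_degree[d] == 0 {
                ready.push(d);
            }
        }
    }
    order
}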

Finally, there is one more thing I wanted to highlight. When thinking about creating a new runtime for GNU Radio, it was clear that we want to decouple threads from blocks, i.e., have worker threads that can serve multiple blocks. So far, so good. But, at some point, this also requires adding an interface that allows shifting a block from one thread to another during runtime. I have no really good idea how we could do that (ideally without losing sanity over locking, signaling, and performance issues). With async, we get it for free. It’s what the work-stealing scheduler does anyway. Also, if we add more tasks to the runtime that are not associated with a block (for example, to add a control-port-like interface, see below), these could just be free-floating tasks that are served by the executor thread that has CPU time available.

Actor Pattern

I don’t want to go too much into implementation details in this post, but I think one more aspect is worth mentioning. It’s about how we handle parallelism and concurrency. With GNU Radio, we have mutexes and condition variables. And even though I spent significant time looking at the code, I was never sure how exactly it works. (At least not without putting some debug outputs in the code :-)) And even if I had something working, I never knew if it was actually correct or if it might trigger a hard-to-reproduce bug some hours later.

The actor pattern seems like a good approach to deal with this issue and stay sane while doing so. With this pattern, each component has one inbox that receives all updates from the outside. This outside could be the flowgraph, another block, or any other task running on the executor.

So there aren’t various condition variables that may or may not be relevant; there is one inbox. The inbox is a multi-producer single-consumer queue/channel that contains AsyncMessages, which are just another enum. The messages might, for example, signal that a buffer made progress or ask the block to call a handler and send the result back through a channel:

#[derive(Debug)]
pub enum AsyncMessage {
    StreamInputUpdate {
        port_id: usize,
        msg: BufferMessage,
    },
    CallbackId {
        port_id: usize,
        data: Arc<PMT>,
        tx: Sender<PMT>,
    },
    [...]
}

The algorithm for the executor is then pretty simple. First, it reads non-blocking (or better, non-awaiting :-)) from the inbox and processes the messages. If there were updates, we’ll directly go ahead and call work(). If there were no updates and the work function also didn’t indicate in the last run that it would like to be called again, we wait/await another message from the channel.

// main loop
loop {
    // ================== non blocking
    loop {
        match inbox.try_recv() {
            Ok(AsyncMessage::StreamInputUpdate { port_id, msg }) => {
                block.stream_input_by_id_mut(port_id).handle(msg).await;
            }
            Ok(AsyncMessage::CallbackId { port_id, data, tx }) => {
                tx.send(block.call_handler_id(port_id, data).await).await.unwrap();
            }
            [...]
            Err(_) => break,
        }
        // received at least one message
        work_io.call_again = true;
    }

    // ================== blocking
    if !work_io.call_again {
        match inbox.recv().await {
            AsyncMessage::StreamInputUpdate { port_id, msg } => {
                block.stream_input_by_id_mut(port_id).handle(msg).await;
            }
            [...]
         }
    }

    // ================== work
    work_io.call_again = false;
    block.work(&mut work_io).await;
}

(The example misses the code for termination. It’s just to show the idea of the actor pattern.)

Control and GUI

I think GNU Radio’s Control Port interface is one of its greatest features. It exposes an RPC-like interface to get/set parameters and data from the flowgraph. Yet, its usability and maintainability were a bit of a disaster. Initially, it was implemented using Ice, which had licensing issues. Therefore, it was later ported to Thrift, which was not properly packaged for distributions, has an API that is no fun at all, and introduced many breaking changes, making it hard to maintain. In the end, most users (and also most devs) compiled GNU Radio without Control Port.

Our async framework provides a cool alternative here. As already mentioned, development of the whole async ecosystem is mainly driven by client–server applications, like web application frameworks. I think there is no doubt that Rust is doing very well in this domain, as frameworks like Actix Web, Warp, or Rocket do well in benchmarks.

It, therefore, suggested itself to use these frameworks for FutureSDR as well. So instead of integrating a strange RPC framework, we can just start another task that implements a web server. I went for Rocket, mainly because I like its API. It’s really fun to use.

(Side note: The Rust ecosystem is not particularly great here. While there are many high-performance web application servers (all of them async), none of them allows setting a custom executor. Or I just didn’t see how that could work. For some frameworks, there are issues on GitHub suggesting that the maintainers are not interested in supporting this :-/ I, therefore, made some very crude changes to Rocket that allow me to use it with my custom executor.)

Using Rocket, we get a webserver and a REST interface to our flowgraph with basically no code.

scheduler.spawn(
    rocket::custom(config)
        .manage(blocks)
        .mount("/", ctrl_port::routes())
        .launch(Box::new(scheduler.clone()))
).detach();

The manage() function gets a vector with the blocks, allowing us to access them in URL handlers. The ctrl_port::routes() creates these handlers.

pub fn routes() -> Vec<rocket::Route> {
    routes![index, <etc>]
}

#[get("/")]
fn index(blocks: rocket::State<Vec<Block>>) -> String {
    format!("Number of blocks in flowgraph {:?}", blocks.len())
}

This gives us a root page that just tells us how many blocks are in the flowgraph. Extending this to show a real graph with blocks and metadata is straightforward.
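
For example, a route listing all blocks could look like this (a sketch; instance_name() is a hypothetical accessor on Block):

// list the blocks of the flowgraph, one per line
// (instance_name() is a made-up accessor for illustration)
#[get("/flowgraph")]
fn flowgraph(blocks: rocket::State<Vec<Block>>) -> String {
    blocks
        .iter()
        .enumerate()
        .map(|(i, b)| format!("{}: {}\n", i, b.instance_name()))
        .collect()
}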

But we can, of course, do more. For example, call message handlers of blocks.

#[get("/block/<blk>/call/<handler>")]
async fn handler_id(blk: usize, handler: usize, blocks: rocket::State<Arena<Block>>) -> String {

    let (tx, rx) = async_channel::bounded::<PMT>(1);
    
    block.send(AsyncMessage::CallbackId{
        port_id: handler,
        data: Arc::new(PMT::Null),
        tx
    }).await.unwrap();

    let ret = rx.recv().await.unwrap();

    format!("pmt {:?}", ret)
}

When getting the URL /block/<id of the block>/call/<callback handler id>, it will call the corresponding handler. Callbacks are, for now, functions with a PMT as input and output (Fn(PMT) -> PMT). In this simple example, we do not specify an input PMT (it will use PMT::Null) and print the resulting PMT. This could, for example, be a handler to query a parameter of a block, like the frequency of the SDR.

The URL handler interfaces the flowgraph following our actor pattern. It sends an AsyncMessage to the inbox of the block, asking it to execute a handler. The result will be sent back to the URL handler through a channel.
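
On the block side, such a callback can be quite simple. A sketch with a made-up frequency field; how handlers are registered with the block is omitted:

struct Sdr {
    frequency: u64,
}

impl Sdr {
    // callback with the Fn(PMT) -> PMT shape: a non-Null input sets a new
    // frequency; the current value is always returned
    fn freq_handler(&mut self, p: PMT) -> PMT {
        if let PMT::U64(freq) = p {
            self.frequency = freq;
        }
        PMT::U64(self.frequency)
    }
}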

Note that nobody expects the user to go to these URLs manually. This is done by one of the many web frameworks, which query parameters in the background and show the results in the frontend. But it is not limited to the web. It can be interfaced from any normal program or even with curl from the shell. I added a block with a handler that just returns a counter.

$ curl http://localhost:1337/block/3/call/0
pmt U64(1)
$ curl http://localhost:1337/block/3/call/0
pmt U64(2)
$ curl http://localhost:1337/block/3/call/0
pmt U64(3)

I think this allows many new cool applications.

GUI

I follow a similar approach for the GUI. Watching the hassle and development time that GNU Radio spent on cross-platform GUI frameworks, I don’t want to go down that road. And even the current Qt framework doesn’t work for Android, for example. So, while it is totally possible to add normal GUI widgets, I decided to focus on the web only. Instead of adding widgets to the flowgraph, I add a WebSocket sink and can, if I want, connect to it with a web client. Maybe that might seem completely insane, but:

  • WebGL performs quite well (see ShinySDR to get an idea of what’s possible)
  • it is completely platform independent (everything that runs a browser; Android -> no problem)
  • responsive, nice looking GUIs that adapt to the size of the screen are easy to do with web technologies
  • you can connect to a remote or embedded device from anywhere
  • stable. While there might be ten new web frameworks per day, your browser can still display websites from the 90s.

Tungstenite is a popular WebSocket implementation in Rust that can be easily integrated. The work() function basically boils down to:

let i = sio.input(0).slice::<u8>();

// forward the input samples to the WebSocket connection
let mut v = Vec::new(); // todo: don't copy
v.extend_from_slice(i);
match self.conn.as_mut().unwrap().send(Message::Binary(v)).await {
    Ok(()) => {}
    Err(_) => panic!("websocket error"),
}

sio.input(0).consume(i.len()).await;

To create a fancy GUI, it might make sense to have a closer look at ShinySDR. For now, I only have a small PoC with WebGL-Plot. It is a trivial modification of their examples. Instead of reading data from the microphone, it reads data from the WebSocket. So here are 1024 random values, fresh from a flowgraph consisting of Random Source -> Throttle -> WebSocket Sink. (Follow for more premium content! :-))

[Interactive WebGL plot “Noise”: 1024 random samples streamed live from the flowgraph]
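
The flowgraph behind this demo would look roughly like this (a sketch; the builder names follow the style of the example in the API section below and are not verbatim):

// Random Source -> Throttle -> WebSocket Sink (hypothetical builder names)
let mut fg = FlowgraphBuilder::new().finish();

let src = fg.add_block(Box::new(RandomSourceBuilder::<f32>::new().finish()));
let throttle = fg.add_block(Box::new(ThrottleBuilder::new(48_000.0).finish()));
let snk = fg.add_block(Box::new(WebsocketSinkBuilder::new(9001).finish()));

fg.connect_stream(src, "out", throttle, "in");
fg.connect_stream(throttle, "out", snk, "in");

fg.run();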

Custom Buffers and Integration of Accelerators

The lack of native support and proper integration of accelerators was always an issue for GNU Radio. While there were implementations for specific technologies, they never made it upstream. A simple workaround is to create an accelerator block with normal input/output buffers and do another internal copy to accelerator memory in work(). This is usually referred to as the double-copy problem.

FutureSDR handles accelerators like this:

  • when connecting input/output streams, one can optionally specify a buffer type
  • accelerators have to provide host-to-device (H2D), device-to-host (D2H), and optionally device-to-device (D2D) buffer implementations
  • the host part of H2D and D2H buffers has to provide an interface that is similar to normal CPU buffers. This way, normal CPU blocks do not have to be adapted.
  • the accelerator block, in turn, is implemented for specific edge types, i.e., a Vulkan block will expect VulkanH2D/D2D/D2H buffers.
  • buffer handling and triggering of data processing is done in work().

I think the last point is particularly important, as it defines what an accelerator block is actually doing, i.e., what does it do in work()? The main problem is: if one sticks to GNU Radio’s signature of the work() function (which only provides raw pointers to input and output buffers), there is not much an accelerator block can do, apart from launching kernels/initiating DMA transfers.

fn work(input_buffer: &[u8], output_buffer: &mut [u8]) {
    // all an accelerator block can do here is launch its kernel
    launch_kernel(input_buffer, output_buffer);
}

In this case, work() only gets raw pointers but cannot, for example, allocate new accelerator memory and add it to the edge/buffer. And it only gets the pointer and the number of items in the buffer, not its size. So this has to be a fixed value that cannot be adapted at runtime. Given these limitations, I changed the interface to give work() direct access to stream inputs and outputs (struct StreamIo). A non-accelerator block that multiplies 32-bit floats by 23.0 could, for example, look like this:

async fn work(&mut self, sio: &mut StreamIo) {
    let i = sio.input(0).slice::<f32>();
    let o = sio.output(0).slice::<f32>();

    // process as many items as input and output space allow
    let m = std::cmp::min(i.len(), o.len());
    for n in 0..m {
        o[n] = i[n] * 23.0;
    }

    sio.input(0).consume(m).await;
    sio.output(0).produce(m).await;
}

In addition, I added a handler to input/output buffers that processes BufferMessages.

pub enum BufferMessage {
    Consume { amount: usize },
    Produce { amount: usize },
    VkBuffEmpty {
        buff: Arc<CpuAccessibleBuffer<[u8]>>,
    },
    VkBuffFull {
        buff: Arc<CpuAccessibleBuffer<[u8]>>,
        used: usize,
    },
    ...
}

Internally, consume and produce just put messages in the handler (sio.output(0).handle(BufferMessage::Produce { amount: m })). Extending this interface to support custom buffers is rather straightforward. Until now, I mainly focused on Vulkan, since it’s available on many platforms (also on Android, for example) and doesn’t require fancy hardware, like a Zynq board.

The Vulkan block does not try to get a slice() (i.e., an array of items) from its input and output buffers but mainly operates on BufferMessages with Vulkan buffer objects, allowing it to orchestrate the GPU. It can allocate new Vulkan buffers and inject them into the edge or decide when it wants to launch a kernel (which, in my opinion, wouldn’t be possible with a work() function that gets raw pointers). At the moment, I have only a simple implementation that passes a Vulkan buffer around. But the design gives full control to the block, i.e., the block becomes the GPU scheduler and can optionally coordinate with a central broker to access a device or device queue.

The details in the following pseudo code are not important (many definitions are missing). But it should give you an idea of what could be happening in the work() function of an accelerator.

async fn work(&mut self, sio: &mut StreamIo) {

    // downstream has read all samples, push buffer back to input edge
    for m in sio.output(0).messages().drain(..) {
        match m {
            BufferMessage::VkBuffEmpty{..} => sio.input(0).handle(m).await,
            _ => {},
        }
    }

    // upstream submitted a VulkanBuffer -> launch kernel and forward buffer to output edge 
    for m in sio.input(0).messages().drain(..) {
        match m {
            BufferMessage::VkBuffFull{buff, used} => {
                {
                    let dispatch = used as u32 / 4 / 64; // 4: item size, 64: work group size

                    let mut builder = AutoCommandBufferBuilder::new(self.broker.device().clone(), self.broker.queue().family());
                    builder.dispatch([dispatch, 1, 1], self.pipeline.as_ref().clone(), self.set.as_ref().clone(), ());
                    let command_buffer = builder.build();

                    let finished = command_buffer.execute(self.broker.queue().clone());
                    finished.then_signal_fence_and_flush().wait(None);
                }

                sio.output(0).handle(BufferMessage::VkBuffFull{buff, used}).await;
            }
            _ => {},
        }
    }
}

Apart from Vulkan, I also looked into Xilinx MPSoCs and created a Rust user space driver to interface the AXI DMA controller of my Zynq ZCU106 board. (This always sounded scary to me, but it is actually much easier than expected. One just has to map DMA memory and massage some memory-mapped IO registers.) At the moment, it’s a simple loopback example that is not integrated in FutureSDR. But since we just have to pass pointers to DMA pages around, this is actually simpler than Vulkan.
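
To give an idea of what “massaging memory-mapped IO registers” looks like in Rust, here is a rough sketch using the libc crate. The device path and register offsets are placeholders, not the real AXI DMA register map:

use std::fs::OpenOptions;
use std::os::unix::io::AsRawFd;

fn main() -> std::io::Result<()> {
    // the UIO device exposes the controller's register page to user space
    let f = OpenOptions::new().read(true).write(true).open("/dev/uio0")?;

    let regs = unsafe {
        libc::mmap(
            std::ptr::null_mut(),
            4096, // one page of memory-mapped IO registers
            libc::PROT_READ | libc::PROT_WRITE,
            libc::MAP_SHARED,
            f.as_raw_fd(),
            0,
        )
    } as *mut u32;

    unsafe {
        // placeholder offsets: read a status register, then write a control
        // register to start a transfer
        let status = std::ptr::read_volatile(regs.add(1));
        println!("status: {:#x}", status);
        std::ptr::write_volatile(regs, 0x1);
    }
    Ok(())
}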

Another cool thing is that both accelerator types can integrate nicely with async. Launching kernels or doing DMA transfers might typically be blocking calls, which means we should probably run the block in a separate thread (spawn_blocking, see above). We can, however, also do it non-blocking, the async way. Vulkan supports this through FenceFDs (a file descriptor that can be epolled to get notified when the kernel is done). And with the Zynq, we can epoll the file descriptor of the userspace IO device. (The generic UIO driver (uio_pdrv_genirq) allows a four-byte read on the file descriptor, once the interrupt of the DMA controller fired.)
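
Awaiting the DMA interrupt could then look roughly like this; a sketch using smol’s Async wrapper around the UIO file descriptor (which, being a character device, supports epoll, unlike regular files):

use std::fs::File;

use smol::io::AsyncReadExt;
use smol::Async;

// the generic UIO driver makes the file descriptor readable when the
// interrupt fires; a four-byte read returns the interrupt count
async fn wait_for_dma_interrupt(uio: &mut Async<File>) -> std::io::Result<u32> {
    let mut buf = [0u8; 4];
    uio.read_exact(&mut buf).await?;
    Ok(u32::from_ne_bytes(buf))
}

fn main() -> std::io::Result<()> {
    smol::block_on(async {
        let mut uio = Async::new(File::open("/dev/uio0")?)?;
        let count = wait_for_dma_interrupt(&mut uio).await?;
        println!("interrupt count: {}", count);
        Ok(())
    })
}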

We’ll probably stick to blocking, but it’s certainly interesting to give it a try.

API

There are no big surprises in the user-facing API. The only major difference from GNU Radio is that blocks are explicitly added to the flowgraph. This also passes ownership. The outside world only gets an identifier that can be used to refer to a block.

I think this is more reasonable, since it makes sure that nobody messes with the block through the external reference. Also, referring to blocks by memory references or (smart) pointers breaks sooner or later anyway; at the latest when the flowgraph spans multiple nodes.

Here is an example flowgraph with a Vulkan block, executing a hard-coded kernel that multiplies a 32-bit integer by 12. The copy block is, like the whole flowgraph, pretty pointless. The important part is that the blocks that are connected to the Vulkan block (1) don’t have to be adapted for Vulkan and (2) read/write directly into accelerator memory.

fn main() {
    futuresdr::runtime::init();

    let mut fg = FlowgraphBuilder::new().finish();

    // random input data
    let n_items = 10_000_000;
    let orig: Vec<u32> = std::iter::repeat_with(|| fastrand::u32(0..1024)).take(n_items).collect();

    let broker = Arc::new(VulkanBroker::new());

    let src = Box::new(VectorSourceBuilder::<u32>::new(orig.clone()).finish());
    let cpy = Box::new(CopyBuilder::new(4).finish());
    let vulkan = Box::new(VulkanBuilder::new(broker.clone()).finish());
    let snk = Box::new(VectorSinkBuilder::<u32>::new().finish());

    // adding a block to the flowgraph passes ownership and returns an id
    let src = fg.add_block(src);
    let cpy = fg.add_block(cpy);
    let vulkan = fg.add_block(vulkan);
    let snk = fg.add_block(snk);

    fg.connect_stream(src, "out", cpy, "in");
    fg.connect_stream_with_type(cpy, "out", vulkan, "in", BufferType::VulkanH2D);
    fg.connect_stream_with_type(vulkan, "out", snk, "in", BufferType::VulkanD2H);

    fg.run();

    // check that the GPU multiplied every item by 12
    let snk = fg.block_ref(snk).unwrap();
    let snk = snk.as_any().downcast_ref::<Block<VectorSink<u32>>>().unwrap();
    let v = snk.kernel().items();

    assert_eq!(v.len(), n_items);
    for i in 0..v.len() {
        assert_eq!(orig[i] * 12, v[i]);
    }
}

Next Steps

I hope that you found this overview a bit interesting, but, above all, I hope that you found it motivating to have a look at SDR runtimes yourself. If you have an idea for a CPU scheduler or a GPU scheduler, or if you want to connect your fancy AI algorithm to the task executor, this might be for you :-) I’ll release the code in the next weeks. It is, however, only interesting if you want to experiment with the runtime. There are no DSP blocks at the moment.