《I/O 虚拟化(一):Virtqueue 介绍》是从宏观视角解释了 virtqueue 的概念和工作原理,但是不看看代码总觉得不够踏实。这篇文章将会以 Dragonball 项目的 virtiofs 设备 为蓝本,介绍下通用的 virtio 协议在 device 侧(VMM 侧)的实现细节,尽量屏蔽 virtiofs 设备的细节。至于 driver 侧(内核侧)本质上就是 device 侧的反逻辑,如果后面有时间了再单独写一篇文章。
在 driver 准备好数据后,通过 irqfd[1] 通知 device 侧接收数据,在接收到中断信息之后,Dragonball 侧的逻辑主要由 VirtioFsEpollHandler::process()
函数实现。
VirtioFsEpollHandler
的 config 字段的 queues 字段,其类型是 Vec<VirtioQueueConfig<Q>>
,其定义如下所示。QueueSync
就是对 Queue
类型的线程安全封装,它们的定义我都列了在下方。这里的 Queue
类型实际上是对 virtqueue 的一种描述,真正的数据需要根据这些描述信息去内存中获取。
关于 virtqueue 的定义在 rust-vmm 中有三个层次。最底层是 QueueT
trait,它定义了 virtqueue 的基本行为。
pub trait QueueT: for<'a> QueueGuard<'a> {
// === snipped ===
fn desc_table(&self) -> u64;
fn avail_ring(&self) -> u64;
fn used_ring(&self) -> u64;
// === snipped ===
}
中间层是两种 virtqueue 实现,分别是 Queue
和 QueueSync
,QueueSync
就是把 Queue
包在了 Arc<RwLock<>>
中,基础实现都在 Queue
中。
pub struct QueueSync {
state: Arc<Mutex<Queue>>,
}
// Queue 的定义
pub struct Queue {
/// The maximum size in elements offered by the device.
max_size: u16,
/// Tail position of the available ring.
next_avail: Wrapping<u16>,
/// Head position of the used ring.
next_used: Wrapping<u16>,
/// VIRTIO_F_RING_EVENT_IDX negotiated.
event_idx_enabled: bool,
/// The number of descriptor chains placed in the used ring via `add_used`
/// since the last time `needs_notification` was called on the associated queue.
num_added: Wrapping<u16>,
/// The queue size in elements the driver selected.
size: u16,
/// Indicates if the queue is finished with configuration.
ready: bool,
/// Guest physical address of the descriptor table.
desc_table: GuestAddress,
/// Guest physical address of the available ring.
avail_ring: GuestAddress,
/// Guest physical address of the used ring.
used_ring: GuestAddress,
}
从上面的定义中,desc_table
、avail_ring
以及 used_ring
对应的前一章中的 virtq_desc
、virtq_avail
以及 virtq_used
。由于是在 guest 中创建的,所以在 device 只定义了在 guest 地址空间的结构体地址。如果想要在 device 中使用,需要先将 guest 地址空间转换为 host 地址空间,然后 0..2
是 addr
、2..4
是 len
……
Virtqueue 空间是由 driver 创建的,device 是如何知道确切的地址呢?答案是通过 MMIO,driver 向指定内存写入数据后,触发 VmExit,会调用 dragonball 的 MmioV2Device::write()
。
fn write(&self, _base: IoAddress, offset: IoAddress, data: &[u8]) {
let offset = offset.raw_value();
// Write to the device configuration area.
if (MMIO_CFG_SPACE_OFF..DRAGONBALL_MMIO_DOORBELL_OFFSET).contains(&offset) {
self.set_device_config(offset - MMIO_CFG_SPACE_OFF, data);
} else if data.len() == 4 {
let v = LittleEndian::read_u32(data);
match offset {
// === snipped ===
// desc low 32 位地址
REG_MMIO_QUEUE_DESC_LOW => {
self.update_queue_field(|q| q.set_desc_table_address(Some(v), None))
}
// desc high 32 位地址
REG_MMIO_QUEUE_DESC_HIGH => {
self.update_queue_field(|q| q.set_desc_table_address(None, Some(v)))
}
REG_MMIO_QUEUE_AVAIL_LOW => {
self.update_queue_field(|q| q.set_avail_ring_address(Some(v), None))
}
REG_MMIO_QUEUE_AVAIL_HIGH => {
self.update_queue_field(|q| q.set_avail_ring_address(None, Some(v)))
}
REG_MMIO_QUEUE_USED_LOW => {
self.update_queue_field(|q| q.set_used_ring_address(Some(v), None))
}
REG_MMIO_QUEUE_USED_HIGH => {
self.update_queue_field(|q| q.set_used_ring_address(None, Some(v)))
}
// === snipped ===
}
}
// === snipped ===
}
最上层是被 virtio 设备直接使用的,出了 queue 的定义外,还规定了 queue 周边内容,比如 eventfd、notifier 等,它是 VirtioQueueConfig
。
// VirtioQueueConfig 的定义
pub struct VirtioQueueConfig<Q: QueueT = QueueSync> {
pub queue: Q,
pub eventfd: Arc<EventFd>,
notifier: Arc<dyn InterruptNotifier>,
index: u16,
}
前面说过了 VirtioFsEpollHandler::process()
是处理来自 guest 的 events 的主要逻辑,该函数获取了 VirtioQueueConfig
数组的长度(名为 queues_len
)。
fn process(&mut self, events: Events, _ops: &mut EventOps) {
// snipped
let slot = events.data();
// snipped
let queues_len = queues.len() as u32;
// snipped
match slot {
// snipped: 处理其他 events
// QUEUE_AVAIL_EVENT
_ => {
let idx = (slot - QUEUE_AVAIL_EVENT) as usize;
if let Err(e) = queues[idx].consume_event() {
error!("{}: failed to read queue event, {:?}", VIRTIO_FS_NAME, e);
return;
}
drop(guard);
if let Err(e) = self.process_queue(idx) {
error!(
"{}: process_queue failed due to error {:?}",
VIRTIO_FS_NAME, e
);
}
}
}
}
Event 携带的 data(events.data()
)表明了本次通知的类型,在 virtiofs 中类型主要有以下几种:
- Unknown(直接报错)
- Rate limiter event
- Patch rate limiter event
- Queue avail event(在 virtiofs 下 slot 可以理解为 idx,假设有 2 个 queues,那么 slot == 0 就说明第一个 queue 有信息)
我们重点关照的是最后一个 queue avail event,这个事件表示某个 virtqueue 有数据等待被 VMM 处理。首先取走对应 queue 的 eventfd 事件,参见 queue[idx].consume_event()
,但直接忽略了 eventfd 传来的值(为什么??那要这个 eventfd 的作用是什么?)。随后调用 process_queue()
处理 queue 中的数据,包含了与 avail vring 和 used vring 的交互。
为了便于理解,先来聊聊 rust-vmm 的 guest 内存管理方案,GuestAddressSpace
trait 表示 guest 的地址空间,地址空间可以是一块或多块内存区域。Rust-vmm 为Arc<M>
实现了该接口,那么说明 Arc<M>
可以表示一个或多个 GuestMemory
实例,其中M
的类型是 GuestMemory
表示多块连续的内存区域(memory regions)。
pub trait GuestAddressSpace {
type M: GuestMemory;
type T: Clone + Deref<Target = Self::M>;
fn memory(&self) -> Self::T;
}
impl<M: GuestMemory> GuestAddressSpace for Arc<M> {
type M = M;
type T = Self;
fn memory(&self) -> Self {
self.clone()
}
}
接下来介绍 Deref
的作用,Rust 源码给出了以下示例。
use std::ops::Deref;
struct DerefExample<T> {
value: T
}
impl<T> Deref for DerefExample<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&self.value
}
}
let x = DerefExample { value: 'a' };
assert_eq!('a', *x);
当调用 *x
的时候,rust 会自动调用 Defer::defer()
返回引用(&DerefExample::T
),然后再解引用,也就是 *x == *&DerefExample::T
。
GuestMemory
内含了多个区域(每个内存区域的类型是 GuestMemoryRegion
),提供了几个方法提供获取内存区域相关的方法。
pub trait GuestMemory {
type R: GuestMemoryRegion;
// ...
fn num_regions(&self) -> usize;
fn find_region(&self, addr: GuestAddress) -> Option<&Self::R>;
// ...
}
VirtioDeviceConfig
顾名思义是一个 virtio device 的配置,包含了 guest 地址空间(vm_as
)和 queues 配置(queues
)。VirtioDeviceConfig
提供了一个 lock_guest_memory()
,它的功能是调用 GuestAddressSpace::memory()
返回一个实现了 GuestMemory
trait 类型的 guest 内存的引用。
pub struct VirtioDeviceConfig<
AS: GuestAddressSpace,
Q: QueueT = QueueSync,
R: GuestMemoryRegion = GuestRegionMmap,
> {
pub vm_as: AS,
// ...
pub queues: Vec<VirtioQueueConfig<Q>>,
// ...
}
impl<AS, Q, R> VirtioDeviceConfig<AS, Q, R>
where
AS: GuestAddressSpace,
Q: QueueT,
R: GuestMemoryRegion,
{
// ...
pub fn lock_guest_memory(&self) -> AS::T {
self.vm_as.memory()
}
}
Rust-vmm 的内存处理的最重要的几个 traits、结构体已经搞清楚了,现在我们可以开始关注 process_queue()
到底做了什么了:访问 avail queue 以及 desc ring 获取等待处理的 buffer,解析请求后发送给 virtiofsd(virtiofs 在 host 上的 server),将处理好的数据写入 buffer 后更新 used ring,最后通知 driver。这个方法每次只处理一个 queue,因此传入参数需要指定 queue 的索引(queue_index
)。
fn process_queue(&mut self, queue_index: usize) -> Result<()> {
// (1)
let mut config_guard = self.config.lock().unwrap();
let mem = config_guard.lock_guest_memory();
let vm_as = config_guard.vm_as.clone();
// (2)
let queue = &mut config_guard.queues[queue_index];
// ...
// (3)
let mut queue_guard = queue.queue_mut().lock();
let mut iter = queue_guard
.iter(mem.clone())
.map_err(Error::VirtioQueueError)?;
// (4)
for desc_chain in &mut iter {
//...
let work_func = move || {
let guard = vm_as.memory();
let mem = guard.deref();
// (5)
let reader = Reader::from_descriptor_chain(mem, desc_chain.clone())
.map_err(FsError::InvalidDescriptorChain)
.unwrap();
// (6)
let writer = Writer::VirtioFs(
VirtioFsWriter::new(mem, desc_chain)
.map_err(FsError::InvalidDescriptorChain)
.unwrap(),
);
// (7)
let total = server
.handle_message(
reader,
writer,
cache_handler
.as_mut()
.map(|x| x as &mut dyn FsCacheReqHandler),
None,
)
.map_err(FsError::ProcessQueue)
.unwrap();
if pooled {
// (8)
let queue = &mut config.lock().unwrap().queues[queue_index];
queue.add_used(mem, head_index, total as u32);
if let Err(e) = queue.notify() {
error!("failed to signal used queue: {:?}", e);
}
} else {
// ...
}
};
// ...
}
// ...
let notify = !self.is_multi_thread() && used_count > 0;
// unlock QueueT
drop(queue_guard);
// ...
// (9)
if notify {
if let Err(e) = queue.notify() {
error!("failed to signal used queue: {:?}", e);
}
}
Ok(())
}
(1) 是从 VirtioDeviceConfig
中获取 guest 内存 mem
,这部分内容在前面介绍过了,这里不再赘述。
(2) 是根据 queue_index
从 VirtioDeviceConfig
获取一个指定的 queue。
(3) 是读取 avail ring 的 idx 字段并生成了一个 virtq_avail.ring
(详见 Avail ring)的迭代器(类型是 AvailIter
)。创建迭代器的实际实现的方法是 Queue::iter()
,分为两个子步骤:获取 avail ring 的 idx,创建迭代器。
impl QueueOwnedT for Queue {
fn iter<M>(&mut self, mem: M) -> Result<AvailIter<'_, M>, Error>
where
M: Deref,
M::Target: GuestMemory,
{
if !self.ready || self.avail_ring == GuestAddress(0) {
return Err(Error::QueueNotReady);
}
// (3.1)
self.avail_idx(mem.deref(), Ordering::Acquire)
// (3.2)
.map(move |idx| AvailIter::new(mem, idx, self))?
}
// ...
}
(3.1) avail_idx()
的作用是获取 idx
的值。Avail ring 存储了 queue 的 flags
(le16
)和 idx
(le16
),queue 是一个环形队列,每个 entry 保存了 descriptor 的 index(le16
)。
self.avail_ring
是 avail ring 的起始地址,checked_add(2)
跳过了 flags
字段(类型是 le16
),从内存中加载一个类型是 le16
的 idx
,它表示 ring 的头。
impl QueueT for Queue {
// ...
fn avail_idx<M>(&self, mem: &M, order: Ordering) -> Result<Wrapping<u16>, Error>
where
M: GuestMemory + ?Sized,
{
// idx 的起始地址
let addr = self
.avail_ring
.checked_add(2)
.ok_or(Error::AddressOverflow)?;
// 从 mem 中加载 idx 的值
mem.load(addr, order)
.map(u16::from_le)
.map(Wrapping)
.map_err(Error::GuestMemory)
}
// ...
}
(3.2) AvailIter::new()
是负责创建 AvailIter
实例的方法,它填充了全部字段:
desc_table
: desc ring 的起始地址。avail_ring
: avail ring 的起始地址。queue_size
: queue 的长度。last_index
: avail ring 的头,头是针对 driver 来说的,表示 driver 即将到来数据被写入的位置,而对于 device 来说是尾)。next_avail
: avail ring 的尾,与last_index
一样,表示 device 下一次读取的位置。
#[derive(Debug)]
pub struct AvailIter<'b, M> {
mem: M,
desc_table: GuestAddress,
avail_ring: GuestAddress,
queue_size: u16,
last_index: Wrapping<u16>,
next_avail: &'b mut Wrapping<u16>,
}
impl<'b, M> AvailIter<'b, M>
where
M: Deref,
M::Target: GuestMemory,
{
// ...
pub(crate) fn new(mem: M, idx: Wrapping<u16>, queue: &'b mut Queue) -> Result<Self, Error> {
if (idx - queue.next_avail).0 > queue.size {
return Err(Error::InvalidAvailRingIndex);
}
Ok(AvailIter {
mem,
desc_table: queue.desc_table,
avail_ring: queue.avail_ring,
queue_size: queue.size,
last_index: idx,
next_avail: &mut queue.next_avail,
})
}
// ...
}
(4) 调用 AvailIter::next()
遍历 next_avail..last_index
之间的 entries,每次遍历会访问 desc ring 获取 descriptor(类型是 DescriptorChain
)。
下面这段代码说明了是如何获得 DescriptorChain
这个对象的,上图展示了 avail ring 的布局。
对两个常量的解释:
VIRTQ_AVAIL_ELEMENT_SIZE
: 值是 2,表示一个 entry(le16
)的长度。VIRTQ_AVAIL_RING_HEADER_SIZE
: 值是 4,flags
(le16
)和idx
(le16
)的长度,表示 ring 的 header 的长度。
这个方法的流程是:
- 计算
next_avail
对应 entry 的 offset。 - 从内存中读取出 descriptor index 的值。
- 创建
DescriptorChain
并返回。
fn next(&mut self) -> Option<Self::Item> {
// 遍历退出条件:队列中无等待读取的数据
if *self.next_avail == self.last_index {
return None;
}
let elem_off =
u64::from(self.next_avail.0.checked_rem(self.queue_size)?) * VIRTQ_AVAIL_ELEMENT_SIZE;
let offset = VIRTQ_AVAIL_RING_HEADER_SIZE + elem_off;
// 取出 index
let addr = self.avail_ring.checked_add(offset)?;
let head_index: u16 = self
.mem
.load(addr, Ordering::Acquire)
.map(u16::from_le)
.map_err(|_| error!("Failed to read from memory {:x}", addr.raw_value()))
.ok()?;
// next_avail +1
*self.next_avail += Wrapping(1);
// 仅返回 desc table item 的描述,并没有实际访问内存
Some(DescriptorChain::new(
self.mem.clone(),
self.desc_table,
self.queue_size,
head_index,
))
}
一顿操作猛如虎,已经从 avail ring 中拿到了 descriptor(也就是 DescriptorChain
),接下来就是读取 buffer 加私有业务逻辑了。比如我们看的是 virtiofs,那么就是与 fs 相关的业务逻辑,如果我们看 viritio-net,那就是与网络相关的业务逻辑。注意:这篇文章的目的是介绍通用的 virtio 流程,接下来重点是介绍从 DescriptorChain
中读取 buffer,尽量避免或者少讲 virtiofs 私有逻辑。
(5)调用 Reader::from_descriptor_chain()
读取 readable buffers,即没有 VRING_DESC_F_WRITE
flag 的 descriptors。
由于遍历 descriptor chain 的操作是顺序的,先读 readable buffer 再读 writable buffer 说明 writable buffer 是置于 readable buffer 之后的。
from_descriptor_chain()
的代码如下所示。
pub fn from_descriptor_chain<M>(
mem: &'a M::Target,
desc_chain: DescriptorChain<M>,
) -> Result<Reader<'a, MS<'a, M::Target>>>
where
M: Deref,
M::Target: GuestMemory + Sized,
{
let mut total_len: usize = 0;
// (5.1)
let mut buffers = VecDeque::with_capacity(64);
// (5.2)
for desc in desc_chain.readable() {
// ...
// (5.3)
let region = mem
.find_region(desc.addr())
.ok_or(Error::FindMemoryRegion)?;
let offset = desc
.addr()
.checked_sub(region.start_addr().raw_value())
.unwrap();
// (5.4)
buffers.push_back(
region
.get_slice(MemoryRegionAddress(offset.raw_value()), desc.len() as usize)
.map_err(Error::GuestMemoryError)?,
);
}
Ok(Reader {
buffers: IoBuffers {
buffers,
bytes_consumed: 0,
},
})
}
(5.1) buffers
默认生成了一个长度是 64 的数组,每个元素的类型是 VecDeque<vm_memory::VolatileSlice<'_, <<<M as Deref>::Target as GuestMemory>::R as GuestMemoryRegion>::B>>
,简单地理解为 buffer 的指针就行。当然这个是可以扩容的,但是频繁扩容会有性能影响。
(5.2) 调用了 DiscriptorChain::readable()
方法生成了一个 DescriptorChainRwIter
实例,其 writable
字段被设置为 false
。
pub fn readable(self) -> DescriptorChainRwIter<M> {
DescriptorChainRwIter {
chain: self,
writable: false,
}
}
每一次遍历都调用的是 DescriptorChainRwIter::next()
,在 DescriptorChain::next()
的基础上增加了读写权限的判断。
// DescriptorChainRwIter::next()
fn next(&mut self) -> Option<Self::Item> {
loop {
match self.chain.next() {
Some(v) => {
if v.is_write_only() == self.writable {
return Some(v);
}
}
None => return None,
}
}
}
// DescriptorChain::next()
fn next(&mut self) -> Option<Self::Item> {
if self.ttl == 0 || self.next_index >= self.queue_size {
return None;
}
// (5.2.1)
let desc_addr = self
.desc_table
.checked_add(self.next_index as u64 * size_of::<Descriptor>() as u64)?;
// (5.2.2)
let desc = self.mem.read_obj::<Descriptor>(desc_addr).ok()?;
// ...
if desc.has_next() {
// (5.2.3)
self.next_index = desc.next();
self.ttl -= 1;
} else {
self.ttl = 0;
}
Some(desc)
}
(5.2.1) self.next_index
初始值是从 avail queue 获取的 descriptor index,之后都是读取的 virtq_desc.next
。desc_addr
是 descriptor 的起始地址。
(5.2.2) 是根据起始地址从内存中读出 Descriptor
。
(5.2.3) 如果 chain 没有读完,即 has_next()
返回 true
,那么就设置 next_index
的值为下一个 descriptor index。
(5.3) 根据当前的 descriptor 计算出 buffer 所在的内存区域(region
)和区域内偏移(offset
)。
(5.4) 将最终 buffer 转化为一个字节的 slice 并 push 到 buffers
中。
(6) 与 (5) 非常相似,读取 desc ring 的 writable buffers,这里就不再赘述了。
(7) handle_message()
是 virtiofs 的私有逻辑了,这里不同的设备(virtio-net、virio-mem 等)处理的逻辑都不尽相同。Virtiofs 支持的操作包括 Init、Write、Open 等等操作磁盘 I/O 的逻辑。总之就是从 Reader 中读取从 device 传来的数据,然后将结果写入到 Writer 的 buffer 中通过 virtqueue 回传给 device。
(8) 将 descriptor index 添加到 used_queue 中,used queue 的 ring
数据类型是 virtq_used_elem
,除了 index 以外还有一个字段表示长度,表示本次使用的 buffers 的总长度。
(9) 设置 legacy irq 通知 driver,queue 有新的数据到来。