On 15/06/2023 17:54, Tudor Cretu wrote:
Previously, the aio_ring mechanism relied on two assumptions:
Nit: I find it clearer to talk about the situation before the patch as the current situation (present tense), and then say what the patch is doing to change that. It's just a convention really; it's also tempting to do it the way you did.
- The size of the header of an aio_ring is equal to the size of an io_event, i.e. sizeof(aio_ring) equals sizeof(io_event)
- There is no leftover space at the end of a page when populating it with io_events, i.e. PAGE_SIZE is a multiple of sizeof(io_event)
These assumptions are not met in PCuABI. This change removes these assumptions, making the aio_ring mechanism more flexible and robust.
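For reference, the two assumptions above could be spelled out as compile-time checks along these lines (a sketch only; struct aio_ring is local to fs/aio.c, and in PCuABI the larger struct io_event makes both checks fail):

#include <linux/build_bug.h>

/* Layout assumptions baked into the old AIO_EVENTS_* macros: the ring
 * header occupies exactly one event slot, and io_events never straddle
 * a page boundary.
 */
static_assert(sizeof(struct aio_ring) == sizeof(struct io_event));
static_assert(PAGE_SIZE % sizeof(struct io_event) == 0);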
In addition, this change improves the performance of the aio_ring mechanism. Previously, io_events were copied from the shared ring to userspace using multiple copy_to_user calls. This change batches the copying of io_events into a maximum of two copy_to_user calls.
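To make the batching explicit: the io_events array is a circular buffer, so the pending events form at most two contiguous segments (head up to the wrap point, then index 0 up to tail). A rough sketch of the idea, not the patch's actual code, with copy_events_to_user being a made-up helper built on the ctx->ring and nr_events fields used in this series:

/* Copy all available events to userspace in at most two chunks. */
static long copy_events_to_user(struct kioctx *ctx,
				struct io_event __user *uevents,
				unsigned int head, unsigned int tail)
{
	long copied = 0;

	while (head != tail) {
		/* Up to the wrap point, or up to tail, in one go. */
		unsigned int end = (head <= tail) ? tail : ctx->nr_events;
		unsigned int avail = end - head;

		if (copy_to_user(uevents + copied,
				 &ctx->ring->io_events[head],
				 avail * sizeof(struct io_event)))
			return -EFAULT;

		copied += avail;
		head = (head + avail) % ctx->nr_events;
	}

	return copied;
}

The loop body runs at most twice: once for the segment up to the wrap, and once for the wrapped part starting at index 0.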
Mapping pages independently using kmap doesn't create a contiguous virtual space. As some io_event structs could span across different pages, all the pages necessary to access the events' data need to be mapped to a contiguous virtual address space. For this reason, the ring pages are vmapped at the context initialisation step, and the mapping is cached in the context.
Should this paragraph come before the previous one? Considering this is the main justification, it would feel more natural.
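For the vmapped ring itself, I'd expect the setup/teardown pairing to end up roughly like this (sketch only; I'm assuming the matching vunmap() sits in aio_free_ring(), which isn't in the hunks quoted below):

	/* aio_setup_ring(): one contiguous kernel mapping over all ring pages */
	ctx->ring = vmap(ctx->ring_pages, nr_pages, VM_MAP, PAGE_KERNEL);
	if (!ctx->ring)
		return -ENOMEM;

	/* aio_free_ring(): drop the mapping before releasing the pages */
	if (ctx->ring) {
		vunmap(ctx->ring);
		ctx->ring = NULL;
	}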
The changes are similar to the approach taken in [0].
[0] https://lore.kernel.org/all/1349764760-21093-4-git-send-email-koverstreet@go...
Signed-off-by: Tudor Cretu <tudor.cretu@arm.com>
 fs/aio.c | 99 ++++++++++++++++++-------------------------------------
 1 file changed, 32 insertions(+), 67 deletions(-)

diff --git a/fs/aio.c b/fs/aio.c
index 7d64ba7e42a5..4fdab03bc0f2 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -119,6 +119,7 @@ struct kioctx {
 	/* Size of ringbuffer, in units of struct io_event */
 	unsigned		nr_events;
+	struct aio_ring		*ring;

 	unsigned long		mmap_base;
 	unsigned long		mmap_size;
@@ -508,7 +509,6 @@ static const struct address_space_operations aio_ctx_aops = {
 static int aio_setup_ring(struct kioctx *ctx, unsigned int nr_events)
 {
-	struct aio_ring *ring;
 	struct mm_struct *mm = current->mm;
 	unsigned long size, unused;
 	int nr_pages;
@@ -589,24 +589,23 @@ static int aio_setup_ring(struct kioctx *ctx, unsigned int nr_events)
 	ctx->user_id = ctx->mmap_base;
 	ctx->nr_events = nr_events; /* trusted copy */

-	ring = kmap_atomic(ctx->ring_pages[0]);
-	ring->nr = nr_events;	/* user copy */
-	ring->id = ~0U;
-	ring->head = ring->tail = 0;
-	ring->magic = AIO_RING_MAGIC;
-	ring->compat_features = AIO_RING_COMPAT_FEATURES;
-	ring->incompat_features = AIO_RING_INCOMPAT_FEATURES;
-	ring->header_length = sizeof(struct aio_ring);
-	kunmap_atomic(ring);
-	flush_dcache_page(ctx->ring_pages[0]);
+	ctx->ring = vmap(ctx->ring_pages, nr_pages, VM_MAP, PAGE_KERNEL);
+	if (!ctx->ring) {
+		aio_free_ring(ctx);
+		return -ENOMEM;
+	}
+
+	ctx->ring->nr = nr_events;	/* user copy */
+	ctx->ring->id = ~0U;
+	ctx->ring->head = ctx->ring->tail = 0;
+	ctx->ring->magic = AIO_RING_MAGIC;
+	ctx->ring->compat_features = AIO_RING_COMPAT_FEATURES;
+	ctx->ring->incompat_features = AIO_RING_INCOMPAT_FEATURES;
+	ctx->ring->header_length = sizeof(struct aio_ring);

 	return 0;
 }

-#define AIO_EVENTS_PER_PAGE	(PAGE_SIZE / sizeof(struct io_event))
-#define AIO_EVENTS_FIRST_PAGE	((PAGE_SIZE - sizeof(struct aio_ring)) / sizeof(struct io_event))
-#define AIO_EVENTS_OFFSET	(AIO_EVENTS_PER_PAGE - AIO_EVENTS_FIRST_PAGE)
-
 void kiocb_set_cancel_fn(struct kiocb *iocb, kiocb_cancel_fn *cancel)
 {
 	struct aio_kiocb *req = container_of(iocb, struct aio_kiocb, rw);
@@ -683,7 +682,6 @@ static int ioctx_add_table(struct kioctx *ctx, struct mm_struct *mm)
 {
 	unsigned i, new_nr;
 	struct kioctx_table *table, *old;
-	struct aio_ring *ring;

 	spin_lock(&mm->ioctx_lock);
 	table = rcu_dereference_raw(mm->ioctx_table);
@@ -700,9 +698,7 @@ static int ioctx_add_table(struct kioctx *ctx, struct mm_struct *mm)
 				 * we are protected from page migration
Speaking of migration, I wonder if we shouldn't do something about aio_migrate_folio(), since it seems to replace the backing pages (I don't completely understand what it does though).
 				 * changes ring_pages by ->ring_lock.
 				 */
-				ring = kmap_atomic(ctx->ring_pages[0]);
-				ring->id = ctx->id;
-				kunmap_atomic(ring);
+				ctx->ring->id = ctx->id;
 				return 0;
 			}
@@ -1031,7 +1027,6 @@ static void user_refill_reqs_available(struct kioctx *ctx)
 {
 	spin_lock_irq(&ctx->completion_lock);
 	if (ctx->completed_events) {
-		struct aio_ring *ring;
 		unsigned head;

 		/* Access of ring->head may race with aio_read_events_ring()
@@ -1043,9 +1038,7 @@ static void user_refill_reqs_available(struct kioctx *ctx)
 		 * against ctx->completed_events below will make sure we do the
 		 * safe/right thing.
 		 */
-		ring = kmap_atomic(ctx->ring_pages[0]);
-		head = ring->head;
-		kunmap_atomic(ring);
+		head = ctx->ring->head;

 		refill_reqs_available(ctx, head, ctx->tail);
 	}
@@ -1133,9 +1126,7 @@ static inline void iocb_destroy(struct aio_kiocb *iocb)
 static void aio_complete(struct aio_kiocb *iocb)
 {
 	struct kioctx	*ctx = iocb->ki_ctx;
-	struct aio_ring	*ring;
-	struct io_event	*ev_page, *event;
-	unsigned	tail, pos, head;
+	unsigned int	tail, head;
 	unsigned long	flags;

 	/*
@@ -1146,18 +1137,8 @@ static void aio_complete(struct aio_kiocb *iocb)
 	spin_lock_irqsave(&ctx->completion_lock, flags);

 	tail = ctx->tail;
-	pos = tail + AIO_EVENTS_OFFSET;
-
-	if (++tail >= ctx->nr_events)
-		tail = 0;
-
-	ev_page = kmap_atomic(ctx->ring_pages[pos / AIO_EVENTS_PER_PAGE]);
-	event = ev_page + pos % AIO_EVENTS_PER_PAGE;
-
-	*event = iocb->ki_res;
-
-	kunmap_atomic(ev_page);
-	flush_dcache_page(ctx->ring_pages[pos / AIO_EVENTS_PER_PAGE]);
+	ctx->ring->io_events[tail] = iocb->ki_res;

 	pr_debug("%p[%u]: %p: %p %Lx %Lx %Lx\n", ctx, tail, iocb,
 		 (void __user *)(unsigned long)iocb->ki_res.obj,
@@ -1168,13 +1149,12 @@ static void aio_complete(struct aio_kiocb *iocb)
 	 */
 	smp_wmb();	/* make event visible before updating tail */

-	ctx->tail = tail;
+	if (++tail >= ctx->nr_events)
+		tail = 0;
Nit: should stay before the pr_debug() to keep the output unchanged. Not sure printing the new tail value was intended, but just in case...

-	ring = kmap_atomic(ctx->ring_pages[0]);
-	head = ring->head;
-	ring->tail = tail;
-	kunmap_atomic(ring);
-	flush_dcache_page(ctx->ring_pages[0]);
+	ctx->tail = tail;
+	head = ctx->ring->head;
+	ctx->ring->tail = tail;

 	ctx->completed_events++;
 	if (ctx->completed_events > 1)
@@ -1218,10 +1198,8 @@ static inline void iocb_put(struct aio_kiocb *iocb)
 static long aio_read_events_ring(struct kioctx *ctx,
 				 struct io_event __user *event, long nr)
 {
-	struct aio_ring *ring;
-	unsigned head, tail, pos;
+	unsigned int head, tail;
 	long ret = 0;
-	int copy_ret;

 	/*
 	 * The mutex can block and wake us up and that will cause
@@ -1233,10 +1211,8 @@ static long aio_read_events_ring(struct kioctx *ctx,
 	mutex_lock(&ctx->ring_lock);

 	/* Access to ->ring_pages here is protected by ctx->ring_lock. */
Nit: I guess s/ring_pages/ring
-	ring = kmap_atomic(ctx->ring_pages[0]);
-	head = ring->head;
-	tail = ring->tail;
-	kunmap_atomic(ring);
+	head = ctx->ring->head;
+	tail = ctx->ring->tail;

 	/*
 	 * Ensure that once we've read the current tail pointer, that
@@ -1254,24 +1230,16 @@ static long aio_read_events_ring(struct kioctx *ctx,
 	while (ret < nr) {
 		long avail;
-		struct io_event *ev;
-		struct page *page;
+		int copy_ret;

-		avail = (head <= tail ?  tail : ctx->nr_events) - head;
Nit: I'd rather avoid this sort of move (both this line and copy_ret), as it's not actually necessary or related to what the patch does. Both do make sense, but they are a bit distracting when looking at the patch.
Kevin
 		if (head == tail)
 			break;

-		pos = head + AIO_EVENTS_OFFSET;
-		page = ctx->ring_pages[pos / AIO_EVENTS_PER_PAGE];
-		pos %= AIO_EVENTS_PER_PAGE;
-
+		avail = (head <= tail ?  tail : ctx->nr_events) - head;
 		avail = min(avail, nr - ret);
-		avail = min_t(long, avail, AIO_EVENTS_PER_PAGE - pos);
-
-		ev = kmap(page);
-		copy_ret = copy_to_user(event + ret, ev + pos,
-					sizeof(*ev) * avail);
-		kunmap(page);
+		copy_ret = copy_to_user(event + ret,
+					&ctx->ring->io_events[head],
+					sizeof(struct io_event) * avail);

 		if (unlikely(copy_ret)) {
 			ret = -EFAULT;
@@ -1283,10 +1251,7 @@ static long aio_read_events_ring(struct kioctx *ctx,
 		head %= ctx->nr_events;
 	}

-	ring = kmap_atomic(ctx->ring_pages[0]);
-	ring->head = head;
-	kunmap_atomic(ring);
-	flush_dcache_page(ctx->ring_pages[0]);
+	ctx->ring->head = head;

 	pr_debug("%li h%u t%u\n", ret, head, tail);
 out: