The aio_ring mechanism relies on two assumptions:

1. The size of the header of an aio_ring is equal to the size of an
   io_event, i.e. sizeof(aio_ring) equals sizeof(io_event).
2. There is no leftover space at the end of a page when populating it
   with io_events, i.e. PAGE_SIZE is a multiple of sizeof(io_event).
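For illustration only (this snippet is not part of the patch), the two assumptions can be checked from userspace against the UAPI struct io_event; the aio_ring_header mirror struct and the sysconf() lookup are assumptions of this sketch, not kernel code:

  #include <linux/aio_abi.h>  /* UAPI struct io_event */
  #include <stdio.h>
  #include <unistd.h>

  /* Mirror of the aio_ring header fields (id .. header_length) in fs/aio.c. */
  struct aio_ring_header {
      unsigned id, nr, head, tail;
      unsigned magic, compat_features, incompat_features, header_length;
  };

  int main(void)
  {
      long page_size = sysconf(_SC_PAGESIZE);

      /* Assumption 1: the header occupies exactly one io_event slot. */
      printf("header: %zu, io_event: %zu\n",
             sizeof(struct aio_ring_header), sizeof(struct io_event));

      /* Assumption 2: io_events never straddle a page boundary. */
      printf("PAGE_SIZE %% sizeof(io_event) = %zu\n",
             (size_t)page_size % sizeof(struct io_event));
      return 0;
  }

Both checks pass for the existing 32-byte io_event and 32-byte ring header.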
These assumptions are not met in PCuABI. This change removes the reliance on them, making the aio_ring mechanism more flexible and robust.
Mapping pages independently using kmap doesn't provide a contiguous virtual address space. As some io_event structs could span page boundaries, all the pages needed to access the events' data have to be mapped into a single contiguous virtual address space. For this reason, the ring pages are vmapped at the context initialisation step, and the mapping is cached in the context.
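As a minimal sketch of that pattern (the helper names below are hypothetical; the change itself open-codes this in aio_setup_ring() and aio_free_ring(), as the diff shows, using the new ctx->ring field and the existing ctx->ring_pages array):

  #include <linux/vmalloc.h>  /* vmap(), vunmap() */

  /* Map every ring page into one contiguous kernel virtual range, so an
   * io_event that straddles a page boundary can be read and written with
   * plain loads and stores.
   */
  static int aio_ring_vmap(struct kioctx *ctx, int nr_pages)
  {
      ctx->ring = vmap(ctx->ring_pages, nr_pages, VM_MAP, PAGE_KERNEL);
      return ctx->ring ? 0 : -ENOMEM;
  }

  static void aio_ring_vunmap(struct kioctx *ctx)
  {
      if (ctx->ring)
          vunmap(ctx->ring);
  }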
In addition, this change improves the performance of the aio_ring mechanism. Previously, io_events were copied from the shared ring to userspace one page at a time, with a copy_to_user call per ring page touched. With the ring contiguously mapped, the copying is batched into at most two copy_to_user calls: one per contiguous run of events, and there are two runs only when the events wrap around the end of the ring.
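A sketch of the batched copy-out (simplified from the diff below; ring_lock, memory barriers, wakeups and the EFAULT path via the out: label are omitted, and the helper name is hypothetical):

  /* Hypothetical helper showing the batched copy-out of ready events. */
  static long aio_copy_events_to_user(struct kioctx *ctx,
                                      struct io_event __user *event, long nr)
  {
      unsigned int head = ctx->ring->head % ctx->nr_events;
      unsigned int tail = ctx->ring->tail % ctx->nr_events;
      long ret = 0;

      while (ret < nr && head != tail) {
          /* Longest contiguous run of events starting at head. */
          long avail = (head <= tail ? tail : ctx->nr_events) - head;

          avail = min(avail, nr - ret);
          if (copy_to_user(event + ret, &ctx->ring->io_events[head],
                           sizeof(struct io_event) * avail))
              return -EFAULT;

          ret += avail;
          head = (head + avail) % ctx->nr_events;
      }

      ctx->ring->head = head;  /* consume the copied events */
      return ret;
  }

The loop body runs at most twice: once for the run ending at the tail and, only on wrap-around, once more for the run at the start of the ring.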
The changes are similar to the approach taken in [0].
[0] https://lore.kernel.org/all/1349764760-21093-4-git-send-email-koverstreet@go...
Signed-off-by: Tudor Cretu <tudor.cretu@arm.com>
---
 fs/aio.c | 98 +++++++++++++++++++------------------------------------
 1 file changed, 33 insertions(+), 65 deletions(-)
diff --git a/fs/aio.c b/fs/aio.c
index 1f235c446e89..c5e850c4a4d7 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -119,6 +119,7 @@ struct kioctx {
 	/* Size of ringbuffer, in units of struct io_event */
 	unsigned		nr_events;
 
+	struct aio_ring		*ring;
 	unsigned long		mmap_base;
 	unsigned long		mmap_size;
 
@@ -351,6 +352,9 @@ static void aio_free_ring(struct kioctx *ctx)
 {
 	int i;
 
+	if (ctx->ring)
+		vunmap(ctx->ring);
+
 	/* Disconnect the kiotx from the ring file. This prevents future
 	 * accesses to the kioctx from page migration.
 	 */
@@ -508,7 +512,6 @@ static const struct address_space_operations aio_ctx_aops = {
 
 static int aio_setup_ring(struct kioctx *ctx, unsigned int nr_events)
 {
-	struct aio_ring *ring;
 	struct mm_struct *mm = current->mm;
 	unsigned long size, unused;
 	int nr_pages;
@@ -589,24 +592,23 @@ static int aio_setup_ring(struct kioctx *ctx, unsigned int nr_events)
 	ctx->user_id = ctx->mmap_base;
 	ctx->nr_events = nr_events; /* trusted copy */
 
-	ring = kmap_atomic(ctx->ring_pages[0]);
-	ring->nr = nr_events;	/* user copy */
-	ring->id = ~0U;
-	ring->head = ring->tail = 0;
-	ring->magic = AIO_RING_MAGIC;
-	ring->compat_features = AIO_RING_COMPAT_FEATURES;
-	ring->incompat_features = AIO_RING_INCOMPAT_FEATURES;
-	ring->header_length = sizeof(struct aio_ring);
-	kunmap_atomic(ring);
-	flush_dcache_page(ctx->ring_pages[0]);
+	ctx->ring = vmap(ctx->ring_pages, nr_pages, VM_MAP, PAGE_KERNEL);
+	if (!ctx->ring) {
+		aio_free_ring(ctx);
+		return -ENOMEM;
+	}
+
+	ctx->ring->nr = nr_events;	/* user copy */
+	ctx->ring->id = ~0U;
+	ctx->ring->head = ctx->ring->tail = 0;
+	ctx->ring->magic = AIO_RING_MAGIC;
+	ctx->ring->compat_features = AIO_RING_COMPAT_FEATURES;
+	ctx->ring->incompat_features = AIO_RING_INCOMPAT_FEATURES;
+	ctx->ring->header_length = sizeof(struct aio_ring);
 
 	return 0;
 }
 
-#define AIO_EVENTS_PER_PAGE	(PAGE_SIZE / sizeof(struct io_event))
-#define AIO_EVENTS_FIRST_PAGE	((PAGE_SIZE - sizeof(struct aio_ring)) / sizeof(struct io_event))
-#define AIO_EVENTS_OFFSET	(AIO_EVENTS_PER_PAGE - AIO_EVENTS_FIRST_PAGE)
-
 void kiocb_set_cancel_fn(struct kiocb *iocb, kiocb_cancel_fn *cancel)
 {
 	struct aio_kiocb *req = container_of(iocb, struct aio_kiocb, rw);
@@ -683,7 +685,6 @@ static int ioctx_add_table(struct kioctx *ctx, struct mm_struct *mm)
 {
 	unsigned i, new_nr;
 	struct kioctx_table *table, *old;
-	struct aio_ring *ring;
 
 	spin_lock(&mm->ioctx_lock);
 	table = rcu_dereference_raw(mm->ioctx_table);
@@ -700,9 +701,7 @@ static int ioctx_add_table(struct kioctx *ctx, struct mm_struct *mm)
 					 * we are protected from page migration
 					 * changes ring_pages by ->ring_lock.
 					 */
-					ring = kmap_atomic(ctx->ring_pages[0]);
-					ring->id = ctx->id;
-					kunmap_atomic(ring);
+					ctx->ring->id = ctx->id;
 					return 0;
 				}
 
@@ -1031,7 +1030,6 @@ static void user_refill_reqs_available(struct kioctx *ctx)
 {
 	spin_lock_irq(&ctx->completion_lock);
 	if (ctx->completed_events) {
-		struct aio_ring *ring;
 		unsigned head;
 
 		/* Access of ring->head may race with aio_read_events_ring()
@@ -1043,9 +1041,7 @@ static void user_refill_reqs_available(struct kioctx *ctx)
 		 * against ctx->completed_events below will make sure we do the
 		 * safe/right thing.
 		 */
-		ring = kmap_atomic(ctx->ring_pages[0]);
-		head = ring->head;
-		kunmap_atomic(ring);
+		head = ctx->ring->head;
 
 		refill_reqs_available(ctx, head, ctx->tail);
 	}
@@ -1133,9 +1129,7 @@ static inline void iocb_destroy(struct aio_kiocb *iocb)
 static void aio_complete(struct aio_kiocb *iocb)
 {
 	struct kioctx *ctx = iocb->ki_ctx;
-	struct aio_ring *ring;
-	struct io_event *ev_page, *event;
-	unsigned tail, pos, head;
+	unsigned int tail, head;
 	unsigned long flags;
 
 	/*
@@ -1146,19 +1140,12 @@ static void aio_complete(struct aio_kiocb *iocb)
 	spin_lock_irqsave(&ctx->completion_lock, flags);
 
 	tail = ctx->tail;
-	pos = tail + AIO_EVENTS_OFFSET;
+
+	ctx->ring->io_events[tail] = iocb->ki_res;
 
 	if (++tail >= ctx->nr_events)
 		tail = 0;
 
-	ev_page = kmap_atomic(ctx->ring_pages[pos / AIO_EVENTS_PER_PAGE]);
-	event = ev_page + pos % AIO_EVENTS_PER_PAGE;
-
-	*event = iocb->ki_res;
-
-	kunmap_atomic(ev_page);
-	flush_dcache_page(ctx->ring_pages[pos / AIO_EVENTS_PER_PAGE]);
-
 	pr_debug("%p[%u]: %p: %p %Lx %Lx %Lx\n", ctx, tail, iocb,
 		 (void __user *)(unsigned long)iocb->ki_res.obj,
 		 iocb->ki_res.data, iocb->ki_res.res, iocb->ki_res.res2);
@@ -1169,12 +1156,8 @@ static void aio_complete(struct aio_kiocb *iocb)
 	smp_wmb();	/* make event visible before updating tail */
 
 	ctx->tail = tail;
-
-	ring = kmap_atomic(ctx->ring_pages[0]);
-	head = ring->head;
-	ring->tail = tail;
-	kunmap_atomic(ring);
-	flush_dcache_page(ctx->ring_pages[0]);
+	head = ctx->ring->head;
+	ctx->ring->tail = tail;
 
 	ctx->completed_events++;
 	if (ctx->completed_events > 1)
@@ -1218,8 +1201,7 @@ static inline void iocb_put(struct aio_kiocb *iocb)
 static long aio_read_events_ring(struct kioctx *ctx,
 				 struct io_event __user *event, long nr)
 {
-	struct aio_ring *ring;
-	unsigned head, tail, pos;
+	unsigned int head, tail;
 	long ret = 0;
 	int copy_ret;
 
@@ -1232,11 +1214,9 @@ static long aio_read_events_ring(struct kioctx *ctx,
 	sched_annotate_sleep();
 	mutex_lock(&ctx->ring_lock);
 
-	/* Access to ->ring_pages here is protected by ctx->ring_lock. */
-	ring = kmap_atomic(ctx->ring_pages[0]);
-	head = ring->head;
-	tail = ring->tail;
-	kunmap_atomic(ring);
+	/* Access to ->ring here is protected by ctx->ring_lock. */
+	head = ctx->ring->head;
+	tail = ctx->ring->tail;
 
 	/*
 	 * Ensure that once we've read the current tail pointer, that
@@ -1253,25 +1233,16 @@ static long aio_read_events_ring(struct kioctx *ctx,
 	tail %= ctx->nr_events;
 
 	while (ret < nr) {
-		long avail;
-		struct io_event *ev;
-		struct page *page;
+		long avail = (head <= tail ? tail : ctx->nr_events) - head;
 
-		avail = (head <= tail ? tail : ctx->nr_events) - head;
 		if (head == tail)
			break;
 
-		pos = head + AIO_EVENTS_OFFSET;
-		page = ctx->ring_pages[pos / AIO_EVENTS_PER_PAGE];
-		pos %= AIO_EVENTS_PER_PAGE;
-
 		avail = min(avail, nr - ret);
-		avail = min_t(long, avail, AIO_EVENTS_PER_PAGE - pos);
 
-		ev = kmap(page);
-		copy_ret = copy_to_user(event + ret, ev + pos,
-					sizeof(*ev) * avail);
-		kunmap(page);
+		copy_ret = copy_to_user(event + ret,
+					&ctx->ring->io_events[head],
+					sizeof(struct io_event) * avail);
 
 		if (unlikely(copy_ret)) {
 			ret = -EFAULT;
@@ -1283,10 +1254,7 @@ static long aio_read_events_ring(struct kioctx *ctx,
 		head %= ctx->nr_events;
 	}
 
-	ring = kmap_atomic(ctx->ring_pages[0]);
-	ring->head = head;
-	kunmap_atomic(ring);
-	flush_dcache_page(ctx->ring_pages[0]);
+	ctx->ring->head = head;
 
 	pr_debug("%li h%u t%u\n", ret, head, tail);
 out: