| | | |
|---|---|---|
| author | Linus Torvalds <torvalds@linux-foundation.org> | 2026-02-09 17:22:00 -0800 |
| committer | Linus Torvalds <torvalds@linux-foundation.org> | 2026-02-09 17:22:00 -0800 |
| commit | f5d4feed174ce9fb3c42886a3c36038fd5a43e25 (patch) | |
| tree | 2e1940643a141621ef28b2ecb757d0dbce6ef9d7 | |
| parent | 26c9342bb761e463774a64fb6210b4f95f5bc035 (diff) | |
| parent | 442ae406603a94f1a263654494f425302ceb0445 (diff) | |
Merge tag 'for-7.0/io_uring-20260206' of git://git.kernel.org/pub/scm/linux/kernel/git/axboe/linux
Pull io_uring updates from Jens Axboe:
- Clean up the IORING_SETUP_R_DISABLED and submitter task checking,
mostly just in preparation for relaxing the locking for SINGLE_ISSUER
in the future.
- Improve IOPOLL by using a doubly linked list to manage completions.
Previously it was singly linked, which meant that to complete request
N in the chain, requests 0..N-1 had to have completed first. With a
doubly linked list, requests can be completed in whatever order they
finish, rather than needing to wait for a consecutive range to be
available. This reduces latencies.
- Improve the restriction setup and checking, mostly in preparation
for adding further features on top of it. Those are coming in a
separate pull request.
- Split out task_work and wait handling into separate files. These are
mostly nicely abstracted already, but still remained in io_uring.c,
which is on the larger side.
- Use GFP_KERNEL_ACCOUNT in a few more spots, where appropriate.
- Ensure that even idle io-wq workers exit if a task no longer has any
rings open.
- Add support for a non-circular submission queue.
By default, the SQ head and tail keep advancing around the ring, even
if only a few entries are used for each submission. This can be
wasteful in terms of cachelines.
If IORING_SETUP_SQ_REWIND is set when the ring is created, each
submission will start at offset 0 instead of where the previous
submission left off (see the sketch after this list).
- Various little cleanups
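As a rough illustration of the non-circular SQ described above, here is a minimal userspace sketch. It assumes the raw io_uring_setup() syscall is used directly (liburing may not expose the new flag yet), and the setup_rewind_ring() helper name is invented for the example; per the uapi comment in the diff, the flag requires IORING_SETUP_NO_SQARRAY and is rejected together with IORING_SETUP_SQPOLL.

```c
/*
 * Hypothetical sketch, not part of the patch: create a ring with
 * IORING_SETUP_SQ_REWIND via the raw setup syscall. With this flag the
 * application places each batch of SQEs starting at index 0 and leaves
 * the SQ head/tail at 0; the kernel rewinds its cached SQ head back to 0
 * after every submission.
 */
#include <linux/io_uring.h>
#include <string.h>
#include <sys/syscall.h>
#include <unistd.h>

static int setup_rewind_ring(unsigned int entries, struct io_uring_params *p)
{
	memset(p, 0, sizeof(*p));
	/* SQ_REWIND requires NO_SQARRAY and is incompatible with SQPOLL */
	p->flags = IORING_SETUP_NO_SQARRAY | IORING_SETUP_SQ_REWIND;
	return syscall(__NR_io_uring_setup, entries, p);
}
```

With this flow, every submission touches only the first few cachelines of the SQ memory, instead of gradually walking the whole ring as the head advances.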
* tag 'for-7.0/io_uring-20260206' of git://git.kernel.org/pub/scm/linux/kernel/git/axboe/linux: (30 commits)
io_uring/kbuf: fix memory leak if io_buffer_add_list fails
io_uring: Add SPDX id lines to remaining source files
io_uring: allow io-wq workers to exit when unused
io_uring/io-wq: add exit-on-idle state
io_uring/net: don't continue send bundle if poll was required for retry
io_uring/rsrc: use GFP_KERNEL_ACCOUNT consistently
io_uring/futex: use GFP_KERNEL_ACCOUNT for futex data allocation
io_uring/io-wq: handle !sysctl_hung_task_timeout_secs
io_uring: fix bad indentation for setup flags if statement
io_uring/rsrc: take unsigned index in io_rsrc_node_lookup()
io_uring: introduce non-circular SQ
io_uring: split out CQ waiting code into wait.c
io_uring: split out task work code into tw.c
io_uring/io-wq: don't trigger hung task for syzbot craziness
io_uring: add IO_URING_EXIT_WAIT_MAX definition
io_uring/sync: validate passed in offset
io_uring/eventfd: remove unused ctx->evfd_last_cq_tail member
io_uring/timeout: annotate data race in io_flush_timeouts()
io_uring/uring_cmd: explicitly disallow cancelations for IOPOLL
io_uring: fix IOPOLL with passthrough I/O
...
35 files changed, 1074 insertions, 915 deletions
diff --git a/include/linux/io_uring_types.h b/include/linux/io_uring_types.h index a3e8ddc9b380..544f78e3ca32 100644 --- a/include/linux/io_uring_types.h +++ b/include/linux/io_uring_types.h @@ -224,7 +224,10 @@ struct io_restriction { DECLARE_BITMAP(sqe_op, IORING_OP_LAST); u8 sqe_flags_allowed; u8 sqe_flags_required; - bool registered; + /* IORING_OP_* restrictions exist */ + bool op_registered; + /* IORING_REGISTER_* restrictions exist */ + bool reg_registered; }; struct io_submit_link { @@ -259,7 +262,8 @@ struct io_ring_ctx { struct { unsigned int flags; unsigned int drain_next: 1; - unsigned int restricted: 1; + unsigned int op_restricted: 1; + unsigned int reg_restricted: 1; unsigned int off_timeout_used: 1; unsigned int drain_active: 1; unsigned int has_evfd: 1; @@ -316,7 +320,7 @@ struct io_ring_ctx { * manipulate the list, hence no extra locking is needed there. */ bool poll_multi_queue; - struct io_wq_work_list iopoll_list; + struct list_head iopoll_list; struct io_file_table file_table; struct io_rsrc_data buf_table; @@ -444,6 +448,9 @@ struct io_ring_ctx { struct list_head defer_list; unsigned nr_drained; + /* protected by ->completion_lock */ + unsigned nr_req_allocated; + #ifdef CONFIG_NET_RX_BUSY_POLL struct list_head napi_list; /* track busy poll napi_id */ spinlock_t napi_lock; /* napi_list lock */ @@ -456,10 +463,6 @@ struct io_ring_ctx { DECLARE_HASHTABLE(napi_ht, 4); #endif - /* protected by ->completion_lock */ - unsigned evfd_last_cq_tail; - unsigned nr_req_allocated; - /* * Protection for resize vs mmap races - both the mmap and resize * side will need to grab this lock, to prevent either side from @@ -714,15 +717,21 @@ struct io_kiocb { atomic_t refs; bool cancel_seq_set; - struct io_task_work io_task_work; + + union { + struct io_task_work io_task_work; + /* For IOPOLL setup queues, with hybrid polling */ + u64 iopoll_start; + }; + union { /* * for polled requests, i.e. IORING_OP_POLL_ADD and async armed * poll */ struct hlist_node hash_node; - /* For IOPOLL setup queues, with hybrid polling */ - u64 iopoll_start; + /* IOPOLL completion handling */ + struct list_head iopoll_node; /* for private io_kiocb freeing */ struct rcu_head rcu_head; }; diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h index b5b23c0d5283..475094c7a668 100644 --- a/include/uapi/linux/io_uring.h +++ b/include/uapi/linux/io_uring.h @@ -237,6 +237,18 @@ enum io_uring_sqe_flags_bit { */ #define IORING_SETUP_SQE_MIXED (1U << 19) +/* + * When set, io_uring ignores SQ head and tail and fetches SQEs to submit + * starting from index 0 instead from the index stored in the head pointer. + * IOW, the user should place all SQE at the beginning of the SQ memory + * before issuing a submission syscall. + * + * It requires IORING_SETUP_NO_SQARRAY and is incompatible with + * IORING_SETUP_SQPOLL. The user must also never change the SQ head and tail + * values and keep it set to 0. Any other value is undefined behaviour. 
+ */ +#define IORING_SETUP_SQ_REWIND (1U << 20) + enum io_uring_op { IORING_OP_NOP, IORING_OP_READV, diff --git a/io_uring/Makefile b/io_uring/Makefile index bc4e4a3fa0a5..bf9eff88427a 100644 --- a/io_uring/Makefile +++ b/io_uring/Makefile @@ -8,12 +8,14 @@ endif obj-$(CONFIG_IO_URING) += io_uring.o opdef.o kbuf.o rsrc.o notif.o \ tctx.o filetable.o rw.o poll.o \ - eventfd.o uring_cmd.o openclose.o \ - sqpoll.o xattr.o nop.o fs.o splice.o \ - sync.o msg_ring.o advise.o openclose.o \ - statx.o timeout.o cancel.o \ - waitid.o register.o truncate.o \ - memmap.o alloc_cache.o query.o + tw.o wait.o eventfd.o uring_cmd.o \ + openclose.o sqpoll.o xattr.o nop.o \ + fs.o splice.o sync.o msg_ring.o \ + advise.o openclose.o statx.o timeout.o \ + cancel.o waitid.o register.o \ + truncate.o memmap.o alloc_cache.o \ + query.o + obj-$(CONFIG_IO_URING_ZCRX) += zcrx.o obj-$(CONFIG_IO_WQ) += io-wq.o obj-$(CONFIG_FUTEX) += futex.o diff --git a/io_uring/alloc_cache.h b/io_uring/alloc_cache.h index d33ce159ef33..45fcd8b3b824 100644 --- a/io_uring/alloc_cache.h +++ b/io_uring/alloc_cache.h @@ -1,7 +1,9 @@ +/* SPDX-License-Identifier: GPL-2.0 */ #ifndef IOU_ALLOC_CACHE_H #define IOU_ALLOC_CACHE_H #include <linux/io_uring_types.h> +#include <linux/kasan.h> /* * Don't allow the cache to grow beyond this size. diff --git a/io_uring/cancel.c b/io_uring/cancel.c index 07b8d852218b..65e04063e343 100644 --- a/io_uring/cancel.c +++ b/io_uring/cancel.c @@ -2,10 +2,8 @@ #include <linux/kernel.h> #include <linux/errno.h> #include <linux/fs.h> -#include <linux/file.h> #include <linux/mm.h> #include <linux/slab.h> -#include <linux/namei.h> #include <linux/nospec.h> #include <linux/io_uring.h> @@ -21,6 +19,7 @@ #include "waitid.h" #include "futex.h" #include "cancel.h" +#include "wait.h" struct io_cancel { struct file *file; @@ -539,7 +538,7 @@ __cold bool io_uring_try_cancel_requests(struct io_ring_ctx *ctx, /* SQPOLL thread does its own polling */ if ((!(ctx->flags & IORING_SETUP_SQPOLL) && cancel_all) || is_sqpoll_thread) { - while (!wq_list_empty(&ctx->iopoll_list)) { + while (!list_empty(&ctx->iopoll_list)) { io_iopoll_try_reap_events(ctx); ret = true; cond_resched(); diff --git a/io_uring/cmd_net.c b/io_uring/cmd_net.c index 19d3ce2bd20a..cb2775936fb8 100644 --- a/io_uring/cmd_net.c +++ b/io_uring/cmd_net.c @@ -1,3 +1,4 @@ +// SPDX-License-Identifier: GPL-2.0 #include <asm/ioctls.h> #include <linux/io_uring/net.h> #include <linux/errqueue.h> diff --git a/io_uring/eventfd.h b/io_uring/eventfd.h index e2f1985c2cf9..400eda4a4165 100644 --- a/io_uring/eventfd.h +++ b/io_uring/eventfd.h @@ -1,3 +1,4 @@ +/* SPDX-License-Identifier: GPL-2.0 */ struct io_ring_ctx; int io_eventfd_register(struct io_ring_ctx *ctx, void __user *arg, diff --git a/io_uring/filetable.h b/io_uring/filetable.h index 7717ea9efd0e..c348233a3411 100644 --- a/io_uring/filetable.h +++ b/io_uring/filetable.h @@ -2,7 +2,6 @@ #ifndef IOU_FILE_TABLE_H #define IOU_FILE_TABLE_H -#include <linux/file.h> #include <linux/io_uring_types.h> #include "rsrc.h" diff --git a/io_uring/futex.c b/io_uring/futex.c index 11bfff5a80df..1dabcfd503b8 100644 --- a/io_uring/futex.c +++ b/io_uring/futex.c @@ -186,7 +186,7 @@ int io_futexv_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) return -EINVAL; ifd = kzalloc(struct_size_t(struct io_futexv_data, futexv, iof->futex_nr), - GFP_KERNEL); + GFP_KERNEL_ACCOUNT); if (!ifd) return -ENOMEM; diff --git a/io_uring/io-wq.c b/io_uring/io-wq.c index 2fa7d3601edb..7ed04911f7b9 100644 --- a/io_uring/io-wq.c +++ b/io_uring/io-wq.c 
@@ -17,6 +17,7 @@ #include <linux/task_work.h> #include <linux/audit.h> #include <linux/mmu_context.h> +#include <linux/sched/sysctl.h> #include <uapi/linux/io_uring.h> #include "io-wq.h" @@ -34,6 +35,7 @@ enum { enum { IO_WQ_BIT_EXIT = 0, /* wq exiting */ + IO_WQ_BIT_EXIT_ON_IDLE = 1, /* allow all workers to exit on idle */ }; enum { @@ -706,9 +708,13 @@ static int io_wq_worker(void *data) raw_spin_lock(&acct->workers_lock); /* * Last sleep timed out. Exit if we're not the last worker, - * or if someone modified our affinity. + * or if someone modified our affinity. If wq is marked + * idle-exit, drop the worker as well. This is used to avoid + * keeping io-wq workers around for tasks that no longer have + * any active io_uring instances. */ - if (last_timeout && (exit_mask || acct->nr_workers > 1)) { + if ((last_timeout && (exit_mask || acct->nr_workers > 1)) || + test_bit(IO_WQ_BIT_EXIT_ON_IDLE, &wq->state)) { acct->nr_workers--; raw_spin_unlock(&acct->workers_lock); __set_current_state(TASK_RUNNING); @@ -963,6 +969,24 @@ static bool io_wq_worker_wake(struct io_worker *worker, void *data) return false; } +void io_wq_set_exit_on_idle(struct io_wq *wq, bool enable) +{ + if (!wq->task) + return; + + if (!enable) { + clear_bit(IO_WQ_BIT_EXIT_ON_IDLE, &wq->state); + return; + } + + if (test_and_set_bit(IO_WQ_BIT_EXIT_ON_IDLE, &wq->state)) + return; + + rcu_read_lock(); + io_wq_for_each_worker(wq, io_wq_worker_wake, NULL); + rcu_read_unlock(); +} + static void io_run_cancel(struct io_wq_work *work, struct io_wq *wq) { do { @@ -1313,6 +1337,8 @@ static void io_wq_cancel_tw_create(struct io_wq *wq) static void io_wq_exit_workers(struct io_wq *wq) { + unsigned long timeout, warn_timeout; + if (!wq->task) return; @@ -1322,7 +1348,26 @@ static void io_wq_exit_workers(struct io_wq *wq) io_wq_for_each_worker(wq, io_wq_worker_wake, NULL); rcu_read_unlock(); io_worker_ref_put(wq); - wait_for_completion(&wq->worker_done); + + /* + * Shut up hung task complaint, see for example + * + * https://lore.kernel.org/all/696fc9e7.a70a0220.111c58.0006.GAE@google.com/ + * + * where completely overloading the system with tons of long running + * io-wq items can easily trigger the hung task timeout. Only sleep + * uninterruptibly for half that time, and warn if we exceeded end + * up waiting more than IO_URING_EXIT_WAIT_MAX. 
+ */ + timeout = sysctl_hung_task_timeout_secs * HZ / 2; + if (!timeout) + timeout = MAX_SCHEDULE_TIMEOUT; + warn_timeout = jiffies + IO_URING_EXIT_WAIT_MAX; + do { + if (wait_for_completion_timeout(&wq->worker_done, timeout)) + break; + WARN_ON_ONCE(time_after(jiffies, warn_timeout)); + } while (1); spin_lock_irq(&wq->hash->wait.lock); list_del_init(&wq->wait.entry); diff --git a/io_uring/io-wq.h b/io_uring/io-wq.h index 774abab54732..42f00a47a9c9 100644 --- a/io_uring/io-wq.h +++ b/io_uring/io-wq.h @@ -1,3 +1,4 @@ +/* SPDX-License-Identifier: GPL-2.0 */ #ifndef INTERNAL_IO_WQ_H #define INTERNAL_IO_WQ_H @@ -41,6 +42,7 @@ struct io_wq_data { struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data); void io_wq_exit_start(struct io_wq *wq); void io_wq_put_and_exit(struct io_wq *wq); +void io_wq_set_exit_on_idle(struct io_wq *wq, bool enable); void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work); void io_wq_hash_work(struct io_wq_work *work, void *val); diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c index b7a077c11c21..2ca561881ef7 100644 --- a/io_uring/io_uring.c +++ b/io_uring/io_uring.c @@ -40,37 +40,25 @@ * Copyright (c) 2018-2019 Christoph Hellwig */ #include <linux/kernel.h> -#include <linux/init.h> #include <linux/errno.h> #include <linux/syscalls.h> -#include <net/compat.h> #include <linux/refcount.h> -#include <linux/uio.h> #include <linux/bits.h> #include <linux/sched/signal.h> #include <linux/fs.h> -#include <linux/file.h> #include <linux/mm.h> -#include <linux/mman.h> #include <linux/percpu.h> #include <linux/slab.h> -#include <linux/bvec.h> -#include <linux/net.h> -#include <net/sock.h> #include <linux/anon_inodes.h> -#include <linux/sched/mm.h> #include <linux/uaccess.h> #include <linux/nospec.h> -#include <linux/fsnotify.h> -#include <linux/fadvise.h> #include <linux/task_work.h> #include <linux/io_uring.h> #include <linux/io_uring/cmd.h> #include <linux/audit.h> #include <linux/security.h> #include <linux/jump_label.h> -#include <asm/shmparam.h> #define CREATE_TRACE_POINTS #include <trace/events/io_uring.h> @@ -105,6 +93,7 @@ #include "rw.h" #include "alloc_cache.h" #include "eventfd.h" +#include "wait.h" #define SQE_COMMON_FLAGS (IOSQE_FIXED_FILE | IOSQE_IO_LINK | \ IOSQE_IO_HARDLINK | IOSQE_ASYNC) @@ -122,19 +111,10 @@ #define IO_COMPL_BATCH 32 #define IO_REQ_ALLOC_BATCH 8 -#define IO_LOCAL_TW_DEFAULT_MAX 20 /* requests with any of those set should undergo io_disarm_next() */ #define IO_DISARM_MASK (REQ_F_ARM_LTIMEOUT | REQ_F_LINK_TIMEOUT | REQ_F_FAIL) -/* - * No waiters. It's larger than any valid value of the tw counter - * so that tests against ->cq_wait_nr would fail and skip wake_up(). 
- */ -#define IO_CQ_WAKE_INIT (-1U) -/* Forced wake up if there is a waiter regardless of ->cq_wait_nr */ -#define IO_CQ_WAKE_FORCE (IO_CQ_WAKE_INIT >> 1) - static void io_queue_sqe(struct io_kiocb *req, unsigned int extra_flags); static void __io_req_caches_free(struct io_ring_ctx *ctx); @@ -187,16 +167,6 @@ static void io_poison_req(struct io_kiocb *req) req->link = IO_URING_PTR_POISON; } -static inline unsigned int __io_cqring_events(struct io_ring_ctx *ctx) -{ - return ctx->cached_cq_tail - READ_ONCE(ctx->rings->cq.head); -} - -static inline unsigned int __io_cqring_events_user(struct io_ring_ctx *ctx) -{ - return READ_ONCE(ctx->rings->cq.tail) - READ_ONCE(ctx->rings->cq.head); -} - static inline void req_fail_link_node(struct io_kiocb *req, int res) { req_set_fail(req); @@ -217,38 +187,6 @@ static __cold void io_ring_ctx_ref_free(struct percpu_ref *ref) complete(&ctx->ref_comp); } -/* - * Terminate the request if either of these conditions are true: - * - * 1) It's being executed by the original task, but that task is marked - * with PF_EXITING as it's exiting. - * 2) PF_KTHREAD is set, in which case the invoker of the task_work is - * our fallback task_work. - * 3) The ring has been closed and is going away. - */ -static inline bool io_should_terminate_tw(struct io_ring_ctx *ctx) -{ - return (current->flags & (PF_EXITING | PF_KTHREAD)) || percpu_ref_is_dying(&ctx->refs); -} - -static __cold void io_fallback_req_func(struct work_struct *work) -{ - struct io_ring_ctx *ctx = container_of(work, struct io_ring_ctx, - fallback_work.work); - struct llist_node *node = llist_del_all(&ctx->fallback_llist); - struct io_kiocb *req, *tmp; - struct io_tw_state ts = {}; - - percpu_ref_get(&ctx->refs); - mutex_lock(&ctx->uring_lock); - ts.cancel = io_should_terminate_tw(ctx); - llist_for_each_entry_safe(req, tmp, node, io_task_work.node) - req->io_task_work.func((struct io_tw_req){req}, ts); - io_submit_flush_completions(ctx); - mutex_unlock(&ctx->uring_lock); - percpu_ref_put(&ctx->refs); -} - static int io_alloc_hash_table(struct io_hash_table *table, unsigned bits) { unsigned int hash_buckets; @@ -334,7 +272,7 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p) init_waitqueue_head(&ctx->poll_wq); spin_lock_init(&ctx->completion_lock); raw_spin_lock_init(&ctx->timeout_lock); - INIT_WQ_LIST(&ctx->iopoll_list); + INIT_LIST_HEAD(&ctx->iopoll_list); INIT_LIST_HEAD(&ctx->defer_list); INIT_LIST_HEAD(&ctx->timeout_list); INIT_LIST_HEAD(&ctx->ltimeout_list); @@ -643,7 +581,7 @@ static void io_cqring_overflow_kill(struct io_ring_ctx *ctx) __io_cqring_overflow_flush(ctx, true); } -static void io_cqring_do_overflow_flush(struct io_ring_ctx *ctx) +void io_cqring_do_overflow_flush(struct io_ring_ctx *ctx) { mutex_lock(&ctx->uring_lock); __io_cqring_overflow_flush(ctx, false); @@ -1083,336 +1021,6 @@ static inline struct io_kiocb *io_req_find_next(struct io_kiocb *req) return nxt; } -static void ctx_flush_and_put(struct io_ring_ctx *ctx, io_tw_token_t tw) -{ - if (!ctx) - return; - if (ctx->flags & IORING_SETUP_TASKRUN_FLAG) - atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags); - - io_submit_flush_completions(ctx); - mutex_unlock(&ctx->uring_lock); - percpu_ref_put(&ctx->refs); -} - -/* - * Run queued task_work, returning the number of entries processed in *count. - * If more entries than max_entries are available, stop processing once this - * is reached and return the rest of the list. 
- */ -struct llist_node *io_handle_tw_list(struct llist_node *node, - unsigned int *count, - unsigned int max_entries) -{ - struct io_ring_ctx *ctx = NULL; - struct io_tw_state ts = { }; - - do { - struct llist_node *next = node->next; - struct io_kiocb *req = container_of(node, struct io_kiocb, - io_task_work.node); - - if (req->ctx != ctx) { - ctx_flush_and_put(ctx, ts); - ctx = req->ctx; - mutex_lock(&ctx->uring_lock); - percpu_ref_get(&ctx->refs); - ts.cancel = io_should_terminate_tw(ctx); - } - INDIRECT_CALL_2(req->io_task_work.func, - io_poll_task_func, io_req_rw_complete, - (struct io_tw_req){req}, ts); - node = next; - (*count)++; - if (unlikely(need_resched())) { - ctx_flush_and_put(ctx, ts); - ctx = NULL; - cond_resched(); - } - } while (node && *count < max_entries); - - ctx_flush_and_put(ctx, ts); - return node; -} - -static __cold void __io_fallback_tw(struct llist_node *node, bool sync) -{ - struct io_ring_ctx *last_ctx = NULL; - struct io_kiocb *req; - - while (node) { - req = container_of(node, struct io_kiocb, io_task_work.node); - node = node->next; - if (last_ctx != req->ctx) { - if (last_ctx) { - if (sync) - flush_delayed_work(&last_ctx->fallback_work); - percpu_ref_put(&last_ctx->refs); - } - last_ctx = req->ctx; - percpu_ref_get(&last_ctx->refs); - } - if (llist_add(&req->io_task_work.node, &last_ctx->fallback_llist)) - schedule_delayed_work(&last_ctx->fallback_work, 1); - } - - if (last_ctx) { - if (sync) - flush_delayed_work(&last_ctx->fallback_work); - percpu_ref_put(&last_ctx->refs); - } -} - -static void io_fallback_tw(struct io_uring_task *tctx, bool sync) -{ - struct llist_node *node = llist_del_all(&tctx->task_list); - - __io_fallback_tw(node, sync); -} - -struct llist_node *tctx_task_work_run(struct io_uring_task *tctx, - unsigned int max_entries, - unsigned int *count) -{ - struct llist_node *node; - - node = llist_del_all(&tctx->task_list); - if (node) { - node = llist_reverse_order(node); - node = io_handle_tw_list(node, count, max_entries); - } - - /* relaxed read is enough as only the task itself sets ->in_cancel */ - if (unlikely(atomic_read(&tctx->in_cancel))) - io_uring_drop_tctx_refs(current); - - trace_io_uring_task_work_run(tctx, *count); - return node; -} - -void tctx_task_work(struct callback_head *cb) -{ - struct io_uring_task *tctx; - struct llist_node *ret; - unsigned int count = 0; - - tctx = container_of(cb, struct io_uring_task, task_work); - ret = tctx_task_work_run(tctx, UINT_MAX, &count); - /* can't happen */ - WARN_ON_ONCE(ret); -} - -static void io_req_local_work_add(struct io_kiocb *req, unsigned flags) -{ - struct io_ring_ctx *ctx = req->ctx; - unsigned nr_wait, nr_tw, nr_tw_prev; - struct llist_node *head; - - /* See comment above IO_CQ_WAKE_INIT */ - BUILD_BUG_ON(IO_CQ_WAKE_FORCE <= IORING_MAX_CQ_ENTRIES); - - /* - * We don't know how many requests there are in the link and whether - * they can even be queued lazily, fall back to non-lazy. - */ - if (req->flags & IO_REQ_LINK_FLAGS) - flags &= ~IOU_F_TWQ_LAZY_WAKE; - - guard(rcu)(); - - head = READ_ONCE(ctx->work_llist.first); - do { - nr_tw_prev = 0; - if (head) { - struct io_kiocb *first_req = container_of(head, - struct io_kiocb, - io_task_work.node); - /* - * Might be executed at any moment, rely on - * SLAB_TYPESAFE_BY_RCU to keep it alive. - */ - nr_tw_prev = READ_ONCE(first_req->nr_tw); - } - - /* - * Theoretically, it can overflow, but that's fine as one of - * previous adds should've tried to wake the task. 
- */ - nr_tw = nr_tw_prev + 1; - if (!(flags & IOU_F_TWQ_LAZY_WAKE)) - nr_tw = IO_CQ_WAKE_FORCE; - - req->nr_tw = nr_tw; - req->io_task_work.node.next = head; - } while (!try_cmpxchg(&ctx->work_llist.first, &head, - &req->io_task_work.node)); - - /* - * cmpxchg implies a full barrier, which pairs with the barrier - * in set_current_state() on the io_cqring_wait() side. It's used - * to ensure that either we see updated ->cq_wait_nr, or waiters - * going to sleep will observe the work added to the list, which - * is similar to the wait/wawke task state sync. - */ - - if (!head) { - if (ctx->flags & IORING_SETUP_TASKRUN_FLAG) - atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags); - if (ctx->has_evfd) - io_eventfd_signal(ctx, false); - } - - nr_wait = atomic_read(&ctx->cq_wait_nr); - /* not enough or no one is waiting */ - if (nr_tw < nr_wait) - return; - /* the previous add has already woken it up */ - if (nr_tw_prev >= nr_wait) - return; - wake_up_state(ctx->submitter_task, TASK_INTERRUPTIBLE); -} - -static void io_req_normal_work_add(struct io_kiocb *req) -{ - struct io_uring_task *tctx = req->tctx; - struct io_ring_ctx *ctx = req->ctx; - - /* task_work already pending, we're done */ - if (!llist_add(&req->io_task_work.node, &tctx->task_list)) - return; - - if (ctx->flags & IORING_SETUP_TASKRUN_FLAG) - atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags); - - /* SQPOLL doesn't need the task_work added, it'll run it itself */ - if (ctx->flags & IORING_SETUP_SQPOLL) { - __set_notify_signal(tctx->task); - return; - } - - if (likely(!task_work_add(tctx->task, &tctx->task_work, ctx->notify_method))) - return; - - io_fallback_tw(tctx, false); -} - -void __io_req_task_work_add(struct io_kiocb *req, unsigned flags) -{ - if (req->ctx->flags & IORING_SETUP_DEFER_TASKRUN) - io_req_local_work_add(req, flags); - else - io_req_normal_work_add(req); -} - -void io_req_task_work_add_remote(struct io_kiocb *req, unsigned flags) -{ - if (WARN_ON_ONCE(!(req->ctx->flags & IORING_SETUP_DEFER_TASKRUN))) - return; - __io_req_task_work_add(req, flags); -} - -static void __cold io_move_task_work_from_local(struct io_ring_ctx *ctx) -{ - struct llist_node *node = llist_del_all(&ctx->work_llist); - - __io_fallback_tw(node, false); - node = llist_del_all(&ctx->retry_llist); - __io_fallback_tw(node, false); -} - -static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events, - int min_events) -{ - if (!io_local_work_pending(ctx)) - return false; - if (events < min_events) - return true; - if (ctx->flags & IORING_SETUP_TASKRUN_FLAG) - atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags); - return false; -} - -static int __io_run_local_work_loop(struct llist_node **node, - io_tw_token_t tw, - int events) -{ - int ret = 0; - - while (*node) { - struct llist_node *next = (*node)->next; - struct io_kiocb *req = container_of(*node, struct io_kiocb, - io_task_work.node); - INDIRECT_CALL_2(req->io_task_work.func, - io_poll_task_func, io_req_rw_complete, - (struct io_tw_req){req}, tw); - *node = next; - if (++ret >= events) - break; - } - - return ret; -} - -static int __io_run_local_work(struct io_ring_ctx *ctx, io_tw_token_t tw, - int min_events, int max_events) -{ - struct llist_node *node; - unsigned int loops = 0; - int ret = 0; - - if (WARN_ON_ONCE(ctx->submitter_task != current)) - return -EEXIST; - if (ctx->flags & IORING_SETUP_TASKRUN_FLAG) - atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags); -again: - tw.cancel = io_should_terminate_tw(ctx); - min_events -= ret; - ret = 
__io_run_local_work_loop(&ctx->retry_llist.first, tw, max_events); - if (ctx->retry_llist.first) - goto retry_done; - - /* - * llists are in reverse order, flip it back the right way before - * running the pending items. - */ - node = llist_reverse_order(llist_del_all(&ctx->work_llist)); - ret += __io_run_local_work_loop(&node, tw, max_events - ret); - ctx->retry_llist.first = node; - loops++; - - if (io_run_local_work_continue(ctx, ret, min_events)) - goto again; -retry_done: - io_submit_flush_completions(ctx); - if (io_run_local_work_continue(ctx, ret, min_events)) - goto again; - - trace_io_uring_local_work_run(ctx, ret, loops); - return ret; -} - -static inline int io_run_local_work_locked(struct io_ring_ctx *ctx, - int min_events) -{ - struct io_tw_state ts = {}; - - if (!io_local_work_pending(ctx)) - return 0; - return __io_run_local_work(ctx, ts, min_events, - max(IO_LOCAL_TW_DEFAULT_MAX, min_events)); -} - -int io_run_local_work(struct io_ring_ctx *ctx, int min_events, int max_events) -{ - struct io_tw_state ts = {}; - int ret; - - mutex_lock(&ctx->uring_lock); - ret = __io_run_local_work(ctx, ts, min_events, max_events); - mutex_unlock(&ctx->uring_lock); - return ret; -} - static void io_req_task_cancel(struct io_tw_req tw_req, io_tw_token_t tw) { struct io_kiocb *req = tw_req.req; @@ -1545,13 +1153,6 @@ void __io_submit_flush_completions(struct io_ring_ctx *ctx) ctx->submit_state.cq_flush = false; } -static unsigned io_cqring_events(struct io_ring_ctx *ctx) -{ - /* See comment at the top of this file */ - smp_rmb(); - return __io_cqring_events(ctx); -} - /* * We can't just wait for polled events to come to us, we have to actively * find and complete them. @@ -1562,7 +1163,7 @@ __cold void io_iopoll_try_reap_events(struct io_ring_ctx *ctx) return; mutex_lock(&ctx->uring_lock); - while (!wq_list_empty(&ctx->iopoll_list)) { + while (!list_empty(&ctx->iopoll_list)) { /* let it sleep and repeat later if can't complete a request */ if (io_do_iopoll(ctx, true) == 0) break; @@ -1627,21 +1228,18 @@ static int io_iopoll_check(struct io_ring_ctx *ctx, unsigned int min_events) * forever, while the workqueue is stuck trying to acquire the * very same mutex. */ - if (wq_list_empty(&ctx->iopoll_list) || - io_task_work_pending(ctx)) { + if (list_empty(&ctx->iopoll_list) || io_task_work_pending(ctx)) { u32 tail = ctx->cached_cq_tail; (void) io_run_local_work_locked(ctx, min_events); - if (task_work_pending(current) || - wq_list_empty(&ctx->iopoll_list)) { + if (task_work_pending(current) || list_empty(&ctx->iopoll_list)) { mutex_unlock(&ctx->uring_lock); io_run_task_work(); mutex_lock(&ctx->uring_lock); } /* some requests don't go through iopoll_list */ - if (tail != ctx->cached_cq_tail || - wq_list_empty(&ctx->iopoll_list)) + if (tail != ctx->cached_cq_tail || list_empty(&ctx->iopoll_list)) break; } ret = io_do_iopoll(ctx, !min_events); @@ -1684,25 +1282,17 @@ static void io_iopoll_req_issued(struct io_kiocb *req, unsigned int issue_flags) * how we do polling eventually, not spinning if we're on potentially * different devices. */ - if (wq_list_empty(&ctx->iopoll_list)) { + if (list_empty(&ctx->iopoll_list)) { ctx->poll_multi_queue = false; } else if (!ctx->poll_multi_queue) { struct io_kiocb *list_req; - list_req = container_of(ctx->iopoll_list.first, struct io_kiocb, - comp_list); + list_req = list_first_entry(&ctx->iopoll_list, struct io_kiocb, iopoll_node); if (list_req->file != req->file) ctx->poll_multi_queue = true; } - /* - * For fast devices, IO may have already completed. 
If it has, add - * it to the front so we find it first. - */ - if (READ_ONCE(req->iopoll_completed)) - wq_list_add_head(&req->comp_list, &ctx->iopoll_list); - else - wq_list_add_tail(&req->comp_list, &ctx->iopoll_list); + list_add_tail(&req->iopoll_node, &ctx->iopoll_list); if (unlikely(needs_lock)) { /* @@ -2080,6 +1670,8 @@ static inline bool io_check_restriction(struct io_ring_ctx *ctx, struct io_kiocb *req, unsigned int sqe_flags) { + if (!ctx->op_restricted) + return true; if (!test_bit(req->opcode, ctx->restrictions.sqe_op)) return false; @@ -2181,8 +1773,8 @@ static int io_init_req(struct io_ring_ctx *ctx, struct io_kiocb *req, io_init_drain(ctx); } } - if (unlikely(ctx->restricted || ctx->drain_active || ctx->drain_next)) { - if (ctx->restricted && !io_check_restriction(ctx, req, sqe_flags)) + if (unlikely(ctx->op_restricted || ctx->drain_active || ctx->drain_next)) { + if (!io_check_restriction(ctx, req, sqe_flags)) return io_init_fail_req(req, -EACCES); /* knock it to the slow queue path, will be drained there */ if (ctx->drain_active) @@ -2354,12 +1946,16 @@ static void io_commit_sqring(struct io_ring_ctx *ctx) { struct io_rings *rings = ctx->rings; - /* - * Ensure any loads from the SQEs are done at this point, - * since once we write the new head, the application could - * write new data to them. - */ - smp_store_release(&rings->sq.head, ctx->cached_sq_head); + if (ctx->flags & IORING_SETUP_SQ_REWIND) { + ctx->cached_sq_head = 0; + } else { + /* + * Ensure any loads from the SQEs are done at this point, + * since once we write the new head, the application could + * write new data to them. + */ + smp_store_release(&rings->sq.head, ctx->cached_sq_head); + } } /* @@ -2405,10 +2001,15 @@ static bool io_get_sqe(struct io_ring_ctx *ctx, const struct io_uring_sqe **sqe) int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr) __must_hold(&ctx->uring_lock) { - unsigned int entries = io_sqring_entries(ctx); + unsigned int entries; unsigned int left; int ret; + if (ctx->flags & IORING_SETUP_SQ_REWIND) + entries = ctx->sq_entries; + else + entries = io_sqring_entries(ctx); + entries = min(nr, entries); if (unlikely(!entries)) return 0; @@ -2453,308 +2054,6 @@ int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr) return ret; } -static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode, - int wake_flags, void *key) -{ - struct io_wait_queue *iowq = container_of(curr, struct io_wait_queue, wq); - - /* - * Cannot safely flush overflowed CQEs from here, ensure we wake up - * the task, and the next invocation will do it. 
- */ - if (io_should_wake(iowq) || io_has_work(iowq->ctx)) - return autoremove_wake_function(curr, mode, wake_flags, key); - return -1; -} - -int io_run_task_work_sig(struct io_ring_ctx *ctx) -{ - if (io_local_work_pending(ctx)) { - __set_current_state(TASK_RUNNING); - if (io_run_local_work(ctx, INT_MAX, IO_LOCAL_TW_DEFAULT_MAX) > 0) - return 0; - } - if (io_run_task_work() > 0) - return 0; - if (task_sigpending(current)) - return -EINTR; - return 0; -} - -static bool current_pending_io(void) -{ - struct io_uring_task *tctx = current->io_uring; - - if (!tctx) - return false; - return percpu_counter_read_positive(&tctx->inflight); -} - -static enum hrtimer_restart io_cqring_timer_wakeup(struct hrtimer *timer) -{ - struct io_wait_queue *iowq = container_of(timer, struct io_wait_queue, t); - - WRITE_ONCE(iowq->hit_timeout, 1); - iowq->min_timeout = 0; - wake_up_process(iowq->wq.private); - return HRTIMER_NORESTART; -} - -/* - * Doing min_timeout portion. If we saw any timeouts, events, or have work, - * wake up. If not, and we have a normal timeout, switch to that and keep - * sleeping. - */ -static enum hrtimer_restart io_cqring_min_timer_wakeup(struct hrtimer *timer) -{ - struct io_wait_queue *iowq = container_of(timer, struct io_wait_queue, t); - struct io_ring_ctx *ctx = iowq->ctx; - - /* no general timeout, or shorter (or equal), we are done */ - if (iowq->timeout == KTIME_MAX || - ktime_compare(iowq->min_timeout, iowq->timeout) >= 0) - goto out_wake; - /* work we may need to run, wake function will see if we need to wake */ - if (io_has_work(ctx)) - goto out_wake; - /* got events since we started waiting, min timeout is done */ - if (iowq->cq_min_tail != READ_ONCE(ctx->rings->cq.tail)) - goto out_wake; - /* if we have any events and min timeout expired, we're done */ - if (io_cqring_events(ctx)) - goto out_wake; - - /* - * If using deferred task_work running and application is waiting on - * more than one request, ensure we reset it now where we are switching - * to normal sleeps. Any request completion post min_wait should wake - * the task and return. - */ - if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) { - atomic_set(&ctx->cq_wait_nr, 1); - smp_mb(); - if (!llist_empty(&ctx->work_llist)) - goto out_wake; - } - - /* any generated CQE posted past this time should wake us up */ - iowq->cq_tail = iowq->cq_min_tail; - - hrtimer_update_function(&iowq->t, io_cqring_timer_wakeup); - hrtimer_set_expires(timer, iowq->timeout); - return HRTIMER_RESTART; -out_wake: - return io_cqring_timer_wakeup(timer); -} - -static int io_cqring_schedule_timeout(struct io_wait_queue *iowq, - clockid_t clock_id, ktime_t start_time) -{ - ktime_t timeout; - - if (iowq->min_timeout) { - timeout = ktime_add_ns(iowq->min_timeout, start_time); - hrtimer_setup_on_stack(&iowq->t, io_cqring_min_timer_wakeup, clock_id, - HRTIMER_MODE_ABS); - } else { - timeout = iowq->timeout; - hrtimer_setup_on_stack(&iowq->t, io_cqring_timer_wakeup, clock_id, - HRTIMER_MODE_ABS); - } - - hrtimer_set_expires_range_ns(&iowq->t, timeout, 0); - hrtimer_start_expires(&iowq->t, HRTIMER_MODE_ABS); - - if (!READ_ONCE(iowq->hit_timeout)) - schedule(); - - hrtimer_cancel(&iowq->t); - destroy_hrtimer_on_stack(&iowq->t); - __set_current_state(TASK_RUNNING); - - return READ_ONCE(iowq->hit_timeout) ? 
-ETIME : 0; -} - -struct ext_arg { - size_t argsz; - struct timespec64 ts; - const sigset_t __user *sig; - ktime_t min_time; - bool ts_set; - bool iowait; -}; - -static int __io_cqring_wait_schedule(struct io_ring_ctx *ctx, - struct io_wait_queue *iowq, - struct ext_arg *ext_arg, - ktime_t start_time) -{ - int ret = 0; - - /* - * Mark us as being in io_wait if we have pending requests, so cpufreq - * can take into account that the task is waiting for IO - turns out - * to be important for low QD IO. - */ - if (ext_arg->iowait && current_pending_io()) - current->in_iowait = 1; - if (iowq->timeout != KTIME_MAX || iowq->min_timeout) - ret = io_cqring_schedule_timeout(iowq, ctx->clockid, start_time); - else - schedule(); - current->in_iowait = 0; - return ret; -} - -/* If this returns > 0, the caller should retry */ -static inline int io_cqring_wait_schedule(struct io_ring_ctx *ctx, - struct io_wait_queue *iowq, - struct ext_arg *ext_arg, - ktime_t start_time) -{ - if (unlikely(READ_ONCE(ctx->check_cq))) - return 1; - if (unlikely(io_local_work_pending(ctx))) - return 1; - if (unlikely(task_work_pending(current))) - return 1; - if (unlikely(task_sigpending(current))) - return -EINTR; - if (unlikely(io_should_wake(iowq))) - return 0; - - return __io_cqring_wait_schedule(ctx, iowq, ext_arg, start_time); -} - -/* - * Wait until events become available, if we don't already have some. The - * application must reap them itself, as they reside on the shared cq ring. - */ -static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, u32 flags, - struct ext_arg *ext_arg) -{ - struct io_wait_queue iowq; - struct io_rings *rings = ctx->rings; - ktime_t start_time; - int ret; - - min_events = min_t(int, min_events, ctx->cq_entries); - - if (!io_allowed_run_tw(ctx)) - return -EEXIST; - if (io_local_work_pending(ctx)) - io_run_local_work(ctx, min_events, - max(IO_LOCAL_TW_DEFAULT_MAX, min_events)); - io_run_task_work(); - - if (unlikely(test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq))) - io_cqring_do_overflow_flush(ctx); - if (__io_cqring_events_user(ctx) >= min_events) - return 0; - - init_waitqueue_func_entry(&iowq.wq, io_wake_function); - iowq.wq.private = current; - INIT_LIST_HEAD(&iowq.wq.entry); - iowq.ctx = ctx; - iowq.cq_tail = READ_ONCE(ctx->rings->cq.head) + min_events; - iowq.cq_min_tail = READ_ONCE(ctx->rings->cq.tail); - iowq.nr_timeouts = atomic_read(&ctx->cq_timeouts); - iowq.hit_timeout = 0; - iowq.min_timeout = ext_arg->min_time; - iowq.timeout = KTIME_MAX; - start_time = io_get_time(ctx); - - if (ext_arg->ts_set) { - iowq.timeout = timespec64_to_ktime(ext_arg->ts); - if (!(flags & IORING_ENTER_ABS_TIMER)) - iowq.timeout = ktime_add(iowq.timeout, start_time); - } - - if (ext_arg->sig) { -#ifdef CONFIG_COMPAT - if (in_compat_syscall()) - ret = set_compat_user_sigmask((const compat_sigset_t __user *)ext_arg->sig, - ext_arg->argsz); - else -#endif - ret = set_user_sigmask(ext_arg->sig, ext_arg->argsz); - - if (ret) - return ret; - } - - io_napi_busy_loop(ctx, &iowq); - - trace_io_uring_cqring_wait(ctx, min_events); - do { - unsigned long check_cq; - int nr_wait; - - /* if min timeout has been hit, don't reset wait count */ - if (!iowq.hit_timeout) - nr_wait = (int) iowq.cq_tail - - READ_ONCE(ctx->rings->cq.tail); - else - nr_wait = 1; - - if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) { - atomic_set(&ctx->cq_wait_nr, nr_wait); - set_current_state(TASK_INTERRUPTIBLE); - } else { - prepare_to_wait_exclusive(&ctx->cq_wait, &iowq.wq, - TASK_INTERRUPTIBLE); - } - - ret = 
io_cqring_wait_schedule(ctx, &iowq, ext_arg, start_time); - __set_current_state(TASK_RUNNING); - atomic_set(&ctx->cq_wait_nr, IO_CQ_WAKE_INIT); - - /* - * Run task_work after scheduling and before io_should_wake(). - * If we got woken because of task_work being processed, run it - * now rather than let the caller do another wait loop. - */ - if (io_local_work_pending(ctx)) - io_run_local_work(ctx, nr_wait, nr_wait); - io_run_task_work(); - - /* - * Non-local task_work will be run on exit to userspace, but - * if we're using DEFER_TASKRUN, then we could have waited - * with a timeout for a number of requests. If the timeout - * hits, we could have some requests ready to process. Ensure - * this break is _after_ we have run task_work, to avoid - * deferring running potentially pending requests until the - * next time we wait for events. - */ - if (ret < 0) - break; - - check_cq = READ_ONCE(ctx->check_cq); - if (unlikely(check_cq)) { - /* let the caller flush overflows, retry */ - if (check_cq & BIT(IO_CHECK_CQ_OVERFLOW_BIT)) - io_cqring_do_overflow_flush(ctx); - if (check_cq & BIT(IO_CHECK_CQ_DROPPED_BIT)) { - ret = -EBADR; - break; - } - } - - if (io_should_wake(&iowq)) { - ret = 0; - break; - } - cond_resched(); - } while (1); - - if (!(ctx->flags & IORING_SETUP_DEFER_TASKRUN)) - finish_wait(&ctx->cq_wait, &iowq.wq); - restore_saved_sigmask_unless(ret == -EINTR); - - return READ_ONCE(rings->cq.head) == READ_ONCE(rings->cq.tail) ? ret : 0; -} - static void io_rings_free(struct io_ring_ctx *ctx) { io_free_region(ctx->user, &ctx->sq_region); @@ -2984,7 +2283,7 @@ static __cold void io_tctx_exit_cb(struct callback_head *cb) static __cold void io_ring_exit_work(struct work_struct *work) { struct io_ring_ctx *ctx = container_of(work, struct io_ring_ctx, exit_work); - unsigned long timeout = jiffies + HZ * 60 * 5; + unsigned long timeout = jiffies + IO_URING_EXIT_WAIT_MAX; unsigned long interval = HZ / 20; struct io_tctx_exit exit; struct io_tctx_node *node; @@ -3256,7 +2555,11 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit, ctx = file->private_data; ret = -EBADFD; - if (unlikely(ctx->flags & IORING_SETUP_R_DISABLED)) + /* + * Keep IORING_SETUP_R_DISABLED check before submitter_task load + * in io_uring_add_tctx_node() -> __io_uring_add_tctx_node_from_submit() + */ + if (unlikely(smp_load_acquire(&ctx->flags) & IORING_SETUP_R_DISABLED)) goto out; /* @@ -3439,6 +2742,12 @@ static int io_uring_sanitise_params(struct io_uring_params *p) if (flags & ~IORING_SETUP_FLAGS) return -EINVAL; + if (flags & IORING_SETUP_SQ_REWIND) { + if ((flags & IORING_SETUP_SQPOLL) || + !(flags & IORING_SETUP_NO_SQARRAY)) + return -EINVAL; + } + /* There is no way to mmap rings without a real fd */ if ((flags & IORING_SETUP_REGISTERED_FD_ONLY) && !(flags & IORING_SETUP_NO_MMAP)) @@ -3661,13 +2970,8 @@ static __cold int io_uring_create(struct io_ctx_config *config) } if (ctx->flags & IORING_SETUP_SINGLE_ISSUER - && !(ctx->flags & IORING_SETUP_R_DISABLED)) { - /* - * Unlike io_register_enable_rings(), don't need WRITE_ONCE() - * since ctx isn't yet accessible from other tasks - */ + && !(ctx->flags & IORING_SETUP_R_DISABLED)) ctx->submitter_task = get_task_struct(current); - } file = io_uring_get_file(ctx); if (IS_ERR(file)) { diff --git a/io_uring/io_uring.h b/io_uring/io_uring.h index a790c16854d3..f12e597adc9e 100644 --- a/io_uring/io_uring.h +++ b/io_uring/io_uring.h @@ -1,16 +1,17 @@ +/* SPDX-License-Identifier: GPL-2.0 */ #ifndef IOU_CORE_H #define IOU_CORE_H #include <linux/errno.h> #include 
<linux/lockdep.h> #include <linux/resume_user_mode.h> -#include <linux/kasan.h> #include <linux/poll.h> #include <linux/io_uring_types.h> #include <uapi/linux/eventpoll.h> #include "alloc_cache.h" #include "io-wq.h" #include "slist.h" +#include "tw.h" #include "opdef.h" #ifndef CREATE_TRACE_POINTS @@ -69,7 +70,8 @@ struct io_ctx_config { IORING_SETUP_NO_SQARRAY |\ IORING_SETUP_HYBRID_IOPOLL |\ IORING_SETUP_CQE_MIXED |\ - IORING_SETUP_SQE_MIXED) + IORING_SETUP_SQE_MIXED |\ + IORING_SETUP_SQ_REWIND) #define IORING_ENTER_FLAGS (IORING_ENTER_GETEVENTS |\ IORING_ENTER_SQ_WAKEUP |\ @@ -89,6 +91,14 @@ struct io_ctx_config { IOSQE_BUFFER_SELECT |\ IOSQE_CQE_SKIP_SUCCESS) +#define IO_REQ_LINK_FLAGS (REQ_F_LINK | REQ_F_HARDLINK) + +/* + * Complaint timeout for io_uring cancelation exits, and for io-wq exit + * worker waiting. + */ +#define IO_URING_EXIT_WAIT_MAX (HZ * 60 * 5) + enum { IOU_COMPLETE = 0, @@ -151,8 +161,6 @@ static inline bool io_should_wake(struct io_wait_queue *iowq) int io_prepare_config(struct io_ctx_config *config); bool io_cqe_cache_refill(struct io_ring_ctx *ctx, bool overflow, bool cqe32); -int io_run_task_work_sig(struct io_ring_ctx *ctx); -int io_run_local_work(struct io_ring_ctx *ctx, int min_events, int max_events); void io_req_defer_failed(struct io_kiocb *req, s32 res); bool io_post_aux_cqe(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags); void io_add_aux_cqe(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags); @@ -166,15 +174,10 @@ struct file *io_file_get_normal(struct io_kiocb *req, int fd); struct file *io_file_get_fixed(struct io_kiocb *req, int fd, unsigned issue_flags); -void __io_req_task_work_add(struct io_kiocb *req, unsigned flags); -void io_req_task_work_add_remote(struct io_kiocb *req, unsigned flags); void io_req_task_queue(struct io_kiocb *req); void io_req_task_complete(struct io_tw_req tw_req, io_tw_token_t tw); void io_req_task_queue_fail(struct io_kiocb *req, int ret); void io_req_task_submit(struct io_tw_req tw_req, io_tw_token_t tw); -struct llist_node *io_handle_tw_list(struct llist_node *node, unsigned int *count, unsigned int max_entries); -struct llist_node *tctx_task_work_run(struct io_uring_task *tctx, unsigned int max_entries, unsigned int *count); -void tctx_task_work(struct callback_head *cb); __cold void io_uring_drop_tctx_refs(struct task_struct *task); int io_ring_add_registered_file(struct io_uring_task *tctx, struct file *file, @@ -227,11 +230,6 @@ static inline bool io_is_compat(struct io_ring_ctx *ctx) return IS_ENABLED(CONFIG_COMPAT) && unlikely(ctx->compat); } -static inline void io_req_task_work_add(struct io_kiocb *req) -{ - __io_req_task_work_add(req, 0); -} - static inline void io_submit_flush_completions(struct io_ring_ctx *ctx) { if (!wq_list_empty(&ctx->submit_state.compl_reqs) || @@ -456,59 +454,6 @@ static inline unsigned int io_sqring_entries(struct io_ring_ctx *ctx) return min(entries, ctx->sq_entries); } -static inline int io_run_task_work(void) -{ - bool ret = false; - - /* - * Always check-and-clear the task_work notification signal. With how - * signaling works for task_work, we can find it set with nothing to - * run. We need to clear it for that case, like get_signal() does. - */ - if (test_thread_flag(TIF_NOTIFY_SIGNAL)) - clear_notify_signal(); - /* - * PF_IO_WORKER never returns to userspace, so check here if we have - * notify work that needs processing. 
- */ - if (current->flags & PF_IO_WORKER) { - if (test_thread_flag(TIF_NOTIFY_RESUME)) { - __set_current_state(TASK_RUNNING); - resume_user_mode_work(NULL); - } - if (current->io_uring) { - unsigned int count = 0; - - __set_current_state(TASK_RUNNING); - tctx_task_work_run(current->io_uring, UINT_MAX, &count); - if (count) - ret = true; - } - } - if (task_work_pending(current)) { - __set_current_state(TASK_RUNNING); - task_work_run(); - ret = true; - } - - return ret; -} - -static inline bool io_local_work_pending(struct io_ring_ctx *ctx) -{ - return !llist_empty(&ctx->work_llist) || !llist_empty(&ctx->retry_llist); -} - -static inline bool io_task_work_pending(struct io_ring_ctx *ctx) -{ - return task_work_pending(current) || io_local_work_pending(ctx); -} - -static inline void io_tw_lock(struct io_ring_ctx *ctx, io_tw_token_t tw) -{ - lockdep_assert_held(&ctx->uring_lock); -} - /* * Don't complete immediately but use deferred completion infrastructure. * Protected by ->uring_lock and can only be used either with @@ -566,17 +511,6 @@ static inline bool io_alloc_req(struct io_ring_ctx *ctx, struct io_kiocb **req) return true; } -static inline bool io_allowed_defer_tw_run(struct io_ring_ctx *ctx) -{ - return likely(ctx->submitter_task == current); -} - -static inline bool io_allowed_run_tw(struct io_ring_ctx *ctx) -{ - return likely(!(ctx->flags & IORING_SETUP_DEFER_TASKRUN) || - ctx->submitter_task == current); -} - static inline void io_req_queue_tw_complete(struct io_kiocb *req, s32 res) { io_req_set_res(req, res, 0); diff --git a/io_uring/kbuf.c b/io_uring/kbuf.c index 796d131107dd..67d4fe576473 100644 --- a/io_uring/kbuf.c +++ b/io_uring/kbuf.c @@ -669,8 +669,9 @@ int io_register_pbuf_ring(struct io_ring_ctx *ctx, void __user *arg) bl->buf_ring = br; if (reg.flags & IOU_PBUF_RING_INC) bl->flags |= IOBL_INC; - io_buffer_add_list(ctx, bl, reg.bgid); - return 0; + ret = io_buffer_add_list(ctx, bl, reg.bgid); + if (!ret) + return 0; fail: io_free_region(ctx->user, &bl->region); kfree(bl); diff --git a/io_uring/memmap.c b/io_uring/memmap.c index 7d3c5eb58480..89f56609e50a 100644 --- a/io_uring/memmap.c +++ b/io_uring/memmap.c @@ -56,7 +56,7 @@ struct page **io_pin_pages(unsigned long uaddr, unsigned long len, int *npages) if (WARN_ON_ONCE(nr_pages > INT_MAX)) return ERR_PTR(-EOVERFLOW); - pages = kvmalloc_array(nr_pages, sizeof(struct page *), GFP_KERNEL); + pages = kvmalloc_array(nr_pages, sizeof(struct page *), GFP_KERNEL_ACCOUNT); if (!pages) return ERR_PTR(-ENOMEM); diff --git a/io_uring/memmap.h b/io_uring/memmap.h index a39d9e518905..f4cfbb6b9a1f 100644 --- a/io_uring/memmap.h +++ b/io_uring/memmap.h @@ -1,3 +1,4 @@ +/* SPDX-License-Identifier: GPL-2.0 */ #ifndef IO_URING_MEMMAP_H #define IO_URING_MEMMAP_H diff --git a/io_uring/mock_file.c b/io_uring/mock_file.c index 3ffac8f72974..80c96ad2061f 100644 --- a/io_uring/mock_file.c +++ b/io_uring/mock_file.c @@ -1,3 +1,4 @@ +// SPDX-License-Identifier: GPL-2.0 #include <linux/device.h> #include <linux/init.h> #include <linux/kernel.h> diff --git a/io_uring/msg_ring.c b/io_uring/msg_ring.c index 7063ea7964e7..57ad0085869a 100644 --- a/io_uring/msg_ring.c +++ b/io_uring/msg_ring.c @@ -80,13 +80,9 @@ static void io_msg_tw_complete(struct io_tw_req tw_req, io_tw_token_t tw) percpu_ref_put(&ctx->refs); } -static int io_msg_remote_post(struct io_ring_ctx *ctx, struct io_kiocb *req, +static void io_msg_remote_post(struct io_ring_ctx *ctx, struct io_kiocb *req, int res, u32 cflags, u64 user_data) { - if (!READ_ONCE(ctx->submitter_task)) { - 
kfree_rcu(req, rcu_head); - return -EOWNERDEAD; - } req->opcode = IORING_OP_NOP; req->cqe.user_data = user_data; io_req_set_res(req, res, cflags); @@ -95,7 +91,6 @@ static int io_msg_remote_post(struct io_ring_ctx *ctx, struct io_kiocb *req, req->tctx = NULL; req->io_task_work.func = io_msg_tw_complete; io_req_task_work_add_remote(req, IOU_F_TWQ_LAZY_WAKE); - return 0; } static int io_msg_data_remote(struct io_ring_ctx *target_ctx, @@ -111,8 +106,8 @@ static int io_msg_data_remote(struct io_ring_ctx *target_ctx, if (msg->flags & IORING_MSG_RING_FLAGS_PASS) flags = msg->cqe_flags; - return io_msg_remote_post(target_ctx, target, msg->len, flags, - msg->user_data); + io_msg_remote_post(target_ctx, target, msg->len, flags, msg->user_data); + return 0; } static int __io_msg_ring_data(struct io_ring_ctx *target_ctx, @@ -125,7 +120,11 @@ static int __io_msg_ring_data(struct io_ring_ctx *target_ctx, return -EINVAL; if (!(msg->flags & IORING_MSG_RING_FLAGS_PASS) && msg->dst_fd) return -EINVAL; - if (target_ctx->flags & IORING_SETUP_R_DISABLED) + /* + * Keep IORING_SETUP_R_DISABLED check before submitter_task load + * in io_msg_data_remote() -> io_req_task_work_add_remote() + */ + if (smp_load_acquire(&target_ctx->flags) & IORING_SETUP_R_DISABLED) return -EBADFD; if (io_msg_need_remote(target_ctx)) @@ -223,10 +222,7 @@ static int io_msg_fd_remote(struct io_kiocb *req) { struct io_ring_ctx *ctx = req->file->private_data; struct io_msg *msg = io_kiocb_to_cmd(req, struct io_msg); - struct task_struct *task = READ_ONCE(ctx->submitter_task); - - if (unlikely(!task)) - return -EOWNERDEAD; + struct task_struct *task = ctx->submitter_task; init_task_work(&msg->tw, io_msg_tw_fd_complete); if (task_work_add(task, &msg->tw, TWA_SIGNAL)) @@ -245,7 +241,11 @@ static int io_msg_send_fd(struct io_kiocb *req, unsigned int issue_flags) return -EINVAL; if (target_ctx == ctx) return -EINVAL; - if (target_ctx->flags & IORING_SETUP_R_DISABLED) + /* + * Keep IORING_SETUP_R_DISABLED check before submitter_task load + * in io_msg_fd_remote() + */ + if (smp_load_acquire(&target_ctx->flags) & IORING_SETUP_R_DISABLED) return -EBADFD; if (!msg->src_file) { int ret = io_msg_grab_file(req, issue_flags); diff --git a/io_uring/net.c b/io_uring/net.c index 519ea055b761..d9a4b83804a2 100644 --- a/io_uring/net.c +++ b/io_uring/net.c @@ -515,7 +515,11 @@ static inline bool io_send_finish(struct io_kiocb *req, cflags = io_put_kbufs(req, sel->val, sel->buf_list, io_bundle_nbufs(kmsg, sel->val)); - if (bundle_finished || req->flags & REQ_F_BL_EMPTY) + /* + * Don't start new bundles if the buffer list is empty, or if the + * current operation needed to go through polling to complete. 
+ */ + if (bundle_finished || req->flags & (REQ_F_BL_EMPTY | REQ_F_POLLED)) goto finish; /* diff --git a/io_uring/notif.c b/io_uring/notif.c index f476775ba44b..efce8ae12eaa 100644 --- a/io_uring/notif.c +++ b/io_uring/notif.c @@ -1,3 +1,4 @@ +// SPDX-License-Identifier: GPL-2.0 #include <linux/kernel.h> #include <linux/errno.h> #include <linux/file.h> diff --git a/io_uring/refs.h b/io_uring/refs.h index 0d928d87c4ed..0fe16b67c308 100644 --- a/io_uring/refs.h +++ b/io_uring/refs.h @@ -1,3 +1,4 @@ +/* SPDX-License-Identifier: GPL-2.0 */ #ifndef IOU_REQ_REF_H #define IOU_REQ_REF_H diff --git a/io_uring/register.c b/io_uring/register.c index 3d3822ff3fd9..7e9d2234e4ae 100644 --- a/io_uring/register.c +++ b/io_uring/register.c @@ -103,6 +103,10 @@ static int io_register_personality(struct io_ring_ctx *ctx) return id; } +/* + * Returns number of restrictions parsed and added on success, or < 0 for + * an error. + */ static __cold int io_parse_restrictions(void __user *arg, unsigned int nr_args, struct io_restriction *restrictions) { @@ -129,25 +133,31 @@ static __cold int io_parse_restrictions(void __user *arg, unsigned int nr_args, if (res[i].register_op >= IORING_REGISTER_LAST) goto err; __set_bit(res[i].register_op, restrictions->register_op); + restrictions->reg_registered = true; break; case IORING_RESTRICTION_SQE_OP: if (res[i].sqe_op >= IORING_OP_LAST) goto err; __set_bit(res[i].sqe_op, restrictions->sqe_op); + restrictions->op_registered = true; break; case IORING_RESTRICTION_SQE_FLAGS_ALLOWED: restrictions->sqe_flags_allowed = res[i].sqe_flags; + restrictions->op_registered = true; break; case IORING_RESTRICTION_SQE_FLAGS_REQUIRED: restrictions->sqe_flags_required = res[i].sqe_flags; + restrictions->op_registered = true; break; default: goto err; } } - - ret = 0; - + ret = nr_args; + if (!nr_args) { + restrictions->op_registered = true; + restrictions->reg_registered = true; + } err: kfree(res); return ret; @@ -163,16 +173,20 @@ static __cold int io_register_restrictions(struct io_ring_ctx *ctx, return -EBADFD; /* We allow only a single restrictions registration */ - if (ctx->restrictions.registered) + if (ctx->restrictions.op_registered || ctx->restrictions.reg_registered) return -EBUSY; ret = io_parse_restrictions(arg, nr_args, &ctx->restrictions); /* Reset all restrictions if an error happened */ - if (ret != 0) + if (ret < 0) { memset(&ctx->restrictions, 0, sizeof(ctx->restrictions)); - else - ctx->restrictions.registered = true; - return ret; + return ret; + } + if (ctx->restrictions.op_registered) + ctx->op_restricted = 1; + if (ctx->restrictions.reg_registered) + ctx->reg_restricted = 1; + return 0; } static int io_register_enable_rings(struct io_ring_ctx *ctx) @@ -180,8 +194,8 @@ static int io_register_enable_rings(struct io_ring_ctx *ctx) if (!(ctx->flags & IORING_SETUP_R_DISABLED)) return -EBADFD; - if (ctx->flags & IORING_SETUP_SINGLE_ISSUER && !ctx->submitter_task) { - WRITE_ONCE(ctx->submitter_task, get_task_struct(current)); + if (ctx->flags & IORING_SETUP_SINGLE_ISSUER) { + ctx->submitter_task = get_task_struct(current); /* * Lazy activation attempts would fail if it was polled before * submitter_task is set. 
@@ -190,10 +204,8 @@ static int io_register_enable_rings(struct io_ring_ctx *ctx) io_activate_pollwq(ctx); } - if (ctx->restrictions.registered) - ctx->restricted = 1; - - ctx->flags &= ~IORING_SETUP_R_DISABLED; + /* Keep submitter_task store before clearing IORING_SETUP_R_DISABLED */ + smp_store_release(&ctx->flags, ctx->flags & ~IORING_SETUP_R_DISABLED); if (ctx->sq_data && wq_has_sleeper(&ctx->sq_data->wait)) wake_up(&ctx->sq_data->wait); return 0; @@ -625,7 +637,7 @@ static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode, if (ctx->submitter_task && ctx->submitter_task != current) return -EEXIST; - if (ctx->restricted) { + if (ctx->reg_restricted && !(ctx->flags & IORING_SETUP_R_DISABLED)) { opcode = array_index_nospec(opcode, IORING_REGISTER_LAST); if (!test_bit(opcode, ctx->restrictions.register_op)) return -EACCES; diff --git a/io_uring/rsrc.c b/io_uring/rsrc.c index 41c89f5c616d..73b38888a304 100644 --- a/io_uring/rsrc.c +++ b/io_uring/rsrc.c @@ -1329,7 +1329,7 @@ void io_vec_free(struct iou_vec *iv) int io_vec_realloc(struct iou_vec *iv, unsigned nr_entries) { - gfp_t gfp = GFP_KERNEL | __GFP_NOWARN; + gfp_t gfp = GFP_KERNEL_ACCOUNT | __GFP_NOWARN; struct iovec *iov; iov = kmalloc_array(nr_entries, sizeof(iov[0]), gfp); diff --git a/io_uring/rsrc.h b/io_uring/rsrc.h index d603f6a47f5e..4a5db2ad1af2 100644 --- a/io_uring/rsrc.h +++ b/io_uring/rsrc.h @@ -90,7 +90,7 @@ bool io_check_coalesce_buffer(struct page **page_array, int nr_pages, struct io_imu_folio_data *data); static inline struct io_rsrc_node *io_rsrc_node_lookup(struct io_rsrc_data *data, - int index) + unsigned int index) { if (index < data->nr) return data->nodes[array_index_nospec(index, data->nr)]; diff --git a/io_uring/rw.c b/io_uring/rw.c index 28555bc85ba0..3190c41bfdc9 100644 --- a/io_uring/rw.c +++ b/io_uring/rw.c @@ -1303,12 +1303,13 @@ static int io_uring_hybrid_poll(struct io_kiocb *req, struct io_comp_batch *iob, unsigned int poll_flags) { struct io_ring_ctx *ctx = req->ctx; - u64 runtime, sleep_time; + u64 runtime, sleep_time, iopoll_start; int ret; + iopoll_start = READ_ONCE(req->iopoll_start); sleep_time = io_hybrid_iopoll_delay(ctx, req); ret = io_uring_classic_poll(req, iob, poll_flags); - runtime = ktime_get_ns() - req->iopoll_start - sleep_time; + runtime = ktime_get_ns() - iopoll_start - sleep_time; /* * Use minimum sleep time if we're polling devices with different @@ -1322,9 +1323,9 @@ static int io_uring_hybrid_poll(struct io_kiocb *req, int io_do_iopoll(struct io_ring_ctx *ctx, bool force_nonspin) { - struct io_wq_work_node *pos, *start, *prev; unsigned int poll_flags = 0; DEFINE_IO_COMP_BATCH(iob); + struct io_kiocb *req, *tmp; int nr_events = 0; /* @@ -1334,8 +1335,7 @@ int io_do_iopoll(struct io_ring_ctx *ctx, bool force_nonspin) if (ctx->poll_multi_queue || force_nonspin) poll_flags |= BLK_POLL_ONESHOT; - wq_list_for_each(pos, start, &ctx->iopoll_list) { - struct io_kiocb *req = container_of(pos, struct io_kiocb, comp_list); + list_for_each_entry(req, &ctx->iopoll_list, iopoll_node) { int ret; /* @@ -1364,31 +1364,20 @@ int io_do_iopoll(struct io_ring_ctx *ctx, bool force_nonspin) if (!rq_list_empty(&iob.req_list)) iob.complete(&iob); - else if (!pos) - return 0; - - prev = start; - wq_list_for_each_resume(pos, prev) { - struct io_kiocb *req = container_of(pos, struct io_kiocb, comp_list); + list_for_each_entry_safe(req, tmp, &ctx->iopoll_list, iopoll_node) { /* order with io_complete_rw_iopoll(), e.g. 
->result updates */ if (!smp_load_acquire(&req->iopoll_completed)) - break; + continue; + list_del(&req->iopoll_node); + wq_list_add_tail(&req->comp_list, &ctx->submit_state.compl_reqs); nr_events++; req->cqe.flags = io_put_kbuf(req, req->cqe.res, NULL); if (req->opcode != IORING_OP_URING_CMD) io_req_rw_cleanup(req, 0); } - if (unlikely(!nr_events)) - return 0; - - pos = start ? start->next : ctx->iopoll_list.first; - wq_list_cut(&ctx->iopoll_list, prev, start); - - if (WARN_ON_ONCE(!wq_list_empty(&ctx->submit_state.compl_reqs))) - return 0; - ctx->submit_state.compl_reqs.first = pos; - __io_submit_flush_completions(ctx); + if (nr_events) + __io_submit_flush_completions(ctx); return nr_events; } diff --git a/io_uring/slist.h b/io_uring/slist.h index 7ef747442754..45c21a8ad685 100644 --- a/io_uring/slist.h +++ b/io_uring/slist.h @@ -1,3 +1,4 @@ +/* SPDX-License-Identifier: GPL-2.0 */ #ifndef INTERNAL_IO_SLIST_H #define INTERNAL_IO_SLIST_H @@ -9,9 +10,6 @@ #define wq_list_for_each(pos, prv, head) \ for (pos = (head)->first, prv = NULL; pos; prv = pos, pos = (pos)->next) -#define wq_list_for_each_resume(pos, prv) \ - for (; pos; prv = pos, pos = (pos)->next) - #define wq_list_empty(list) (READ_ONCE((list)->first) == NULL) #define INIT_WQ_LIST(list) do { \ @@ -43,15 +41,6 @@ static inline void wq_list_add_tail(struct io_wq_work_node *node, } } -static inline void wq_list_add_head(struct io_wq_work_node *node, - struct io_wq_work_list *list) -{ - node->next = list->first; - if (!node->next) - list->last = node; - WRITE_ONCE(list->first, node); -} - static inline void wq_list_cut(struct io_wq_work_list *list, struct io_wq_work_node *last, struct io_wq_work_node *prev) diff --git a/io_uring/sqpoll.c b/io_uring/sqpoll.c index 74c1a130cd87..becdfdd323a9 100644 --- a/io_uring/sqpoll.c +++ b/io_uring/sqpoll.c @@ -212,7 +212,7 @@ static int __io_sq_thread(struct io_ring_ctx *ctx, struct io_sq_data *sqd, if (cap_entries && to_submit > IORING_SQPOLL_CAP_ENTRIES_VALUE) to_submit = IORING_SQPOLL_CAP_ENTRIES_VALUE; - if (to_submit || !wq_list_empty(&ctx->iopoll_list)) { + if (to_submit || !list_empty(&ctx->iopoll_list)) { const struct cred *creds = NULL; io_sq_start_worktime(ist); @@ -221,7 +221,7 @@ static int __io_sq_thread(struct io_ring_ctx *ctx, struct io_sq_data *sqd, creds = override_creds(ctx->sq_creds); mutex_lock(&ctx->uring_lock); - if (!wq_list_empty(&ctx->iopoll_list)) + if (!list_empty(&ctx->iopoll_list)) io_do_iopoll(ctx, true); /* @@ -344,7 +344,7 @@ static int io_sq_thread(void *data) list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) { int ret = __io_sq_thread(ctx, sqd, cap_entries, &ist); - if (!sqt_spin && (ret > 0 || !wq_list_empty(&ctx->iopoll_list))) + if (!sqt_spin && (ret > 0 || !list_empty(&ctx->iopoll_list))) sqt_spin = true; } if (io_sq_tw(&retry_list, IORING_TW_CAP_ENTRIES_VALUE)) @@ -379,7 +379,7 @@ static int io_sq_thread(void *data) atomic_or(IORING_SQ_NEED_WAKEUP, &ctx->rings->sq_flags); if ((ctx->flags & IORING_SETUP_IOPOLL) && - !wq_list_empty(&ctx->iopoll_list)) { + !list_empty(&ctx->iopoll_list)) { needs_sched = false; break; } diff --git a/io_uring/sync.c b/io_uring/sync.c index cea2d381ffd2..ab7fa1cd7dd6 100644 --- a/io_uring/sync.c +++ b/io_uring/sync.c @@ -62,6 +62,8 @@ int io_fsync_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) return -EINVAL; sync->off = READ_ONCE(sqe->off); + if (sync->off < 0) + return -EINVAL; sync->len = READ_ONCE(sqe->len); req->flags |= REQ_F_FORCE_ASYNC; return 0; diff --git a/io_uring/tctx.c b/io_uring/tctx.c index 
6d6f44215ec8..91f4b830b77b 100644 --- a/io_uring/tctx.c +++ b/io_uring/tctx.c @@ -122,6 +122,14 @@ int __io_uring_add_tctx_node(struct io_ring_ctx *ctx) return ret; } } + + /* + * Re-activate io-wq keepalive on any new io_uring usage. The wq may have + * been marked for idle-exit when the task temporarily had no active + * io_uring instances. + */ + if (tctx->io_wq) + io_wq_set_exit_on_idle(tctx->io_wq, false); if (!xa_load(&tctx->xa, (unsigned long)ctx)) { node = kmalloc(sizeof(*node), GFP_KERNEL); if (!node) @@ -183,6 +191,9 @@ __cold void io_uring_del_tctx_node(unsigned long index) if (tctx->last == node->ctx) tctx->last = NULL; kfree(node); + + if (xa_empty(&tctx->xa) && tctx->io_wq) + io_wq_set_exit_on_idle(tctx->io_wq, true); } __cold void io_uring_clean_tctx(struct io_uring_task *tctx) diff --git a/io_uring/timeout.c b/io_uring/timeout.c index d8fbbaf31cf3..84dda24f3eb2 100644 --- a/io_uring/timeout.c +++ b/io_uring/timeout.c @@ -130,7 +130,7 @@ __cold void io_flush_timeouts(struct io_ring_ctx *ctx) u32 seq; raw_spin_lock_irq(&ctx->timeout_lock); - seq = ctx->cached_cq_tail - atomic_read(&ctx->cq_timeouts); + seq = READ_ONCE(ctx->cached_cq_tail) - atomic_read(&ctx->cq_timeouts); list_for_each_entry_safe(timeout, tmp, &ctx->timeout_list, list) { struct io_kiocb *req = cmd_to_io_kiocb(timeout); diff --git a/io_uring/tw.c b/io_uring/tw.c new file mode 100644 index 000000000000..1ee2b8ab07c8 --- /dev/null +++ b/io_uring/tw.c @@ -0,0 +1,355 @@ +// SPDX-License-Identifier: GPL-2.0 +/* + * Task work handling for io_uring + */ +#include <linux/kernel.h> +#include <linux/errno.h> +#include <linux/sched/signal.h> +#include <linux/io_uring.h> +#include <linux/indirect_call_wrapper.h> + +#include "io_uring.h" +#include "tctx.h" +#include "poll.h" +#include "rw.h" +#include "eventfd.h" +#include "wait.h" + +void io_fallback_req_func(struct work_struct *work) +{ + struct io_ring_ctx *ctx = container_of(work, struct io_ring_ctx, + fallback_work.work); + struct llist_node *node = llist_del_all(&ctx->fallback_llist); + struct io_kiocb *req, *tmp; + struct io_tw_state ts = {}; + + percpu_ref_get(&ctx->refs); + mutex_lock(&ctx->uring_lock); + ts.cancel = io_should_terminate_tw(ctx); + llist_for_each_entry_safe(req, tmp, node, io_task_work.node) + req->io_task_work.func((struct io_tw_req){req}, ts); + io_submit_flush_completions(ctx); + mutex_unlock(&ctx->uring_lock); + percpu_ref_put(&ctx->refs); +} + +static void ctx_flush_and_put(struct io_ring_ctx *ctx, io_tw_token_t tw) +{ + if (!ctx) + return; + if (ctx->flags & IORING_SETUP_TASKRUN_FLAG) + atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags); + + io_submit_flush_completions(ctx); + mutex_unlock(&ctx->uring_lock); + percpu_ref_put(&ctx->refs); +} + +/* + * Run queued task_work, returning the number of entries processed in *count. + * If more entries than max_entries are available, stop processing once this + * is reached and return the rest of the list. 
+ */ +struct llist_node *io_handle_tw_list(struct llist_node *node, + unsigned int *count, + unsigned int max_entries) +{ + struct io_ring_ctx *ctx = NULL; + struct io_tw_state ts = { }; + + do { + struct llist_node *next = node->next; + struct io_kiocb *req = container_of(node, struct io_kiocb, + io_task_work.node); + + if (req->ctx != ctx) { + ctx_flush_and_put(ctx, ts); + ctx = req->ctx; + mutex_lock(&ctx->uring_lock); + percpu_ref_get(&ctx->refs); + ts.cancel = io_should_terminate_tw(ctx); + } + INDIRECT_CALL_2(req->io_task_work.func, + io_poll_task_func, io_req_rw_complete, + (struct io_tw_req){req}, ts); + node = next; + (*count)++; + if (unlikely(need_resched())) { + ctx_flush_and_put(ctx, ts); + ctx = NULL; + cond_resched(); + } + } while (node && *count < max_entries); + + ctx_flush_and_put(ctx, ts); + return node; +} + +static __cold void __io_fallback_tw(struct llist_node *node, bool sync) +{ + struct io_ring_ctx *last_ctx = NULL; + struct io_kiocb *req; + + while (node) { + req = container_of(node, struct io_kiocb, io_task_work.node); + node = node->next; + if (last_ctx != req->ctx) { + if (last_ctx) { + if (sync) + flush_delayed_work(&last_ctx->fallback_work); + percpu_ref_put(&last_ctx->refs); + } + last_ctx = req->ctx; + percpu_ref_get(&last_ctx->refs); + } + if (llist_add(&req->io_task_work.node, &last_ctx->fallback_llist)) + schedule_delayed_work(&last_ctx->fallback_work, 1); + } + + if (last_ctx) { + if (sync) + flush_delayed_work(&last_ctx->fallback_work); + percpu_ref_put(&last_ctx->refs); + } +} + +static void io_fallback_tw(struct io_uring_task *tctx, bool sync) +{ + struct llist_node *node = llist_del_all(&tctx->task_list); + + __io_fallback_tw(node, sync); +} + +struct llist_node *tctx_task_work_run(struct io_uring_task *tctx, + unsigned int max_entries, + unsigned int *count) +{ + struct llist_node *node; + + node = llist_del_all(&tctx->task_list); + if (node) { + node = llist_reverse_order(node); + node = io_handle_tw_list(node, count, max_entries); + } + + /* relaxed read is enough as only the task itself sets ->in_cancel */ + if (unlikely(atomic_read(&tctx->in_cancel))) + io_uring_drop_tctx_refs(current); + + trace_io_uring_task_work_run(tctx, *count); + return node; +} + +void tctx_task_work(struct callback_head *cb) +{ + struct io_uring_task *tctx; + struct llist_node *ret; + unsigned int count = 0; + + tctx = container_of(cb, struct io_uring_task, task_work); + ret = tctx_task_work_run(tctx, UINT_MAX, &count); + /* can't happen */ + WARN_ON_ONCE(ret); +} + +void io_req_local_work_add(struct io_kiocb *req, unsigned flags) +{ + struct io_ring_ctx *ctx = req->ctx; + unsigned nr_wait, nr_tw, nr_tw_prev; + struct llist_node *head; + + /* See comment above IO_CQ_WAKE_INIT */ + BUILD_BUG_ON(IO_CQ_WAKE_FORCE <= IORING_MAX_CQ_ENTRIES); + + /* + * We don't know how many requests there are in the link and whether + * they can even be queued lazily, fall back to non-lazy. + */ + if (req->flags & IO_REQ_LINK_FLAGS) + flags &= ~IOU_F_TWQ_LAZY_WAKE; + + guard(rcu)(); + + head = READ_ONCE(ctx->work_llist.first); + do { + nr_tw_prev = 0; + if (head) { + struct io_kiocb *first_req = container_of(head, + struct io_kiocb, + io_task_work.node); + /* + * Might be executed at any moment, rely on + * SLAB_TYPESAFE_BY_RCU to keep it alive. + */ + nr_tw_prev = READ_ONCE(first_req->nr_tw); + } + + /* + * Theoretically, it can overflow, but that's fine as one of + * previous adds should've tried to wake the task. 
+ */ + nr_tw = nr_tw_prev + 1; + if (!(flags & IOU_F_TWQ_LAZY_WAKE)) + nr_tw = IO_CQ_WAKE_FORCE; + + req->nr_tw = nr_tw; + req->io_task_work.node.next = head; + } while (!try_cmpxchg(&ctx->work_llist.first, &head, + &req->io_task_work.node)); + + /* + * cmpxchg implies a full barrier, which pairs with the barrier + * in set_current_state() on the io_cqring_wait() side. It's used + * to ensure that either we see updated ->cq_wait_nr, or waiters + * going to sleep will observe the work added to the list, which + * is similar to the wait/wawke task state sync. + */ + + if (!head) { + if (ctx->flags & IORING_SETUP_TASKRUN_FLAG) + atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags); + if (ctx->has_evfd) + io_eventfd_signal(ctx, false); + } + + nr_wait = atomic_read(&ctx->cq_wait_nr); + /* not enough or no one is waiting */ + if (nr_tw < nr_wait) + return; + /* the previous add has already woken it up */ + if (nr_tw_prev >= nr_wait) + return; + wake_up_state(ctx->submitter_task, TASK_INTERRUPTIBLE); +} + +void io_req_normal_work_add(struct io_kiocb *req) +{ + struct io_uring_task *tctx = req->tctx; + struct io_ring_ctx *ctx = req->ctx; + + /* task_work already pending, we're done */ + if (!llist_add(&req->io_task_work.node, &tctx->task_list)) + return; + + if (ctx->flags & IORING_SETUP_TASKRUN_FLAG) + atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags); + + /* SQPOLL doesn't need the task_work added, it'll run it itself */ + if (ctx->flags & IORING_SETUP_SQPOLL) { + __set_notify_signal(tctx->task); + return; + } + + if (likely(!task_work_add(tctx->task, &tctx->task_work, ctx->notify_method))) + return; + + io_fallback_tw(tctx, false); +} + +void io_req_task_work_add_remote(struct io_kiocb *req, unsigned flags) +{ + if (WARN_ON_ONCE(!(req->ctx->flags & IORING_SETUP_DEFER_TASKRUN))) + return; + __io_req_task_work_add(req, flags); +} + +void __cold io_move_task_work_from_local(struct io_ring_ctx *ctx) +{ + struct llist_node *node = llist_del_all(&ctx->work_llist); + + __io_fallback_tw(node, false); + node = llist_del_all(&ctx->retry_llist); + __io_fallback_tw(node, false); +} + +static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events, + int min_events) +{ + if (!io_local_work_pending(ctx)) + return false; + if (events < min_events) + return true; + if (ctx->flags & IORING_SETUP_TASKRUN_FLAG) + atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags); + return false; +} + +static int __io_run_local_work_loop(struct llist_node **node, + io_tw_token_t tw, + int events) +{ + int ret = 0; + + while (*node) { + struct llist_node *next = (*node)->next; + struct io_kiocb *req = container_of(*node, struct io_kiocb, + io_task_work.node); + INDIRECT_CALL_2(req->io_task_work.func, + io_poll_task_func, io_req_rw_complete, + (struct io_tw_req){req}, tw); + *node = next; + if (++ret >= events) + break; + } + + return ret; +} + +static int __io_run_local_work(struct io_ring_ctx *ctx, io_tw_token_t tw, + int min_events, int max_events) +{ + struct llist_node *node; + unsigned int loops = 0; + int ret = 0; + + if (WARN_ON_ONCE(ctx->submitter_task != current)) + return -EEXIST; + if (ctx->flags & IORING_SETUP_TASKRUN_FLAG) + atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags); +again: + tw.cancel = io_should_terminate_tw(ctx); + min_events -= ret; + ret = __io_run_local_work_loop(&ctx->retry_llist.first, tw, max_events); + if (ctx->retry_llist.first) + goto retry_done; + + /* + * llists are in reverse order, flip it back the right way before + * running the pending items. 
+ */ + node = llist_reverse_order(llist_del_all(&ctx->work_llist)); + ret += __io_run_local_work_loop(&node, tw, max_events - ret); + ctx->retry_llist.first = node; + loops++; + + if (io_run_local_work_continue(ctx, ret, min_events)) + goto again; +retry_done: + io_submit_flush_completions(ctx); + if (io_run_local_work_continue(ctx, ret, min_events)) + goto again; + + trace_io_uring_local_work_run(ctx, ret, loops); + return ret; +} + +int io_run_local_work_locked(struct io_ring_ctx *ctx, int min_events) +{ + struct io_tw_state ts = {}; + + if (!io_local_work_pending(ctx)) + return 0; + return __io_run_local_work(ctx, ts, min_events, + max(IO_LOCAL_TW_DEFAULT_MAX, min_events)); +} + +int io_run_local_work(struct io_ring_ctx *ctx, int min_events, int max_events) +{ + struct io_tw_state ts = {}; + int ret; + + mutex_lock(&ctx->uring_lock); + ret = __io_run_local_work(ctx, ts, min_events, max_events); + mutex_unlock(&ctx->uring_lock); + return ret; +} diff --git a/io_uring/tw.h b/io_uring/tw.h new file mode 100644 index 000000000000..415e330fabde --- /dev/null +++ b/io_uring/tw.h @@ -0,0 +1,116 @@ +// SPDX-License-Identifier: GPL-2.0 +#ifndef IOU_TW_H +#define IOU_TW_H + +#include <linux/sched.h> +#include <linux/percpu-refcount.h> +#include <linux/io_uring_types.h> + +#define IO_LOCAL_TW_DEFAULT_MAX 20 + +/* + * Terminate the request if either of these conditions are true: + * + * 1) It's being executed by the original task, but that task is marked + * with PF_EXITING as it's exiting. + * 2) PF_KTHREAD is set, in which case the invoker of the task_work is + * our fallback task_work. + * 3) The ring has been closed and is going away. + */ +static inline bool io_should_terminate_tw(struct io_ring_ctx *ctx) +{ + return (current->flags & (PF_EXITING | PF_KTHREAD)) || percpu_ref_is_dying(&ctx->refs); +} + +void io_req_task_work_add_remote(struct io_kiocb *req, unsigned flags); +struct llist_node *io_handle_tw_list(struct llist_node *node, unsigned int *count, unsigned int max_entries); +void tctx_task_work(struct callback_head *cb); +int io_run_local_work(struct io_ring_ctx *ctx, int min_events, int max_events); +int io_run_task_work_sig(struct io_ring_ctx *ctx); + +__cold void io_fallback_req_func(struct work_struct *work); +__cold void io_move_task_work_from_local(struct io_ring_ctx *ctx); +int io_run_local_work_locked(struct io_ring_ctx *ctx, int min_events); + +void io_req_local_work_add(struct io_kiocb *req, unsigned flags); +void io_req_normal_work_add(struct io_kiocb *req); +struct llist_node *tctx_task_work_run(struct io_uring_task *tctx, unsigned int max_entries, unsigned int *count); + +static inline void __io_req_task_work_add(struct io_kiocb *req, unsigned flags) +{ + if (req->ctx->flags & IORING_SETUP_DEFER_TASKRUN) + io_req_local_work_add(req, flags); + else + io_req_normal_work_add(req); +} + +static inline void io_req_task_work_add(struct io_kiocb *req) +{ + __io_req_task_work_add(req, 0); +} + +static inline int io_run_task_work(void) +{ + bool ret = false; + + /* + * Always check-and-clear the task_work notification signal. With how + * signaling works for task_work, we can find it set with nothing to + * run. We need to clear it for that case, like get_signal() does. + */ + if (test_thread_flag(TIF_NOTIFY_SIGNAL)) + clear_notify_signal(); + /* + * PF_IO_WORKER never returns to userspace, so check here if we have + * notify work that needs processing. 
+ */ + if (current->flags & PF_IO_WORKER) { + if (test_thread_flag(TIF_NOTIFY_RESUME)) { + __set_current_state(TASK_RUNNING); + resume_user_mode_work(NULL); + } + if (current->io_uring) { + unsigned int count = 0; + + __set_current_state(TASK_RUNNING); + tctx_task_work_run(current->io_uring, UINT_MAX, &count); + if (count) + ret = true; + } + } + if (task_work_pending(current)) { + __set_current_state(TASK_RUNNING); + task_work_run(); + ret = true; + } + + return ret; +} + +static inline bool io_local_work_pending(struct io_ring_ctx *ctx) +{ + return !llist_empty(&ctx->work_llist) || !llist_empty(&ctx->retry_llist); +} + +static inline bool io_task_work_pending(struct io_ring_ctx *ctx) +{ + return task_work_pending(current) || io_local_work_pending(ctx); +} + +static inline void io_tw_lock(struct io_ring_ctx *ctx, io_tw_token_t tw) +{ + lockdep_assert_held(&ctx->uring_lock); +} + +static inline bool io_allowed_defer_tw_run(struct io_ring_ctx *ctx) +{ + return likely(ctx->submitter_task == current); +} + +static inline bool io_allowed_run_tw(struct io_ring_ctx *ctx) +{ + return likely(!(ctx->flags & IORING_SETUP_DEFER_TASKRUN) || + ctx->submitter_task == current); +} + +#endif diff --git a/io_uring/uring_cmd.c b/io_uring/uring_cmd.c index 197474911f04..ee7b49f47cb5 100644 --- a/io_uring/uring_cmd.c +++ b/io_uring/uring_cmd.c @@ -104,6 +104,15 @@ void io_uring_cmd_mark_cancelable(struct io_uring_cmd *cmd, struct io_kiocb *req = cmd_to_io_kiocb(cmd); struct io_ring_ctx *ctx = req->ctx; + /* + * Doing cancelations on IOPOLL requests are not supported. Both + * because they can't get canceled in the block stack, but also + * because iopoll completion data overlaps with the hash_node used + * for tracking. + */ + if (ctx->flags & IORING_SETUP_IOPOLL) + return; + if (!(cmd->flags & IORING_URING_CMD_CANCELABLE)) { cmd->flags |= IORING_URING_CMD_CANCELABLE; io_ring_submit_lock(ctx, issue_flags); diff --git a/io_uring/wait.c b/io_uring/wait.c new file mode 100644 index 000000000000..0581cadf20ee --- /dev/null +++ b/io_uring/wait.c @@ -0,0 +1,308 @@ +// SPDX-License-Identifier: GPL-2.0 +/* + * Waiting for completion events + */ +#include <linux/kernel.h> +#include <linux/sched/signal.h> +#include <linux/io_uring.h> + +#include <trace/events/io_uring.h> + +#include <uapi/linux/io_uring.h> + +#include "io_uring.h" +#include "napi.h" +#include "wait.h" + +static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode, + int wake_flags, void *key) +{ + struct io_wait_queue *iowq = container_of(curr, struct io_wait_queue, wq); + + /* + * Cannot safely flush overflowed CQEs from here, ensure we wake up + * the task, and the next invocation will do it. 
+ */ + if (io_should_wake(iowq) || io_has_work(iowq->ctx)) + return autoremove_wake_function(curr, mode, wake_flags, key); + return -1; +} + +int io_run_task_work_sig(struct io_ring_ctx *ctx) +{ + if (io_local_work_pending(ctx)) { + __set_current_state(TASK_RUNNING); + if (io_run_local_work(ctx, INT_MAX, IO_LOCAL_TW_DEFAULT_MAX) > 0) + return 0; + } + if (io_run_task_work() > 0) + return 0; + if (task_sigpending(current)) + return -EINTR; + return 0; +} + +static bool current_pending_io(void) +{ + struct io_uring_task *tctx = current->io_uring; + + if (!tctx) + return false; + return percpu_counter_read_positive(&tctx->inflight); +} + +static enum hrtimer_restart io_cqring_timer_wakeup(struct hrtimer *timer) +{ + struct io_wait_queue *iowq = container_of(timer, struct io_wait_queue, t); + + WRITE_ONCE(iowq->hit_timeout, 1); + iowq->min_timeout = 0; + wake_up_process(iowq->wq.private); + return HRTIMER_NORESTART; +} + +/* + * Doing min_timeout portion. If we saw any timeouts, events, or have work, + * wake up. If not, and we have a normal timeout, switch to that and keep + * sleeping. + */ +static enum hrtimer_restart io_cqring_min_timer_wakeup(struct hrtimer *timer) +{ + struct io_wait_queue *iowq = container_of(timer, struct io_wait_queue, t); + struct io_ring_ctx *ctx = iowq->ctx; + + /* no general timeout, or shorter (or equal), we are done */ + if (iowq->timeout == KTIME_MAX || + ktime_compare(iowq->min_timeout, iowq->timeout) >= 0) + goto out_wake; + /* work we may need to run, wake function will see if we need to wake */ + if (io_has_work(ctx)) + goto out_wake; + /* got events since we started waiting, min timeout is done */ + if (iowq->cq_min_tail != READ_ONCE(ctx->rings->cq.tail)) + goto out_wake; + /* if we have any events and min timeout expired, we're done */ + if (io_cqring_events(ctx)) + goto out_wake; + + /* + * If using deferred task_work running and application is waiting on + * more than one request, ensure we reset it now where we are switching + * to normal sleeps. Any request completion post min_wait should wake + * the task and return. + */ + if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) { + atomic_set(&ctx->cq_wait_nr, 1); + smp_mb(); + if (!llist_empty(&ctx->work_llist)) + goto out_wake; + } + + /* any generated CQE posted past this time should wake us up */ + iowq->cq_tail = iowq->cq_min_tail; + + hrtimer_update_function(&iowq->t, io_cqring_timer_wakeup); + hrtimer_set_expires(timer, iowq->timeout); + return HRTIMER_RESTART; +out_wake: + return io_cqring_timer_wakeup(timer); +} + +static int io_cqring_schedule_timeout(struct io_wait_queue *iowq, + clockid_t clock_id, ktime_t start_time) +{ + ktime_t timeout; + + if (iowq->min_timeout) { + timeout = ktime_add_ns(iowq->min_timeout, start_time); + hrtimer_setup_on_stack(&iowq->t, io_cqring_min_timer_wakeup, clock_id, + HRTIMER_MODE_ABS); + } else { + timeout = iowq->timeout; + hrtimer_setup_on_stack(&iowq->t, io_cqring_timer_wakeup, clock_id, + HRTIMER_MODE_ABS); + } + + hrtimer_set_expires_range_ns(&iowq->t, timeout, 0); + hrtimer_start_expires(&iowq->t, HRTIMER_MODE_ABS); + + if (!READ_ONCE(iowq->hit_timeout)) + schedule(); + + hrtimer_cancel(&iowq->t); + destroy_hrtimer_on_stack(&iowq->t); + __set_current_state(TASK_RUNNING); + + return READ_ONCE(iowq->hit_timeout) ? 
-ETIME : 0; +} + +static int __io_cqring_wait_schedule(struct io_ring_ctx *ctx, + struct io_wait_queue *iowq, + struct ext_arg *ext_arg, + ktime_t start_time) +{ + int ret = 0; + + /* + * Mark us as being in io_wait if we have pending requests, so cpufreq + * can take into account that the task is waiting for IO - turns out + * to be important for low QD IO. + */ + if (ext_arg->iowait && current_pending_io()) + current->in_iowait = 1; + if (iowq->timeout != KTIME_MAX || iowq->min_timeout) + ret = io_cqring_schedule_timeout(iowq, ctx->clockid, start_time); + else + schedule(); + current->in_iowait = 0; + return ret; +} + +/* If this returns > 0, the caller should retry */ +static inline int io_cqring_wait_schedule(struct io_ring_ctx *ctx, + struct io_wait_queue *iowq, + struct ext_arg *ext_arg, + ktime_t start_time) +{ + if (unlikely(READ_ONCE(ctx->check_cq))) + return 1; + if (unlikely(io_local_work_pending(ctx))) + return 1; + if (unlikely(task_work_pending(current))) + return 1; + if (unlikely(task_sigpending(current))) + return -EINTR; + if (unlikely(io_should_wake(iowq))) + return 0; + + return __io_cqring_wait_schedule(ctx, iowq, ext_arg, start_time); +} + +/* + * Wait until events become available, if we don't already have some. The + * application must reap them itself, as they reside on the shared cq ring. + */ +int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, u32 flags, + struct ext_arg *ext_arg) +{ + struct io_wait_queue iowq; + struct io_rings *rings = ctx->rings; + ktime_t start_time; + int ret; + + min_events = min_t(int, min_events, ctx->cq_entries); + + if (!io_allowed_run_tw(ctx)) + return -EEXIST; + if (io_local_work_pending(ctx)) + io_run_local_work(ctx, min_events, + max(IO_LOCAL_TW_DEFAULT_MAX, min_events)); + io_run_task_work(); + + if (unlikely(test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq))) + io_cqring_do_overflow_flush(ctx); + if (__io_cqring_events_user(ctx) >= min_events) + return 0; + + init_waitqueue_func_entry(&iowq.wq, io_wake_function); + iowq.wq.private = current; + INIT_LIST_HEAD(&iowq.wq.entry); + iowq.ctx = ctx; + iowq.cq_tail = READ_ONCE(ctx->rings->cq.head) + min_events; + iowq.cq_min_tail = READ_ONCE(ctx->rings->cq.tail); + iowq.nr_timeouts = atomic_read(&ctx->cq_timeouts); + iowq.hit_timeout = 0; + iowq.min_timeout = ext_arg->min_time; + iowq.timeout = KTIME_MAX; + start_time = io_get_time(ctx); + + if (ext_arg->ts_set) { + iowq.timeout = timespec64_to_ktime(ext_arg->ts); + if (!(flags & IORING_ENTER_ABS_TIMER)) + iowq.timeout = ktime_add(iowq.timeout, start_time); + } + + if (ext_arg->sig) { +#ifdef CONFIG_COMPAT + if (in_compat_syscall()) + ret = set_compat_user_sigmask((const compat_sigset_t __user *)ext_arg->sig, + ext_arg->argsz); + else +#endif + ret = set_user_sigmask(ext_arg->sig, ext_arg->argsz); + + if (ret) + return ret; + } + + io_napi_busy_loop(ctx, &iowq); + + trace_io_uring_cqring_wait(ctx, min_events); + do { + unsigned long check_cq; + int nr_wait; + + /* if min timeout has been hit, don't reset wait count */ + if (!iowq.hit_timeout) + nr_wait = (int) iowq.cq_tail - + READ_ONCE(ctx->rings->cq.tail); + else + nr_wait = 1; + + if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) { + atomic_set(&ctx->cq_wait_nr, nr_wait); + set_current_state(TASK_INTERRUPTIBLE); + } else { + prepare_to_wait_exclusive(&ctx->cq_wait, &iowq.wq, + TASK_INTERRUPTIBLE); + } + + ret = io_cqring_wait_schedule(ctx, &iowq, ext_arg, start_time); + __set_current_state(TASK_RUNNING); + atomic_set(&ctx->cq_wait_nr, IO_CQ_WAKE_INIT); + + /* + * Run task_work 
after scheduling and before io_should_wake(). + * If we got woken because of task_work being processed, run it + * now rather than let the caller do another wait loop. + */ + if (io_local_work_pending(ctx)) + io_run_local_work(ctx, nr_wait, nr_wait); + io_run_task_work(); + + /* + * Non-local task_work will be run on exit to userspace, but + * if we're using DEFER_TASKRUN, then we could have waited + * with a timeout for a number of requests. If the timeout + * hits, we could have some requests ready to process. Ensure + * this break is _after_ we have run task_work, to avoid + * deferring running potentially pending requests until the + * next time we wait for events. + */ + if (ret < 0) + break; + + check_cq = READ_ONCE(ctx->check_cq); + if (unlikely(check_cq)) { + /* let the caller flush overflows, retry */ + if (check_cq & BIT(IO_CHECK_CQ_OVERFLOW_BIT)) + io_cqring_do_overflow_flush(ctx); + if (check_cq & BIT(IO_CHECK_CQ_DROPPED_BIT)) { + ret = -EBADR; + break; + } + } + + if (io_should_wake(&iowq)) { + ret = 0; + break; + } + cond_resched(); + } while (1); + + if (!(ctx->flags & IORING_SETUP_DEFER_TASKRUN)) + finish_wait(&ctx->cq_wait, &iowq.wq); + restore_saved_sigmask_unless(ret == -EINTR); + + return READ_ONCE(rings->cq.head) == READ_ONCE(rings->cq.tail) ? ret : 0; +} diff --git a/io_uring/wait.h b/io_uring/wait.h new file mode 100644 index 000000000000..5e236f74e1af --- /dev/null +++ b/io_uring/wait.h @@ -0,0 +1,49 @@ +// SPDX-License-Identifier: GPL-2.0 +#ifndef IOU_WAIT_H +#define IOU_WAIT_H + +#include <linux/io_uring_types.h> + +/* + * No waiters. It's larger than any valid value of the tw counter + * so that tests against ->cq_wait_nr would fail and skip wake_up(). + */ +#define IO_CQ_WAKE_INIT (-1U) +/* Forced wake up if there is a waiter regardless of ->cq_wait_nr */ +#define IO_CQ_WAKE_FORCE (IO_CQ_WAKE_INIT >> 1) + +struct ext_arg { + size_t argsz; + struct timespec64 ts; + const sigset_t __user *sig; + ktime_t min_time; + bool ts_set; + bool iowait; +}; + +int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, u32 flags, + struct ext_arg *ext_arg); +int io_run_task_work_sig(struct io_ring_ctx *ctx); +void io_cqring_do_overflow_flush(struct io_ring_ctx *ctx); + +static inline unsigned int __io_cqring_events(struct io_ring_ctx *ctx) +{ + return ctx->cached_cq_tail - READ_ONCE(ctx->rings->cq.head); +} + +static inline unsigned int __io_cqring_events_user(struct io_ring_ctx *ctx) +{ + return READ_ONCE(ctx->rings->cq.tail) - READ_ONCE(ctx->rings->cq.head); +} + +/* + * Reads the tail/head of the CQ ring while providing an acquire ordering, + * see comment at top of io_uring.c. + */ +static inline unsigned io_cqring_events(struct io_ring_ctx *ctx) +{ + smp_rmb(); + return __io_cqring_events(ctx); +} + +#endif |
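
The two-stage hrtimer in wait.c (io_cqring_min_timer_wakeup() handing over to io_cqring_timer_wakeup()) is what backs the min-timeout side of the extended wait arguments. Below is a rough userspace sketch of driving it with the raw enter syscall; it assumes the io_uring_getevents_arg layout from recent uapi headers (the min_wait_usec field in particular) and a kernel advertising IORING_FEAT_MIN_TIMEOUT, so treat the details as illustrative. liburing provides convenience wrappers around the same IORING_ENTER_EXT_ARG path.

#include <linux/io_uring.h>
#include <linux/time_types.h>
#include <sys/syscall.h>
#include <unistd.h>
#include <string.h>

/*
 * Wait for up to 'want' CQEs: for the first 100us only a full batch of
 * 'want' completions wakes us; once the min timeout expires, any completion
 * (or the overall 1s timeout) does, mirroring the hrtimer handover above.
 */
static int wait_with_min_timeout(int ring_fd, unsigned int want)
{
	struct __kernel_timespec ts = { .tv_sec = 1, .tv_nsec = 0 };
	struct io_uring_getevents_arg arg;

	memset(&arg, 0, sizeof(arg));
	arg.min_wait_usec = 100;		/* becomes ext_arg->min_time */
	arg.ts = (__u64)(unsigned long)&ts;	/* becomes ext_arg->ts */

	return syscall(__NR_io_uring_enter, ring_fd, 0, want,
		       IORING_ENTER_GETEVENTS | IORING_ENTER_EXT_ARG,
		       &arg, sizeof(arg));
}
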

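The task_work helpers split out into tw.c assume that, for IORING_SETUP_DEFER_TASKRUN rings, only the submitter task ever runs the deferred completion work (see io_allowed_run_tw() and the -EEXIST check in __io_run_local_work()). A minimal liburing setup that honors this contract might look like the sketch below; the helper name is illustrative and not part of the patch.

#include <liburing.h>

/* Create a ring whose completion work is only ever run by this task. */
static int setup_defer_taskrun_ring(struct io_uring *ring)
{
	/*
	 * DEFER_TASKRUN requires SINGLE_ISSUER. Completions queued through
	 * io_req_local_work_add() stay on ctx->work_llist until this task
	 * enters the kernel to submit or wait, so other threads must not
	 * call the wait/peek helpers on this ring.
	 */
	return io_uring_queue_init(64, ring,
				   IORING_SETUP_SINGLE_ISSUER |
				   IORING_SETUP_DEFER_TASKRUN);
}

A subsequent io_uring_submit_and_wait(ring, n) from the same task then both flushes the SQ ring and drains the deferred work list before CQEs are returned.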