#ifndef IOU_CORE_H
#define IOU_CORE_H

#include <linux/errno.h>
#include <linux/lockdep.h>
#include <linux/resume_user_mode.h>
#include <linux/kasan.h>
#include <linux/poll.h>
#include <linux/io_uring_types.h>
#include <uapi/linux/eventpoll.h>
#include "io-wq.h"
#include "slist.h"
#include "filetable.h"

#ifndef CREATE_TRACE_POINTS
#include <trace/events/io_uring.h>
#endif

enum {
	IOU_OK			= 0,
	IOU_ISSUE_SKIP_COMPLETE	= -EIOCBQUEUED,

	/*
	 * Requeue the task_work to restart operations on this request. The
	 * actual value isn't important; it should just not be an otherwise
	 * valid error code, yet less than -MAX_ERRNO and valid internally.
	 */
	IOU_REQUEUE		= -3072,

	/*
	 * Intended only when IO_URING_F_MULTISHOT is passed, to indicate to
	 * the poll runner that multishot should be removed and the result
	 * set in req->cqe.res.
	 */
	IOU_STOP_MULTISHOT	= -ECANCELED,
};
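/*
 * Illustrative sketch (not taken from this header): a typical opcode
 * ->issue() handler sets the result and returns one of the codes above.
 * io_foo() and do_foo() below are hypothetical names.
 *
 *	static int io_foo(struct io_kiocb *req, unsigned int issue_flags)
 *	{
 *		int ret = do_foo(req);
 *
 *		if (ret == -EAGAIN && (issue_flags & IO_URING_F_NONBLOCK))
 *			return -EAGAIN;		// retried later, e.g. from io-wq
 *		io_req_set_res(req, ret, 0);
 *		return IOU_OK;			// core posts the CQE
 *	}
 */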

struct io_wait_queue {
	struct wait_queue_entry wq;
	struct io_ring_ctx *ctx;
	unsigned cq_tail;
	unsigned cq_min_tail;
	unsigned nr_timeouts;
	int hit_timeout;
	ktime_t min_timeout;
	ktime_t timeout;
	struct hrtimer t;

#ifdef CONFIG_NET_RX_BUSY_POLL
	ktime_t napi_busy_poll_dt;
	bool napi_prefer_busy_poll;
#endif
};

static inline bool io_should_wake(struct io_wait_queue *iowq)
{
	struct io_ring_ctx *ctx = iowq->ctx;
	int dist = READ_ONCE(ctx->rings->cq.tail) - (int) iowq->cq_tail;

	/*
	 * Wake up if we have enough events, or if a timeout occurred since we
	 * started waiting. For timeouts, we always want to return to userspace,
	 * regardless of event count.
	 */
	return dist >= 0 || atomic_read(&ctx->cq_timeouts) != iowq->nr_timeouts;
}
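/*
 * Worked example for the signed-distance test above: iowq->cq_tail holds
 * the CQ tail value at which enough CQEs will be available, and it may wrap
 * the 32-bit index space. If the target is 0x00000002 while the ring tail
 * is still 0xfffffffe, the subtraction yields dist == -4 and we keep
 * waiting; once the tail wraps around to 0x00000002, dist == 0 and the
 * waiter is woken.
 */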

bool io_cqe_cache_refill(struct io_ring_ctx *ctx, bool overflow);
int io_run_task_work_sig(struct io_ring_ctx *ctx);
void io_req_defer_failed(struct io_kiocb *req, s32 res);
bool io_post_aux_cqe(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags);
void io_add_aux_cqe(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags);
bool io_req_post_cqe(struct io_kiocb *req, s32 res, u32 cflags);
void __io_commit_cqring_flush(struct io_ring_ctx *ctx);

struct file *io_file_get_normal(struct io_kiocb *req, int fd);
struct file *io_file_get_fixed(struct io_kiocb *req, int fd,
			       unsigned issue_flags);

void __io_req_task_work_add(struct io_kiocb *req, unsigned flags);
void io_req_task_work_add_remote(struct io_kiocb *req, struct io_ring_ctx *ctx,
				 unsigned flags);
bool io_alloc_async_data(struct io_kiocb *req);
void io_req_task_queue(struct io_kiocb *req);
void io_req_task_complete(struct io_kiocb *req, struct io_tw_state *ts);
void io_req_task_queue_fail(struct io_kiocb *req, int ret);
void io_req_task_submit(struct io_kiocb *req, struct io_tw_state *ts);
struct llist_node *io_handle_tw_list(struct llist_node *node, unsigned int *count, unsigned int max_entries);
struct llist_node *tctx_task_work_run(struct io_uring_task *tctx, unsigned int max_entries, unsigned int *count);
void tctx_task_work(struct callback_head *cb);
__cold void io_uring_cancel_generic(bool cancel_all, struct io_sq_data *sqd);
int io_uring_alloc_task_context(struct task_struct *task,
				struct io_ring_ctx *ctx);

int io_ring_add_registered_file(struct io_uring_task *tctx, struct file *file,
				     int start, int end);
void io_req_queue_iowq(struct io_kiocb *req);

int io_poll_issue(struct io_kiocb *req, struct io_tw_state *ts);
int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr);
int io_do_iopoll(struct io_ring_ctx *ctx, bool force_nonspin);
void __io_submit_flush_completions(struct io_ring_ctx *ctx);

struct io_wq_work *io_wq_free_work(struct io_wq_work *work);
void io_wq_submit_work(struct io_wq_work *work);

void io_free_req(struct io_kiocb *req);
void io_queue_next(struct io_kiocb *req);
void io_task_refs_refill(struct io_uring_task *tctx);
bool __io_alloc_req_refill(struct io_ring_ctx *ctx);

bool io_match_task_safe(struct io_kiocb *head, struct task_struct *task,
			bool cancel_all);

void io_activate_pollwq(struct io_ring_ctx *ctx);

static inline void io_lockdep_assert_cq_locked(struct io_ring_ctx *ctx)
{
#if defined(CONFIG_PROVE_LOCKING)
	lockdep_assert(in_task());

	if (ctx->flags & IORING_SETUP_IOPOLL) {
		lockdep_assert_held(&ctx->uring_lock);
	} else if (!ctx->task_complete) {
		lockdep_assert_held(&ctx->completion_lock);
	} else if (ctx->submitter_task) {
		/*
		 * ->submitter_task may be NULL and we can still post a CQE,
		 * if the ring has been setup with IORING_SETUP_R_DISABLED.
		 * Not from an SQE, as those cannot be submitted, but via
		 * updating tagged resources.
		 */
		if (ctx->submitter_task->flags & PF_EXITING)
			lockdep_assert(current_work());
		else
			lockdep_assert(current == ctx->submitter_task);
	}
#endif
}

static inline void io_req_task_work_add(struct io_kiocb *req)
{
	__io_req_task_work_add(req, 0);
}

static inline void io_submit_flush_completions(struct io_ring_ctx *ctx)
{
	if (!wq_list_empty(&ctx->submit_state.compl_reqs) ||
	    ctx->submit_state.cq_flush)
		__io_submit_flush_completions(ctx);
}

#define io_for_each_link(pos, head) \
	for (pos = (head); pos; pos = pos->link)

static inline bool io_get_cqe_overflow(struct io_ring_ctx *ctx,
					struct io_uring_cqe **ret,
					bool overflow)
{
	io_lockdep_assert_cq_locked(ctx);

	if (unlikely(ctx->cqe_cached >= ctx->cqe_sentinel)) {
		if (unlikely(!io_cqe_cache_refill(ctx, overflow)))
			return false;
	}
	*ret = ctx->cqe_cached;
	ctx->cached_cq_tail++;
	ctx->cqe_cached++;
	if (ctx->flags & IORING_SETUP_CQE32)
		ctx->cqe_cached++;
	return true;
}
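/*
 * Note on the CQE32 handling above: cqe_cached walks the CQ ring in units
 * of struct io_uring_cqe (16 bytes). A ring created with IORING_SETUP_CQE32
 * uses 32-byte entries, so each posted CQE consumes two cache slots and
 * cqe_cached is bumped twice, while cached_cq_tail counts logical CQEs and
 * is bumped once.
 */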

static inline bool io_get_cqe(struct io_ring_ctx *ctx, struct io_uring_cqe **ret)
{
	return io_get_cqe_overflow(ctx, ret, false);
}

static __always_inline bool io_fill_cqe_req(struct io_ring_ctx *ctx,
					    struct io_kiocb *req)
{
	struct io_uring_cqe *cqe;

	/*
	 * If we can't get a cq entry, userspace overflowed the
	 * submission (by quite a lot). Increment the overflow count in
	 * the ring.
	 */
	if (unlikely(!io_get_cqe(ctx, &cqe)))
		return false;

	if (trace_io_uring_complete_enabled())
		trace_io_uring_complete(req->ctx, req, req->cqe.user_data,
					req->cqe.res, req->cqe.flags,
					req->big_cqe.extra1, req->big_cqe.extra2);

	memcpy(cqe, &req->cqe, sizeof(*cqe));
	if (ctx->flags & IORING_SETUP_CQE32) {
		/* the two extra fields are 16 bytes, the size of a base CQE */
		memcpy(cqe->big_cqe, &req->big_cqe, sizeof(*cqe));
		memset(&req->big_cqe, 0, sizeof(req->big_cqe));
	}
	return true;
}

static inline void req_set_fail(struct io_kiocb *req)
{
	req->flags |= REQ_F_FAIL;
	if (req->flags & REQ_F_CQE_SKIP) {
		req->flags &= ~REQ_F_CQE_SKIP;
		req->flags |= REQ_F_SKIP_LINK_CQES;
	}
}

static inline void io_req_set_res(struct io_kiocb *req, s32 res, u32 cflags)
{
	req->cqe.res = res;
	req->cqe.flags = cflags;
}

static inline bool req_has_async_data(struct io_kiocb *req)
{
	return req->flags & REQ_F_ASYNC_DATA;
}

static inline void io_put_file(struct io_kiocb *req)
{
	if (!(req->flags & REQ_F_FIXED_FILE) && req->file)
		fput(req->file);
}

static inline void io_ring_submit_unlock(struct io_ring_ctx *ctx,
					 unsigned issue_flags)
{
	lockdep_assert_held(&ctx->uring_lock);
	if (unlikely(issue_flags & IO_URING_F_UNLOCKED))
		mutex_unlock(&ctx->uring_lock);
}

static inline void io_ring_submit_lock(struct io_ring_ctx *ctx,
				       unsigned issue_flags)
{
	/*
	 * "Normal" inline submissions always hold the uring_lock, since we
	 * grab it from the system call. Same is true for the SQPOLL offload.
	 * The only exception is when we've detached the request and issue it
	 * from an async worker thread; grab the lock in that case.
	 */
	if (unlikely(issue_flags & IO_URING_F_UNLOCKED))
		mutex_lock(&ctx->uring_lock);
	lockdep_assert_held(&ctx->uring_lock);
}
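/*
 * Usage sketch (illustrative; io_foo() is a hypothetical helper): code that
 * may run either inline with the uring_lock already held or from an
 * unlocked context such as io-wq brackets its ctx accesses with the pair
 * above and lets issue_flags decide whether the mutex must be taken:
 *
 *	static int io_foo(struct io_kiocb *req, unsigned int issue_flags)
 *	{
 *		struct io_ring_ctx *ctx = req->ctx;
 *
 *		io_ring_submit_lock(ctx, issue_flags);
 *		// ... touch state protected by ctx->uring_lock ...
 *		io_ring_submit_unlock(ctx, issue_flags);
 *		return IOU_OK;
 *	}
 */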

static inline void io_commit_cqring(struct io_ring_ctx *ctx)
{
	/* order cqe stores with ring update */
	smp_store_release(&ctx->rings->cq.tail, ctx->cached_cq_tail);
}
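/*
 * The release store above pairs with an acquire load (or read barrier) of
 * cq.tail on the consumer side, typically userspace scanning the CQ ring
 * (liburing, for example, loads the tail with a load-acquire before reading
 * CQEs), so CQE contents are visible before the new tail value is observed.
 */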

static inline void io_poll_wq_wake(struct io_ring_ctx *ctx)
{
	if (wq_has_sleeper(&ctx->poll_wq))
		__wake_up(&ctx->poll_wq, TASK_NORMAL, 0,
				poll_to_key(EPOLL_URING_WAKE | EPOLLIN));
}

static inline void io_cqring_wake(struct io_ring_ctx *ctx)
{
	/*
	 * Trigger waitqueue handler on all waiters on our waitqueue. This
	 * won't necessarily wake up all the tasks; io_should_wake() will make
	 * that decision.
	 *
	 * Pass in EPOLLIN|EPOLL_URING_WAKE as the poll wakeup key. The latter
	 * is set in the mask so that if we recurse back into our own poll
	 * waitqueue handlers, we know we have a dependency on eventfd or
	 * epoll and should terminate multishot poll at that point.
	 */
	if (wq_has_sleeper(&ctx->cq_wait))
		__wake_up(&ctx->cq_wait, TASK_NORMAL, 0,
				poll_to_key(EPOLL_URING_WAKE | EPOLLIN));
}

static inline bool io_sqring_full(struct io_ring_ctx *ctx)
{
	struct io_rings *r = ctx->rings;

	/*
	 * SQPOLL must use the actual sqring head, as using the cached_sq_head
	 * is race prone if the SQPOLL thread has grabbed entries but not yet
	 * committed them to the ring. For !SQPOLL, this doesn't matter, but
	 * since this helper is just used for SQPOLL sqring waits (or POLLOUT),
	 * just read the actual sqring head unconditionally.
	 */
	return READ_ONCE(r->sq.tail) - READ_ONCE(r->sq.head) == ctx->sq_entries;
}

static inline unsigned int io_sqring_entries(struct io_ring_ctx *ctx)
{
	struct io_rings *rings = ctx->rings;
	unsigned int entries;

	/* make sure SQ entry isn't read before tail */
	entries = smp_load_acquire(&rings->sq.tail) - ctx->cached_sq_head;
	return min(entries, ctx->sq_entries);
}
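/*
 * The acquire load above pairs with the submitter's store-release of
 * sq.tail (as done by liburing when flushing the SQ ring), so SQE writes
 * made before the tail update are visible here once the new tail is seen.
 * The min() clamp defends against a userspace-provided tail claiming more
 * than sq_entries pending entries.
 */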

static inline int io_run_task_work(void)
{
	bool ret = false;

	/*
	 * Always check-and-clear the task_work notification signal. With how
	 * signaling works for task_work, we can find it set with nothing to
	 * run. We need to clear it for that case, like get_signal() does.
	 */
	if (test_thread_flag(TIF_NOTIFY_SIGNAL))
		clear_notify_signal();
	/*
	 * PF_IO_WORKER never returns to userspace, so check here if we have
	 * notify work that needs processing.
	 */
	if (current->flags & PF_IO_WORKER) {
		if (test_thread_flag(TIF_NOTIFY_RESUME)) {
			__set_current_state(TASK_RUNNING);
			resume_user_mode_work(NULL);
		}
		if (current->io_uring) {
			unsigned int count = 0;

			__set_current_state(TASK_RUNNING);
			tctx_task_work_run(current->io_uring, UINT_MAX, &count);
			if (count)
				ret = true;
		}
	}
	if (task_work_pending(current)) {
		__set_current_state(TASK_RUNNING);
		task_work_run();
		ret = true;
	}

	return ret;
}

static inline bool io_task_work_pending(struct io_ring_ctx *ctx)
{
	return task_work_pending(current) || !llist_empty(&ctx->work_llist);
}

static inline void io_tw_lock(struct io_ring_ctx *ctx, struct io_tw_state *ts)
{
	lockdep_assert_held(&ctx->uring_lock);
}

/*
 * Don't complete immediately but use deferred completion infrastructure.
 * Protected by ->uring_lock and can only be used either with
 * IO_URING_F_COMPLETE_DEFER or inside a tw handler holding the mutex.
 */
static inline void io_req_complete_defer(struct io_kiocb *req)
	__must_hold(&req->ctx->uring_lock)
{
	struct io_submit_state *state = &req->ctx->submit_state;

	lockdep_assert_held(&req->ctx->uring_lock);

	wq_list_add_tail(&req->comp_list, &state->compl_reqs);
}

static inline void io_commit_cqring_flush(struct io_ring_ctx *ctx)
{
	if (unlikely(ctx->off_timeout_used || ctx->drain_active ||
		     ctx->has_evfd || ctx->poll_activated))
		__io_commit_cqring_flush(ctx);
}

static inline void io_get_task_refs(int nr)
{
	struct io_uring_task *tctx = current->io_uring;

	tctx->cached_refs -= nr;
	if (unlikely(tctx->cached_refs < 0))
		io_task_refs_refill(tctx);
}

static inline bool io_req_cache_empty(struct io_ring_ctx *ctx)
{
	return !ctx->submit_state.free_list.next;
}

extern struct kmem_cache *req_cachep;
extern struct kmem_cache *io_buf_cachep;

static inline struct io_kiocb *io_extract_req(struct io_ring_ctx *ctx)
{
	struct io_kiocb *req;

	req = container_of(ctx->submit_state.free_list.next, struct io_kiocb, comp_list);
	wq_stack_extract(&ctx->submit_state.free_list);
	return req;
}

static inline bool io_alloc_req(struct io_ring_ctx *ctx, struct io_kiocb **req)
{
	if (unlikely(io_req_cache_empty(ctx))) {
		if (!__io_alloc_req_refill(ctx))
			return false;
	}
	*req = io_extract_req(ctx);
	return true;
}
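/*
 * Usage sketch (illustrative): allocation must happen with ->uring_lock
 * held, since the free list and refill batch live in ctx->submit_state.
 *
 *	struct io_kiocb *req;
 *
 *	if (unlikely(!io_alloc_req(ctx, &req)))
 *		return -ENOMEM;		// cache empty and bulk refill failed
 *	// initialise and issue req ...
 */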

static inline bool io_allowed_defer_tw_run(struct io_ring_ctx *ctx)
{
	return likely(ctx->submitter_task == current);
}

static inline bool io_allowed_run_tw(struct io_ring_ctx *ctx)
{
	return likely(!(ctx->flags & IORING_SETUP_DEFER_TASKRUN) ||
		      ctx->submitter_task == current);
}

static inline void io_req_queue_tw_complete(struct io_kiocb *req, s32 res)
{
	io_req_set_res(req, res, 0);
	req->io_task_work.func = io_req_task_complete;
	io_req_task_work_add(req);
}

/*
 * IORING_SETUP_SQE128 contexts allocate twice the normal SQE size for each
 * slot.
 */
static inline size_t uring_sqe_size(struct io_ring_ctx *ctx)
{
	if (ctx->flags & IORING_SETUP_SQE128)
		return 2 * sizeof(struct io_uring_sqe);
	return sizeof(struct io_uring_sqe);
}
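/*
 * Example: the byte offset of SQE slot i in the SQ entries area is
 * i * uring_sqe_size(ctx), i.e. i * 64 on a normal ring and i * 128 on an
 * IORING_SETUP_SQE128 ring, where each slot holds a doubled, 128-byte SQE.
 */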

static inline bool io_file_can_poll(struct io_kiocb *req)
{
	if (req->flags & REQ_F_CAN_POLL)
		return true;
	if (req->file && file_can_poll(req->file)) {
		req->flags |= REQ_F_CAN_POLL;
		return true;
	}
	return false;
}

static inline ktime_t io_get_time(struct io_ring_ctx *ctx)
{
	if (ctx->clockid == CLOCK_MONOTONIC)
		return ktime_get();

	return ktime_get_with_offset(ctx->clock_offset);
}

enum {
	IO_CHECK_CQ_OVERFLOW_BIT,
	IO_CHECK_CQ_DROPPED_BIT,
};

static inline bool io_has_work(struct io_ring_ctx *ctx)
{
	return test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq) ||
	       !llist_empty(&ctx->work_llist);
}
#endif