Lines Matching full:queue
12 #include "funnel-queue.h"
18 * This queue will attempt to handle requests in reasonably sized batches instead of reacting
22 * If the wait time becomes long enough, the queue will become dormant and must be explicitly
24 * queue via xchg (which is a memory barrier), and later checks "dormant" to decide whether to do a
28 * decide if the funnel queue is idle. In dormant mode, the last examination of "newest" before
31 * queue's "next" field update isn't visible yet to make the entry accessible, its existence will
35 * the queue to awaken immediately.
50 /* Wait queue for synchronizing producers and consumer */
54 /* Queue of new incoming requests */
56 /* Queue of old requests to retry */
68 static inline struct uds_request *poll_queues(struct uds_request_queue *queue) in poll_queues() argument
72 entry = vdo_funnel_queue_poll(queue->retry_queue); in poll_queues()
76 entry = vdo_funnel_queue_poll(queue->main_queue); in poll_queues()
83 static inline bool are_queues_idle(struct uds_request_queue *queue) in are_queues_idle() argument
85 return vdo_is_funnel_queue_idle(queue->retry_queue) && in are_queues_idle()
86 vdo_is_funnel_queue_idle(queue->main_queue); in are_queues_idle()
94 static inline bool dequeue_request(struct uds_request_queue *queue, in dequeue_request() argument
97 struct uds_request *request = poll_queues(queue); in dequeue_request()
104 if (!READ_ONCE(queue->running)) { in dequeue_request()
115 static void wait_for_request(struct uds_request_queue *queue, bool dormant, in wait_for_request() argument
120 wait_event_interruptible(queue->wait_head, in wait_for_request()
121 (dequeue_request(queue, request, waited) || in wait_for_request()
122 !are_queues_idle(queue))); in wait_for_request()
126 wait_event_interruptible_hrtimeout(queue->wait_head, in wait_for_request()
127 dequeue_request(queue, request, waited), in wait_for_request()
133 struct uds_request_queue *queue = arg; in request_queue_worker() local
136 bool dormant = atomic_read(&queue->dormant); in request_queue_worker()
141 wait_for_request(queue, dormant, time_batch, &request, &waited); in request_queue_worker()
144 queue->processor(request); in request_queue_worker()
145 } else if (!READ_ONCE(queue->running)) { in request_queue_worker()
151 * The queue has been roused from dormancy. Clear the flag so enqueuers can in request_queue_worker()
154 atomic_set(&queue->dormant, false); in request_queue_worker()
169 atomic_set(&queue->dormant, true); in request_queue_worker()
190 while ((request = poll_queues(queue)) != NULL) in request_queue_worker()
191 queue->processor(request); in request_queue_worker()
199 struct uds_request_queue *queue; in uds_make_request_queue() local
201 result = vdo_allocate(1, struct uds_request_queue, __func__, &queue); in uds_make_request_queue()
205 queue->processor = processor; in uds_make_request_queue()
206 queue->running = true; in uds_make_request_queue()
207 atomic_set(&queue->dormant, false); in uds_make_request_queue()
208 init_waitqueue_head(&queue->wait_head); in uds_make_request_queue()
210 result = vdo_make_funnel_queue(&queue->main_queue); in uds_make_request_queue()
212 uds_request_queue_finish(queue); in uds_make_request_queue()
216 result = vdo_make_funnel_queue(&queue->retry_queue); in uds_make_request_queue()
218 uds_request_queue_finish(queue); in uds_make_request_queue()
222 result = vdo_create_thread(request_queue_worker, queue, queue_name, in uds_make_request_queue()
223 &queue->thread); in uds_make_request_queue()
225 uds_request_queue_finish(queue); in uds_make_request_queue()
229 queue->started = true; in uds_make_request_queue()
230 *queue_ptr = queue; in uds_make_request_queue()
234 static inline void wake_up_worker(struct uds_request_queue *queue) in wake_up_worker() argument
236 if (wq_has_sleeper(&queue->wait_head)) in wake_up_worker()
237 wake_up(&queue->wait_head); in wake_up_worker()
240 void uds_request_queue_enqueue(struct uds_request_queue *queue, in uds_request_queue_enqueue() argument
246 sub_queue = request->requeued ? queue->retry_queue : queue->main_queue; in uds_request_queue_enqueue()
251 * we know the queue operation acts as one. in uds_request_queue_enqueue()
253 if (atomic_read(&queue->dormant) || unbatched) in uds_request_queue_enqueue()
254 wake_up_worker(queue); in uds_request_queue_enqueue()
257 void uds_request_queue_finish(struct uds_request_queue *queue) in uds_request_queue_finish() argument
259 if (queue == NULL) in uds_request_queue_finish()
265 * able to see any change we made to a next field in the funnel queue entry. The in uds_request_queue_finish()
269 WRITE_ONCE(queue->running, false); in uds_request_queue_finish()
271 if (queue->started) { in uds_request_queue_finish()
272 wake_up_worker(queue); in uds_request_queue_finish()
273 vdo_join_threads(queue->thread); in uds_request_queue_finish()
276 vdo_free_funnel_queue(queue->main_queue); in uds_request_queue_finish()
277 vdo_free_funnel_queue(queue->retry_queue); in uds_request_queue_finish()
278 vdo_free(queue); in uds_request_queue_finish()