85 if (rv > 0) |
81 if (rv > 0) |
86 goto again; /* signaled */ |
82 goto again; /* signaled */ |
87 } |
83 } |
88 mutex_exit(&tq->tq_lock); |
84 mutex_exit(&tq->tq_lock); |
89 |
85 |
90 t = kmem_alloc(sizeof (task_t), tqflags); |
86 t = kmem_alloc(sizeof (taskq_ent_t), tqflags); |
91 |
87 |
92 mutex_enter(&tq->tq_lock); |
88 mutex_enter(&tq->tq_lock); |
93 if (t != NULL) |
89 if (t != NULL) |
94 tq->tq_nalloc++; |
90 tq->tq_nalloc++; |
95 } |
91 } |
96 return (t); |
92 return (t); |
97 } |
93 } |
98 |
94 |
99 static void |
95 static void |
100 task_free(taskq_t *tq, task_t *t) |
96 task_free(taskq_t *tq, taskq_ent_t *t) |
101 { |
97 { |
102 if (tq->tq_nalloc <= tq->tq_minalloc) { |
98 if (tq->tq_nalloc <= tq->tq_minalloc) { |
103 t->task_next = tq->tq_freelist; |
99 t->tqent_next = tq->tq_freelist; |
104 tq->tq_freelist = t; |
100 tq->tq_freelist = t; |
105 } else { |
101 } else { |
106 tq->tq_nalloc--; |
102 tq->tq_nalloc--; |
107 mutex_exit(&tq->tq_lock); |
103 mutex_exit(&tq->tq_lock); |
108 kmem_free(t, sizeof (task_t)); |
104 kmem_free(t, sizeof (taskq_ent_t)); |
109 mutex_enter(&tq->tq_lock); |
105 mutex_enter(&tq->tq_lock); |
110 } |
106 } |
111 |
107 |
112 if (tq->tq_maxalloc_wait) |
108 if (tq->tq_maxalloc_wait) |
113 cv_signal(&tq->tq_maxalloc_cv); |
109 cv_signal(&tq->tq_maxalloc_cv); |
114 } |
110 } |
115 |
111 |
116 taskqid_t |
112 taskqid_t |
117 taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t tqflags) |
113 taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t tqflags) |
118 { |
114 { |
119 task_t *t; |
115 taskq_ent_t *t; |
120 |
116 |
121 if (taskq_now) { |
117 if (taskq_now) { |
122 func(arg); |
118 func(arg); |
123 return (1); |
119 return (1); |
124 } |
120 } |
128 if ((t = task_alloc(tq, tqflags)) == NULL) { |
124 if ((t = task_alloc(tq, tqflags)) == NULL) { |
129 mutex_exit(&tq->tq_lock); |
125 mutex_exit(&tq->tq_lock); |
130 return (0); |
126 return (0); |
131 } |
127 } |
132 if (tqflags & TQ_FRONT) { |
128 if (tqflags & TQ_FRONT) { |
133 t->task_next = tq->tq_task.task_next; |
129 t->tqent_next = tq->tq_task.tqent_next; |
134 t->task_prev = &tq->tq_task; |
130 t->tqent_prev = &tq->tq_task; |
135 } else { |
131 } else { |
136 t->task_next = &tq->tq_task; |
132 t->tqent_next = &tq->tq_task; |
137 t->task_prev = tq->tq_task.task_prev; |
133 t->tqent_prev = tq->tq_task.tqent_prev; |
138 } |
134 } |
139 t->task_next->task_prev = t; |
135 t->tqent_next->tqent_prev = t; |
140 t->task_prev->task_next = t; |
136 t->tqent_prev->tqent_next = t; |
141 t->task_func = func; |
137 t->tqent_func = func; |
142 t->task_arg = arg; |
138 t->tqent_arg = arg; |
143 cv_signal(&tq->tq_dispatch_cv); |
139 cv_signal(&tq->tq_dispatch_cv); |
144 mutex_exit(&tq->tq_lock); |
140 mutex_exit(&tq->tq_lock); |
145 return (1); |
141 return (1); |
146 } |
142 } |
147 |
143 |
148 void |
144 void |
|
145 taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags, |
|
146 taskq_ent_t *t) |
|
147 { |
|
148 ASSERT(func != NULL); |
|
149 ASSERT(!(tq->tq_flags & TASKQ_DYNAMIC)); |
|
150 |
|
151 /* |
|
152 * Mark it as a prealloc'd task. This is important |
|
153 * to ensure that we don't free it later. |
|
154 */ |
|
155 t->tqent_flags |= TQENT_FLAG_PREALLOC; |
|
156 /* |
|
157 * Enqueue the task to the underlying queue. |
|
158 */ |
|
159 mutex_enter(&tq->tq_lock); |
|
160 |
|
161 if (flags & TQ_FRONT) { |
|
162 t->tqent_next = tq->tq_task.tqent_next; |
|
163 t->tqent_prev = &tq->tq_task; |
|
164 } else { |
|
165 t->tqent_next = &tq->tq_task; |
|
166 t->tqent_prev = tq->tq_task.tqent_prev; |
|
167 } |
|
168 t->tqent_next->tqent_prev = t; |
|
169 t->tqent_prev->tqent_next = t; |
|
170 t->tqent_func = func; |
|
171 t->tqent_arg = arg; |
|
172 cv_signal(&tq->tq_dispatch_cv); |
|
173 mutex_exit(&tq->tq_lock); |
|
174 } |
|
175 |
|
176 void |
149 taskq_wait(taskq_t *tq) |
177 taskq_wait(taskq_t *tq) |
150 { |
178 { |
151 mutex_enter(&tq->tq_lock); |
179 mutex_enter(&tq->tq_lock); |
152 while (tq->tq_task.task_next != &tq->tq_task || tq->tq_active != 0) |
180 while (tq->tq_task.tqent_next != &tq->tq_task || tq->tq_active != 0) |
153 cv_wait(&tq->tq_wait_cv, &tq->tq_lock); |
181 cv_wait(&tq->tq_wait_cv, &tq->tq_lock); |
154 mutex_exit(&tq->tq_lock); |
182 mutex_exit(&tq->tq_lock); |
155 } |
183 } |
156 |
184 |
157 static void * |
185 static void * |
158 taskq_thread(void *arg) |
186 taskq_thread(void *arg) |
159 { |
187 { |
160 taskq_t *tq = arg; |
188 taskq_t *tq = arg; |
161 task_t *t; |
189 taskq_ent_t *t; |
|
190 boolean_t prealloc; |
162 |
191 |
163 mutex_enter(&tq->tq_lock); |
192 mutex_enter(&tq->tq_lock); |
164 while (tq->tq_flags & TASKQ_ACTIVE) { |
193 while (tq->tq_flags & TASKQ_ACTIVE) { |
165 if ((t = tq->tq_task.task_next) == &tq->tq_task) { |
194 if ((t = tq->tq_task.tqent_next) == &tq->tq_task) { |
166 if (--tq->tq_active == 0) |
195 if (--tq->tq_active == 0) |
167 cv_broadcast(&tq->tq_wait_cv); |
196 cv_broadcast(&tq->tq_wait_cv); |
168 cv_wait(&tq->tq_dispatch_cv, &tq->tq_lock); |
197 cv_wait(&tq->tq_dispatch_cv, &tq->tq_lock); |
169 tq->tq_active++; |
198 tq->tq_active++; |
170 continue; |
199 continue; |
171 } |
200 } |
172 t->task_prev->task_next = t->task_next; |
201 t->tqent_prev->tqent_next = t->tqent_next; |
173 t->task_next->task_prev = t->task_prev; |
202 t->tqent_next->tqent_prev = t->tqent_prev; |
|
203 t->tqent_next = NULL; |
|
204 t->tqent_prev = NULL; |
|
205 prealloc = t->tqent_flags & TQENT_FLAG_PREALLOC; |
174 mutex_exit(&tq->tq_lock); |
206 mutex_exit(&tq->tq_lock); |
175 |
207 |
176 rw_enter(&tq->tq_threadlock, RW_READER); |
208 rw_enter(&tq->tq_threadlock, RW_READER); |
177 t->task_func(t->task_arg); |
209 t->tqent_func(t->tqent_arg); |
178 rw_exit(&tq->tq_threadlock); |
210 rw_exit(&tq->tq_threadlock); |
179 |
211 |
180 mutex_enter(&tq->tq_lock); |
212 mutex_enter(&tq->tq_lock); |
181 task_free(tq, t); |
213 if (!prealloc) |
|
214 task_free(tq, t); |
182 } |
215 } |
183 tq->tq_nthreads--; |
216 tq->tq_nthreads--; |
184 cv_broadcast(&tq->tq_wait_cv); |
217 cv_broadcast(&tq->tq_wait_cv); |
185 mutex_exit(&tq->tq_lock); |
218 mutex_exit(&tq->tq_lock); |
186 return (NULL); |
219 return (NULL); |
215 tq->tq_flags = flags | TASKQ_ACTIVE; |
248 tq->tq_flags = flags | TASKQ_ACTIVE; |
216 tq->tq_active = nthreads; |
249 tq->tq_active = nthreads; |
217 tq->tq_nthreads = nthreads; |
250 tq->tq_nthreads = nthreads; |
218 tq->tq_minalloc = minalloc; |
251 tq->tq_minalloc = minalloc; |
219 tq->tq_maxalloc = maxalloc; |
252 tq->tq_maxalloc = maxalloc; |
220 tq->tq_task.task_next = &tq->tq_task; |
253 tq->tq_task.tqent_next = &tq->tq_task; |
221 tq->tq_task.task_prev = &tq->tq_task; |
254 tq->tq_task.tqent_prev = &tq->tq_task; |
222 tq->tq_threadlist = kmem_alloc(nthreads * sizeof (thread_t), KM_SLEEP); |
255 tq->tq_threadlist = kmem_alloc(nthreads * sizeof (thread_t), KM_SLEEP); |
223 |
256 |
224 if (flags & TASKQ_PREPOPULATE) { |
257 if (flags & TASKQ_PREPOPULATE) { |
225 mutex_enter(&tq->tq_lock); |
258 mutex_enter(&tq->tq_lock); |
226 while (minalloc-- > 0) |
259 while (minalloc-- > 0) |