usr/src/lib/libzpool/common/taskq.c
changeset 13414 b42c1f0432b6
parent 11854 5351ddd19d45
child 13597 3eac1e8e0f4c
equal deleted inserted replaced
13413:22409bb43a34 13414:b42c1f0432b6
    20  */
    20  */
    21 /*
    21 /*
    22  * Copyright 2010 Sun Microsystems, Inc.  All rights reserved.
    22  * Copyright 2010 Sun Microsystems, Inc.  All rights reserved.
    23  * Use is subject to license terms.
    23  * Use is subject to license terms.
    24  */
    24  */
       
    25 /*
       
    26  * Copyright 2011 Nexenta Systems, Inc.  All rights reserved.
       
    27  */
    25 
    28 
    26 #include <sys/zfs_context.h>
    29 #include <sys/zfs_context.h>
    27 
    30 
    28 int taskq_now;
    31 int taskq_now;
    29 taskq_t *system_taskq;
    32 taskq_t *system_taskq;
    30 
       
    31 typedef struct task {
       
    32 	struct task	*task_next;
       
    33 	struct task	*task_prev;
       
    34 	task_func_t	*task_func;
       
    35 	void		*task_arg;
       
    36 } task_t;
       
    37 
    33 
    38 #define	TASKQ_ACTIVE	0x00010000
    34 #define	TASKQ_ACTIVE	0x00010000
    39 
    35 
    40 struct taskq {
    36 struct taskq {
    41 	kmutex_t	tq_lock;
    37 	kmutex_t	tq_lock;
    49 	int		tq_nalloc;
    45 	int		tq_nalloc;
    50 	int		tq_minalloc;
    46 	int		tq_minalloc;
    51 	int		tq_maxalloc;
    47 	int		tq_maxalloc;
    52 	kcondvar_t	tq_maxalloc_cv;
    48 	kcondvar_t	tq_maxalloc_cv;
    53 	int		tq_maxalloc_wait;
    49 	int		tq_maxalloc_wait;
    54 	task_t		*tq_freelist;
    50 	taskq_ent_t	*tq_freelist;
    55 	task_t		tq_task;
    51 	taskq_ent_t	tq_task;
    56 };
    52 };
    57 
    53 
    58 static task_t *
    54 static taskq_ent_t *
    59 task_alloc(taskq_t *tq, int tqflags)
    55 task_alloc(taskq_t *tq, int tqflags)
    60 {
    56 {
    61 	task_t *t;
    57 	taskq_ent_t *t;
    62 	int rv;
    58 	int rv;
    63 
    59 
    64 again:	if ((t = tq->tq_freelist) != NULL && tq->tq_nalloc >= tq->tq_minalloc) {
    60 again:	if ((t = tq->tq_freelist) != NULL && tq->tq_nalloc >= tq->tq_minalloc) {
    65 		tq->tq_freelist = t->task_next;
    61 		tq->tq_freelist = t->tqent_next;
    66 	} else {
    62 	} else {
    67 		if (tq->tq_nalloc >= tq->tq_maxalloc) {
    63 		if (tq->tq_nalloc >= tq->tq_maxalloc) {
    68 			if (!(tqflags & KM_SLEEP))
    64 			if (!(tqflags & KM_SLEEP))
    69 				return (NULL);
    65 				return (NULL);
    70 
    66 
    85 			if (rv > 0)
    81 			if (rv > 0)
    86 				goto again;		/* signaled */
    82 				goto again;		/* signaled */
    87 		}
    83 		}
    88 		mutex_exit(&tq->tq_lock);
    84 		mutex_exit(&tq->tq_lock);
    89 
    85 
    90 		t = kmem_alloc(sizeof (task_t), tqflags);
    86 		t = kmem_alloc(sizeof (taskq_ent_t), tqflags);
    91 
    87 
    92 		mutex_enter(&tq->tq_lock);
    88 		mutex_enter(&tq->tq_lock);
    93 		if (t != NULL)
    89 		if (t != NULL)
    94 			tq->tq_nalloc++;
    90 			tq->tq_nalloc++;
    95 	}
    91 	}
    96 	return (t);
    92 	return (t);
    97 }
    93 }
    98 
    94 
    99 static void
    95 static void
   100 task_free(taskq_t *tq, task_t *t)
    96 task_free(taskq_t *tq, taskq_ent_t *t)
   101 {
    97 {
   102 	if (tq->tq_nalloc <= tq->tq_minalloc) {
    98 	if (tq->tq_nalloc <= tq->tq_minalloc) {
   103 		t->task_next = tq->tq_freelist;
    99 		t->tqent_next = tq->tq_freelist;
   104 		tq->tq_freelist = t;
   100 		tq->tq_freelist = t;
   105 	} else {
   101 	} else {
   106 		tq->tq_nalloc--;
   102 		tq->tq_nalloc--;
   107 		mutex_exit(&tq->tq_lock);
   103 		mutex_exit(&tq->tq_lock);
   108 		kmem_free(t, sizeof (task_t));
   104 		kmem_free(t, sizeof (taskq_ent_t));
   109 		mutex_enter(&tq->tq_lock);
   105 		mutex_enter(&tq->tq_lock);
   110 	}
   106 	}
   111 
   107 
   112 	if (tq->tq_maxalloc_wait)
   108 	if (tq->tq_maxalloc_wait)
   113 		cv_signal(&tq->tq_maxalloc_cv);
   109 		cv_signal(&tq->tq_maxalloc_cv);
   114 }
   110 }
   115 
   111 
   116 taskqid_t
   112 taskqid_t
   117 taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t tqflags)
   113 taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t tqflags)
   118 {
   114 {
   119 	task_t *t;
   115 	taskq_ent_t *t;
   120 
   116 
   121 	if (taskq_now) {
   117 	if (taskq_now) {
   122 		func(arg);
   118 		func(arg);
   123 		return (1);
   119 		return (1);
   124 	}
   120 	}
   128 	if ((t = task_alloc(tq, tqflags)) == NULL) {
   124 	if ((t = task_alloc(tq, tqflags)) == NULL) {
   129 		mutex_exit(&tq->tq_lock);
   125 		mutex_exit(&tq->tq_lock);
   130 		return (0);
   126 		return (0);
   131 	}
   127 	}
   132 	if (tqflags & TQ_FRONT) {
   128 	if (tqflags & TQ_FRONT) {
   133 		t->task_next = tq->tq_task.task_next;
   129 		t->tqent_next = tq->tq_task.tqent_next;
   134 		t->task_prev = &tq->tq_task;
   130 		t->tqent_prev = &tq->tq_task;
   135 	} else {
   131 	} else {
   136 		t->task_next = &tq->tq_task;
   132 		t->tqent_next = &tq->tq_task;
   137 		t->task_prev = tq->tq_task.task_prev;
   133 		t->tqent_prev = tq->tq_task.tqent_prev;
   138 	}
   134 	}
   139 	t->task_next->task_prev = t;
   135 	t->tqent_next->tqent_prev = t;
   140 	t->task_prev->task_next = t;
   136 	t->tqent_prev->tqent_next = t;
   141 	t->task_func = func;
   137 	t->tqent_func = func;
   142 	t->task_arg = arg;
   138 	t->tqent_arg = arg;
   143 	cv_signal(&tq->tq_dispatch_cv);
   139 	cv_signal(&tq->tq_dispatch_cv);
   144 	mutex_exit(&tq->tq_lock);
   140 	mutex_exit(&tq->tq_lock);
   145 	return (1);
   141 	return (1);
   146 }
   142 }
   147 
   143 
   148 void
   144 void
       
   145 taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags,
       
   146     taskq_ent_t *t)
       
   147 {
       
   148 	ASSERT(func != NULL);
       
   149 	ASSERT(!(tq->tq_flags & TASKQ_DYNAMIC));
       
   150 
       
   151 	/*
       
   152 	 * Mark it as a prealloc'd task.  This is important
       
   153 	 * to ensure that we don't free it later.
       
   154 	 */
       
   155 	t->tqent_flags |= TQENT_FLAG_PREALLOC;
       
   156 	/*
       
   157 	 * Enqueue the task to the underlying queue.
       
   158 	 */
       
   159 	mutex_enter(&tq->tq_lock);
       
   160 
       
   161 	if (flags & TQ_FRONT) {
       
   162 		t->tqent_next = tq->tq_task.tqent_next;
       
   163 		t->tqent_prev = &tq->tq_task;
       
   164 	} else {
       
   165 		t->tqent_next = &tq->tq_task;
       
   166 		t->tqent_prev = tq->tq_task.tqent_prev;
       
   167 	}
       
   168 	t->tqent_next->tqent_prev = t;
       
   169 	t->tqent_prev->tqent_next = t;
       
   170 	t->tqent_func = func;
       
   171 	t->tqent_arg = arg;
       
   172 	cv_signal(&tq->tq_dispatch_cv);
       
   173 	mutex_exit(&tq->tq_lock);
       
   174 }
       
   175 
       
   176 void
   149 taskq_wait(taskq_t *tq)
   177 taskq_wait(taskq_t *tq)
   150 {
   178 {
   151 	mutex_enter(&tq->tq_lock);
   179 	mutex_enter(&tq->tq_lock);
   152 	while (tq->tq_task.task_next != &tq->tq_task || tq->tq_active != 0)
   180 	while (tq->tq_task.tqent_next != &tq->tq_task || tq->tq_active != 0)
   153 		cv_wait(&tq->tq_wait_cv, &tq->tq_lock);
   181 		cv_wait(&tq->tq_wait_cv, &tq->tq_lock);
   154 	mutex_exit(&tq->tq_lock);
   182 	mutex_exit(&tq->tq_lock);
   155 }
   183 }
   156 
   184 
   157 static void *
   185 static void *
   158 taskq_thread(void *arg)
   186 taskq_thread(void *arg)
   159 {
   187 {
   160 	taskq_t *tq = arg;
   188 	taskq_t *tq = arg;
   161 	task_t *t;
   189 	taskq_ent_t *t;
       
   190 	boolean_t prealloc;
   162 
   191 
   163 	mutex_enter(&tq->tq_lock);
   192 	mutex_enter(&tq->tq_lock);
   164 	while (tq->tq_flags & TASKQ_ACTIVE) {
   193 	while (tq->tq_flags & TASKQ_ACTIVE) {
   165 		if ((t = tq->tq_task.task_next) == &tq->tq_task) {
   194 		if ((t = tq->tq_task.tqent_next) == &tq->tq_task) {
   166 			if (--tq->tq_active == 0)
   195 			if (--tq->tq_active == 0)
   167 				cv_broadcast(&tq->tq_wait_cv);
   196 				cv_broadcast(&tq->tq_wait_cv);
   168 			cv_wait(&tq->tq_dispatch_cv, &tq->tq_lock);
   197 			cv_wait(&tq->tq_dispatch_cv, &tq->tq_lock);
   169 			tq->tq_active++;
   198 			tq->tq_active++;
   170 			continue;
   199 			continue;
   171 		}
   200 		}
   172 		t->task_prev->task_next = t->task_next;
   201 		t->tqent_prev->tqent_next = t->tqent_next;
   173 		t->task_next->task_prev = t->task_prev;
   202 		t->tqent_next->tqent_prev = t->tqent_prev;
       
   203 		t->tqent_next = NULL;
       
   204 		t->tqent_prev = NULL;
       
   205 		prealloc = t->tqent_flags & TQENT_FLAG_PREALLOC;
   174 		mutex_exit(&tq->tq_lock);
   206 		mutex_exit(&tq->tq_lock);
   175 
   207 
   176 		rw_enter(&tq->tq_threadlock, RW_READER);
   208 		rw_enter(&tq->tq_threadlock, RW_READER);
   177 		t->task_func(t->task_arg);
   209 		t->tqent_func(t->tqent_arg);
   178 		rw_exit(&tq->tq_threadlock);
   210 		rw_exit(&tq->tq_threadlock);
   179 
   211 
   180 		mutex_enter(&tq->tq_lock);
   212 		mutex_enter(&tq->tq_lock);
   181 		task_free(tq, t);
   213 		if (!prealloc)
       
   214 			task_free(tq, t);
   182 	}
   215 	}
   183 	tq->tq_nthreads--;
   216 	tq->tq_nthreads--;
   184 	cv_broadcast(&tq->tq_wait_cv);
   217 	cv_broadcast(&tq->tq_wait_cv);
   185 	mutex_exit(&tq->tq_lock);
   218 	mutex_exit(&tq->tq_lock);
   186 	return (NULL);
   219 	return (NULL);
   215 	tq->tq_flags = flags | TASKQ_ACTIVE;
   248 	tq->tq_flags = flags | TASKQ_ACTIVE;
   216 	tq->tq_active = nthreads;
   249 	tq->tq_active = nthreads;
   217 	tq->tq_nthreads = nthreads;
   250 	tq->tq_nthreads = nthreads;
   218 	tq->tq_minalloc = minalloc;
   251 	tq->tq_minalloc = minalloc;
   219 	tq->tq_maxalloc = maxalloc;
   252 	tq->tq_maxalloc = maxalloc;
   220 	tq->tq_task.task_next = &tq->tq_task;
   253 	tq->tq_task.tqent_next = &tq->tq_task;
   221 	tq->tq_task.task_prev = &tq->tq_task;
   254 	tq->tq_task.tqent_prev = &tq->tq_task;
   222 	tq->tq_threadlist = kmem_alloc(nthreads * sizeof (thread_t), KM_SLEEP);
   255 	tq->tq_threadlist = kmem_alloc(nthreads * sizeof (thread_t), KM_SLEEP);
   223 
   256 
   224 	if (flags & TASKQ_PREPOPULATE) {
   257 	if (flags & TASKQ_PREPOPULATE) {
   225 		mutex_enter(&tq->tq_lock);
   258 		mutex_enter(&tq->tq_lock);
   226 		while (minalloc-- > 0)
   259 		while (minalloc-- > 0)