734 taskq_dispatch_prealloc() desired
authorGarrett D'Amore <garrett@nexenta.com>
Wed, 27 Jul 2011 07:13:44 -0700
changeset 13414 b42c1f0432b6
parent 13413 22409bb43a34
child 13415 cc04348bbc51
734 taskq_dispatch_prealloc() desired 943 zio_interrupt ends up calling taskq_dispatch with TQ_SLEEP Reviewed by: Albert Lee <[email protected]> Reviewed by: Richard Lowe <[email protected]> Reviewed by: Alexey Zaytsev <[email protected]> Reviewed by: Jason Brian King <[email protected]> Reviewed by: George Wilson <[email protected]> Reviewed by: Adam Leventhal <[email protected]> Approved by: Gordon Ross <[email protected]>
usr/src/lib/libzpool/common/sys/zfs_context.h
usr/src/lib/libzpool/common/taskq.c
usr/src/uts/common/fs/zfs/spa.c
usr/src/uts/common/fs/zfs/sys/zfs_context.h
usr/src/uts/common/fs/zfs/sys/zio.h
usr/src/uts/common/fs/zfs/zio.c
usr/src/uts/common/os/taskq.c
usr/src/uts/common/sys/taskq_impl.h
--- a/usr/src/lib/libzpool/common/sys/zfs_context.h	Mon May 16 00:20:30 2011 +0100
+++ b/usr/src/lib/libzpool/common/sys/zfs_context.h	Wed Jul 27 07:13:44 2011 -0700
@@ -21,6 +21,9 @@
 /*
  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
  */
+/*
+ * Copyright 2011 Nexenta Systems, Inc.  All rights reserved.
+ */
 
 #ifndef _SYS_ZFS_CONTEXT_H
 #define	_SYS_ZFS_CONTEXT_H
@@ -347,6 +350,16 @@
 typedef uintptr_t taskqid_t;
 typedef void (task_func_t)(void *);
 
+typedef struct taskq_ent {
+	struct taskq_ent	*tqent_next;
+	struct taskq_ent	*tqent_prev;
+	task_func_t		*tqent_func;
+	void			*tqent_arg;
+	uintptr_t		tqent_flags;
+} taskq_ent_t;
+
+#define	TQENT_FLAG_PREALLOC	0x1	/* taskq_dispatch_ent used */
+
 #define	TASKQ_PREPOPULATE	0x0001
 #define	TASKQ_CPR_SAFE		0x0002	/* Use CPR safe protocol */
 #define	TASKQ_DYNAMIC		0x0004	/* Use dynamic thread scheduling */
@@ -358,6 +371,7 @@
 #define	TQ_NOQUEUE	0x02		/* Do not enqueue if can't dispatch */
 #define	TQ_FRONT	0x08		/* Queue in front */
 
+
 extern taskq_t *system_taskq;
 
 extern taskq_t	*taskq_create(const char *, int, pri_t, int, int, uint_t);
@@ -366,6 +380,8 @@
 #define	taskq_create_sysdc(a, b, d, e, p, dc, f) \
 	    (taskq_create(a, b, maxclsyspri, d, e, f))
 extern taskqid_t taskq_dispatch(taskq_t *, task_func_t, void *, uint_t);
+extern void	taskq_dispatch_ent(taskq_t *, task_func_t, void *, uint_t,
+    taskq_ent_t *);
 extern void	taskq_destroy(taskq_t *);
 extern void	taskq_wait(taskq_t *);
 extern int	taskq_member(taskq_t *, void *);
--- a/usr/src/lib/libzpool/common/taskq.c	Mon May 16 00:20:30 2011 +0100
+++ b/usr/src/lib/libzpool/common/taskq.c	Wed Jul 27 07:13:44 2011 -0700
@@ -22,19 +22,15 @@
  * Copyright 2010 Sun Microsystems, Inc.  All rights reserved.
  * Use is subject to license terms.
  */
+/*
+ * Copyright 2011 Nexenta Systems, Inc.  All rights reserved.
+ */
 
 #include <sys/zfs_context.h>
 
 int taskq_now;
 taskq_t *system_taskq;
 
-typedef struct task {
-	struct task	*task_next;
-	struct task	*task_prev;
-	task_func_t	*task_func;
-	void		*task_arg;
-} task_t;
-
 #define	TASKQ_ACTIVE	0x00010000
 
 struct taskq {
@@ -51,18 +47,18 @@
 	int		tq_maxalloc;
 	kcondvar_t	tq_maxalloc_cv;
 	int		tq_maxalloc_wait;
-	task_t		*tq_freelist;
-	task_t		tq_task;
+	taskq_ent_t	*tq_freelist;
+	taskq_ent_t	tq_task;
 };
 
-static task_t *
+static taskq_ent_t *
 task_alloc(taskq_t *tq, int tqflags)
 {
-	task_t *t;
+	taskq_ent_t *t;
 	int rv;
 
 again:	if ((t = tq->tq_freelist) != NULL && tq->tq_nalloc >= tq->tq_minalloc) {
-		tq->tq_freelist = t->task_next;
+		tq->tq_freelist = t->tqent_next;
 	} else {
 		if (tq->tq_nalloc >= tq->tq_maxalloc) {
 			if (!(tqflags & KM_SLEEP))
@@ -87,7 +83,7 @@
 		}
 		mutex_exit(&tq->tq_lock);
 
-		t = kmem_alloc(sizeof (task_t), tqflags);
+		t = kmem_alloc(sizeof (taskq_ent_t), tqflags);
 
 		mutex_enter(&tq->tq_lock);
 		if (t != NULL)
@@ -97,15 +93,15 @@
 }
 
 static void
-task_free(taskq_t *tq, task_t *t)
+task_free(taskq_t *tq, taskq_ent_t *t)
 {
 	if (tq->tq_nalloc <= tq->tq_minalloc) {
-		t->task_next = tq->tq_freelist;
+		t->tqent_next = tq->tq_freelist;
 		tq->tq_freelist = t;
 	} else {
 		tq->tq_nalloc--;
 		mutex_exit(&tq->tq_lock);
-		kmem_free(t, sizeof (task_t));
+		kmem_free(t, sizeof (taskq_ent_t));
 		mutex_enter(&tq->tq_lock);
 	}
 
@@ -116,7 +112,7 @@
 taskqid_t
 taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t tqflags)
 {
-	task_t *t;
+	taskq_ent_t *t;
 
 	if (taskq_now) {
 		func(arg);
@@ -130,26 +126,58 @@
 		return (0);
 	}
 	if (tqflags & TQ_FRONT) {
-		t->task_next = tq->tq_task.task_next;
-		t->task_prev = &tq->tq_task;
+		t->tqent_next = tq->tq_task.tqent_next;
+		t->tqent_prev = &tq->tq_task;
 	} else {
-		t->task_next = &tq->tq_task;
-		t->task_prev = tq->tq_task.task_prev;
+		t->tqent_next = &tq->tq_task;
+		t->tqent_prev = tq->tq_task.tqent_prev;
 	}
-	t->task_next->task_prev = t;
-	t->task_prev->task_next = t;
-	t->task_func = func;
-	t->task_arg = arg;
+	t->tqent_next->tqent_prev = t;
+	t->tqent_prev->tqent_next = t;
+	t->tqent_func = func;
+	t->tqent_arg = arg;
 	cv_signal(&tq->tq_dispatch_cv);
 	mutex_exit(&tq->tq_lock);
 	return (1);
 }
 
 void
+taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags,
+    taskq_ent_t *t)
+{
+	ASSERT(func != NULL);
+	ASSERT(!(tq->tq_flags & TASKQ_DYNAMIC));
+
+	/*
+	 * Mark it as a prealloc'd task.  This is important
+	 * to ensure that we don't free it later.
+	 */
+	t->tqent_flags |= TQENT_FLAG_PREALLOC;
+	/*
+	 * Enqueue the task to the underlying queue.
+	 */
+	mutex_enter(&tq->tq_lock);
+
+	if (flags & TQ_FRONT) {
+		t->tqent_next = tq->tq_task.tqent_next;
+		t->tqent_prev = &tq->tq_task;
+	} else {
+		t->tqent_next = &tq->tq_task;
+		t->tqent_prev = tq->tq_task.tqent_prev;
+	}
+	t->tqent_next->tqent_prev = t;
+	t->tqent_prev->tqent_next = t;
+	t->tqent_func = func;
+	t->tqent_arg = arg;
+	cv_signal(&tq->tq_dispatch_cv);
+	mutex_exit(&tq->tq_lock);
+}
+
+void
 taskq_wait(taskq_t *tq)
 {
 	mutex_enter(&tq->tq_lock);
-	while (tq->tq_task.task_next != &tq->tq_task || tq->tq_active != 0)
+	while (tq->tq_task.tqent_next != &tq->tq_task || tq->tq_active != 0)
 		cv_wait(&tq->tq_wait_cv, &tq->tq_lock);
 	mutex_exit(&tq->tq_lock);
 }
@@ -158,27 +186,32 @@
 taskq_thread(void *arg)
 {
 	taskq_t *tq = arg;
-	task_t *t;
+	taskq_ent_t *t;
+	boolean_t prealloc;
 
 	mutex_enter(&tq->tq_lock);
 	while (tq->tq_flags & TASKQ_ACTIVE) {
-		if ((t = tq->tq_task.task_next) == &tq->tq_task) {
+		if ((t = tq->tq_task.tqent_next) == &tq->tq_task) {
 			if (--tq->tq_active == 0)
 				cv_broadcast(&tq->tq_wait_cv);
 			cv_wait(&tq->tq_dispatch_cv, &tq->tq_lock);
 			tq->tq_active++;
 			continue;
 		}
-		t->task_prev->task_next = t->task_next;
-		t->task_next->task_prev = t->task_prev;
+		t->tqent_prev->tqent_next = t->tqent_next;
+		t->tqent_next->tqent_prev = t->tqent_prev;
+		t->tqent_next = NULL;
+		t->tqent_prev = NULL;
+		prealloc = t->tqent_flags & TQENT_FLAG_PREALLOC;
 		mutex_exit(&tq->tq_lock);
 
 		rw_enter(&tq->tq_threadlock, RW_READER);
-		t->task_func(t->task_arg);
+		t->tqent_func(t->tqent_arg);
 		rw_exit(&tq->tq_threadlock);
 
 		mutex_enter(&tq->tq_lock);
-		task_free(tq, t);
+		if (!prealloc)
+			task_free(tq, t);
 	}
 	tq->tq_nthreads--;
 	cv_broadcast(&tq->tq_wait_cv);
@@ -217,8 +250,8 @@
 	tq->tq_nthreads = nthreads;
 	tq->tq_minalloc = minalloc;
 	tq->tq_maxalloc = maxalloc;
-	tq->tq_task.task_next = &tq->tq_task;
-	tq->tq_task.task_prev = &tq->tq_task;
+	tq->tq_task.tqent_next = &tq->tq_task;
+	tq->tq_task.tqent_prev = &tq->tq_task;
 	tq->tq_threadlist = kmem_alloc(nthreads * sizeof (thread_t), KM_SLEEP);
 
 	if (flags & TASKQ_PREPOPULATE) {
--- a/usr/src/uts/common/fs/zfs/spa.c	Mon May 16 00:20:30 2011 +0100
+++ b/usr/src/uts/common/fs/zfs/spa.c	Wed Jul 27 07:13:44 2011 -0700
@@ -22,6 +22,9 @@
 /*
  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
  */
+/*
+ * Copyright 2011 Nexenta Systems, Inc.  All rights reserved.
+ */
 
 /*
  * This file contains all the routines used when modifying on-disk SPA state.
@@ -610,7 +613,7 @@
 spa_taskq_create(spa_t *spa, const char *name, enum zti_modes mode,
     uint_t value)
 {
-	uint_t flags = TASKQ_PREPOPULATE;
+	uint_t flags = 0;
 	boolean_t batch = B_FALSE;
 
 	switch (mode) {
--- a/usr/src/uts/common/fs/zfs/sys/zfs_context.h	Mon May 16 00:20:30 2011 +0100
+++ b/usr/src/uts/common/fs/zfs/sys/zfs_context.h	Wed Jul 27 07:13:44 2011 -0700
@@ -22,6 +22,9 @@
  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
  * Use is subject to license terms.
  */
+/*
+ * Copyright 2011 Nexenta Systems, Inc.  All rights reserved.
+ */
 
 #ifndef _SYS_ZFS_CONTEXT_H
 #define	_SYS_ZFS_CONTEXT_H
@@ -39,6 +42,7 @@
 #include <sys/cmn_err.h>
 #include <sys/kmem.h>
 #include <sys/taskq.h>
+#include <sys/taskq_impl.h>
 #include <sys/buf.h>
 #include <sys/param.h>
 #include <sys/systm.h>
--- a/usr/src/uts/common/fs/zfs/sys/zio.h	Mon May 16 00:20:30 2011 +0100
+++ b/usr/src/uts/common/fs/zfs/sys/zio.h	Wed Jul 27 07:13:44 2011 -0700
@@ -22,6 +22,9 @@
 /*
  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
  */
+/*
+ * Copyright 2011 Nexenta Systems, Inc.  All rights reserved.
+ */
 
 #ifndef _ZIO_H
 #define	_ZIO_H
@@ -417,6 +420,9 @@
 	/* FMA state */
 	zio_cksum_report_t *io_cksum_report;
 	uint64_t	io_ena;
+
+	/* Taskq dispatching state */
+	taskq_ent_t	io_tqent;
 };
 
 extern zio_t *zio_null(zio_t *pio, spa_t *spa, vdev_t *vd,
--- a/usr/src/uts/common/fs/zfs/zio.c	Mon May 16 00:20:30 2011 +0100
+++ b/usr/src/uts/common/fs/zfs/zio.c	Wed Jul 27 07:13:44 2011 -0700
@@ -21,6 +21,7 @@
 /*
  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
  * Copyright (c) 2011 by Delphix. All rights reserved.
+ * Copyright (c) 2011 Nexenta Systems, Inc. All rights reserved.
  */
 
 #include <sys/zfs_context.h>
@@ -1061,7 +1062,7 @@
 {
 	spa_t *spa = zio->io_spa;
 	zio_type_t t = zio->io_type;
-	int flags = TQ_SLEEP | (cutinline ? TQ_FRONT : 0);
+	int flags = (cutinline ? TQ_FRONT : 0);
 
 	/*
 	 * If we're a config writer or a probe, the normal issue and
@@ -1085,8 +1086,15 @@
 		q++;
 
 	ASSERT3U(q, <, ZIO_TASKQ_TYPES);
-	(void) taskq_dispatch(spa->spa_zio_taskq[t][q],
-	    (task_func_t *)zio_execute, zio, flags);
+
+	/*
+	 * NB: We are assuming that the zio can only be dispatched
+	 * to a single taskq at a time.  It would be a grievous error
+	 * to dispatch the zio to another taskq at the same time.
+	 */
+	ASSERT(zio->io_tqent.tqent_next == NULL);
+	taskq_dispatch_ent(spa->spa_zio_taskq[t][q],
+	    (task_func_t *)zio_execute, zio, flags, &zio->io_tqent);
 }
 
 static boolean_t
@@ -2889,9 +2897,11 @@
 			 * Reexecution is potentially a huge amount of work.
 			 * Hand it off to the otherwise-unused claim taskq.
 			 */
-			(void) taskq_dispatch(
+			ASSERT(zio->io_tqent.tqent_next == NULL);
+			(void) taskq_dispatch_ent(
 			    spa->spa_zio_taskq[ZIO_TYPE_CLAIM][ZIO_TASKQ_ISSUE],
-			    (task_func_t *)zio_reexecute, zio, TQ_SLEEP);
+			    (task_func_t *)zio_reexecute, zio, 0,
+			    &zio->io_tqent);
 		}
 		return (ZIO_PIPELINE_STOP);
 	}
--- a/usr/src/uts/common/os/taskq.c	Mon May 16 00:20:30 2011 +0100
+++ b/usr/src/uts/common/os/taskq.c	Wed Jul 27 07:13:44 2011 -0700
@@ -24,6 +24,10 @@
  */
 
 /*
+ * Copyright 2011 Nexenta Systems, Inc.  All rights reserved.
+ */
+
+/*
  * Kernel task queues: general-purpose asynchronous task scheduling.
  *
  * A common problem in kernel programming is the need to schedule tasks
@@ -184,6 +188,17 @@
  *		taskq_dispatch() (especially if TQ_NOQUEUE was specified), so it
  *		is important to have backup strategies handling such failures.
  *
+ * void taskq_dispatch_ent(tq, func, arg, flags, tqent)
+ *
+ *	This is a light-weight form of taskq_dispatch(), that uses a
+ *	preallocated taskq_ent_t structure for scheduling.  As a
+ *	result, it does not perform allocations and cannot ever fail.
+ *	Note especially that it cannot be used with TASKQ_DYNAMIC
+ *	taskqs.  The memory for the tqent must not be modified or used
+ *	until the function (func) is called.  (However, func itself
+ *	may safely modify or free this memory, once it is called.)
+ *	Note that the taskq framework will NOT free this memory.
+ *
  * void taskq_wait(tq):
  *
  *	Waits for all previously scheduled tasks to complete.
@@ -1118,7 +1133,6 @@
  *	    Actual return value is the pointer to taskq entry that was used to
  *	    dispatch a task. This is useful for debugging.
  */
-/* ARGSUSED */
 taskqid_t
 taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags)
 {
@@ -1134,7 +1148,7 @@
 		/*
 		 * TQ_NOQUEUE flag can't be used with non-dynamic task queues.
 		 */
-		ASSERT(! (flags & TQ_NOQUEUE));
+		ASSERT(!(flags & TQ_NOQUEUE));
 		/*
 		 * Enqueue the task to the underlying queue.
 		 */
@@ -1146,6 +1160,9 @@
 			mutex_exit(&tq->tq_lock);
 			return (NULL);
 		}
+		/* Make sure we start without any flags */
+		tqe->tqent_un.tqent_flags = 0;
+
 		if (flags & TQ_FRONT) {
 			TQ_ENQUEUE_FRONT(tq, tqe, func, arg);
 		} else {
@@ -1273,6 +1290,31 @@
 	return ((taskqid_t)tqe);
 }
 
+void
+taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags,
+    taskq_ent_t *tqe)
+{
+	ASSERT(func != NULL);
+	ASSERT(!(tq->tq_flags & TASKQ_DYNAMIC));
+
+	/*
+	 * Mark it as a prealloc'd task.  This is important
+	 * to ensure that we don't free it later.
+	 */
+	tqe->tqent_un.tqent_flags |= TQENT_FLAG_PREALLOC;
+	/*
+	 * Enqueue the task to the underlying queue.
+	 */
+	mutex_enter(&tq->tq_lock);
+
+	if (flags & TQ_FRONT) {
+		TQ_ENQUEUE_FRONT(tq, tqe, func, arg);
+	} else {
+		TQ_ENQUEUE(tq, tqe, func, arg);
+	}
+	mutex_exit(&tq->tq_lock);
+}
+
 /*
  * Wait for all pending tasks to complete.
  * Calling taskq_wait from a task will cause deadlock.
@@ -1460,6 +1502,7 @@
 	taskq_ent_t *tqe;
 	callb_cpr_t cprinfo;
 	hrtime_t start, end;
+	boolean_t freeit;
 
 	curthread->t_taskq = tq;	/* mark ourselves for taskq_member() */
 
@@ -1546,6 +1589,23 @@
 		tqe->tqent_next->tqent_prev = tqe->tqent_prev;
 		mutex_exit(&tq->tq_lock);
 
+		/*
+		 * For prealloc'd tasks, we don't free anything.  We
+		 * have to check this now, because once we call the
+		 * function for a prealloc'd taskq, we can't touch the
+		 * tqent any longer (calling the function returns the
+		 * ownershp of the tqent back to caller of
+		 * taskq_dispatch.)
+		 */
+		if ((!(tq->tq_flags & TASKQ_DYNAMIC)) &&
+		    (tqe->tqent_un.tqent_flags & TQENT_FLAG_PREALLOC)) {
+			/* clear pointers to assist assertion checks */
+			tqe->tqent_next = tqe->tqent_prev = NULL;
+			freeit = B_FALSE;
+		} else {
+			freeit = B_TRUE;
+		}
+
 		rw_enter(&tq->tq_threadlock, RW_READER);
 		start = gethrtime();
 		DTRACE_PROBE2(taskq__exec__start, taskq_t *, tq,
@@ -1560,7 +1620,8 @@
 		tq->tq_totaltime += end - start;
 		tq->tq_executed++;
 
-		taskq_ent_free(tq, tqe);
+		if (freeit)
+			taskq_ent_free(tq, tqe);
 	}
 
 	if (tq->tq_nthreads_max == 1)
@@ -1600,7 +1661,7 @@
 static void
 taskq_d_thread(taskq_ent_t *tqe)
 {
-	taskq_bucket_t	*bucket = tqe->tqent_bucket;
+	taskq_bucket_t	*bucket = tqe->tqent_un.tqent_bucket;
 	taskq_t		*tq = bucket->tqbucket_taskq;
 	kmutex_t	*lock = &bucket->tqbucket_lock;
 	kcondvar_t	*cv = &tqe->tqent_cv;
@@ -2115,7 +2176,7 @@
 
 	ASSERT(tqe->tqent_thread == NULL);
 
-	tqe->tqent_bucket = b;
+	tqe->tqent_un.tqent_bucket = b;
 
 	/*
 	 * Create a thread in a TS_STOPPED state first. If it is successfully
--- a/usr/src/uts/common/sys/taskq_impl.h	Mon May 16 00:20:30 2011 +0100
+++ b/usr/src/uts/common/sys/taskq_impl.h	Wed Jul 27 07:13:44 2011 -0700
@@ -22,6 +22,9 @@
  * Copyright 2010 Sun Microsystems, Inc.  All rights reserved.
  * Use is subject to license terms.
  */
+/*
+ * Copyright 2011 Nexenta Systems, Inc.  All rights reserved.
+ */
 
 #ifndef	_SYS_TASKQ_IMPL_H
 #define	_SYS_TASKQ_IMPL_H
@@ -43,11 +46,16 @@
 	struct taskq_ent	*tqent_prev;
 	task_func_t		*tqent_func;
 	void			*tqent_arg;
-	taskq_bucket_t		*tqent_bucket;
+	union {
+		taskq_bucket_t	*tqent_bucket;
+		uintptr_t	tqent_flags;
+	}			tqent_un;
 	kthread_t		*tqent_thread;
 	kcondvar_t		tqent_cv;
 } taskq_ent_t;
 
+#define	TQENT_FLAG_PREALLOC	0x1
+
 /*
  * Taskq Statistics fields are not protected by any locks.
  */
@@ -141,6 +149,10 @@
 	int		tq_tdeaths;
 };
 
+/* Special form of taskq dispatch that uses preallocated entries. */
+void taskq_dispatch_ent(taskq_t *, task_func_t, void *, uint_t, taskq_ent_t *);
+
+
 #define	tq_thread tq_thr._tq_thread
 #define	tq_threadlist tq_thr._tq_threadlist