usr/src/uts/common/os/strsubr.c
PSARC 2005/082 Yosemite: UDP Performance Enhancement 4796051 Solaris needs a more complete HW checksumming support 4905227 duplicate macros in ipclassifier.h and ip.h 4915681 need hardware checksum offload for the case of IP/UDP reassembly 6201076 outbound flow-control dysfunctional, ip to ce using mdt 6223331 ipv6 flow control may corrupt UDP packets 6223809 16-bit aligned IP header should be allowed for all x86 platforms 6275398 Galaxy hangs when running lmbench 6281836 Yosemite project integration into Solaris 6281885 xge needs to support IPv6 checksum offload 6282776 IPv6 NCE fast path is not created for incoming solicitation 6304890 IP transmit-side checksum logic needs to be tightened 6304902 IP6_IN_NOCKSUM is obsolete and should be torched 6304904 UDP should reject TI_GETPEERNAME for non-connected endpoint 6306768 IP and UDP device and module definitions need to be centralized

/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*	Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T	*/
/*	  All Rights Reserved  	*/


/*
 * Copyright 2005 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

#pragma ident	"%Z%%M%	%I%	%E% SMI"

#include <sys/types.h>
#include <sys/sysmacros.h>
#include <sys/param.h>
#include <sys/errno.h>
#include <sys/signal.h>
#include <sys/proc.h>
#include <sys/conf.h>
#include <sys/cred.h>
#include <sys/user.h>
#include <sys/vnode.h>
#include <sys/file.h>
#include <sys/session.h>
#include <sys/stream.h>
#include <sys/strsubr.h>
#include <sys/stropts.h>
#include <sys/poll.h>
#include <sys/systm.h>
#include <sys/cpuvar.h>
#include <sys/uio.h>
#include <sys/cmn_err.h>
#include <sys/priocntl.h>
#include <sys/procset.h>
#include <sys/vmem.h>
#include <sys/bitmap.h>
#include <sys/kmem.h>
#include <sys/siginfo.h>
#include <sys/vtrace.h>
#include <sys/callb.h>
#include <sys/debug.h>
#include <sys/modctl.h>
#include <sys/vmsystm.h>
#include <vm/page.h>
#include <sys/atomic.h>
#include <sys/suntpi.h>
#include <sys/strlog.h>
#include <sys/promif.h>
#include <sys/project.h>
#include <sys/vm.h>
#include <sys/taskq.h>
#include <sys/sunddi.h>
#include <sys/sunldi_impl.h>
#include <sys/strsun.h>
#include <sys/isa_defs.h>
#include <sys/multidata.h>
#include <sys/pattr.h>
#include <sys/strft.h>
#include <sys/fs/snode.h>
#include <sys/zone.h>

#define	O_SAMESTR(q)	(((q)->q_next) && \
	(((q)->q_flag & QREADR) == ((q)->q_next->q_flag & QREADR)))

/*
 * WARNING:
 * The variables and routines in this file are private, belonging
 * to the STREAMS subsystem. These should not be used by modules
 * or drivers. Compatibility will not be guaranteed.
 */

/*
 * Id value used to distinguish between different multiplexor links.
 */
static int32_t lnk_id = 0;

#define	STREAMS_LOPRI MINCLSYSPRI
static pri_t streams_lopri = STREAMS_LOPRI;

#define	STRSTAT(x)	(str_statistics.x.value.ui64++)
typedef struct str_stat {
	kstat_named_t	sqenables;
	kstat_named_t	stenables;
	kstat_named_t	syncqservice;
	kstat_named_t	freebs;
	kstat_named_t	qwr_outer;
	kstat_named_t	rservice;
	kstat_named_t	strwaits;
	kstat_named_t	taskqfails;
	kstat_named_t	bufcalls;
	kstat_named_t	qhelps;
	kstat_named_t	qremoved;
	kstat_named_t	sqremoved;
	kstat_named_t	bcwaits;
	kstat_named_t	sqtoomany;
} str_stat_t;

static str_stat_t str_statistics = {
	{ "sqenables",		KSTAT_DATA_UINT64 },
	{ "stenables",		KSTAT_DATA_UINT64 },
	{ "syncqservice",	KSTAT_DATA_UINT64 },
	{ "freebs",		KSTAT_DATA_UINT64 },
	{ "qwr_outer",		KSTAT_DATA_UINT64 },
	{ "rservice",		KSTAT_DATA_UINT64 },
	{ "strwaits",		KSTAT_DATA_UINT64 },
	{ "taskqfails",		KSTAT_DATA_UINT64 },
	{ "bufcalls",		KSTAT_DATA_UINT64 },
	{ "qhelps",		KSTAT_DATA_UINT64 },
	{ "qremoved",		KSTAT_DATA_UINT64 },
	{ "sqremoved",		KSTAT_DATA_UINT64 },
	{ "bcwaits",		KSTAT_DATA_UINT64 },
	{ "sqtoomany",		KSTAT_DATA_UINT64 },
};

static kstat_t *str_kstat;

/*
 * qrunflag was previously used to control background scheduling of queues. It
 * is not used anymore, but is kept here in case some module still wants to
 * access it via the qready() and setqsched macros.
 */
char qrunflag;			/*  Unused */

/*
 * Most STREAMS scheduling is done via task queues. Task queues may fail
 * for non-sleep dispatches, so there are two backup threads servicing failed
 * requests for queues and syncqs. Both of these threads also service failed
 * freebs dispatch requests. Queues are put on the list specified by the
 * `qhead' and `qtail' pointers, syncqs use the `sqhead' and `sqtail' pointers,
 * and freebs requests are put on `freebs_list', which has no tail pointer.
 * All three lists are protected by a single `service_queue' lock and use the
 * `services_to_run' condition variable for signaling the background threads.
 * Use of a single lock should not be a problem because it is only taken under
 * heavy load, when task queues start to fail, and at that point it may be a
 * good idea to throttle scheduling requests anyway.
 *
 * NOTE: queues and syncqs should be scheduled by two separate threads because
 * queue servicing may block waiting for a syncq which may itself be
 * scheduled for background execution. This could deadlock if only one
 * thread were used for both.
 */
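
/*
 * Purely illustrative sketch (the real code lives in the enabling and
 * service routines later in this file): when a task queue dispatch for a
 * queue fails, the queue would typically be handed to the background
 * thread along these lines, using the list macros defined below:
 *
 *	mutex_enter(&service_queue);
 *	STRSTAT(taskqfails);
 *	ENQUEUE(q, qhead, qtail, q_link);
 *	cv_signal(&services_to_run);
 *	mutex_exit(&service_queue);
 */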

static taskq_t *streams_taskq;		/* Used for most STREAMS scheduling */

static kmutex_t service_queue;		/* protects all of servicing vars */
static kcondvar_t services_to_run;	/* wake up background service thread */
static kcondvar_t syncqs_to_run;	/* wake up background service thread */

/*
 * List of queues scheduled for background processing due to a lack of
 * resources in the task queues. Protected by the service_queue lock.
 */
static struct queue *qhead;
static struct queue *qtail;

/*
 * Same list for syncqs
 */
static syncq_t *sqhead;
static syncq_t *sqtail;

static mblk_t *freebs_list;	/* list of buffers to free */

/*
 * Backup threads for servicing queues and syncqs
 */
kthread_t *streams_qbkgrnd_thread;
kthread_t *streams_sqbkgrnd_thread;

/*
 * Bufcalls related variables.
 */
struct bclist	strbcalls;	/* list of waiting bufcalls */
kmutex_t	strbcall_lock;	/* protects bufcall list (strbcalls) */
kcondvar_t	strbcall_cv;	/* Signaling when a bufcall is added */
kmutex_t	bcall_monitor;	/* sleep/wakeup style monitor */
kcondvar_t	bcall_cv;	/* wait 'till executing bufcall completes */
kthread_t	*bc_bkgrnd_thread; /* Thread to service bufcall requests */

kmutex_t	strresources;	/* protects global resources */
kmutex_t	muxifier;	/* single-threads multiplexor creation */

extern void	time_to_wait(clock_t *, clock_t);

/*
 * run_queues is no longer used, but is kept in case some third-party
 * module or driver decides to use it.
 */
int run_queues = 0;

/*
 * sq_max_size is the depth of the syncq (in number of messages) before
 * qfill_syncq() starts QFULL'ing destination queues. Although its primary
 * consumer, IP, is no longer D_MTPERMOD, other modules and drivers may
 * still depend on this syncq flow control, so we prefer a large default
 * value. For potential performance gain, this value is tunable in
 * /etc/system (see the example below).
 */
int sq_max_size = 10000;
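/*
 * For example, an administrator could raise the limit with an /etc/system
 * entry along the following lines (the value shown is purely illustrative):
 *
 *	set sq_max_size = 25000
 */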

/*
 * The number of ciputctrl structures per syncq and stream that we create
 * when needed.
 */
int n_ciputctrl;
int max_n_ciputctrl = 16;
/*
 * If n_ciputctrl is less than min_n_ciputctrl, don't even create
 * ciputctrl_cache.
 */
int min_n_ciputctrl = 2;

static struct mux_node *mux_nodes;	/* mux info for cycle checking */

/*
 * Per-driver/module syncqs
 * ========================
 *
 * For drivers/modules that use PERMOD or outer syncqs we keep a list of
 * perdm structures, new entries being added (and new syncqs allocated) when
 * setq() encounters a module/driver with a streamtab that it hasn't seen
 * before.
 * The reason for this mechanism is that some modules and drivers share a
 * common streamtab and it is necessary for those modules and drivers to also
 * share a common PERMOD syncq.
 *
 * perdm_list --> dm_str == streamtab_1
 *                dm_sq == syncq_1
 *                dm_ref
 *                dm_next --> dm_str == streamtab_2
 *                            dm_sq == syncq_2
 *                            dm_ref
 *                            dm_next --> ... NULL
 *
 * The dm_ref field is incremented for each new driver/module that takes
 * a reference to the perdm structure and hence shares the syncq.
 * References are held in the fmodsw_impl_t structure for each STREAMS module
 * or the dev_impl array (indexed by device major number) for each driver.
 *
 * perdm_list -> [dm_ref == 1] -> [dm_ref == 2] -> [dm_ref == 1] -> NULL
 *		     ^                 ^ ^               ^
 *                   |  ______________/  |               |
 *                   | /                 |               |
 * dev_impl:     ...|x|y|...          module A	      module B
 *
 * When a module/driver is unloaded the reference count is decremented and,
 * when it falls to zero, the perdm structure is removed from the list and
 * the syncq is freed (see rele_dm()).
 */
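/*
 * Illustrative sketch only (see hold_dm() and rele_dm() for the real code):
 * sharing an existing perdm entry for a given streamtab amounts to walking
 * perdm_list under perdm_rwlock and taking a dm_ref hold on a match:
 *
 *	rw_enter(&perdm_rwlock, RW_READER);
 *	for (p = perdm_list; p != NULL; p = p->dm_next)
 *		if (p->dm_str == str)
 *			break;		(found: bump dm_ref and use p->dm_sq)
 *	rw_exit(&perdm_rwlock);
 *	(not found: allocate a new perdm_t plus syncq and link them in)
 */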
perdm_t *perdm_list = NULL;
static krwlock_t perdm_rwlock;
cdevsw_impl_t *devimpl;

extern struct qinit strdata;
extern struct qinit stwdata;

static void runservice(queue_t *);
static void streams_bufcall_service(void);
static void streams_qbkgrnd_service(void);
static void streams_sqbkgrnd_service(void);
static syncq_t *new_syncq(void);
static void free_syncq(syncq_t *);
static void outer_insert(syncq_t *, syncq_t *);
static void outer_remove(syncq_t *, syncq_t *);
static void write_now(syncq_t *);
static void clr_qfull(queue_t *);
static void enable_svc(queue_t *);
static void runbufcalls(void);
static void sqenable(syncq_t *);
static void sqfill_events(syncq_t *, queue_t *, mblk_t *, void (*)());
static void wait_q_syncq(queue_t *);
static void backenable_insertedq(queue_t *);

static void queue_service(queue_t *);
static void stream_service(stdata_t *);
static void syncq_service(syncq_t *);
static void qwriter_outer_service(syncq_t *);
static void mblk_free(mblk_t *);
#ifdef DEBUG
static int qprocsareon(queue_t *);
#endif

static void set_nfsrv_ptr(queue_t *, queue_t *, queue_t *, queue_t *);
static void reset_nfsrv_ptr(queue_t *, queue_t *);

static void sq_run_events(syncq_t *);
static int propagate_syncq(queue_t *);

static void	blocksq(syncq_t *, ushort_t, int);
static void	unblocksq(syncq_t *, ushort_t, int);
static int	dropsq(syncq_t *, uint16_t);
static void	emptysq(syncq_t *);
static sqlist_t *sqlist_alloc(struct stdata *, int);
static void	sqlist_free(sqlist_t *);
static sqlist_t	*sqlist_build(queue_t *, struct stdata *, boolean_t);
static void	sqlist_insert(sqlist_t *, syncq_t *);
static void	sqlist_insertall(sqlist_t *, queue_t *);

static void	strsetuio(stdata_t *);

struct kmem_cache *stream_head_cache;
struct kmem_cache *queue_cache;
struct kmem_cache *syncq_cache;
struct kmem_cache *qband_cache;
struct kmem_cache *linkinfo_cache;
struct kmem_cache *ciputctrl_cache = NULL;

static linkinfo_t *linkinfo_list;

/*
 *  Qinit structure and Module_info structures
 *	for passthru read and write queues
 */

static void pass_wput(queue_t *, mblk_t *);
static queue_t *link_addpassthru(stdata_t *);
static void link_rempassthru(queue_t *);

struct  module_info passthru_info = {
	0,
	"passthru",
	0,
	INFPSZ,
	STRHIGH,
	STRLOW
};

struct  qinit passthru_rinit = {
	(int (*)())putnext,
	NULL,
	NULL,
	NULL,
	NULL,
	&passthru_info,
	NULL
};

struct  qinit passthru_winit = {
	(int (*)()) pass_wput,
	NULL,
	NULL,
	NULL,
	NULL,
	&passthru_info,
	NULL
};

/*
 * Special form of assertion: verify that X implies Y, i.e. when X is true, Y
 * should also be true.
 */
#define	IMPLY(X, Y)	ASSERT(!(X) || (Y))

/*
 * Logical equivalence. Verify that X and Y are either both TRUE or both FALSE.
 */
#define	EQUIV(X, Y)	{ IMPLY(X, Y); IMPLY(Y, X); }

/*
 * Verify correctness of list head/tail pointers.
 */
#define	LISTCHECK(head, tail, link) {				\
	EQUIV(head, tail);					\
	IMPLY(tail != NULL, tail->link == NULL);		\
}

/*
 * Enqueue a list element `el' at the end of a list denoted by `head' and `tail'
 * using a `link' field.
 */
#define	ENQUEUE(el, head, tail, link) {				\
	ASSERT(el->link == NULL);				\
	LISTCHECK(head, tail, link);				\
	if (head == NULL)					\
		head = el;					\
	else							\
		tail->link = el;				\
	tail = el;						\
}

/*
 * Dequeue the first element of the list denoted by `head' and `tail' pointers
 * using a `link' field and put the result into `el'.
 */
#define	DQ(el, head, tail, link) {				\
	LISTCHECK(head, tail, link);				\
	el = head;						\
	if (head != NULL) {					\
		head = head->link;				\
		if (head == NULL)				\
			tail = NULL;				\
		el->link = NULL;				\
	}							\
}

/*
 * Remove `el' from the list denoted by `head' and `tail', using the `chase'
 * and `curr' scratch pointers, and return the result in `succeed'.
 */
#define	RMQ(el, head, tail, link, chase, curr, succeed) {	\
	LISTCHECK(head, tail, link);				\
	chase = NULL;						\
	succeed = 0;						\
	for (curr = head; (curr != el) && (curr != NULL); curr = curr->link) \
		chase = curr;					\
	if (curr != NULL) {					\
		succeed = 1;					\
		ASSERT(curr == el);				\
		if (chase != NULL)				\
			chase->link = curr->link;		\
		else						\
			head = curr->link;			\
		curr->link = NULL;				\
		if (curr == tail)				\
			tail = chase;				\
	}							\
	LISTCHECK(head, tail, link);				\
}
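
/*
 * Example usage: remove_runlist() below removes a queue from the background
 * runlist and records whether it was actually found:
 *
 *	mutex_enter(&service_queue);
 *	RMQ(qp, qhead, qtail, q_link, q_chase, q_curr, removed);
 *	mutex_exit(&service_queue);
 *
 * Similarly, STR_SERVICE uses DQ() to pop the next queue off a stream head's
 * sd_qhead/sd_qtail list while holding sd_qlock.
 */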

/* Handling of delayed messages on the inner syncq. */

/*
 * DEBUG kernels should use the function versions (to simplify tracing) and
 * non-DEBUG kernels should use the macro versions.
 */

/*
 * Put a queue on the syncq list of queues.
 * Assumes SQLOCK held.
 */
#define	SQPUT_Q(sq, qp)							\
{									\
	ASSERT(MUTEX_HELD(SQLOCK(sq)));					\
	if (!(qp->q_sqflags & Q_SQQUEUED)) {				\
		/* The queue should not be linked anywhere */		\
		ASSERT((qp->q_sqprev == NULL) && (qp->q_sqnext == NULL)); \
		/* Head and tail may only be NULL simultaneously */	\
		EQUIV(sq->sq_head, sq->sq_tail);			\
		/* Queue may only be enqueued on its syncq */		\
		ASSERT(sq == qp->q_syncq);				\
		/* Check the correctness of SQ_MESSAGES flag */		\
		EQUIV(sq->sq_head, (sq->sq_flags & SQ_MESSAGES));	\
		/* Sanity check first/last elements of the list */	\
		IMPLY(sq->sq_head != NULL, sq->sq_head->q_sqprev == NULL);\
		IMPLY(sq->sq_tail != NULL, sq->sq_tail->q_sqnext == NULL);\
		/*							\
		 * Sanity check of the priority field: an empty list	\
		 * should have zero priority and zero nqueues.		\
		 */							\
		IMPLY(sq->sq_head == NULL, sq->sq_pri == 0);		\
		/* Sanity check of sq_nqueues field */			\
		EQUIV(sq->sq_head, sq->sq_nqueues);			\
		if (sq->sq_head == NULL) {				\
			sq->sq_head = sq->sq_tail = qp;			\
			sq->sq_flags |= SQ_MESSAGES;			\
		} else if (qp->q_spri == 0) {				\
			qp->q_sqprev = sq->sq_tail;			\
			sq->sq_tail->q_sqnext = qp;			\
			sq->sq_tail = qp;				\
		} else {						\
			/*						\
			 * Put this queue in priority order: higher	\
			 * priority gets closer to the head.		\
			 */						\
			queue_t **qpp = &sq->sq_tail;			\
			queue_t *qnext = NULL;				\
									\
			while (*qpp != NULL && qp->q_spri > (*qpp)->q_spri) { \
				qnext = *qpp;				\
				qpp = &(*qpp)->q_sqprev;		\
			}						\
			qp->q_sqnext = qnext;				\
			qp->q_sqprev = *qpp;				\
			if (*qpp != NULL) {				\
				(*qpp)->q_sqnext = qp;			\
			} else {					\
				sq->sq_head = qp;			\
				sq->sq_pri = sq->sq_head->q_spri;	\
			}						\
			*qpp = qp;					\
		}							\
		qp->q_sqflags |= Q_SQQUEUED;				\
		qp->q_sqtstamp = lbolt;					\
		sq->sq_nqueues++;					\
	}								\
}
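
/*
 * For example, enqueueing queues with q_spri values of 0, 3 and 7, in that
 * order, via SQPUT_Q leaves the list ordered 7, 3, 0 from head to tail,
 * with sq_pri ending up at 7 (sq_pri is updated whenever a queue is
 * inserted at the head of the list).
 */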

/*
 * Remove a queue from the syncq list
 * Assumes SQLOCK held.
 */
#define	SQRM_Q(sq, qp)							\
	{								\
		ASSERT(MUTEX_HELD(SQLOCK(sq)));				\
		ASSERT(qp->q_sqflags & Q_SQQUEUED);			\
		ASSERT(sq->sq_head != NULL && sq->sq_tail != NULL);	\
		ASSERT((sq->sq_flags & SQ_MESSAGES) != 0);		\
		/* Check that the queue is actually in the list */	\
		ASSERT(qp->q_sqnext != NULL || sq->sq_tail == qp);	\
		ASSERT(qp->q_sqprev != NULL || sq->sq_head == qp);	\
		ASSERT(sq->sq_nqueues != 0);				\
		if (qp->q_sqprev == NULL) {				\
			/* First queue on list, make head q_sqnext */	\
			sq->sq_head = qp->q_sqnext;			\
		} else {						\
			/* Make prev->next == next */			\
			qp->q_sqprev->q_sqnext = qp->q_sqnext;		\
		}							\
		if (qp->q_sqnext == NULL) {				\
			/* Last queue on list, make tail sqprev */	\
			sq->sq_tail = qp->q_sqprev;			\
		} else {						\
			/* Make next->prev == prev */			\
			qp->q_sqnext->q_sqprev = qp->q_sqprev;		\
		}							\
		/* clear out references on this queue */		\
		qp->q_sqprev = qp->q_sqnext = NULL;			\
		qp->q_sqflags &= ~Q_SQQUEUED;				\
		/* If there is nothing queued, clear SQ_MESSAGES */	\
		if (sq->sq_head != NULL) {				\
			sq->sq_pri = sq->sq_head->q_spri;		\
		} else	{						\
			sq->sq_flags &= ~SQ_MESSAGES;			\
			sq->sq_pri = 0;					\
		}							\
		sq->sq_nqueues--;					\
		ASSERT(sq->sq_head != NULL || sq->sq_evhead != NULL ||	\
		    (sq->sq_flags & SQ_QUEUED) == 0);			\
	}

/* Hide the definition from the header file. */
#ifdef SQPUT_MP
#undef SQPUT_MP
#endif

/*
 * Put a message on the queue syncq.
 * Assumes QLOCK held.
 */
#define	SQPUT_MP(qp, mp)						\
	{								\
		ASSERT(MUTEX_HELD(QLOCK(qp)));				\
		ASSERT(qp->q_sqhead == NULL ||				\
		    (qp->q_sqtail != NULL &&				\
		    qp->q_sqtail->b_next == NULL));			\
		qp->q_syncqmsgs++;					\
		ASSERT(qp->q_syncqmsgs != 0);	/* Wraparound */	\
		if (qp->q_sqhead == NULL) {				\
			qp->q_sqhead = qp->q_sqtail = mp;		\
		} else {						\
			qp->q_sqtail->b_next = mp;			\
			qp->q_sqtail = mp;				\
		}							\
		ASSERT(qp->q_syncqmsgs > 0);				\
	}

#define	SQ_PUTCOUNT_SETFAST_LOCKED(sq) {				\
		ASSERT(MUTEX_HELD(SQLOCK(sq)));				\
		if ((sq)->sq_ciputctrl != NULL) {			\
			int i;						\
			int nlocks = (sq)->sq_nciputctrl;		\
			ciputctrl_t *cip = (sq)->sq_ciputctrl;		\
			ASSERT((sq)->sq_type & SQ_CIPUT);		\
			for (i = 0; i <= nlocks; i++) {			\
				ASSERT(MUTEX_HELD(&cip[i].ciputctrl_lock)); \
				cip[i].ciputctrl_count |= SQ_FASTPUT;	\
			}						\
		}							\
	}


#define	SQ_PUTCOUNT_CLRFAST_LOCKED(sq) {				\
		ASSERT(MUTEX_HELD(SQLOCK(sq)));				\
		if ((sq)->sq_ciputctrl != NULL) {			\
			int i;						\
			int nlocks = (sq)->sq_nciputctrl;		\
			ciputctrl_t *cip = (sq)->sq_ciputctrl;		\
			ASSERT((sq)->sq_type & SQ_CIPUT);		\
			for (i = 0; i <= nlocks; i++) {			\
				ASSERT(MUTEX_HELD(&cip[i].ciputctrl_lock)); \
				cip[i].ciputctrl_count &= ~SQ_FASTPUT;	\
			}						\
		}							\
	}

/*
 * Run service procedures for all queues in the stream head.
 */
#define	STR_SERVICE(stp, q) {						\
	ASSERT(MUTEX_HELD(&stp->sd_qlock));				\
	while (stp->sd_qhead != NULL) {					\
		DQ(q, stp->sd_qhead, stp->sd_qtail, q_link);		\
		ASSERT(stp->sd_nqueues > 0);				\
		stp->sd_nqueues--;					\
		ASSERT(!(q->q_flag & QINSERVICE));			\
		mutex_exit(&stp->sd_qlock);				\
		queue_service(q);					\
		mutex_enter(&stp->sd_qlock);				\
	}								\
	ASSERT(stp->sd_nqueues == 0);					\
	ASSERT((stp->sd_qhead == NULL) && (stp->sd_qtail == NULL));	\
}

/*
 * constructor/destructor routines for the stream head cache
 */
/* ARGSUSED */
static int
stream_head_constructor(void *buf, void *cdrarg, int kmflags)
{
	stdata_t *stp = buf;

	mutex_init(&stp->sd_lock, NULL, MUTEX_DEFAULT, NULL);
	mutex_init(&stp->sd_reflock, NULL, MUTEX_DEFAULT, NULL);
	mutex_init(&stp->sd_qlock, NULL, MUTEX_DEFAULT, NULL);
	cv_init(&stp->sd_monitor, NULL, CV_DEFAULT, NULL);
	cv_init(&stp->sd_iocmonitor, NULL, CV_DEFAULT, NULL);
	cv_init(&stp->sd_refmonitor, NULL, CV_DEFAULT, NULL);
	cv_init(&stp->sd_qcv, NULL, CV_DEFAULT, NULL);
	cv_init(&stp->sd_zcopy_wait, NULL, CV_DEFAULT, NULL);
	stp->sd_wrq = NULL;

	return (0);
}

/* ARGSUSED */
static void
stream_head_destructor(void *buf, void *cdrarg)
{
	stdata_t *stp = buf;

	mutex_destroy(&stp->sd_lock);
	mutex_destroy(&stp->sd_reflock);
	mutex_destroy(&stp->sd_qlock);
	cv_destroy(&stp->sd_monitor);
	cv_destroy(&stp->sd_iocmonitor);
	cv_destroy(&stp->sd_refmonitor);
	cv_destroy(&stp->sd_qcv);
	cv_destroy(&stp->sd_zcopy_wait);
}

/*
 * constructor/destructor routines for the queue cache
 */
/* ARGSUSED */
static int
queue_constructor(void *buf, void *cdrarg, int kmflags)
{
	queinfo_t *qip = buf;
	queue_t *qp = &qip->qu_rqueue;
	queue_t *wqp = &qip->qu_wqueue;
	syncq_t	*sq = &qip->qu_syncq;

	qp->q_first = NULL;
	qp->q_link = NULL;
	qp->q_count = 0;
	qp->q_mblkcnt = 0;
	qp->q_sqhead = NULL;
	qp->q_sqtail = NULL;
	qp->q_sqnext = NULL;
	qp->q_sqprev = NULL;
	qp->q_sqflags = 0;
	qp->q_rwcnt = 0;
	qp->q_spri = 0;

	mutex_init(QLOCK(qp), NULL, MUTEX_DEFAULT, NULL);
	cv_init(&qp->q_wait, NULL, CV_DEFAULT, NULL);

	wqp->q_first = NULL;
	wqp->q_link = NULL;
	wqp->q_count = 0;
	wqp->q_mblkcnt = 0;
	wqp->q_sqhead = NULL;
	wqp->q_sqtail = NULL;
	wqp->q_sqnext = NULL;
	wqp->q_sqprev = NULL;
	wqp->q_sqflags = 0;
	wqp->q_rwcnt = 0;
	wqp->q_spri = 0;

	mutex_init(QLOCK(wqp), NULL, MUTEX_DEFAULT, NULL);
	cv_init(&wqp->q_wait, NULL, CV_DEFAULT, NULL);

	sq->sq_head = NULL;
	sq->sq_tail = NULL;
	sq->sq_evhead = NULL;
	sq->sq_evtail = NULL;
	sq->sq_callbpend = NULL;
	sq->sq_outer = NULL;
	sq->sq_onext = NULL;
	sq->sq_oprev = NULL;
	sq->sq_next = NULL;
	sq->sq_svcflags = 0;
	sq->sq_servcount = 0;
	sq->sq_needexcl = 0;
	sq->sq_nqueues = 0;
	sq->sq_pri = 0;

	mutex_init(&sq->sq_lock, NULL, MUTEX_DEFAULT, NULL);
	cv_init(&sq->sq_wait, NULL, CV_DEFAULT, NULL);
	cv_init(&sq->sq_exitwait, NULL, CV_DEFAULT, NULL);

	return (0);
}

/* ARGSUSED */
static void
queue_destructor(void *buf, void *cdrarg)
{
	queinfo_t *qip = buf;
	queue_t *qp = &qip->qu_rqueue;
	queue_t *wqp = &qip->qu_wqueue;
	syncq_t	*sq = &qip->qu_syncq;

	ASSERT(qp->q_sqhead == NULL);
	ASSERT(wqp->q_sqhead == NULL);
	ASSERT(qp->q_sqnext == NULL);
	ASSERT(wqp->q_sqnext == NULL);
	ASSERT(qp->q_rwcnt == 0);
	ASSERT(wqp->q_rwcnt == 0);

	mutex_destroy(&qp->q_lock);
	cv_destroy(&qp->q_wait);

	mutex_destroy(&wqp->q_lock);
	cv_destroy(&wqp->q_wait);

	mutex_destroy(&sq->sq_lock);
	cv_destroy(&sq->sq_wait);
	cv_destroy(&sq->sq_exitwait);
}

/*
 * constructor/destructor routines for the syncq cache
 */
/* ARGSUSED */
static int
syncq_constructor(void *buf, void *cdrarg, int kmflags)
{
	syncq_t	*sq = buf;

	bzero(buf, sizeof (syncq_t));

	mutex_init(&sq->sq_lock, NULL, MUTEX_DEFAULT, NULL);
	cv_init(&sq->sq_wait, NULL, CV_DEFAULT, NULL);
	cv_init(&sq->sq_exitwait, NULL, CV_DEFAULT, NULL);

	return (0);
}

/* ARGSUSED */
static void
syncq_destructor(void *buf, void *cdrarg)
{
	syncq_t	*sq = buf;

	ASSERT(sq->sq_head == NULL);
	ASSERT(sq->sq_tail == NULL);
	ASSERT(sq->sq_evhead == NULL);
	ASSERT(sq->sq_evtail == NULL);
	ASSERT(sq->sq_callbpend == NULL);
	ASSERT(sq->sq_callbflags == 0);
	ASSERT(sq->sq_outer == NULL);
	ASSERT(sq->sq_onext == NULL);
	ASSERT(sq->sq_oprev == NULL);
	ASSERT(sq->sq_next == NULL);
	ASSERT(sq->sq_needexcl == 0);
	ASSERT(sq->sq_svcflags == 0);
	ASSERT(sq->sq_servcount == 0);
	ASSERT(sq->sq_nqueues == 0);
	ASSERT(sq->sq_pri == 0);
	ASSERT(sq->sq_count == 0);
	ASSERT(sq->sq_rmqcount == 0);
	ASSERT(sq->sq_cancelid == 0);
	ASSERT(sq->sq_ciputctrl == NULL);
	ASSERT(sq->sq_nciputctrl == 0);
	ASSERT(sq->sq_type == 0);
	ASSERT(sq->sq_flags == 0);

	mutex_destroy(&sq->sq_lock);
	cv_destroy(&sq->sq_wait);
	cv_destroy(&sq->sq_exitwait);
}

/* ARGSUSED */
static int
ciputctrl_constructor(void *buf, void *cdrarg, int kmflags)
{
	ciputctrl_t *cip = buf;
	int i;

	for (i = 0; i < n_ciputctrl; i++) {
		cip[i].ciputctrl_count = SQ_FASTPUT;
		mutex_init(&cip[i].ciputctrl_lock, NULL, MUTEX_DEFAULT, NULL);
	}

	return (0);
}

/* ARGSUSED */
static void
ciputctrl_destructor(void *buf, void *cdrarg)
{
	ciputctrl_t *cip = buf;
	int i;

	for (i = 0; i < n_ciputctrl; i++) {
		ASSERT(cip[i].ciputctrl_count & SQ_FASTPUT);
		mutex_destroy(&cip[i].ciputctrl_lock);
	}
}

/*
 * Init routine run from main at boot time.
 */
void
strinit(void)
{
	int i;
	int ncpus = ((boot_max_ncpus == -1) ? max_ncpus : boot_max_ncpus);

	/*
	 * Set up mux_node structures.
	 */
	mux_nodes = kmem_zalloc((sizeof (struct mux_node) * devcnt), KM_SLEEP);
	for (i = 0; i < devcnt; i++)
		mux_nodes[i].mn_imaj = i;

	stream_head_cache = kmem_cache_create("stream_head_cache",
		sizeof (stdata_t), 0,
		stream_head_constructor, stream_head_destructor, NULL,
		NULL, NULL, 0);

	queue_cache = kmem_cache_create("queue_cache", sizeof (queinfo_t), 0,
		queue_constructor, queue_destructor, NULL, NULL, NULL, 0);

	syncq_cache = kmem_cache_create("syncq_cache", sizeof (syncq_t), 0,
		syncq_constructor, syncq_destructor, NULL, NULL, NULL, 0);

	qband_cache = kmem_cache_create("qband_cache",
		sizeof (qband_t), 0, NULL, NULL, NULL, NULL, NULL, 0);

	linkinfo_cache = kmem_cache_create("linkinfo_cache",
		sizeof (linkinfo_t), 0, NULL, NULL, NULL, NULL, NULL, 0);
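
	/*
	 * Size the per-syncq ciputctrl array: round the CPU count up to a
	 * power of two, clamp it to max_n_ciputctrl, and only create the
	 * cache at all if the result is at least min_n_ciputctrl.
	 */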

	n_ciputctrl = ncpus;
	n_ciputctrl = 1 << highbit(n_ciputctrl - 1);
	ASSERT(n_ciputctrl >= 1);
	n_ciputctrl = MIN(n_ciputctrl, max_n_ciputctrl);
	if (n_ciputctrl >= min_n_ciputctrl) {
		ciputctrl_cache = kmem_cache_create("ciputctrl_cache",
			sizeof (ciputctrl_t) * n_ciputctrl,
			sizeof (ciputctrl_t), ciputctrl_constructor,
			ciputctrl_destructor, NULL, NULL, NULL, 0);
	}

	streams_taskq = system_taskq;

	if (streams_taskq == NULL)
		panic("strinit: no memory for streams taskq!");

	bc_bkgrnd_thread = thread_create(NULL, 0,
	    streams_bufcall_service, NULL, 0, &p0, TS_RUN, streams_lopri);

	streams_qbkgrnd_thread = thread_create(NULL, 0,
	    streams_qbkgrnd_service, NULL, 0, &p0, TS_RUN, streams_lopri);

	streams_sqbkgrnd_thread = thread_create(NULL, 0,
	    streams_sqbkgrnd_service, NULL, 0, &p0, TS_RUN, streams_lopri);

	/*
	 * Create STREAMS kstats.
	 */
	str_kstat = kstat_create("streams", 0, "strstat",
	    "net", KSTAT_TYPE_NAMED,
	    sizeof (str_statistics) / sizeof (kstat_named_t),
	    KSTAT_FLAG_VIRTUAL);

	if (str_kstat != NULL) {
		str_kstat->ks_data = &str_statistics;
		kstat_install(str_kstat);
	}

	/*
	 * TPI support routine initialisation.
	 */
	tpi_init();
}

void
str_sendsig(vnode_t *vp, int event, uchar_t band, int error)
{
	struct stdata *stp;

	ASSERT(vp->v_stream);
	stp = vp->v_stream;
	/* Have to hold sd_lock to prevent siglist from changing */
	mutex_enter(&stp->sd_lock);
	if (stp->sd_sigflags & event)
		strsendsig(stp->sd_siglist, event, band, error);
	mutex_exit(&stp->sd_lock);
}

/*
 * Send the "sevent" set of signals to a process.
 * This might send more than one signal if the process is registered
 * for multiple events. The caller should pass in an sevent that only
 * includes the events for which the process has registered.
 */
static void
dosendsig(proc_t *proc, int events, int sevent, k_siginfo_t *info,
	uchar_t band, int error)
{
	ASSERT(MUTEX_HELD(&proc->p_lock));

	info->si_band = 0;
	info->si_errno = 0;

	if (sevent & S_ERROR) {
		sevent &= ~S_ERROR;
		info->si_code = POLL_ERR;
		info->si_errno = error;
		TRACE_2(TR_FAC_STREAMS_FR, TR_STRSENDSIG,
			"strsendsig:proc %p info %p", proc, info);
		sigaddq(proc, NULL, info, KM_NOSLEEP);
		info->si_errno = 0;
	}
	if (sevent & S_HANGUP) {
		sevent &= ~S_HANGUP;
		info->si_code = POLL_HUP;
		TRACE_2(TR_FAC_STREAMS_FR, TR_STRSENDSIG,
			"strsendsig:proc %p info %p", proc, info);
		sigaddq(proc, NULL, info, KM_NOSLEEP);
	}
	if (sevent & S_HIPRI) {
		sevent &= ~S_HIPRI;
		info->si_code = POLL_PRI;
		TRACE_2(TR_FAC_STREAMS_FR, TR_STRSENDSIG,
			"strsendsig:proc %p info %p", proc, info);
		sigaddq(proc, NULL, info, KM_NOSLEEP);
	}
	if (sevent & S_RDBAND) {
		sevent &= ~S_RDBAND;
		if (events & S_BANDURG)
			sigtoproc(proc, NULL, SIGURG);
		else
			sigtoproc(proc, NULL, SIGPOLL);
	}
	if (sevent & S_WRBAND) {
		sevent &= ~S_WRBAND;
		sigtoproc(proc, NULL, SIGPOLL);
	}
	if (sevent & S_INPUT) {
		sevent &= ~S_INPUT;
		info->si_code = POLL_IN;
		info->si_band = band;
		TRACE_2(TR_FAC_STREAMS_FR, TR_STRSENDSIG,
			"strsendsig:proc %p info %p", proc, info);
		sigaddq(proc, NULL, info, KM_NOSLEEP);
		info->si_band = 0;
	}
	if (sevent & S_OUTPUT) {
		sevent &= ~S_OUTPUT;
		info->si_code = POLL_OUT;
		info->si_band = band;
		TRACE_2(TR_FAC_STREAMS_FR, TR_STRSENDSIG,
			"strsendsig:proc %p info %p", proc, info);
		sigaddq(proc, NULL, info, KM_NOSLEEP);
		info->si_band = 0;
	}
	if (sevent & S_MSG) {
		sevent &= ~S_MSG;
		info->si_code = POLL_MSG;
		info->si_band = band;
		TRACE_2(TR_FAC_STREAMS_FR, TR_STRSENDSIG,
			"strsendsig:proc %p info %p", proc, info);
		sigaddq(proc, NULL, info, KM_NOSLEEP);
		info->si_band = 0;
	}
	if (sevent & S_RDNORM) {
		sevent &= ~S_RDNORM;
		sigtoproc(proc, NULL, SIGPOLL);
	}
	if (sevent != 0) {
		panic("strsendsig: unknown event(s) %x", sevent);
	}
}

/*
 * Send SIGPOLL/SIGURG signal to all processes and process groups
 * registered on the given signal list that want a signal for at
 * least one of the specified events.
 *
 * Must be called with exclusive access to siglist (caller holding sd_lock).
 *
 * strioctl(I_SETSIG/I_ESETSIG) will only change siglist when holding
 * sd_lock and the ioctl code maintains a PID_HOLD on the pid structure
 * while it is in the siglist.
 *
 * For performance reasons (MP scalability) the code drops pidlock
 * when sending signals to a single process.
 * When sending to a process group the code holds
 * pidlock to prevent the membership in the process group from changing
 * while walking the p_pglink list.
 */
void
strsendsig(strsig_t *siglist, int event, uchar_t band, int error)
{
	strsig_t *ssp;
	k_siginfo_t info;
	struct pid *pidp;
	proc_t  *proc;

	info.si_signo = SIGPOLL;
	info.si_errno = 0;
	for (ssp = siglist; ssp; ssp = ssp->ss_next) {
		int sevent;

		sevent = ssp->ss_events & event;
		if (sevent == 0)
			continue;

		if ((pidp = ssp->ss_pidp) == NULL) {
			/* pid was released but still on event list */
			continue;
		}


		if (ssp->ss_pid > 0) {
			/*
			 * XXX This unfortunately still generates
			 * a signal when a fd is closed but
			 * the proc is active.
			 */
			ASSERT(ssp->ss_pid == pidp->pid_id);

			mutex_enter(&pidlock);
			proc = prfind_zone(pidp->pid_id, ALL_ZONES);
			if (proc == NULL) {
				mutex_exit(&pidlock);
				continue;
			}
			mutex_enter(&proc->p_lock);
			mutex_exit(&pidlock);
			dosendsig(proc, ssp->ss_events, sevent, &info,
				band, error);
			mutex_exit(&proc->p_lock);
		} else {
			/*
			 * Send to process group. Hold pidlock across
			 * calls to dosendsig().
			 */
			pid_t pgrp = -ssp->ss_pid;

			mutex_enter(&pidlock);
			proc = pgfind_zone(pgrp, ALL_ZONES);
			while (proc != NULL) {
				mutex_enter(&proc->p_lock);
				dosendsig(proc, ssp->ss_events, sevent,
					&info, band, error);
				mutex_exit(&proc->p_lock);
				proc = proc->p_pglink;
			}
			mutex_exit(&pidlock);
		}
	}
}

/*
 * Attach a stream device or module.
 * qp is a read queue; the new queue goes in so that its read-side
 * q_next pointer is the argument, and the write queue corresponding
 * to the argument points to this queue. Return 0 on success,
 * or a non-zero errno on failure.
 */
int
qattach(queue_t *qp, dev_t *devp, int oflag, cred_t *crp, fmodsw_impl_t *fp,
    boolean_t is_insert)
{
	major_t			major;
	cdevsw_impl_t		*dp;
	struct streamtab	*str;
	queue_t			*rq;
	queue_t			*wrq;
	uint32_t		qflag;
	uint32_t		sqtype;
	perdm_t			*dmp;
	int			error;
	int			sflag;

	rq = allocq();
	wrq = _WR(rq);
	STREAM(rq) = STREAM(wrq) = STREAM(qp);

	if (fp != NULL) {
		str = fp->f_str;
		qflag = fp->f_qflag;
		sqtype = fp->f_sqtype;
		dmp = fp->f_dmp;
		IMPLY((qflag & (QPERMOD | QMTOUTPERIM)), dmp != NULL);
		sflag = MODOPEN;

		/*
		 * stash away a pointer to the module structure so we can
		 * unref it in qdetach.
		 */
		rq->q_fp = fp;
	} else {
		ASSERT(!is_insert);

		major = getmajor(*devp);
		dp = &devimpl[major];

		str = dp->d_str;
		ASSERT(str == STREAMSTAB(major));

		qflag = dp->d_qflag;
		ASSERT(qflag & QISDRV);
		sqtype = dp->d_sqtype;

		/* create perdm_t if needed */
		if (NEED_DM(dp->d_dmp, qflag))
			dp->d_dmp = hold_dm(str, qflag, sqtype);

		dmp = dp->d_dmp;
		sflag = 0;
	}

	TRACE_2(TR_FAC_STREAMS_FR, TR_QATTACH_FLAGS,
	    "qattach:qflag == %X(%X)", qflag, *devp);

	/* setq might sleep in allocator - avoid holding locks. */
	setq(rq, str->st_rdinit, str->st_wrinit, dmp, qflag, sqtype, B_FALSE);

	/*
	 * Before calling the module's open routine, set up the q_next
	 * pointer for inserting a module in the middle of a stream.
	 *
	 * Note that we can always set _QINSERTING and set up q_next
	 * pointer for both inserting and pushing a module.  Then there
	 * is no need for the is_insert parameter.  In insertq(), called
	 * by qprocson(), assume that q_next of the new module always points
	 * to the correct queue and use it for insertion.  Everything should
	 * work out fine.  But in the first release of _I_INSERT, we
	 * distinguish between inserting and pushing to make sure that
	 * pushing a module follows the same code path as before.
	 */
	if (is_insert) {
		rq->q_flag |= _QINSERTING;
		rq->q_next = qp;
	}

	/*
	 * If there is an outer perimeter get exclusive access during
	 * the open procedure.  Bump up the reference count on the queue.
	 */
	entersq(rq->q_syncq, SQ_OPENCLOSE);
	error = (*rq->q_qinfo->qi_qopen)(rq, devp, oflag, sflag, crp);
	if (error != 0)
		goto failed;
	leavesq(rq->q_syncq, SQ_OPENCLOSE);
	ASSERT(qprocsareon(rq));
	return (0);

failed:
	rq->q_flag &= ~_QINSERTING;
	if (backq(wrq) != NULL && backq(wrq)->q_next == wrq)
		qprocsoff(rq);
	leavesq(rq->q_syncq, SQ_OPENCLOSE);
	rq->q_next = wrq->q_next = NULL;
	qdetach(rq, 0, 0, crp, B_FALSE);
	return (error);
}

/*
 * Handle second open of stream. For modules, set the
 * last argument to MODOPEN and do not pass any open flags.
 * Ignore dummydev since this is not the first open.
 */
int
qreopen(queue_t *qp, dev_t *devp, int flag, cred_t *crp)
{
	int	error;
	dev_t dummydev;
	queue_t *wqp = _WR(qp);

	ASSERT(qp->q_flag & QREADR);
	entersq(qp->q_syncq, SQ_OPENCLOSE);

	dummydev = *devp;
	if (error = ((*qp->q_qinfo->qi_qopen)(qp, &dummydev,
	    (wqp->q_next ? 0 : flag), (wqp->q_next ? MODOPEN : 0), crp))) {
		leavesq(qp->q_syncq, SQ_OPENCLOSE);
		mutex_enter(&STREAM(qp)->sd_lock);
		qp->q_stream->sd_flag |= STREOPENFAIL;
		mutex_exit(&STREAM(qp)->sd_lock);
		return (error);
	}
	leavesq(qp->q_syncq, SQ_OPENCLOSE);

	/*
	 * A successful open should have done qprocson().
	 */
	ASSERT(qprocsareon(_RD(qp)));
	return (0);
}

/*
 * Detach a stream module or device.
 * If clmode == 1 then the module or driver was opened and its
 * close routine must be called. If clmode == 0, the module
 * or driver was never opened or the open failed, and so its close
 * should not be called.
 */
void
qdetach(queue_t *qp, int clmode, int flag, cred_t *crp, boolean_t is_remove)
{
	queue_t *wqp = _WR(qp);
	ASSERT(STREAM(qp)->sd_flag & (STRCLOSE|STWOPEN|STRPLUMB));

	if (STREAM_NEEDSERVICE(STREAM(qp)))
		stream_runservice(STREAM(qp));

	if (clmode) {
		/*
		 * Make sure that all the messages on the write side syncq are
		 * processed and nothing is left. Since we are closing, no new
		 * messages may appear there.
		 */
		wait_q_syncq(wqp);

		entersq(qp->q_syncq, SQ_OPENCLOSE);
		if (is_remove) {
			mutex_enter(QLOCK(qp));
			qp->q_flag |= _QREMOVING;
			mutex_exit(QLOCK(qp));
		}
		(*qp->q_qinfo->qi_qclose)(qp, flag, crp);
		/*
		 * Check that qprocsoff() was actually called.
		 */
		ASSERT((qp->q_flag & QWCLOSE) && (wqp->q_flag & QWCLOSE));

		leavesq(qp->q_syncq, SQ_OPENCLOSE);
	} else {
		disable_svc(qp);
	}

	/*
	 * Allow any threads blocked in entersq to proceed and discover
	 * that QWCLOSE is set.
	 * Note: This assumes that all users of entersq check QWCLOSE.
	 * Currently runservice is the only entersq that can happen
	 * after removeq has finished.
	 * Removeq will have discarded all messages destined to the closing
	 * pair of queues from the syncq.
	 * NOTE: Calling a function inside an assert is unconventional.
	 * However, it does not cause any problem since flush_syncq() does
	 * not change any state except when it returns non-zero i.e.
	 * when the assert will trigger.
	 */
	ASSERT(flush_syncq(qp->q_syncq, qp) == 0);
	ASSERT(flush_syncq(wqp->q_syncq, wqp) == 0);
	ASSERT((qp->q_flag & QPERMOD) ||
		((qp->q_syncq->sq_head == NULL) &&
		(wqp->q_syncq->sq_head == NULL)));

	/*
	 * Flush the queues before q_next is set to NULL. This is needed
	 * in order to backenable any downstream queue before we go away.
	 * Note: we are already removed from the stream so that the
	 * backenabling will not cause any messages to be delivered to our
	 * put procedures.
	 */
	flushq(qp, FLUSHALL);
	flushq(wqp, FLUSHALL);

	/*
	 * wait for any pending service processing to complete
	 */
	wait_svc(qp);

	/* Tidy up - removeq only does a half-remove from stream */
	qp->q_next = wqp->q_next = NULL;
	ASSERT(!(qp->q_flag & QENAB));
	ASSERT(!(wqp->q_flag & QENAB));

	/* release any fmodsw_impl_t structure held on behalf of the queue */

	ASSERT(qp->q_fp != NULL || qp->q_flag & QISDRV);
	if (qp->q_fp != NULL)
		fmodsw_rele(qp->q_fp);

	/* freeq removes us from the outer perimeter if any */
	freeq(qp);
}

/* Prevent service procedures from being called */
void
disable_svc(queue_t *qp)
{
	queue_t *wqp = _WR(qp);

	ASSERT(qp->q_flag & QREADR);
	mutex_enter(QLOCK(qp));
	qp->q_flag |= QWCLOSE;
	mutex_exit(QLOCK(qp));
	mutex_enter(QLOCK(wqp));
	wqp->q_flag |= QWCLOSE;
	mutex_exit(QLOCK(wqp));
}

/* allow service procedures to be called again */
void
enable_svc(queue_t *qp)
{
	queue_t *wqp = _WR(qp);

	ASSERT(qp->q_flag & QREADR);
	mutex_enter(QLOCK(qp));
	qp->q_flag &= ~QWCLOSE;
	mutex_exit(QLOCK(qp));
	mutex_enter(QLOCK(wqp));
	wqp->q_flag &= ~QWCLOSE;
	mutex_exit(QLOCK(wqp));
}

/*
 * Remove queue from qhead/qtail if it is enabled.
 * Only reset QENAB if the queue was removed from the runlist.
 * A queue goes through the following stages:
 *	It is on the service list and QENAB is set.
 *	It is removed from the service list but QENAB is still set.
 *	QENAB gets changed to QINSERVICE.
 *	QINSERVICE is reset (when the service procedure is done).
 * Thus we cannot reset QENAB unless we actually removed the queue from the
 * service list.
 */
void
remove_runlist(queue_t *qp)
{
	if (qp->q_flag & QENAB && qhead != NULL) {
		queue_t *q_chase;
		queue_t *q_curr;
		int removed;

		mutex_enter(&service_queue);
		RMQ(qp, qhead, qtail, q_link, q_chase, q_curr, removed);
		mutex_exit(&service_queue);
		if (removed) {
			STRSTAT(qremoved);
			qp->q_flag &= ~QENAB;
		}
	}
}


/*
 * Wait for any pending service processing to complete.
 * The removal of queues from the runlist is not atomic with the
 * clearing of the QENAB flag and the setting of the QINSERVICE flag.
 * Consequently it is possible for remove_runlist() in strclose
 * to not find the queue on the runlist even though it is still QENAB
 * and not yet QINSERVICE; hence wait_svc() needs to check QENAB
 * as well as QINSERVICE.
 */
void
wait_svc(queue_t *qp)
{
	queue_t *wqp = _WR(qp);

	ASSERT(qp->q_flag & QREADR);

	/*
	 * Try to remove queues from qhead/qtail list.
	 */
	if (qhead != NULL) {
		remove_runlist(qp);
		remove_runlist(wqp);
	}
	/*
	 * Wait until the syncqs associated with the queue
	 * disappear from the background processing list.
	 * This only needs to be done for non-PERMOD perimeters since
	 * for PERMOD perimeters the syncq may be shared and will only be freed
	 * when the last module/driver is unloaded.
	 * If, for PERMOD perimeters, the queue was on the syncq list, removeq()
	 * should call propagate_syncq() or drain_syncq() for it. Both of these
	 * functions remove the queue from its syncq list, so sqthread will not
	 * try to access the queue.
	 */
	if (!(qp->q_flag & QPERMOD)) {
		syncq_t *rsq = qp->q_syncq;
		syncq_t *wsq = wqp->q_syncq;

		/*
		 * Disable rsq and wsq and wait for any background processing
		 * of the syncqs to complete.
		 */
		wait_sq_svc(rsq);
		if (wsq != rsq)
			wait_sq_svc(wsq);
	}

	mutex_enter(QLOCK(qp));
	while (qp->q_flag & (QINSERVICE|QENAB))
		cv_wait(&qp->q_wait, QLOCK(qp));
	mutex_exit(QLOCK(qp));
	mutex_enter(QLOCK(wqp));
	while (wqp->q_flag & (QINSERVICE|QENAB))
		cv_wait(&wqp->q_wait, QLOCK(wqp));
	mutex_exit(QLOCK(wqp));
}

/*
 * Put ioctl data from userland buffer `arg' into the mblk chain `bp'.
 * `flag' must always contain either K_TO_K or U_TO_K; STR_NOSIG may
 * also be set, and is passed through to allocb_cred_wait().
 *
 * Returns errno on failure, zero on success.
 */
int
putiocd(mblk_t *bp, char *arg, int flag, cred_t *cr)
{
	mblk_t *tmp;
	ssize_t  count;
	size_t n;
	int error = 0;

	ASSERT((flag & (U_TO_K | K_TO_K)) == U_TO_K ||
		(flag & (U_TO_K | K_TO_K)) == K_TO_K);

	if (bp->b_datap->db_type == M_IOCTL) {
		count = ((struct iocblk *)bp->b_rptr)->ioc_count;
	} else {
		ASSERT(bp->b_datap->db_type == M_COPYIN);
		count = ((struct copyreq *)bp->b_rptr)->cq_size;
	}
	/*
	 * strdoioctl validates ioc_count, so if this assert fails it
	 * cannot be due to user error.
	 */
	ASSERT(count >= 0);

	while (count > 0) {
		n = MIN(MAXIOCBSZ, count);
		if ((tmp = allocb_cred_wait(n, (flag & STR_NOSIG), &error,
		    cr)) == NULL) {
			return (error);
		}
		error = strcopyin(arg, tmp->b_wptr, n, flag & (U_TO_K|K_TO_K));
		if (error != 0) {
			freeb(tmp);
			return (error);
		}
		arg += n;
		DB_CPID(tmp) = curproc->p_pid;
		tmp->b_wptr += n;
		count -= n;
		bp = (bp->b_cont = tmp);
	}

	return (0);
}
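
/*
 * For example, an M_IOCTL whose ioc_count is two and a half times MAXIOCBSZ
 * would be copied in by putiocd() as a chain of three additional mblks
 * hanging off bp->b_cont, with the last mblk holding the remaining half
 * block of data.
 */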

/*
 * Copy ioctl data to user-land. Return non-zero errno on failure,
 * 0 for success.
 */
int
getiocd(mblk_t *bp, char *arg, int copymode)
{
	ssize_t count;
	size_t  n;
	int	error;

	if (bp->b_datap->db_type == M_IOCACK)
		count = ((struct iocblk *)bp->b_rptr)->ioc_count;
	else {
		ASSERT(bp->b_datap->db_type == M_COPYOUT);
		count = ((struct copyreq *)bp->b_rptr)->cq_size;
	}
	ASSERT(count >= 0);

	for (bp = bp->b_cont; bp && count;
	    count -= n, bp = bp->b_cont, arg += n) {
		n = MIN(count, bp->b_wptr - bp->b_rptr);
		error = strcopyout(bp->b_rptr, arg, n, copymode);
		if (error)
			return (error);
	}
	ASSERT(count == 0);
	return (0);
}

/*
 * Allocate a linkinfo entry given the write queue of the
 * bottom module of the top stream and the write queue of the
 * stream head of the bottom stream.
 */
linkinfo_t *
alloclink(queue_t *qup, queue_t *qdown, file_t *fpdown)
{
	linkinfo_t *linkp;

	linkp = kmem_cache_alloc(linkinfo_cache, KM_SLEEP);

	linkp->li_lblk.l_qtop = qup;
	linkp->li_lblk.l_qbot = qdown;
	linkp->li_fpdown = fpdown;

	mutex_enter(&strresources);
	linkp->li_next = linkinfo_list;
	linkp->li_prev = NULL;
	if (linkp->li_next)
		linkp->li_next->li_prev = linkp;
	linkinfo_list = linkp;
	linkp->li_lblk.l_index = ++lnk_id;
	ASSERT(lnk_id != 0);	/* this should never wrap in practice */
	mutex_exit(&strresources);

	return (linkp);
}

/*
 * Free a linkinfo entry.
 */
void
lbfree(linkinfo_t *linkp)
{
	mutex_enter(&strresources);
	if (linkp->li_next)
		linkp->li_next->li_prev = linkp->li_prev;
	if (linkp->li_prev)
		linkp->li_prev->li_next = linkp->li_next;
	else
		linkinfo_list = linkp->li_next;
	mutex_exit(&strresources);

	kmem_cache_free(linkinfo_cache, linkp);
}

/*
 * Check for a potential linking cycle.
 * Return 1 if a link will result in a cycle,
 * and 0 otherwise.
 */
int
linkcycle(stdata_t *upstp, stdata_t *lostp)
{
	struct mux_node *np;
	struct mux_edge *ep;
	int i;
	major_t lomaj;
	major_t upmaj;
	/*
	 * If the lower stream is a pipe/FIFO, return, since link
	 * cycles cannot happen on pipes/FIFOs.
	 */
	if (lostp->sd_vnode->v_type == VFIFO)
		return (0);

	for (i = 0; i < devcnt; i++) {
		np = &mux_nodes[i];
		MUX_CLEAR(np);
	}
	lomaj = getmajor(lostp->sd_vnode->v_rdev);
	upmaj = getmajor(upstp->sd_vnode->v_rdev);
	np = &mux_nodes[lomaj];
	for (;;) {
		if (!MUX_DIDVISIT(np)) {
			if (np->mn_imaj == upmaj)
				return (1);
			if (np->mn_outp == NULL) {
				MUX_VISIT(np);
				if (np->mn_originp == NULL)
					return (0);
				np = np->mn_originp;
				continue;
			}
			MUX_VISIT(np);
			np->mn_startp = np->mn_outp;
		} else {
			if (np->mn_startp == NULL) {
				if (np->mn_originp == NULL)
					return (0);
				else {
					np = np->mn_originp;
					continue;
				}
			}
			/*
			 * If ep->me_nodep is a FIFO (me_nodep == NULL),
			 * ignore the edge and move on. ep->me_nodep gets
			 * set to NULL in mux_addedge() if it is a FIFO.
			 *
			 */
			ep = np->mn_startp;
			np->mn_startp = ep->me_nextp;
			if (ep->me_nodep == NULL)
				continue;
			ep->me_nodep->mn_originp = np;
			np = ep->me_nodep;
		}
	}
}

/*
 * Find linkinfo entry corresponding to the parameters.
 */
linkinfo_t *
findlinks(stdata_t *stp, int index, int type)
{
	linkinfo_t *linkp;
	struct mux_edge *mep;
	struct mux_node *mnp;
	queue_t *qup;

	mutex_enter(&strresources);
	if ((type & LINKTYPEMASK) == LINKNORMAL) {
		qup = getendq(stp->sd_wrq);
		for (linkp = linkinfo_list; linkp; linkp = linkp->li_next) {
			if ((qup == linkp->li_lblk.l_qtop) &&
			    (!index || (index == linkp->li_lblk.l_index))) {
				mutex_exit(&strresources);
				return (linkp);
			}
		}
	} else {
		ASSERT((type & LINKTYPEMASK) == LINKPERSIST);
		mnp = &mux_nodes[getmajor(stp->sd_vnode->v_rdev)];
		mep = mnp->mn_outp;
		while (mep) {
			if ((index == 0) || (index == mep->me_muxid))
				break;
			mep = mep->me_nextp;
		}
		if (!mep) {
			mutex_exit(&strresources);
			return (NULL);
		}
		for (linkp = linkinfo_list; linkp; linkp = linkp->li_next) {
			if ((!linkp->li_lblk.l_qtop) &&
			    (mep->me_muxid == linkp->li_lblk.l_index)) {
				mutex_exit(&strresources);
				return (linkp);
			}
		}
	}
	mutex_exit(&strresources);
	return (NULL);
}

/*
 * Given a queue ptr, follow the chain of q_next pointers until you reach the
 * last queue on the chain and return it.
 */
queue_t *
getendq(queue_t *q)
{
	ASSERT(q != NULL);
	while (_SAMESTR(q))
		q = q->q_next;
	return (q);
}

/*
 * Wait for the syncq count to drop to zero.
 * sq could be either an outer or an inner syncq.
 */

static void
wait_syncq(syncq_t *sq)
{
	uint16_t count;

	mutex_enter(SQLOCK(sq));
	count = sq->sq_count;
	SQ_PUTLOCKS_ENTER(sq);
	SUM_SQ_PUTCOUNTS(sq, count);
	while (count != 0) {
		sq->sq_flags |= SQ_WANTWAKEUP;
		SQ_PUTLOCKS_EXIT(sq);
		cv_wait(&sq->sq_wait, SQLOCK(sq));
		count = sq->sq_count;
		SQ_PUTLOCKS_ENTER(sq);
		SUM_SQ_PUTCOUNTS(sq, count);
	}
	SQ_PUTLOCKS_EXIT(sq);
	mutex_exit(SQLOCK(sq));
}

/*
 * Wait while there are any messages for the queue in its syncq.
 */
static void
wait_q_syncq(queue_t *q)
{
	if ((q->q_sqflags & Q_SQQUEUED) || (q->q_syncqmsgs > 0)) {
		syncq_t *sq = q->q_syncq;

		mutex_enter(SQLOCK(sq));
		while ((q->q_sqflags & Q_SQQUEUED) || (q->q_syncqmsgs > 0)) {
			sq->sq_flags |= SQ_WANTWAKEUP;
			cv_wait(&sq->sq_wait, SQLOCK(sq));
		}
		mutex_exit(SQLOCK(sq));
	}
}


int
mlink_file(vnode_t *vp, int cmd, struct file *fpdown, cred_t *crp, int *rvalp,
    int lhlink)
{
	struct stdata *stp;
	struct strioctl strioc;
	struct linkinfo *linkp;
	struct stdata *stpdown;
	struct streamtab *str;
	queue_t *passq;
	syncq_t *passyncq;
	queue_t *rq;
	cdevsw_impl_t *dp;
	uint32_t qflag;
	uint32_t sqtype;
	perdm_t *dmp;
	int error = 0;

	stp = vp->v_stream;
	TRACE_1(TR_FAC_STREAMS_FR,
		TR_I_LINK, "I_LINK/I_PLINK:stp %p", stp);
	/*
	 * Test for invalid upper stream
	 */
	if (stp->sd_flag & STRHUP) {
		return (ENXIO);
	}
	if (vp->v_type == VFIFO) {
		return (EINVAL);
	}
	if (stp->sd_strtab == NULL) {
		return (EINVAL);
	}
	if (!stp->sd_strtab->st_muxwinit) {
		return (EINVAL);
	}
	if (fpdown == NULL) {
		return (EBADF);
	}
	if (getmajor(stp->sd_vnode->v_rdev) >= devcnt) {
		return (EINVAL);
	}
	mutex_enter(&muxifier);
	if (stp->sd_flag & STPLEX) {
		mutex_exit(&muxifier);
		return (ENXIO);
	}

	/*
	 * Test for invalid lower stream.
	 * The check for v_type != VFIFO and for the major
	 * number not being >= devcnt is done to avoid problems with
	 * adding a mux_node entry past the end of mux_nodes[].
	 * For FIFOs we don't add an entry, so this isn't a
	 * problem.
	 */
	if (((stpdown = fpdown->f_vnode->v_stream) == NULL) ||
	    (stpdown == stp) || (stpdown->sd_flag &
	    (STPLEX|STRHUP|STRDERR|STWRERR|IOCWAIT|STRPLUMB)) ||
	    ((stpdown->sd_vnode->v_type != VFIFO) &&
	    (getmajor(stpdown->sd_vnode->v_rdev) >= devcnt)) ||
	    linkcycle(stp, stpdown)) {
		mutex_exit(&muxifier);
		return (EINVAL);
	}
	TRACE_1(TR_FAC_STREAMS_FR,
		TR_STPDOWN, "stpdown:%p", stpdown);
	rq = getendq(stp->sd_wrq);
	if (cmd == I_PLINK)
		rq = NULL;

	linkp = alloclink(rq, stpdown->sd_wrq, fpdown);

	strioc.ic_cmd = cmd;
	strioc.ic_timout = INFTIM;
	strioc.ic_len = sizeof (struct linkblk);
	strioc.ic_dp = (char *)&linkp->li_lblk;

	/*
	 * STRPLUMB protects plumbing changes and should be set before
	 * link_addpassthru()/link_rempassthru() are called, so it is set here
	 * and cleared at the end of mlink when the passthru queue is removed.
	 * Setting STRPLUMB prevents reopens of the stream while the passthru
	 * queue is in place (it is not a proper module and doesn't have an
	 * open entry point).
	 *
	 * STPLEX prevents any threads from entering the stream from above. It
	 * can't be set before the call to link_addpassthru() because putnext
	 * from below may cause stream head I/O routines to be called and these
	 * routines assert that STPLEX is not set. After link_addpassthru()
	 * nothing may come from below since the pass queue syncq is blocked.
	 * Note also that STPLEX should be cleared before the call to
	 * link_rempassthru(), since once messages start flowing to the stream
	 * head (e.g. because of message propagation from the pass queue) the
	 * stream head I/O routines may be called with the STPLEX flag set.
	 *
	 * When STPLEX is set, nothing may come into the stream from above and
	 * it is safe to do a setq which will change stream head. So, the
	 * correct sequence of actions is:
	 *
	 * 1) Set STRPLUMB
	 * 2) Call link_addpassthru()
	 * 3) Set STPLEX
	 * 4) Call setq and update the stream state
	 * 5) Clear STPLEX
	 * 6) Call link_rempassthru()
	 * 7) Clear STRPLUMB
	 *
	 * The same sequence applies to munlink() code.
	 */
	mutex_enter(&stpdown->sd_lock);
	stpdown->sd_flag |= STRPLUMB;
	mutex_exit(&stpdown->sd_lock);
	/*
	 * Add a passthru queue below the lower mux. This will block the
	 * syncqs of the lower mux's read queue during I_LINK/I_UNLINK.
	 */
	passq = link_addpassthru(stpdown);

	mutex_enter(&stpdown->sd_lock);
	stpdown->sd_flag |= STPLEX;
	mutex_exit(&stpdown->sd_lock);

	rq = _RD(stpdown->sd_wrq);
	/*
	 * There may be messages in the stream head's syncq due to messages
	 * that arrived before link_addpassthru() was done. To avoid
	 * background processing of the syncq happening simultaneously with
	 * setq processing, we disable the stream head syncq and wait until
	 * the existing background thread finishes working on it.
	 */
	wait_sq_svc(rq->q_syncq);
	passyncq = passq->q_syncq;
	if (!(passyncq->sq_flags & SQ_BLOCKED))
		blocksq(passyncq, SQ_BLOCKED, 0);

	ASSERT((rq->q_flag & QMT_TYPEMASK) == QMTSAFE);
	ASSERT(rq->q_syncq == SQ(rq) && _WR(rq)->q_syncq == SQ(rq));
	rq->q_ptr = _WR(rq)->q_ptr = NULL;

	/* setq might sleep in allocator - avoid holding locks. */
	/* Note: we are holding muxifier here. */

	str = stp->sd_strtab;
	dp = &devimpl[getmajor(vp->v_rdev)];
	ASSERT(dp->d_str == str);

	qflag = dp->d_qflag;
	sqtype = dp->d_sqtype;

	/* create perdm_t if needed */
	if (NEED_DM(dp->d_dmp, qflag))
		dp->d_dmp = hold_dm(str, qflag, sqtype);

	dmp = dp->d_dmp;

	setq(rq, str->st_muxrinit, str->st_muxwinit, dmp, qflag, sqtype,
	    B_TRUE);

	/*
	 * XXX Remove any "odd" messages from the queue.
	 * Keep only M_DATA, M_PROTO, M_PCPROTO.
	 */
	error = strdoioctl(stp, &strioc, FNATIVE,
	    K_TO_K | STR_NOERROR | STR_NOSIG, crp, rvalp);
	if (error != 0) {
		lbfree(linkp);

		if (!(passyncq->sq_flags & SQ_BLOCKED))
			blocksq(passyncq, SQ_BLOCKED, 0);
		/*
		 * Restore the stream head queue and then remove
		 * the passq. Turn off STPLEX before we turn on
		 * the stream by removing the passq.
		 */
		rq->q_ptr = _WR(rq)->q_ptr = stpdown;
		setq(rq, &strdata, &stwdata, NULL, QMTSAFE, SQ_CI|SQ_CO,
		    B_TRUE);

		mutex_enter(&stpdown->sd_lock);
		stpdown->sd_flag &= ~STPLEX;
		mutex_exit(&stpdown->sd_lock);

		link_rempassthru(passq);

		mutex_enter(&stpdown->sd_lock);
		stpdown->sd_flag &= ~STRPLUMB;
		/* Wakeup anyone waiting for STRPLUMB to clear. */
		cv_broadcast(&stpdown->sd_monitor);
		mutex_exit(&stpdown->sd_lock);

		mutex_exit(&muxifier);
		return (error);
	}
	mutex_enter(&fpdown->f_tlock);
	fpdown->f_count++;
	mutex_exit(&fpdown->f_tlock);

	/*
	 * If we've made it here, the linkage is all set up, so we should
	 * also set up the layered driver linkages.
	 */

	ASSERT((cmd == I_LINK) || (cmd == I_PLINK));
	if (cmd == I_LINK) {
		ldi_mlink_fp(stp, fpdown, lhlink, LINKNORMAL);
	} else {
		ldi_mlink_fp(stp, fpdown, lhlink, LINKPERSIST);
	}

	link_rempassthru(passq);

	mux_addedge(stp, stpdown, linkp->li_lblk.l_index);

	/*
	 * Mark the upper stream as having dependent links
	 * so that strclose can clean it up.
	 */
	if (cmd == I_LINK) {
		mutex_enter(&stp->sd_lock);
		stp->sd_flag |= STRHASLINKS;
		mutex_exit(&stp->sd_lock);
	}
	/*
	 * Wake up any other processes that may have been
	 * waiting on the lower stream. These will all
	 * error out.
	 */
	mutex_enter(&stpdown->sd_lock);
	/* The passthru module is removed so we may release STRPLUMB */
	stpdown->sd_flag &= ~STRPLUMB;
	cv_broadcast(&rq->q_wait);
	cv_broadcast(&_WR(rq)->q_wait);
	cv_broadcast(&stpdown->sd_monitor);
	mutex_exit(&stpdown->sd_lock);
	mutex_exit(&muxifier);
	*rvalp = linkp->li_lblk.l_index;
	return (0);
}

int
mlink(vnode_t *vp, int cmd, int arg, cred_t *crp, int *rvalp, int lhlink)
{
	int		ret;
	struct file	*fpdown;

	fpdown = getf(arg);
	ret = mlink_file(vp, cmd, fpdown, crp, rvalp, lhlink);
	if (fpdown != NULL)
		releasef(arg);
	return (ret);
}

/*
 * Unlink a multiplexor link. Stp is the controlling stream for the
 * link, and linkp points to the link's entry in the linkinfo list.
 * The muxifier lock must be held on entry and is dropped on exit.
 *
 * NOTE: Currently it is assumed that the mux will process all the messages
 * sitting on its queue before ACKing the UNLINK. It is the responsibility
 * of the mux to handle all the messages that arrive before UNLINK.
 * If the mux has to send down messages on its lower stream before
 * ACKing I_UNLINK, then it *should* know how to handle messages even
 * after the UNLINK is acked (actually it should be able to handle them until
 * we re-block the read side of the pass queue here). If the mux does not
 * open up the lower stream, any messages that arrive during the UNLINK
 * will be put on the stream head. If the lower stream is opened
 * up, some messages might still land on the stream head, depending on when
 * a message arrived and when the read side of the pass queue was
 * re-blocked.
 */
int
munlink(stdata_t *stp, linkinfo_t *linkp, int flag, cred_t *crp, int *rvalp)
{
	struct strioctl strioc;
	struct stdata *stpdown;
	queue_t *rq, *wrq;
	queue_t	*passq;
	syncq_t *passyncq;
	int error = 0;
	file_t *fpdown;

	ASSERT(MUTEX_HELD(&muxifier));

	stpdown = linkp->li_fpdown->f_vnode->v_stream;

	/*
	 * See the comment in mlink() concerning STRPLUMB/STPLEX flags.
	 */
	mutex_enter(&stpdown->sd_lock);
	stpdown->sd_flag |= STRPLUMB;
	mutex_exit(&stpdown->sd_lock);

	/*
	 * Add a passthru queue below the lower mux. This will block the
	 * syncqs of the lower mux's read queue during I_LINK/I_UNLINK.
	 */
	passq = link_addpassthru(stpdown);

	if ((flag & LINKTYPEMASK) == LINKNORMAL)
		strioc.ic_cmd = I_UNLINK;
	else
		strioc.ic_cmd = I_PUNLINK;
	strioc.ic_timout = INFTIM;
	strioc.ic_len = sizeof (struct linkblk);
	strioc.ic_dp = (char *)&linkp->li_lblk;

	error = strdoioctl(stp, &strioc, FNATIVE,
	    K_TO_K | STR_NOERROR | STR_NOSIG, crp, rvalp);

	/*
	 * If there was an error and this is not called via strclose,
	 * return to the user. Otherwise, pretend there was no error
	 * and close the link.
	 */
	if (error) {
		if (flag & LINKCLOSE) {
			cmn_err(CE_WARN, "KERNEL: munlink: could not perform "
			    "unlink ioctl, closing anyway (%d)\n", error);
		} else {
			link_rempassthru(passq);
			mutex_enter(&stpdown->sd_lock);
			stpdown->sd_flag &= ~STRPLUMB;
			cv_broadcast(&stpdown->sd_monitor);
			mutex_exit(&stpdown->sd_lock);
			mutex_exit(&muxifier);
			return (error);
		}
	}

	mux_rmvedge(stp, linkp->li_lblk.l_index);
	fpdown = linkp->li_fpdown;
	lbfree(linkp);

	/*
	 * We go ahead and drop muxifier here--it's a nasty global lock that
	 * can slow others down. It's okay to do so since attempts to mlink()
	 * this stream will be stopped because STPLEX is still set in the stdata
	 * structure, and munlink() is stopped because mux_rmvedge() and
	 * lbfree() have removed it from mux_nodes[] and linkinfo_list,
	 * respectively.  Note that we defer the closef() of fpdown until
	 * after we drop muxifier since strclose() can call munlinkall().
	 */
	mutex_exit(&muxifier);

	wrq = stpdown->sd_wrq;
	rq = _RD(wrq);

	/*
	 * Get rid of outstanding service procedure runs, before we make
	 * it a stream head, since a stream head doesn't have any service
	 * procedure.
	 */
	disable_svc(rq);
	wait_svc(rq);

	/*
	 * Since we don't disable the syncq for QPERMOD, we wait for whatever
	 * is queued up to finish. The mux should take care that nothing is
	 * sent down to this queue. We do this now because we are about to
	 * block passyncq if it was unblocked.
	 */
	if (wrq->q_flag & QPERMOD) {
		syncq_t	*sq = wrq->q_syncq;

		mutex_enter(SQLOCK(sq));
		while (wrq->q_sqflags & Q_SQQUEUED) {
			sq->sq_flags |= SQ_WANTWAKEUP;
			cv_wait(&sq->sq_wait, SQLOCK(sq));
		}
		mutex_exit(SQLOCK(sq));
	}
	passyncq = passq->q_syncq;
	if (!(passyncq->sq_flags & SQ_BLOCKED)) {

		syncq_t *sq, *outer;

		/*
		 * Messages could be flowing from underneath. We will
		 * block the read side of the passq. This is sufficient
		 * for QPAIR and QPERQ muxes to ensure that no data is
		 * flowing up into this queue and hence no thread is
		 * active in this instance of the lower mux. But for
		 * QPERMOD and QMTOUTPERIM there could be messages on
		 * the inner and outer/inner syncqs respectively, so we
		 * wait for them to drain. Because passq is blocked,
		 * messages end up in the syncq, and qfill_syncq could
		 * possibly end up setting QFULL, which accesses
		 * rq->q_flag. Hence we have to acquire the QLOCK in
		 * setq.
		 *
		 * XXX Messages can also flow from the top into this
		 * queue even though the unlink is over (e.g. some
		 * instance of putnext() called from the top that has
		 * still not accessed this queue, and also putq(lowerq)?).
		 * Solution: how about blocking the l_qtop queue?
		 * Do we really care about such pure D_MP muxes?
		 */

		blocksq(passyncq, SQ_BLOCKED, 0);

		sq = rq->q_syncq;
		if ((outer = sq->sq_outer) != NULL) {

			/*
			 * We just have to wait for the outer sq_count
			 * to drop to zero. Since this does not prevent
			 * new messages from entering the outer
			 * perimeter, it is subject to starvation.
			 *
			 * NOTE: Because of the blocksq above, messages
			 * could be in the inner syncq only because some
			 * thread is holding the outer perimeter
			 * exclusively. Hence it would be sufficient to
			 * wait for the exclusive holder of the outer
			 * perimeter to drain the inner and outer syncqs.
			 * But we will not depend on this and therefore
			 * check the inner syncqs separately.
			 */
			wait_syncq(outer);
		}


		/*
		 * There could be messages destined for
		 * this queue. Let the exclusive holder
		 * drain them.
		 */

		wait_syncq(sq);
		ASSERT((rq->q_flag & QPERMOD) ||
			((rq->q_syncq->sq_head == NULL) &&
			(_WR(rq)->q_syncq->sq_head == NULL)));
	}

	/*
	 * We haven't taken care of the QPERMOD case yet. QPERMOD is a
	 * special case because we don't disable its syncq or remove it
	 * from the syncq service list.
	 */
	if (rq->q_flag & QPERMOD) {
		syncq_t	*sq = rq->q_syncq;

		mutex_enter(SQLOCK(sq));
		while (rq->q_sqflags & Q_SQQUEUED) {
			sq->sq_flags |= SQ_WANTWAKEUP;
			cv_wait(&sq->sq_wait, SQLOCK(sq));
		}
		mutex_exit(SQLOCK(sq));
	}

	/*
	 * flush_syncq changes state only when there are messages to
	 * free, i.e. only when it returns a non-zero value.
	 */
	ASSERT(flush_syncq(rq->q_syncq, rq) == 0);
	ASSERT(flush_syncq(wrq->q_syncq, wrq) == 0);

	/*
	 * Nobody else should know about this queue now.
	 * If the mux did not process the messages before
	 * acking the I_UNLINK, free them now.
	 */

	flushq(rq, FLUSHALL);
	flushq(_WR(rq), FLUSHALL);

	/*
	 * Convert the mux lower queue into a stream head queue.
	 * Turn off STPLEX before we turn on the stream by removing the passq.
	 */
	rq->q_ptr = wrq->q_ptr = stpdown;
	setq(rq, &strdata, &stwdata, NULL, QMTSAFE, SQ_CI|SQ_CO, B_TRUE);

	ASSERT((rq->q_flag & QMT_TYPEMASK) == QMTSAFE);
	ASSERT(rq->q_syncq == SQ(rq) && _WR(rq)->q_syncq == SQ(rq));

	enable_svc(rq);

	/*
	 * Now it is a proper stream, so STPLEX is cleared. But STRPLUMB still
	 * needs to be set to prevent a reopen() of the stream - such a reopen
	 * may try to call the non-existent pass queue open routine and panic.
	 */
	mutex_enter(&stpdown->sd_lock);
	stpdown->sd_flag &= ~STPLEX;
	mutex_exit(&stpdown->sd_lock);

	ASSERT(((flag & LINKTYPEMASK) == LINKNORMAL) ||
	    ((flag & LINKTYPEMASK) == LINKPERSIST));

	/* clean up the layered driver linkages */
	if ((flag & LINKTYPEMASK) == LINKNORMAL) {
		ldi_munlink_fp(stp, fpdown, LINKNORMAL);
	} else {
		ldi_munlink_fp(stp, fpdown, LINKPERSIST);
	}

	link_rempassthru(passq);

	/*
	 * Now all plumbing changes are finished and STRPLUMB is no
	 * longer needed.
	 */
	mutex_enter(&stpdown->sd_lock);
	stpdown->sd_flag &= ~STRPLUMB;
	cv_broadcast(&stpdown->sd_monitor);
	mutex_exit(&stpdown->sd_lock);

	(void) closef(fpdown);
	return (0);
}

/*
 * Unlink all multiplexor links for which stp is the controlling stream.
 * Return 0, or a non-zero errno on failure.
 */
int
munlinkall(stdata_t *stp, int flag, cred_t *crp, int *rvalp)
{
	linkinfo_t *linkp;
	int error = 0;

	mutex_enter(&muxifier);
	while (linkp = findlinks(stp, 0, flag)) {
		/*
		 * munlink() releases the muxifier lock.
		 */
		if (error = munlink(stp, linkp, flag, crp, rvalp))
			return (error);
		mutex_enter(&muxifier);
	}
	mutex_exit(&muxifier);
	return (0);
}

/*
 * A multiplexor link has been made. Add an
 * edge to the directed graph.
 */
void
mux_addedge(stdata_t *upstp, stdata_t *lostp, int muxid)
{
	struct mux_node *np;
	struct mux_edge *ep;
	major_t upmaj;
	major_t lomaj;

	upmaj = getmajor(upstp->sd_vnode->v_rdev);
	lomaj = getmajor(lostp->sd_vnode->v_rdev);
	np = &mux_nodes[upmaj];
	if (np->mn_outp) {
		ep = np->mn_outp;
		while (ep->me_nextp)
			ep = ep->me_nextp;
		ep->me_nextp = kmem_alloc(sizeof (struct mux_edge), KM_SLEEP);
		ep = ep->me_nextp;
	} else {
		np->mn_outp = kmem_alloc(sizeof (struct mux_edge), KM_SLEEP);
		ep = np->mn_outp;
	}
	ep->me_nextp = NULL;
	ep->me_muxid = muxid;
	if (lostp->sd_vnode->v_type == VFIFO)
		ep->me_nodep = NULL;
	else
		ep->me_nodep = &mux_nodes[lomaj];
}

/*
 * A multiplexor link has been removed. Remove the
 * edge in the directed graph.
 */
void
mux_rmvedge(stdata_t *upstp, int muxid)
{
	struct mux_node *np;
	struct mux_edge *ep;
	struct mux_edge *pep = NULL;
	major_t upmaj;

	upmaj = getmajor(upstp->sd_vnode->v_rdev);
	np = &mux_nodes[upmaj];
	ASSERT(np->mn_outp != NULL);
	ep = np->mn_outp;
	while (ep) {
		if (ep->me_muxid == muxid) {
			if (pep)
				pep->me_nextp = ep->me_nextp;
			else
				np->mn_outp = ep->me_nextp;
			kmem_free(ep, sizeof (struct mux_edge));
			return;
		}
		pep = ep;
		ep = ep->me_nextp;
	}
	ASSERT(0);	/* should not reach here */
}

/*
 * Translate the device flags (from conf.h) to the corresponding
 * qflag and sq_flag (type) values.
 */
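/*
 * Illustrative sketch (comment only): a driver declaring, say,
 * D_MP|D_MTPERQ|D_MTOUTPERIM|D_MTPUTSHARED would be translated by this
 * routine roughly as follows; 'str' and 'devflag' stand for the driver's
 * streamtab and its declared flags, and perdm handling in the caller is
 * omitted:
 *
 *	uint32_t qflag, sqtype;
 *
 *	if (devflg_to_qflag(str, devflag, &qflag, &sqtype) != 0)
 *		return (EINVAL);
 *
 * For the flag combination above, qflag comes out as QPERQ|QMTOUTPERIM
 * and sqtype as SQ_CIPUT|SQ_CO.
 */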
int
devflg_to_qflag(struct streamtab *stp, uint32_t devflag, uint32_t *qflagp,
	uint32_t *sqtypep)
{
	uint32_t qflag = 0;
	uint32_t sqtype = 0;

	if (devflag & _D_OLD)
		goto bad;

	/* Inner perimeter presence and scope */
	switch (devflag & D_MTINNER_MASK) {
	case D_MP:
		qflag |= QMTSAFE;
		sqtype |= SQ_CI;
		break;
	case D_MTPERQ|D_MP:
		qflag |= QPERQ;
		break;
	case D_MTQPAIR|D_MP:
		qflag |= QPAIR;
		break;
	case D_MTPERMOD|D_MP:
		qflag |= QPERMOD;
		break;
	default:
		goto bad;
	}

	/* Outer perimeter */
	if (devflag & D_MTOUTPERIM) {
		switch (devflag & D_MTINNER_MASK) {
		case D_MP:
		case D_MTPERQ|D_MP:
		case D_MTQPAIR|D_MP:
			break;
		default:
			goto bad;
		}
		qflag |= QMTOUTPERIM;
	}

	/* Inner perimeter modifiers */
	if (devflag & D_MTINNER_MOD) {
		switch (devflag & D_MTINNER_MASK) {
		case D_MP:
			goto bad;
		default:
			break;
		}
		if (devflag & D_MTPUTSHARED)
			sqtype |= SQ_CIPUT;
		if (devflag & _D_MTOCSHARED) {
			/*
			 * The code in putnext assumes that it has the
			 * highest concurrency by not checking sq_count.
			 * Thus _D_MTOCSHARED can only be supported when
			 * D_MTPUTSHARED is set.
			 */
			if (!(devflag & D_MTPUTSHARED))
				goto bad;
			sqtype |= SQ_CIOC;
		}
		if (devflag & _D_MTCBSHARED) {
			/*
			 * The code in putnext assumes that it has the
			 * highest concurrency by not checking sq_count.
			 * Thus _D_MTCBSHARED can only be supported when
			 * D_MTPUTSHARED is set.
			 */
			if (!(devflag & D_MTPUTSHARED))
				goto bad;
			sqtype |= SQ_CICB;
		}
		if (devflag & _D_MTSVCSHARED) {
			/*
			 * The code in putnext assumes that it has the
			 * highest concurrency by not checking sq_count.
			 * Thus _D_MTSVCSHARED can only be supported when
			 * D_MTPUTSHARED is set. Also _D_MTSVCSHARED is
			 * supported only for QPERMOD.
			 */
			if (!(devflag & D_MTPUTSHARED) || !(qflag & QPERMOD))
				goto bad;
			sqtype |= SQ_CISVC;
		}
	}

	/* Default outer perimeter concurrency */
	sqtype |= SQ_CO;

	/* Outer perimeter modifiers */
	if (devflag & D_MTOCEXCL) {
		if (!(devflag & D_MTOUTPERIM)) {
			/* No outer perimeter */
			goto bad;
		}
		sqtype &= ~SQ_COOC;
	}

	/* Synchronous Streams extended qinit structure */
	if (devflag & D_SYNCSTR)
		qflag |= QSYNCSTR;

	/*
	 * Private flag used by a transport module to indicate
	 * to sockfs that it supports direct-access mode without
	 * having to go through STREAMS.
	 */
	if (devflag & _D_DIRECT) {
		/* Reject unless the module is fully-MT (no perimeter) */
		if ((qflag & QMT_TYPEMASK) != QMTSAFE)
			goto bad;
		qflag |= _QDIRECT;
	}

	*qflagp = qflag;
	*sqtypep = sqtype;
	return (0);

bad:
	cmn_err(CE_WARN,
	    "stropen: bad MT flags (0x%x) in driver '%s'",
	    (int)(qflag & D_MTSAFETY_MASK),
	    stp->st_rdinit->qi_minfo->mi_idname);

	return (EINVAL);
}

/*
 * Set the interface values for a pair of queues (qinit structure,
 * packet sizes, water marks).
 * setq assumes that the caller does not have a claim (entersq or claimq)
 * on the queue.
 */
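/*
 * Illustrative sketch (comment only) of how a caller such as qattach()
 * ties devflg_to_qflag(), hold_dm() and setq() together; 'str', 'devflag'
 * and 'rq' stand for the driver's streamtab, its declared flags and the
 * read queue, and the error handling and locking details differ in the
 * real callers:
 *
 *	uint32_t qflag, sqtype;
 *	perdm_t *dmp = NULL;
 *
 *	if (devflg_to_qflag(str, devflag, &qflag, &sqtype) != 0)
 *		return (EINVAL);
 *	if (qflag & (QPERMOD | QMTOUTPERIM))
 *		dmp = hold_dm(str, qflag, sqtype);
 *	setq(rq, str->st_rdinit, str->st_wrinit, dmp, qflag, sqtype,
 *	    B_FALSE);
 */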
void
setq(queue_t *rq, struct qinit *rinit, struct qinit *winit,
    perdm_t *dmp, uint32_t qflag, uint32_t sqtype, boolean_t lock_needed)
{
	queue_t *wq;
	syncq_t	*sq, *outer;

	ASSERT(rq->q_flag & QREADR);
	ASSERT((qflag & QMT_TYPEMASK) != 0);
	IMPLY((qflag & (QPERMOD | QMTOUTPERIM)), dmp != NULL);

	wq = _WR(rq);
	rq->q_qinfo = rinit;
	rq->q_hiwat = rinit->qi_minfo->mi_hiwat;
	rq->q_lowat = rinit->qi_minfo->mi_lowat;
	rq->q_minpsz = rinit->qi_minfo->mi_minpsz;
	rq->q_maxpsz = rinit->qi_minfo->mi_maxpsz;
	wq->q_qinfo = winit;
	wq->q_hiwat = winit->qi_minfo->mi_hiwat;
	wq->q_lowat = winit->qi_minfo->mi_lowat;
	wq->q_minpsz = winit->qi_minfo->mi_minpsz;
	wq->q_maxpsz = winit->qi_minfo->mi_maxpsz;

	/* Remove old syncqs */
	sq = rq->q_syncq;
	outer = sq->sq_outer;
	if (outer != NULL) {
		ASSERT(wq->q_syncq->sq_outer == outer);
		outer_remove(outer, rq->q_syncq);
		if (wq->q_syncq != rq->q_syncq)
			outer_remove(outer, wq->q_syncq);
	}
	ASSERT(sq->sq_outer == NULL);
	ASSERT(sq->sq_onext == NULL && sq->sq_oprev == NULL);

	if (sq != SQ(rq)) {
		if (!(rq->q_flag & QPERMOD))
			free_syncq(sq);
		if (wq->q_syncq == rq->q_syncq)
			wq->q_syncq = NULL;
		rq->q_syncq = NULL;
	}
	if (wq->q_syncq != NULL && wq->q_syncq != sq &&
	    wq->q_syncq != SQ(rq)) {
		free_syncq(wq->q_syncq);
		wq->q_syncq = NULL;
	}
	ASSERT(rq->q_syncq == NULL || (rq->q_syncq->sq_head == NULL &&
				rq->q_syncq->sq_tail == NULL));
	ASSERT(wq->q_syncq == NULL || (wq->q_syncq->sq_head == NULL &&
				wq->q_syncq->sq_tail == NULL));

	if (!(rq->q_flag & QPERMOD) &&
	    rq->q_syncq != NULL && rq->q_syncq->sq_ciputctrl != NULL) {
		ASSERT(rq->q_syncq->sq_nciputctrl == n_ciputctrl - 1);
		SUMCHECK_CIPUTCTRL_COUNTS(rq->q_syncq->sq_ciputctrl,
		    rq->q_syncq->sq_nciputctrl, 0);
		ASSERT(ciputctrl_cache != NULL);
		kmem_cache_free(ciputctrl_cache, rq->q_syncq->sq_ciputctrl);
		rq->q_syncq->sq_ciputctrl = NULL;
		rq->q_syncq->sq_nciputctrl = 0;
	}

	if (!(wq->q_flag & QPERMOD) &&
	    wq->q_syncq != NULL && wq->q_syncq->sq_ciputctrl != NULL) {
		ASSERT(wq->q_syncq->sq_nciputctrl == n_ciputctrl - 1);
		SUMCHECK_CIPUTCTRL_COUNTS(wq->q_syncq->sq_ciputctrl,
		    wq->q_syncq->sq_nciputctrl, 0);
		ASSERT(ciputctrl_cache != NULL);
		kmem_cache_free(ciputctrl_cache, wq->q_syncq->sq_ciputctrl);
		wq->q_syncq->sq_ciputctrl = NULL;
		wq->q_syncq->sq_nciputctrl = 0;
	}

	sq = SQ(rq);
	ASSERT(sq->sq_head == NULL && sq->sq_tail == NULL);
	ASSERT(sq->sq_outer == NULL);
	ASSERT(sq->sq_onext == NULL && sq->sq_oprev == NULL);

	/*
	 * Create syncqs based on qflag and sqtype. Set the SQ_TYPES_IN_FLAGS
	 * bits in sq_flag based on the sqtype.
	 */
	ASSERT((sq->sq_flags & ~SQ_TYPES_IN_FLAGS) == 0);

	rq->q_syncq = wq->q_syncq = sq;
	sq->sq_type = sqtype;
	sq->sq_flags = (sqtype & SQ_TYPES_IN_FLAGS);

	/*
	 * We are zeroing sq_svcflags, resetting SQ_DISABLED in case it
	 * was set by wait_svc() in the munlink path.
	 */
	ASSERT((sq->sq_svcflags & SQ_SERVICE) == 0);
	sq->sq_svcflags = 0;

	/*
	 * We need to acquire the lock here for the mlink and munlink case,
	 * where canputnext, backenable, etc can access the q_flag.
	 */
	if (lock_needed) {
		mutex_enter(QLOCK(rq));
		rq->q_flag = (rq->q_flag & ~QMT_TYPEMASK) | QWANTR | qflag;
		mutex_exit(QLOCK(rq));
		mutex_enter(QLOCK(wq));
		wq->q_flag = (wq->q_flag & ~QMT_TYPEMASK) | QWANTR | qflag;
		mutex_exit(QLOCK(wq));
	} else {
		rq->q_flag = (rq->q_flag & ~QMT_TYPEMASK) | QWANTR | qflag;
		wq->q_flag = (wq->q_flag & ~QMT_TYPEMASK) | QWANTR | qflag;
	}

	if (qflag & QPERQ) {
		/* Allocate a separate syncq for the write side */
		sq = new_syncq();
		sq->sq_type = rq->q_syncq->sq_type;
		sq->sq_flags = rq->q_syncq->sq_flags;
		ASSERT(sq->sq_outer == NULL && sq->sq_onext == NULL &&
		    sq->sq_oprev == NULL);
		wq->q_syncq = sq;
	}
	if (qflag & QPERMOD) {
		sq = dmp->dm_sq;

		/*
		 * Assert that we do have an inner perimeter syncq and that it
		 * does not have an outer perimeter associated with it.
		 */
		ASSERT(sq->sq_outer == NULL && sq->sq_onext == NULL &&
		    sq->sq_oprev == NULL);
		rq->q_syncq = wq->q_syncq = sq;
	}
	if (qflag & QMTOUTPERIM) {
		outer = dmp->dm_sq;

		ASSERT(outer->sq_outer == NULL);
		outer_insert(outer, rq->q_syncq);
		if (wq->q_syncq != rq->q_syncq)
			outer_insert(outer, wq->q_syncq);
	}
	ASSERT((rq->q_syncq->sq_flags & SQ_TYPES_IN_FLAGS) ==
		(rq->q_syncq->sq_type & SQ_TYPES_IN_FLAGS));
	ASSERT((wq->q_syncq->sq_flags & SQ_TYPES_IN_FLAGS) ==
		(wq->q_syncq->sq_type & SQ_TYPES_IN_FLAGS));
	ASSERT((rq->q_flag & QMT_TYPEMASK) == (qflag & QMT_TYPEMASK));

	/*
	 * Initialize struio() types.
	 */
	rq->q_struiot =
	    (rq->q_flag & QSYNCSTR) ? rinit->qi_struiot : STRUIOT_NONE;
	wq->q_struiot =
	    (wq->q_flag & QSYNCSTR) ? winit->qi_struiot : STRUIOT_NONE;
}

perdm_t *
hold_dm(struct streamtab *str, uint32_t qflag, uint32_t sqtype)
{
	syncq_t	*sq;
	perdm_t	**pp;
	perdm_t	*p;
	perdm_t	*dmp;

	ASSERT(str != NULL);
	ASSERT(qflag & (QPERMOD | QMTOUTPERIM));

	rw_enter(&perdm_rwlock, RW_READER);
	for (p = perdm_list; p != NULL; p = p->dm_next) {
		if (p->dm_str == str) {	/* found one */
			atomic_add_32(&(p->dm_ref), 1);
			rw_exit(&perdm_rwlock);
			return (p);
		}
	}
	rw_exit(&perdm_rwlock);

	sq = new_syncq();
	if (qflag & QPERMOD) {
		sq->sq_type = sqtype | SQ_PERMOD;
		sq->sq_flags = sqtype & SQ_TYPES_IN_FLAGS;
	} else {
		ASSERT(qflag & QMTOUTPERIM);
		sq->sq_onext = sq->sq_oprev = sq;
	}

	dmp = kmem_alloc(sizeof (perdm_t), KM_SLEEP);
	dmp->dm_sq = sq;
	dmp->dm_str = str;
	dmp->dm_ref = 1;
	dmp->dm_next = NULL;

	rw_enter(&perdm_rwlock, RW_WRITER);
	for (pp = &perdm_list; (p = *pp) != NULL; pp = &(p->dm_next)) {
		if (p->dm_str == str) {	/* already present */
			p->dm_ref++;
			rw_exit(&perdm_rwlock);
			free_syncq(sq);
			kmem_free(dmp, sizeof (perdm_t));
			return (p);
		}
	}

	*pp = dmp;
	rw_exit(&perdm_rwlock);
	return (dmp);
}

void
rele_dm(perdm_t *dmp)
{
	perdm_t **pp;
	perdm_t *p;

	rw_enter(&perdm_rwlock, RW_WRITER);
	ASSERT(dmp->dm_ref > 0);

	if (--dmp->dm_ref > 0) {
		rw_exit(&perdm_rwlock);
		return;
	}

	for (pp = &perdm_list; (p = *pp) != NULL; pp = &(p->dm_next))
		if (p == dmp)
			break;
	ASSERT(p == dmp);
	*pp = p->dm_next;
	rw_exit(&perdm_rwlock);

	/*
	 * Wait for any background processing that relies on the
	 * syncq to complete before it is freed.
	 */
	wait_sq_svc(p->dm_sq);
	free_syncq(p->dm_sq);
	kmem_free(p, sizeof (perdm_t));
}

/*
 * Make a protocol message given control and data buffers.
 * n.b., this can block; be careful of what locks you hold when calling it.
 *
 * If sd_maxblk is less than *iosize this routine can fail part way through
 * (due to an allocation failure). In that case, on return, *iosize contains
 * the amount that was actually consumed. Otherwise *iosize is not modified;
 * i.e. on success it already equals the amount that was consumed.
 */
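/*
 * Illustrative sketch (comment only): a putmsg-style caller builds the
 * message from the user's control and data buffers and then sends it
 * down the stream, roughly as follows (mctl, uiop, stp and flag come
 * from the caller's context):
 *
 *	ssize_t iosize = uiop->uio_resid;
 *	mblk_t *mp;
 *	int error;
 *
 *	error = strmakemsg(mctl, &iosize, uiop, stp, flag, &mp);
 *	if (error != 0 || mp == NULL)
 *		return (error);
 *	putnext(stp->sd_wrq, mp);
 */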
int
strmakemsg(
	struct strbuf *mctl,
	ssize_t *iosize,
	struct uio *uiop,
	stdata_t *stp,
	int32_t flag,
	mblk_t **mpp)
{
	mblk_t *mpctl = NULL;
	mblk_t *mpdata = NULL;
	int error;

	ASSERT(uiop != NULL);

	*mpp = NULL;
	/* Create control part, if any */
	if ((mctl != NULL) && (mctl->len >= 0)) {
		error = strmakectl(mctl, flag, uiop->uio_fmode, &mpctl);
		if (error)
			return (error);
	}
	/* Create data part, if any */
	if (*iosize >= 0) {
		error = strmakedata(iosize, uiop, stp, flag, &mpdata);
		if (error) {
			freemsg(mpctl);
			return (error);
		}
	}
	if (mpctl != NULL) {
		if (mpdata != NULL)
			linkb(mpctl, mpdata);
		*mpp = mpctl;
	} else {
		*mpp = mpdata;
	}
	return (0);
}

/*
 * Make the control part of a protocol message given a control buffer.
 * n.b., this can block; be careful of what locks you hold when calling it.
 */
int
strmakectl(
	struct strbuf *mctl,
	int32_t flag,
	int32_t fflag,
	mblk_t **mpp)
{
	mblk_t *bp = NULL;
	unsigned char msgtype;
	int error = 0;

	*mpp = NULL;
	/*
	 * Create control part of message, if any.
	 */
	if ((mctl != NULL) && (mctl->len >= 0)) {
		caddr_t base;
		int ctlcount;
		int allocsz;

		if (flag & RS_HIPRI)
			msgtype = M_PCPROTO;
		else
			msgtype = M_PROTO;

		ctlcount = mctl->len;
		base = mctl->buf;

		/*
		 * Give modules a better chance to reuse M_PROTO/M_PCPROTO
		 * blocks by increasing the size to something more usable.
		 */
		allocsz = MAX(ctlcount, 64);

		/*
		 * Range checking has already been done; simply try
		 * to allocate a message block for the ctl part.
		 */
		while (!(bp = allocb(allocsz, BPRI_MED))) {
			if (fflag & (FNDELAY|FNONBLOCK))
				return (EAGAIN);
			if (error = strwaitbuf(allocsz, BPRI_MED))
				return (error);
		}

		bp->b_datap->db_type = msgtype;
		if (copyin(base, bp->b_wptr, ctlcount)) {
			freeb(bp);
			return (EFAULT);
		}
		bp->b_wptr += ctlcount;
	}
	*mpp = bp;
	return (0);
}

/*
 * Make a protocol message given data buffers.
 * n.b., this can block; be careful of what locks you hold when calling it.
 *
 * If sd_maxblk is less than *iosize this routine can fail part way through
 * (due to an allocation failure). In that case, on return, *iosize contains
 * the amount that was actually consumed. Otherwise *iosize is not modified;
 * i.e. on success it already equals the amount that was consumed.
 */
int
strmakedata(
	ssize_t   *iosize,
	struct uio *uiop,
	stdata_t *stp,
	int32_t flag,
	mblk_t **mpp)
{
	mblk_t *mp = NULL;
	mblk_t *bp;
	int wroff = (int)stp->sd_wroff;
	int error = 0;
	ssize_t maxblk;
	ssize_t count = *iosize;
	cred_t *cr = CRED();

	*mpp = NULL;
	if (count < 0)
		return (0);

	maxblk = stp->sd_maxblk;
	if (maxblk == INFPSZ)
		maxblk = count;

	/*
	 * Create data part of message, if any.
	 */
	do {
		ssize_t size;
		dblk_t  *dp;

		ASSERT(uiop);

		size = MIN(count, maxblk);

		while ((bp = allocb_cred(size + wroff, cr)) == NULL) {
			error = EAGAIN;
			if ((uiop->uio_fmode & (FNDELAY|FNONBLOCK)) ||
			    (error = strwaitbuf(size + wroff, BPRI_MED)) != 0) {
				if (count == *iosize) {
					freemsg(mp);
					return (error);
				} else {
					*iosize -= count;
					*mpp = mp;
					return (0);
				}
			}
		}
		dp = bp->b_datap;
		dp->db_cpid = curproc->p_pid;
		ASSERT(wroff <= dp->db_lim - bp->b_wptr);
		bp->b_wptr = bp->b_rptr = bp->b_rptr + wroff;

		if (flag & STRUIO_POSTPONE) {
			/*
			 * Set up the stream uio portion of the
			 * dblk for subsequent use by struioget().
			 */
			dp->db_struioflag = STRUIO_SPEC;
			dp->db_cksumstart = 0;
			dp->db_cksumstuff = 0;
			dp->db_cksumend = size;
			*(long long *)dp->db_struioun.data = 0ll;
		} else {
			if (stp->sd_copyflag & STRCOPYCACHED)
				uiop->uio_extflg |= UIO_COPY_CACHED;

			if (size != 0) {
				error = uiomove(bp->b_wptr, size, UIO_WRITE,
				    uiop);
				if (error != 0) {
					freeb(bp);
					freemsg(mp);
					return (error);
				}
			}
		}

		bp->b_wptr += size;
		count -= size;

		if (mp == NULL)
			mp = bp;
		else
			linkb(mp, bp);
	} while (count > 0);

	*mpp = mp;
	return (0);
}

/*
 * Wait for a buffer to become available. Return non-zero errno
 * if not able to wait, 0 if buffer is probably there.
 */
int
strwaitbuf(size_t size, int pri)
{
	bufcall_id_t id;

	mutex_enter(&bcall_monitor);
	if ((id = bufcall(size, pri, (void (*)(void *))cv_broadcast,
	    &ttoproc(curthread)->p_flag_cv)) == 0) {
		mutex_exit(&bcall_monitor);
		return (ENOSR);
	}
	if (!cv_wait_sig(&(ttoproc(curthread)->p_flag_cv), &bcall_monitor)) {
		unbufcall(id);
		mutex_exit(&bcall_monitor);
		return (EINTR);
	}
	unbufcall(id);
	mutex_exit(&bcall_monitor);
	return (0);
}

/*
 * This function waits for a read or write event to happen on a stream.
 * fmode can specify FNDELAY and/or FNONBLOCK.
 * The timeout is in ms with -1 meaning infinite.
 * The flag values work as follows:
 *	READWAIT	Check for read side errors, send M_READ
 *	GETWAIT		Check for read side errors, no M_READ
 *	WRITEWAIT	Check for write side errors.
 *	NOINTR		Do not return error if nonblocking or timeout.
 * 	STR_NOERROR	Ignore all errors except STPLEX.
 *	STR_NOSIG	Ignore/hold signals during the duration of the call.
 *	STR_PEEK	Passed through to strgeterr().
 */
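/*
 * Illustrative sketch (comment only) of the canonical read-side use of
 * strwaitq(): with sd_lock held, wait until data shows up or strwaitq()
 * reports that we are done (error, nonblocking mode, or timeout); fmode
 * comes from the caller's context:
 *
 *	int done = 0;
 *	int error = 0;
 *
 *	mutex_enter(&stp->sd_lock);
 *	while (_RD(stp->sd_wrq)->q_first == NULL) {
 *		error = strwaitq(stp, GETWAIT, (ssize_t)0, fmode, -1,
 *		    &done);
 *		if (error != 0 || done)
 *			break;
 *	}
 *	mutex_exit(&stp->sd_lock);
 */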
int
strwaitq(stdata_t *stp, int flag, ssize_t count, int fmode, clock_t timout,
    int *done)
{
	int slpflg, errs;
	int error;
	kcondvar_t *sleepon;
	mblk_t *mp;
	ssize_t *rd_count;
	clock_t rval;

	ASSERT(MUTEX_HELD(&stp->sd_lock));
	if ((flag & READWAIT) || (flag & GETWAIT)) {
		slpflg = RSLEEP;
		sleepon = &_RD(stp->sd_wrq)->q_wait;
		errs = STRDERR|STPLEX;
	} else {
		slpflg = WSLEEP;
		sleepon = &stp->sd_wrq->q_wait;
		errs = STWRERR|STRHUP|STPLEX;
	}
	if (flag & STR_NOERROR)
		errs = STPLEX;

	if (stp->sd_wakeq & slpflg) {
		/*
		 * A strwakeq() is pending, no need to sleep.
		 */
		stp->sd_wakeq &= ~slpflg;
		*done = 0;
		return (0);
	}

	if (fmode & (FNDELAY|FNONBLOCK)) {
		if (!(flag & NOINTR))
			error = EAGAIN;
		else
			error = 0;
		*done = 1;
		return (error);
	}

	if (stp->sd_flag & errs) {
		/*
		 * Check for errors before going to sleep since the
		 * caller might not have checked this while holding
		 * sd_lock.
		 */
		error = strgeterr(stp, errs, (flag & STR_PEEK));
		if (error != 0) {
			*done = 1;
			return (error);
		}
	}

	/*
	 * If any module downstream has requested read notification
	 * by setting the SNDMREAD flag using M_SETOPTS, send a
	 * message downstream.
	 */
	if ((flag & READWAIT) && (stp->sd_flag & SNDMREAD)) {
		mutex_exit(&stp->sd_lock);
		if (!(mp = allocb_wait(sizeof (ssize_t), BPRI_MED,
		    (flag & STR_NOSIG), &error))) {
			mutex_enter(&stp->sd_lock);
			*done = 1;
			return (error);
		}
		mp->b_datap->db_type = M_READ;
		rd_count = (ssize_t *)mp->b_wptr;
		*rd_count = count;
		mp->b_wptr += sizeof (ssize_t);
		/*
		 * Send the number of bytes requested by the
		 * read as the argument to M_READ.
		 */
		stream_willservice(stp);
		putnext(stp->sd_wrq, mp);
		stream_runservice(stp);
		mutex_enter(&stp->sd_lock);

		/*
		 * If any data arrived due to inline processing
		 * of putnext(), don't sleep.
		 */
		if (_RD(stp->sd_wrq)->q_first != NULL) {
			*done = 0;
			return (0);
		}
	}

	stp->sd_flag |= slpflg;
	TRACE_5(TR_FAC_STREAMS_FR, TR_STRWAITQ_WAIT2,
		"strwaitq sleeps (2):%p, %X, %lX, %X, %p",
		stp, flag, count, fmode, done);

	rval = str_cv_wait(sleepon, &stp->sd_lock, timout, flag & STR_NOSIG);
	if (rval > 0) {
		/* EMPTY */
		TRACE_5(TR_FAC_STREAMS_FR, TR_STRWAITQ_WAKE2,
			"strwaitq awakes(2):%X, %X, %X, %X, %X",
			stp, flag, count, fmode, done);
	} else if (rval == 0) {
		TRACE_5(TR_FAC_STREAMS_FR, TR_STRWAITQ_INTR2,
			"strwaitq interrupt #2:%p, %X, %lX, %X, %p",
			stp, flag, count, fmode, done);
		stp->sd_flag &= ~slpflg;
		cv_broadcast(sleepon);
		if (!(flag & NOINTR))
			error = EINTR;
		else
			error = 0;
		*done = 1;
		return (error);
	} else {
		/* timeout */
		TRACE_5(TR_FAC_STREAMS_FR, TR_STRWAITQ_TIME,
			"strwaitq timeout:%p, %X, %lX, %X, %p",
			stp, flag, count, fmode, done);
		*done = 1;
		if (!(flag & NOINTR))
			return (ETIME);
		else
			return (0);
	}
	/*
	 * If the caller implements delayed errors (i.e. queued after data)
	 * we can not check for errors here since data as well as an
	 * error might have arrived at the stream head. We return to
	 * have the caller check the read queue before checking for errors.
	 */
	if ((stp->sd_flag & errs) && !(flag & STR_DELAYERR)) {
		error = strgeterr(stp, errs, (flag & STR_PEEK));
		if (error != 0) {
			*done = 1;
			return (error);
		}
	}
	*done = 0;
	return (0);
}

/*
 * Perform job control discipline access checks.
 * Return 0 for success and the errno for failure.
 */

#define	cantsend(p, t, sig) \
	(sigismember(&(p)->p_ignore, sig) || signal_is_blocked((t), sig))
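
/*
 * Illustrative sketch (comment only): read- and write-side entry points
 * typically gate on straccess() before touching the stream, e.g.:
 *
 *	int error;
 *
 *	if ((error = straccess(stp, JCREAD)) != 0)
 *		return (error);
 */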

int
straccess(struct stdata *stp, enum jcaccess mode)
{
	extern kcondvar_t lbolt_cv;	/* XXX: should be in a header file */
	kthread_t *t = curthread;
	proc_t *p = ttoproc(t);
	sess_t *sp;

	if (stp->sd_sidp == NULL || stp->sd_vnode->v_type == VFIFO)
		return (0);

	mutex_enter(&p->p_lock);
	sp = p->p_sessp;

	for (;;) {
		/*
		 * If this is not the calling process's controlling terminal
		 * or if the calling process is already in the foreground
		 * then allow access.
		 */
		if (sp->s_dev != stp->sd_vnode->v_rdev ||
		    p->p_pgidp == stp->sd_pgidp) {
			mutex_exit(&p->p_lock);
			return (0);
		}

		/*
		 * Check to see if controlling terminal has been deallocated.
		 */
		if (sp->s_vp == NULL) {
			if (!cantsend(p, t, SIGHUP))
				sigtoproc(p, t, SIGHUP);
			mutex_exit(&p->p_lock);
			return (EIO);
		}

		if (mode == JCGETP) {
			mutex_exit(&p->p_lock);
			return (0);
		}

		if (mode == JCREAD) {
			if (p->p_detached || cantsend(p, t, SIGTTIN)) {
				mutex_exit(&p->p_lock);
				return (EIO);
			}
			mutex_exit(&p->p_lock);
			pgsignal(p->p_pgidp, SIGTTIN);
			mutex_enter(&p->p_lock);
		} else {  /* mode == JCWRITE or JCSETP */
			if ((mode == JCWRITE && !(stp->sd_flag & STRTOSTOP)) ||
			    cantsend(p, t, SIGTTOU)) {
				mutex_exit(&p->p_lock);
				return (0);
			}
			if (p->p_detached) {
				mutex_exit(&p->p_lock);
				return (EIO);
			}
			mutex_exit(&p->p_lock);
			pgsignal(p->p_pgidp, SIGTTOU);
			mutex_enter(&p->p_lock);
		}

		/*
		 * We call cv_wait_sig_swap() to cause the appropriate
		 * action for the jobcontrol signal to take place.
		 * If the signal is being caught, we will take the
		 * EINTR error return.  Otherwise, the default action
		 * of causing the process to stop will take place.
		 * In this case, we rely on the periodic cv_broadcast() on
		 * &lbolt_cv to wake us up to loop around and test again.
		 * We can't get here if the signal is ignored or
		 * if the current thread is blocking the signal.
		 */
		if (!cv_wait_sig_swap(&lbolt_cv, &p->p_lock)) {
			mutex_exit(&p->p_lock);
			return (EINTR);
		}
	}
}

/*
 * Return the size of the message, counting only the leading blocks whose
 * type matches that of the first block (bp->b_datap->db_type).
 */
size_t
xmsgsize(mblk_t *bp)
{
	unsigned char type;
	size_t count = 0;

	type = bp->b_datap->db_type;

	for (; bp; bp = bp->b_cont) {
		if (type != bp->b_datap->db_type)
			break;
		ASSERT(bp->b_wptr >= bp->b_rptr);
		count += bp->b_wptr - bp->b_rptr;
	}
	return (count);
}

/*
 * Allocate a stream head.
 */
struct stdata *
shalloc(queue_t *qp)
{
	stdata_t *stp;

	stp = kmem_cache_alloc(stream_head_cache, KM_SLEEP);

	stp->sd_wrq = _WR(qp);
	stp->sd_strtab = NULL;
	stp->sd_iocid = 0;
	stp->sd_mate = NULL;
	stp->sd_freezer = NULL;
	stp->sd_refcnt = 0;
	stp->sd_wakeq = 0;
	stp->sd_anchor = 0;
	stp->sd_struiowrq = NULL;
	stp->sd_struiordq = NULL;
	stp->sd_struiodnak = 0;
	stp->sd_struionak = NULL;
#ifdef C2_AUDIT
	stp->sd_t_audit_data = NULL;
#endif
	stp->sd_rput_opt = 0;
	stp->sd_wput_opt = 0;
	stp->sd_read_opt = 0;
	stp->sd_rprotofunc = strrput_proto;
	stp->sd_rmiscfunc = strrput_misc;
	stp->sd_rderrfunc = stp->sd_wrerrfunc = NULL;
	stp->sd_ciputctrl = NULL;
	stp->sd_nciputctrl = 0;
	stp->sd_qhead = NULL;
	stp->sd_qtail = NULL;
	stp->sd_servid = NULL;
	stp->sd_nqueues = 0;
	stp->sd_svcflags = 0;
	stp->sd_copyflag = 0;
	return (stp);
}

/*
 * Free a stream head.
 */
void
shfree(stdata_t *stp)
{
	ASSERT(MUTEX_NOT_HELD(&stp->sd_lock));

	stp->sd_wrq = NULL;

	mutex_enter(&stp->sd_qlock);
	while (stp->sd_svcflags & STRS_SCHEDULED) {
		STRSTAT(strwaits);
		cv_wait(&stp->sd_qcv, &stp->sd_qlock);
	}
	mutex_exit(&stp->sd_qlock);

	if (stp->sd_ciputctrl != NULL) {
		ASSERT(stp->sd_nciputctrl == n_ciputctrl - 1);
		SUMCHECK_CIPUTCTRL_COUNTS(stp->sd_ciputctrl,
		    stp->sd_nciputctrl, 0);
		ASSERT(ciputctrl_cache != NULL);
		kmem_cache_free(ciputctrl_cache, stp->sd_ciputctrl);
		stp->sd_ciputctrl = NULL;
		stp->sd_nciputctrl = 0;
	}
	ASSERT(stp->sd_qhead == NULL);
	ASSERT(stp->sd_qtail == NULL);
	ASSERT(stp->sd_nqueues == 0);
	kmem_cache_free(stream_head_cache, stp);
}

/*
 * Allocate a pair of queues and a syncq for the pair
 */
queue_t *
allocq(void)
{
	queinfo_t *qip;
	queue_t *qp, *wqp;
	syncq_t	*sq;

	qip = kmem_cache_alloc(queue_cache, KM_SLEEP);

	qp = &qip->qu_rqueue;
	wqp = &qip->qu_wqueue;
	sq = &qip->qu_syncq;

	qp->q_last	= NULL;
	qp->q_next	= NULL;
	qp->q_ptr	= NULL;
	qp->q_flag	= QUSE | QREADR;
	qp->q_bandp	= NULL;
	qp->q_stream	= NULL;
	qp->q_syncq	= sq;
	qp->q_nband	= 0;
	qp->q_nfsrv	= NULL;
	qp->q_draining	= 0;
	qp->q_syncqmsgs	= 0;
	qp->q_spri	= 0;
	qp->q_qtstamp	= 0;
	qp->q_sqtstamp	= 0;
	qp->q_fp	= NULL;

	wqp->q_last	= NULL;
	wqp->q_next	= NULL;
	wqp->q_ptr	= NULL;
	wqp->q_flag	= QUSE;
	wqp->q_bandp	= NULL;
	wqp->q_stream	= NULL;
	wqp->q_syncq	= sq;
	wqp->q_nband	= 0;
	wqp->q_nfsrv	= NULL;
	wqp->q_draining	= 0;
	wqp->q_syncqmsgs = 0;
	wqp->q_qtstamp	= 0;
	wqp->q_sqtstamp	= 0;
	wqp->q_spri	= 0;

	sq->sq_count	= 0;
	sq->sq_rmqcount	= 0;
	sq->sq_flags	= 0;
	sq->sq_type	= 0;
	sq->sq_callbflags = 0;
	sq->sq_cancelid	= 0;
	sq->sq_ciputctrl = NULL;
	sq->sq_nciputctrl = 0;
	sq->sq_needexcl = 0;
	sq->sq_svcflags = 0;

	return (qp);
}

/*
 * Free a pair of queues and the "attached" syncq.
 * Discard any messages left on the syncq(s), remove the syncq(s) from the
 * outer perimeter, and free the syncq(s) if they are not the "attached" syncq.
 */
void
freeq(queue_t *qp)
{
	qband_t *qbp, *nqbp;
	syncq_t *sq, *outer;
	queue_t *wqp = _WR(qp);

	ASSERT(qp->q_flag & QREADR);

	(void) flush_syncq(qp->q_syncq, qp);
	(void) flush_syncq(wqp->q_syncq, wqp);
	ASSERT(qp->q_syncqmsgs == 0 && wqp->q_syncqmsgs == 0);

	outer = qp->q_syncq->sq_outer;
	if (outer != NULL) {
		outer_remove(outer, qp->q_syncq);
		if (wqp->q_syncq != qp->q_syncq)
			outer_remove(outer, wqp->q_syncq);
	}
	/*
	 * Free any syncqs that are outside what allocq returned.
	 */
	if (qp->q_syncq != SQ(qp) && !(qp->q_flag & QPERMOD))
		free_syncq(qp->q_syncq);
	if (qp->q_syncq != wqp->q_syncq && wqp->q_syncq != SQ(qp))
		free_syncq(wqp->q_syncq);

	ASSERT((qp->q_sqflags & (Q_SQQUEUED | Q_SQDRAINING)) == 0);
	ASSERT((wqp->q_sqflags & (Q_SQQUEUED | Q_SQDRAINING)) == 0);
	ASSERT(MUTEX_NOT_HELD(QLOCK(qp)));
	ASSERT(MUTEX_NOT_HELD(QLOCK(wqp)));
	sq = SQ(qp);
	ASSERT(MUTEX_NOT_HELD(SQLOCK(sq)));
	ASSERT(sq->sq_head == NULL && sq->sq_tail == NULL);
	ASSERT(sq->sq_outer == NULL);
	ASSERT(sq->sq_onext == NULL && sq->sq_oprev == NULL);
	ASSERT(sq->sq_callbpend == NULL);
	ASSERT(sq->sq_needexcl == 0);

	if (sq->sq_ciputctrl != NULL) {
		ASSERT(sq->sq_nciputctrl == n_ciputctrl - 1);
		SUMCHECK_CIPUTCTRL_COUNTS(sq->sq_ciputctrl,
		    sq->sq_nciputctrl, 0);
		ASSERT(ciputctrl_cache != NULL);
		kmem_cache_free(ciputctrl_cache, sq->sq_ciputctrl);
		sq->sq_ciputctrl = NULL;
		sq->sq_nciputctrl = 0;
	}

	ASSERT(qp->q_first == NULL && wqp->q_first == NULL);
	ASSERT(qp->q_count == 0 && wqp->q_count == 0);
	ASSERT(qp->q_mblkcnt == 0 && wqp->q_mblkcnt == 0);

	qp->q_flag &= ~QUSE;
	wqp->q_flag &= ~QUSE;

	/* NOTE: Uncomment the assert below once bugid 1159635 is fixed. */
	/* ASSERT((qp->q_flag & QWANTW) == 0 && (wqp->q_flag & QWANTW) == 0); */

	qbp = qp->q_bandp;
	while (qbp) {
		nqbp = qbp->qb_next;
		freeband(qbp);
		qbp = nqbp;
	}
	qbp = wqp->q_bandp;
	while (qbp) {
		nqbp = qbp->qb_next;
		freeband(qbp);
		qbp = nqbp;
	}
	kmem_cache_free(queue_cache, qp);
}

/*
 * Allocate a qband structure.
 */
qband_t *
allocband(void)
{
	qband_t *qbp;

	qbp = kmem_cache_alloc(qband_cache, KM_NOSLEEP);
	if (qbp == NULL)
		return (NULL);

	qbp->qb_next	= NULL;
	qbp->qb_count	= 0;
	qbp->qb_mblkcnt	= 0;
	qbp->qb_first	= NULL;
	qbp->qb_last	= NULL;
	qbp->qb_flag	= 0;

	return (qbp);
}

/*
 * Free a qband structure.
 */
void
freeband(qband_t *qbp)
{
	kmem_cache_free(qband_cache, qbp);
}

/*
 * Just like putnextctl(9F), except that allocb_wait() is used.
 *
 * Consolidation Private, and of course only callable from the stream head or
 * routines that may block.
 */
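/*
 * Illustrative sketch (comment only): a blocking caller at the stream
 * head could use this to send a zero-length control message downstream;
 * M_BREAK and the errno chosen on failure are just examples:
 *
 *	if (!putnextctl_wait(stp->sd_wrq, M_BREAK))
 *		return (EAGAIN);
 */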
int
putnextctl_wait(queue_t *q, int type)
{
	mblk_t *bp;
	int error;

	if ((datamsg(type) && (type != M_DELAY)) ||
	    (bp = allocb_wait(0, BPRI_HI, 0, &error)) == NULL)
		return (0);

	bp->b_datap->db_type = (unsigned char)type;
	putnext(q, bp);
	return (1);
}

/*
 * Run any pending bufcalls.
 */
void
runbufcalls(void)
{
	strbufcall_t *bcp;

	mutex_enter(&bcall_monitor);
	mutex_enter(&strbcall_lock);

	if (strbcalls.bc_head) {
		size_t count;
		int nevent;

		/*
		 * Count how many events are on the list now so we can
		 * bound the loop and avoid spinning in low-memory
		 * situations.
		 */
		nevent = 0;
		for (bcp = strbcalls.bc_head; bcp; bcp = bcp->bc_next)
			nevent++;

		/*
		 * Get an estimate of available memory from kmem_avail()
		 * and wake up all bufcall functions waiting for memory
		 * whose requests could be satisfied by 'count' bytes;
		 * let them compete for it.
		 */
		count = kmem_avail();
		while ((bcp = strbcalls.bc_head) != NULL && nevent) {
			STRSTAT(bufcalls);
			--nevent;
			if (bcp->bc_size <= count) {
				bcp->bc_executor = curthread;
				mutex_exit(&strbcall_lock);
				(*bcp->bc_func)(bcp->bc_arg);
				mutex_enter(&strbcall_lock);
				bcp->bc_executor = NULL;
				cv_broadcast(&bcall_cv);
				strbcalls.bc_head = bcp->bc_next;
				kmem_free(bcp, sizeof (strbufcall_t));
			} else {
				/*
				 * too big, try again later - note
				 * that nevent was decremented above
				 * so we won't retry this one on this
				 * iteration of the loop
				 */
				if (bcp->bc_next != NULL) {
					strbcalls.bc_head = bcp->bc_next;
					bcp->bc_next = NULL;
					strbcalls.bc_tail->bc_next = bcp;
					strbcalls.bc_tail = bcp;
				}
			}
		}
		if (strbcalls.bc_head == NULL)
			strbcalls.bc_tail = NULL;
	}

	mutex_exit(&strbcall_lock);
	mutex_exit(&bcall_monitor);
}


/*
 * Actually run the queue's service routine.
 */
static void
runservice(queue_t *q)
{
	qband_t *qbp;

	ASSERT(q->q_qinfo->qi_srvp);
again:
	entersq(q->q_syncq, SQ_SVC);
	TRACE_1(TR_FAC_STREAMS_FR, TR_QRUNSERVICE_START,
		"runservice starts:%p", q);

	if (!(q->q_flag & QWCLOSE))
		(*q->q_qinfo->qi_srvp)(q);

	TRACE_1(TR_FAC_STREAMS_FR, TR_QRUNSERVICE_END,
		"runservice ends:(%p)", q);

	leavesq(q->q_syncq, SQ_SVC);

	mutex_enter(QLOCK(q));
	if (q->q_flag & QENAB) {
		q->q_flag &= ~QENAB;
		mutex_exit(QLOCK(q));
		goto again;
	}
	q->q_flag &= ~QINSERVICE;
	q->q_flag &= ~QBACK;
	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next)
		qbp->qb_flag &= ~QB_BACK;
	/*
	 * Wake up any thread waiting for the service procedure
	 * to be run (strclose and qdetach).
	 */
	cv_broadcast(&q->q_wait);

	mutex_exit(QLOCK(q));
}

/*
 * Background processing of bufcalls.
 */
void
streams_bufcall_service(void)
{
	callb_cpr_t	cprinfo;

	CALLB_CPR_INIT(&cprinfo, &strbcall_lock, callb_generic_cpr,
	    "streams_bufcall_service");

	mutex_enter(&strbcall_lock);

	for (;;) {
		if (strbcalls.bc_head != NULL && kmem_avail() > 0) {
			mutex_exit(&strbcall_lock);
			runbufcalls();
			mutex_enter(&strbcall_lock);
		}
		if (strbcalls.bc_head != NULL) {
			clock_t wt, tick;

			STRSTAT(bcwaits);
			/* Wait for memory to become available */
			CALLB_CPR_SAFE_BEGIN(&cprinfo);
			tick = SEC_TO_TICK(60);
			time_to_wait(&wt, tick);
			(void) cv_timedwait(&memavail_cv, &strbcall_lock, wt);
			CALLB_CPR_SAFE_END(&cprinfo, &strbcall_lock);
		}

		/* Wait for new work to arrive */
		if (strbcalls.bc_head == NULL) {
			CALLB_CPR_SAFE_BEGIN(&cprinfo);
			cv_wait(&strbcall_cv, &strbcall_lock);
			CALLB_CPR_SAFE_END(&cprinfo, &strbcall_lock);
		}
	}
}

/*
 * Background processing of streams background tasks which failed
 * taskq_dispatch.
 */
static void
streams_qbkgrnd_service(void)
{
	callb_cpr_t cprinfo;
	queue_t *q;

	CALLB_CPR_INIT(&cprinfo, &service_queue, callb_generic_cpr,
	    "streams_bkgrnd_service");

	mutex_enter(&service_queue);

	for (;;) {
		/*
		 * Wait for work to arrive.
		 */
		while ((freebs_list == NULL) && (qhead == NULL)) {
			CALLB_CPR_SAFE_BEGIN(&cprinfo);
			cv_wait(&services_to_run, &service_queue);
			CALLB_CPR_SAFE_END(&cprinfo, &service_queue);
		}
		/*
		 * Handle all pending freebs requests to free memory.
		 */
		while (freebs_list != NULL) {
			mblk_t *mp = freebs_list;
			freebs_list = mp->b_next;
			mutex_exit(&service_queue);
			mblk_free(mp);
			mutex_enter(&service_queue);
		}
		/*
		 * Run pending queues.
		 */
		while (qhead != NULL) {
			DQ(q, qhead, qtail, q_link);
			ASSERT(q != NULL);
			mutex_exit(&service_queue);
			queue_service(q);
			mutex_enter(&service_queue);
		}
		ASSERT(qhead == NULL && qtail == NULL);
	}
}

/*
 * Background processing of streams background tasks which failed
 * taskq_dispatch.
 */
static void
streams_sqbkgrnd_service(void)
{
	callb_cpr_t cprinfo;
	syncq_t *sq;

	CALLB_CPR_INIT(&cprinfo, &service_queue, callb_generic_cpr,
	    "streams_sqbkgrnd_service");

	mutex_enter(&service_queue);

	for (;;) {
		/*
		 * Wait for work to arrive.
		 */
		while (sqhead == NULL) {
			CALLB_CPR_SAFE_BEGIN(&cprinfo);
			cv_wait(&syncqs_to_run, &service_queue);
			CALLB_CPR_SAFE_END(&cprinfo, &service_queue);
		}

		/*
		 * Run pending syncqs.
		 */
		while (sqhead != NULL) {
			DQ(sq, sqhead, sqtail, sq_next);
			ASSERT(sq != NULL);
			ASSERT(sq->sq_svcflags & SQ_BGTHREAD);
			mutex_exit(&service_queue);
			syncq_service(sq);
			mutex_enter(&service_queue);
		}
	}
}

/*
 * Disable the syncq and wait for background syncq processing to complete.
 * If the syncq is placed on the sqhead/sqtail queue, try to remove it from the
 * list.
 */
void
wait_sq_svc(syncq_t *sq)
{
	mutex_enter(SQLOCK(sq));
	sq->sq_svcflags |= SQ_DISABLED;
	if (sq->sq_svcflags & SQ_BGTHREAD) {
		syncq_t *sq_chase;
		syncq_t *sq_curr;
		int removed;

		ASSERT(sq->sq_servcount == 1);
		mutex_enter(&service_queue);
		RMQ(sq, sqhead, sqtail, sq_next, sq_chase, sq_curr, removed);
		mutex_exit(&service_queue);
		if (removed) {
			sq->sq_svcflags &= ~SQ_BGTHREAD;
			sq->sq_servcount = 0;
			STRSTAT(sqremoved);
			goto done;
		}
	}
	while (sq->sq_servcount != 0) {
		sq->sq_flags |= SQ_WANTWAKEUP;
		cv_wait(&sq->sq_wait, SQLOCK(sq));
	}
done:
	mutex_exit(SQLOCK(sq));
}

/*
 * Put a syncq on the list of syncq's to be serviced by the sqthread.
 * Add the argument to the end of the sqhead list and set the flag
 * indicating this syncq has been enabled.  If it has already been
 * enabled, don't do anything.
 * This routine assumes that SQLOCK is held.
 * NOTE that the lock order is to take the SQLOCK first, so if the
 * service_queue lock is held, we need to release it before acquiring
 * the SQLOCK (mostly relevant for the background thread, and this
 * seems to be common among the STREAMS global locks).
 * Note that the sq_svcflags are protected by the SQLOCK.
 */
void
sqenable(syncq_t *sq)
{
	/*
	 * SQLOCK is expected to be held at every call site of this
	 * routine, and it would be a pain to release it just for this
	 * routine, so simply require the caller to hold it.
	 */
	ASSERT(MUTEX_HELD(SQLOCK(sq)));

	IMPLY(sq->sq_servcount == 0, sq->sq_next == NULL);
	IMPLY(sq->sq_next != NULL, sq->sq_svcflags & SQ_BGTHREAD);

	/*
	 * Do not put on list if background thread is scheduled or
	 * syncq is disabled.
	 */
	if (sq->sq_svcflags & (SQ_DISABLED | SQ_BGTHREAD))
		return;

	/*
	 * Check whether we should enable sq at all.
	 * Non-PERMOD syncqs may be drained by at most one thread.
	 * PERMOD syncqs may be drained by several threads, but we limit
	 * the total number of threads to the lesser of
	 *	the number of queues on the syncq and
	 *	the number of online CPUs.
	 */
	if (sq->sq_servcount != 0) {
		if (((sq->sq_type & SQ_PERMOD) == 0) ||
		    (sq->sq_servcount >= MIN(sq->sq_nqueues, ncpus_online))) {
			STRSTAT(sqtoomany);
			return;
		}
	}

	sq->sq_tstamp = lbolt;
	STRSTAT(sqenables);

	/* Attempt a taskq dispatch */
	sq->sq_servid = (void *)taskq_dispatch(streams_taskq,
	    (task_func_t *)syncq_service, sq, TQ_NOSLEEP | TQ_NOQUEUE);
	if (sq->sq_servid != NULL) {
		sq->sq_servcount++;
		return;
	}

	/*
	 * This taskq dispatch failed, but a previous one may have succeeded.
	 * Don't try to schedule on the background thread whilst there is
	 * outstanding taskq processing.
	 */
	if (sq->sq_servcount != 0)
		return;

	/*
	 * System is low on resources and can't perform a non-sleeping
	 * dispatch. Schedule the syncq for a background thread and mark the
	 * syncq to avoid any further taskq dispatch attempts.
	 */
	mutex_enter(&service_queue);
	STRSTAT(taskqfails);
	ENQUEUE(sq, sqhead, sqtail, sq_next);
	sq->sq_svcflags |= SQ_BGTHREAD;
	sq->sq_servcount = 1;
	cv_signal(&syncqs_to_run);
	mutex_exit(&service_queue);
}

/*
 * Note: fifo_close() depends on the mblk_t's on the queue being freed
 * asynchronously. The asynchronous freeing of messages breaks the
 * recursive call chain of fifo_close() while there are I_SENDFD-type
 * messages on the queue referring to other file pointers. This avoids
 * stack overflow when closing daisy-chained pipes and also avoids
 * deadlock for fifonode_t pairs (which share the same fifolock_t).
 */

/* ARGSUSED */
void
freebs_enqueue(mblk_t *mp, dblk_t *dbp)
{
	ASSERT(dbp->db_mblk == mp);

	/*
	 * Check data sanity. The dblock should have a non-NULL free
	 * function. It is better to panic here than later, when the
	 * dblock is freed asynchronously and the context is lost.
	 */
	if (dbp->db_frtnp->free_func == NULL) {
		panic("freebs_enqueue: dblock %p has a NULL free callback",
		    (void *) dbp);
	}

	STRSTAT(freebs);
	if (taskq_dispatch(streams_taskq, (task_func_t *)mblk_free, mp,
	    TQ_NOSLEEP) == NULL) {
		/*
		 * System is low on resources and can't perform a non-sleeping
		 * dispatch. Schedule for a background thread.
		 */
		mutex_enter(&service_queue);
		STRSTAT(taskqfails);
		mp->b_next = freebs_list;
		freebs_list = mp;
		cv_signal(&services_to_run);
		mutex_exit(&service_queue);
	}
}

/*
 * Set the QBACK or QB_BACK flag in the given queue for
 * the given priority band.
 */
void
setqback(queue_t *q, unsigned char pri)
{
	int i;
	qband_t *qbp;
	qband_t **qbpp;

	ASSERT(MUTEX_HELD(QLOCK(q)));
	if (pri != 0) {
		if (pri > q->q_nband) {
			qbpp = &q->q_bandp;
			while (*qbpp)
				qbpp = &(*qbpp)->qb_next;
			while (pri > q->q_nband) {
				if ((*qbpp = allocband()) == NULL) {
					cmn_err(CE_WARN,
					    "setqback: can't allocate qband\n");
					return;
				}
				(*qbpp)->qb_hiwat = q->q_hiwat;
				(*qbpp)->qb_lowat = q->q_lowat;
				q->q_nband++;
				qbpp = &(*qbpp)->qb_next;
			}
		}
		qbp = q->q_bandp;
		i = pri;
		while (--i)
			qbp = qbp->qb_next;
		qbp->qb_flag |= QB_BACK;
	} else {
		q->q_flag |= QBACK;
	}
}

int
strcopyin(void *from, void *to, size_t len, int copyflag)
{
	if (copyflag & U_TO_K) {
		ASSERT((copyflag & K_TO_K) == 0);
		if (copyin(from, to, len))
			return (EFAULT);
	} else {
		ASSERT(copyflag & K_TO_K);
		bcopy(from, to, len);
	}
	return (0);
}

int
strcopyout(void *from, void *to, size_t len, int copyflag)
{
	if (copyflag & U_TO_K) {
		if (copyout(from, to, len))
			return (EFAULT);
	} else {
		ASSERT(copyflag & K_TO_K);
		bcopy(from, to, len);
	}
	return (0);
}

/*
 * strsignal_nolock() posts a signal to the process(es) at the stream head.
 * It assumes that the stream head lock is already held, whereas strsignal()
 * acquires the lock first.  This routine was created because a few callers
 * release the stream head lock before calling only to re-acquire it after
 * it returns.
 */
void
strsignal_nolock(stdata_t *stp, int sig, int32_t band)
{
	ASSERT(MUTEX_HELD(&stp->sd_lock));
	switch (sig) {
	case SIGPOLL:
		if (stp->sd_sigflags & S_MSG)
			strsendsig(stp->sd_siglist, S_MSG, (uchar_t)band, 0);
		break;

	default:
		if (stp->sd_pgidp) {
			pgsignal(stp->sd_pgidp, sig);
		}
		break;
	}
}

void
strsignal(stdata_t *stp, int sig, int32_t band)
{
	TRACE_3(TR_FAC_STREAMS_FR, TR_SENDSIG,
		"strsignal:%p, %X, %X", stp, sig, band);

	mutex_enter(&stp->sd_lock);
	switch (sig) {
	case SIGPOLL:
		if (stp->sd_sigflags & S_MSG)
			strsendsig(stp->sd_siglist, S_MSG, (uchar_t)band, 0);
		break;

	default:
		if (stp->sd_pgidp) {
			pgsignal(stp->sd_pgidp, sig);
		}
		break;
	}
	mutex_exit(&stp->sd_lock);
}

void
strhup(stdata_t *stp)
{
	pollwakeup(&stp->sd_pollist, POLLHUP);
	mutex_enter(&stp->sd_lock);
	if (stp->sd_sigflags & S_HANGUP)
		strsendsig(stp->sd_siglist, S_HANGUP, 0, 0);
	mutex_exit(&stp->sd_lock);
}

void
stralloctty(stdata_t *stp)
{
	proc_t *p = curproc;
	sess_t *sp = p->p_sessp;

	mutex_enter(&stp->sd_lock);
	/*
	 * No need to hold the session lock or do a TTY_HOLD() because
	 * this is the only thread that can be the session leader and not
	 * have a controlling tty.
	 */
	if ((stp->sd_flag &
	    (STRHUP|STRDERR|STWRERR|STPLEX|STRISTTY)) == STRISTTY &&
	    stp->sd_sidp == NULL &&		/* not allocated as ctty */
	    sp->s_sidp == p->p_pidp &&		/* session leader */
	    sp->s_flag != SESS_CLOSE &&		/* session is not closing */
	    sp->s_vp == NULL) {			/* without ctty */
		ASSERT(stp->sd_pgidp == NULL);
		alloctty(p, makectty(stp->sd_vnode));

		mutex_enter(&pidlock);
		stp->sd_sidp = sp->s_sidp;
		stp->sd_pgidp = sp->s_sidp;
		PID_HOLD(stp->sd_pgidp);
		PID_HOLD(stp->sd_sidp);
		mutex_exit(&pidlock);
	}
	mutex_exit(&stp->sd_lock);
}

void
strfreectty(stdata_t *stp)
{
	mutex_enter(&stp->sd_lock);
	pgsignal(stp->sd_pgidp, SIGHUP);
	mutex_enter(&pidlock);
	PID_RELE(stp->sd_pgidp);
	PID_RELE(stp->sd_sidp);
	stp->sd_pgidp = NULL;
	stp->sd_sidp = NULL;
	mutex_exit(&pidlock);
	mutex_exit(&stp->sd_lock);
	if (!(stp->sd_flag & STRHUP))
		strhup(stp);
}
/*
 * Backenable the first queue upstream from `q' with a service procedure.
 */
void
backenable(queue_t *q, uchar_t pri)
{
	queue_t	*nq;

	/*
	 * Our presence might not prevent other modules in our own
	 * stream from popping/pushing since the caller of getq might not
	 * have a claim on the queue (some drivers do a getq on somebody
	 * else's queue - they know that the queue itself is not going
	 * away but the framework has to guarantee q_next in that stream).
	 */
	claimstr(q);

	/* find nearest back queue with service proc */
	for (nq = backq(q); nq && !nq->q_qinfo->qi_srvp; nq = backq(nq)) {
		ASSERT(STRMATED(q->q_stream) || STREAM(q) == STREAM(nq));
	}

	if (nq) {
		kthread_t *freezer;
		/*
		 * backenable can be called either with no locks held
		 * or with the stream frozen (the latter occurs when a module
		 * calls rmvq with the stream frozen). If the stream is frozen
		 * by the caller, the caller will hold all qlocks in the stream.
		 */
		freezer = STREAM(q)->sd_freezer;
		if (freezer != curthread) {
			mutex_enter(QLOCK(nq));
		}
#ifdef DEBUG
		else {
			ASSERT(frozenstr(q));
			ASSERT(MUTEX_HELD(QLOCK(q)));
			ASSERT(MUTEX_HELD(QLOCK(nq)));
		}
#endif
		setqback(nq, pri);
		qenable_locked(nq);
		if (freezer != curthread)
			mutex_exit(QLOCK(nq));
	}
	releasestr(q);
}

/*
 * Return the appropriate errno when one of flags_to_check is set
 * in sd_flag. Uses the exported error routines if they are set.
 * Returns 0 if no error is set (or if the exported error routines
 * do not return an error).
 *
 * If there is both a read and write error to check we prefer the read error.
 * Also, give preference to recorded errno's over the error functions.
 * The flags that are handled are:
 *	STPLEX		return EINVAL
 *	STRDERR		return sd_rerror (and clear if STRDERRNONPERSIST)
 *	STWRERR		return sd_werror (and clear if STWRERRNONPERSIST)
 *	STRHUP		return sd_werror
 *
 * If the caller indicates that the operation is a peek a nonpersistent error
 * is not cleared.
 */
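/*
 * Illustrative sketch (comment only): callers test sd_flag first and
 * only then call strgeterr() while holding sd_lock (see strwaitq()
 * above for a real example):
 *
 *	int error;
 *
 *	mutex_enter(&stp->sd_lock);
 *	if (stp->sd_flag & (STWRERR|STRHUP|STPLEX)) {
 *		error = strgeterr(stp, STWRERR|STRHUP|STPLEX, 0);
 *		if (error != 0) {
 *			mutex_exit(&stp->sd_lock);
 *			return (error);
 *		}
 *	}
 *	mutex_exit(&stp->sd_lock);
 */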
int
strgeterr(stdata_t *stp, int32_t flags_to_check, int ispeek)
{
	int32_t sd_flag = stp->sd_flag & flags_to_check;
	int error = 0;

	ASSERT(MUTEX_HELD(&stp->sd_lock));
	ASSERT((flags_to_check & ~(STRDERR|STWRERR|STRHUP|STPLEX)) == 0);
	if (sd_flag & STPLEX)
		error = EINVAL;
	else if (sd_flag & STRDERR) {
		error = stp->sd_rerror;
		if ((stp->sd_flag & STRDERRNONPERSIST) && !ispeek) {
			/*
			 * Read errors are non-persistent, i.e. discarded once
			 * returned to a non-peeking caller.
			 */
			stp->sd_rerror = 0;
			stp->sd_flag &= ~STRDERR;
		}
		if (error == 0 && stp->sd_rderrfunc != NULL) {
			int clearerr = 0;

			error = (*stp->sd_rderrfunc)(stp->sd_vnode, ispeek,
						&clearerr);
			if (clearerr) {
				stp->sd_flag &= ~STRDERR;
				stp->sd_rderrfunc = NULL;
			}
		}
	} else if (sd_flag & STWRERR) {
		error = stp->sd_werror;
		if ((stp->sd_flag & STWRERRNONPERSIST) && !ispeek) {
			/*
			 * Write errors are non-persistent, i.e. discarded once
			 * returned to a non-peeking caller.
			 */
			stp->sd_werror = 0;
			stp->sd_flag &= ~STWRERR;
		}
		if (error == 0 && stp->sd_wrerrfunc != NULL) {
			int clearerr = 0;

			error = (*stp->sd_wrerrfunc)(stp->sd_vnode, ispeek,
						&clearerr);
			if (clearerr) {
				stp->sd_flag &= ~STWRERR;
				stp->sd_wrerrfunc = NULL;
			}
		}
	} else if (sd_flag & STRHUP) {
		/* sd_werror set when STRHUP */
		error = stp->sd_werror;
	}
	return (error);
}


/*
 * Single-thread open/close/push/pop; this applies to twisted
 * (mated) streams as well.
 */
int
strstartplumb(stdata_t *stp, int flag, int cmd)
{
	int waited = 1;
	int error = 0;

	if (STRMATED(stp)) {
		struct stdata *stmatep = stp->sd_mate;

		STRLOCKMATES(stp);
		while (waited) {
			waited = 0;
			while (stmatep->sd_flag & (STWOPEN|STRCLOSE|STRPLUMB)) {
				if ((cmd == I_POP) &&
				    (flag & (FNDELAY|FNONBLOCK))) {
					STRUNLOCKMATES(stp);
					return (EAGAIN);
				}
				waited = 1;
				mutex_exit(&stp->sd_lock);
				if (!cv_wait_sig(&stmatep->sd_monitor,
				    &stmatep->sd_lock)) {
					mutex_exit(&stmatep->sd_lock);
					return (EINTR);
				}
				mutex_exit(&stmatep->sd_lock);
				STRLOCKMATES(stp);
			}
			while (stp->sd_flag & (STWOPEN|STRCLOSE|STRPLUMB)) {
				if ((cmd == I_POP) &&
					(flag & (FNDELAY|FNONBLOCK))) {
					STRUNLOCKMATES(stp);
					return (EAGAIN);
				}
				waited = 1;
				mutex_exit(&stmatep->sd_lock);
				if (!cv_wait_sig(&stp->sd_monitor,
				    &stp->sd_lock)) {
					mutex_exit(&stp->sd_lock);
					return (EINTR);
				}
				mutex_exit(&stp->sd_lock);
				STRLOCKMATES(stp);
			}
			if (stp->sd_flag & (STRDERR|STWRERR|STRHUP|STPLEX)) {
				error = strgeterr(stp,
					STRDERR|STWRERR|STRHUP|STPLEX, 0);
				if (error != 0) {
					STRUNLOCKMATES(stp);
					return (error);
				}
			}
		}
		stp->sd_flag |= STRPLUMB;
		STRUNLOCKMATES(stp);
	} else {
		mutex_enter(&stp->sd_lock);
		while (stp->sd_flag & (STWOPEN|STRCLOSE|STRPLUMB)) {
			if (((cmd == I_POP) || (cmd == _I_REMOVE)) &&
			    (flag & (FNDELAY|FNONBLOCK))) {
				mutex_exit(&stp->sd_lock);
				return (EAGAIN);
			}
			if (!cv_wait_sig(&stp->sd_monitor, &stp->sd_lock)) {
				mutex_exit(&stp->sd_lock);
				return (EINTR);
			}
			if (stp->sd_flag & (STRDERR|STWRERR|STRHUP|STPLEX)) {
				error = strgeterr(stp,
					STRDERR|STWRERR|STRHUP|STPLEX, 0);
				if (error != 0) {
					mutex_exit(&stp->sd_lock);
					return (error);
				}
			}
		}
		stp->sd_flag |= STRPLUMB;
		mutex_exit(&stp->sd_lock);
	}
	return (0);
}

/*
 * Complete the plumbing operation associated with stream `stp'.
 */
void
strendplumb(stdata_t *stp)
{
	ASSERT(MUTEX_HELD(&stp->sd_lock));
	ASSERT(stp->sd_flag & STRPLUMB);
	stp->sd_flag &= ~STRPLUMB;
	cv_broadcast(&stp->sd_monitor);
}
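
/*
 * Illustrative sketch (comment only): plumbing operations bracket their
 * work with strstartplumb()/strendplumb(); note that strendplumb()
 * expects sd_lock to be held:
 *
 *	if ((error = strstartplumb(stp, flag, cmd)) != 0)
 *		return (error);
 *	... perform the push/pop/link/unlink ...
 *	mutex_enter(&stp->sd_lock);
 *	strendplumb(stp);
 *	mutex_exit(&stp->sd_lock);
 */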

/*
 * This describes how the STREAMS framework handles synchronization
 * during open/push and close/pop.
 * The key interfaces for open and close are qprocson and qprocsoff,
 * respectively. While the close case is in general harder, open
 * and close have significant similarities.
 *
 * During close the STREAMS framework has to both ensure that there
 * are no stale references to the queue pair (and syncq) that
 * are being closed and also provide the guarantees that are documented
 * in qprocsoff(9F).
 * If there are stale references to the queue that is closing it can
 * result in kernel memory corruption or kernel panics.
 *
 * Note that it is up to the module/driver to ensure that it itself
 * does not have any stale references to the closing queues once its close
 * routine returns. This includes:
 *  - Cancelling any timeout/bufcall/qtimeout/qbufcall callback routines
 *    associated with the queues. For timeout and bufcall callbacks the
 *    module/driver also has to ensure (or wait for) any callbacks that
 *    are in progress.
 *  - If the module/driver is using esballoc it has to ensure that any
 *    esballoc free functions do not refer to a queue that has closed.
 *    (Note that in general the close routine can not wait for the esballoc'ed
 *    messages to be freed since that can cause a deadlock.)
 *  - Cancelling any interrupts that refer to the closing queues and
 *    also ensuring that there are no interrupts in progress that will
 *    refer to the closing queues once the close routine returns.
 *  - For multiplexors, removing any driver global state that refers to
 *    the closing queue and also ensuring that there are no threads in
 *    the multiplexor that have picked up a queue pointer but not yet
 *    finished using it.
 *
 * In addition, a driver/module can only reference the q_next pointer
 * in its open, close, put, or service procedures or in a
 * qtimeout/qbufcall callback procedure executing "on" the correct
 * stream. Thus it can not reference the q_next pointer in an interrupt
 * routine or a timeout, bufcall or esballoc callback routine. Likewise
 * it can not reference q_next of a different queue e.g. in a mux that
 * passes messages from one queue's put/service procedure to another queue.
 * In all the cases when the driver/module can not access the q_next
 * field it must use the *next* versions e.g. canputnext instead of
 * canput(q->q_next) and putnextctl instead of putctl(q->q_next, ...).
 *
 *
 * Assuming that the driver/module conforms to the above constraints
 * the STREAMS framework has to avoid stale references to q_next for all
 * the framework internal cases which include (but are not limited to):
 *  - Threads in canput/canputnext/backenable and elsewhere that are
 *    walking q_next.
 *  - Messages on a syncq that have a reference to the queue through b_queue.
 *  - Messages on an outer perimeter (syncq) that have a reference to the
 *    queue through b_queue.
 *  - Threads that use q_nfsrv (e.g. canput) to find a queue.
 *    Note that only canput and bcanput use q_nfsrv without any locking.
 *
 * The STREAMS framework providing the qprocsoff(9F) guarantees means that
 * after qprocsoff returns, the framework has to ensure that no threads can
 * enter the put or service routines for the closing read or write-side queue.
 * In addition to preventing "direct" entry into the put procedures
 * the framework also has to prevent messages being drained from
 * the syncq or the outer perimeter.
 * XXX Note that currently qdetach relies on D_MTOCEXCL as the only
 * mechanism to prevent qwriter(PERIM_OUTER) from running after
 * qprocsoff has returned.
 * Note that if a module/driver uses put(9F) on one of its own queues
 * it is up to the module/driver to ensure that the put() doesn't
 * get called when the queue is closing.
 *
 *
 * The framework aspects of the above "contract" are implemented by
 * qprocsoff, removeq, and strlock:
 *  - qprocsoff (disable_svc) sets QWCLOSE to prevent runservice from
 *    entering the service procedures.
 *  - strlock acquires the sd_lock and sd_reflock to prevent putnext,
 *    canputnext, backenable etc from dereferencing the q_next that will
 *    soon change.
 *  - strlock waits for sd_refcnt to be zero to wait for e.g. any canputnext
 *    or other q_next walker that uses claimstr/releasestr to finish.
 *  - optionally for every syncq in the stream strlock acquires all the
 *    sq_lock's and waits for all sq_counts to drop to a value that indicates
 *    that no thread executes in the put or service procedures and that no
 *    thread is draining into the module/driver. This ensures that no
 *    open, close, put, service, or qtimeout/qbufcall callback procedure is
 *    currently executing hence no such thread can end up with the old stale
 *    q_next value and no canput/backenable can have the old stale
 *    q_nfsrv/q_next.
 *  - qdetach (wait_svc) makes sure that any scheduled or running threads
 *    have either finished or observed the QWCLOSE flag and gone away.
 */


/*
 * Get all the locks necessary to change q_next.
 *
 * Wait for sd_refcnt to reach 0 and, if sqlist is present, wait for the
 * sq_count of each syncq in the list to drop to sq_rmqcount, indicating that
 * the only threads inside the syncq are threads currently calling removeq().
 * Since threads calling removeq() are in the process of removing their queues
 * from the stream, we do not need to worry about them accessing a stale q_next
 * pointer and thus we do not need to wait for them to exit (in fact, waiting
 * for them can cause deadlock).
 *
 * This routine is subject to starvation since it does not set any flag to
 * prevent threads from entering a module in the stream (i.e. sq_count can
 * increase on some syncq while it is waiting on some other syncq.)
 *
 * Assumes that only one thread attempts to call strlock for a given
 * stream. If this is not the case the two threads would deadlock.
 * This assumption is guaranteed since strlock is only called by insertq
 * and removeq and streams plumbing changes are single-threaded for
 * a given stream using the STWOPEN, STRCLOSE, and STRPLUMB flags.
 *
 * For pipes, it is not difficult to atomically designate a pair of streams
 * to be mated. Once mated atomically by the framework the twisted pair remain
 * configured that way until dismantled atomically by the framework.
 * When plumbing takes place on a twisted stream it is necessary to ensure that
 * this operation is done exclusively on the twisted stream since two such
 * operations, each initiated on different ends of the pipe, will deadlock
 * waiting for each other to complete.
 *
 * On entry, no locks should be held.
 * The locks acquired and held by strlock depend on a few factors.
 * - If sqlist is non-NULL all the syncq locks in the sqlist will be acquired
 *   and held on exit and all sq_count are at an acceptable level.
 * - In all cases, sd_lock and sd_reflock are acquired and held on exit with
 *   sd_refcnt being zero.
 */

static void
strlock(struct stdata *stp, sqlist_t *sqlist)
{
	syncql_t *sql, *sql2;
retry:
	/*
	 * Wait for any claimstr to go away.
	 */
	if (STRMATED(stp)) {
		struct stdata *stp1, *stp2;

		STRLOCKMATES(stp);
		/*
		 * Note that the selection of locking order is not
		 * important, just that they are always acquired in
		 * the same order.  To ensure this, we choose this
		 * order based on the value of the pointer, and since
		 * the pointer will not change for the life of this
		 * pair, we will always grab the locks in the same
		 * order (and hence, prevent deadlocks).
		 */
		if (&(stp->sd_lock) > &((stp->sd_mate)->sd_lock)) {
			stp1 = stp;
			stp2 = stp->sd_mate;
		} else {
			stp2 = stp;
			stp1 = stp->sd_mate;
		}
		mutex_enter(&stp1->sd_reflock);
		if (stp1->sd_refcnt > 0) {
			STRUNLOCKMATES(stp);
			cv_wait(&stp1->sd_refmonitor, &stp1->sd_reflock);
			mutex_exit(&stp1->sd_reflock);
			goto retry;
		}
		mutex_enter(&stp2->sd_reflock);
		if (stp2->sd_refcnt > 0) {
			STRUNLOCKMATES(stp);
			mutex_exit(&stp1->sd_reflock);
			cv_wait(&stp2->sd_refmonitor, &stp2->sd_reflock);
			mutex_exit(&stp2->sd_reflock);
			goto retry;
		}
		STREAM_PUTLOCKS_ENTER(stp1);
		STREAM_PUTLOCKS_ENTER(stp2);
	} else {
		mutex_enter(&stp->sd_lock);
		mutex_enter(&stp->sd_reflock);
		while (stp->sd_refcnt > 0) {
			mutex_exit(&stp->sd_lock);
			cv_wait(&stp->sd_refmonitor, &stp->sd_reflock);
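			/*
			 * cv_wait() returned holding sd_reflock but we
			 * dropped sd_lock above.  To preserve the
			 * sd_lock -> sd_reflock ordering, do not block
			 * on sd_lock while holding sd_reflock; if the
			 * tryenter fails, drop sd_reflock and take the
			 * locks again in the usual order.
			 */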
			if (mutex_tryenter(&stp->sd_lock) == 0) {
				mutex_exit(&stp->sd_reflock);
				mutex_enter(&stp->sd_lock);
				mutex_enter(&stp->sd_reflock);
			}
		}
		STREAM_PUTLOCKS_ENTER(stp);
	}

	if (sqlist == NULL)
		return;

	for (sql = sqlist->sqlist_head; sql; sql = sql->sql_next) {
		syncq_t *sq = sql->sql_sq;
		uint16_t count;

		mutex_enter(SQLOCK(sq));
		count = sq->sq_count;
		ASSERT(sq->sq_rmqcount <= count);
		SQ_PUTLOCKS_ENTER(sq);
		SUM_SQ_PUTCOUNTS(sq, count);
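		/*
		 * If the only remaining claims on this syncq come from
		 * threads inside removeq() (sq_rmqcount), it is quiescent
		 * enough for our purposes; keep its locks and move on.
		 */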
		if (count == sq->sq_rmqcount)
			continue;

		/* Failed - drop all locks that we have acquired so far */
		if (STRMATED(stp)) {
			STREAM_PUTLOCKS_EXIT(stp);
			STREAM_PUTLOCKS_EXIT(stp->sd_mate);
			STRUNLOCKMATES(stp);
			mutex_exit(&stp->sd_reflock);
			mutex_exit(&stp->sd_mate->sd_reflock);
		} else {
			STREAM_PUTLOCKS_EXIT(stp);
			mutex_exit(&stp->sd_lock);
			mutex_exit(&stp->sd_reflock);
		}
		for (sql2 = sqlist->sqlist_head; sql2 != sql;
		    sql2 = sql2->sql_next) {
			SQ_PUTLOCKS_EXIT(sql2->sql_sq);
			mutex_exit(SQLOCK(sql2->sql_sq));
		}

		/*
		 * The wait loop below may starve when there are many threads
		 * claiming the syncq. This is especially a problem with permod
		 * syncqs (IP). To lessen the impact of the problem we increment
		 * sq_needexcl and clear fastbits so that putnexts will slow
		 * down and call sqenable instead of draining right away.
		 */
		sq->sq_needexcl++;
		SQ_PUTCOUNT_CLRFAST_LOCKED(sq);
		while (count > sq->sq_rmqcount) {
			sq->sq_flags |= SQ_WANTWAKEUP;
			SQ_PUTLOCKS_EXIT(sq);
			cv_wait(&sq->sq_wait, SQLOCK(sq));
			count = sq->sq_count;
			SQ_PUTLOCKS_ENTER(sq);
			SUM_SQ_PUTCOUNTS(sq, count);
		}
		sq->sq_needexcl--;
		if (sq->sq_needexcl == 0)
			SQ_PUTCOUNT_SETFAST_LOCKED(sq);
		SQ_PUTLOCKS_EXIT(sq);
		ASSERT(count == sq->sq_rmqcount);
		mutex_exit(SQLOCK(sq));
		goto retry;
	}
}

/*
 * Drop all the locks that strlock acquired.
 */
static void
strunlock(struct stdata *stp, sqlist_t *sqlist)
{
	syncql_t *sql;

	if (STRMATED(stp)) {
		STREAM_PUTLOCKS_EXIT(stp);
		STREAM_PUTLOCKS_EXIT(stp->sd_mate);
		STRUNLOCKMATES(stp);
		mutex_exit(&stp->sd_reflock);
		mutex_exit(&stp->sd_mate->sd_reflock);
	} else {
		STREAM_PUTLOCKS_EXIT(stp);
		mutex_exit(&stp->sd_lock);
		mutex_exit(&stp->sd_reflock);
	}

	if (sqlist == NULL)
		return;

	for (sql = sqlist->sqlist_head; sql; sql = sql->sql_next) {
		SQ_PUTLOCKS_EXIT(sql->sql_sq);
		mutex_exit(SQLOCK(sql->sql_sq));
	}
}

/*
 * When the inserted module has a service procedure, check whether the next
 * module with a service procedure is flow controlled and, if so, trigger
 * a backenable.
 */
static void
backenable_insertedq(queue_t *q)
{
	qband_t	*qbp;

	claimstr(q);
	if (q->q_qinfo->qi_srvp != NULL && q->q_next != NULL) {
		if (q->q_next->q_nfsrv->q_flag & QWANTW)
			backenable(q, 0);

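		/* Do the same check for each priority band on that queue. */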
		qbp = q->q_next->q_nfsrv->q_bandp;
		for (; qbp != NULL; qbp = qbp->qb_next)
			if ((qbp->qb_flag & QB_WANTW) && qbp->qb_first != NULL)
				backenable(q, qbp->qb_first->b_band);
	}
	releasestr(q);
}

/*
 * Given a stream and a new read queue, insert the new queue pair into the
 * stream: normally directly below the stream head, or, when _QINSERTING is
 * set, directly below the queue already recorded in new->q_next.
 *
 * This routine acquires all the necessary locks in order to change
 * q_next and related pointers using strlock().
 * It depends on the stream head ensuring that there are no concurrent
 * insertq or removeq on the same stream. The stream head ensures this
 * using the flags STWOPEN, STRCLOSE, and STRPLUMB.
 *
 * Note that no syncq locks are held during the q_next change. This is safe
 * for all streams since, unlike removeq, there is no problem of stale
 * pointers when adding a module to the stream: a driver/module that does a
 * canput(rq->q_next) can never pick up a closed/freed queue pointer here.
 */
void
insertq(struct stdata *stp, queue_t *new)
{
	queue_t	*after;
	queue_t *wafter;
	queue_t *wnew = _WR(new);
	boolean_t have_fifo = B_FALSE;

	if (new->q_flag & _QINSERTING) {
		ASSERT(stp->sd_vnode->v_type != VFIFO);
		after = new->q_next;
		wafter = _WR(new->q_next);
	} else {
		after = _RD(stp->sd_wrq);
		wafter = stp->sd_wrq;
	}

	TRACE_2(TR_FAC_STREAMS_FR, TR_INSERTQ,
		"insertq:%p, %p", after, new);
	ASSERT(after->q_flag & QREADR);
	ASSERT(new->q_flag & QREADR);

	strlock(stp, NULL);

	/* Do we have a FIFO? */
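	/*
	 * For a FIFO the stream head's write queue feeds directly into its
	 * own read queue, so the new write queue must point at the new read
	 * queue to keep the loopback going through the inserted pair.
	 */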
	if (wafter->q_next == after) {
		have_fifo = B_TRUE;
		wnew->q_next = new;
	} else {
		wnew->q_next = wafter->q_next;
	}
	new->q_next = after;

	set_nfsrv_ptr(new, wnew, after, wafter);
	/*
	 * set_nfsrv_ptr() needs to know if this is an insertion or not,
	 * so only reset this flag after calling it.
	 */
	new->q_flag &= ~_QINSERTING;

	if (have_fifo) {
		wafter->q_next = wnew;
	} else {
		if (wafter->q_next)
			_OTHERQ(wafter->q_next)->q_next = new;
		wafter->q_next = wnew;
	}

	set_qend(new);
	/* The QEND flag might have to be updated for the upstream guy */
	set_qend(after);

	ASSERT(_SAMESTR(new) == O_SAMESTR(new));
	ASSERT(_SAMESTR(wnew) == O_SAMESTR(wnew));
	ASSERT(_SAMESTR(after) == O_SAMESTR(after));
	ASSERT(_SAMESTR(wafter) == O_SAMESTR(wafter));
	strsetuio(stp);

	/*
	 * If this was a module insertion, bump the push count.
	 */
	if (!(new->q_flag & QISDRV))
		stp->sd_pushcnt++;

	strunlock(stp, NULL);

	/* check if the write Q needs backenable */
	backenable_insertedq(wnew);

	/* check if the read Q needs backenable */
	backenable_insertedq(new);
}

/*
 * Given a read queue, unlink it from any neighbors.
 *
 * This routine acquires all the necessary locks in order to
 * change q_next and related pointers and also guard against
 * stale references (e.g. through q_next) to the queue that
 * is being removed. It also plays part of the role in ensuring
 * that the module's/driver's put procedure doesn't get called
 * after qprocsoff returns.
 *
 * Removeq depends on the stream head ensuring that there are
 * no concurrent insertq or removeq on the same stream. The
 * stream head ensures this using the flags STWOPEN, STRCLOSE and
 * STRPLUMB.
 *
 * The set of locks needed to remove the queue is different in
 * different cases:
 *
 * Acquire sd_lock, sd_reflock, and all the syncq locks in the stream after
 * waiting for the syncq reference count to drop to 0 indicating that no
 * non-close threads are present anywhere in the stream. This ensures that any
 * module/driver can safely reference q_next in its open, close, put, or
 * service procedures.
 *
 * The sq_rmqcount counter tracks the number of threads inside removeq().
 * strlock() ensures that there is either no threads executing inside perimeter
 * or there is only a thread calling qprocsoff().
 *
 * strlock() compares the value of sq_count with the number of threads inside
 * removeq() and waits until sq_count is equal to sq_rmqcount. We need to wakeup
 * any threads waiting in strlock() when the sq_rmqcount increases.
 */

void
removeq(queue_t *qp)
{
	queue_t *wqp = _WR(qp);
	struct stdata *stp = STREAM(qp);
	sqlist_t *sqlist = NULL;
	boolean_t isdriver;
	int moved;
	syncq_t *sq = qp->q_syncq;
	syncq_t *wsq = wqp->q_syncq;

	ASSERT(stp);

	TRACE_2(TR_FAC_STREAMS_FR, TR_REMOVEQ,
		"removeq:%p %p", qp, wqp);
	ASSERT(qp->q_flag&QREADR);

	/*
	 * For queues using synchronous streams, we must wait for all threads
	 * in rwnext() to drain out before proceeding.
	 */
	if (qp->q_flag & QSYNCSTR) {
		/* First, we need to wake up any threads blocked in rwnext() */
		mutex_enter(SQLOCK(sq));
		if (sq->sq_flags & SQ_WANTWAKEUP) {
			sq->sq_flags &= ~SQ_WANTWAKEUP;
			cv_broadcast(&sq->sq_wait);
		}
		mutex_exit(SQLOCK(sq));

		if (wsq != sq) {
			mutex_enter(SQLOCK(wsq));
			if (wsq->sq_flags & SQ_WANTWAKEUP) {
				wsq->sq_flags &= ~SQ_WANTWAKEUP;
				cv_broadcast(&wsq->sq_wait);
			}
			mutex_exit(SQLOCK(wsq));
		}

		mutex_enter(QLOCK(qp));
		while (qp->q_rwcnt > 0) {
			qp->q_flag |= QWANTRMQSYNC;
			cv_wait(&qp->q_wait, QLOCK(qp));
		}
		mutex_exit(QLOCK(qp));

		mutex_enter(QLOCK(wqp));
		while (wqp->q_rwcnt > 0) {
			wqp->q_flag |= QWANTRMQSYNC;
			cv_wait(&wqp->q_wait, QLOCK(wqp));
		}
		mutex_exit(QLOCK(wqp));
	}

	mutex_enter(SQLOCK(sq));
	sq->sq_rmqcount++;
	if (sq->sq_flags & SQ_WANTWAKEUP) {
		sq->sq_flags &= ~SQ_WANTWAKEUP;
		cv_broadcast(&sq->sq_wait);
	}
	mutex_exit(SQLOCK(sq));

	isdriver = (qp->q_flag & QISDRV);

	sqlist = sqlist_build(qp, stp, STRMATED(stp));
	strlock(stp, sqlist);

	reset_nfsrv_ptr(qp, wqp);

	ASSERT(wqp->q_next == NULL || backq(qp)->q_next == qp);
	ASSERT(qp->q_next == NULL || backq(wqp)->q_next == wqp);
	/* Do we have a FIFO? */
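	/*
	 * In the FIFO case the removed write queue loops directly back to
	 * the removed read queue; restore the stream head's write queue to
	 * point back at its own read queue.
	 */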
	if (wqp->q_next == qp) {
		stp->sd_wrq->q_next = _RD(stp->sd_wrq);
	} else {
		if (wqp->q_next)
			backq(qp)->q_next = qp->q_next;
		if (qp->q_next)
			backq(wqp)->q_next = wqp->q_next;
	}

	/* The QEND flag might have to be updated for the upstream guy */
	if (qp->q_next)
		set_qend(qp->q_next);

	ASSERT(_SAMESTR(stp->sd_wrq) == O_SAMESTR(stp->sd_wrq));
	ASSERT(_SAMESTR(_RD(stp->sd_wrq)) == O_SAMESTR(_RD(stp->sd_wrq)));

	/*
	 * Move any messages destined for the put procedures to the next
	 * syncq in line. Otherwise free them.
	 */
	moved = 0;
	/*
	 * Quick check to see whether there are any messages or events.
	 */
	if (qp->q_syncqmsgs != 0 || (qp->q_syncq->sq_flags & SQ_EVENTS))
		moved += propagate_syncq(qp);
	if (wqp->q_syncqmsgs != 0 ||
	    (wqp->q_syncq->sq_flags & SQ_EVENTS))
		moved += propagate_syncq(wqp);

	strsetuio(stp);

	/*
	 * If this was a module removal, decrement the push count.
	 */
	if (!isdriver)
		stp->sd_pushcnt--;

	strunlock(stp, sqlist);
	sqlist_free(sqlist);

	/*
	 * Make sure any messages that were propagated are drained.
	 * Also clear any QFULL bit caused by messages that were propagated.
	 */

	if (qp->q_next != NULL) {
		clr_qfull(qp);
		/*
		 * For the driver calling qprocsoff, propagate_syncq
		 * frees all the messages instead of putting them on
		 * the stream head.
		 */
		if (!isdriver && (moved > 0))
			emptysq(qp->q_next->q_syncq);
	}
	if (wqp->q_next != NULL) {
		clr_qfull(wqp);
		/*
		 * We come here for any pop of a module except for the
		 * case of the driver being removed. We don't call emptysq
		 * if we did not move any messages. This avoids holding
		 * PERMOD syncq locks in emptysq.
		 */
		if (moved > 0)
			emptysq(wqp->q_next->q_syncq);
	}

	mutex_enter(SQLOCK(sq));
	sq->sq_rmqcount--;
	mutex_exit(SQLOCK(sq));
}

/*
 * Prevent further entry by setting a flag (like SQ_FROZEN, SQ_BLOCKED or
 * SQ_WRITER) on a syncq.
 * If maxcnt is not -1 it assumes that the caller has "maxcnt" claim(s) on
 * the sync queue and waits until sq_count reaches maxcnt.
 *
 * If maxcnt is -1 there's no need to grab sq_putlocks since the caller
 * does not care about putnext threads that are in the middle of calling put
 * entry points.
 *
 * This routine is used for both inner and outer syncqs.
 */
static void
blocksq(syncq_t *sq, ushort_t flag, int maxcnt)
{
	uint16_t count = 0;

	mutex_enter(SQLOCK(sq));
	/*
	 * Wait for SQ_FROZEN/SQ_BLOCKED to be reset.
	 * SQ_FROZEN will be set if there is a frozen stream that has a
	 * queue which also refers to this "shared" syncq.
	 * SQ_BLOCKED will be set if there is an "off" queue which also
	 * refers to this "shared" syncq.
	 */
	if (maxcnt != -1) {
		count = sq->sq_count;
		SQ_PUTLOCKS_ENTER(sq);
		SQ_PUTCOUNT_CLRFAST_LOCKED(sq);
		SUM_SQ_PUTCOUNTS(sq, count);
	}
	sq->sq_needexcl++;
	ASSERT(sq->sq_needexcl != 0);	/* wraparound */

	while ((sq->sq_flags & flag) ||
	    (maxcnt != -1 && count > (unsigned)maxcnt)) {
		sq->sq_flags |= SQ_WANTWAKEUP;
		if (maxcnt != -1) {
			SQ_PUTLOCKS_EXIT(sq);
		}
		cv_wait(&sq->sq_wait, SQLOCK(sq));
		if (maxcnt != -1) {
			count = sq->sq_count;
			SQ_PUTLOCKS_ENTER(sq);
			SUM_SQ_PUTCOUNTS(sq, count);
		}
	}
	sq->sq_needexcl--;
	sq->sq_flags |= flag;
	ASSERT(maxcnt == -1 || count == maxcnt);
	if (maxcnt != -1) {
		if (sq->sq_needexcl == 0) {
			SQ_PUTCOUNT_SETFAST_LOCKED(sq);
		}
		SQ_PUTLOCKS_EXIT(sq);
	} else if (sq->sq_needexcl == 0) {
		SQ_PUTCOUNT_SETFAST(sq);
	}

	mutex_exit(SQLOCK(sq));
}

/*
 * Reset a flag that was set with blocksq.
 *
 * Can not use this routine to reset SQ_WRITER.
 *
 * If "isouter" is set then the syncq is assumed to be an outer perimeter
 * and drain_syncq is not called. Instead we rely on the qwriter_outer thread
 * to handle the queued qwriter operations.
 *
 * no need to grab sq_putlocks here. See comment in strsubr.h that explains when
 * sq_putlocks are used.
 */
static void
unblocksq(syncq_t *sq, uint16_t resetflag, int isouter)
{
	uint16_t flags;

	mutex_enter(SQLOCK(sq));
	ASSERT(resetflag != SQ_WRITER);
	ASSERT(sq->sq_flags & resetflag);
	flags = sq->sq_flags & ~resetflag;
	sq->sq_flags = flags;
	if (flags & (SQ_QUEUED | SQ_WANTWAKEUP)) {
		if (flags & SQ_WANTWAKEUP) {
			flags &= ~SQ_WANTWAKEUP;
			cv_broadcast(&sq->sq_wait);
		}
		sq->sq_flags = flags;
		if ((flags & SQ_QUEUED) && !(flags & (SQ_STAYAWAY|SQ_EXCL))) {
			if (!isouter) {
				/* drain_syncq drops SQLOCK */
				drain_syncq(sq);
				return;
			}
		}
	}
	mutex_exit(SQLOCK(sq));
}

/*
 * Reset a flag that was set with blocksq.
 * Does not drain the syncq. Use emptysq() for that.
 * Returns 1 if SQ_QUEUED is set. Otherwise 0.
 *
 * no need to grab sq_putlocks here. See comment in strsubr.h that explains when
 * sq_putlocks are used.
 */
static int
dropsq(syncq_t *sq, uint16_t resetflag)
{
	uint16_t flags;

	mutex_enter(SQLOCK(sq));
	ASSERT(sq->sq_flags & resetflag);
	flags = sq->sq_flags & ~resetflag;
	if (flags & SQ_WANTWAKEUP) {
		flags &= ~SQ_WANTWAKEUP;
		cv_broadcast(&sq->sq_wait);
	}
	sq->sq_flags = flags;
	mutex_exit(SQLOCK(sq));
	if (flags & SQ_QUEUED)
		return (1);
	return (0);
}

/*
 * Empty all the messages on a syncq.
 *
 * no need to grab sq_putlocks here. See comment in strsubr.h that explains when
 * sq_putlocks are used.
 */
static void
emptysq(syncq_t *sq)
{
	uint16_t flags;

	mutex_enter(SQLOCK(sq));
	flags = sq->sq_flags;
	if ((flags & SQ_QUEUED) && !(flags & (SQ_STAYAWAY|SQ_EXCL))) {
		/*
		 * To prevent potential recursive invocation of drain_syncq we
		 * do not call drain_syncq if count is non-zero.
		 */
		if (sq->sq_count == 0) {
			/* drain_syncq() drops SQLOCK */
			drain_syncq(sq);
			return;
		} else
			sqenable(sq);
	}
	mutex_exit(SQLOCK(sq));
}

/*
 * Ordered insert while removing duplicates.
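 * The list is kept sorted by syncq pointer value; this yields a canonical
 * ordering and makes duplicate detection a simple equality check.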
 */
static void
sqlist_insert(sqlist_t *sqlist, syncq_t *sqp)
{
	syncql_t *sqlp, **prev_sqlpp, *new_sqlp;

	prev_sqlpp = &sqlist->sqlist_head;
	while ((sqlp = *prev_sqlpp) != NULL) {
		if (sqlp->sql_sq >= sqp) {
			if (sqlp->sql_sq == sqp)	/* duplicate */
				return;
			break;
		}
		prev_sqlpp = &sqlp->sql_next;
	}
	new_sqlp = &sqlist->sqlist_array[sqlist->sqlist_index++];
	ASSERT((char *)new_sqlp < (char *)sqlist + sqlist->sqlist_size);
	new_sqlp->sql_next = sqlp;
	new_sqlp->sql_sq = sqp;
	*prev_sqlpp = new_sqlp;
}

/*
 * Walk the write side queues until we hit either the driver
 * or a twist in the stream (_SAMESTR will return false in both
 * these cases) then turn around and walk the read side queues
 * back up to the stream head.
 */
static void
sqlist_insertall(sqlist_t *sqlist, queue_t *q)
{
	while (q != NULL) {
		sqlist_insert(sqlist, q->q_syncq);

		if (_SAMESTR(q))
			q = q->q_next;
		else if (!(q->q_flag & QREADR))
			q = _RD(q);
		else
			q = NULL;
	}
}

/*
 * Allocate and build a list of all syncqs in a stream and the syncq(s)
 * associated with the "q" parameter. The resulting list is sorted in a
 * canonical order and is free of duplicates.
 * Assumes the passed queue is a _RD(q).
 */
static sqlist_t *
sqlist_build(queue_t *q, struct stdata *stp, boolean_t do_twist)
{
	sqlist_t *sqlist = sqlist_alloc(stp, KM_SLEEP);

	/*
	 * start with the current queue/qpair
	 */
	ASSERT(q->q_flag & QREADR);

	sqlist_insert(sqlist, q->q_syncq);
	sqlist_insert(sqlist, _WR(q)->q_syncq);

	sqlist_insertall(sqlist, stp->sd_wrq);
	if (do_twist)
		sqlist_insertall(sqlist, stp->sd_mate->sd_wrq);

	return (sqlist);
}

static sqlist_t *
sqlist_alloc(struct stdata *stp, int kmflag)
{
	size_t sqlist_size;
	sqlist_t *sqlist;

	/*
	 * Allocate 2 syncql_t's for each pushed module. Note that
	 * the sqlist_t structure already has 4 syncql_t's built in:
	 * 2 for the stream head, and 2 for the driver/other stream head.
	 */
	sqlist_size = 2 * sizeof (syncql_t) * stp->sd_pushcnt +
		sizeof (sqlist_t);
	if (STRMATED(stp))
		sqlist_size += 2 * sizeof (syncql_t) * stp->sd_mate->sd_pushcnt;
	sqlist = kmem_alloc(sqlist_size, kmflag);

	sqlist->sqlist_head = NULL;
	sqlist->sqlist_size = sqlist_size;
	sqlist->sqlist_index = 0;

	return (sqlist);
}

/*
 * Free the list created by sqlist_alloc()
 */
static void
sqlist_free(sqlist_t *sqlist)
{
	kmem_free(sqlist, sqlist->sqlist_size);
}

/*
 * Prevent any new entries into any syncq in this stream.
 * Used by freezestr.
 */
void
strblock(queue_t *q)
{
	struct stdata	*stp;
	syncql_t	*sql;
	sqlist_t	*sqlist;

	q = _RD(q);

	stp = STREAM(q);
	ASSERT(stp != NULL);

	/*
	 * Get a sorted list with all the duplicates removed containing
	 * all the syncqs referenced by this stream.
	 */
	sqlist = sqlist_build(q, stp, B_FALSE);
	for (sql = sqlist->sqlist_head; sql != NULL; sql = sql->sql_next)
		blocksq(sql->sql_sq, SQ_FROZEN, -1);
	sqlist_free(sqlist);
}

/*
 * Release the block on new entries into this stream
 */
void
strunblock(queue_t *q)
{
	struct stdata	*stp;
	syncql_t	*sql;
	sqlist_t	*sqlist;
	int		drain_needed;

	q = _RD(q);

	/*
	 * Get a sorted list with all the duplicates removed containing
	 * all the syncqs referenced by this stream.
	 * Have to drop the SQ_FROZEN flag on all the syncqs before
	 * starting to drain them; otherwise the draining might
	 * cause a freezestr in some module on the stream (which
	 * would deadlock.)
	 */
	stp = STREAM(q);
	ASSERT(stp != NULL);
	sqlist = sqlist_build(q, stp, B_FALSE);
	drain_needed = 0;
	for (sql = sqlist->sqlist_head; sql != NULL; sql = sql->sql_next)
		drain_needed += dropsq(sql->sql_sq, SQ_FROZEN);
	if (drain_needed) {
		for (sql = sqlist->sqlist_head; sql != NULL;
		    sql = sql->sql_next)
			emptysq(sql->sql_sq);
	}
	sqlist_free(sqlist);
}

#ifdef DEBUG
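/*
 * Return non-zero if the queue pair has been linked into a stream, i.e.
 * qprocson has taken effect and the neighboring write queue points back
 * at this pair's write queue.
 */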
static int
qprocsareon(queue_t *rq)
{
	if (rq->q_next == NULL)
		return (0);
	return (_WR(rq->q_next)->q_next == _WR(rq));
}

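/*
 * Return non-zero if any thread holds a claim (sq_count or a putcount)
 * on this queue's syncq.
 */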
int
qclaimed(queue_t *q)
{
	uint_t count;

	count = q->q_syncq->sq_count;
	SUM_SQ_PUTCOUNTS(q->q_syncq, count);
	return (count != 0);
}

/*
 * Check if anyone has frozen this stream with freezestr
 */
int
frozenstr(queue_t *q)
{
	return ((q->q_syncq->sq_flags & SQ_FROZEN) != 0);
}
#endif /* DEBUG */

/*
 * Enter a queue.
 * Obsoleted interface. Should not be used.
 */
void
enterq(queue_t *q)
{
	entersq(q->q_syncq, SQ_CALLBACK);
}

void
leaveq(queue_t *q)
{
	leavesq(q->q_syncq, SQ_CALLBACK);
}

/*
 * Enter a perimeter. c_inner and c_outer specify which concurrency bits
 * to check.
 * Wait if SQ_QUEUED is set to preserve ordering between messages and qwriter
 * calls and the running of open, close and service procedures.
 *
 * If the c_inner bit is set there is no need to grab sq_putlocks since we
 * don't care if other threads have entered or are entering the put entry
 * point.
 *
 * If the c_inner bit is set it might have been possible to use
 * sq_putlocks/sq_putcounts instead of SQLOCK/sq_count (e.g. to optimize the
 * open/close path for IP) but since the count may need to be decremented in
 * qwait() we wouldn't know which counter to decrement. Currently the counter
 * is selected by the current cpu_seqid and the current CPU can change at any
 * moment. XXX in the future we might use curthread id bits to select the
 * counter and this would stay constant across routine calls.
 */
void
entersq(syncq_t *sq, int entrypoint)
{
	uint16_t	count = 0;
	uint16_t	flags;
	uint16_t	waitflags = SQ_STAYAWAY | SQ_EVENTS | SQ_EXCL;
	uint16_t	type;
	uint_t		c_inner = entrypoint & SQ_CI;
	uint_t		c_outer = entrypoint & SQ_CO;

	/*
	 * Increment ref count to keep closes out of this queue.
	 */
	ASSERT(sq);
	ASSERT(c_inner && c_outer);
	mutex_enter(SQLOCK(sq));
	flags = sq->sq_flags;
	type = sq->sq_type;
	if (!(type & c_inner)) {
		/* Make sure all putcounts now use slowlock. */
		count = sq->sq_count;
		SQ_PUTLOCKS_ENTER(sq);
		SQ_PUTCOUNT_CLRFAST_LOCKED(sq);
		SUM_SQ_PUTCOUNTS(sq, count);
		sq->sq_needexcl++;
		ASSERT(sq->sq_needexcl != 0);	/* wraparound */
		waitflags |= SQ_MESSAGES;
	}
	/*
	 * Wait until we can enter the inner perimeter.
	 * If we want exclusive access we wait until sq_count is 0.
	 * We have to do this before entering the outer perimeter in order
	 * to preserve put/close message ordering.
	 */
	while ((flags & waitflags) || (!(type & c_inner) && count != 0)) {
		sq->sq_flags = flags | SQ_WANTWAKEUP;
		if (!(type & c_inner)) {
			SQ_PUTLOCKS_EXIT(sq);
		}
		cv_wait(&sq->sq_wait, SQLOCK(sq));
		if (!(type & c_inner)) {
			count = sq->sq_count;
			SQ_PUTLOCKS_ENTER(sq);
			SUM_SQ_PUTCOUNTS(sq, count);
		}
		flags = sq->sq_flags;
	}

	if (!(type & c_inner)) {
		ASSERT(sq->sq_needexcl > 0);
		sq->sq_needexcl--;
		if (sq->sq_needexcl == 0) {
			SQ_PUTCOUNT_SETFAST_LOCKED(sq);
		}
	}

	/* Check if we need to enter the outer perimeter */
	if (!(type & c_outer)) {
		/*
		 * We have to enter the outer perimeter exclusively before
		 * we can increment sq_count to avoid deadlock. This implies
		 * that we have to re-check sq_flags and sq_count.
		 *
		 * is it possible to have c_inner set when c_outer is not set?
		 */
		if (!(type & c_inner)) {
			SQ_PUTLOCKS_EXIT(sq);
		}
		mutex_exit(SQLOCK(sq));
		outer_enter(sq->sq_outer, SQ_GOAWAY);
		mutex_enter(SQLOCK(sq));
		flags = sq->sq_flags;
		/*
		 * there should be no need to recheck sq_putcounts
		 * because outer_enter() has already waited for them to clear
		 * after setting SQ_WRITER.
		 */
		count = sq->sq_count;
#ifdef DEBUG
		/*
		 * SUMCHECK_SQ_PUTCOUNTS should return the sum instead
		 * of doing an ASSERT internally. Others should do
		 * something like
		 *	 ASSERT(SUMCHECK_SQ_PUTCOUNTS(sq) == 0);
		 * without the need to #ifdef DEBUG it.
		 */
		SUMCHECK_SQ_PUTCOUNTS(sq, 0);
#endif
		while ((flags & (SQ_EXCL|SQ_BLOCKED|SQ_FROZEN)) ||
		    (!(type & c_inner) && count != 0)) {
			sq->sq_flags = flags | SQ_WANTWAKEUP;
			cv_wait(&sq->sq_wait, SQLOCK(sq));
			count = sq->sq_count;
			flags = sq->sq_flags;
		}
	}

	sq->sq_count++;
	ASSERT(sq->sq_count != 0);	/* Wraparound */
	if (!(type & c_inner)) {
		/* Exclusive entry */
		ASSERT(sq->sq_count == 1);
		sq->sq_flags |= SQ_EXCL;
		if (type & c_outer) {
			SQ_PUTLOCKS_EXIT(sq);
		}
	}
	mutex_exit(SQLOCK(sq));
}

/*
 * Leave a syncq and announce to the framework that closes may proceed.
 * c_inner and c_outer specify which concurrency bits to check.
 *
 * Must never be called from a driver or module put entry point.
 *
 * no need to grab sq_putlocks here. See comment in strsubr.h that explains when
 * sq_putlocks are used.
 */
void
leavesq(syncq_t *sq, int entrypoint)
{
	uint16_t	flags;
	uint16_t	type;
	uint_t		c_outer = entrypoint & SQ_CO;
#ifdef DEBUG
	uint_t		c_inner = entrypoint & SQ_CI;
#endif

	/*
	 * decrement ref count, drain the syncq if possible, and wake up
	 * any waiting close.
	 */
	ASSERT(sq);
	ASSERT(c_inner && c_outer);
	mutex_enter(SQLOCK(sq));
	flags = sq->sq_flags;
	type = sq->sq_type;
	if (flags & (SQ_QUEUED|SQ_WANTWAKEUP|SQ_WANTEXWAKEUP)) {

		if (flags & SQ_WANTWAKEUP) {
			flags &= ~SQ_WANTWAKEUP;
			cv_broadcast(&sq->sq_wait);
		}
		if (flags & SQ_WANTEXWAKEUP) {
			flags &= ~SQ_WANTEXWAKEUP;
			cv_broadcast(&sq->sq_exitwait);
		}

		if ((flags & SQ_QUEUED) && !(flags & SQ_STAYAWAY)) {
			/*
			 * The syncq needs to be drained. "Exit" the syncq
			 * before calling drain_syncq.
			 */
			ASSERT(sq->sq_count != 0);
			sq->sq_count--;
			ASSERT((flags & SQ_EXCL) || (type & c_inner));
			sq->sq_flags = flags & ~SQ_EXCL;
			drain_syncq(sq);
			ASSERT(MUTEX_NOT_HELD(SQLOCK(sq)));
			/* Check if we need to exit the outer perimeter */
			/* XXX will this ever be true? */
			if (!(type & c_outer))
				outer_exit(sq->sq_outer);
			return;
		}
	}
	ASSERT(sq->sq_count != 0);
	sq->sq_count--;
	ASSERT((flags & SQ_EXCL) || (type & c_inner));
	sq->sq_flags = flags & ~SQ_EXCL;
	mutex_exit(SQLOCK(sq));

	/* Check if we need to exit the outer perimeter */
	if (!(sq->sq_type & c_outer))
		outer_exit(sq->sq_outer);
}

/*
 * Prevent q_next from changing in this stream by incrementing sq_count.
 *
 * no need to grab sq_putlocks here. See comment in strsubr.h that explains when
 * sq_putlocks are used.
 */
void
claimq(queue_t *qp)
{
	syncq_t	*sq = qp->q_syncq;

	mutex_enter(SQLOCK(sq));
	sq->sq_count++;
	ASSERT(sq->sq_count != 0);	/* Wraparound */
	mutex_exit(SQLOCK(sq));
}

/*
 * Undo claimq.
 *
 * no need to grab sq_putlocks here. See comment in strsubr.h that explains when
 * sq_putlocks are used.
 */
void
releaseq(queue_t *qp)
{
	syncq_t	*sq = qp->q_syncq;
	uint16_t flags;

	mutex_enter(SQLOCK(sq));
	ASSERT(sq->sq_count > 0);
	sq->sq_count--;

	flags = sq->sq_flags;
	if (flags & (SQ_WANTWAKEUP|SQ_QUEUED)) {
		if (flags & SQ_WANTWAKEUP) {
			flags &= ~SQ_WANTWAKEUP;
			cv_broadcast(&sq->sq_wait);
		}
		sq->sq_flags = flags;
		if ((flags & SQ_QUEUED) && !(flags & (SQ_STAYAWAY|SQ_EXCL))) {
			/*
			 * To prevent potential recursive invocation of
			 * drain_syncq we do not call drain_syncq if count is
			 * non-zero.
			 */
			if (sq->sq_count == 0) {
				drain_syncq(sq);
				return;
			} else
				sqenable(sq);
		}
	}
	mutex_exit(SQLOCK(sq));
}

/*
 * Prevent q_next from changing in this stream by incrementing sd_refcnt.
 */
void
claimstr(queue_t *qp)
{
	struct stdata *stp = STREAM(qp);

	mutex_enter(&stp->sd_reflock);
	stp->sd_refcnt++;
	ASSERT(stp->sd_refcnt != 0);	/* Wraparound */
	mutex_exit(&stp->sd_reflock);
}

/*
 * Undo claimstr.
 */
void
releasestr(queue_t *qp)
{
	struct stdata *stp = STREAM(qp);

	mutex_enter(&stp->sd_reflock);
	ASSERT(stp->sd_refcnt != 0);
	if (--stp->sd_refcnt == 0)
		cv_broadcast(&stp->sd_refmonitor);
	mutex_exit(&stp->sd_reflock);
}

static syncq_t *
new_syncq(void)
{
	return (kmem_cache_alloc(syncq_cache, KM_SLEEP));
}

static void
free_syncq(syncq_t *sq)
{
	ASSERT(sq->sq_head == NULL);
	ASSERT(sq->sq_outer == NULL);
	ASSERT(sq->sq_callbpend == NULL);
	ASSERT((sq->sq_onext == NULL && sq->sq_oprev == NULL) ||
	    (sq->sq_onext == sq && sq->sq_oprev == sq));

	if (sq->sq_ciputctrl != NULL) {
		ASSERT(sq->sq_nciputctrl == n_ciputctrl - 1);
		SUMCHECK_CIPUTCTRL_COUNTS(sq->sq_ciputctrl,
		    sq->sq_nciputctrl, 0);
		ASSERT(ciputctrl_cache != NULL);
		kmem_cache_free(ciputctrl_cache, sq->sq_ciputctrl);
	}

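	/* Reset remaining syncq state before returning it to the cache. */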
	sq->sq_tail = NULL;
	sq->sq_evhead = NULL;
	sq->sq_evtail = NULL;
	sq->sq_ciputctrl = NULL;
	sq->sq_nciputctrl = 0;
	sq->sq_count = 0;
	sq->sq_rmqcount = 0;
	sq->sq_callbflags = 0;
	sq->sq_cancelid = 0;
	sq->sq_next = NULL;
	sq->sq_needexcl = 0;
	sq->sq_svcflags = 0;
	sq->sq_nqueues = 0;
	sq->sq_pri = 0;
	sq->sq_onext = NULL;
	sq->sq_oprev = NULL;
	sq->sq_flags = 0;
	sq->sq_type = 0;
	sq->sq_servcount = 0;

	kmem_cache_free(syncq_cache, sq);
}

/* Outer perimeter code */

/*
 * The outer syncq uses the fields and flags in the syncq slightly
 * differently from the inner syncqs.
 *	sq_count	Incremented when there are pending or running
 *			writers at the outer perimeter to prevent the set of
 *			inner syncqs that belong to the outer perimeter from
 *			changing.
 *	sq_head/tail	List of deferred qwriter(OUTER) operations.
 *
 *	SQ_BLOCKED	Set to prevent traversing of sq_next,sq_prev while
 *			inner syncqs are added to or removed from the
 *			outer perimeter.
 *	SQ_QUEUED	sq_head/tail has messages or events queued.
 *
 *	SQ_WRITER	A thread is currently traversing all the inner syncqs
 *			setting the SQ_WRITER flag.
 */

/*
 * Get write access at the outer perimeter.
 * Note that read access is done by entersq, putnext, and put by simply
 * incrementing sq_count in the inner syncq.
 *
 * Waits until "flags" is no longer set in the outer to prevent multiple
 * threads from having write access at the same time. SQ_WRITER has to be part
 * of "flags".
 *
 * Increases sq_count on the outer syncq to keep away outer_insert/remove
 * until the outer_exit is finished.
 *
 * outer_enter is vulnerable to starvation since it does not prevent new
 * threads from entering the inner syncqs while it is waiting for sq_count to
 * go to zero.
 */
void
outer_enter(syncq_t *outer, uint16_t flags)
{
	syncq_t	*sq;
	int	wait_needed;
	uint16_t	count;

	ASSERT(outer->sq_outer == NULL && outer->sq_onext != NULL &&
	    outer->sq_oprev != NULL);
	ASSERT(flags & SQ_WRITER);

retry:
	mutex_enter(SQLOCK(outer));
	while (outer->sq_flags & flags) {
		outer->sq_flags |= SQ_WANTWAKEUP;
		cv_wait(&outer->sq_wait, SQLOCK(outer));
	}

	ASSERT(!(outer->sq_flags & SQ_WRITER));
	outer->sq_flags |= SQ_WRITER;
	outer->sq_count++;
	ASSERT(outer->sq_count != 0);	/* wraparound */
	wait_needed = 0;
	/*
	 * Set SQ_WRITER on all the inner syncqs while holding
	 * the SQLOCK on the outer syncq. This ensures that the changing
	 * of SQ_WRITER is atomic under the outer SQLOCK.
	 */
	for (sq = outer->sq_onext; sq != outer; sq = sq->sq_onext) {
		mutex_enter(SQLOCK(sq));
		count = sq->sq_count;
		SQ_PUTLOCKS_ENTER(sq);
		sq->sq_flags |= SQ_WRITER;
		SUM_SQ_PUTCOUNTS(sq, count);
		if (count != 0)
			wait_needed = 1;
		SQ_PUTLOCKS_EXIT(sq);
		mutex_exit(SQLOCK(sq));
	}
	mutex_exit(SQLOCK(outer));

	/*
	 * Get everybody out of the syncqs sequentially.
	 * Note that we don't actually need to acquire the PUTLOCKS, since
	 * we have already cleared the fastbit, and set SQ_WRITER.  By
	 * definition, the count can not increase since putnext will
	 * take the slowlock path (and the purpose of acquiring the
	 * putlocks was to make sure it didn't increase while we were
	 * waiting).
	 *
	 * Note that we still acquire the PUTLOCKS to be safe.
	 */
	if (wait_needed) {
		for (sq = outer->sq_onext; sq != outer; sq = sq->sq_onext) {
			mutex_enter(SQLOCK(sq));
			count = sq->sq_count;
			SQ_PUTLOCKS_ENTER(sq);
			SUM_SQ_PUTCOUNTS(sq, count);
			while (count != 0) {
				sq->sq_flags |= SQ_WANTWAKEUP;
				SQ_PUTLOCKS_EXIT(sq);
				cv_wait(&sq->sq_wait, SQLOCK(sq));
				count = sq->sq_count;
				SQ_PUTLOCKS_ENTER(sq);
				SUM_SQ_PUTCOUNTS(sq, count);
			}
			SQ_PUTLOCKS_EXIT(sq);
			mutex_exit(SQLOCK(sq));
		}
		/*
		 * Verify that none of the flags got set while we
		 * were waiting for the sq_counts to drop.
		 * If this happens we exit and retry entering the
		 * outer perimeter.
		 */
		mutex_enter(SQLOCK(outer));
		if (outer->sq_flags & (flags & ~SQ_WRITER)) {
			mutex_exit(SQLOCK(outer));
			outer_exit(outer);
			goto retry;
		}
		mutex_exit(SQLOCK(outer));
	}
}

/*
 * Drop the write access at the outer perimeter.
 * Read access is dropped implicitly (by putnext, put, and leavesq) by
 * decrementing sq_count.
 */
void
outer_exit(syncq_t *outer)
{
	syncq_t	*sq;
	int	 drain_needed;
	uint16_t flags;

	ASSERT(outer->sq_outer == NULL && outer->sq_onext != NULL &&
	    outer->sq_oprev != NULL);
	ASSERT(MUTEX_NOT_HELD(SQLOCK(outer)));

	/*
	 * Atomically (from the perspective of threads calling become_writer)
	 * drop the write access at the outer perimeter by holding
	 * SQLOCK(outer) across all the dropsq calls and the resetting of
	 * SQ_WRITER.
	 * This defines a locking order between the outer perimeter
	 * SQLOCK and the inner perimeter SQLOCKs.
	 */
	mutex_enter(SQLOCK(outer));
	flags = outer->sq_flags;
	ASSERT(outer->sq_flags & SQ_WRITER);
	if (flags & SQ_QUEUED) {
		write_now(outer);
		flags = outer->sq_flags;
	}

	/*
	 * sq_onext is stable since sq_count has not yet been decreased.
	 * Reset the SQ_WRITER flags in all syncqs.
	 * After dropping SQ_WRITER on the outer syncq we empty all the
	 * inner syncqs.
	 */
	drain_needed = 0;
	for (sq = outer->sq_onext; sq != outer; sq = sq->sq_onext)
		drain_needed += dropsq(sq, SQ_WRITER);
	ASSERT(!(outer->sq_flags & SQ_QUEUED));
	flags &= ~SQ_WRITER;
	if (drain_needed) {
		outer->sq_flags = flags;
		mutex_exit(SQLOCK(outer));
		for (sq = outer->sq_onext; sq != outer; sq = sq->sq_onext)
			emptysq(sq);
		mutex_enter(SQLOCK(outer));
		flags = outer->sq_flags;
	}
	if (flags & SQ_WANTWAKEUP) {
		flags &= ~SQ_WANTWAKEUP;
		cv_broadcast(&outer->sq_wait);
	}
	outer->sq_flags = flags;
	ASSERT(outer->sq_count > 0);
	outer->sq_count--;
	mutex_exit(SQLOCK(outer));
}

/*
 * Add another syncq to an outer perimeter.
 * Block out all other access to the outer perimeter while it is being
 * changed using blocksq.
 * Assumes that the caller has *not* done an outer_enter.
 *
 * Vulnerable to starvation in blocksq.
 */
static void
outer_insert(syncq_t *outer, syncq_t *sq)
{
	ASSERT(outer->sq_outer == NULL && outer->sq_onext != NULL &&
	    outer->sq_oprev != NULL);
	ASSERT(sq->sq_outer == NULL && sq->sq_onext == NULL &&
	    sq->sq_oprev == NULL);	/* Can't be in an outer perimeter */

	/* Get exclusive access to the outer perimeter list */
	blocksq(outer, SQ_BLOCKED, 0);
	ASSERT(outer->sq_flags & SQ_BLOCKED);
	ASSERT(!(outer->sq_flags & SQ_WRITER));

	mutex_enter(SQLOCK(sq));
	sq->sq_outer = outer;
	outer->sq_onext->sq_oprev = sq;
	sq->sq_onext = outer->sq_onext;
	outer->sq_onext = sq;
	sq->sq_oprev = outer;
	mutex_exit(SQLOCK(sq));
	unblocksq(outer, SQ_BLOCKED, 1);
}

/*
 * Remove a syncq from an outer perimeter.
 * Block out all other access to the outer perimeter while it is being
 * changed using blocksq.
 * Assumes that the caller has *not* done an outer_enter.
 *
 * Vulnerable to starvation in blocksq.
 */
static void
outer_remove(syncq_t *outer, syncq_t *sq)
{
	ASSERT(outer->sq_outer == NULL && outer->sq_onext != NULL &&
	    outer->sq_oprev != NULL);
	ASSERT(sq->sq_outer == outer);

	/* Get exclusive access to the outer perimeter list */
	blocksq(outer, SQ_BLOCKED, 0);
	ASSERT(outer->sq_flags & SQ_BLOCKED);
	ASSERT(!(outer->sq_flags & SQ_WRITER));

	mutex_enter(SQLOCK(sq));
	sq->sq_outer = NULL;
	sq->sq_onext->sq_oprev = sq->sq_oprev;
	sq->sq_oprev->sq_onext = sq->sq_onext;
	sq->sq_oprev = sq->sq_onext = NULL;
	mutex_exit(SQLOCK(sq));
	unblocksq(outer, SQ_BLOCKED, 1);
}

/*
 * Queue a deferred qwriter(OUTER) callback for this outer perimeter.
 * If this is the first callback for this outer perimeter then add
 * this outer perimeter to the list of outer perimeters that
 * the qwriter_outer_thread will process.
 *
 * Increments sq_count in the outer syncq to prevent the membership
 * of the outer perimeter (in terms of inner syncqs) from changing while
 * the callback is pending.
 */
static void
queue_writer(syncq_t *outer, void (*func)(), queue_t *q, mblk_t *mp)
{
	ASSERT(MUTEX_HELD(SQLOCK(outer)));

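	/* Stash the callback and target queue in the mblk itself. */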
	mp->b_prev = (mblk_t *)func;
	mp->b_queue = q;
	mp->b_next = NULL;
	outer->sq_count++;	/* Decremented when dequeued */
	ASSERT(outer->sq_count != 0);	/* Wraparound */
	if (outer->sq_evhead == NULL) {
		/* First message. */
		outer->sq_evhead = outer->sq_evtail = mp;
		outer->sq_flags |= SQ_EVENTS;
		mutex_exit(SQLOCK(outer));
		STRSTAT(qwr_outer);
		(void) taskq_dispatch(streams_taskq,
		    (task_func_t *)qwriter_outer_service, outer, TQ_SLEEP);
	} else {
		ASSERT(outer->sq_flags & SQ_EVENTS);
		outer->sq_evtail->b_next = mp;
		outer->sq_evtail = mp;
		mutex_exit(SQLOCK(outer));
	}
}

/*
 * Try to upgrade to write access at the outer perimeter. If this can
 * not be done without blocking then queue the callback to be done
 * by the qwriter_outer_thread.
 *
 * This routine can only be called from put or service procedures plus
 * asynchronous callback routines that have properly entered the
 * queue (with entersq). Thus qwriter(OUTER) assumes the caller has one claim
 * on the syncq associated with q.
 */
void
qwriter_outer(queue_t *q, mblk_t *mp, void (*func)())
{
	syncq_t	*osq, *sq, *outer;
	int	failed;
	uint16_t flags;

	osq = q->q_syncq;
	outer = osq->sq_outer;
	if (outer == NULL)
		panic("qwriter(PERIM_OUTER): no outer perimeter");
	ASSERT(outer->sq_outer == NULL && outer->sq_onext != NULL &&
	    outer->sq_oprev != NULL);

	mutex_enter(SQLOCK(outer));
	flags = outer->sq_flags;
	/*
	 * If some thread is traversing sq_next, or if we are blocked by
	 * outer_insert or outer_remove, or if we already have queued
	 * callbacks, then queue this callback for later processing.
	 *
	 * Also queue the qwriter for an interrupt thread in order
	 * to reduce the time spent running at high IPL.
	 */
	if ((flags & SQ_GOAWAY) || (curthread->t_pri >= kpreemptpri)) {
		/*
		 * Queue the become_writer request.
		 * The queueing is atomic under SQLOCK(outer) in order
		 * to synchronize with outer_exit.
		 * queue_writer will drop the outer SQLOCK
		 */
		if (flags & SQ_BLOCKED) {
			/* Must set SQ_WRITER on inner perimeter */
			mutex_enter(SQLOCK(osq));
			osq->sq_flags |= SQ_WRITER;
			mutex_exit(SQLOCK(osq));
		} else {
			if (!(flags & SQ_WRITER)) {
				/*
				 * The outer could have been SQ_BLOCKED thus
				 * SQ_WRITER might not be set on the inner.
				 */
				mutex_enter(SQLOCK(osq));
				osq->sq_flags |= SQ_WRITER;
				mutex_exit(SQLOCK(osq));
			}
			ASSERT(osq->sq_flags & SQ_WRITER);
		}
		queue_writer(outer, func, q, mp);
		return;
	}
	/*
	 * We are half-way to exclusive access to the outer perimeter.
	 * Prevent any outer_enter, qwriter(OUTER), or outer_insert/remove
	 * while the inner syncqs are traversed.
	 */
	outer->sq_count++;
	ASSERT(outer->sq_count != 0);	/* wraparound */
	flags |= SQ_WRITER;
	/*
	 * Check if we can run the function immediately. Mark all
	 * syncqs with the writer flag to prevent new entries into
	 * put and service procedures.
	 *
	 * Set SQ_WRITER on all the inner syncqs while holding
	 * the SQLOCK on the outer syncq. This ensures that the changing
	 * of SQ_WRITER is atomic under the outer SQLOCK.
	 */
	failed = 0;
	for (sq = outer->sq_onext; sq != outer; sq = sq->sq_onext) {
		uint16_t count;
		uint_t	maxcnt = (sq == osq) ? 1 : 0;

		mutex_enter(SQLOCK(sq));
		count = sq->sq_count;
		SQ_PUTLOCKS_ENTER(sq);
		SUM_SQ_PUTCOUNTS(sq, count);
		if (sq->sq_count > maxcnt)
			failed = 1;
		sq->sq_flags |= SQ_WRITER;
		SQ_PUTLOCKS_EXIT(sq);
		mutex_exit(SQLOCK(sq));
	}
	if (failed) {
		/*
		 * Some other thread has a read claim on the outer perimeter.
		 * Queue the callback for deferred processing.
		 *
		 * queue_writer will set SQ_QUEUED before we drop SQ_WRITER
		 * so that other qwriter(OUTER) calls will queue their
		 * callbacks as well. queue_writer increments sq_count so we
		 * decrement to compensate for our increment.
		 *
		 * Dropping SQ_WRITER enables the writer thread to work
		 * on this outer perimeter.
		 */
		outer->sq_flags = flags;
		queue_writer(outer, func, q, mp);
		/* queue_writer dropped the lock */
		mutex_enter(SQLOCK(outer));
		ASSERT(outer->sq_count > 0);
		outer->sq_count--;
		ASSERT(outer->sq_flags & SQ_WRITER);
		flags = outer->sq_flags;
		flags &= ~SQ_WRITER;
		if (flags & SQ_WANTWAKEUP) {
			flags &= ~SQ_WANTWAKEUP;
			cv_broadcast(&outer->sq_wait);
		}
		outer->sq_flags = flags;
		mutex_exit(SQLOCK(outer));
		return;
	} else {
		outer->sq_flags = flags;
		mutex_exit(SQLOCK(outer));
	}

	/* Can run it immediately */
	(*func)(q, mp);

	outer_exit(outer);
}

/*
 * Dequeue all writer callbacks from the outer perimeter and run them.
 */
static void
write_now(syncq_t *outer)
{
	mblk_t		*mp;
	queue_t		*q;
	void	(*func)();

	ASSERT(MUTEX_HELD(SQLOCK(outer)));
	ASSERT(outer->sq_outer == NULL && outer->sq_onext != NULL &&
	    outer->sq_oprev != NULL);
	while ((mp = outer->sq_evhead) != NULL) {
		/*
		 * Queues cannot be placed on the queuelist on the outer
		 * perimeter.
		 */
		ASSERT(!(outer->sq_flags & SQ_MESSAGES));
		ASSERT((outer->sq_flags & SQ_EVENTS));

		outer->sq_evhead = mp->b_next;
		if (outer->sq_evhead == NULL) {
			outer->sq_evtail = NULL;
			outer->sq_flags &= ~SQ_EVENTS;
		}
		ASSERT(outer->sq_count != 0);
		outer->sq_count--;	/* Incremented when enqueued. */
		mutex_exit(SQLOCK(outer));
		/*
		 * Drop the message if the queue is closing.
		 * Make sure that the queue is "claimed" when the callback
		 * is run in order to satisfy various ASSERTs.
		 */
		q = mp->b_queue;
		func = (void (*)())mp->b_prev;
		ASSERT(func != NULL);
		mp->b_next = mp->b_prev = NULL;
		if (q->q_flag & QWCLOSE) {
			freemsg(mp);
		} else {
			claimq(q);
			(*func)(q, mp);
			releaseq(q);
		}
		mutex_enter(SQLOCK(outer));
	}
	ASSERT(MUTEX_HELD(SQLOCK(outer)));
}

/*
 * The list of messages on the inner syncq is effectively hashed
 * by destination queue.  These destination queues are doubly
 * linked lists (hopefully) in priority order.  Messages are then
 * put on the queue referenced by the q_sqhead/q_sqtail elements.
 * Additional messages are linked together by the b_next/b_prev
 * elements in the mblk, with (similar to putq()) the first message
 * having a NULL b_prev and the last message having a NULL b_next.
 *
 * Events, such as qwriter callbacks, are put onto a list in FIFO
 * order referenced by sq_evhead and sq_evtail.  This is a singly
 * linked list, and messages here MUST be processed in the order queued.
 */

/*
 * Run the events on the syncq event list (sq_evhead).
 * Assumes there is only one claim on the syncq, it is
 * already exclusive (SQ_EXCL set), and the SQLOCK held.
 * Messages here are processed in order, with the SQ_EXCL bit
 * held all the way through till the last message is processed.
 */
void
sq_run_events(syncq_t *sq)
{
	mblk_t		*bp;
	queue_t		*qp;
	uint16_t	flags = sq->sq_flags;
	void		(*func)();

	ASSERT(MUTEX_HELD(SQLOCK(sq)));
	ASSERT((sq->sq_outer == NULL && sq->sq_onext == NULL &&
		sq->sq_oprev == NULL) ||
		(sq->sq_outer != NULL && sq->sq_onext != NULL &&
		sq->sq_oprev != NULL));

	ASSERT(flags & SQ_EXCL);
	ASSERT(sq->sq_count == 1);

	/*
	 * We need to process all of the events on this list.  It
	 * is possible that new events will be added while we are
	 * away processing a callback, so on every loop we reload
	 * sq_evhead and start back at the beginning of the list.
	 */
	for (bp = sq->sq_evhead; bp != NULL; bp = sq->sq_evhead) {
		ASSERT(bp->b_queue->q_syncq == sq);
		ASSERT(sq->sq_flags & SQ_EVENTS);

		qp = bp->b_queue;
		func = (void (*)())bp->b_prev;
		ASSERT(func != NULL);

		/*
		 * Messages from the event queue must be taken off in
		 * FIFO order.
		 */
		ASSERT(sq->sq_evhead == bp);
		sq->sq_evhead = bp->b_next;

		if (bp->b_next == NULL) {
			/* Deleting last */
			ASSERT(sq->sq_evtail == bp);
			sq->sq_evtail = NULL;
			sq->sq_flags &= ~SQ_EVENTS;
		}
		bp->b_prev = bp->b_next = NULL;
		ASSERT(bp->b_datap->db_ref != 0);

		mutex_exit(SQLOCK(sq));

		(*func)(qp, bp);

		mutex_enter(SQLOCK(sq));
		/*
		 * re-read the flags, since they could have changed.
		 */
		flags = sq->sq_flags;
		ASSERT(flags & SQ_EXCL);
	}
	ASSERT(sq->sq_evhead == NULL && sq->sq_evtail == NULL);
	ASSERT(!(sq->sq_flags & SQ_EVENTS));

	if (flags & SQ_WANTWAKEUP) {
		flags &= ~SQ_WANTWAKEUP;
		cv_broadcast(&sq->sq_wait);
	}
	if (flags & SQ_WANTEXWAKEUP) {
		flags &= ~SQ_WANTEXWAKEUP;
		cv_broadcast(&sq->sq_exitwait);
	}
	sq->sq_flags = flags;
}

/*
 * Put messages on the event list.
 * If we can go exclusive now, do so and process the event list, otherwise
 * let the last claim service this list (or wake the sqthread).
 * This procedure assumes SQLOCK is held.  To run the event list, it
 * must be called with no claims.
 */
static void
sqfill_events(syncq_t *sq, queue_t *q, mblk_t *mp, void (*func)())
{
	uint16_t count;

	ASSERT(MUTEX_HELD(SQLOCK(sq)));
	ASSERT(func != NULL);

	/*
	 * This is a callback.  Add it to the list of callbacks
	 * and see about upgrading.
	 */
	mp->b_prev = (mblk_t *)func;
	mp->b_queue = q;
	mp->b_next = NULL;
	if (sq->sq_evhead == NULL) {
		sq->sq_evhead = sq->sq_evtail = mp;
		sq->sq_flags |= SQ_EVENTS;
	} else {
		ASSERT(sq->sq_evtail != NULL);
		ASSERT(sq->sq_evtail->b_next == NULL);
		ASSERT(sq->sq_flags & SQ_EVENTS);
		sq->sq_evtail->b_next = mp;
		sq->sq_evtail = mp;
	}
	/*
	 * We have set SQ_EVENTS, so threads will have to
	 * unwind out of the perimeter, and new entries will
	 * not grab a putlock.  But we still need to know
	 * how many threads have already made a claim to the
	 * syncq, so grab the putlocks, and sum the counts.
	 * If there are no claims on the syncq, we can upgrade
	 * to exclusive, and run the event list.
	 * NOTE: We hold the SQLOCK, so we can just grab the
	 * putlocks.
	 */
	count = sq->sq_count;
	SQ_PUTLOCKS_ENTER(sq);
	SUM_SQ_PUTCOUNTS(sq, count);
	/*
	 * This thread holds no claim on the syncq (at least not for this
	 * entry), so we can only upgrade if no other thread has a claim;
	 * otherwise the thread holding the claim is responsible for
	 * draining the syncq.
	 */
	if (count > 0) {
		/*
		 * Can't upgrade - other threads inside.
		 */
		SQ_PUTLOCKS_EXIT(sq);
		mutex_exit(SQLOCK(sq));
		return;
	}
	/*
	 * Need to set SQ_EXCL and make a claim on the syncq.
	 */
	ASSERT((sq->sq_flags & SQ_EXCL) == 0);
	sq->sq_flags |= SQ_EXCL;
	ASSERT(sq->sq_count == 0);
	sq->sq_count++;
	SQ_PUTLOCKS_EXIT(sq);

	/* Process the events list */
	sq_run_events(sq);

	/*
	 * Release our claim...
	 */
	sq->sq_count--;

	/*
	 * And release SQ_EXCL.
	 * We don't need to acquire the putlocks to release
	 * SQ_EXCL, since we are exclusive, and hold the SQLOCK.
	 */
	sq->sq_flags &= ~SQ_EXCL;

	/*
	 * sq_run_events should have released SQ_EXCL
	 */
	ASSERT(!(sq->sq_flags & SQ_EXCL));

	/*
	 * If anything happened while we were running the
	 * events (or was there before), we need to process
	 * them now.  We shouldn't be exclusive since we
	 * released the perimeter above (plus, we asserted
	 * for it).
	 */
	if (!(sq->sq_flags & SQ_STAYAWAY) && (sq->sq_flags & SQ_QUEUED))
		drain_syncq(sq);
	else
		mutex_exit(SQLOCK(sq));
}

/*
 * Perform delayed processing. The caller has to make sure that it is safe
 * to enter the syncq (e.g. by checking that none of the SQ_STAYAWAY bits are
 * set.)
 *
 * Assume that the caller has NO claims on the syncq.  However, a claim
 * on the syncq does not indicate that a thread is draining the syncq.
 * There may be more claims on the syncq than there are threads draining
 * (i.e.  #_threads_draining <= sq_count)
 *
 * drain_syncq has to terminate when one of the SQ_STAYAWAY bits gets set
 * in order to preserve qwriter(OUTER) ordering constraints.
 *
 * sq_putcount only needs to be checked when dispatching the queued
 * writer call for CIPUT sync queue, but this is handled in sq_run_events.
 */
void
drain_syncq(syncq_t *sq)
{
	queue_t		*qp;
	uint16_t	count;
	uint16_t	type = sq->sq_type;
	uint16_t	flags = sq->sq_flags;
	boolean_t	bg_service = sq->sq_svcflags & SQ_SERVICE;

	TRACE_1(TR_FAC_STREAMS_FR, TR_DRAIN_SYNCQ_START,
		"drain_syncq start:%p", sq);
	ASSERT(MUTEX_HELD(SQLOCK(sq)));
	ASSERT((sq->sq_outer == NULL && sq->sq_onext == NULL &&
		sq->sq_oprev == NULL) ||
		(sq->sq_outer != NULL && sq->sq_onext != NULL &&
		sq->sq_oprev != NULL));

	/*
	 * Drop SQ_SERVICE flag.
	 */
	if (bg_service)
		sq->sq_svcflags &= ~SQ_SERVICE;

	/*
	 * If SQ_EXCL is set, someone else is processing this syncq - let him
	 * finish the job.
	 */
	if (flags & SQ_EXCL) {
		if (bg_service) {
			ASSERT(sq->sq_servcount != 0);
			sq->sq_servcount--;
		}
		mutex_exit(SQLOCK(sq));
		return;
	}

	/*
	 * This routine can be called by a background thread if
	 * it was scheduled by a hi-priority thread.  So, if there are
	 * no messages queued, return (remember, we have the SQLOCK,
	 * and it cannot change until we release it). Wake up any waiters also.
	 */
	if (!(flags & SQ_QUEUED)) {
		if (flags & SQ_WANTWAKEUP) {
			flags &= ~SQ_WANTWAKEUP;
			cv_broadcast(&sq->sq_wait);
		}
		if (flags & SQ_WANTEXWAKEUP) {
			flags &= ~SQ_WANTEXWAKEUP;
			cv_broadcast(&sq->sq_exitwait);
		}
		sq->sq_flags = flags;
		if (bg_service) {
			ASSERT(sq->sq_servcount != 0);
			sq->sq_servcount--;
		}
		mutex_exit(SQLOCK(sq));
		return;
	}

	/*
	 * If this is not a concurrent put perimeter, we need to
	 * become exclusive to drain.  Also, if not CIPUT, we would
	 * not have acquired a putlock, so we don't need to check
	 * the putcounts.  If not entering with a claim, we test
	 * for sq_count == 0.
	 */
	type = sq->sq_type;
	if (!(type & SQ_CIPUT)) {
		if (sq->sq_count > 1) {
			if (bg_service) {
				ASSERT(sq->sq_servcount != 0);
				sq->sq_servcount--;
			}
			mutex_exit(SQLOCK(sq));
			return;
		}
		sq->sq_flags |= SQ_EXCL;
	}

	/*
	 * This is where we make a claim to the syncq.
	 * This can either be done by incrementing a putlock, or
	 * the sq_count.  But since we already have the SQLOCK
	 * here, we just bump the sq_count.
	 *
	 * Note that after we make a claim, we need to let the code
	 * fall through to the end of this routine to clean itself
	 * up.  A return in the while loop will put the syncq in a
	 * very bad state.
	 */
	sq->sq_count++;
	ASSERT(sq->sq_count != 0);	/* wraparound */

	while ((flags = sq->sq_flags) & SQ_QUEUED) {
		/*
		 * If we are told to stayaway or went exclusive,
		 * we are done.
		 */
		if (flags & (SQ_STAYAWAY)) {
			break;
		}

		/*
		 * If there are events to run, do so.
		 * We have one claim to the syncq, so if there are
		 * more than one, other threads are running.
		 */
		if (sq->sq_evhead != NULL) {
			ASSERT(sq->sq_flags & SQ_EVENTS);

			count = sq->sq_count;
			SQ_PUTLOCKS_ENTER(sq);
			SUM_SQ_PUTCOUNTS(sq, count);
			if (count > 1) {
				SQ_PUTLOCKS_EXIT(sq);
				/* Can't upgrade - other threads inside */
				break;
			}
			ASSERT((flags & SQ_EXCL) == 0);
			sq->sq_flags = flags | SQ_EXCL;
			SQ_PUTLOCKS_EXIT(sq);
			/*
			 * we have the only claim, run the events,
			 * sq_run_events will clear the SQ_EXCL flag.
			 */
			sq_run_events(sq);

			/*
			 * If this is a CIPUT perimeter, we need
			 * to drop the SQ_EXCL flag so we can properly
			 * continue draining the syncq.
			 */
			if (type & SQ_CIPUT) {
				ASSERT(sq->sq_flags & SQ_EXCL);
				sq->sq_flags &= ~SQ_EXCL;
			}

			/*
			 * And go back to the beginning just in case
			 * anything changed while we were away.
			 */
			ASSERT((sq->sq_flags & SQ_EXCL) || (type & SQ_CIPUT));
			continue;
		}

		ASSERT(sq->sq_evhead == NULL);
		ASSERT(!(sq->sq_flags & SQ_EVENTS));

		/*
		 * Find the queue that is not draining.
		 *
		 * q_draining is protected by QLOCK which we do not hold.
		 * But if it was set, then a thread was draining, and if it gets
		 * cleared, then it was because the thread has successfully
		 * drained the syncq, or a GOAWAY state occurred. For the GOAWAY
		 * state to happen, a thread needs the SQLOCK which we hold, and
		 * if there was such a flag, we would have already seen it.
		 */

		for (qp = sq->sq_head;
		    qp != NULL && (qp->q_draining ||
			(qp->q_sqflags & Q_SQDRAINING));
		    qp = qp->q_sqnext)
			;

		if (qp == NULL)
			break;

		/*
		 * We have a queue to work on, and we hold the
		 * SQLOCK and one claim, call qdrain_syncq.
		 * This means we need to release the SQLOCK and
		 * acquire the QLOCK (OK since we have a claim).
		 * Note that qdrain_syncq will actually dequeue
		 * this queue from the sq_head list when it is
		 * convinced all the work is done and release
		 * the QLOCK before returning.
		 */
		qp->q_sqflags |= Q_SQDRAINING;
		mutex_exit(SQLOCK(sq));
		mutex_enter(QLOCK(qp));
		qdrain_syncq(sq, qp);
		mutex_enter(SQLOCK(sq));

		/* The queue is drained */
		ASSERT(qp->q_sqflags & Q_SQDRAINING);
		qp->q_sqflags &= ~Q_SQDRAINING;
		/*
		 * NOTE: After this point qp should not be used since it may be
		 * closed.
		 */
	}

	ASSERT(MUTEX_HELD(SQLOCK(sq)));
	flags = sq->sq_flags;

	/*
	 * sq->sq_head cannot change because we hold the
	 * sqlock. However, a thread CAN decide that it is no longer
	 * going to drain that queue, but this should only be due to
	 * a GOAWAY state, and we should see that here.
	 *
	 * This loop is not very efficient. One solution may be adding a second
	 * pointer to the "draining" queue, but it is difficult to do when
	 * queues are inserted in the middle due to priority ordering. Another
	 * possibility is to yank the queue out of the sq list and put it onto
	 * the "draining list" and then put it back if it can't be drained.
	 */

	ASSERT((sq->sq_head == NULL) || (flags & SQ_GOAWAY) ||
		(type & SQ_CI) || sq->sq_head->q_draining);

	/* Drop SQ_EXCL for non-CIPUT perimeters */
	if (!(type & SQ_CIPUT))
		flags &= ~SQ_EXCL;
	ASSERT((flags & SQ_EXCL) == 0);

	/* Wake up any waiters. */
	if (flags & SQ_WANTWAKEUP) {
		flags &= ~SQ_WANTWAKEUP;
		cv_broadcast(&sq->sq_wait);
	}
	if (flags & SQ_WANTEXWAKEUP) {
		flags &= ~SQ_WANTEXWAKEUP;
		cv_broadcast(&sq->sq_exitwait);
	}
	sq->sq_flags = flags;

	ASSERT(sq->sq_count != 0);
	/* Release our claim. */
	sq->sq_count--;

	if (bg_service) {
		ASSERT(sq->sq_servcount != 0);
		sq->sq_servcount--;
	}

	mutex_exit(SQLOCK(sq));

	TRACE_1(TR_FAC_STREAMS_FR, TR_DRAIN_SYNCQ_END,
		"drain_syncq end:%p", sq);
}


/*
 * qdrain_syncq is (currently) called from only two places:
 *	drain_syncq
 *	putnext (or some variation of it)
 * and eventually
 *	qwait(_sig)
 *
 * If called from drain_syncq, we found this queue in the list
 * of queues needing service, so there is work to be done (or it
 * wouldn't be on the list).
 *
 * If called from some putnext variation, it is because the
 * perimeter is open, but messages are blocking a putnext and
 * there is no thread working on it.  Another thread could start
 * working on it while we are getting ready to do so ourselves, but
 * that thread would set the q_draining flag, and we can spin out.
 *
 * As for qwait(_sig), it is left to continue to call
 * drain_syncq directly (after all, it will get here eventually).
 *
 * qdrain_syncq has to terminate when:
 * - one of the SQ_STAYAWAY bits gets set to preserve qwriter(OUTER) ordering
 * - SQ_EVENTS gets set to preserve qwriter(INNER) ordering
 *
 * ASSUMES:
 *	One claim
 *	QLOCK held
 *	SQLOCK not held
 *	Will release QLOCK before returning
 */
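/*
 * For illustration only, a minimal sketch of the calling contract described
 * above (this mirrors the way drain_syncq() calls in; it is not a new
 * interface).  The caller is assumed to already hold a claim on q->q_syncq
 * and to have marked the queue Q_SQDRAINING while it held SQLOCK:
 *
 *	mutex_exit(SQLOCK(sq));		// SQLOCK must not be held
 *	mutex_enter(QLOCK(q));		// QLOCK is handed to qdrain_syncq
 *	qdrain_syncq(sq, q);		// releases QLOCK before returning
 *	mutex_enter(SQLOCK(sq));	// re-take SQLOCK to clear Q_SQDRAINING
 */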
void
qdrain_syncq(syncq_t *sq, queue_t *q)
{
	mblk_t		*bp;
	boolean_t	do_clr;
#ifdef DEBUG
	uint16_t	count;
#endif

	TRACE_1(TR_FAC_STREAMS_FR, TR_DRAIN_SYNCQ_START,
		"drain_syncq start:%p", sq);
	ASSERT(q->q_syncq == sq);
	ASSERT(MUTEX_HELD(QLOCK(q)));
	ASSERT(MUTEX_NOT_HELD(SQLOCK(sq)));
	/*
	 * For non-CIPUT perimeters, we should be called with the
	 * exclusive bit set already.  For CIPUT perimeters we will
	 * be doing a concurrent drain, so it had better not be set.
	 */
	ASSERT((sq->sq_flags & (SQ_EXCL|SQ_CIPUT)));
	ASSERT(!((sq->sq_type & SQ_CIPUT) && (sq->sq_flags & SQ_EXCL)));
	ASSERT((sq->sq_type & SQ_CIPUT) || (sq->sq_flags & SQ_EXCL));
	/*
	 * All outer pointers are set, or none of them are
	 */
	ASSERT((sq->sq_outer == NULL && sq->sq_onext == NULL &&
		sq->sq_oprev == NULL) ||
		(sq->sq_outer != NULL && sq->sq_onext != NULL &&
		sq->sq_oprev != NULL));
#ifdef DEBUG
	count = sq->sq_count;
	/*
	 * This is OK without the putlocks, because we have one
	 * claim either from the sq_count, or a putcount.  We could
	 * get an erroneous value from other counts, but ours won't
	 * change, so one way or another, we will have at least a
	 * value of one.
	 */
	SUM_SQ_PUTCOUNTS(sq, count);
	ASSERT(count >= 1);
#endif /* DEBUG */

	/*
	 * The first thing to do here is find out if a thread is already
	 * draining this queue or the queue is closing. If so, we are done;
	 * just return. Also, if there are no messages, we are done as well.
	 * Note that we check q_sqhead since there is a window of
	 * opportunity for us to enter here because Q_SQQUEUED was set, but
	 * is not anymore.
	 */
	if (q->q_draining || (q->q_sqhead == NULL)) {
		mutex_exit(QLOCK(q));
		return;
	}

	/*
	 * If the perimeter is exclusive, there is nothing we can
	 * do right now; go away.
	 * Note that there is nothing to prevent this case from changing
	 * right after this check, but the spin-out will catch it.
	 */

	/* Tell other threads that we are draining this queue */
	q->q_draining = 1;	/* Protected by QLOCK */

	for (bp = q->q_sqhead; bp != NULL; bp = q->q_sqhead) {

		/*
		 * Because we can enter this routine just because
		 * a putnext is blocked, we need to spin out if
		 * the perimeter wants to go exclusive as well as
		 * when it is just blocked. We also need to spin
		 * out if events are queued on the syncq.
		 * Don't check for SQ_EXCL, because non-CIPUT
		 * perimeters would set it, and it can't become
		 * exclusive while we hold a claim.
		 */
		if (sq->sq_flags & (SQ_STAYAWAY | SQ_EVENTS)) {
			break;
		}

#ifdef DEBUG
		/*
		 * Since we are in qdrain_syncq, we already know the queue,
		 * but for sanity, we want to check it against the queue
		 * that was recorded in bp->b_queue when the message was
		 * enqueued.
		 */

		ASSERT(bp->b_queue == q);
		ASSERT(bp->b_queue->q_syncq == sq);
		bp->b_queue = NULL;

		/*
		 * We would have the following check in the DEBUG code:
		 *
		 * if (bp->b_prev != NULL)  {
		 *	ASSERT(bp->b_prev == (void (*)())q->q_qinfo->qi_putp);
		 * }
		 *
		 * This can't be done, however, since IP modifies the qinfo
		 * structure at run-time (switching between IPv4 qinfo and
		 * IPv6 qinfo), invalidating the check.
		 * So the assignment to func is left here, but the ASSERT itself
		 * is removed until the whole issue is resolved.
		 */
#endif
		ASSERT(q->q_sqhead == bp);
		q->q_sqhead = bp->b_next;
		bp->b_prev = bp->b_next = NULL;
		ASSERT(q->q_syncqmsgs > 0);
		mutex_exit(QLOCK(q));

		ASSERT(bp->b_datap->db_ref != 0);

		(void) (*q->q_qinfo->qi_putp)(q, bp);

		mutex_enter(QLOCK(q));
		/*
		 * We should decrement q_syncqmsgs only after executing the
		 * put procedure to avoid a possible race with putnext().
		 * Even though putnext() sees Q_SQQUEUED set, there is an
		 * optimization that allows it to call the put procedure
		 * directly if (q_syncqmsgs == 0), and message reordering
		 * could otherwise occur.
		 */
		q->q_syncqmsgs--;

		/*
		 * Clear QFULL in the next service procedure queue if
		 * this is the last message destined for that queue.
		 *
		 * It would make better sense to have some sort of
		 * tunable for the low water mark, but these semantics
		 * are not yet defined.  So, alas, we use a constant.
		 */
		do_clr = (q->q_syncqmsgs == 0);
		mutex_exit(QLOCK(q));

		if (do_clr)
			clr_qfull(q);

		mutex_enter(QLOCK(q));
		/*
		 * Always clear SQ_EXCL when CIPUT in order to handle
		 * qwriter(INNER).  The putp() can call qwriter and get
		 * exclusive access IFF this is the only claim.  So, we
		 * need to test for this possibility so we can acquire
		 * the mutex and clear the bit.
		 */
		if ((sq->sq_type & SQ_CIPUT) && (sq->sq_flags & SQ_EXCL)) {
			mutex_enter(SQLOCK(sq));
			sq->sq_flags &= ~SQ_EXCL;
			mutex_exit(SQLOCK(sq));
		}
	}

	/*
	 * We should either have no messages left on this queue, or we
	 * were told to go away by a waiter (which we will wake up at
	 * the end of this function).
	 */
	ASSERT((q->q_sqhead == NULL) ||
	    (sq->sq_flags & (SQ_STAYAWAY | SQ_EVENTS)));

	ASSERT(MUTEX_HELD(QLOCK(q)));
	ASSERT(MUTEX_NOT_HELD(SQLOCK(sq)));

	/*
	 * Remove the q from the syncq list if all the messages are
	 * drained.
	 */
	if (q->q_sqhead == NULL) {
		mutex_enter(SQLOCK(sq));
		if (q->q_sqflags & Q_SQQUEUED)
			SQRM_Q(sq, q);
		mutex_exit(SQLOCK(sq));
		/*
		 * Since the queue is removed from the list, reset its priority.
		 */
		q->q_spri = 0;
	}

	/*
	 * Remember, the q_draining flag is used to let another
	 * thread know that there is a thread currently draining
	 * the messages for a queue.  Since we are now done with
	 * this queue (even if there may be messages still there),
	 * we need to clear this flag so some thread will work
	 * on it if needed.
	 */
	ASSERT(q->q_draining);
	q->q_draining = 0;

	/* called with a claim, so OK to drop all locks. */
	mutex_exit(QLOCK(q));

	TRACE_1(TR_FAC_STREAMS_FR, TR_DRAIN_SYNCQ_END,
		"drain_syncq end:%p", sq);
}
/* END OF QDRAIN_SYNCQ  */


/*
 * This is the mate to qdrain_syncq, except that it is putting the
 * message onto the queue instead of draining it.  Since the
 * message is destined for the queue that is selected, there is
 * no need to identify the function because the message is
 * intended for the put routine of that queue.  But this
 * routine will record it anyway, just in case (and only for debug kernels).
 *
 * After the message is enqueued on the syncq, it calls putnext_tail()
 * which will schedule a background thread to actually process the message.
 *
 * Assumes that there is a claim on the syncq (sq->sq_count > 0) and that
 * SQLOCK(sq) and QLOCK(q) are not held.
 */
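/*
 * Illustrative sketch only (an assumption about a typical caller, not an
 * additional interface): a putnext-style path that already holds a claim
 * on the syncq but cannot deliver the message directly may defer it with
 * qfill_syncq().  The claim is consumed on its behalf via putnext_tail():
 *
 *	syncq_t	*sq = q->q_syncq;
 *
 *	// claim previously taken (sq_count or a putcount was bumped)
 *	qfill_syncq(sq, q, mp);
 *	// sq, q and mp must not be referenced after this point
 */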
void
qfill_syncq(syncq_t *sq, queue_t *q, mblk_t *mp)
{
	queue_t		*fq = NULL;

	ASSERT(MUTEX_NOT_HELD(SQLOCK(sq)));
	ASSERT(MUTEX_NOT_HELD(QLOCK(q)));
	ASSERT(sq->sq_count > 0);
	ASSERT(q->q_syncq == sq);
	ASSERT((sq->sq_outer == NULL && sq->sq_onext == NULL &&
		sq->sq_oprev == NULL) ||
		(sq->sq_outer != NULL && sq->sq_onext != NULL &&
		sq->sq_oprev != NULL));

	mutex_enter(QLOCK(q));

	/*
	 * Set QFULL in next service procedure queue (that cares) if not
	 * already set and if there are already more messages on the syncq
	 * than sq_max_size.  If sq_max_size is 0, no flow control will be
	 * asserted on any syncq.
	 *
	 * The fq here is the next queue with a service procedure.
	 * This is where we would fail canputnext, so this is where we
	 * need to set QFULL.
	 *
	 * LOCKING HIERARCHY: In the case when fq != q we need to
	 *  a) Take QLOCK(fq) to set QFULL flag and
	 *  b) Take sd_reflock in the case of the hot stream to update
	 *  	sd_refcnt.
	 * We already have QLOCK at this point. To avoid cross-locks with
	 * freezestr() which grabs all QLOCKs and with strlock() which grabs
	 * both SQLOCK and sd_reflock, we need to drop respective locks first.
	 */
	if ((sq_max_size != 0) && (!(q->q_nfsrv->q_flag & QFULL)) &&
	    (q->q_syncqmsgs > sq_max_size)) {
		if ((fq = q->q_nfsrv) == q) {
			fq->q_flag |= QFULL;
		} else {
			mutex_exit(QLOCK(q));
			mutex_enter(QLOCK(fq));
			fq->q_flag |= QFULL;
			mutex_exit(QLOCK(fq));
			mutex_enter(QLOCK(q));
		}
	}

#ifdef DEBUG
	/*
	 * This is used for debugging in the qfill_syncq/qdrain_syncq case
	 * to trace the queue that the message is intended for.  Note
	 * that the original use was to identify the queue and function
	 * to call on the drain.  In the new syncq, we have the context
	 * of the queue that we are draining, so we call its putproc and
	 * don't rely on the saved values.  But for debugging this is still
	 * useful information.
	 */
	mp->b_prev = (mblk_t *)q->q_qinfo->qi_putp;
	mp->b_queue = q;
	mp->b_next = NULL;
#endif
	ASSERT(q->q_syncq == sq);
	/*
	 * Enqueue the message on the list.
	 * SQPUT_MP() accesses q_syncqmsgs.  We are already holding QLOCK to
	 * protect it, so it is OK to acquire SQLOCK after SQPUT_MP().
	 */
	SQPUT_MP(q, mp);
	mutex_enter(SQLOCK(sq));

	/*
	 * And queue on syncq for scheduling, if not already queued.
	 * Note that we need the SQLOCK for this, and for testing flags
	 * at the end to see if we will drain.  So grab it now, and
	 * release it before we call qdrain_syncq or return.
	 */
	if (!(q->q_sqflags & Q_SQQUEUED)) {
		q->q_spri = curthread->t_pri;
		SQPUT_Q(sq, q);
	}
#ifdef DEBUG
	else {
		/*
		 * All of these conditions MUST be true!
		 */
		ASSERT(sq->sq_tail != NULL);
		if (sq->sq_tail == sq->sq_head) {
			ASSERT((q->q_sqprev == NULL) &&
			    (q->q_sqnext == NULL));
		} else {
			ASSERT((q->q_sqprev != NULL) ||
			    (q->q_sqnext != NULL));
		}
		ASSERT(sq->sq_flags & SQ_QUEUED);
		ASSERT(q->q_syncqmsgs != 0);
		ASSERT(q->q_sqflags & Q_SQQUEUED);
	}
#endif
	mutex_exit(QLOCK(q));
	/*
	 * SQLOCK is still held, so sq_count can be safely decremented.
	 */
	sq->sq_count--;

	putnext_tail(sq, q, 0);
	/* Should not reference sq or q after this point. */
}

/*  End of qfill_syncq  */

/*
 * Remove all messages from a syncq (if qp is NULL) or remove all messages
 * that would be put into qp by drain_syncq.
 * Used when deleting the syncq (qp == NULL) or when detaching
 * a queue (qp != NULL).
 * Return non-zero if one or more messages were freed.
 *
 * There is no need to grab sq_putlocks here. See the comment in strsubr.h
 * that explains when sq_putlocks are used.
 *
 * NOTE: This function assumes that it is called from the close() context and
 * that all the queues in the syncq are going away. For this reason it doesn't
 * acquire QLOCK to modify the q_sqhead/q_sqtail fields. This assumption is
 * currently valid, but it would be worth rethinking this function so that it
 * behaves properly in other cases.
 */
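/*
 * Sketch of the two usage patterns described above, for illustration only
 * (assuming the close() context noted in the NOTE above):
 *
 *	// detaching a single queue: free just that queue's messages
 *	freed = flush_syncq(q->q_syncq, q);
 *
 *	// deleting the syncq: free everything still queued on it
 *	freed = flush_syncq(sq, NULL);
 */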
int
flush_syncq(syncq_t *sq, queue_t *qp)
{
	mblk_t		*bp, *mp_head, *mp_next, *mp_prev;
	queue_t		*q;
	int		ret = 0;

	mutex_enter(SQLOCK(sq));

	/*
	 * First, make sure there are no events listed for this
	 * queue.  All events for this queue will just be freed.
	 */
	if (qp != NULL && sq->sq_evhead != NULL) {
		ASSERT(sq->sq_flags & SQ_EVENTS);

		mp_prev = NULL;
		for (bp = sq->sq_evhead; bp != NULL; bp = mp_next) {
			mp_next = bp->b_next;
			if (bp->b_queue == qp) {
				/* Delete this message */
				if (mp_prev != NULL) {
					mp_prev->b_next = mp_next;
					/*
					 * Update sq_evtail if the last element
					 * is removed.
					 */
					if (bp == sq->sq_evtail) {
						ASSERT(mp_next == NULL);
						sq->sq_evtail = mp_prev;
					}
				} else
					sq->sq_evhead = mp_next;
				if (sq->sq_evhead == NULL)
					sq->sq_flags &= ~SQ_EVENTS;
				bp->b_prev = bp->b_next = NULL;
				freemsg(bp);
				ret++;
			} else {
				mp_prev = bp;
			}
		}
	}

	/*
	 * Walk sq_head and:
	 *	- if qp is set, remove only qp's messages
	 *	- if qp is not set, remove every queue's messages
	 */
	q = sq->sq_head;
	while (q != NULL) {
		ASSERT(q->q_syncq == sq);
		if ((qp == NULL) || (qp == q)) {
			/*
			 * Yank the messages as a list off the queue
			 */
			mp_head = q->q_sqhead;
			/*
			 * We do not hold QLOCK(q) here (which is safe due to
			 * the assumptions mentioned above). To obtain the lock
			 * we would need to release SQLOCK, which may allow
			 * lots of things to change underneath us. This place
			 * requires more analysis.
			 */
			q->q_sqhead = q->q_sqtail = NULL;
			ASSERT(mp_head->b_queue &&
			    mp_head->b_queue->q_syncq == sq);

			/*
			 * Free each of the messages.
			 */
			for (bp = mp_head; bp != NULL; bp = mp_next) {
				mp_next = bp->b_next;
				bp->b_prev = bp->b_next = NULL;
				freemsg(bp);
				ret++;
			}
			/*
			 * Now remove the queue from the syncq.
			 */
			ASSERT(q->q_sqflags & Q_SQQUEUED);
			SQRM_Q(sq, q);
			q->q_spri = 0;
			q->q_syncqmsgs = 0;

			/*
			 * If qp was specified, we are done with it and are
			 * going to drop SQLOCK(sq) and return. We wakeup syncq
			 * waiters while we still have the SQLOCK.
			 */
			if ((qp != NULL) && (sq->sq_flags & SQ_WANTWAKEUP)) {
				sq->sq_flags &= ~SQ_WANTWAKEUP;
				cv_broadcast(&sq->sq_wait);
			}
			/* Drop SQLOCK across clr_qfull */
			mutex_exit(SQLOCK(sq));

			/*
			 * We avoid doing the test that drain_syncq does and
			 * unconditionally clear qfull for every flushed
			 * message. Since flush_syncq is only called during
			 * close this should not be a problem.
			 */
			clr_qfull(q);
			if (qp != NULL) {
				return (ret);
			} else {
				mutex_enter(SQLOCK(sq));
				/*
				 * The head was removed by SQRM_Q above.
				 * Reread the new head and flush it.
				 */
				q = sq->sq_head;
			}
		} else {
			q = q->q_sqnext;
		}
		ASSERT(MUTEX_HELD(SQLOCK(sq)));
	}

	if (sq->sq_flags & SQ_WANTWAKEUP) {
		sq->sq_flags &= ~SQ_WANTWAKEUP;
		cv_broadcast(&sq->sq_wait);
	}

	mutex_exit(SQLOCK(sq));
	return (ret);
}

/*
 * Propagate all messages on a syncq that are associated with the specified
 * queue to the next syncq. If the queue is attached to a driver or if the
 * messages have been added due to a qwriter(PERIM_INNER), free the messages.
 *
 * Assumes that the stream is strlock()'ed. We don't come here if there
 * are no messages to propagate.
 *
 * NOTE: If the queue is attached to a driver, all the messages are freed
 * as there is no point in propagating the messages from the driver syncq
 * to the closing stream head, which will in turn get freed later.
 */
static int
propagate_syncq(queue_t *qp)
{
	mblk_t		*bp, *head, *tail, *prev, *next;
	syncq_t 	*sq;
	queue_t		*nqp;
	syncq_t		*nsq;
	boolean_t	isdriver;
	int 		moved = 0;
	uint16_t	flags;
	pri_t		priority = curthread->t_pri;
#ifdef DEBUG
	void		(*func)();
#endif

	sq = qp->q_syncq;
	ASSERT(MUTEX_HELD(SQLOCK(sq)));
	/* debug macro */
	SQ_PUTLOCKS_HELD(sq);
	/*
	 * As entersq() does not increment the sq_count for
	 * the write side, check sq_count for non-QPERQ
	 * perimeters alone.
	 */
	ASSERT((qp->q_flag & QPERQ) || (sq->sq_count >= 1));

	/*
	 * propagate_syncq() can be called because of either messages or
	 * events on the queue's syncq. Do the actual message propagation
	 * only if there are messages.
	 */
	if (qp->q_syncqmsgs) {
		isdriver = (qp->q_flag & QISDRV);

		if (!isdriver) {
			nqp = qp->q_next;
			nsq = nqp->q_syncq;
			ASSERT(MUTEX_HELD(SQLOCK(nsq)));
			/* debug macro */
			SQ_PUTLOCKS_HELD(nsq);
#ifdef DEBUG
			func = (void (*)())nqp->q_qinfo->qi_putp;
#endif
		}

		SQRM_Q(sq, qp);
		priority = MAX(qp->q_spri, priority);
		qp->q_spri = 0;
		head = qp->q_sqhead;
		tail = qp->q_sqtail;
		qp->q_sqhead = qp->q_sqtail = NULL;
		qp->q_syncqmsgs = 0;

		/*
		 * Walk the list of messages and free them if this is a
		 * driver; otherwise reset b_queue (and, for DEBUG kernels,
		 * b_prev) to refer to the next queue and its put procedure.
		 * Afterward, the whole list is appended to the end of the
		 * next queue's syncq list.
		 */

		for (bp = head; bp != NULL; bp = next) {
			next = bp->b_next;
			if (isdriver) {
				bp->b_prev = bp->b_next = NULL;
				freemsg(bp);
				continue;
			}
			/* Change the q values for this message */
			bp->b_queue = nqp;
#ifdef DEBUG
			bp->b_prev = (mblk_t *)func;
#endif
			moved++;
		}
		/*
		 * Attach list of messages to the end of the new queue (if there
		 * is a list of messages).
		 */

		if (!isdriver && head != NULL) {
			ASSERT(tail != NULL);
			if (nqp->q_sqhead == NULL) {
				nqp->q_sqhead = head;
			} else {
				ASSERT(nqp->q_sqtail != NULL);
				nqp->q_sqtail->b_next = head;
			}
			nqp->q_sqtail = tail;
			/*
			 * When messages are moved from a high-priority queue
			 * to another queue, the destination queue's priority
			 * is upgraded.
			 */

			if (priority > nqp->q_spri)
				nqp->q_spri = priority;

			SQPUT_Q(nsq, nqp);

			nqp->q_syncqmsgs += moved;
			ASSERT(nqp->q_syncqmsgs != 0);
		}
	}

	/*
	 * Before we leave, we need to make sure there are no
	 * events listed for this queue.  All events for this queue
	 * will just be freed.
	 */
	if (sq->sq_evhead != NULL) {
		ASSERT(sq->sq_flags & SQ_EVENTS);
		prev = NULL;
		for (bp = sq->sq_evhead; bp != NULL; bp = next) {
			next = bp->b_next;
			if (bp->b_queue == qp) {
				/* Delete this message */
				if (prev != NULL) {
					prev->b_next = next;
					/*
					 * Update sq_evtail if the last element
					 * is removed.
					 */
					if (bp == sq->sq_evtail) {
						ASSERT(next == NULL);
						sq->sq_evtail = prev;
					}
				} else
					sq->sq_evhead = next;
				if (sq->sq_evhead == NULL)
					sq->sq_flags &= ~SQ_EVENTS;
				bp->b_prev = bp->b_next = NULL;
				freemsg(bp);
			} else {
				prev = bp;
			}
		}
	}

	flags = sq->sq_flags;

	/* Wake up any waiter before leaving. */
	if (flags & SQ_WANTWAKEUP) {
		flags &= ~SQ_WANTWAKEUP;
		cv_broadcast(&sq->sq_wait);
	}
	sq->sq_flags = flags;

	return (moved);
}

/*
 * Try to upgrade to exclusive access at the inner perimeter. If this cannot
 * be done without blocking, then the request will be queued on the syncq
 * and drain_syncq will run it later.
 *
 * This routine can only be called from put or service procedures plus
 * asynchronous callback routines that have properly entered the perimeter
 * (with entersq()). Thus qwriter_inner assumes the caller has one claim
 * on the syncq associated with q.
 */
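/*
 * For illustration, a hedged sketch of how a module normally reaches this
 * code: through the documented qwriter(9F) entry point with PERIM_INNER,
 * from a put or service procedure that has already entered the perimeter.
 * The callback name (xx_set_state) is hypothetical.
 *
 *	static void
 *	xx_set_state(queue_t *q, mblk_t *mp)
 *	{
 *		// runs exclusive at the inner perimeter
 *	}
 *
 *	// inside xxput(q, mp), defer the state change:
 *	qwriter(q, mp, xx_set_state, PERIM_INNER);
 */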
void
qwriter_inner(queue_t *q, mblk_t *mp, void (*func)())
{
	syncq_t	*sq = q->q_syncq;
	uint16_t count;

	mutex_enter(SQLOCK(sq));
	count = sq->sq_count;
	SQ_PUTLOCKS_ENTER(sq);
	SUM_SQ_PUTCOUNTS(sq, count);
	ASSERT(count >= 1);
	ASSERT(sq->sq_type & (SQ_CIPUT|SQ_CISVC));

	if (count == 1) {
		/*
		 * Can upgrade. This case also handles nested qwriter calls
		 * (when the qwriter callback function calls qwriter). In that
		 * case SQ_EXCL is already set.
		 */
		sq->sq_flags |= SQ_EXCL;
		SQ_PUTLOCKS_EXIT(sq);
		mutex_exit(SQLOCK(sq));
		(*func)(q, mp);
		/*
		 * Assumes that leavesq, putnext, and drain_syncq will reset
		 * SQ_EXCL for SQ_CIPUT/SQ_CISVC queues. We leave SQ_EXCL on
		 * until putnext, leavesq, or drain_syncq drops it.
		 * That way we handle nested qwriter(INNER) without dropping
		 * SQ_EXCL until the outermost qwriter callback routine is
		 * done.
		 */
		return;
	}
	SQ_PUTLOCKS_EXIT(sq);
	sqfill_events(sq, q, mp, func);
}

/*
 * Synchronous callback support functions
 */

/*
 * Allocate a callback parameter structure.
 * Assumes that caller initializes the flags and the id.
 * Acquires SQLOCK(sq) if non-NULL is returned.
 */
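/*
 * A hedged sketch of the expected alloc/initialize pairing, under the
 * assumptions stated above (caller fills in cbp_flags and cbp_id; SQLOCK
 * is held when a non-NULL value is returned).  The flag and id values are
 * hypothetical, caller-defined quantities:
 *
 *	cbp = callbparams_alloc(sq, func, arg, KM_NOSLEEP);
 *	if (cbp == NULL)
 *		return;				// failed; SQLOCK not held
 *	cbp->cbp_flags = my_cancel_class;	// hypothetical
 *	cbp->cbp_id = my_id;			// hypothetical
 *	mutex_exit(SQLOCK(sq));			// SQLOCK was acquired for us
 */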
callbparams_t *
callbparams_alloc(syncq_t *sq, void (*func)(void *), void *arg, int kmflags)
{
	callbparams_t *cbp;
	size_t size = sizeof (callbparams_t);

	cbp = kmem_alloc(size, kmflags & ~KM_PANIC);

	/*
	 * Only try tryhard allocation if the caller is ready to panic.
	 * Otherwise just fail.
	 */
	if (cbp == NULL) {
		if (kmflags & KM_PANIC)
			cbp = kmem_alloc_tryhard(sizeof (callbparams_t),
			    &size, kmflags);
		else
			return (NULL);
	}

	ASSERT(size >= sizeof (callbparams_t));
	cbp->cbp_size = size;
	cbp->cbp_sq = sq;
	cbp->cbp_func = func;
	cbp->cbp_arg = arg;
	mutex_enter(SQLOCK(sq));
	cbp->cbp_next = sq->sq_callbpend;
	sq->sq_callbpend = cbp;
	return (cbp);
}

void
callbparams_free(syncq_t *sq, callbparams_t *cbp)
{
	callbparams_t **pp, *p;

	ASSERT(MUTEX_HELD(SQLOCK(sq)));

	for (pp = &sq->sq_callbpend; (p = *pp) != NULL; pp = &p->cbp_next) {
		if (p == cbp) {
			*pp = p->cbp_next;
			kmem_free(p, p->cbp_size);
			return;
		}
	}
	(void) (STRLOG(0, 0, 0, SL_CONSOLE,
	    "callbparams_free: not found\n"));
}

void
callbparams_free_id(syncq_t *sq, callbparams_id_t id, int32_t flag)
{
	callbparams_t **pp, *p;

	ASSERT(MUTEX_HELD(SQLOCK(sq)));

	for (pp = &sq->sq_callbpend; (p = *pp) != NULL; pp = &p->cbp_next) {
		if (p->cbp_id == id && p->cbp_flags == flag) {
			*pp = p->cbp_next;
			kmem_free(p, p->cbp_size);
			return;
		}
	}
	(void) (STRLOG(0, 0, 0, SL_CONSOLE,
	    "callbparams_free_id: not found\n"));
}

/*
 * Callback wrapper function used by once-only callbacks that can be
 * cancelled (qtimeout and qbufcall).
 * Contains an inline version of entersq(sq, SQ_CALLBACK) so that the
 * callback can be cancelled by the qun* functions.
 */
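/*
 * For context, a hedged sketch of the documented interfaces this wrapper
 * backs (qtimeout(9F)/quntimeout(9F)); the callback and argument names are
 * hypothetical:
 *
 *	timeout_id_t	tid;
 *
 *	tid = qtimeout(q, xx_timer_fired, xx_arg, drv_usectohz(500000));
 *	...
 *	(void) quntimeout(q, tid);	// cancel before it runs, if needed
 */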
void
qcallbwrapper(void *arg)
{
	callbparams_t *cbp = arg;
	syncq_t	*sq;
	uint16_t count = 0;
	uint16_t waitflags = SQ_STAYAWAY | SQ_EVENTS | SQ_EXCL;
	uint16_t type;

	sq = cbp->cbp_sq;
	mutex_enter(SQLOCK(sq));
	type = sq->sq_type;
	if (!(type & SQ_CICB)) {
		count = sq->sq_count;
		SQ_PUTLOCKS_ENTER(sq);
		SQ_PUTCOUNT_CLRFAST_LOCKED(sq);
		SUM_SQ_PUTCOUNTS(sq, count);
		sq->sq_needexcl++;
		ASSERT(sq->sq_needexcl != 0);	/* wraparound */
		waitflags |= SQ_MESSAGES;
	}
	/* Cannot handle exclusive entry at the outer perimeter */
	ASSERT(type & SQ_COCB);

	while ((sq->sq_flags & waitflags) ||
	    (!(type & SQ_CICB) && count != 0)) {
		if ((sq->sq_callbflags & cbp->cbp_flags) &&
		    (sq->sq_cancelid == cbp->cbp_id)) {
			/* timeout has been cancelled */
			sq->sq_callbflags |= SQ_CALLB_BYPASSED;
			callbparams_free(sq, cbp);
			if (!(type & SQ_CICB)) {
				ASSERT(sq->sq_needexcl > 0);
				sq->sq_needexcl--;
				if (sq->sq_needexcl == 0) {
					SQ_PUTCOUNT_SETFAST_LOCKED(sq);
				}
				SQ_PUTLOCKS_EXIT(sq);
			}
			mutex_exit(SQLOCK(sq));
			return;
		}
		sq->sq_flags |= SQ_WANTWAKEUP;
		if (!(type & SQ_CICB)) {
			SQ_PUTLOCKS_EXIT(sq);
		}
		cv_wait(&sq->sq_wait, SQLOCK(sq));
		if (!(type & SQ_CICB)) {
			count = sq->sq_count;
			SQ_PUTLOCKS_ENTER(sq);
			SUM_SQ_PUTCOUNTS(sq, count);
		}
	}

	sq->sq_count++;
	ASSERT(sq->sq_count != 0);	/* Wraparound */
	if (!(type & SQ_CICB)) {
		ASSERT(count == 0);
		sq->sq_flags |= SQ_EXCL;
		ASSERT(sq->sq_needexcl > 0);
		sq->sq_needexcl--;
		if (sq->sq_needexcl == 0) {
			SQ_PUTCOUNT_SETFAST_LOCKED(sq);
		}
		SQ_PUTLOCKS_EXIT(sq);
	}

	mutex_exit(SQLOCK(sq));

	cbp->cbp_func(cbp->cbp_arg);

	/*
	 * We drop the lock only so that leavesq can re-acquire it.
	 * A possible optimization would be to inline leavesq.
	 */
	mutex_enter(SQLOCK(sq));
	callbparams_free(sq, cbp);
	mutex_exit(SQLOCK(sq));
	leavesq(sq, SQ_CALLBACK);
}

/*
 * There is no need to grab sq_putlocks here. See the comment in strsubr.h
 * that explains when sq_putlocks are used.
 *
 * sq_count (or one of the sq_putcounts) has already been
 * decremented by the caller, and if SQ_QUEUED, we need to call
 * drain_syncq (the global syncq drain).
 * If putnext_tail is called with the SQ_EXCL bit set, we are in
 * one of two states: either this is a non-CIPUT perimeter and we
 * need to clear the bit, or we went exclusive in the put procedure.
 * In either case, we want to clear the bit now, and it is probably
 * easier to do this at the beginning of this function (remember, we
 * hold the SQLOCK).  Lastly, if there are other messages queued
 * on the syncq (and not for our destination), enable the syncq
 * for background work.
 */
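/*
 * Illustrative sketch of the call pattern used elsewhere in this file
 * (see qfill_syncq above); not a new interface:
 *
 *	mutex_enter(SQLOCK(sq));
 *	...
 *	sq->sq_count--;			// release our claim under SQLOCK
 *	putnext_tail(sq, q, 0);		// drops SQLOCK; may drain the syncq
 *	// sq and q must not be referenced after this point
 */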

/* ARGSUSED */
void
putnext_tail(syncq_t *sq, queue_t *qp, uint32_t passflags)
{
	uint16_t	flags = sq->sq_flags;

	ASSERT(MUTEX_HELD(SQLOCK(sq)));
	ASSERT(MUTEX_NOT_HELD(QLOCK(qp)));

	/* Clear SQ_EXCL if set in passflags */
	if (passflags & SQ_EXCL) {
		flags &= ~SQ_EXCL;
	}
	if (flags & SQ_WANTWAKEUP) {
		flags &= ~SQ_WANTWAKEUP;
		cv_broadcast(&sq->sq_wait);
	}
	if (flags & SQ_WANTEXWAKEUP) {
		flags &= ~SQ_WANTEXWAKEUP;
		cv_broadcast(&sq->sq_exitwait);
	}
	sq->sq_flags = flags;

	/*
	 * We have cleared SQ_EXCL if we were asked to, and started
	 * the wakeup process for waiters.  If there are no writers
	 * then we need to drain the syncq if we were told to, or
	 * enable the background thread to do it.
	 */
	if (!(flags & (SQ_STAYAWAY|SQ_EXCL))) {
		if ((passflags & SQ_QUEUED) ||
		    (sq->sq_svcflags & SQ_DISABLED)) {
			/* drain_syncq will take care of events in the list */
			drain_syncq(sq);
			return;
		} else if (flags & SQ_QUEUED) {
			sqenable(sq);
		}
	}
	/* Drop the SQLOCK on exit */
	mutex_exit(SQLOCK(sq));
	TRACE_3(TR_FAC_STREAMS_FR, TR_PUTNEXT_END,
		"putnext_end:(%p, %p, %p) done", NULL, qp, sq);
}

void
set_qend(queue_t *q)
{
	mutex_enter(QLOCK(q));
	if (!O_SAMESTR(q))
		q->q_flag |= QEND;
	else
		q->q_flag &= ~QEND;
	mutex_exit(QLOCK(q));
	q = _OTHERQ(q);
	mutex_enter(QLOCK(q));
	if (!O_SAMESTR(q))
		q->q_flag |= QEND;
	else
		q->q_flag &= ~QEND;
	mutex_exit(QLOCK(q));
}


void
clr_qfull(queue_t *q)
{
	queue_t	*oq = q;

	q = q->q_nfsrv;
	/* Fast check if there is any work to do before getting the lock. */
	if ((q->q_flag & (QFULL|QWANTW)) == 0) {
		return;
	}

	/*
	 * Do not reset QFULL (and backenable) if the q_count is the reason
	 * for QFULL being set.
	 */
	mutex_enter(QLOCK(q));
	/*
	 * If both q_count and q_mblkcnt are less than the hiwat mark
	 */
	if ((q->q_count < q->q_hiwat) && (q->q_mblkcnt < q->q_hiwat)) {
		q->q_flag &= ~QFULL;
		/*
		 * Backenable if someone wants to write,
		 * AND
		 *    both counts are less than the lowat mark
		 *    OR
		 *    the lowat mark is zero.
		 */
		if ((q->q_flag & QWANTW) &&
		    (((q->q_count < q->q_lowat) &&
		    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
			q->q_flag &= ~QWANTW;
			mutex_exit(QLOCK(q));
			backenable(oq, 0);
		} else
			mutex_exit(QLOCK(q));
	} else
		mutex_exit(QLOCK(q));
}

/*
 * Set the forward service procedure pointer.
 *
 * Called at insert-time to cache a queue's next forward service procedure in
 * q_nfsrv; used by canput() and canputnext().  If the queue to be inserted
 * has a service procedure then q_nfsrv points to itself.  If the queue to be
 * inserted does not have a service procedure, then q_nfsrv points to the next
 * queue forward that has a service procedure.  If the queue is at the logical
 * end of the stream (driver for write side, stream head for the read side)
 * and does not have a service procedure, then q_nfsrv also points to itself.
 */
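/*
 * For illustration, a minimal sketch of how the cached pointer is consumed
 * (this mirrors the flow-control test in qfill_syncq above; it is not an
 * additional interface):
 *
 *	queue_t	*fq = q->q_nfsrv;	// nearest forward queue with a svc proc
 *
 *	if (fq->q_flag & QFULL) {
 *		// flow controlled: canput()/canputnext() would fail here
 *	}
 */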
void
set_nfsrv_ptr(
	queue_t  *rnew,		/* read queue pointer to new module */
	queue_t  *wnew,		/* write queue pointer to new module */
	queue_t  *prev_rq,	/* read queue pointer to the module above */
	queue_t  *prev_wq)	/* write queue pointer to the module above */
{
	queue_t *qp;

	if (prev_wq->q_next == NULL) {
		/*
		 * Insert the driver, initialize the driver and stream head.
		 * In this case, prev_rq/prev_wq should be the stream head.
		 * _I_INSERT does not allow inserting a driver.  Make sure
		 * that it is not an insertion.
		 */
		ASSERT(!(rnew->q_flag & _QINSERTING));
		wnew->q_nfsrv = wnew;
		if (rnew->q_qinfo->qi_srvp)
			rnew->q_nfsrv = rnew;
		else
			rnew->q_nfsrv = prev_rq;
		prev_rq->q_nfsrv = prev_rq;
		prev_wq->q_nfsrv = prev_wq;
	} else {
		/*
		 * set up read side q_nfsrv pointer.  This MUST be done
		 * before setting the write side, because the setting of
		 * the write side for a fifo may depend on it.
		 *
		 * Suppose we have a fifo that only has pipemod pushed.
		 * pipemod has no read or write service procedures, so
		 * nfsrv for both pipemod queues points to prev_rq (the
		 * stream read head).  Now push bufmod (which has only a
		 * read service procedure).  Doing the write side first,
		 * wnew->q_nfsrv is set to pipemod's writeq nfsrv, which
		 * is WRONG; the next queue forward from wnew with a
		 * service procedure will be rnew, not the stream read head.
		 * Since the downstream queue (which in the case of a fifo
		 * is the read queue rnew) can affect upstream queues, it
		 * needs to be done first.  Setting up the read side first
		 * sets nfsrv for both pipemod queues to rnew and then
		 * when the write side is set up, wnew->q_nfsrv will also
		 * point to rnew.
		 */
		if (rnew->q_qinfo->qi_srvp) {
			/*
			 * Use _OTHERQ() because, if this is a pipe, the next
			 * module may have been pushed from the other end and
			 * q_next could be a read queue.
			 */
			qp = _OTHERQ(prev_wq->q_next);
			while (qp && qp->q_nfsrv != qp) {
				qp->q_nfsrv = rnew;
				qp = backq(qp);
			}
			rnew->q_nfsrv = rnew;
		} else
			rnew->q_nfsrv = prev_rq->q_nfsrv;

		/* set up write side q_nfsrv pointer */
		if (wnew->q_qinfo->qi_srvp) {
			wnew->q_nfsrv = wnew;

			/*
			 * For insertion, need to update nfsrv of the modules
			 * above which do not have a service routine.
			 */
			if (rnew->q_flag & _QINSERTING) {
				for (qp = prev_wq;
				    qp != NULL && qp->q_nfsrv != qp;
				    qp = backq(qp)) {
					qp->q_nfsrv = wnew->q_nfsrv;
				}
			}
		} else {
			if (prev_wq->q_next == prev_rq)
				/*
				 * Since prev_wq/prev_rq are the middle of a
				 * fifo, wnew/rnew will also be the middle of
				 * a fifo and wnew's nfsrv is same as rnew's.
				 */
				wnew->q_nfsrv = rnew->q_nfsrv;
			else
				wnew->q_nfsrv = prev_wq->q_next->q_nfsrv;
		}
	}
}

/*
 * Reset the forward service procedure pointer; called at remove-time.
 */
void
reset_nfsrv_ptr(queue_t *rqp, queue_t *wqp)
{
	queue_t *tmp_qp;

	/* Reset the write side q_nfsrv pointer for _I_REMOVE */
	if ((rqp->q_flag & _QREMOVING) && (wqp->q_qinfo->qi_srvp != NULL)) {
		for (tmp_qp = backq(wqp);
		    tmp_qp != NULL && tmp_qp->q_nfsrv == wqp;
		    tmp_qp = backq(tmp_qp)) {
			tmp_qp->q_nfsrv = wqp->q_nfsrv;
		}
	}

	/* reset the read side q_nfsrv pointer */
	if (rqp->q_qinfo->qi_srvp) {
		if (wqp->q_next) {	/* non-driver case */
			tmp_qp = _OTHERQ(wqp->q_next);
			while (tmp_qp && tmp_qp->q_nfsrv == rqp) {
				/* Note that rqp->q_next cannot be NULL */
				ASSERT(rqp->q_next != NULL);
				tmp_qp->q_nfsrv = rqp->q_next->q_nfsrv;
				tmp_qp = backq(tmp_qp);
			}
		}
	}
}

/*
 * This routine should be called after all stream geometry changes to update
 * the stream head's cached struio() rd/wr queue pointers. Note that it must
 * be called with the stream locked.
 *
 * Note: it only enables Synchronous STREAMS for a side of a Stream which has
 *	 an explicit synchronous barrier module queue. That is, a queue that
 *	 has specified a struio() type.
 */
static void
strsetuio(stdata_t *stp)
{
	queue_t *wrq;

	if (stp->sd_flag & STPLEX) {
		/*
		 * Not a stream head, but a mux, so no Synchronous STREAMS.
		 */
		stp->sd_struiowrq = NULL;
		stp->sd_struiordq = NULL;
		return;
	}
	/*
	 * Scan the write queue(s) while they are synchronous
	 * until we find a queue with a struio() type specified.
	 */
	wrq = stp->sd_wrq->q_next;
	while (wrq) {
		if (wrq->q_struiot == STRUIOT_NONE) {
			wrq = 0;
			break;
		}
		if (wrq->q_struiot != STRUIOT_DONTCARE)
			break;
		if (! _SAMESTR(wrq)) {
			wrq = 0;
			break;
		}
		wrq = wrq->q_next;
	}
	stp->sd_struiowrq = wrq;
	/*
	 * Scan the read queue(s) while they are synchronous
	 * until we find a queue with a struio() type specified.
	 */
	wrq = stp->sd_wrq->q_next;
	while (wrq) {
		if (_RD(wrq)->q_struiot == STRUIOT_NONE) {
			wrq = 0;
			break;
		}
		if (_RD(wrq)->q_struiot != STRUIOT_DONTCARE)
			break;
		if (! _SAMESTR(wrq)) {
			wrq = 0;
			break;
		}
		wrq = wrq->q_next;
	}
	stp->sd_struiordq = wrq ? _RD(wrq) : 0;
}

/*
 * pass_wput unblocks the passthru queues so that
 * messages can arrive at the mux's lower read queue before
 * the I_LINK/I_UNLINK is acked/nacked.
 */
static void
pass_wput(queue_t *q, mblk_t *mp)
{
	syncq_t *sq;

	sq = _RD(q)->q_syncq;
	if (sq->sq_flags & SQ_BLOCKED)
		unblocksq(sq, SQ_BLOCKED, 0);
	putnext(q, mp);
}

/*
 * Set up queues for the link/unlink.
 * Create a new queue, block it, and then insert it
 * below the stream head on the lower stream.
 * This prevents any messages from arriving during the setq
 * as well as while the mux is processing the I_LINK/I_UNLINK.
 * The blocked passq is unblocked once the I_LINK/I_UNLINK has
 * been acked or nacked or if a message is generated and sent
 * down the mux's write put procedure.
 * See pass_wput().
 *
 * After the new queue is inserted, all messages coming from below are
 * blocked. The call to strlock will ensure that all activity in the stream
 * head read queue syncq is stopped (sq_count drops to zero).
 */
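/*
 * A hedged sketch of the pairing described above (the surrounding I_LINK
 * processing is elided and stpdown is the lower stream's stdata, as in the
 * callers of this routine):
 *
 *	passq = link_addpassthru(stpdown);
 *	// ... process the I_LINK/I_UNLINK; lower-stream messages are held off
 *	link_rempassthru(passq);	// let messages flow up into the mux
 */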
static queue_t *
link_addpassthru(stdata_t *stpdown)
{
	queue_t *passq;
	sqlist_t sqlist;

	passq = allocq();
	STREAM(passq) = STREAM(_WR(passq)) = stpdown;
	/* setq might sleep in allocator - avoid holding locks. */
	setq(passq, &passthru_rinit, &passthru_winit, NULL, QPERQ,
	    SQ_CI|SQ_CO, B_FALSE);
	claimq(passq);
	blocksq(passq->q_syncq, SQ_BLOCKED, 1);
	insertq(STREAM(passq), passq);

	/*
	 * Use strlock() to wait for the stream head sq_count to drop to zero
	 * since we are going to change q_ptr in the stream head.  Note that
	 * insertq() doesn't wait for any syncq counts to drop to zero.
	 */
	sqlist.sqlist_head = NULL;
	sqlist.sqlist_index = 0;
	sqlist.sqlist_size = sizeof (sqlist_t);
	sqlist_insert(&sqlist, _RD(stpdown->sd_wrq)->q_syncq);
	strlock(stpdown, &sqlist);
	strunlock(stpdown, &sqlist);

	releaseq(passq);
	return (passq);
}

/*
 * Let messages flow up into the mux by removing
 * the passq.
 */
static void
link_rempassthru(queue_t *passq)
{
	claimq(passq);
	removeq(passq);
	releaseq(passq);
	freeq(passq);
}

/*
 * Wait for the condition variable pointed to by `cvp' to be signaled,
 * or for `tim' milliseconds to elapse, whichever comes first.  If `tim'
 * is negative, then there is no time limit.  If `nosigs' is non-zero,
 * then the wait will be non-interruptible.
 *
 * Returns >0 if signaled, 0 if interrupted, or -1 upon timeout.
 */
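/*
 * Usage sketch, for illustration (mirrors strwaitmark() below): wait up to
 * 100 ms, uninterruptibly, for a condition protected by sd_lock.  The
 * predicate name is hypothetical:
 *
 *	mutex_enter(&stp->sd_lock);
 *	while (!condition_of_interest(stp)) {
 *		if (str_cv_wait(&rq->q_wait, &stp->sd_lock, 100, 1) == -1)
 *			break;				// timed out
 *	}
 *	mutex_exit(&stp->sd_lock);
 */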
clock_t
str_cv_wait(kcondvar_t *cvp, kmutex_t *mp, clock_t tim, int nosigs)
{
	clock_t ret, now, tick;

	if (tim < 0) {
		if (nosigs) {
			cv_wait(cvp, mp);
			ret = 1;
		} else {
			ret = cv_wait_sig(cvp, mp);
		}
	} else if (tim > 0) {
		/*
		 * convert milliseconds to clock ticks
		 */
		tick = MSEC_TO_TICK_ROUNDUP(tim);
		time_to_wait(&now, tick);
		if (nosigs) {
			ret = cv_timedwait(cvp, mp, now);
		} else {
			ret = cv_timedwait_sig(cvp, mp, now);
		}
	} else {
		ret = -1;
	}
	return (ret);
}

/*
 * Wait until the stream head can determine if it is at the mark but
 * don't wait forever to prevent a race condition between the "mark" state
 * in the stream head and any mark state in the caller/user of this routine.
 *
 * This is used by sockets and for a socket it would be incorrect
 * to return a failure for SIOCATMARK when there is no data in the receive
 * queue and the marked urgent data is traveling up the stream.
 *
 * This routine waits until the mark is known by waiting for one of these
 * three events:
 *	The stream head read queue becoming non-empty (including an EOF)
 *	The STRATMARK flag being set. (Due to a MSGMARKNEXT message.)
 *	The STRNOTATMARK flag being set (which indicates that the transport
 *	has sent a MSGNOTMARKNEXT message to indicate that it is not at
 *	the mark).
 *
 * The routine returns 1 if the stream is at the mark; 0 if it can
 * be determined that the stream is not at the mark.
 * If the wait times out and it can't determine
 * whether or not the stream might be at the mark the routine will return -1.
 *
 * Note: This routine should only be used when a mark is pending, i.e.,
 * in the socket case the SIGURG has been posted.
 * Note2: This cannot wake up just because synchronous streams indicate
 * that data is available, since it is not possible to use the synchronous
 * streams interfaces to determine the b_flag value for the data queued below
 * the stream head.
 */
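/*
 * Illustrative sketch of the socket/SIOCATMARK usage described above (the
 * surrounding ioctl handling is assumed, not shown):
 *
 *	switch (strwaitmark(vp)) {
 *	case 1:		// at the mark
 *	case 0:		// definitely not at the mark
 *		// answer SIOCATMARK accordingly
 *		break;
 *	case -1:	// timed out; mark state still unknown
 *		break;
 *	}
 */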
int
strwaitmark(vnode_t *vp)
{
	struct stdata *stp = vp->v_stream;
	queue_t *rq = _RD(stp->sd_wrq);
	int mark;

	mutex_enter(&stp->sd_lock);
	while (rq->q_first == NULL &&
	    !(stp->sd_flag & (STRATMARK|STRNOTATMARK|STREOF))) {
		stp->sd_flag |= RSLEEP;

		/* Wait for 100 milliseconds for any state change. */
		if (str_cv_wait(&rq->q_wait, &stp->sd_lock, 100, 1) == -1) {
			mutex_exit(&stp->sd_lock);
			return (-1);
		}
	}
	if (stp->sd_flag & STRATMARK)
		mark = 1;
	else if (rq->q_first != NULL && (rq->q_first->b_flag & MSGMARK))
		mark = 1;
	else
		mark = 0;

	mutex_exit(&stp->sd_lock);
	return (mark);
}

/*
 * Set a read side error. If persist is set, change the socket error
 * to persistent. If errfunc is set, install the function as the exported
 * error handler.
 */
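/*
 * For illustration, a hedged sketch of a typical call: a transport
 * notification path marking a fatal, persistent read-side error with no
 * exported error handler:
 *
 *	strsetrerror(vp, ECONNRESET, 1, NULL);
 */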
void
strsetrerror(vnode_t *vp, int error, int persist, errfunc_t errfunc)
{
	struct stdata *stp = vp->v_stream;

	mutex_enter(&stp->sd_lock);
	stp->sd_rerror = error;
	if (error == 0 && errfunc == NULL)
		stp->sd_flag &= ~STRDERR;
	else
		stp->sd_flag |= STRDERR;
	if (persist) {
		stp->sd_flag &= ~STRDERRNONPERSIST;
	} else {
		stp->sd_flag |= STRDERRNONPERSIST;
	}
	stp->sd_rderrfunc = errfunc;
	if (error != 0 || errfunc != NULL) {
		cv_broadcast(&_RD(stp->sd_wrq)->q_wait);	/* readers */
		cv_broadcast(&stp->sd_wrq->q_wait);		/* writers */
		cv_broadcast(&stp->sd_monitor);			/* ioctllers */

		mutex_exit(&stp->sd_lock);
		pollwakeup(&stp->sd_pollist, POLLERR);
		mutex_enter(&stp->sd_lock);

		if (stp->sd_sigflags & S_ERROR)
			strsendsig(stp->sd_siglist, S_ERROR, 0, error);
	}
	mutex_exit(&stp->sd_lock);
}

/*
 * Set a write side error. If persist is set, change the socket error
 * to persistent. If errfunc is set, install the function as the exported
 * error handler.
 */
void
strsetwerror(vnode_t *vp, int error, int persist, errfunc_t errfunc)
{
	struct stdata *stp = vp->v_stream;

	mutex_enter(&stp->sd_lock);
	stp->sd_werror = error;
	if (error == 0 && errfunc == NULL)
		stp->sd_flag &= ~STWRERR;
	else
		stp->sd_flag |= STWRERR;
	if (persist) {
		stp->sd_flag &= ~STWRERRNONPERSIST;
	} else {
		stp->sd_flag |= STWRERRNONPERSIST;
	}
	stp->sd_wrerrfunc = errfunc;
	if (error != 0 || errfunc != NULL) {
		cv_broadcast(&_RD(stp->sd_wrq)->q_wait);	/* readers */
		cv_broadcast(&stp->sd_wrq->q_wait);		/* writers */
		cv_broadcast(&stp->sd_monitor);			/* ioctllers */

		mutex_exit(&stp->sd_lock);
		pollwakeup(&stp->sd_pollist, POLLERR);
		mutex_enter(&stp->sd_lock);

		if (stp->sd_sigflags & S_ERROR)
			strsendsig(stp->sd_siglist, S_ERROR, 0, error);
	}
	mutex_exit(&stp->sd_lock);
}

/*
 * Make the stream return 0 (EOF) when all data has been read.
 * No effect on write side.
 */
void
strseteof(vnode_t *vp, int eof)
{
	struct stdata *stp = vp->v_stream;

	mutex_enter(&stp->sd_lock);
	if (!eof) {
		stp->sd_flag &= ~STREOF;
		mutex_exit(&stp->sd_lock);
		return;
	}
	stp->sd_flag |= STREOF;
	if (stp->sd_flag & RSLEEP) {
		stp->sd_flag &= ~RSLEEP;
		cv_broadcast(&_RD(stp->sd_wrq)->q_wait);
	}

	mutex_exit(&stp->sd_lock);
	pollwakeup(&stp->sd_pollist, POLLIN|POLLRDNORM);
	mutex_enter(&stp->sd_lock);

	if (stp->sd_sigflags & (S_INPUT|S_RDNORM))
		strsendsig(stp->sd_siglist, S_INPUT|S_RDNORM, 0, 0);
	mutex_exit(&stp->sd_lock);
}

void
strflushrq(vnode_t *vp, int flag)
{
	struct stdata *stp = vp->v_stream;

	mutex_enter(&stp->sd_lock);
	flushq(_RD(stp->sd_wrq), flag);
	mutex_exit(&stp->sd_lock);
}

void
strsetrputhooks(vnode_t *vp, uint_t flags,
		msgfunc_t protofunc, msgfunc_t miscfunc)
{
	struct stdata *stp = vp->v_stream;

	mutex_enter(&stp->sd_lock);

	if (protofunc == NULL)
		stp->sd_rprotofunc = strrput_proto;
	else
		stp->sd_rprotofunc = protofunc;

	if (miscfunc == NULL)
		stp->sd_rmiscfunc = strrput_misc;
	else
		stp->sd_rmiscfunc = miscfunc;

	if (flags & SH_CONSOL_DATA)
		stp->sd_rput_opt |= SR_CONSOL_DATA;
	else
		stp->sd_rput_opt &= ~SR_CONSOL_DATA;

	if (flags & SH_SIGALLDATA)
		stp->sd_rput_opt |= SR_SIGALLDATA;
	else
		stp->sd_rput_opt &= ~SR_SIGALLDATA;

	if (flags & SH_IGN_ZEROLEN)
		stp->sd_rput_opt |= SR_IGN_ZEROLEN;
	else
		stp->sd_rput_opt &= ~SR_IGN_ZEROLEN;

	mutex_exit(&stp->sd_lock);
}

void
strsetwputhooks(vnode_t *vp, uint_t flags, clock_t closetime)
{
	struct stdata *stp = vp->v_stream;

	mutex_enter(&stp->sd_lock);
	stp->sd_closetime = closetime;

	if (flags & SH_SIGPIPE)
		stp->sd_wput_opt |= SW_SIGPIPE;
	else
		stp->sd_wput_opt &= ~SW_SIGPIPE;
	if (flags & SH_RECHECK_ERR)
		stp->sd_wput_opt |= SW_RECHECK_ERR;
	else
		stp->sd_wput_opt &= ~SW_RECHECK_ERR;

	mutex_exit(&stp->sd_lock);
}

/* Used within framework when the queue is already locked */
void
qenable_locked(queue_t *q)
{
	stdata_t *stp = STREAM(q);

	ASSERT(MUTEX_HELD(QLOCK(q)));

	if (!q->q_qinfo->qi_srvp)
		return;

	/*
	 * Do not place on run queue if already enabled or closing.
	 */
	if (q->q_flag & (QWCLOSE|QENAB))
		return;

	/*
	 * Mark the queue enabled and place it on the run list if it is not
	 * already being serviced. If it is being serviced, runservice() will
	 * detect that QENAB is set and call the service procedure before
	 * clearing the QINSERVICE flag.
	 */
	q->q_flag |= QENAB;
	if (q->q_flag & QINSERVICE)
		return;

	/* Record the time of qenable */
	q->q_qtstamp = lbolt;

	/*
	 * Put the queue in the stp list and schedule it for background
	 * processing if it is not already scheduled and if the stream head
	 * does not intend to process it in the foreground later (indicated
	 * by the STRS_WILLSERVICE flag).
	 */
	mutex_enter(&stp->sd_qlock);
	/*
	 * If there is already something on the list, the stp flags should
	 * show an intention to drain it.
	 */
	IMPLY(STREAM_NEEDSERVICE(stp),
	    (stp->sd_svcflags & (STRS_WILLSERVICE | STRS_SCHEDULED)));

	ENQUEUE(q, stp->sd_qhead, stp->sd_qtail, q_link);
	stp->sd_nqueues++;

	/*
	 * If no one will drain this stream, we are the first producer and
	 * need to schedule it for the background thread.
	 */
	if (!(stp->sd_svcflags & (STRS_WILLSERVICE | STRS_SCHEDULED))) {
		/*
		 * No one will service this stream later, so we have to
		 * schedule it now.
		 */
		STRSTAT(stenables);
		stp->sd_svcflags |= STRS_SCHEDULED;
		stp->sd_servid = (void *)taskq_dispatch(streams_taskq,
		    (task_func_t *)stream_service, stp, TQ_NOSLEEP|TQ_NOQUEUE);

		if (stp->sd_servid == NULL) {
			/*
			 * Task queue failed so fail over to the backup
			 * servicing thread.
			 */
			STRSTAT(taskqfails);
			/*
			 * It is safe to clear STRS_SCHEDULED flag because it
			 * was set by this thread above.
			 */
			stp->sd_svcflags &= ~STRS_SCHEDULED;

			/*
			 * Failover scheduling is protected by service_queue
			 * lock.
			 */
			mutex_enter(&service_queue);
			ASSERT((stp->sd_qhead == q) && (stp->sd_qtail == q));
			ASSERT(q->q_link == NULL);
			/*
			 * Append the queue to qhead/qtail list.
			 */
			if (qhead == NULL)
				qhead = q;
			else
				qtail->q_link = q;
			qtail = q;
			/*
			 * Clear stp queue list.
			 */
			stp->sd_qhead = stp->sd_qtail = NULL;
			stp->sd_nqueues = 0;
			/*
			 * Wakeup background queue processing thread.
			 */
			cv_signal(&services_to_run);
			mutex_exit(&service_queue);
		}
	}
	mutex_exit(&stp->sd_qlock);
}

static void
queue_service(queue_t *q)
{
	/*
	 * The queue in the list should have
	 * QENAB flag set and should not have
	 * QINSERVICE flag set. QINSERVICE is
	 * set when the queue is dequeued and
	 * qenable_locked doesn't enqueue a
	 * queue with QINSERVICE set.
	 */

	ASSERT(!(q->q_flag & QINSERVICE));
	ASSERT((q->q_flag & QENAB));
	mutex_enter(QLOCK(q));
	q->q_flag &= ~QENAB;
	q->q_flag |= QINSERVICE;
	mutex_exit(QLOCK(q));
	runservice(q);
}

static void
syncq_service(syncq_t *sq)
{
	STRSTAT(syncqservice);
	mutex_enter(SQLOCK(sq));
	ASSERT(!(sq->sq_svcflags & SQ_SERVICE));
	ASSERT(sq->sq_servcount != 0);
	ASSERT(sq->sq_next == NULL);

	/* if we came here from the background thread, clear the flag */
	if (sq->sq_svcflags & SQ_BGTHREAD)
		sq->sq_svcflags &= ~SQ_BGTHREAD;

	/* let drain_syncq know that it's being called in the background */
	sq->sq_svcflags |= SQ_SERVICE;
	drain_syncq(sq);
}

static void
qwriter_outer_service(syncq_t *outer)
{
	/*
	 * Note that SQ_WRITER is used on the outer perimeter
	 * to signal that a qwriter(OUTER) is either preparing to
	 * run a function or is actually running one.
	 */
	outer_enter(outer, SQ_BLOCKED|SQ_WRITER);

	/*
	 * All inner syncqs are empty and have SQ_WRITER set
	 * to block entry into the outer perimeter.
	 *
	 * We do not need to explicitly call write_now since
	 * outer_exit does it for us.
	 */
	outer_exit(outer);
}

static void
mblk_free(mblk_t *mp)
{
	dblk_t *dbp = mp->b_datap;
	frtn_t *frp = dbp->db_frtnp;

	mp->b_next = NULL;
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	ASSERT(dbp->db_fthdr == NULL);
	frp->free_func(frp->free_arg);
	ASSERT(dbp->db_mblk == mp);

	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	kmem_cache_free(dbp->db_cache, dbp);
}

/*
 * Background processing of the stream queue list.
 */
static void
stream_service(stdata_t *stp)
{
	queue_t *q;

	mutex_enter(&stp->sd_qlock);

	STR_SERVICE(stp, q);

	stp->sd_svcflags &= ~STRS_SCHEDULED;
	stp->sd_servid = NULL;
	cv_signal(&stp->sd_qcv);
	mutex_exit(&stp->sd_qlock);
}

/*
 * Foreground processing of the stream queue list.
 */
void
stream_runservice(stdata_t *stp)
{
	queue_t *q;

	mutex_enter(&stp->sd_qlock);
	STRSTAT(rservice);
	/*
	 * We are going to drain this stream queue list, so qenable_locked will
	 * not schedule it until we finish.
	 */
	stp->sd_svcflags |= STRS_WILLSERVICE;

	STR_SERVICE(stp, q);

	stp->sd_svcflags &= ~STRS_WILLSERVICE;
	mutex_exit(&stp->sd_qlock);
	/*
	 * Help the backup background thread drain the qhead/qtail list.
	 */
	while (qhead != NULL) {
		STRSTAT(qhelps);
		mutex_enter(&service_queue);
		DQ(q, qhead, qtail, q_link);
		mutex_exit(&service_queue);
		if (q != NULL)
			queue_service(q);
	}
}

void
stream_willservice(stdata_t *stp)
{
	mutex_enter(&stp->sd_qlock);
	stp->sd_svcflags |= STRS_WILLSERVICE;
	mutex_exit(&stp->sd_qlock);
}

/*
 * Replace the cred currently in the mblk with a different one.
 */
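/*
 * Usage sketch, for illustration: attribute an outbound message to the
 * current thread's credentials (the old cred, if any, is released and the
 * new one is held by mblk_setcred itself):
 *
 *	mblk_setcred(mp, CRED());
 */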
void
mblk_setcred(mblk_t *mp, cred_t *cr)
{
	cred_t *ocr = DB_CRED(mp);

	ASSERT(cr != NULL);

	if (cr != ocr) {
		crhold(mp->b_datap->db_credp = cr);
		if (ocr != NULL)
			crfree(ocr);
	}
}

int
hcksum_assoc(mblk_t *mp,  multidata_t *mmd, pdesc_t *pd,
    uint32_t start, uint32_t stuff, uint32_t end, uint32_t value,
    uint32_t flags, int km_flags)
{
	int rc = 0;

	ASSERT(DB_TYPE(mp) == M_DATA || DB_TYPE(mp) == M_MULTIDATA);
	if (mp->b_datap->db_type == M_DATA) {
		/* Associate values for M_DATA type */
		DB_CKSUMSTART(mp) = (intptr_t)start;
		DB_CKSUMSTUFF(mp) = (intptr_t)stuff;
		DB_CKSUMEND(mp) = (intptr_t)end;
		DB_CKSUMFLAGS(mp) = flags;
		DB_CKSUM16(mp) = (uint16_t)value;

	} else {
		pattrinfo_t pa_info;

		ASSERT(mmd != NULL);

		pa_info.type = PATTR_HCKSUM;
		pa_info.len = sizeof (pattr_hcksum_t);

		if (mmd_addpattr(mmd, pd, &pa_info, B_TRUE, km_flags) != NULL) {
			pattr_hcksum_t *hck = (pattr_hcksum_t *)pa_info.buf;

			hck->hcksum_start_offset = start;
			hck->hcksum_stuff_offset = stuff;
			hck->hcksum_end_offset = end;
			hck->hcksum_cksum_val.inet_cksum = (uint16_t)value;
			hck->hcksum_flags = flags;
		} else {
			rc = -1;
		}
	}
	return (rc);
}

void
hcksum_retrieve(mblk_t *mp, multidata_t *mmd, pdesc_t *pd,
    uint32_t *start, uint32_t *stuff, uint32_t *end,
    uint32_t *value, uint32_t *flags)
{
	ASSERT(DB_TYPE(mp) == M_DATA || DB_TYPE(mp) == M_MULTIDATA);
	if (mp->b_datap->db_type == M_DATA) {
		if (flags != NULL) {
			*flags = DB_CKSUMFLAGS(mp);
			if (*flags & HCK_PARTIALCKSUM) {
				if (start != NULL)
					*start = (uint32_t)DB_CKSUMSTART(mp);
				if (stuff != NULL)
					*stuff = (uint32_t)DB_CKSUMSTUFF(mp);
				if (end != NULL)
					*end = (uint32_t)DB_CKSUMEND(mp);
				if (value != NULL)
					*value = (uint32_t)DB_CKSUM16(mp);
			}
		}
	} else {
		pattrinfo_t hck_attr = {PATTR_HCKSUM};

		ASSERT(mmd != NULL);

		/* get hardware checksum attribute */
		if (mmd_getpattr(mmd, pd, &hck_attr) != NULL) {
			pattr_hcksum_t *hck = (pattr_hcksum_t *)hck_attr.buf;

			ASSERT(hck_attr.len >= sizeof (pattr_hcksum_t));
			if (flags != NULL)
				*flags = hck->hcksum_flags;
			if (start != NULL)
				*start = hck->hcksum_start_offset;
			if (stuff != NULL)
				*stuff = hck->hcksum_stuff_offset;
			if (end != NULL)
				*end = hck->hcksum_end_offset;
			if (value != NULL)
				*value = (uint32_t)
				    hck->hcksum_cksum_val.inet_cksum;
		}
	}
}

/*
 * Checksum the buffer *bp for len bytes, folding in the partial checksum
 * psum (or 0 if none), and return the 16-bit partial checksum.
 */
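/*
 * Illustrative sketch of accumulating a partial checksum across an mblk
 * chain (assuming, for simplicity, that every fragment starts 16-bit
 * aligned and all but the last have even length):
 *
 *	unsigned	sum = 0;
 *	mblk_t		*mp1;
 *
 *	for (mp1 = mp; mp1 != NULL; mp1 = mp1->b_cont)
 *		sum = bcksum(mp1->b_rptr, MBLKL(mp1), sum);
 */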
unsigned
bcksum(uchar_t *bp, int len, unsigned int psum)
{
	int odd = len & 1;
	extern unsigned int ip_ocsum();

	if (((intptr_t)bp & 1) == 0 && !odd) {
		/*
		 * bp is 16-bit aligned and len is a multiple of 16 bits.
		 */
		return (ip_ocsum((ushort_t *)bp, len >> 1, psum));
	}
	if (((intptr_t)bp & 1) != 0) {
		/*
		 * Bp isn't 16 bit aligned.
		 */
		unsigned int tsum;

#ifdef _LITTLE_ENDIAN
		psum += *bp;
#else
		psum += *bp << 8;
#endif
		len--;
		bp++;
		tsum = ip_ocsum((ushort_t *)bp, len >> 1, 0);
		psum += ((tsum << 8) & 0xffff) | (tsum >> 8);
		if (len & 1) {
			bp += len - 1;
#ifdef _LITTLE_ENDIAN
			psum += *bp << 8;
#else
			psum += *bp;
#endif
		}
	} else {
		/*
		 * Bp is 16 bit aligned.
		 */
		psum = ip_ocsum((ushort_t *)bp, len >> 1, psum);
		if (odd) {
			bp += len - 1;
#ifdef _LITTLE_ENDIAN
			psum += *bp;
#else
			psum += *bp << 8;
#endif
		}
	}
	/*
	 * Normalize psum to 16 bits before returning the new partial
	 * checksum. The max psum value before normalization is 0x3FDFE.
	 */
	return ((psum >> 16) + (psum & 0xFFFF));
}

boolean_t
is_vmloaned_mblk(mblk_t *mp, multidata_t *mmd, pdesc_t *pd)
{
	boolean_t rc;

	ASSERT(DB_TYPE(mp) == M_DATA || DB_TYPE(mp) == M_MULTIDATA);
	if (DB_TYPE(mp) == M_DATA) {
		rc = (((mp)->b_datap->db_struioflag & STRUIO_ZC) != 0);
	} else {
		pattrinfo_t zcopy_attr = {PATTR_ZCOPY};

		ASSERT(mmd != NULL);
		rc = (mmd_getpattr(mmd, pd, &zcopy_attr) != NULL);
	}
	return (rc);
}

void
freemsgchain(mblk_t *mp)
{
	mblk_t	*next;

	while (mp != NULL) {
		next = mp->b_next;
		mp->b_next = NULL;

		freemsg(mp);
		mp = next;
	}
}

mblk_t *
copymsgchain(mblk_t *mp)
{
	mblk_t	*nmp = NULL;
	mblk_t	**nmpp = &nmp;

	for (; mp != NULL; mp = mp->b_next) {
		if ((*nmpp = copymsg(mp)) == NULL) {
			freemsgchain(nmp);
			return (NULL);
		}

		nmpp = &((*nmpp)->b_next);
	}

	return (nmp);
}

/* NOTE: Do not add code after this point. */
#undef QLOCK

/*
 * Replacement for the QLOCK macro for those that can't use it.
 */
kmutex_t *
QLOCK(queue_t *q)
{
	return (&(q)->q_lock);
}

/*
 * Dummy runqueues/queuerun functions for backwards compatibility.
 */
#undef runqueues
void
runqueues(void)
{
}

#undef queuerun
void
queuerun(void)
{
}