usr/src/uts/common/rpc/xdr_rdma.c
changeset 9348 7155ecb17858
parent 7387 0b3a92e31fd8
child 9803 35261329983d
--- a/usr/src/uts/common/rpc/xdr_rdma.c	Fri Apr 10 21:47:04 2009 -0700
+++ b/usr/src/uts/common/rpc/xdr_rdma.c	Fri Apr 10 22:57:35 2009 -0700
@@ -19,7 +19,7 @@
  * CDDL HEADER END
  */
 /*
- * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
+ * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
  * Use is subject to license terms.
  */
 
@@ -64,6 +64,8 @@
 static rpc_inline_t *xdrrdma_inline(XDR *, int);
 void		xdrrdma_destroy(XDR *);
 static bool_t   xdrrdma_control(XDR *, int, void *);
+static bool_t  xdrrdma_read_a_chunk(XDR *, CONN **);
+static void xdrrdma_free_xdr_chunks(CONN *, struct clist *);
 
 struct xdr_ops  xdrrdmablk_ops = {
 	xdrrdma_getbytes,
@@ -94,6 +96,47 @@
  * separately from the rest of the RPC message. xp_min_chunk = 0, is a
  * special case for ENCODING, which means do not chunk the incoming stream of
  * data.
+ *
+ * A read chunk can contain part of the RPC message in addition to the
+ * inline message. In such a case, (xp_offp - x_base) will not provide
+ * the correct xdr offset of the entire message. xp_off is used in such
+ * a case to denote the offset or current position in the overall message
+ * covering both the inline and the chunk. This is used only in the case
+ * of decoding and is useful to compare read chunk 'c_xdroff' offsets.
+ *
+ * An example for a read chunk containing an XDR message:
+ * An NFSv4 compound as follows:
+ *
+ * PUTFH
+ * WRITE [4109 bytes]
+ * GETATTR
+ *
+ * Solaris encoding is:
+ * -------------------
+ *
+ * <Inline message>: [PUTFH WRITE4args GETATTR]
+ *                                   |
+ *                                   v
+ * [RDMA_READ chunks]:               [write data]
+ *
+ *
+ * Linux encoding is:
+ * -----------------
+ *
+ * <Inline message>: [PUTFH WRITE4args]
+ *                                    |
+ *                                    v
+ * [RDMA_READ chunks]:                [Write data] [Write data2] [Getattr chunk]
+ *                                     chunk1       chunk2         chunk3
+ *
+ * where the READ chunks are as:
+ *
+ *             - chunk1 - 4k
+ * write data |
+ *             - chunk2 - 13 bytes(4109 - 4k)
+ * getattr op  - chunk3 - 19 bytes
+ * (getattr op starts at byte 4 after 3 bytes of roundup)
+ *
  */
 
 typedef struct {
@@ -101,8 +144,10 @@
 	int		xp_min_chunk;
 	uint_t		xp_flags;	/* Controls setting for rdma xdr */
 	int		xp_buf_size;	/* size of xdr buffer */
-	struct clist	*xp_rcl;		/* head of chunk list */
+	int		xp_off;		/* overall offset */
+	struct clist	*xp_rcl;	/* head of chunk list */
 	struct clist	**xp_rcl_next;	/* location to place/find next chunk */
+	struct clist	*xp_rcl_xdr;	/* copy of rcl containing RPC message */
 	struct clist	*xp_wcl;	/* head of write chunk list */
 	CONN		*xp_conn;	/* connection for chunk data xfer */
 	uint_t		xp_reply_chunk_len;
@@ -118,7 +163,6 @@
 {
 	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);
 	struct clist	*cle = *(xdrp->xp_rcl_next);
-	struct clist	*cls = *(xdrp->xp_rcl_next);
 	struct clist	*rdclist = NULL, *prev = NULL;
 	bool_t		retval = TRUE;
 	uint32_t	cur_offset = 0;
@@ -174,8 +218,19 @@
 	 */
 	for (actual_segments = 0;
 	    actual_segments < total_segments; actual_segments++) {
+
+		DTRACE_PROBE3(krpc__i__xdrrdma_getrdmablk, uint32_t, cle->c_len,
+		    uint32_t, total_len, uint32_t, cle->c_xdroff);
+
 		if (total_len <= 0)
 			break;
+
+		/*
+		 * Not the first pass through the loop, so advance to the next chunk.
+		 */
+		if (actual_segments > 0)
+			cle = cle->c_next;
+
 		cle->u.c_daddr = (uint64) cur_offset;
 		alen = 0;
 		if (cle->c_len > total_len) {
@@ -211,14 +266,17 @@
 			prev = rdclist;
 		}
 
-		cle = cle->c_next;
 	}
 
 out:
 	if (prev != NULL)
 		prev->c_next = NULL;
 
-	cle = cls;
+	/*
+	 * Adjust the chunk length, if we read only a part of
+	 * a chunk.
+	 */
+
 	if (alen) {
 		cle->w.c_saddr =
 		    (uint64)(uintptr_t)cle->w.c_saddr + cle->c_len;
@@ -284,8 +342,7 @@
 
 	if (xdrp->xp_wcl) {
 		if (xdrp->xp_flags & XDR_RDMA_WLIST_REG) {
-			(void) clist_deregister(xdrp->xp_conn,
-			    xdrp->xp_wcl, CLIST_REG_DST);
+			(void) clist_deregister(xdrp->xp_conn, xdrp->xp_wcl);
 			rdma_buf_free(xdrp->xp_conn,
 			    &xdrp->xp_wcl->rb_longbuf);
 		}
@@ -294,14 +351,16 @@
 
 	if (xdrp->xp_rcl) {
 		if (xdrp->xp_flags & XDR_RDMA_RLIST_REG) {
-			(void) clist_deregister(xdrp->xp_conn,
-			    xdrp->xp_rcl, CLIST_REG_SOURCE);
+			(void) clist_deregister(xdrp->xp_conn, xdrp->xp_rcl);
 			rdma_buf_free(xdrp->xp_conn,
 			    &xdrp->xp_rcl->rb_longbuf);
 		}
 		clist_free(xdrp->xp_rcl);
 	}
 
+	if (xdrp->xp_rcl_xdr)
+		xdrrdma_free_xdr_chunks(xdrp->xp_conn, xdrp->xp_rcl_xdr);
+
 	(void) kmem_free(xdrs->x_private, sizeof (xrdma_private_t));
 	xdrs->x_private = NULL;
 }
@@ -310,14 +369,32 @@
 xdrrdma_getint32(XDR *xdrs, int32_t *int32p)
 {
 	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);
+	int chunked = 0;
 
-	if ((xdrs->x_handy -= (int)sizeof (int32_t)) < 0)
-		return (FALSE);
+	if ((xdrs->x_handy -= (int)sizeof (int32_t)) < 0) {
+		/*
+		 * check if rest of the rpc message is in a chunk
+		 */
+		if (!xdrrdma_read_a_chunk(xdrs, &xdrp->xp_conn)) {
+			return (FALSE);
+		}
+		chunked = 1;
+	}
 
 	/* LINTED pointer alignment */
 	*int32p = (int32_t)ntohl((uint32_t)(*((int32_t *)(xdrp->xp_offp))));
+
+	DTRACE_PROBE1(krpc__i__xdrrdma_getint32, int32_t, *int32p);
+
 	xdrp->xp_offp += sizeof (int32_t);
 
+	if (chunked)
+		xdrs->x_handy -= (int)sizeof (int32_t);
+
+	if (xdrp->xp_off != 0) {
+		xdrp->xp_off += sizeof (int32_t);
+	}
+
 	return (TRUE);
 }
 
@@ -355,6 +432,7 @@
 	uint32_t	actual_segments = 0;
 	uint32_t	status;
 	uint32_t	alen;
+	uint32_t	xpoff;
 
 	while (cle) {
 		total_segments++;
@@ -362,14 +440,20 @@
 	}
 
 	cle = *(xdrp->xp_rcl_next);
+
+	if (xdrp->xp_off) {
+		xpoff = xdrp->xp_off;
+	} else {
+		xpoff = (xdrp->xp_offp - xdrs->x_base);
+	}
+
 	/*
 	 * If there was a chunk at the current offset, then setup a read
 	 * chunk list which records the destination address and length
 	 * and will RDMA READ the data in later.
 	 */
 
-	if (cle != NULL &&
-	    cle->c_xdroff == (xdrp->xp_offp - xdrs->x_base)) {
+	if (cle != NULL && cle->c_xdroff == xpoff) {
 		for (actual_segments = 0;
 		    actual_segments < total_segments; actual_segments++) {
 			if (total_len <= 0)
@@ -443,7 +527,8 @@
 		cl = *cls;
 		cl.c_next = NULL;
 		cl.c_len = cur_offset;
-		if (clist_syncmem(xdrp->xp_conn, &cl, 0) != RDMA_SUCCESS) {
+		if (clist_syncmem(
+		    xdrp->xp_conn, &cl, CLIST_REG_DST) != RDMA_SUCCESS) {
 			retval = FALSE;
 		}
 out:
@@ -454,7 +539,7 @@
 		cl = *cle;
 		cl.c_next = NULL;
 		cl.c_len = cur_offset;
-		(void) clist_deregister(xdrp->xp_conn, &cl, CLIST_REG_DST);
+		(void) clist_deregister(xdrp->xp_conn, &cl);
 		if (alen) {
 			cle->w.c_saddr =
 			    (uint64)(uintptr_t)cle->w.c_saddr + cle->c_len;
@@ -462,12 +547,17 @@
 		}
 		return (retval);
 	}
+
 	if ((xdrs->x_handy -= len) < 0)
 		return (FALSE);
 
 	bcopy(xdrp->xp_offp, addr, len);
+
 	xdrp->xp_offp += len;
 
+	if (xdrp->xp_off != 0)
+		xdrp->xp_off += len;
+
 	return (TRUE);
 }
 
@@ -871,6 +961,9 @@
 	if (!xdr_uint32(xdrs, &num_segment))
 		return (FALSE);
 	for (i = 0; i < num_segment; i++) {
+
+		DTRACE_PROBE1(krpc__i__xdr_encode_wlist_len, uint_t, w->c_len);
+
 		if (!xdr_uint32(xdrs, &w->c_dmemhandle.mrc_rmr))
 			return (FALSE);
 
@@ -922,10 +1015,15 @@
 
 	tmp = *w = clist_alloc();
 	for (i = 0; i < seg_array_len; i++) {
+
 		if (!xdr_uint32(xdrs, &tmp->c_dmemhandle.mrc_rmr))
 			return (FALSE);
 		if (!xdr_uint32(xdrs, &tmp->c_len))
 			return (FALSE);
+
+		DTRACE_PROBE1(krpc__i__xdr_decode_wlist_len,
+		    uint_t, tmp->c_len);
+
 		if (!xdr_uint64(xdrs, &tmp->u.c_daddr))
 			return (FALSE);
 		if (i < seg_array_len - 1) {
@@ -980,6 +1078,7 @@
 	first = ncl = clist_alloc();
 
 	for (i = 0; i < num_wclist; i++) {
+
 		if (!xdr_uint32(xdrs, &ncl->c_dmemhandle.mrc_rmr))
 			goto err_out;
 		if (!xdr_uint32(xdrs, &ncl->c_len))
@@ -993,6 +1092,9 @@
 			ncl->c_len = MAX_SVC_XFER_SIZE;
 		}
 
+		DTRACE_PROBE1(krpc__i__xdr_decode_wlist_svc_len,
+		    uint_t, ncl->c_len);
+
 		wcl_length += ncl->c_len;
 
 		if (i < num_wclist - 1) {
@@ -1060,6 +1162,12 @@
 	first = ncl = clist_alloc();
 
 	for (i = 0; i < num_wclist; i++) {
+
+		if (i > 0) {
+			ncl->c_next = clist_alloc();
+			ncl = ncl->c_next;
+		}
+
 		if (!xdr_uint32(xdrs, &ncl->c_dmemhandle.mrc_rmr))
 			goto err_out;
 		if (!xdr_uint32(xdrs, &ncl->c_len))
@@ -1077,10 +1185,9 @@
 			DTRACE_PROBE(
 			    krpc__e__xdrrdma__replywchunk__invalid_segaddr);
 
-		if (i > 0) {
-			ncl->c_next = clist_alloc();
-			ncl = ncl->c_next;
-		}
+		DTRACE_PROBE1(krpc__i__xdr_decode_reply_wchunk_c_len,
+		    uint32_t, ncl->c_len);
+
 	}
 	*clist = first;
 	return (TRUE);
@@ -1112,6 +1219,10 @@
 			length = cl_longreply->c_len;
 			offset = (uint64) cl_longreply->u.c_daddr;
 
+			DTRACE_PROBE1(
+			    krpc__i__xdr_encode_reply_wchunk_c_len,
+			    uint32_t, length);
+
 			if (!xdr_uint32(xdrs,
 			    &cl_longreply->c_dmemhandle.mrc_rmr))
 				return (FALSE);
@@ -1129,7 +1240,7 @@
 	return (TRUE);
 }
 bool_t
-xdrrdma_read_from_client(struct clist **rlist, CONN **conn, uint_t count)
+xdrrdma_read_from_client(struct clist *rlist, CONN **conn, uint_t count)
 {
 	struct clist	*rdclist;
 	struct clist	cl;
@@ -1137,36 +1248,54 @@
 	uint32_t	status;
 	bool_t		retval = TRUE;
 
-	(*rlist)->rb_longbuf.type = RDMA_LONG_BUFFER;
-	(*rlist)->rb_longbuf.len =
+	rlist->rb_longbuf.type = RDMA_LONG_BUFFER;
+	rlist->rb_longbuf.len =
 	    count > RCL_BUF_LEN ? count : RCL_BUF_LEN;
 
-	if (rdma_buf_alloc(*conn, &(*rlist)->rb_longbuf)) {
+	if (rdma_buf_alloc(*conn, &rlist->rb_longbuf)) {
 		return (FALSE);
 	}
 
-	for (rdclist = *rlist;
+	/*
+	 * The entire buffer is registered with the first chunk.
+	 * Later chunks will use the same registered memory handle.
+	 */
+
+	cl = *rlist;
+	cl.c_next = NULL;
+	if (clist_register(*conn, &cl, CLIST_REG_DST) != RDMA_SUCCESS) {
+		rdma_buf_free(*conn, &rlist->rb_longbuf);
+		DTRACE_PROBE(
+		    krpc__e__xdrrdma__readfromclient__clist__reg);
+		return (FALSE);
+	}
+
+	rlist->c_regtype = CLIST_REG_DST;
+	rlist->c_dmemhandle = cl.c_dmemhandle;
+	rlist->c_dsynchandle = cl.c_dsynchandle;
+
+	for (rdclist = rlist;
 	    rdclist != NULL; rdclist = rdclist->c_next) {
 		total_len += rdclist->c_len;
 #if (defined(OBJ32)||defined(DEBUG32))
 		rdclist->u.c_daddr3 =
-		    (caddr_t)((char *)(*rlist)->rb_longbuf.addr +
+		    (caddr_t)((char *)rlist->rb_longbuf.addr +
 		    (uint32) rdclist->u.c_daddr3);
 #else
 		rdclist->u.c_daddr3 =
-		    (caddr_t)((char *)(*rlist)->rb_longbuf.addr +
+		    (caddr_t)((char *)rlist->rb_longbuf.addr +
 		    (uint64) rdclist->u.c_daddr);
 
 #endif
 		cl = (*rdclist);
 		cl.c_next = NULL;
 
-		if (clist_register(*conn, &cl, CLIST_REG_DST) != RDMA_SUCCESS) {
-			rdma_buf_free(*conn, &(*rlist)->rb_longbuf);
-			DTRACE_PROBE(
-			    krpc__e__xdrrdma__readfromclient__clist__reg);
-			return (FALSE);
-		}
+		/*
+		 * Use the same memory handle for all the chunks
+		 */
+		cl.c_dmemhandle = rlist->c_dmemhandle;
+		cl.c_dsynchandle = rlist->c_dsynchandle;
+
 
 		DTRACE_PROBE1(krpc__i__xdrrdma__readfromclient__buflen,
 		    int, rdclist->c_len);
@@ -1182,15 +1311,15 @@
 		if (status != RDMA_SUCCESS) {
 			DTRACE_PROBE(
 			    krpc__e__xdrrdma__readfromclient__readfailed);
-			rdma_buf_free(*conn, &(*rlist)->rb_longbuf);
+			rdma_buf_free(*conn, &rlist->rb_longbuf);
 			return (FALSE);
 		}
 	}
 
-	cl = (*(*rlist));
+	cl = (*rlist);
 	cl.c_next = NULL;
 	cl.c_len = total_len;
-	if (clist_syncmem(*conn, &cl, 0) != RDMA_SUCCESS) {
+	if (clist_syncmem(*conn, &cl, CLIST_REG_DST) != RDMA_SUCCESS) {
 		retval = FALSE;
 	}
 	return (retval);
@@ -1205,30 +1334,234 @@
 }
 
 bool_t
-xdrrdma_send_read_data(XDR *xdrs, struct clist *wcl)
+xdrrdma_send_read_data(XDR *xdrs, uint_t data_len, struct clist *wcl)
 {
 	int status;
 	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);
 	struct xdr_ops *xops = xdrrdma_xops();
+	struct clist *tcl, *wrcl, *cl;
+	struct clist fcl;
+	int rndup_present, rnduplen;
+
+	rndup_present = 0;
+	wrcl = NULL;
 
 	/* caller is doing a sizeof */
 	if (xdrs->x_ops != &xdrrdma_ops || xdrs->x_ops == xops)
 		return (TRUE);
 
-	status = clist_register(xdrp->xp_conn, wcl, CLIST_REG_SOURCE);
+	/* copy of the first chunk */
+	fcl = *wcl;
+	fcl.c_next = NULL;
+
+	/*
+	 * The entire buffer is registered with the first chunk.
+	 * Later chunks will use the same registered memory handle.
+	 */
+
+	status = clist_register(xdrp->xp_conn, &fcl, CLIST_REG_SOURCE);
 	if (status != RDMA_SUCCESS) {
 		return (FALSE);
 	}
 
-	status = clist_syncmem(xdrp->xp_conn, wcl, CLIST_REG_SOURCE);
+	wcl->c_regtype = CLIST_REG_SOURCE;
+	wcl->c_smemhandle = fcl.c_smemhandle;
+	wcl->c_ssynchandle = fcl.c_ssynchandle;
+
+	/*
+	 * Only transfer the read data ignoring any trailing
+	 * roundup chunks. A bit of work, but it saves an
+	 * unnecessary extra RDMA_WRITE containing only
+	 * roundup bytes.
+	 */
+
+	rnduplen = clist_len(wcl) - data_len;
+
+	if (rnduplen) {
+
+		tcl = wcl->c_next;
+
+		/*
+		 * Check if there is a trailing roundup chunk
+		 */
+		while (tcl) {
+			if ((tcl->c_next == NULL) && (tcl->c_len == rnduplen)) {
+				rndup_present = 1;
+				break;
+			}
+			tcl = tcl->c_next;
+		}
+
+		/*
+		 * Make a copy chunk list skipping the last chunk
+		 */
+		if (rndup_present) {
+			cl = wcl;
+			tcl = NULL;
+			while (cl) {
+				if (tcl == NULL) {
+					tcl = clist_alloc();
+					wrcl = tcl;
+				} else {
+					tcl->c_next = clist_alloc();
+					tcl = tcl->c_next;
+				}
+
+				*tcl = *cl;
+				cl = cl->c_next;
+				/* last chunk */
+				if (cl->c_next == NULL)
+					break;
+			}
+			tcl->c_next = NULL;
+		}
+	}
+
+	if (wrcl == NULL) {
+		/* No roundup chunks */
+		wrcl = wcl;
+	}
+
+	/*
+	 * Set the registered memory handles for the
+	 * rest of the chunks same as the first chunk.
+	 */
+	tcl = wrcl->c_next;
+	while (tcl) {
+		tcl->c_smemhandle = fcl.c_smemhandle;
+		tcl->c_ssynchandle = fcl.c_ssynchandle;
+		tcl = tcl->c_next;
+	}
+
+	/*
+	 * Sync the total len beginning from the first chunk.
+	 */
+	fcl.c_len = clist_len(wrcl);
+	status = clist_syncmem(xdrp->xp_conn, &fcl, CLIST_REG_SOURCE);
 	if (status != RDMA_SUCCESS) {
 		return (FALSE);
 	}
 
-	status = RDMA_WRITE(xdrp->xp_conn, wcl, WAIT);
+	status = RDMA_WRITE(xdrp->xp_conn, wrcl, WAIT);
+
+	if (rndup_present)
+		clist_free(wrcl);
+
 	if (status != RDMA_SUCCESS) {
 		return (FALSE);
 	}
 
 	return (TRUE);
 }
+
+
+/*
+ * Reads one chunk at a time
+ */
+
+static bool_t
+xdrrdma_read_a_chunk(XDR *xdrs, CONN **conn)
+{
+	int status;
+	int32_t len = 0;
+	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);
+	struct clist *cle = *(xdrp->xp_rcl_next);
+	struct clist *rclp = xdrp->xp_rcl;
+	struct clist *clp;
+
+	/*
+	 * len is used later to decide xdr offset in
+	 * the chunk factoring any 4-byte XDR alignment
+	 * (See read chunk example top of this file)
+	 */
+	while (rclp != cle) {
+		len += rclp->c_len;
+		rclp = rclp->c_next;
+	}
+
+	len = RNDUP(len) - len;
+
+	ASSERT(xdrs->x_handy <= 0);
+
+	/*
+	 * If this is the first chunk to contain the RPC
+	 * message set xp_off to the xdr offset of the
+	 * inline message.
+	 */
+	if (xdrp->xp_off == 0)
+		xdrp->xp_off = (xdrp->xp_offp - xdrs->x_base);
+
+	if (cle == NULL || (cle->c_xdroff != xdrp->xp_off))
+		return (FALSE);
+
+	/*
+	 * Make a copy of the chunk to read from client.
+	 * Chunks are read on demand, so read only one
+	 * for now.
+	 */
+
+	rclp = clist_alloc();
+	*rclp = *cle;
+	rclp->c_next = NULL;
+
+	xdrp->xp_rcl_next = &cle->c_next;
+
+	/*
+	 * If there is a roundup present, then skip those
+	 * bytes when reading.
+	 */
+	if (len) {
+		rclp->w.c_saddr =
+		    (uint64)(uintptr_t)rclp->w.c_saddr + len;
+			rclp->c_len = rclp->c_len - len;
+	}
+
+	status = xdrrdma_read_from_client(rclp, conn, rclp->c_len);
+
+	if (status == FALSE) {
+		clist_free(rclp);
+		return (status);
+	}
+
+	xdrp->xp_offp = rclp->rb_longbuf.addr;
+	xdrs->x_base = xdrp->xp_offp;
+	xdrs->x_handy = rclp->c_len;
+
+	/*
+	 * This copy of read chunks containing the XDR
+	 * message is freed later in xdrrdma_destroy()
+	 */
+
+	if (xdrp->xp_rcl_xdr) {
+		/* Add the chunk to end of the list */
+		clp = xdrp->xp_rcl_xdr;
+		while (clp->c_next != NULL)
+			clp = clp->c_next;
+		clp->c_next = rclp;
+	} else {
+		xdrp->xp_rcl_xdr = rclp;
+	}
+	return (TRUE);
+}
+
+static void
+xdrrdma_free_xdr_chunks(CONN *conn, struct clist *xdr_rcl)
+{
+	struct clist *cl;
+
+	(void) clist_deregister(conn, xdr_rcl);
+
+	/*
+ * Read chunks containing parts of the XDR message are
+	 * special: in case of multiple chunks each has
+	 * its own buffer.
+	 */
+
+	cl = xdr_rcl;
+	while (cl) {
+		rdma_buf_free(conn, &cl->rb_longbuf);
+		cl = cl->c_next;
+	}
+
+	clist_free(xdr_rcl);
+}