6760947 NFS/RDMA port should be changed to IANA assigned 20049
authorSiddheshwar Mahesh <Siddheshwar.Mahesh@Sun.COM>
Fri, 10 Apr 2009 22:57:35 -0700
changeset 9348 7155ecb17858
parent 9347 3ddbbc1bab4b
child 9349 af3421bbd991
6760947 NFS/RDMA port should be changed to IANA assigned 20049 6762173 rdma panic on writes from linux client 6790590 readdir fails from Linux client against Solaris server 6790588 linux client fails to decode READ replies from Solaris server 6790586 Solaris server should better handle chunked RPC/RDMA messages 6826476 rpcib leaks memory registrations while handling multiple chunks
usr/src/uts/common/fs/nfs/nfs3_srv.c
usr/src/uts/common/fs/nfs/nfs3_xdr.c
usr/src/uts/common/fs/nfs/nfs4_srv.c
usr/src/uts/common/fs/nfs/nfs4_xdr.c
usr/src/uts/common/fs/nfs/nfs_srv.c
usr/src/uts/common/fs/nfs/nfs_xdr.c
usr/src/uts/common/rpc/clnt_rdma.c
usr/src/uts/common/rpc/rdma_subr.c
usr/src/uts/common/rpc/rpc_rdma.h
usr/src/uts/common/rpc/rpcib.c
usr/src/uts/common/rpc/svc_rdma.c
usr/src/uts/common/rpc/xdr_rdma.c
--- a/usr/src/uts/common/fs/nfs/nfs3_srv.c	Fri Apr 10 21:47:04 2009 -0700
+++ b/usr/src/uts/common/fs/nfs/nfs3_srv.c	Fri Apr 10 22:57:35 2009 -0700
@@ -1069,6 +1069,8 @@
 		/* RDMA */
 		resp->resok.wlist = args->wlist;
 		resp->resok.wlist_len = resp->resok.count;
+		if (resp->resok.wlist)
+			clist_zero_len(resp->resok.wlist);
 		goto done;
 	}
 
@@ -1086,6 +1088,8 @@
 		/* RDMA */
 		resp->resok.wlist = args->wlist;
 		resp->resok.wlist_len = resp->resok.count;
+		if (resp->resok.wlist)
+			clist_zero_len(resp->resok.wlist);
 		goto done;
 	}
 
@@ -4589,43 +4593,17 @@
 rdma_setup_read_data3(READ3args *args, READ3resok *rok)
 {
 	struct clist	*wcl;
-	int		data_len, avail_len, num;
+	int		wlist_len;
 	count3		count = rok->count;
 
-	data_len = num = avail_len = 0;
-
 	wcl = args->wlist;
-	while (wcl != NULL) {
-		if (wcl->c_dmemhandle.mrc_rmr == 0)
-			break;
-
-		avail_len += wcl->c_len;
-		if (wcl->c_len < count) {
-			data_len += wcl->c_len;
-		} else {
-			/* Can make the rest chunks all 0-len */
-			data_len += count;
-			wcl->c_len = count;
-		}
-		count -= wcl->c_len;
-		num ++;
-		wcl = wcl->c_next;
-	}
-
-	/*
-	 * MUST fail if there are still more data
-	 */
-	if (count > 0) {
-		DTRACE_PROBE2(nfss__e__read3_wlist_fail,
-		    int, data_len, int, count);
+	if (rdma_setup_read_chunks(wcl, count, &wlist_len) == FALSE) {
 		return (FALSE);
 	}
 
 	wcl = args->wlist;
-	rok->count = data_len;
-	rok->wlist_len = data_len;
+	rok->wlist_len = wlist_len;
 	rok->wlist = wcl;
-
 	return (TRUE);
 }
 
--- a/usr/src/uts/common/fs/nfs/nfs3_xdr.c	Fri Apr 10 21:47:04 2009 -0700
+++ b/usr/src/uts/common/fs/nfs/nfs3_xdr.c	Fri Apr 10 22:57:35 2009 -0700
@@ -19,7 +19,7 @@
  * CDDL HEADER END
  */
 /*
- * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
+ * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
  * Use is subject to license terms.
  */
 
@@ -1342,12 +1342,10 @@
 			 * uses xdr_READ3vres/xdr_READ3uiores to decode results.
 			 */
 			if (resokp->wlist) {
-				if (resokp->wlist->c_len != resokp->count) {
-					resokp->wlist->c_len = resokp->count;
-				}
 				if (resokp->count != 0) {
 					return (xdrrdma_send_read_data(
-					    xdrs, resokp->wlist));
+					    xdrs, resokp->count,
+					    resokp->wlist));
 				}
 				return (TRUE);
 			}
@@ -1412,11 +1410,22 @@
 			if (ocount != objp->count) {
 				DTRACE_PROBE2(xdr__e__read3vres_fail,
 				    int, ocount, int, objp->count);
+				objp->wlist = NULL;
 				return (FALSE);
 			}
 
-			objp->wlist_len = cl->c_len;
-			objp->data.data_len = objp->wlist_len;
+			objp->wlist_len = clist_len(cl);
+			objp->data.data_len = ocount;
+
+			if (objp->wlist_len !=
+			    roundup(objp->data.data_len, BYTES_PER_XDR_UNIT)) {
+				DTRACE_PROBE2(
+				    xdr__e__read3vres_fail,
+				    int, ocount,
+				    int, objp->data.data_len);
+				objp->wlist = NULL;
+				return (FALSE);
+			}
 			return (TRUE);
 		}
 	}
@@ -1526,7 +1535,7 @@
 				return (FALSE);
 			}
 
-			objp->wlist_len = cl->c_len;
+			objp->wlist_len = clist_len(cl);
 
 			uiop->uio_resid -= objp->count;
 			uiop->uio_iov->iov_len -= objp->count;
@@ -1536,7 +1545,7 @@
 			/*
 			 * XXX: Assume 1 iov, needs to be changed.
 			 */
-			objp->size = objp->wlist_len;
+			objp->size = objp->count;
 
 			return (TRUE);
 		}
@@ -1613,7 +1622,7 @@
 			    &objp->conn, nfs3tsize()) == TRUE) {
 				objp->data.data_val = NULL;
 				if (xdrrdma_read_from_client(
-				    &objp->rlist,
+				    objp->rlist,
 				    &objp->conn,
 				    objp->count) == FALSE) {
 					return (FALSE);
--- a/usr/src/uts/common/fs/nfs/nfs4_srv.c	Fri Apr 10 21:47:04 2009 -0700
+++ b/usr/src/uts/common/fs/nfs/nfs4_srv.c	Fri Apr 10 22:57:35 2009 -0700
@@ -3191,6 +3191,8 @@
 		resp->wlist = args->wlist;
 		resp->wlist_len = resp->data_len;
 		*cs->statusp = resp->status = NFS4_OK;
+		if (resp->wlist)
+			clist_zero_len(resp->wlist);
 		goto out;
 	}
 
@@ -3203,6 +3205,8 @@
 		/* RDMA */
 		resp->wlist = args->wlist;
 		resp->wlist_len = resp->data_len;
+		if (resp->wlist)
+			clist_zero_len(resp->wlist);
 		goto out;
 	}
 
@@ -9295,44 +9299,18 @@
 }
 
 static int
-rdma_setup_read_data4(READ4args * args, READ4res * rok)
+rdma_setup_read_data4(READ4args *args, READ4res *rok)
 {
 	struct clist	*wcl;
-	int		data_len, avail_len, num;
 	count4		count = rok->data_len;
-
-	data_len = num = avail_len = 0;
+	int		wlist_len;
 
 	wcl = args->wlist;
-	while (wcl != NULL) {
-		if (wcl->c_dmemhandle.mrc_rmr == 0)
-			break;
-
-		avail_len += wcl->c_len;
-		if (wcl->c_len < count) {
-			data_len += wcl->c_len;
-		} else {
-			/* Can make the rest chunks all 0-len */
-			data_len += count;
-			wcl->c_len = count;
-		}
-		count -= wcl->c_len;
-		num++;
-		wcl = wcl->c_next;
-	}
-
-	/*
-	 * MUST fail if there are still more data
-	 */
-	if (count > 0) {
-		DTRACE_PROBE2(nfss__e__read4_wlist_fail,
-		    int, data_len, int, count);
+	if (rdma_setup_read_chunks(wcl, count, &wlist_len) == FALSE) {
 		return (FALSE);
 	}
 	wcl = args->wlist;
-	rok->data_len = data_len;
-	rok->wlist_len = data_len;
+	rok->wlist_len = wlist_len;
 	rok->wlist = wcl;
-
 	return (TRUE);
 }
--- a/usr/src/uts/common/fs/nfs/nfs4_xdr.c	Fri Apr 10 21:47:04 2009 -0700
+++ b/usr/src/uts/common/fs/nfs/nfs4_xdr.c	Fri Apr 10 22:57:35 2009 -0700
@@ -19,12 +19,10 @@
  * CDDL HEADER END
  */
 /*
- * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
+ * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
  * Use is subject to license terms.
  */
 
-#pragma ident	"@(#)nfs4_xdr.c	1.27	08/08/11 SMI"
-
 /*
  * A handcoded version based on the original rpcgen code.
  *
@@ -3240,12 +3238,9 @@
 		 * uses xdr_READ4res_clnt to decode results.
 		 */
 		if (objp->wlist) {
-			if (objp->wlist->c_len != objp->data_len) {
-				objp->wlist->c_len = objp->data_len;
-			}
 			if (objp->data_len != 0) {
 				return (xdrrdma_send_read_data(
-				    xdrs, objp->wlist));
+				    xdrs, objp->data_len, objp->wlist));
 			}
 			return (TRUE);
 		}
@@ -3341,10 +3336,12 @@
 					return (FALSE);
 				}
 
-				objp->wlist_len = cl->c_len;
-				objp->data_len = objp->wlist_len;
-
-				if (ocount != objp->data_len) {
+				objp->wlist_len = clist_len(cl);
+				objp->data_len = ocount;
+
+				if (objp->wlist_len !=
+				    roundup(
+				    objp->data_len, BYTES_PER_XDR_UNIT)) {
 					DTRACE_PROBE2(
 					    xdr__e__read4resuio_clnt_fail,
 					    int, ocount,
@@ -3429,10 +3426,12 @@
 				return (FALSE);
 			}
 
-			objp->wlist_len = cl->c_len;
-			objp->data_len = objp->wlist_len;
-
-			if (ocount != objp->data_len) {
+			objp->wlist_len = clist_len(cl);
+			objp->data_len = ocount;
+
+			if (objp->wlist_len !=
+			    roundup(
+			    objp->data_len, BYTES_PER_XDR_UNIT)) {
 				DTRACE_PROBE2(
 				    xdr__e__read4res_clnt_fail,
 				    int, ocount,
@@ -3732,7 +3731,7 @@
 				    &objp->conn, NFS4_DATA_LIMIT);
 				if (retval == FALSE)
 					return (FALSE);
-				return (xdrrdma_read_from_client(&objp->rlist,
+				return (xdrrdma_read_from_client(objp->rlist,
 				    &objp->conn, objp->data_len));
 			}
 		}
--- a/usr/src/uts/common/fs/nfs/nfs_srv.c	Fri Apr 10 21:47:04 2009 -0700
+++ b/usr/src/uts/common/fs/nfs/nfs_srv.c	Fri Apr 10 22:57:35 2009 -0700
@@ -723,6 +723,9 @@
 		 * to encode. So set rr_mp to NULL.
 		 */
 		rr->rr_mp = NULL;
+		rr->rr_ok.rrok_wlist = ra->ra_wlist;
+		if (rr->rr_ok.rrok_wlist)
+			clist_zero_len(rr->rr_ok.rrok_wlist);
 		goto done;
 	}
 
@@ -2945,41 +2948,17 @@
 rdma_setup_read_data2(struct nfsreadargs *ra, struct nfsrdresult *rr)
 {
 	struct clist	*wcl;
-	int		data_len, avail_len, num;
+	int		wlist_len;
 	uint32_t	count = rr->rr_count;
 
-	data_len = num = avail_len = 0;
-
 	wcl = ra->ra_wlist;
-	while (wcl != NULL) {
-		if (wcl->c_dmemhandle.mrc_rmr == 0)
-			break;
-
-		avail_len += wcl->c_len;
-		if (wcl->c_len < count) {
-			data_len += wcl->c_len;
-		} else {
-			/* Can make the rest chunks all 0-len */
-			data_len += count;
-			wcl->c_len = count;
-		}
-		count -= wcl->c_len;
-		num ++;
-		wcl = wcl->c_next;
-	}
-
-	/*
-	 * MUST fail if there are still more data
-	 */
-	if (count > 0) {
-		DTRACE_PROBE2(nfss__e__read__wlist__fail,
-		    int, data_len, int, count);
+
+	if (rdma_setup_read_chunks(wcl, count, &wlist_len) == FALSE) {
 		return (FALSE);
 	}
 
 	wcl = ra->ra_wlist;
-	rr->rr_count = data_len;
-	rr->rr_ok.rrok_wlist_len = data_len;
+	rr->rr_ok.rrok_wlist_len = wlist_len;
 	rr->rr_ok.rrok_wlist = wcl;
 
 	return (TRUE);
--- a/usr/src/uts/common/fs/nfs/nfs_xdr.c	Fri Apr 10 21:47:04 2009 -0700
+++ b/usr/src/uts/common/fs/nfs/nfs_xdr.c	Fri Apr 10 22:57:35 2009 -0700
@@ -19,7 +19,7 @@
  * CDDL HEADER END
  */
 /*
- * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
+ * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
  * Use is subject to license terms.
  */
 
@@ -177,7 +177,7 @@
 					    &wa->wa_conn,
 					    NFS_MAXDATA) == TRUE)
 					return (xdrrdma_read_from_client(
-					    &wa->wa_rlist,
+					    wa->wa_rlist,
 					    &wa->wa_conn,
 					    wa->wa_count));
 
@@ -472,15 +472,10 @@
 			 * xdr_bytes() below.   RDMA_WRITE transfers the data.
 			 */
 			if (rrok->rrok_wlist) {
-				/* adjust length to match in the rdma header */
-				if (rrok->rrok_wlist->c_len !=
-				    rrok->rrok_count) {
-					rrok->rrok_wlist->c_len =
-					    rrok->rrok_count;
-				}
 				if (rrok->rrok_count != 0) {
 					return (xdrrdma_send_read_data(
-					    xdrs, rrok->rrok_wlist));
+					    xdrs, rrok->rrok_count,
+					    rrok->rrok_wlist));
 				}
 				return (TRUE);
 			}
@@ -500,8 +495,15 @@
 					rrok->rrok_wlist_len = 0;
 					rrok->rrok_count = 0;
 				} else {
-					rrok->rrok_wlist_len = cl->c_len;
-					rrok->rrok_count = cl->c_len;
+					rrok->rrok_wlist_len = clist_len(cl);
+					if (rrok->rrok_wlist_len !=
+					    roundup(count,
+					    BYTES_PER_XDR_UNIT)) {
+						rrok->rrok_wlist_len = 0;
+						rrok->rrok_count = 0;
+						return (FALSE);
+					}
+					rrok->rrok_count = count;
 				}
 				return (TRUE);
 			}
--- a/usr/src/uts/common/rpc/clnt_rdma.c	Fri Apr 10 21:47:04 2009 -0700
+++ b/usr/src/uts/common/rpc/clnt_rdma.c	Fri Apr 10 22:57:35 2009 -0700
@@ -64,7 +64,7 @@
 static int  clnt_compose_rdma_header(CONN *, CLIENT *, rdma_buf_t *,
 		    XDR **, uint_t *);
 static int clnt_setup_rlist(CONN *, XDR *, XDR *);
-static int clnt_setup_wlist(CONN *, XDR *, XDR *);
+static int clnt_setup_wlist(CONN *, XDR *, XDR *, rdma_buf_t *);
 static int clnt_setup_long_reply(CONN *, struct clist **, uint_t);
 static void clnt_check_credit(CONN *);
 static void clnt_return_credit(CONN *);
@@ -73,7 +73,6 @@
 		struct clist *, uint_t, uint_t);
 
 static void clnt_update_credit(CONN *, uint32_t);
-static void check_dereg_wlist(CONN *, struct clist *);
 
 static enum clnt_stat clnt_rdma_kcallit(CLIENT *, rpcproc_t, xdrproc_t,
     caddr_t, xdrproc_t, caddr_t, struct timeval);
@@ -456,24 +455,63 @@
  * the memory and encode the clist into the outbound XDR stream.
  */
 static int
-clnt_setup_wlist(CONN *conn, XDR *xdrs, XDR *call_xdrp)
+clnt_setup_wlist(CONN *conn, XDR *xdrs, XDR *call_xdrp, rdma_buf_t *rndbuf)
 {
 	int status;
-	struct clist *wlist;
+	struct clist *wlist, *rndcl;
+	int wlen, rndlen;
 	int32_t xdr_flag = XDR_RDMA_WLIST_REG;
 
 	XDR_CONTROL(call_xdrp, XDR_RDMA_GET_WLIST, &wlist);
 
 	if (wlist != NULL) {
+		/*
+		 * If we are sending a non 4-byte alligned length
+		 * the server will roundup the length to 4-byte
+		 * boundary. In such a case, a trailing chunk is
+		 * added to take any spill over roundup bytes.
+		 */
+		wlen = clist_len(wlist);
+		rndlen = (roundup(wlen, BYTES_PER_XDR_UNIT) - wlen);
+		if (rndlen) {
+			rndcl = clist_alloc();
+			/*
+			 * calc_length() will allocate a PAGESIZE
+			 * buffer below.
+			 */
+			rndcl->c_len = calc_length(rndlen);
+			rndcl->rb_longbuf.type = RDMA_LONG_BUFFER;
+			rndcl->rb_longbuf.len = rndcl->c_len;
+			if (rdma_buf_alloc(conn, &rndcl->rb_longbuf)) {
+				clist_free(rndcl);
+				return (CLNT_RDMA_FAIL);
+			}
+
+			/* Roundup buffer freed back in caller */
+			*rndbuf = rndcl->rb_longbuf;
+
+			rndcl->u.c_daddr3 = rndcl->rb_longbuf.addr;
+			rndcl->c_next = NULL;
+			rndcl->c_dmemhandle = rndcl->rb_longbuf.handle;
+			wlist->c_next = rndcl;
+		}
+
 		status = clist_register(conn, wlist, CLIST_REG_DST);
 		if (status != RDMA_SUCCESS) {
+			rdma_buf_free(conn, rndbuf);
+			bzero(rndbuf, sizeof (rdma_buf_t));
 			return (CLNT_RDMA_FAIL);
 		}
 		XDR_CONTROL(call_xdrp, XDR_RDMA_SET_FLAGS, &xdr_flag);
 	}
 
-	if (!xdr_encode_wlist(xdrs, wlist))
+	if (!xdr_encode_wlist(xdrs, wlist)) {
+		if (rndlen) {
+			rdma_buf_free(conn, rndbuf);
+			bzero(rndbuf, sizeof (rdma_buf_t));
+		}
 		return (CLNT_RDMA_FAIL);
+	}
 
 	return (CLNT_RDMA_SUCCESS);
 }
@@ -539,6 +577,7 @@
 	struct clist *cl_rdma_reply;
 	struct clist *cl_rpcreply_wlist;
 	struct clist *cl_long_reply;
+	rdma_buf_t  rndup;
 
 	uint_t vers;
 	uint_t op;
@@ -564,6 +603,7 @@
 
 	bzero(&clmsg, sizeof (clmsg));
 	bzero(&rpcmsg, sizeof (rpcmsg));
+	bzero(&rndup, sizeof (rndup));
 	try_call_again = 0;
 	cl_sendlist = NULL;
 	cl_recvlist = NULL;
@@ -813,7 +853,7 @@
 	 * other operations will have a NULL which will result
 	 * as a NULL list in the XDR stream.
 	 */
-	status = clnt_setup_wlist(conn, rdmahdr_o_xdrs, call_xdrp);
+	status = clnt_setup_wlist(conn, rdmahdr_o_xdrs, call_xdrp, &rndup);
 	if (status != CLNT_RDMA_SUCCESS) {
 		rdma_buf_free(conn, &clmsg);
 		p->cku_err.re_status = RPC_CANTSEND;
@@ -1092,7 +1132,7 @@
 	 * If rpc reply is in a chunk, free it now.
 	 */
 	if (cl_long_reply) {
-		(void) clist_deregister(conn, cl_long_reply, CLIST_REG_DST);
+		(void) clist_deregister(conn, cl_long_reply);
 		rdma_buf_free(conn, &cl_long_reply->rb_longbuf);
 		clist_free(cl_long_reply);
 	}
@@ -1100,6 +1140,10 @@
 	if (call_xdrp)
 		XDR_DESTROY(call_xdrp);
 
+	if (rndup.rb_private) {
+		rdma_buf_free(conn, &rndup);
+	}
+
 	if (reply_xdrp) {
 		(void) xdr_rpc_free_verifier(reply_xdrp, &reply_msg);
 		XDR_DESTROY(reply_xdrp);
@@ -1322,22 +1366,3 @@
 	rw_exit(&rdma_lock);
 	return (-1);
 }
-
-static void
-check_dereg_wlist(CONN *conn, clist *rwc)
-{
-	int status;
-
-	if (rwc == NULL)
-		return;
-
-	if (rwc->c_dmemhandle.mrc_rmr && rwc->c_len) {
-
-		status = clist_deregister(conn, rwc, CLIST_REG_DST);
-
-		if (status != RDMA_SUCCESS) {
-			DTRACE_PROBE1(krpc__e__clntrdma__dereg_wlist,
-			    int, status);
-		}
-	}
-}
--- a/usr/src/uts/common/rpc/rdma_subr.c	Fri Apr 10 21:47:04 2009 -0700
+++ b/usr/src/uts/common/rpc/rdma_subr.c	Fri Apr 10 22:57:35 2009 -0700
@@ -206,6 +206,28 @@
 	return (clp);
 }
 
+uint32_t
+clist_len(struct clist *cl)
+{
+	uint32_t len = 0;
+	while (cl) {
+		len += cl->c_len;
+		cl = cl->c_next;
+	}
+	return (len);
+}
+
+void
+clist_zero_len(struct clist *cl)
+{
+	while (cl != NULL) {
+		if (cl->c_dmemhandle.mrc_rmr == 0)
+			break;
+		cl->c_len = 0;
+		cl = cl->c_next;
+	}
+}
+
 /*
  * Creates a new chunk list entry, and
  * adds it to the end of a chunk list.
@@ -245,17 +267,20 @@
 	for (c = cl; c; c = c->c_next) {
 		if (c->c_len <= 0)
 			continue;
+
+		c->c_regtype = dstsrc;
+
 		switch (dstsrc) {
 		case CLIST_REG_SOURCE:
 			status = RDMA_REGMEMSYNC(conn,
-			    (caddr_t)(struct as *)cl->c_adspc,
+			    (caddr_t)(struct as *)c->c_adspc,
 			    (caddr_t)(uintptr_t)c->w.c_saddr3, c->c_len,
 			    &c->c_smemhandle, (void **)&c->c_ssynchandle,
 			    (void *)c->rb_longbuf.rb_private);
 			break;
 		case CLIST_REG_DST:
 			status = RDMA_REGMEMSYNC(conn,
-			    (caddr_t)(struct as *)cl->c_adspc,
+			    (caddr_t)(struct as *)c->c_adspc,
 			    (caddr_t)(uintptr_t)c->u.c_daddr3, c->c_len,
 			    &c->c_dmemhandle, (void **)&c->c_dsynchandle,
 			    (void *)c->rb_longbuf.rb_private);
@@ -264,7 +289,7 @@
 			return (RDMA_INVAL);
 		}
 		if (status != RDMA_SUCCESS) {
-			(void) clist_deregister(conn, cl, dstsrc);
+			(void) clist_deregister(conn, cl);
 			return (status);
 		}
 	}
@@ -273,12 +298,12 @@
 }
 
 rdma_stat
-clist_deregister(CONN *conn, struct clist *cl, clist_dstsrc dstsrc)
+clist_deregister(CONN *conn, struct clist *cl)
 {
 	struct clist *c;
 
 	for (c = cl; c; c = c->c_next) {
-		switch (dstsrc) {
+		switch (c->c_regtype) {
 		case CLIST_REG_SOURCE:
 			if (c->c_smemhandle.mrc_rmr != 0) {
 				(void) RDMA_DEREGMEMSYNC(conn,
@@ -302,7 +327,8 @@
 			}
 			break;
 		default:
-			return (RDMA_INVAL);
+			/* clist unregistered. continue */
+			break;
 		}
 	}
 
--- a/usr/src/uts/common/rpc/rpc_rdma.h	Fri Apr 10 21:47:04 2009 -0700
+++ b/usr/src/uts/common/rpc/rpc_rdma.h	Fri Apr 10 22:57:35 2009 -0700
@@ -134,7 +134,7 @@
 }rpccall_write_t;
 
 typedef enum {
-	CLIST_REG_SOURCE,
+	CLIST_REG_SOURCE = 1,
 	CLIST_REG_DST
 } clist_dstsrc;
 
@@ -244,6 +244,7 @@
 struct clist {
 	uint32		c_xdroff;	/* XDR offset */
 	uint32		c_len;		/* Length */
+	clist_dstsrc	c_regtype;	/* type of registration */
 	struct mrc	c_smemhandle;	/* src memory handle */
 	uint64 		c_ssynchandle;	/* src sync handle */
 	union {
@@ -518,8 +519,10 @@
 extern void clist_add(struct clist **, uint32_t, int,
 	struct mrc *, caddr_t, struct mrc *, caddr_t);
 extern void clist_free(struct clist *);
+extern uint32_t clist_len(struct clist *);
+extern void clist_zero_len(struct clist *);
 extern rdma_stat clist_register(CONN *conn, struct clist *cl, clist_dstsrc);
-extern rdma_stat clist_deregister(CONN *conn, struct clist *cl, clist_dstsrc);
+extern rdma_stat clist_deregister(CONN *conn, struct clist *cl);
 extern rdma_stat clist_syncmem(CONN *conn, struct clist *cl, clist_dstsrc);
 extern rdma_stat rdma_clnt_postrecv(CONN *conn, uint32_t xid);
 extern rdma_stat rdma_clnt_postrecv_remove(CONN *conn, uint32_t xid);
@@ -531,6 +534,7 @@
 extern int rdma_modload();
 extern bool_t   rdma_get_wchunk(struct svc_req *, iovec_t *, struct clist *);
 extern rdma_stat rdma_kwait(void);
+extern int rdma_setup_read_chunks(struct clist *, uint32_t, int *);
 
 /*
  * RDMA XDR
@@ -559,8 +563,8 @@
 		uint32_t seg_array_len);
 bool_t xdrrdma_getrdmablk(XDR *, struct clist **, uint_t *,
 	CONN **conn, const uint_t);
-bool_t xdrrdma_read_from_client(struct clist **, CONN **, uint_t);
-bool_t xdrrdma_send_read_data(XDR *, struct clist *);
+bool_t xdrrdma_read_from_client(struct clist *, CONN **, uint_t);
+bool_t xdrrdma_send_read_data(XDR *, uint_t, struct clist *);
 bool_t xdrrdma_free_clist(CONN *, struct clist *);
 #endif /* _KERNEL */
 
--- a/usr/src/uts/common/rpc/rpcib.c	Fri Apr 10 21:47:04 2009 -0700
+++ b/usr/src/uts/common/rpc/rpcib.c	Fri Apr 10 22:57:35 2009 -0700
@@ -82,7 +82,8 @@
 #include <nfs/nfs.h>
 #include <sys/atomic.h>
 
-#define	NFS_RDMA_PORT	2050
+#define	NFS_RDMA_PORT	20049
+
 
 /*
  * Convenience structures for connection management
@@ -217,6 +218,7 @@
 static bool_t	stats_enabled = FALSE;
 
 static uint64_t max_unsignaled_rws = 5;
+int nfs_rdma_port = NFS_RDMA_PORT;
 
 /*
  * rib_stat: private data pointer used when registering
@@ -1662,7 +1664,7 @@
 		break;
 	}
 
-	ipcm_info.src_port = NFS_RDMA_PORT;
+	ipcm_info.src_port = (in_port_t)nfs_rdma_port;
 
 	ibt_status = ibt_format_ip_private_data(&ipcm_info,
 	    IBT_IP_HDR_PRIV_DATA_SZ, cmp_ip_pvt);
@@ -1685,8 +1687,9 @@
 	qp_attr.rc_control = IBT_CEP_RDMA_RD | IBT_CEP_RDMA_WR;
 	qp_attr.rc_flags = IBT_WR_SIGNALED;
 
-	rptp->path.pi_sid = ibt_get_ip_sid(IPPROTO_TCP, NFS_RDMA_PORT);
+	rptp->path.pi_sid = ibt_get_ip_sid(IPPROTO_TCP, nfs_rdma_port);
 	chan_args.oc_path = &rptp->path;
+
 	chan_args.oc_cm_handler = rib_clnt_cm_handler;
 	chan_args.oc_cm_clnt_private = (void *)rib_stat;
 	chan_args.oc_rdma_ra_out = 4;
@@ -2015,9 +2018,11 @@
 	if (wd->status != (uint_t)SEND_WAIT) {
 		/* got send completion */
 		if (wd->status != RDMA_SUCCESS) {
-			error = wd->status;
-		if (wd->status != RDMA_CONNLOST)
-			error = RDMA_FAILED;
+			if (wd->status != RDMA_CONNLOST) {
+				error = RDMA_FAILED;
+			} else {
+				error = RDMA_CONNLOST;
+			}
 		}
 		for (i = 0; i < wd->nsbufs; i++) {
 			rib_rbuf_free(qptoc(qp), SEND_BUFFER,
@@ -2145,9 +2150,7 @@
 			rib_rbuf_free(conn, SEND_BUFFER,
 			    (void *)(uintptr_t)wdesc->sbufaddr[i]);
 		}
-
 		(void) rib_free_sendwait(wdesc);
-
 		return (RDMA_CONNLOST);
 	}
 	mutex_exit(&conn->c_lock);
@@ -2529,7 +2532,6 @@
 		return (RDMA_FAILED);
 	}
 
-
 	while ((cl != NULL)) {
 		if (cl->c_len > 0) {
 			bzero(&tx_wr, sizeof (ibt_send_wr_t));
@@ -3004,7 +3006,7 @@
 	sdesc.sd_handler = rib_srv_cm_handler;
 	sdesc.sd_flags = 0;
 	ibt_status = ibt_register_service(hca->ibt_clnt_hdl,
-	    &sdesc, ibt_get_ip_sid(IPPROTO_TCP, NFS_RDMA_PORT),
+	    &sdesc, ibt_get_ip_sid(IPPROTO_TCP, nfs_rdma_port),
 	    1, &srv_hdl, &srv_id);
 
 	for (i = 0; i < num_ports; i++) {
--- a/usr/src/uts/common/rpc/svc_rdma.c	Fri Apr 10 21:47:04 2009 -0700
+++ b/usr/src/uts/common/rpc/svc_rdma.c	Fri Apr 10 22:57:35 2009 -0700
@@ -501,14 +501,14 @@
 		status = RDMA_READ(conn, cllong, WAIT);
 		if (status) {
 			DTRACE_PROBE(krpc__e__svcrdma__krecv__read);
-			(void) clist_deregister(conn, cllong, CLIST_REG_DST);
+			(void) clist_deregister(conn, cllong);
 			rdma_buf_free(conn, &cllong->rb_longbuf);
 			clist_free(cllong);
 			goto cll_malloc_err;
 		}
 
 		status = clist_syncmem(conn, cllong, CLIST_REG_DST);
-		(void) clist_deregister(conn, cllong, CLIST_REG_DST);
+		(void) clist_deregister(conn, cllong);
 
 		xdrrdma_create(xdrs, (caddr_t)(uintptr_t)cllong->u.c_daddr3,
 		    cllong->c_len, 0, cl, XDR_DECODE, conn);
@@ -635,6 +635,7 @@
 
 	*final_len = XDR_GETPOS(&xdrslong);
 
+	DTRACE_PROBE1(krpc__i__replylen, uint_t, *final_len);
 	*numchunks = 0;
 	*freelen = 0;
 
@@ -642,10 +643,14 @@
 	wcl->rb_longbuf = long_rpc;
 
 	count = *final_len;
-	while (wcl != NULL) {
+	while ((wcl != NULL) && (count > 0)) {
+
 		if (wcl->c_dmemhandle.mrc_rmr == 0)
 			break;
 
+		DTRACE_PROBE2(krpc__i__write__chunks, uint32_t, count,
+		    uint32_t, wcl->c_len);
+
 		if (wcl->c_len > count) {
 			wcl->c_len = count;
 		}
@@ -653,9 +658,17 @@
 
 		count -= wcl->c_len;
 		*numchunks +=  1;
-		if (count == 0)
+		memp += wcl->c_len;
+		wcl = wcl->c_next;
+	}
+
+	/*
+	 * Make rest of the chunks 0-len
+	 */
+	while (wcl != NULL) {
+		if (wcl->c_dmemhandle.mrc_rmr == 0)
 			break;
-		memp += wcl->c_len;
+		wcl->c_len = 0;
 		wcl = wcl->c_next;
 	}
 
@@ -679,7 +692,7 @@
 	status = clist_syncmem(crdp->conn, wcl, CLIST_REG_SOURCE);
 
 	if (status) {
-		(void) clist_deregister(crdp->conn, wcl, CLIST_REG_SOURCE);
+		(void) clist_deregister(crdp->conn, wcl);
 		rdma_buf_free(crdp->conn, &long_rpc);
 		DTRACE_PROBE(krpc__e__svcrdma__longrep__syncmem);
 		return (SVC_RDMA_FAIL);
@@ -687,7 +700,7 @@
 
 	status = RDMA_WRITE(crdp->conn, wcl, WAIT);
 
-	(void) clist_deregister(crdp->conn, wcl, CLIST_REG_SOURCE);
+	(void) clist_deregister(crdp->conn, wcl);
 	rdma_buf_free(crdp->conn, &wcl->rb_longbuf);
 
 	if (status != RDMA_SUCCESS) {
@@ -1285,3 +1298,84 @@
 
 	return (TRUE);
 }
+
+/*
+ * Routine to set up the read chunk lists
+ */
+
+int
+rdma_setup_read_chunks(struct clist *wcl, uint32_t count, int *wcl_len)
+{
+	int		data_len, avail_len;
+	uint_t		round_len;
+
+	data_len = avail_len = 0;
+
+	while (wcl != NULL && count > 0) {
+		if (wcl->c_dmemhandle.mrc_rmr == 0)
+			break;
+
+		if (wcl->c_len < count) {
+			data_len += wcl->c_len;
+			avail_len = 0;
+		} else {
+			data_len += count;
+			avail_len = wcl->c_len - count;
+			wcl->c_len = count;
+		}
+		count -= wcl->c_len;
+
+		if (count == 0)
+			break;
+
+		wcl = wcl->c_next;
+	}
+
+	/*
+	 * MUST fail if there are still more data
+	 */
+	if (count > 0) {
+		DTRACE_PROBE2(krpc__e__rdma_setup_read_chunks_clist_len,
+		    int, data_len, int, count);
+		return (FALSE);
+	}
+
+	/*
+	 * Round up the last chunk to 4-byte boundary
+	 */
+	*wcl_len = roundup(data_len, BYTES_PER_XDR_UNIT);
+	round_len = *wcl_len - data_len;
+
+	if (round_len) {
+
+		/*
+		 * If there is space in the current chunk,
+		 * add the roundup to the chunk.
+		 */
+		if (avail_len >= round_len) {
+			wcl->c_len += round_len;
+		} else  {
+			/*
+			 * try the next one.
+			 */
+			wcl = wcl->c_next;
+			if ((wcl == NULL) || (wcl->c_len < round_len)) {
+				DTRACE_PROBE1(
+				    krpc__e__rdma_setup_read_chunks_rndup,
+				    int, round_len);
+				return (FALSE);
+			}
+			wcl->c_len = round_len;
+		}
+	}
+
+	wcl = wcl->c_next;
+
+	/*
+	 * Make rest of the chunks 0-len
+	 */
+
+	clist_zero_len(wcl);
+
+	return (TRUE);
+}
--- a/usr/src/uts/common/rpc/xdr_rdma.c	Fri Apr 10 21:47:04 2009 -0700
+++ b/usr/src/uts/common/rpc/xdr_rdma.c	Fri Apr 10 22:57:35 2009 -0700
@@ -19,7 +19,7 @@
  * CDDL HEADER END
  */
 /*
- * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
+ * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
  * Use is subject to license terms.
  */
 
@@ -64,6 +64,8 @@
 static rpc_inline_t *xdrrdma_inline(XDR *, int);
 void		xdrrdma_destroy(XDR *);
 static bool_t   xdrrdma_control(XDR *, int, void *);
+static bool_t  xdrrdma_read_a_chunk(XDR *, CONN **);
+static void xdrrdma_free_xdr_chunks(CONN *, struct clist *);
 
 struct xdr_ops  xdrrdmablk_ops = {
 	xdrrdma_getbytes,
@@ -94,6 +96,47 @@
  * separately from the rest of the RPC message. xp_min_chunk = 0, is a
  * special case for ENCODING, which means do not chunk the incoming stream of
  * data.
+ *
+ * A read chunk can contain part of the RPC message in addition to the
+ * inline message. In such a case, (xp_offp - x_base) will not provide
+ * the correct xdr offset of the entire message. xp_off is used in such
+ * a case to denote the offset or current position in the overall message
+ * covering both the inline and the chunk. This is used only in the case
+ * of decoding and useful to compare read chunk 'c_xdroff' offsets.
+ *
+ * An example for a read chunk containing an XDR message:
+ * An NFSv4 compound as following:
+ *
+ * PUTFH
+ * WRITE [4109 bytes]
+ * GETATTR
+ *
+ * Solaris Encoding is:
+ * -------------------
+ *
+ * <Inline message>: [PUTFH WRITE4args GETATTR]
+ *                                   |
+ *                                   v
+ * [RDMA_READ chunks]:               [write data]
+ *
+ *
+ * Linux encoding is:
+ * -----------------
+ *
+ * <Inline message>: [PUTFH WRITE4args]
+ *                                    |
+ *                                    v
+ * [RDMA_READ chunks]:                [Write data] [Write data2] [Getattr chunk]
+ *                                     chunk1       chunk2         chunk3
+ *
+ * where the READ chunks are as:
+ *
+ *             - chunk1 - 4k
+ * write data |
+ *             - chunk2 - 13 bytes(4109 - 4k)
+ * getattr op  - chunk3 - 19 bytes
+ * (getattr op starts at byte 4 after 3 bytes of roundup)
+ *
  */
 
 typedef struct {
@@ -101,8 +144,10 @@
 	int		xp_min_chunk;
 	uint_t		xp_flags;	/* Controls setting for rdma xdr */
 	int		xp_buf_size;	/* size of xdr buffer */
-	struct clist	*xp_rcl;		/* head of chunk list */
+	int		xp_off;		/* overall offset */
+	struct clist	*xp_rcl;	/* head of chunk list */
 	struct clist	**xp_rcl_next;	/* location to place/find next chunk */
+	struct clist	*xp_rcl_xdr;	/* copy of rcl containing RPC message */
 	struct clist	*xp_wcl;	/* head of write chunk list */
 	CONN		*xp_conn;	/* connection for chunk data xfer */
 	uint_t		xp_reply_chunk_len;
@@ -118,7 +163,6 @@
 {
 	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);
 	struct clist	*cle = *(xdrp->xp_rcl_next);
-	struct clist	*cls = *(xdrp->xp_rcl_next);
 	struct clist	*rdclist = NULL, *prev = NULL;
 	bool_t		retval = TRUE;
 	uint32_t	cur_offset = 0;
@@ -174,8 +218,19 @@
 	 */
 	for (actual_segments = 0;
 	    actual_segments < total_segments; actual_segments++) {
+
+		DTRACE_PROBE3(krpc__i__xdrrdma_getrdmablk, uint32_t, cle->c_len,
+		    uint32_t, total_len, uint32_t, cle->c_xdroff);
+
 		if (total_len <= 0)
 			break;
+
+		/*
+		 * not the first time in the loop
+		 */
+		if (actual_segments > 0)
+			cle = cle->c_next;
+
 		cle->u.c_daddr = (uint64) cur_offset;
 		alen = 0;
 		if (cle->c_len > total_len) {
@@ -211,14 +266,17 @@
 			prev = rdclist;
 		}
 
-		cle = cle->c_next;
 	}
 
 out:
 	if (prev != NULL)
 		prev->c_next = NULL;
 
-	cle = cls;
+	/*
+	 * Adjust the chunk length, if we read only a part of
+	 * a chunk.
+	 */
+
 	if (alen) {
 		cle->w.c_saddr =
 		    (uint64)(uintptr_t)cle->w.c_saddr + cle->c_len;
@@ -284,8 +342,7 @@
 
 	if (xdrp->xp_wcl) {
 		if (xdrp->xp_flags & XDR_RDMA_WLIST_REG) {
-			(void) clist_deregister(xdrp->xp_conn,
-			    xdrp->xp_wcl, CLIST_REG_DST);
+			(void) clist_deregister(xdrp->xp_conn, xdrp->xp_wcl);
 			rdma_buf_free(xdrp->xp_conn,
 			    &xdrp->xp_wcl->rb_longbuf);
 		}
@@ -294,14 +351,16 @@
 
 	if (xdrp->xp_rcl) {
 		if (xdrp->xp_flags & XDR_RDMA_RLIST_REG) {
-			(void) clist_deregister(xdrp->xp_conn,
-			    xdrp->xp_rcl, CLIST_REG_SOURCE);
+			(void) clist_deregister(xdrp->xp_conn, xdrp->xp_rcl);
 			rdma_buf_free(xdrp->xp_conn,
 			    &xdrp->xp_rcl->rb_longbuf);
 		}
 		clist_free(xdrp->xp_rcl);
 	}
 
+	if (xdrp->xp_rcl_xdr)
+		xdrrdma_free_xdr_chunks(xdrp->xp_conn, xdrp->xp_rcl_xdr);
+
 	(void) kmem_free(xdrs->x_private, sizeof (xrdma_private_t));
 	xdrs->x_private = NULL;
 }
@@ -310,14 +369,32 @@
 xdrrdma_getint32(XDR *xdrs, int32_t *int32p)
 {
 	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);
+	int chunked = 0;
 
-	if ((xdrs->x_handy -= (int)sizeof (int32_t)) < 0)
-		return (FALSE);
+	if ((xdrs->x_handy -= (int)sizeof (int32_t)) < 0) {
+		/*
+		 * check if rest of the rpc message is in a chunk
+		 */
+		if (!xdrrdma_read_a_chunk(xdrs, &xdrp->xp_conn)) {
+			return (FALSE);
+		}
+		chunked = 1;
+	}
 
 	/* LINTED pointer alignment */
 	*int32p = (int32_t)ntohl((uint32_t)(*((int32_t *)(xdrp->xp_offp))));
+
+	DTRACE_PROBE1(krpc__i__xdrrdma_getint32, int32_t, *int32p);
+
 	xdrp->xp_offp += sizeof (int32_t);
 
+	if (chunked)
+		xdrs->x_handy -= (int)sizeof (int32_t);
+
+	if (xdrp->xp_off != 0) {
+		xdrp->xp_off += sizeof (int32_t);
+	}
+
 	return (TRUE);
 }
 
@@ -355,6 +432,7 @@
 	uint32_t	actual_segments = 0;
 	uint32_t	status;
 	uint32_t	alen;
+	uint32_t	xpoff;
 
 	while (cle) {
 		total_segments++;
@@ -362,14 +440,20 @@
 	}
 
 	cle = *(xdrp->xp_rcl_next);
+
+	if (xdrp->xp_off) {
+		xpoff = xdrp->xp_off;
+	} else {
+		xpoff = (xdrp->xp_offp - xdrs->x_base);
+	}
+
 	/*
 	 * If there was a chunk at the current offset, then setup a read
 	 * chunk list which records the destination address and length
 	 * and will RDMA READ the data in later.
 	 */
 
-	if (cle != NULL &&
-	    cle->c_xdroff == (xdrp->xp_offp - xdrs->x_base)) {
+	if (cle != NULL && cle->c_xdroff == xpoff) {
 		for (actual_segments = 0;
 		    actual_segments < total_segments; actual_segments++) {
 			if (total_len <= 0)
@@ -443,7 +527,8 @@
 		cl = *cls;
 		cl.c_next = NULL;
 		cl.c_len = cur_offset;
-		if (clist_syncmem(xdrp->xp_conn, &cl, 0) != RDMA_SUCCESS) {
+		if (clist_syncmem(
+		    xdrp->xp_conn, &cl, CLIST_REG_DST) != RDMA_SUCCESS) {
 			retval = FALSE;
 		}
 out:
@@ -454,7 +539,7 @@
 		cl = *cle;
 		cl.c_next = NULL;
 		cl.c_len = cur_offset;
-		(void) clist_deregister(xdrp->xp_conn, &cl, CLIST_REG_DST);
+		(void) clist_deregister(xdrp->xp_conn, &cl);
 		if (alen) {
 			cle->w.c_saddr =
 			    (uint64)(uintptr_t)cle->w.c_saddr + cle->c_len;
@@ -462,12 +547,17 @@
 		}
 		return (retval);
 	}
+
 	if ((xdrs->x_handy -= len) < 0)
 		return (FALSE);
 
 	bcopy(xdrp->xp_offp, addr, len);
+
 	xdrp->xp_offp += len;
 
+	if (xdrp->xp_off != 0)
+		xdrp->xp_off += len;
+
 	return (TRUE);
 }
 
@@ -871,6 +961,9 @@
 	if (!xdr_uint32(xdrs, &num_segment))
 		return (FALSE);
 	for (i = 0; i < num_segment; i++) {
+
+		DTRACE_PROBE1(krpc__i__xdr_encode_wlist_len, uint_t, w->c_len);
+
 		if (!xdr_uint32(xdrs, &w->c_dmemhandle.mrc_rmr))
 			return (FALSE);
 
@@ -922,10 +1015,15 @@
 
 	tmp = *w = clist_alloc();
 	for (i = 0; i < seg_array_len; i++) {
+
 		if (!xdr_uint32(xdrs, &tmp->c_dmemhandle.mrc_rmr))
 			return (FALSE);
 		if (!xdr_uint32(xdrs, &tmp->c_len))
 			return (FALSE);
+
+		DTRACE_PROBE1(krpc__i__xdr_decode_wlist_len,
+		    uint_t, tmp->c_len);
+
 		if (!xdr_uint64(xdrs, &tmp->u.c_daddr))
 			return (FALSE);
 		if (i < seg_array_len - 1) {
@@ -980,6 +1078,7 @@
 	first = ncl = clist_alloc();
 
 	for (i = 0; i < num_wclist; i++) {
+
 		if (!xdr_uint32(xdrs, &ncl->c_dmemhandle.mrc_rmr))
 			goto err_out;
 		if (!xdr_uint32(xdrs, &ncl->c_len))
@@ -993,6 +1092,9 @@
 			ncl->c_len = MAX_SVC_XFER_SIZE;
 		}
 
+		DTRACE_PROBE1(krpc__i__xdr_decode_wlist_svc_len,
+		    uint_t, ncl->c_len);
+
 		wcl_length += ncl->c_len;
 
 		if (i < num_wclist - 1) {
@@ -1060,6 +1162,12 @@
 	first = ncl = clist_alloc();
 
 	for (i = 0; i < num_wclist; i++) {
+
+		if (i > 0) {
+			ncl->c_next = clist_alloc();
+			ncl = ncl->c_next;
+		}
+
 		if (!xdr_uint32(xdrs, &ncl->c_dmemhandle.mrc_rmr))
 			goto err_out;
 		if (!xdr_uint32(xdrs, &ncl->c_len))
@@ -1077,10 +1185,9 @@
 			DTRACE_PROBE(
 			    krpc__e__xdrrdma__replywchunk__invalid_segaddr);
 
-		if (i > 0) {
-			ncl->c_next = clist_alloc();
-			ncl = ncl->c_next;
-		}
+		DTRACE_PROBE1(krpc__i__xdr_decode_reply_wchunk_c_len,
+		    uint32_t, ncl->c_len);
+
 	}
 	*clist = first;
 	return (TRUE);
@@ -1112,6 +1219,10 @@
 			length = cl_longreply->c_len;
 			offset = (uint64) cl_longreply->u.c_daddr;
 
+			DTRACE_PROBE1(
+			    krpc__i__xdr_encode_reply_wchunk_c_len,
+			    uint32_t, length);
+
 			if (!xdr_uint32(xdrs,
 			    &cl_longreply->c_dmemhandle.mrc_rmr))
 				return (FALSE);
@@ -1129,7 +1240,7 @@
 	return (TRUE);
 }
 bool_t
-xdrrdma_read_from_client(struct clist **rlist, CONN **conn, uint_t count)
+xdrrdma_read_from_client(struct clist *rlist, CONN **conn, uint_t count)
 {
 	struct clist	*rdclist;
 	struct clist	cl;
@@ -1137,36 +1248,54 @@
 	uint32_t	status;
 	bool_t		retval = TRUE;
 
-	(*rlist)->rb_longbuf.type = RDMA_LONG_BUFFER;
-	(*rlist)->rb_longbuf.len =
+	rlist->rb_longbuf.type = RDMA_LONG_BUFFER;
+	rlist->rb_longbuf.len =
 	    count > RCL_BUF_LEN ? count : RCL_BUF_LEN;
 
-	if (rdma_buf_alloc(*conn, &(*rlist)->rb_longbuf)) {
+	if (rdma_buf_alloc(*conn, &rlist->rb_longbuf)) {
 		return (FALSE);
 	}
 
-	for (rdclist = *rlist;
+	/*
+	 * The entire buffer is registered with the first chunk.
+	 * Later chunks will use the same registered memory handle.
+	 */
+
+	cl = *rlist;
+	cl.c_next = NULL;
+	if (clist_register(*conn, &cl, CLIST_REG_DST) != RDMA_SUCCESS) {
+		rdma_buf_free(*conn, &rlist->rb_longbuf);
+		DTRACE_PROBE(
+		    krpc__e__xdrrdma__readfromclient__clist__reg);
+		return (FALSE);
+	}
+
+	rlist->c_regtype = CLIST_REG_DST;
+	rlist->c_dmemhandle = cl.c_dmemhandle;
+	rlist->c_dsynchandle = cl.c_dsynchandle;
+
+	for (rdclist = rlist;
 	    rdclist != NULL; rdclist = rdclist->c_next) {
 		total_len += rdclist->c_len;
 #if (defined(OBJ32)||defined(DEBUG32))
 		rdclist->u.c_daddr3 =
-		    (caddr_t)((char *)(*rlist)->rb_longbuf.addr +
+		    (caddr_t)((char *)rlist->rb_longbuf.addr +
 		    (uint32) rdclist->u.c_daddr3);
 #else
 		rdclist->u.c_daddr3 =
-		    (caddr_t)((char *)(*rlist)->rb_longbuf.addr +
+		    (caddr_t)((char *)rlist->rb_longbuf.addr +
 		    (uint64) rdclist->u.c_daddr);
 
 #endif
 		cl = (*rdclist);
 		cl.c_next = NULL;
 
-		if (clist_register(*conn, &cl, CLIST_REG_DST) != RDMA_SUCCESS) {
-			rdma_buf_free(*conn, &(*rlist)->rb_longbuf);
-			DTRACE_PROBE(
-			    krpc__e__xdrrdma__readfromclient__clist__reg);
-			return (FALSE);
-		}
+		/*
+		 * Use the same memory handle for all the chunks
+		 */
+		cl.c_dmemhandle = rlist->c_dmemhandle;
+		cl.c_dsynchandle = rlist->c_dsynchandle;
+
 
 		DTRACE_PROBE1(krpc__i__xdrrdma__readfromclient__buflen,
 		    int, rdclist->c_len);
@@ -1182,15 +1311,15 @@
 		if (status != RDMA_SUCCESS) {
 			DTRACE_PROBE(
 			    krpc__e__xdrrdma__readfromclient__readfailed);
-			rdma_buf_free(*conn, &(*rlist)->rb_longbuf);
+			rdma_buf_free(*conn, &rlist->rb_longbuf);
 			return (FALSE);
 		}
 	}
 
-	cl = (*(*rlist));
+	cl = (*rlist);
 	cl.c_next = NULL;
 	cl.c_len = total_len;
-	if (clist_syncmem(*conn, &cl, 0) != RDMA_SUCCESS) {
+	if (clist_syncmem(*conn, &cl, CLIST_REG_DST) != RDMA_SUCCESS) {
 		retval = FALSE;
 	}
 	return (retval);
@@ -1205,30 +1334,234 @@
 }
 
 bool_t
-xdrrdma_send_read_data(XDR *xdrs, struct clist *wcl)
+xdrrdma_send_read_data(XDR *xdrs, uint_t data_len, struct clist *wcl)
 {
 	int status;
 	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);
 	struct xdr_ops *xops = xdrrdma_xops();
+	struct clist *tcl, *wrcl, *cl;
+	struct clist fcl;
+	int rndup_present, rnduplen;
+
+	rndup_present = 0;
+	wrcl = NULL;
 
 	/* caller is doing a sizeof */
 	if (xdrs->x_ops != &xdrrdma_ops || xdrs->x_ops == xops)
 		return (TRUE);
 
-	status = clist_register(xdrp->xp_conn, wcl, CLIST_REG_SOURCE);
+	/* copy of the first chunk */
+	fcl = *wcl;
+	fcl.c_next = NULL;
+
+	/*
+	 * The entire buffer is registered with the first chunk.
+	 * Later chunks will use the same registered memory handle.
+	 */
+
+	status = clist_register(xdrp->xp_conn, &fcl, CLIST_REG_SOURCE);
 	if (status != RDMA_SUCCESS) {
 		return (FALSE);
 	}
 
-	status = clist_syncmem(xdrp->xp_conn, wcl, CLIST_REG_SOURCE);
+	wcl->c_regtype = CLIST_REG_SOURCE;
+	wcl->c_smemhandle = fcl.c_smemhandle;
+	wcl->c_ssynchandle = fcl.c_ssynchandle;
+
+	/*
+	 * Only transfer the read data ignoring any trailing
+	 * roundup chunks. A bit of work, but it saves an
+	 * unnecessary extra RDMA_WRITE containing only
+	 * roundup bytes.
+	 */
+
+	rnduplen = clist_len(wcl) - data_len;
+
+	if (rnduplen) {
+
+		tcl = wcl->c_next;
+
+		/*
+		 * Check if there is a trailing roundup chunk
+		 */
+		while (tcl) {
+			if ((tcl->c_next == NULL) && (tcl->c_len == rnduplen)) {
+				rndup_present = 1;
+				break;
+			}
+			tcl = tcl->c_next;
+		}
+
+		/*
+		 * Make a copy chunk list skipping the last chunk
+		 */
+		if (rndup_present) {
+			cl = wcl;
+			tcl = NULL;
+			while (cl) {
+				if (tcl == NULL) {
+					tcl = clist_alloc();
+					wrcl = tcl;
+				} else {
+					tcl->c_next = clist_alloc();
+					tcl = tcl->c_next;
+				}
+
+				*tcl = *cl;
+				cl = cl->c_next;
+				/* last chunk */
+				if (cl->c_next == NULL)
+					break;
+			}
+			tcl->c_next = NULL;
+		}
+	}
+
+	if (wrcl == NULL) {
+		/* No roundup chunks */
+		wrcl = wcl;
+	}
+
+	/*
+	 * Set the registered memory handles for the
+	 * rest of the chunks same as the first chunk.
+	 */
+	tcl = wrcl->c_next;
+	while (tcl) {
+		tcl->c_smemhandle = fcl.c_smemhandle;
+		tcl->c_ssynchandle = fcl.c_ssynchandle;
+		tcl = tcl->c_next;
+	}
+
+	/*
+	 * Sync the total len beginning from the first chunk.
+	 */
+	fcl.c_len = clist_len(wrcl);
+	status = clist_syncmem(xdrp->xp_conn, &fcl, CLIST_REG_SOURCE);
 	if (status != RDMA_SUCCESS) {
 		return (FALSE);
 	}
 
-	status = RDMA_WRITE(xdrp->xp_conn, wcl, WAIT);
+	status = RDMA_WRITE(xdrp->xp_conn, wrcl, WAIT);
+
+	if (rndup_present)
+		clist_free(wrcl);
+
 	if (status != RDMA_SUCCESS) {
 		return (FALSE);
 	}
 
 	return (TRUE);
 }
+
+
+/*
+ * Reads one chunk at a time
+ */
+
+static bool_t
+xdrrdma_read_a_chunk(XDR *xdrs, CONN **conn)
+{
+	int status;
+	int32_t len = 0;
+	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);
+	struct clist *cle = *(xdrp->xp_rcl_next);
+	struct clist *rclp = xdrp->xp_rcl;
+	struct clist *clp;
+
+	/*
+	 * len is used later to decide xdr offset in
+	 * the chunk factoring any 4-byte XDR alignment
+	 * (See read chunk example top of this file)
+	 */
+	while (rclp != cle) {
+		len += rclp->c_len;
+		rclp = rclp->c_next;
+	}
+
+	len = RNDUP(len) - len;
+
+	ASSERT(xdrs->x_handy <= 0);
+
+	/*
+	 * If this is the first chunk to contain the RPC
+	 * message set xp_off to the xdr offset of the
+	 * inline message.
+	 */
+	if (xdrp->xp_off == 0)
+		xdrp->xp_off = (xdrp->xp_offp - xdrs->x_base);
+
+	if (cle == NULL || (cle->c_xdroff != xdrp->xp_off))
+		return (FALSE);
+
+	/*
+	 * Make a copy of the chunk to read from client.
+	 * Chunks are read on demand, so read only one
+	 * for now.
+	 */
+
+	rclp = clist_alloc();
+	*rclp = *cle;
+	rclp->c_next = NULL;
+
+	xdrp->xp_rcl_next = &cle->c_next;
+
+	/*
+	 * If there is a roundup present, then skip those
+	 * bytes when reading.
+	 */
+	if (len) {
+		rclp->w.c_saddr =
+		    (uint64)(uintptr_t)rclp->w.c_saddr + len;
+		rclp->c_len = rclp->c_len - len;
+	}
+
+	status = xdrrdma_read_from_client(rclp, conn, rclp->c_len);
+
+	if (status == FALSE) {
+		clist_free(rclp);
+		return (status);
+	}
+
+	xdrp->xp_offp = rclp->rb_longbuf.addr;
+	xdrs->x_base = xdrp->xp_offp;
+	xdrs->x_handy = rclp->c_len;
+
+	/*
+	 * This copy of read chunks containing the XDR
+	 * message is freed later in xdrrdma_destroy()
+	 */
+
+	if (xdrp->xp_rcl_xdr) {
+		/* Add the chunk to end of the list */
+		clp = xdrp->xp_rcl_xdr;
+		while (clp->c_next != NULL)
+			clp = clp->c_next;
+		clp->c_next = rclp;
+	} else {
+		xdrp->xp_rcl_xdr = rclp;
+	}
+	return (TRUE);
+}
+
+static void
+xdrrdma_free_xdr_chunks(CONN *conn, struct clist *xdr_rcl)
+{
+	struct clist *cl;
+
+	(void) clist_deregister(conn, xdr_rcl);
+
+	/*
+	 * Read chunks containing parts of the XDR
+	 * message are special: in the case of multiple
+	 * chunks, each has its own buffer.
+	 */
+
+	cl = xdr_rcl;
+	while (cl) {
+		rdma_buf_free(conn, &cl->rb_longbuf);
+		cl = cl->c_next;
+	}
+
+	clist_free(xdr_rcl);
+}