--- a/usr/src/uts/common/rpc/xdr_rdma.c Fri Apr 10 21:47:04 2009 -0700
+++ b/usr/src/uts/common/rpc/xdr_rdma.c Fri Apr 10 22:57:35 2009 -0700
@@ -19,7 +19,7 @@
* CDDL HEADER END
*/
/*
- * Copyright 2008 Sun Microsystems, Inc. All rights reserved.
+ * Copyright 2009 Sun Microsystems, Inc. All rights reserved.
* Use is subject to license terms.
*/
@@ -64,6 +64,8 @@
static rpc_inline_t *xdrrdma_inline(XDR *, int);
void xdrrdma_destroy(XDR *);
static bool_t xdrrdma_control(XDR *, int, void *);
+static bool_t xdrrdma_read_a_chunk(XDR *, CONN **);
+static void xdrrdma_free_xdr_chunks(CONN *, struct clist *);
struct xdr_ops xdrrdmablk_ops = {
xdrrdma_getbytes,
@@ -94,6 +96,47 @@
* separately from the rest of the RPC message. xp_min_chunk = 0, is a
* special case for ENCODING, which means do not chunk the incoming stream of
* data.
+ *
+ * A read chunk can contain part of the RPC message in addition to the
+ * inline message. In such a case, (xp_offp - x_base) will not provide
+ * the correct xdr offset of the entire message. xp_off is used in such
+ * a case to denote the offset or current position in the overall message
+ * covering both the inline and the chunk. This is used only in the case
+ * of decoding, and is useful for comparing read chunk 'c_xdroff' offsets.
+ *
+ * An example of a read chunk containing an XDR message:
+ * An NFSv4 compound as follows:
+ *
+ * PUTFH
+ * WRITE [4109 bytes]
+ * GETATTR
+ *
+ * Solaris Encoding is:
+ * -------------------
+ *
+ * <Inline message>: [PUTFH WRITE4args GETATTR]
+ * |
+ * v
+ * [RDMA_READ chunks]: [write data]
+ *
+ *
+ * Linux encoding is:
+ * -----------------
+ *
+ * <Inline message>: [PUTFH WRITE4args]
+ * |
+ * v
+ * [RDMA_READ chunks]: [Write data] [Write data2] [Getattr chunk]
+ * chunk1 chunk2 chunk3
+ *
+ * where the READ chunks are as:
+ *
+ * - chunk1 - 4k
+ * write data |
+ * - chunk2 - 13 bytes (4109 - 4k)
+ * getattr op - chunk3 - 19 bytes
+ * (getattr op starts at byte 4 after 3 bytes of roundup)
+ *
*/
typedef struct {
@@ -101,8 +144,10 @@
int xp_min_chunk;
uint_t xp_flags; /* Controls setting for rdma xdr */
int xp_buf_size; /* size of xdr buffer */
- struct clist *xp_rcl; /* head of chunk list */
+ int xp_off; /* overall offset */
+ struct clist *xp_rcl; /* head of chunk list */
struct clist **xp_rcl_next; /* location to place/find next chunk */
+ struct clist *xp_rcl_xdr; /* copy of rcl containing RPC message */
struct clist *xp_wcl; /* head of write chunk list */
CONN *xp_conn; /* connection for chunk data xfer */
uint_t xp_reply_chunk_len;
@@ -118,7 +163,6 @@
{
xrdma_private_t *xdrp = (xrdma_private_t *)(xdrs->x_private);
struct clist *cle = *(xdrp->xp_rcl_next);
- struct clist *cls = *(xdrp->xp_rcl_next);
struct clist *rdclist = NULL, *prev = NULL;
bool_t retval = TRUE;
uint32_t cur_offset = 0;
@@ -174,8 +218,19 @@
*/
for (actual_segments = 0;
actual_segments < total_segments; actual_segments++) {
+
+ DTRACE_PROBE3(krpc__i__xdrrdma_getrdmablk, uint32_t, cle->c_len,
+ uint32_t, total_len, uint32_t, cle->c_xdroff);
+
if (total_len <= 0)
break;
+
+ /*
+ * not the first time in the loop
+ */
+ if (actual_segments > 0)
+ cle = cle->c_next;
+
cle->u.c_daddr = (uint64) cur_offset;
alen = 0;
if (cle->c_len > total_len) {
@@ -211,14 +266,17 @@
prev = rdclist;
}
- cle = cle->c_next;
}
out:
if (prev != NULL)
prev->c_next = NULL;
- cle = cls;
+ /*
+ * Adjust the chunk length, if we read only a part of
+ * a chunk.
+ */
+
if (alen) {
cle->w.c_saddr =
(uint64)(uintptr_t)cle->w.c_saddr + cle->c_len;
@@ -284,8 +342,7 @@
if (xdrp->xp_wcl) {
if (xdrp->xp_flags & XDR_RDMA_WLIST_REG) {
- (void) clist_deregister(xdrp->xp_conn,
- xdrp->xp_wcl, CLIST_REG_DST);
+ (void) clist_deregister(xdrp->xp_conn, xdrp->xp_wcl);
rdma_buf_free(xdrp->xp_conn,
&xdrp->xp_wcl->rb_longbuf);
}
@@ -294,14 +351,16 @@
if (xdrp->xp_rcl) {
if (xdrp->xp_flags & XDR_RDMA_RLIST_REG) {
- (void) clist_deregister(xdrp->xp_conn,
- xdrp->xp_rcl, CLIST_REG_SOURCE);
+ (void) clist_deregister(xdrp->xp_conn, xdrp->xp_rcl);
rdma_buf_free(xdrp->xp_conn,
&xdrp->xp_rcl->rb_longbuf);
}
clist_free(xdrp->xp_rcl);
}
+ if (xdrp->xp_rcl_xdr)
+ xdrrdma_free_xdr_chunks(xdrp->xp_conn, xdrp->xp_rcl_xdr);
+
(void) kmem_free(xdrs->x_private, sizeof (xrdma_private_t));
xdrs->x_private = NULL;
}
@@ -310,14 +369,32 @@
xdrrdma_getint32(XDR *xdrs, int32_t *int32p)
{
xrdma_private_t *xdrp = (xrdma_private_t *)(xdrs->x_private);
+ int chunked = 0;
- if ((xdrs->x_handy -= (int)sizeof (int32_t)) < 0)
- return (FALSE);
+ if ((xdrs->x_handy -= (int)sizeof (int32_t)) < 0) {
+ /*
+ * check if rest of the rpc message is in a chunk
+ */
+ if (!xdrrdma_read_a_chunk(xdrs, &xdrp->xp_conn)) {
+ return (FALSE);
+ }
+ chunked = 1;
+ }
/* LINTED pointer alignment */
*int32p = (int32_t)ntohl((uint32_t)(*((int32_t *)(xdrp->xp_offp))));
+
+ DTRACE_PROBE1(krpc__i__xdrrdma_getint32, int32_t, *int32p);
+
xdrp->xp_offp += sizeof (int32_t);
+ if (chunked)
+ xdrs->x_handy -= (int)sizeof (int32_t);
+
+ if (xdrp->xp_off != 0) {
+ xdrp->xp_off += sizeof (int32_t);
+ }
+
return (TRUE);
}
@@ -355,6 +432,7 @@
uint32_t actual_segments = 0;
uint32_t status;
uint32_t alen;
+ uint32_t xpoff;
while (cle) {
total_segments++;
@@ -362,14 +440,20 @@
}
cle = *(xdrp->xp_rcl_next);
+
+ if (xdrp->xp_off) {
+ xpoff = xdrp->xp_off;
+ } else {
+ xpoff = (xdrp->xp_offp - xdrs->x_base);
+ }
+
/*
* If there was a chunk at the current offset, then setup a read
* chunk list which records the destination address and length
* and will RDMA READ the data in later.
*/
- if (cle != NULL &&
- cle->c_xdroff == (xdrp->xp_offp - xdrs->x_base)) {
+ if (cle != NULL && cle->c_xdroff == xpoff) {
for (actual_segments = 0;
actual_segments < total_segments; actual_segments++) {
if (total_len <= 0)
@@ -443,7 +527,8 @@
cl = *cls;
cl.c_next = NULL;
cl.c_len = cur_offset;
- if (clist_syncmem(xdrp->xp_conn, &cl, 0) != RDMA_SUCCESS) {
+ if (clist_syncmem(
+ xdrp->xp_conn, &cl, CLIST_REG_DST) != RDMA_SUCCESS) {
retval = FALSE;
}
out:
@@ -454,7 +539,7 @@
cl = *cle;
cl.c_next = NULL;
cl.c_len = cur_offset;
- (void) clist_deregister(xdrp->xp_conn, &cl, CLIST_REG_DST);
+ (void) clist_deregister(xdrp->xp_conn, &cl);
if (alen) {
cle->w.c_saddr =
(uint64)(uintptr_t)cle->w.c_saddr + cle->c_len;
@@ -462,12 +547,17 @@
}
return (retval);
}
+
if ((xdrs->x_handy -= len) < 0)
return (FALSE);
bcopy(xdrp->xp_offp, addr, len);
+
xdrp->xp_offp += len;
+ if (xdrp->xp_off != 0)
+ xdrp->xp_off += len;
+
return (TRUE);
}
@@ -871,6 +961,9 @@
if (!xdr_uint32(xdrs, &num_segment))
return (FALSE);
for (i = 0; i < num_segment; i++) {
+
+ DTRACE_PROBE1(krpc__i__xdr_encode_wlist_len, uint_t, w->c_len);
+
if (!xdr_uint32(xdrs, &w->c_dmemhandle.mrc_rmr))
return (FALSE);
@@ -922,10 +1015,15 @@
tmp = *w = clist_alloc();
for (i = 0; i < seg_array_len; i++) {
+
if (!xdr_uint32(xdrs, &tmp->c_dmemhandle.mrc_rmr))
return (FALSE);
if (!xdr_uint32(xdrs, &tmp->c_len))
return (FALSE);
+
+ DTRACE_PROBE1(krpc__i__xdr_decode_wlist_len,
+ uint_t, tmp->c_len);
+
if (!xdr_uint64(xdrs, &tmp->u.c_daddr))
return (FALSE);
if (i < seg_array_len - 1) {
@@ -980,6 +1078,7 @@
first = ncl = clist_alloc();
for (i = 0; i < num_wclist; i++) {
+
if (!xdr_uint32(xdrs, &ncl->c_dmemhandle.mrc_rmr))
goto err_out;
if (!xdr_uint32(xdrs, &ncl->c_len))
@@ -993,6 +1092,9 @@
ncl->c_len = MAX_SVC_XFER_SIZE;
}
+ DTRACE_PROBE1(krpc__i__xdr_decode_wlist_svc_len,
+ uint_t, ncl->c_len);
+
wcl_length += ncl->c_len;
if (i < num_wclist - 1) {
@@ -1060,6 +1162,12 @@
first = ncl = clist_alloc();
for (i = 0; i < num_wclist; i++) {
+
+ if (i > 0) {
+ ncl->c_next = clist_alloc();
+ ncl = ncl->c_next;
+ }
+
if (!xdr_uint32(xdrs, &ncl->c_dmemhandle.mrc_rmr))
goto err_out;
if (!xdr_uint32(xdrs, &ncl->c_len))
@@ -1077,10 +1185,9 @@
DTRACE_PROBE(
krpc__e__xdrrdma__replywchunk__invalid_segaddr);
- if (i > 0) {
- ncl->c_next = clist_alloc();
- ncl = ncl->c_next;
- }
+ DTRACE_PROBE1(krpc__i__xdr_decode_reply_wchunk_c_len,
+ uint32_t, ncl->c_len);
+
}
*clist = first;
return (TRUE);
@@ -1112,6 +1219,10 @@
length = cl_longreply->c_len;
offset = (uint64) cl_longreply->u.c_daddr;
+ DTRACE_PROBE1(
+ krpc__i__xdr_encode_reply_wchunk_c_len,
+ uint32_t, length);
+
if (!xdr_uint32(xdrs,
&cl_longreply->c_dmemhandle.mrc_rmr))
return (FALSE);
@@ -1129,7 +1240,7 @@
return (TRUE);
}
bool_t
-xdrrdma_read_from_client(struct clist **rlist, CONN **conn, uint_t count)
+xdrrdma_read_from_client(struct clist *rlist, CONN **conn, uint_t count)
{
struct clist *rdclist;
struct clist cl;
@@ -1137,36 +1248,54 @@
uint32_t status;
bool_t retval = TRUE;
- (*rlist)->rb_longbuf.type = RDMA_LONG_BUFFER;
- (*rlist)->rb_longbuf.len =
+ rlist->rb_longbuf.type = RDMA_LONG_BUFFER;
+ rlist->rb_longbuf.len =
count > RCL_BUF_LEN ? count : RCL_BUF_LEN;
- if (rdma_buf_alloc(*conn, &(*rlist)->rb_longbuf)) {
+ if (rdma_buf_alloc(*conn, &rlist->rb_longbuf)) {
return (FALSE);
}
- for (rdclist = *rlist;
+ /*
+ * The entire buffer is registered with the first chunk.
+ * Later chunks will use the same registered memory handle.
+ */
+
+ cl = *rlist;
+ cl.c_next = NULL;
+ if (clist_register(*conn, &cl, CLIST_REG_DST) != RDMA_SUCCESS) {
+ rdma_buf_free(*conn, &rlist->rb_longbuf);
+ DTRACE_PROBE(
+ krpc__e__xdrrdma__readfromclient__clist__reg);
+ return (FALSE);
+ }
+
+ rlist->c_regtype = CLIST_REG_DST;
+ rlist->c_dmemhandle = cl.c_dmemhandle;
+ rlist->c_dsynchandle = cl.c_dsynchandle;
+
+ for (rdclist = rlist;
rdclist != NULL; rdclist = rdclist->c_next) {
total_len += rdclist->c_len;
#if (defined(OBJ32)||defined(DEBUG32))
rdclist->u.c_daddr3 =
- (caddr_t)((char *)(*rlist)->rb_longbuf.addr +
+ (caddr_t)((char *)rlist->rb_longbuf.addr +
(uint32) rdclist->u.c_daddr3);
#else
rdclist->u.c_daddr3 =
- (caddr_t)((char *)(*rlist)->rb_longbuf.addr +
+ (caddr_t)((char *)rlist->rb_longbuf.addr +
(uint64) rdclist->u.c_daddr);
#endif
cl = (*rdclist);
cl.c_next = NULL;
- if (clist_register(*conn, &cl, CLIST_REG_DST) != RDMA_SUCCESS) {
- rdma_buf_free(*conn, &(*rlist)->rb_longbuf);
- DTRACE_PROBE(
- krpc__e__xdrrdma__readfromclient__clist__reg);
- return (FALSE);
- }
+ /*
+ * Use the same memory handle for all the chunks
+ */
+ cl.c_dmemhandle = rlist->c_dmemhandle;
+ cl.c_dsynchandle = rlist->c_dsynchandle;
+
DTRACE_PROBE1(krpc__i__xdrrdma__readfromclient__buflen,
int, rdclist->c_len);
@@ -1182,15 +1311,15 @@
if (status != RDMA_SUCCESS) {
DTRACE_PROBE(
krpc__e__xdrrdma__readfromclient__readfailed);
- rdma_buf_free(*conn, &(*rlist)->rb_longbuf);
+ rdma_buf_free(*conn, &rlist->rb_longbuf);
return (FALSE);
}
}
- cl = (*(*rlist));
+ cl = (*rlist);
cl.c_next = NULL;
cl.c_len = total_len;
- if (clist_syncmem(*conn, &cl, 0) != RDMA_SUCCESS) {
+ if (clist_syncmem(*conn, &cl, CLIST_REG_DST) != RDMA_SUCCESS) {
retval = FALSE;
}
return (retval);
@@ -1205,30 +1334,234 @@
}
bool_t
-xdrrdma_send_read_data(XDR *xdrs, struct clist *wcl)
+xdrrdma_send_read_data(XDR *xdrs, uint_t data_len, struct clist *wcl)
{
int status;
xrdma_private_t *xdrp = (xrdma_private_t *)(xdrs->x_private);
struct xdr_ops *xops = xdrrdma_xops();
+ struct clist *tcl, *wrcl, *cl;
+ struct clist fcl;
+ int rndup_present, rnduplen;
+
+ rndup_present = 0;
+ wrcl = NULL;
/* caller is doing a sizeof */
if (xdrs->x_ops != &xdrrdma_ops || xdrs->x_ops == xops)
return (TRUE);
- status = clist_register(xdrp->xp_conn, wcl, CLIST_REG_SOURCE);
+ /* copy of the first chunk */
+ fcl = *wcl;
+ fcl.c_next = NULL;
+
+ /*
+ * The entire buffer is registered with the first chunk.
+ * Later chunks will use the same registered memory handle.
+ */
+
+ status = clist_register(xdrp->xp_conn, &fcl, CLIST_REG_SOURCE);
if (status != RDMA_SUCCESS) {
return (FALSE);
}
- status = clist_syncmem(xdrp->xp_conn, wcl, CLIST_REG_SOURCE);
+ wcl->c_regtype = CLIST_REG_SOURCE;
+ wcl->c_smemhandle = fcl.c_smemhandle;
+ wcl->c_ssynchandle = fcl.c_ssynchandle;
+
+ /*
+ * Only transfer the read data ignoring any trailing
+ * roundup chunks. A bit of work, but it saves an
+ * unnecessary extra RDMA_WRITE containing only
+ * roundup bytes.
+ */
+
+ rnduplen = clist_len(wcl) - data_len;
+
+ if (rnduplen) {
+
+ tcl = wcl->c_next;
+
+ /*
+ * Check if there is a trailing roundup chunk
+ */
+ while (tcl) {
+ if ((tcl->c_next == NULL) && (tcl->c_len == rnduplen)) {
+ rndup_present = 1;
+ break;
+ }
+ tcl = tcl->c_next;
+ }
+
+ /*
+ * Make a copy chunk list skipping the last chunk
+ */
+ if (rndup_present) {
+ cl = wcl;
+ tcl = NULL;
+ while (cl) {
+ if (tcl == NULL) {
+ tcl = clist_alloc();
+ wrcl = tcl;
+ } else {
+ tcl->c_next = clist_alloc();
+ tcl = tcl->c_next;
+ }
+
+ *tcl = *cl;
+ cl = cl->c_next;
+ /* last chunk */
+ if (cl->c_next == NULL)
+ break;
+ }
+ tcl->c_next = NULL;
+ }
+ }
+
+ if (wrcl == NULL) {
+ /* No roundup chunks */
+ wrcl = wcl;
+ }
+
+ /*
+ * Set the registered memory handles for the
+ * rest of the chunks same as the first chunk.
+ */
+ tcl = wrcl->c_next;
+ while (tcl) {
+ tcl->c_smemhandle = fcl.c_smemhandle;
+ tcl->c_ssynchandle = fcl.c_ssynchandle;
+ tcl = tcl->c_next;
+ }
+
+ /*
+ * Sync the total len beginning from the first chunk.
+ */
+ fcl.c_len = clist_len(wrcl);
+ status = clist_syncmem(xdrp->xp_conn, &fcl, CLIST_REG_SOURCE);
if (status != RDMA_SUCCESS) {
return (FALSE);
}
- status = RDMA_WRITE(xdrp->xp_conn, wcl, WAIT);
+ status = RDMA_WRITE(xdrp->xp_conn, wrcl, WAIT);
+
+ if (rndup_present)
+ clist_free(wrcl);
+
if (status != RDMA_SUCCESS) {
return (FALSE);
}
return (TRUE);
}
+
+
+/*
+ * Reads one chunk at a time
+ */
+
+static bool_t
+xdrrdma_read_a_chunk(XDR *xdrs, CONN **conn)
+{
+ int status;
+ int32_t len = 0;
+ xrdma_private_t *xdrp = (xrdma_private_t *)(xdrs->x_private);
+ struct clist *cle = *(xdrp->xp_rcl_next);
+ struct clist *rclp = xdrp->xp_rcl;
+ struct clist *clp;
+
+ /*
+ * len is used later to decide xdr offset in
+ * the chunk factoring any 4-byte XDR alignment
+ * (See read chunk example top of this file)
+ */
+ while (rclp != cle) {
+ len += rclp->c_len;
+ rclp = rclp->c_next;
+ }
+
+ len = RNDUP(len) - len;
+
+ ASSERT(xdrs->x_handy <= 0);
+
+ /*
+ * If this is the first chunk to contain the RPC
+ * message set xp_off to the xdr offset of the
+ * inline message.
+ */
+ if (xdrp->xp_off == 0)
+ xdrp->xp_off = (xdrp->xp_offp - xdrs->x_base);
+
+ if (cle == NULL || (cle->c_xdroff != xdrp->xp_off))
+ return (FALSE);
+
+ /*
+ * Make a copy of the chunk to read from client.
+ * Chunks are read on demand, so read only one
+ * for now.
+ */
+
+ rclp = clist_alloc();
+ *rclp = *cle;
+ rclp->c_next = NULL;
+
+ xdrp->xp_rcl_next = &cle->c_next;
+
+ /*
+ * If there is a roundup present, then skip those
+ * bytes when reading.
+ */
+ if (len) {
+ rclp->w.c_saddr =
+ (uint64)(uintptr_t)rclp->w.c_saddr + len;
+ rclp->c_len = rclp->c_len - len;
+ }
+
+ status = xdrrdma_read_from_client(rclp, conn, rclp->c_len);
+
+ if (status == FALSE) {
+ clist_free(rclp);
+ return (status);
+ }
+
+ xdrp->xp_offp = rclp->rb_longbuf.addr;
+ xdrs->x_base = xdrp->xp_offp;
+ xdrs->x_handy = rclp->c_len;
+
+ /*
+ * This copy of read chunks containing the XDR
+ * message is freed later in xdrrdma_destroy()
+ */
+
+ if (xdrp->xp_rcl_xdr) {
+ /* Add the chunk to end of the list */
+ clp = xdrp->xp_rcl_xdr;
+ while (clp->c_next != NULL)
+ clp = clp->c_next;
+ clp->c_next = rclp;
+ } else {
+ xdrp->xp_rcl_xdr = rclp;
+ }
+ return (TRUE);
+}
+
+static void
+xdrrdma_free_xdr_chunks(CONN *conn, struct clist *xdr_rcl)
+{
+ struct clist *cl;
+
+ (void) clist_deregister(conn, xdr_rcl);
+
+ /*
+ * Read chunks containing parts of the XDR message are
+ * special: in case of multiple chunks each has
+ * its own buffer.
+ */
+
+ cl = xdr_rcl;
+ while (cl) {
+ rdma_buf_free(conn, &cl->rb_longbuf);
+ cl = cl->c_next;
+ }
+
+ clist_free(xdr_rcl);
+}