summaryrefslogtreecommitdiff
path: root/net
diff options
context:
space:
mode:
Diffstat (limited to 'net')
-rw-r--r--net/sunrpc/clnt.c15
-rw-r--r--net/sunrpc/rpcb_clnt.c8
-rw-r--r--net/sunrpc/sched.c7
-rw-r--r--net/sunrpc/stats.c26
-rw-r--r--net/sunrpc/xprt.c38
-rw-r--r--net/sunrpc/xprtrdma/rpc_rdma.c108
-rw-r--r--net/sunrpc/xprtrdma/transport.c182
-rw-r--r--net/sunrpc/xprtrdma/verbs.c411
-rw-r--r--net/sunrpc/xprtrdma/xprt_rdma.h112
-rw-r--r--net/sunrpc/xprtsock.c238
10 files changed, 628 insertions, 517 deletions
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index 05da12a33945..612aa73bbc60 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -286,10 +286,8 @@ static struct rpc_xprt *rpc_clnt_set_transport(struct rpc_clnt *clnt,
static void rpc_clnt_set_nodename(struct rpc_clnt *clnt, const char *nodename)
{
- clnt->cl_nodelen = strlen(nodename);
- if (clnt->cl_nodelen > UNX_MAXNODENAME)
- clnt->cl_nodelen = UNX_MAXNODENAME;
- memcpy(clnt->cl_nodename, nodename, clnt->cl_nodelen);
+ clnt->cl_nodelen = strlcpy(clnt->cl_nodename,
+ nodename, sizeof(clnt->cl_nodename));
}
static int rpc_client_register(struct rpc_clnt *clnt,
@@ -365,6 +363,7 @@ static struct rpc_clnt * rpc_new_client(const struct rpc_create_args *args,
const struct rpc_version *version;
struct rpc_clnt *clnt = NULL;
const struct rpc_timeout *timeout;
+ const char *nodename = args->nodename;
int err;
/* sanity check the name before trying to print it */
@@ -420,8 +419,10 @@ static struct rpc_clnt * rpc_new_client(const struct rpc_create_args *args,
atomic_set(&clnt->cl_count, 1);
+ if (nodename == NULL)
+ nodename = utsname()->nodename;
/* save the nodename */
- rpc_clnt_set_nodename(clnt, utsname()->nodename);
+ rpc_clnt_set_nodename(clnt, nodename);
err = rpc_client_register(clnt, args->authflavor, args->client_name);
if (err)
@@ -576,6 +577,7 @@ static struct rpc_clnt *__rpc_clone_client(struct rpc_create_args *args,
if (xprt == NULL)
goto out_err;
args->servername = xprt->servername;
+ args->nodename = clnt->cl_nodename;
new = rpc_new_client(args, xprt, clnt);
if (IS_ERR(new)) {
@@ -1824,6 +1826,7 @@ call_connect_status(struct rpc_task *task)
case -ECONNABORTED:
case -ENETUNREACH:
case -EHOSTUNREACH:
+ case -EADDRINUSE:
case -ENOBUFS:
case -EPIPE:
if (RPC_IS_SOFTCONN(task))
@@ -1932,6 +1935,7 @@ call_transmit_status(struct rpc_task *task)
}
case -ECONNRESET:
case -ECONNABORTED:
+ case -EADDRINUSE:
case -ENOTCONN:
case -ENOBUFS:
case -EPIPE:
@@ -2051,6 +2055,7 @@ call_status(struct rpc_task *task)
case -ECONNRESET:
case -ECONNABORTED:
rpc_force_rebind(clnt);
+ case -EADDRINUSE:
case -ENOBUFS:
rpc_delay(task, 3*HZ);
case -EPIPE:
diff --git a/net/sunrpc/rpcb_clnt.c b/net/sunrpc/rpcb_clnt.c
index 05202012bcfc..cf5770d8f49a 100644
--- a/net/sunrpc/rpcb_clnt.c
+++ b/net/sunrpc/rpcb_clnt.c
@@ -355,7 +355,8 @@ out:
return result;
}
-static struct rpc_clnt *rpcb_create(struct net *net, const char *hostname,
+static struct rpc_clnt *rpcb_create(struct net *net, const char *nodename,
+ const char *hostname,
struct sockaddr *srvaddr, size_t salen,
int proto, u32 version)
{
@@ -365,6 +366,7 @@ static struct rpc_clnt *rpcb_create(struct net *net, const char *hostname,
.address = srvaddr,
.addrsize = salen,
.servername = hostname,
+ .nodename = nodename,
.program = &rpcb_program,
.version = version,
.authflavor = RPC_AUTH_UNIX,
@@ -740,7 +742,9 @@ void rpcb_getport_async(struct rpc_task *task)
dprintk("RPC: %5u %s: trying rpcbind version %u\n",
task->tk_pid, __func__, bind_version);
- rpcb_clnt = rpcb_create(xprt->xprt_net, xprt->servername, sap, salen,
+ rpcb_clnt = rpcb_create(xprt->xprt_net,
+ clnt->cl_nodename,
+ xprt->servername, sap, salen,
xprt->prot, bind_version);
if (IS_ERR(rpcb_clnt)) {
status = PTR_ERR(rpcb_clnt);
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index d20f2329eea3..b91fd9c597b4 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -844,10 +844,10 @@ static void rpc_async_schedule(struct work_struct *work)
void *rpc_malloc(struct rpc_task *task, size_t size)
{
struct rpc_buffer *buf;
- gfp_t gfp = GFP_NOWAIT | __GFP_NOWARN;
+ gfp_t gfp = GFP_NOIO | __GFP_NOWARN;
if (RPC_IS_SWAPPER(task))
- gfp |= __GFP_MEMALLOC;
+ gfp = __GFP_MEMALLOC | GFP_NOWAIT | __GFP_NOWARN;
size += sizeof(struct rpc_buffer);
if (size <= RPC_BUFFER_MAXSIZE)
@@ -1069,7 +1069,8 @@ static int rpciod_start(void)
* Create the rpciod thread and wait for it to start.
*/
dprintk("RPC: creating workqueue rpciod\n");
- wq = alloc_workqueue("rpciod", WQ_MEM_RECLAIM, 1);
+ /* Note: highpri because network receive is latency sensitive */
+ wq = alloc_workqueue("rpciod", WQ_MEM_RECLAIM | WQ_HIGHPRI, 0);
rpciod_workqueue = wq;
return rpciod_workqueue != NULL;
}
diff --git a/net/sunrpc/stats.c b/net/sunrpc/stats.c
index 9711a155bc50..2ecb994314c1 100644
--- a/net/sunrpc/stats.c
+++ b/net/sunrpc/stats.c
@@ -140,22 +140,20 @@ void rpc_free_iostats(struct rpc_iostats *stats)
EXPORT_SYMBOL_GPL(rpc_free_iostats);
/**
- * rpc_count_iostats - tally up per-task stats
+ * rpc_count_iostats_metrics - tally up per-task stats
* @task: completed rpc_task
- * @stats: array of stat structures
+ * @op_metrics: stat structure for OP that will accumulate stats from @task
*/
-void rpc_count_iostats(const struct rpc_task *task, struct rpc_iostats *stats)
+void rpc_count_iostats_metrics(const struct rpc_task *task,
+ struct rpc_iostats *op_metrics)
{
struct rpc_rqst *req = task->tk_rqstp;
- struct rpc_iostats *op_metrics;
ktime_t delta, now;
- if (!stats || !req)
+ if (!op_metrics || !req)
return;
now = ktime_get();
- op_metrics = &stats[task->tk_msg.rpc_proc->p_statidx];
-
spin_lock(&op_metrics->om_lock);
op_metrics->om_ops++;
@@ -175,6 +173,20 @@ void rpc_count_iostats(const struct rpc_task *task, struct rpc_iostats *stats)
spin_unlock(&op_metrics->om_lock);
}
+EXPORT_SYMBOL_GPL(rpc_count_iostats_metrics);
+
+/**
+ * rpc_count_iostats - tally up per-task stats
+ * @task: completed rpc_task
+ * @stats: array of stat structures
+ *
+ * Uses the statidx from @task
+ */
+void rpc_count_iostats(const struct rpc_task *task, struct rpc_iostats *stats)
+{
+ rpc_count_iostats_metrics(task,
+ &stats[task->tk_msg.rpc_proc->p_statidx]);
+}
EXPORT_SYMBOL_GPL(rpc_count_iostats);
static void _print_name(struct seq_file *seq, unsigned int op,
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index ebbefad21a37..e3015aede0d9 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -683,13 +683,43 @@ xprt_init_autodisconnect(unsigned long data)
if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
goto out_abort;
spin_unlock(&xprt->transport_lock);
- set_bit(XPRT_CONNECTION_CLOSE, &xprt->state);
queue_work(rpciod_workqueue, &xprt->task_cleanup);
return;
out_abort:
spin_unlock(&xprt->transport_lock);
}
+bool xprt_lock_connect(struct rpc_xprt *xprt,
+ struct rpc_task *task,
+ void *cookie)
+{
+ bool ret = false;
+
+ spin_lock_bh(&xprt->transport_lock);
+ if (!test_bit(XPRT_LOCKED, &xprt->state))
+ goto out;
+ if (xprt->snd_task != task)
+ goto out;
+ xprt->snd_task = cookie;
+ ret = true;
+out:
+ spin_unlock_bh(&xprt->transport_lock);
+ return ret;
+}
+
+void xprt_unlock_connect(struct rpc_xprt *xprt, void *cookie)
+{
+ spin_lock_bh(&xprt->transport_lock);
+ if (xprt->snd_task != cookie)
+ goto out;
+ if (!test_bit(XPRT_LOCKED, &xprt->state))
+ goto out;
+ xprt->snd_task =NULL;
+ xprt->ops->release_xprt(xprt, NULL);
+out:
+ spin_unlock_bh(&xprt->transport_lock);
+}
+
/**
* xprt_connect - schedule a transport connect operation
* @task: RPC task that is requesting the connect
@@ -712,9 +742,7 @@ void xprt_connect(struct rpc_task *task)
if (test_and_clear_bit(XPRT_CLOSE_WAIT, &xprt->state))
xprt->ops->close(xprt);
- if (xprt_connected(xprt))
- xprt_release_write(xprt, task);
- else {
+ if (!xprt_connected(xprt)) {
task->tk_rqstp->rq_bytes_sent = 0;
task->tk_timeout = task->tk_rqstp->rq_timeout;
rpc_sleep_on(&xprt->pending, task, xprt_connect_status);
@@ -726,6 +754,7 @@ void xprt_connect(struct rpc_task *task)
xprt->stat.connect_start = jiffies;
xprt->ops->connect(xprt, task);
}
+ xprt_release_write(xprt, task);
}
static void xprt_connect_status(struct rpc_task *task)
@@ -758,7 +787,6 @@ static void xprt_connect_status(struct rpc_task *task)
dprintk("RPC: %5u xprt_connect_status: error %d connecting to "
"server %s\n", task->tk_pid, -task->tk_status,
xprt->servername);
- xprt_release_write(xprt, task);
task->tk_status = -EIO;
}
}
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index df01d124936c..7e9acd9361c5 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -209,9 +209,11 @@ rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target,
if (cur_rchunk) { /* read */
cur_rchunk->rc_discrim = xdr_one;
/* all read chunks have the same "position" */
- cur_rchunk->rc_position = htonl(pos);
- cur_rchunk->rc_target.rs_handle = htonl(seg->mr_rkey);
- cur_rchunk->rc_target.rs_length = htonl(seg->mr_len);
+ cur_rchunk->rc_position = cpu_to_be32(pos);
+ cur_rchunk->rc_target.rs_handle =
+ cpu_to_be32(seg->mr_rkey);
+ cur_rchunk->rc_target.rs_length =
+ cpu_to_be32(seg->mr_len);
xdr_encode_hyper(
(__be32 *)&cur_rchunk->rc_target.rs_offset,
seg->mr_base);
@@ -222,8 +224,10 @@ rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target,
cur_rchunk++;
r_xprt->rx_stats.read_chunk_count++;
} else { /* write/reply */
- cur_wchunk->wc_target.rs_handle = htonl(seg->mr_rkey);
- cur_wchunk->wc_target.rs_length = htonl(seg->mr_len);
+ cur_wchunk->wc_target.rs_handle =
+ cpu_to_be32(seg->mr_rkey);
+ cur_wchunk->wc_target.rs_length =
+ cpu_to_be32(seg->mr_len);
xdr_encode_hyper(
(__be32 *)&cur_wchunk->wc_target.rs_offset,
seg->mr_base);
@@ -257,7 +261,7 @@ rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target,
*iptr++ = xdr_zero; /* encode a NULL reply chunk */
} else {
warray->wc_discrim = xdr_one;
- warray->wc_nchunks = htonl(nchunks);
+ warray->wc_nchunks = cpu_to_be32(nchunks);
iptr = (__be32 *) cur_wchunk;
if (type == rpcrdma_writech) {
*iptr++ = xdr_zero; /* finish the write chunk list */
@@ -290,7 +294,7 @@ ssize_t
rpcrdma_marshal_chunks(struct rpc_rqst *rqst, ssize_t result)
{
struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
- struct rpcrdma_msg *headerp = (struct rpcrdma_msg *)req->rl_base;
+ struct rpcrdma_msg *headerp = rdmab_to_msg(req->rl_rdmabuf);
if (req->rl_rtype != rpcrdma_noch)
result = rpcrdma_create_chunks(rqst, &rqst->rq_snd_buf,
@@ -402,13 +406,12 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
base = rqst->rq_svec[0].iov_base;
rpclen = rqst->rq_svec[0].iov_len;
- /* build RDMA header in private area at front */
- headerp = (struct rpcrdma_msg *) req->rl_base;
- /* don't htonl XID, it's already done in request */
+ headerp = rdmab_to_msg(req->rl_rdmabuf);
+ /* don't byte-swap XID, it's already done in request */
headerp->rm_xid = rqst->rq_xid;
- headerp->rm_vers = xdr_one;
- headerp->rm_credit = htonl(r_xprt->rx_buf.rb_max_requests);
- headerp->rm_type = htonl(RDMA_MSG);
+ headerp->rm_vers = rpcrdma_version;
+ headerp->rm_credit = cpu_to_be32(r_xprt->rx_buf.rb_max_requests);
+ headerp->rm_type = rdma_msg;
/*
* Chunks needed for results?
@@ -468,7 +471,7 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
return -EIO;
}
- hdrlen = 28; /*sizeof *headerp;*/
+ hdrlen = RPCRDMA_HDRLEN_MIN;
padlen = 0;
/*
@@ -482,11 +485,11 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
RPCRDMA_INLINE_PAD_VALUE(rqst));
if (padlen) {
- headerp->rm_type = htonl(RDMA_MSGP);
+ headerp->rm_type = rdma_msgp;
headerp->rm_body.rm_padded.rm_align =
- htonl(RPCRDMA_INLINE_PAD_VALUE(rqst));
+ cpu_to_be32(RPCRDMA_INLINE_PAD_VALUE(rqst));
headerp->rm_body.rm_padded.rm_thresh =
- htonl(RPCRDMA_INLINE_PAD_THRESH);
+ cpu_to_be32(RPCRDMA_INLINE_PAD_THRESH);
headerp->rm_body.rm_padded.rm_pempty[0] = xdr_zero;
headerp->rm_body.rm_padded.rm_pempty[1] = xdr_zero;
headerp->rm_body.rm_padded.rm_pempty[2] = xdr_zero;
@@ -524,7 +527,7 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
dprintk("RPC: %s: %s: hdrlen %zd rpclen %zd padlen %zd"
" headerp 0x%p base 0x%p lkey 0x%x\n",
__func__, transfertypes[req->rl_wtype], hdrlen, rpclen, padlen,
- headerp, base, req->rl_iov.lkey);
+ headerp, base, rdmab_lkey(req->rl_rdmabuf));
/*
* initialize send_iov's - normally only two: rdma chunk header and
@@ -533,26 +536,26 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
* header and any write data. In all non-rdma cases, any following
* data has been copied into the RPC header buffer.
*/
- req->rl_send_iov[0].addr = req->rl_iov.addr;
+ req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf);
req->rl_send_iov[0].length = hdrlen;
- req->rl_send_iov[0].lkey = req->rl_iov.lkey;
+ req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf);
- req->rl_send_iov[1].addr = req->rl_iov.addr + (base - req->rl_base);
+ req->rl_send_iov[1].addr = rdmab_addr(req->rl_sendbuf);
req->rl_send_iov[1].length = rpclen;
- req->rl_send_iov[1].lkey = req->rl_iov.lkey;
+ req->rl_send_iov[1].lkey = rdmab_lkey(req->rl_sendbuf);
req->rl_niovs = 2;
if (padlen) {
struct rpcrdma_ep *ep = &r_xprt->rx_ep;
- req->rl_send_iov[2].addr = ep->rep_pad.addr;
+ req->rl_send_iov[2].addr = rdmab_addr(ep->rep_padbuf);
req->rl_send_iov[2].length = padlen;
- req->rl_send_iov[2].lkey = ep->rep_pad.lkey;
+ req->rl_send_iov[2].lkey = rdmab_lkey(ep->rep_padbuf);
req->rl_send_iov[3].addr = req->rl_send_iov[1].addr + rpclen;
req->rl_send_iov[3].length = rqst->rq_slen - rpclen;
- req->rl_send_iov[3].lkey = req->rl_iov.lkey;
+ req->rl_send_iov[3].lkey = rdmab_lkey(req->rl_sendbuf);
req->rl_niovs = 4;
}
@@ -569,8 +572,9 @@ rpcrdma_count_chunks(struct rpcrdma_rep *rep, unsigned int max, int wrchunk, __b
{
unsigned int i, total_len;
struct rpcrdma_write_chunk *cur_wchunk;
+ char *base = (char *)rdmab_to_msg(rep->rr_rdmabuf);
- i = ntohl(**iptrp); /* get array count */
+ i = be32_to_cpu(**iptrp);
if (i > max)
return -1;
cur_wchunk = (struct rpcrdma_write_chunk *) (*iptrp + 1);
@@ -582,11 +586,11 @@ rpcrdma_count_chunks(struct rpcrdma_rep *rep, unsigned int max, int wrchunk, __b
xdr_decode_hyper((__be32 *)&seg->rs_offset, &off);
dprintk("RPC: %s: chunk %d@0x%llx:0x%x\n",
__func__,
- ntohl(seg->rs_length),
+ be32_to_cpu(seg->rs_length),
(unsigned long long)off,
- ntohl(seg->rs_handle));
+ be32_to_cpu(seg->rs_handle));
}
- total_len += ntohl(seg->rs_length);
+ total_len += be32_to_cpu(seg->rs_length);
++cur_wchunk;
}
/* check and adjust for properly terminated write chunk */
@@ -596,7 +600,7 @@ rpcrdma_count_chunks(struct rpcrdma_rep *rep, unsigned int max, int wrchunk, __b
return -1;
cur_wchunk = (struct rpcrdma_write_chunk *) w;
}
- if ((char *) cur_wchunk > rep->rr_base + rep->rr_len)
+ if ((char *)cur_wchunk > base + rep->rr_len)
return -1;
*iptrp = (__be32 *) cur_wchunk;
@@ -691,7 +695,9 @@ rpcrdma_connect_worker(struct work_struct *work)
{
struct rpcrdma_ep *ep =
container_of(work, struct rpcrdma_ep, rep_connect_worker.work);
- struct rpc_xprt *xprt = ep->rep_xprt;
+ struct rpcrdma_xprt *r_xprt =
+ container_of(ep, struct rpcrdma_xprt, rx_ep);
+ struct rpc_xprt *xprt = &r_xprt->rx_xprt;
spin_lock_bh(&xprt->transport_lock);
if (++xprt->connect_cookie == 0) /* maintain a reserved value */
@@ -732,7 +738,7 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
struct rpc_xprt *xprt = rep->rr_xprt;
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
__be32 *iptr;
- int rdmalen, status;
+ int credits, rdmalen, status;
unsigned long cwnd;
/* Check status. If bad, signal disconnect and return rep to pool */
@@ -744,14 +750,14 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
}
return;
}
- if (rep->rr_len < 28) {
+ if (rep->rr_len < RPCRDMA_HDRLEN_MIN) {
dprintk("RPC: %s: short/invalid reply\n", __func__);
goto repost;
}
- headerp = (struct rpcrdma_msg *) rep->rr_base;
- if (headerp->rm_vers != xdr_one) {
+ headerp = rdmab_to_msg(rep->rr_rdmabuf);
+ if (headerp->rm_vers != rpcrdma_version) {
dprintk("RPC: %s: invalid version %d\n",
- __func__, ntohl(headerp->rm_vers));
+ __func__, be32_to_cpu(headerp->rm_vers));
goto repost;
}
@@ -762,7 +768,8 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
spin_unlock(&xprt->transport_lock);
dprintk("RPC: %s: reply 0x%p failed "
"to match any request xid 0x%08x len %d\n",
- __func__, rep, headerp->rm_xid, rep->rr_len);
+ __func__, rep, be32_to_cpu(headerp->rm_xid),
+ rep->rr_len);
repost:
r_xprt->rx_stats.bad_reply_count++;
rep->rr_func = rpcrdma_reply_handler;
@@ -778,13 +785,14 @@ repost:
spin_unlock(&xprt->transport_lock);
dprintk("RPC: %s: duplicate reply 0x%p to RPC "
"request 0x%p: xid 0x%08x\n", __func__, rep, req,
- headerp->rm_xid);
+ be32_to_cpu(headerp->rm_xid));
goto repost;
}
dprintk("RPC: %s: reply 0x%p completes request 0x%p\n"
" RPC request 0x%p xid 0x%08x\n",
- __func__, rep, req, rqst, headerp->rm_xid);
+ __func__, rep, req, rqst,
+ be32_to_cpu(headerp->rm_xid));
/* from here on, the reply is no longer an orphan */
req->rl_reply = rep;
@@ -793,7 +801,7 @@ repost:
/* check for expected message types */
/* The order of some of these tests is important. */
switch (headerp->rm_type) {
- case htonl(RDMA_MSG):
+ case rdma_msg:
/* never expect read chunks */
/* never expect reply chunks (two ways to check) */
/* never expect write chunks without having offered RDMA */
@@ -824,22 +832,24 @@ repost:
} else {
/* else ordinary inline */
rdmalen = 0;
- iptr = (__be32 *)((unsigned char *)headerp + 28);
- rep->rr_len -= 28; /*sizeof *headerp;*/
+ iptr = (__be32 *)((unsigned char *)headerp +
+ RPCRDMA_HDRLEN_MIN);
+ rep->rr_len -= RPCRDMA_HDRLEN_MIN;
status = rep->rr_len;
}
/* Fix up the rpc results for upper layer */
rpcrdma_inline_fixup(rqst, (char *)iptr, rep->rr_len, rdmalen);
break;
- case htonl(RDMA_NOMSG):
+ case rdma_nomsg:
/* never expect read or write chunks, always reply chunks */
if (headerp->rm_body.rm_chunks[0] != xdr_zero ||
headerp->rm_body.rm_chunks[1] != xdr_zero ||
headerp->rm_body.rm_chunks[2] != xdr_one ||
req->rl_nchunks == 0)
goto badheader;
- iptr = (__be32 *)((unsigned char *)headerp + 28);
+ iptr = (__be32 *)((unsigned char *)headerp +
+ RPCRDMA_HDRLEN_MIN);
rdmalen = rpcrdma_count_chunks(rep, req->rl_nchunks, 0, &iptr);
if (rdmalen < 0)
goto badheader;
@@ -853,7 +863,7 @@ badheader:
dprintk("%s: invalid rpcrdma reply header (type %d):"
" chunks[012] == %d %d %d"
" expected chunks <= %d\n",
- __func__, ntohl(headerp->rm_type),
+ __func__, be32_to_cpu(headerp->rm_type),
headerp->rm_body.rm_chunks[0],
headerp->rm_body.rm_chunks[1],
headerp->rm_body.rm_chunks[2],
@@ -863,8 +873,14 @@ badheader:
break;
}
+ credits = be32_to_cpu(headerp->rm_credit);
+ if (credits == 0)
+ credits = 1; /* don't deadlock */
+ else if (credits > r_xprt->rx_buf.rb_max_requests)
+ credits = r_xprt->rx_buf.rb_max_requests;
+
cwnd = xprt->cwnd;
- xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT;
+ xprt->cwnd = credits << RPC_CWNDSHIFT;
if (xprt->cwnd > cwnd)
xprt_release_rqst_cong(rqst->rq_task);
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index bbd6155d3e34..2e192baa59f3 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -200,9 +200,9 @@ xprt_rdma_free_addresses(struct rpc_xprt *xprt)
static void
xprt_rdma_connect_worker(struct work_struct *work)
{
- struct rpcrdma_xprt *r_xprt =
- container_of(work, struct rpcrdma_xprt, rdma_connect.work);
- struct rpc_xprt *xprt = &r_xprt->xprt;
+ struct rpcrdma_xprt *r_xprt = container_of(work, struct rpcrdma_xprt,
+ rx_connect_worker.work);
+ struct rpc_xprt *xprt = &r_xprt->rx_xprt;
int rc = 0;
xprt_clear_connected(xprt);
@@ -235,7 +235,7 @@ xprt_rdma_destroy(struct rpc_xprt *xprt)
dprintk("RPC: %s: called\n", __func__);
- cancel_delayed_work_sync(&r_xprt->rdma_connect);
+ cancel_delayed_work_sync(&r_xprt->rx_connect_worker);
xprt_clear_connected(xprt);
@@ -364,8 +364,7 @@ xprt_setup_rdma(struct xprt_create *args)
* any inline data. Also specify any padding which will be provided
* from a preregistered zero buffer.
*/
- rc = rpcrdma_buffer_create(&new_xprt->rx_buf, new_ep, &new_xprt->rx_ia,
- &new_xprt->rx_data);
+ rc = rpcrdma_buffer_create(new_xprt);
if (rc)
goto out3;
@@ -374,9 +373,8 @@ xprt_setup_rdma(struct xprt_create *args)
* connection loss notification is async. We also catch connection loss
* when reaping receives.
*/
- INIT_DELAYED_WORK(&new_xprt->rdma_connect, xprt_rdma_connect_worker);
- new_ep->rep_func = rpcrdma_conn_func;
- new_ep->rep_xprt = xprt;
+ INIT_DELAYED_WORK(&new_xprt->rx_connect_worker,
+ xprt_rdma_connect_worker);
xprt_rdma_format_addresses(xprt);
xprt->max_payload = rpcrdma_max_payload(new_xprt);
@@ -434,94 +432,101 @@ xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task)
if (r_xprt->rx_ep.rep_connected != 0) {
/* Reconnect */
- schedule_delayed_work(&r_xprt->rdma_connect,
- xprt->reestablish_timeout);
+ schedule_delayed_work(&r_xprt->rx_connect_worker,
+ xprt->reestablish_timeout);
xprt->reestablish_timeout <<= 1;
if (xprt->reestablish_timeout > RPCRDMA_MAX_REEST_TO)
xprt->reestablish_timeout = RPCRDMA_MAX_REEST_TO;
else if (xprt->reestablish_timeout < RPCRDMA_INIT_REEST_TO)
xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
} else {
- schedule_delayed_work(&r_xprt->rdma_connect, 0);
+ schedule_delayed_work(&r_xprt->rx_connect_worker, 0);
if (!RPC_IS_ASYNC(task))
- flush_delayed_work(&r_xprt->rdma_connect);
+ flush_delayed_work(&r_xprt->rx_connect_worker);
}
}
/*
* The RDMA allocate/free functions need the task structure as a place
* to hide the struct rpcrdma_req, which is necessary for the actual send/recv
- * sequence. For this reason, the recv buffers are attached to send
- * buffers for portions of the RPC. Note that the RPC layer allocates
- * both send and receive buffers in the same call. We may register
- * the receive buffer portion when using reply chunks.
+ * sequence.
+ *
+ * The RPC layer allocates both send and receive buffers in the same call
+ * (rq_send_buf and rq_rcv_buf are both part of a single contiguous buffer).
+ * We may register rq_rcv_buf when using reply chunks.
*/
static void *
xprt_rdma_allocate(struct rpc_task *task, size_t size)
{
struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
- struct rpcrdma_req *req, *nreq;
+ struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+ struct rpcrdma_regbuf *rb;
+ struct rpcrdma_req *req;
+ size_t min_size;
+ gfp_t flags;
- req = rpcrdma_buffer_get(&rpcx_to_rdmax(xprt)->rx_buf);
+ req = rpcrdma_buffer_get(&r_xprt->rx_buf);
if (req == NULL)
return NULL;
- if (size > req->rl_size) {
- dprintk("RPC: %s: size %zd too large for buffer[%zd]: "
- "prog %d vers %d proc %d\n",
- __func__, size, req->rl_size,
- task->tk_client->cl_prog, task->tk_client->cl_vers,
- task->tk_msg.rpc_proc->p_proc);
- /*
- * Outgoing length shortage. Our inline write max must have
- * been configured to perform direct i/o.
- *
- * This is therefore a large metadata operation, and the
- * allocate call was made on the maximum possible message,
- * e.g. containing long filename(s) or symlink data. In
- * fact, while these metadata operations *might* carry
- * large outgoing payloads, they rarely *do*. However, we
- * have to commit to the request here, so reallocate and
- * register it now. The data path will never require this
- * reallocation.
- *
- * If the allocation or registration fails, the RPC framework
- * will (doggedly) retry.
- */
- if (task->tk_flags & RPC_TASK_SWAPPER)
- nreq = kmalloc(sizeof *req + size, GFP_ATOMIC);
- else
- nreq = kmalloc(sizeof *req + size, GFP_NOFS);
- if (nreq == NULL)
- goto outfail;
-
- if (rpcrdma_register_internal(&rpcx_to_rdmax(xprt)->rx_ia,
- nreq->rl_base, size + sizeof(struct rpcrdma_req)
- - offsetof(struct rpcrdma_req, rl_base),
- &nreq->rl_handle, &nreq->rl_iov)) {
- kfree(nreq);
- goto outfail;
- }
- rpcx_to_rdmax(xprt)->rx_stats.hardway_register_count += size;
- nreq->rl_size = size;
- nreq->rl_niovs = 0;
- nreq->rl_nchunks = 0;
- nreq->rl_buffer = (struct rpcrdma_buffer *)req;
- nreq->rl_reply = req->rl_reply;
- memcpy(nreq->rl_segments,
- req->rl_segments, sizeof nreq->rl_segments);
- /* flag the swap with an unused field */
- nreq->rl_iov.length = 0;
- req->rl_reply = NULL;
- req = nreq;
- }
+ flags = GFP_NOIO | __GFP_NOWARN;
+ if (RPC_IS_SWAPPER(task))
+ flags = __GFP_MEMALLOC | GFP_NOWAIT | __GFP_NOWARN;
+
+ if (req->rl_rdmabuf == NULL)
+ goto out_rdmabuf;
+ if (req->rl_sendbuf == NULL)
+ goto out_sendbuf;
+ if (size > req->rl_sendbuf->rg_size)
+ goto out_sendbuf;
+
+out:
dprintk("RPC: %s: size %zd, request 0x%p\n", __func__, size, req);
req->rl_connect_cookie = 0; /* our reserved value */
- return req->rl_xdr_buf;
-
-outfail:
+ return req->rl_sendbuf->rg_base;
+
+out_rdmabuf:
+ min_size = RPCRDMA_INLINE_WRITE_THRESHOLD(task->tk_rqstp);
+ rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, min_size, flags);
+ if (IS_ERR(rb))
+ goto out_fail;
+ req->rl_rdmabuf = rb;
+
+out_sendbuf:
+ /* XDR encoding and RPC/RDMA marshaling of this request has not
+ * yet occurred. Thus a lower bound is needed to prevent buffer
+ * overrun during marshaling.
+ *
+ * RPC/RDMA marshaling may choose to send payload bearing ops
+ * inline, if the result is smaller than the inline threshold.
+ * The value of the "size" argument accounts for header
+ * requirements but not for the payload in these cases.
+ *
+ * Likewise, allocate enough space to receive a reply up to the
+ * size of the inline threshold.
+ *
+ * It's unlikely that both the send header and the received
+ * reply will be large, but slush is provided here to allow
+ * flexibility when marshaling.
+ */
+ min_size = RPCRDMA_INLINE_READ_THRESHOLD(task->tk_rqstp);
+ min_size += RPCRDMA_INLINE_WRITE_THRESHOLD(task->tk_rqstp);
+ if (size < min_size)
+ size = min_size;
+
+ rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, size, flags);
+ if (IS_ERR(rb))
+ goto out_fail;
+ rb->rg_owner = req;
+
+ r_xprt->rx_stats.hardway_register_count += size;
+ rpcrdma_free_regbuf(&r_xprt->rx_ia, req->rl_sendbuf);
+ req->rl_sendbuf = rb;
+ goto out;
+
+out_fail:
rpcrdma_buffer_put(req);
- rpcx_to_rdmax(xprt)->rx_stats.failed_marshal_count++;
+ r_xprt->rx_stats.failed_marshal_count++;
return NULL;
}
@@ -533,47 +538,24 @@ xprt_rdma_free(void *buffer)
{
struct rpcrdma_req *req;
struct rpcrdma_xprt *r_xprt;
- struct rpcrdma_rep *rep;
+ struct rpcrdma_regbuf *rb;
int i;
if (buffer == NULL)
return;
- req = container_of(buffer, struct rpcrdma_req, rl_xdr_buf[0]);
- if (req->rl_iov.length == 0) { /* see allocate above */
- r_xprt = container_of(((struct rpcrdma_req *) req->rl_buffer)->rl_buffer,
- struct rpcrdma_xprt, rx_buf);
- } else
- r_xprt = container_of(req->rl_buffer, struct rpcrdma_xprt, rx_buf);
- rep = req->rl_reply;
+ rb = container_of(buffer, struct rpcrdma_regbuf, rg_base[0]);
+ req = rb->rg_owner;
+ r_xprt = container_of(req->rl_buffer, struct rpcrdma_xprt, rx_buf);
- dprintk("RPC: %s: called on 0x%p%s\n",
- __func__, rep, (rep && rep->rr_func) ? " (with waiter)" : "");
+ dprintk("RPC: %s: called on 0x%p\n", __func__, req->rl_reply);
- /*
- * Finish the deregistration. The process is considered
- * complete when the rr_func vector becomes NULL - this
- * was put in place during rpcrdma_reply_handler() - the wait
- * call below will not block if the dereg is "done". If
- * interrupted, our framework will clean up.
- */
for (i = 0; req->rl_nchunks;) {
--req->rl_nchunks;
i += rpcrdma_deregister_external(
&req->rl_segments[i], r_xprt);
}
- if (req->rl_iov.length == 0) { /* see allocate above */
- struct rpcrdma_req *oreq = (struct rpcrdma_req *)req->rl_buffer;
- oreq->rl_reply = req->rl_reply;
- (void) rpcrdma_deregister_internal(&r_xprt->rx_ia,
- req->rl_handle,
- &req->rl_iov);
- kfree(req);
- req = oreq;
- }
-
- /* Put back request+reply buffers */
rpcrdma_buffer_put(req);
}
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index c98e40643910..124676c13780 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -49,6 +49,7 @@
#include <linux/interrupt.h>
#include <linux/slab.h>
+#include <linux/prefetch.h>
#include <asm/bitops.h>
#include "xprt_rdma.h"
@@ -153,7 +154,7 @@ rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
event->device->name, context);
if (ep->rep_connected == 1) {
ep->rep_connected = -EIO;
- ep->rep_func(ep);
+ rpcrdma_conn_func(ep);
wake_up_all(&ep->rep_connect_wait);
}
}
@@ -168,23 +169,59 @@ rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
event->device->name, context);
if (ep->rep_connected == 1) {
ep->rep_connected = -EIO;
- ep->rep_func(ep);
+ rpcrdma_conn_func(ep);
wake_up_all(&ep->rep_connect_wait);
}
}
+static const char * const wc_status[] = {
+ "success",
+ "local length error",
+ "local QP operation error",
+ "local EE context operation error",
+ "local protection error",
+ "WR flushed",
+ "memory management operation error",
+ "bad response error",
+ "local access error",
+ "remote invalid request error",
+ "remote access error",
+ "remote operation error",
+ "transport retry counter exceeded",
+ "RNR retrycounter exceeded",
+ "local RDD violation error",
+ "remove invalid RD request",
+ "operation aborted",
+ "invalid EE context number",
+ "invalid EE context state",
+ "fatal error",
+ "response timeout error",
+ "general error",
+};
+
+#define COMPLETION_MSG(status) \
+ ((status) < ARRAY_SIZE(wc_status) ? \
+ wc_status[(status)] : "unexpected completion error")
+
static void
rpcrdma_sendcq_process_wc(struct ib_wc *wc)
{
- struct rpcrdma_mw *frmr = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
+ if (likely(wc->status == IB_WC_SUCCESS))
+ return;
- dprintk("RPC: %s: frmr %p status %X opcode %d\n",
- __func__, frmr, wc->status, wc->opcode);
+ /* WARNING: Only wr_id and status are reliable at this point */
+ if (wc->wr_id == 0ULL) {
+ if (wc->status != IB_WC_WR_FLUSH_ERR)
+ pr_err("RPC: %s: SEND: %s\n",
+ __func__, COMPLETION_MSG(wc->status));
+ } else {
+ struct rpcrdma_mw *r;
- if (wc->wr_id == 0ULL)
- return;
- if (wc->status != IB_WC_SUCCESS)
- frmr->r.frmr.fr_state = FRMR_IS_STALE;
+ r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
+ r->r.frmr.fr_state = FRMR_IS_STALE;
+ pr_err("RPC: %s: frmr %p (stale): %s\n",
+ __func__, r, COMPLETION_MSG(wc->status));
+ }
}
static int
@@ -248,33 +285,32 @@ rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list)
struct rpcrdma_rep *rep =