summaryrefslogtreecommitdiffstats
path: root/net/sunrpc
diff options
context:
space:
mode:
Diffstat (limited to 'net/sunrpc')
-rw-r--r--net/sunrpc/auth_gss/svcauth_gss.c10
-rw-r--r--net/sunrpc/rpc_pipe.c2
-rw-r--r--net/sunrpc/svc.c42
-rw-r--r--net/sunrpc/xprt.c9
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma_backchannel.c2
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma_rw.c181
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma_sendto.c148
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma_transport.c15
-rw-r--r--net/sunrpc/xprtsock.c32
9 files changed, 280 insertions, 161 deletions
diff --git a/net/sunrpc/auth_gss/svcauth_gss.c b/net/sunrpc/auth_gss/svcauth_gss.c
index 96ab50eda..24de94184 100644
--- a/net/sunrpc/auth_gss/svcauth_gss.c
+++ b/net/sunrpc/auth_gss/svcauth_gss.c
@@ -1033,11 +1033,17 @@ null_verifier:
static void gss_free_in_token_pages(struct gssp_in_token *in_token)
{
+ u32 inlen;
int i;
i = 0;
- while (in_token->pages[i])
- put_page(in_token->pages[i++]);
+ inlen = in_token->page_len;
+ while (inlen) {
+ if (in_token->pages[i])
+ put_page(in_token->pages[i]);
+ inlen -= inlen > PAGE_SIZE ? PAGE_SIZE : inlen;
+ }
+
kfree(in_token->pages);
in_token->pages = NULL;
}
diff --git a/net/sunrpc/rpc_pipe.c b/net/sunrpc/rpc_pipe.c
index dcc2b4f49..910a5d850 100644
--- a/net/sunrpc/rpc_pipe.c
+++ b/net/sunrpc/rpc_pipe.c
@@ -1490,7 +1490,7 @@ int register_rpc_pipefs(void)
rpc_inode_cachep = kmem_cache_create("rpc_inode_cache",
sizeof(struct rpc_inode),
0, (SLAB_HWCACHE_ALIGN|SLAB_RECLAIM_ACCOUNT|
- SLAB_MEM_SPREAD|SLAB_ACCOUNT),
+ SLAB_ACCOUNT),
init_once);
if (!rpc_inode_cachep)
return -ENOMEM;
diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c
index bd61e257c..b33e42933 100644
--- a/net/sunrpc/svc.c
+++ b/net/sunrpc/svc.c
@@ -451,8 +451,8 @@ __svc_init_bc(struct svc_serv *serv)
* Create an RPC service
*/
static struct svc_serv *
-__svc_create(struct svc_program *prog, unsigned int bufsize, int npools,
- int (*threadfn)(void *data))
+__svc_create(struct svc_program *prog, struct svc_stat *stats,
+ unsigned int bufsize, int npools, int (*threadfn)(void *data))
{
struct svc_serv *serv;
unsigned int vers;
@@ -463,7 +463,7 @@ __svc_create(struct svc_program *prog, unsigned int bufsize, int npools,
return NULL;
serv->sv_name = prog->pg_name;
serv->sv_program = prog;
- serv->sv_stats = prog->pg_stats;
+ serv->sv_stats = stats;
if (bufsize > RPCSVC_MAXPAYLOAD)
bufsize = RPCSVC_MAXPAYLOAD;
serv->sv_max_payload = bufsize? bufsize : 4096;
@@ -529,26 +529,28 @@ __svc_create(struct svc_program *prog, unsigned int bufsize, int npools,
struct svc_serv *svc_create(struct svc_program *prog, unsigned int bufsize,
int (*threadfn)(void *data))
{
- return __svc_create(prog, bufsize, 1, threadfn);
+ return __svc_create(prog, NULL, bufsize, 1, threadfn);
}
EXPORT_SYMBOL_GPL(svc_create);
/**
* svc_create_pooled - Create an RPC service with pooled threads
* @prog: the RPC program the new service will handle
+ * @stats: the stats struct if desired
* @bufsize: maximum message size for @prog
* @threadfn: a function to service RPC requests for @prog
*
* Returns an instantiated struct svc_serv object or NULL.
*/
struct svc_serv *svc_create_pooled(struct svc_program *prog,
+ struct svc_stat *stats,
unsigned int bufsize,
int (*threadfn)(void *data))
{
struct svc_serv *serv;
unsigned int npools = svc_pool_map_get();
- serv = __svc_create(prog, bufsize, npools, threadfn);
+ serv = __svc_create(prog, stats, bufsize, npools, threadfn);
if (!serv)
goto out_err;
return serv;
@@ -1263,6 +1265,8 @@ svc_generic_init_request(struct svc_rqst *rqstp,
if (rqstp->rq_proc >= versp->vs_nproc)
goto err_bad_proc;
rqstp->rq_procinfo = procp = &versp->vs_proc[rqstp->rq_proc];
+ if (!procp)
+ goto err_bad_proc;
/* Initialize storage for argp and resp */
memset(rqstp->rq_argp, 0, procp->pc_argzero);
@@ -1373,7 +1377,8 @@ svc_process_common(struct svc_rqst *rqstp)
goto err_bad_proc;
/* Syntactic check complete */
- serv->sv_stats->rpccnt++;
+ if (serv->sv_stats)
+ serv->sv_stats->rpccnt++;
trace_svc_process(rqstp, progp->pg_name);
aoffset = xdr_stream_pos(xdr);
@@ -1425,7 +1430,8 @@ err_short_len:
goto close_xprt;
err_bad_rpc:
- serv->sv_stats->rpcbadfmt++;
+ if (serv->sv_stats)
+ serv->sv_stats->rpcbadfmt++;
xdr_stream_encode_u32(xdr, RPC_MSG_DENIED);
xdr_stream_encode_u32(xdr, RPC_MISMATCH);
/* Only RPCv2 supported */
@@ -1436,7 +1442,8 @@ err_bad_rpc:
err_bad_auth:
dprintk("svc: authentication failed (%d)\n",
be32_to_cpu(rqstp->rq_auth_stat));
- serv->sv_stats->rpcbadauth++;
+ if (serv->sv_stats)
+ serv->sv_stats->rpcbadauth++;
/* Restore write pointer to location of reply status: */
xdr_truncate_encode(xdr, XDR_UNIT * 2);
xdr_stream_encode_u32(xdr, RPC_MSG_DENIED);
@@ -1446,7 +1453,8 @@ err_bad_auth:
err_bad_prog:
dprintk("svc: unknown program %d\n", rqstp->rq_prog);
- serv->sv_stats->rpcbadfmt++;
+ if (serv->sv_stats)
+ serv->sv_stats->rpcbadfmt++;
*rqstp->rq_accept_statp = rpc_prog_unavail;
goto sendit;
@@ -1454,7 +1462,8 @@ err_bad_vers:
svc_printk(rqstp, "unknown version (%d for prog %d, %s)\n",
rqstp->rq_vers, rqstp->rq_prog, progp->pg_name);
- serv->sv_stats->rpcbadfmt++;
+ if (serv->sv_stats)
+ serv->sv_stats->rpcbadfmt++;
*rqstp->rq_accept_statp = rpc_prog_mismatch;
/*
@@ -1468,19 +1477,22 @@ err_bad_vers:
err_bad_proc:
svc_printk(rqstp, "unknown procedure (%d)\n", rqstp->rq_proc);
- serv->sv_stats->rpcbadfmt++;
+ if (serv->sv_stats)
+ serv->sv_stats->rpcbadfmt++;
*rqstp->rq_accept_statp = rpc_proc_unavail;
goto sendit;
err_garbage_args:
svc_printk(rqstp, "failed to decode RPC header\n");
- serv->sv_stats->rpcbadfmt++;
+ if (serv->sv_stats)
+ serv->sv_stats->rpcbadfmt++;
*rqstp->rq_accept_statp = rpc_garbage_args;
goto sendit;
err_system_err:
- serv->sv_stats->rpcbadfmt++;
+ if (serv->sv_stats)
+ serv->sv_stats->rpcbadfmt++;
*rqstp->rq_accept_statp = rpc_system_err;
goto sendit;
}
@@ -1532,7 +1544,8 @@ void svc_process(struct svc_rqst *rqstp)
out_baddir:
svc_printk(rqstp, "bad direction 0x%08x, dropping request\n",
be32_to_cpu(*p));
- rqstp->rq_server->sv_stats->rpcbadfmt++;
+ if (rqstp->rq_server->sv_stats)
+ rqstp->rq_server->sv_stats->rpcbadfmt++;
out_drop:
svc_drop(rqstp);
}
@@ -1610,7 +1623,6 @@ void svc_process_bc(struct rpc_rqst *req, struct svc_rqst *rqstp)
WARN_ON_ONCE(atomic_read(&task->tk_count) != 1);
rpc_put_task(task);
}
-EXPORT_SYMBOL_GPL(svc_process_bc);
#endif /* CONFIG_SUNRPC_BACKCHANNEL */
/**
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index af13fdfa6..09f245cda 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -1398,6 +1398,12 @@ xprt_request_dequeue_transmit_locked(struct rpc_task *task)
if (!test_and_clear_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
return;
if (!list_empty(&req->rq_xmit)) {
+ struct rpc_xprt *xprt = req->rq_xprt;
+
+ if (list_is_first(&req->rq_xmit, &xprt->xmit_queue) &&
+ xprt->ops->abort_send_request)
+ xprt->ops->abort_send_request(req);
+
list_del(&req->rq_xmit);
if (!list_empty(&req->rq_xmit2)) {
struct rpc_rqst *next = list_first_entry(&req->rq_xmit2,
@@ -1541,6 +1547,9 @@ xprt_request_transmit(struct rpc_rqst *req, struct rpc_task *snd_task)
int is_retrans = RPC_WAS_SENT(task);
int status;
+ if (test_bit(XPRT_CLOSE_WAIT, &xprt->state))
+ return -ENOTCONN;
+
if (!req->rq_bytes_sent) {
if (xprt_request_data_received(task)) {
status = 0;
diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
index c9be67786..e5a78b761 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
@@ -90,7 +90,7 @@ static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
*/
get_page(virt_to_page(rqst->rq_buffer));
sctxt->sc_send_wr.opcode = IB_WR_SEND;
- return svc_rdma_send(rdma, sctxt);
+ return svc_rdma_post_send(rdma, sctxt);
}
/* Server-side transport endpoint wants a whole page for its send
diff --git a/net/sunrpc/xprtrdma/svc_rdma_rw.c b/net/sunrpc/xprtrdma/svc_rdma_rw.c
index c00fcce61..40797114d 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_rw.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_rw.c
@@ -197,28 +197,6 @@ void svc_rdma_cc_release(struct svcxprt_rdma *rdma,
llist_add_batch(first, last, &rdma->sc_rw_ctxts);
}
-/* State for sending a Write or Reply chunk.
- * - Tracks progress of writing one chunk over all its segments
- * - Stores arguments for the SGL constructor functions
- */
-struct svc_rdma_write_info {
- struct svcxprt_rdma *wi_rdma;
-
- const struct svc_rdma_chunk *wi_chunk;
-
- /* write state of this chunk */
- unsigned int wi_seg_off;
- unsigned int wi_seg_no;
-
- /* SGL constructor arguments */
- const struct xdr_buf *wi_xdr;
- unsigned char *wi_base;
- unsigned int wi_next_off;
-
- struct svc_rdma_chunk_ctxt wi_cc;
- struct work_struct wi_work;
-};
-
static struct svc_rdma_write_info *
svc_rdma_write_info_alloc(struct svcxprt_rdma *rdma,
const struct svc_rdma_chunk *chunk)
@@ -253,6 +231,49 @@ static void svc_rdma_write_info_free(struct svc_rdma_write_info *info)
}
/**
+ * svc_rdma_reply_chunk_release - Release Reply chunk I/O resources
+ * @rdma: controlling transport
+ * @ctxt: Send context that is being released
+ */
+void svc_rdma_reply_chunk_release(struct svcxprt_rdma *rdma,
+ struct svc_rdma_send_ctxt *ctxt)
+{
+ struct svc_rdma_chunk_ctxt *cc = &ctxt->sc_reply_info.wi_cc;
+
+ if (!cc->cc_sqecount)
+ return;
+ svc_rdma_cc_release(rdma, cc, DMA_TO_DEVICE);
+}
+
+/**
+ * svc_rdma_reply_done - Reply chunk Write completion handler
+ * @cq: controlling Completion Queue
+ * @wc: Work Completion report
+ *
+ * Pages under I/O are released by a subsequent Send completion.
+ */
+static void svc_rdma_reply_done(struct ib_cq *cq, struct ib_wc *wc)
+{
+ struct ib_cqe *cqe = wc->wr_cqe;
+ struct svc_rdma_chunk_ctxt *cc =
+ container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
+ struct svcxprt_rdma *rdma = cq->cq_context;
+
+ switch (wc->status) {
+ case IB_WC_SUCCESS:
+ trace_svcrdma_wc_reply(&cc->cc_cid);
+ return;
+ case IB_WC_WR_FLUSH_ERR:
+ trace_svcrdma_wc_reply_flush(wc, &cc->cc_cid);
+ break;
+ default:
+ trace_svcrdma_wc_reply_err(wc, &cc->cc_cid);
+ }
+
+ svc_xprt_deferred_close(&rdma->sc_xprt);
+}
+
+/**
* svc_rdma_write_done - Write chunk completion
* @cq: controlling Completion Queue
* @wc: Work Completion
@@ -580,41 +601,33 @@ static int svc_rdma_xb_write(const struct xdr_buf *xdr, void *data)
return xdr->len;
}
-/**
- * svc_rdma_send_write_chunk - Write all segments in a Write chunk
- * @rdma: controlling RDMA transport
- * @chunk: Write chunk provided by the client
- * @xdr: xdr_buf containing the data payload
- *
- * Returns a non-negative number of bytes the chunk consumed, or
- * %-E2BIG if the payload was larger than the Write chunk,
- * %-EINVAL if client provided too many segments,
- * %-ENOMEM if rdma_rw context pool was exhausted,
- * %-ENOTCONN if posting failed (connection is lost),
- * %-EIO if rdma_rw initialization failed (DMA mapping, etc).
- */
-int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma,
- const struct svc_rdma_chunk *chunk,
- const struct xdr_buf *xdr)
+static int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma,
+ const struct svc_rdma_chunk *chunk,
+ const struct xdr_buf *xdr)
{
struct svc_rdma_write_info *info;
struct svc_rdma_chunk_ctxt *cc;
+ struct xdr_buf payload;
int ret;
+ if (xdr_buf_subsegment(xdr, &payload, chunk->ch_position,
+ chunk->ch_payload_length))
+ return -EMSGSIZE;
+
info = svc_rdma_write_info_alloc(rdma, chunk);
if (!info)
return -ENOMEM;
cc = &info->wi_cc;
- ret = svc_rdma_xb_write(xdr, info);
- if (ret != xdr->len)
+ ret = svc_rdma_xb_write(&payload, info);
+ if (ret != payload.len)
goto out_err;
trace_svcrdma_post_write_chunk(&cc->cc_cid, cc->cc_sqecount);
ret = svc_rdma_post_chunk_ctxt(rdma, cc);
if (ret < 0)
goto out_err;
- return xdr->len;
+ return 0;
out_err:
svc_rdma_write_info_free(info);
@@ -622,9 +635,37 @@ out_err:
}
/**
- * svc_rdma_send_reply_chunk - Write all segments in the Reply chunk
+ * svc_rdma_send_write_list - Send all chunks on the Write list
* @rdma: controlling RDMA transport
- * @rctxt: Write and Reply chunks from client
+ * @rctxt: Write list provisioned by the client
+ * @xdr: xdr_buf containing an RPC Reply message
+ *
+ * Returns zero on success, or a negative errno if one or more
+ * Write chunks could not be sent.
+ */
+int svc_rdma_send_write_list(struct svcxprt_rdma *rdma,
+ const struct svc_rdma_recv_ctxt *rctxt,
+ const struct xdr_buf *xdr)
+{
+ struct svc_rdma_chunk *chunk;
+ int ret;
+
+ pcl_for_each_chunk(chunk, &rctxt->rc_write_pcl) {
+ if (!chunk->ch_payload_length)
+ break;
+ ret = svc_rdma_send_write_chunk(rdma, chunk, xdr);
+ if (ret < 0)
+ return ret;
+ }
+ return 0;
+}
+
+/**
+ * svc_rdma_prepare_reply_chunk - Construct WR chain for writing the Reply chunk
+ * @rdma: controlling RDMA transport
+ * @write_pcl: Write chunk list provided by client
+ * @reply_pcl: Reply chunk provided by client
+ * @sctxt: Send WR resources
* @xdr: xdr_buf containing an RPC Reply
*
* Returns a non-negative number of bytes the chunk consumed, or
@@ -634,39 +675,45 @@ out_err:
* %-ENOTCONN if posting failed (connection is lost),
* %-EIO if rdma_rw initialization failed (DMA mapping, etc).
*/
-int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma,
- const struct svc_rdma_recv_ctxt *rctxt,
- const struct xdr_buf *xdr)
+int svc_rdma_prepare_reply_chunk(struct svcxprt_rdma *rdma,
+ const struct svc_rdma_pcl *write_pcl,
+ const struct svc_rdma_pcl *reply_pcl,
+ struct svc_rdma_send_ctxt *sctxt,
+ const struct xdr_buf *xdr)
{
- struct svc_rdma_write_info *info;
- struct svc_rdma_chunk_ctxt *cc;
- struct svc_rdma_chunk *chunk;
+ struct svc_rdma_write_info *info = &sctxt->sc_reply_info;
+ struct svc_rdma_chunk_ctxt *cc = &info->wi_cc;
+ struct ib_send_wr *first_wr;
+ struct list_head *pos;
+ struct ib_cqe *cqe;
int ret;
- if (pcl_is_empty(&rctxt->rc_reply_pcl))
- return 0;
-
- chunk = pcl_first_chunk(&rctxt->rc_reply_pcl);
- info = svc_rdma_write_info_alloc(rdma, chunk);
- if (!info)
- return -ENOMEM;
- cc = &info->wi_cc;
+ info->wi_rdma = rdma;
+ info->wi_chunk = pcl_first_chunk(reply_pcl);
+ info->wi_seg_off = 0;
+ info->wi_seg_no = 0;
+ info->wi_cc.cc_cqe.done = svc_rdma_reply_done;
- ret = pcl_process_nonpayloads(&rctxt->rc_write_pcl, xdr,
+ ret = pcl_process_nonpayloads(write_pcl, xdr,
svc_rdma_xb_write, info);
if (ret < 0)
- goto out_err;
+ return ret;
- trace_svcrdma_post_reply_chunk(&cc->cc_cid, cc->cc_sqecount);
- ret = svc_rdma_post_chunk_ctxt(rdma, cc);
- if (ret < 0)
- goto out_err;
+ first_wr = sctxt->sc_wr_chain;
+ cqe = &cc->cc_cqe;
+ list_for_each(pos, &cc->cc_rwctxts) {
+ struct svc_rdma_rw_ctxt *rwc;
- return xdr->len;
+ rwc = list_entry(pos, struct svc_rdma_rw_ctxt, rw_list);
+ first_wr = rdma_rw_ctx_wrs(&rwc->rw_ctx, rdma->sc_qp,
+ rdma->sc_port_num, cqe, first_wr);
+ cqe = NULL;
+ }
+ sctxt->sc_wr_chain = first_wr;
+ sctxt->sc_sqecount += cc->cc_sqecount;
-out_err:
- svc_rdma_write_info_free(info);
- return ret;
+ trace_svcrdma_post_reply_chunk(&cc->cc_cid, cc->cc_sqecount);
+ return xdr->len;
}
/**
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index 1a49b7f02..bb5436b71 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -205,9 +205,13 @@ out:
xdr_init_encode(&ctxt->sc_stream, &ctxt->sc_hdrbuf,
ctxt->sc_xprt_buf, NULL);
+ svc_rdma_cc_init(rdma, &ctxt->sc_reply_info.wi_cc);
ctxt->sc_send_wr.num_sge = 0;
ctxt->sc_cur_sge_no = 0;
ctxt->sc_page_count = 0;
+ ctxt->sc_wr_chain = &ctxt->sc_send_wr;
+ ctxt->sc_sqecount = 1;
+
return ctxt;
out_empty:
@@ -223,6 +227,8 @@ static void svc_rdma_send_ctxt_release(struct svcxprt_rdma *rdma,
struct ib_device *device = rdma->sc_cm_id->device;
unsigned int i;
+ svc_rdma_reply_chunk_release(rdma, ctxt);
+
if (ctxt->sc_page_count)
release_pages(ctxt->sc_pages, ctxt->sc_page_count);
@@ -293,7 +299,7 @@ static void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
struct svc_rdma_send_ctxt *ctxt =
container_of(cqe, struct svc_rdma_send_ctxt, sc_cqe);
- svc_rdma_wake_send_waiters(rdma, 1);
+ svc_rdma_wake_send_waiters(rdma, ctxt->sc_sqecount);
if (unlikely(wc->status != IB_WC_SUCCESS))
goto flushed;
@@ -312,51 +318,76 @@ flushed:
}
/**
- * svc_rdma_send - Post a single Send WR
- * @rdma: transport on which to post the WR
- * @ctxt: send ctxt with a Send WR ready to post
+ * svc_rdma_post_send - Post a WR chain to the Send Queue
+ * @rdma: transport context
+ * @ctxt: WR chain to post
+ *
+ * Copy fields in @ctxt to stack variables in order to guarantee
+ * that these values remain available after the ib_post_send() call.
+ * In some error flow cases, svc_rdma_wc_send() releases @ctxt.
+ *
+ * Note there is potential for starvation when the Send Queue is
+ * full because there is no order to when waiting threads are
+ * awoken. The transport is typically provisioned with a deep
+ * enough Send Queue that SQ exhaustion should be a rare event.
*
- * Returns zero if the Send WR was posted successfully. Otherwise, a
- * negative errno is returned.
+ * Return values:
+ * %0: @ctxt's WR chain was posted successfully
+ * %-ENOTCONN: The connection was lost
*/
-int svc_rdma_send(struct svcxprt_rdma *rdma, struct svc_rdma_send_ctxt *ctxt)
+int svc_rdma_post_send(struct svcxprt_rdma *rdma,
+ struct svc_rdma_send_ctxt *ctxt)
{
- struct ib_send_wr *wr = &ctxt->sc_send_wr;
- int ret;
+ struct ib_send_wr *first_wr = ctxt->sc_wr_chain;
+ struct ib_send_wr *send_wr = &ctxt->sc_send_wr;
+ const struct ib_send_wr *bad_wr = first_wr;
+ struct rpc_rdma_cid cid = ctxt->sc_cid;
+ int ret, sqecount = ctxt->sc_sqecount;
might_sleep();
/* Sync the transport header buffer */
ib_dma_sync_single_for_device(rdma->sc_pd->device,
- wr->sg_list[0].addr,
- wr->sg_list[0].length,
+ send_wr->sg_list[0].addr,
+ send_wr->sg_list[0].length,
DMA_TO_DEVICE);
/* If the SQ is full, wait until an SQ entry is available */
- while (1) {
- if ((atomic_dec_return(&rdma->sc_sq_avail) < 0)) {
+ while (!test_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags)) {
+ if (atomic_sub_return(sqecount, &rdma->sc_sq_avail) < 0) {
+ svc_rdma_wake_send_waiters(rdma, sqecount);
+
+ /* When the transport is torn down, assume
+ * ib_drain_sq() will trigger enough Send
+ * completions to wake us. The XPT_CLOSE test
+ * above should then cause the while loop to
+ * exit.
+ */
percpu_counter_inc(&svcrdma_stat_sq_starve);
- trace_svcrdma_sq_full(rdma, &ctxt->sc_cid);
- atomic_inc(&rdma->sc_sq_avail);
+ trace_svcrdma_sq_full(rdma, &cid);
wait_event(rdma->sc_send_wait,
- atomic_read(&rdma->sc_sq_avail) > 1);
- if (test_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags))
- return -ENOTCONN;
- trace_svcrdma_sq_retry(rdma, &ctxt->sc_cid);
+ atomic_read(&rdma->sc_sq_avail) > 0);
+ trace_svcrdma_sq_retry(rdma, &cid);
continue;
}
trace_svcrdma_post_send(ctxt);
- ret = ib_post_send(rdma->sc_qp, wr, NULL);
- if (ret)
- break;
+ ret = ib_post_send(rdma->sc_qp, first_wr, &bad_wr);
+ if (ret) {
+ trace_svcrdma_sq_post_err(rdma, &cid, ret);
+ svc_xprt_deferred_close(&rdma->sc_xprt);
+
+ /* If even one WR was posted, there will be a
+ * Send completion that bumps sc_sq_avail.
+ */
+ if (bad_wr == first_wr) {
+ svc_rdma_wake_send_waiters(rdma, sqecount);
+ break;
+ }
+ }
return 0;
}
-
- trace_svcrdma_sq_post_err(rdma, &ctxt->sc_cid, ret);
- svc_xprt_deferred_close(&rdma->sc_xprt);
- wake_up(&rdma->sc_send_wait);
- return ret;
+ return -ENOTCONN;
}
/**
@@ -839,16 +870,10 @@ static void svc_rdma_save_io_pages(struct svc_rqst *rqstp,
* in sc_sges[0], and the RPC xdr_buf is prepared in following sges.
*
* Depending on whether a Write list or Reply chunk is present,
- * the server may send all, a portion of, or none of the xdr_buf.
+ * the server may Send all, a portion of, or none of the xdr_buf.
* In the latter case, only the transport header (sc_sges[0]) is
* transmitted.
*
- * RDMA Send is the last step of transmitting an RPC reply. Pages
- * involved in the earlier RDMA Writes are here transferred out
- * of the rqstp and into the sctxt's page array. These pages are
- * DMA unmapped by each Write completion, but the subsequent Send
- * completion finally releases these pages.
- *
* Assumptions:
* - The Reply's transport header will never be larger than a page.
*/
@@ -857,6 +882,7 @@ static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma,
const struct svc_rdma_recv_ctxt *rctxt,
struct svc_rqst *rqstp)
{
+ struct ib_send_wr *send_wr = &sctxt->sc_send_wr;
int ret;
ret = svc_rdma_map_reply_msg(rdma, sctxt, &rctxt->rc_write_pcl,
@@ -864,16 +890,19 @@ static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma,
if (ret < 0)
return ret;
+ /* Transfer pages involved in RDMA Writes to the sctxt's
+ * page array. Completion handling releases these pages.
+ */
svc_rdma_save_io_pages(rqstp, sctxt);
if (rctxt->rc_inv_rkey) {
- sctxt->sc_send_wr.opcode = IB_WR_SEND_WITH_INV;
- sctxt->sc_send_wr.ex.invalidate_rkey = rctxt->rc_inv_rkey;
+ send_wr->opcode = IB_WR_SEND_WITH_INV;
+ send_wr->ex.invalidate_rkey = rctxt->rc_inv_rkey;
} else {
- sctxt->sc_send_wr.opcode = IB_WR_SEND;
+ send_wr->opcode = IB_WR_SEND;
}
- return svc_rdma_send(rdma, sctxt);
+ return svc_rdma_post_send(rdma, sctxt);
}
/**
@@ -937,7 +966,7 @@ void svc_rdma_send_error_msg(struct svcxprt_rdma *rdma,
sctxt->sc_send_wr.num_sge = 1;
sctxt->sc_send_wr.opcode = IB_WR_SEND;
sctxt->sc_sges[0].length = sctxt->sc_hdrbuf.len;
- if (svc_rdma_send(rdma, sctxt))
+ if (svc_rdma_post_send(rdma, sctxt))
goto put_ctxt;
return;
@@ -984,10 +1013,19 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
if (!p)
goto put_ctxt;
- ret = svc_rdma_send_reply_chunk(rdma, rctxt, &rqstp->rq_res);
+ ret = svc_rdma_send_write_list(rdma, rctxt, &rqstp->rq_res);
if (ret < 0)
- goto reply_chunk;
- rc_size = ret;
+ goto put_ctxt;
+
+ rc_size = 0;
+ if (!pcl_is_empty(&rctxt->rc_reply_pcl)) {
+ ret = svc_rdma_prepare_reply_chunk(rdma, &rctxt->rc_write_pcl,
+ &rctxt->rc_reply_pcl, sctxt,
+ &rqstp->rq_res);
+ if (ret < 0)
+ goto reply_chunk;
+ rc_size = ret;
+ }
*p++ = *rdma_argp;
*p++ = *(rdma_argp + 1);
@@ -1030,45 +1068,33 @@ drop_connection:
/**
* svc_rdma_result_payload - special processing for a result payload
- * @rqstp: svc_rqst to operate on
- * @offset: payload's byte offset in @xdr
+ * @rqstp: RPC transaction context
+ * @offset: payload's byte offset in @rqstp->rq_res
* @length: size of payload, in bytes
*
+ * Assign the passed-in result payload to the current Write chunk,
+ * and advance to cur_result_payload to the next Write chunk, if
+ * there is one.
+ *
* Return values:
* %0 if successful or nothing needed to be done
- * %-EMSGSIZE on XDR buffer overflow
* %-E2BIG if the payload was larger than the Write chunk
- * %-EINVAL if client provided too many segments
- * %-ENOMEM if rdma_rw context pool was exhausted
- * %-ENOTCONN if posting failed (connection is lost)
- * %-EIO if rdma_rw initialization failed (DMA mapping, etc)
*/
int svc_rdma_result_payload(struct svc_rqst *rqstp, unsigned int offset,
unsigned int length)
{
struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt;
struct svc_rdma_chunk *chunk;
- struct svcxprt_rdma *rdma;
- struct xdr_buf subbuf;
- int ret;
chunk = rctxt->rc_cur_result_payload;
if (!length || !chunk)
return 0;
rctxt->rc_cur_result_payload =
pcl_next_chunk(&rctxt->rc_write_pcl, chunk);
+
if (length > chunk->ch_length)
return -E2BIG;
-
chunk->ch_position = offset;
chunk->ch_payload_length = length;
-
- if (xdr_buf_subsegment(&rqstp->rq_res, &subbuf, offset, length))
- return -EMSGSIZE;
-
- rdma = container_of(rqstp->rq_xprt, struct svcxprt_rdma, sc_xprt);
- ret = svc_rdma_send_write_chunk(rdma, chunk, &subbuf);
- if (ret < 0)
- return ret;
return 0;
}
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index 4f27325ac..2b1c16b95 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -415,15 +415,20 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
if (newxprt->sc_max_send_sges > dev->attrs.max_send_sge)
newxprt->sc_max_send_sges = dev->attrs.max_send_sge;
rq_depth = newxprt->sc_max_requests + newxprt->sc_max_bc_requests +
- newxprt->sc_recv_batch;
+ newxprt->sc_recv_batch + 1 /* drain */;
if (rq_depth > dev->attrs.max_qp_wr) {
rq_depth = dev->attrs.max_qp_wr;
newxprt->sc_recv_batch = 1;
newxprt->sc_max_requests = rq_depth - 2;
newxprt->sc_max_bc_requests = 2;
}
- ctxts = rdma_rw_mr_factor(dev, newxprt->sc_port_num, RPCSVC_MAXPAGES);
- ctxts *= newxprt->sc_max_requests;
+
+ /* Arbitrarily estimate the number of rw_ctxs needed for
+ * this transport. This is enough rw_ctxs to make forward
+ * progress even if the client is using one rkey per page
+ * in each Read chunk.
+ */
+ ctxts = 3 * RPCSVC_MAXPAGES;
newxprt->sc_sq_depth = rq_depth + ctxts;
if (newxprt->sc_sq_depth > dev->attrs.max_qp_wr)
newxprt->sc_sq_depth = dev->attrs.max_qp_wr;
@@ -460,12 +465,14 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
qp_attr.cap.max_send_wr, qp_attr.cap.max_recv_wr);
dprintk(" cap.max_send_sge = %d, cap.max_recv_sge = %d\n",
qp_attr.cap.max_send_sge, qp_attr.cap.max_recv_sge);
-
+ dprintk(" send CQ depth = %u, recv CQ depth = %u\n",
+ newxprt->sc_sq_depth, rq_depth);
ret = rdma_create_qp(newxprt->sc_cm_id, newxprt->sc_pd, &qp_attr);
if (ret) {
trace_svcrdma_qp_err(newxprt, ret);
goto errout;
}
+ newxprt->sc_max_send_sges = qp_attr.cap.max_send_sge;
newxprt->sc_qp = newxprt->sc_cm_id->qp;
if (!(dev->attrs.device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS))
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 004d2bd8b..ce1871649 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -62,6 +62,7 @@
#include "sunrpc.h"
static void xs_close(struct rpc_xprt *xprt);
+static void xs_reset_srcport(struct sock_xprt *transport);
static void xs_set_srcport(struct sock_xprt *transport, struct socket *sock);
static void xs_tcp_set_socket_timeouts(struct rpc_xprt *xprt,
struct socket *sock);
@@ -883,6 +884,17 @@ static int xs_stream_prepare_request(struct rpc_rqst *req, struct xdr_buf *buf)
return xdr_alloc_bvec(buf, rpc_task_gfp_mask());
}
+static void xs_stream_abort_send_request(struct rpc_rqst *req)
+{
+ struct rpc_xprt *xprt = req->rq_xprt;
+ struct sock_xprt *transport =
+ container_of(xprt, struct sock_xprt, xprt);
+
+ if (transport->xmit.offset != 0 &&
+ !test_bit(XPRT_CLOSE_WAIT, &xprt->state))
+ xprt_force_disconnect(xprt);
+}
+
/*
* Determine if the previous message in the stream was aborted before it
* could complete transmission.
@@ -1565,8 +1577,10 @@ static void xs_tcp_state_change(struct sock *sk)
break;
case TCP_CLOSE:
if (test_and_clear_bit(XPRT_SOCK_CONNECTING,
- &transport->sock_state))
+ &transport->sock_state)) {
+ xs_reset_srcport(transport);
xprt_clear_connecting(xprt);
+ }
clear_bit(XPRT_CLOSING, &xprt->state);
/* Trigger the socket release */
xs_run_error_worker(transport, XPRT_SOCK_WAKE_DISCONNECT);
@@ -1722,6 +1736,11 @@ static void xs_set_port(struct rpc_xprt *xprt, unsigned short port)
xs_update_peer_port(xprt);
}
+static void xs_reset_srcport(struct sock_xprt *transport)
+{
+ transport->srcport = 0;
+}
+
static void xs_set_srcport(struct sock_xprt *transport, struct socket *sock)
{
if (transport->srcport == 0 && transport->xprt.reuseport)
@@ -2988,20 +3007,11 @@ static int bc_send_request(struct rpc_rqst *req)
return len;
}
-/*
- * The close routine. Since this is client initiated, we do nothing
- */
-
static void bc_close(struct rpc_xprt *xprt)
{
xprt_disconnect_done(xprt);
}
-/*
- * The xprt destroy routine. Again, because this connection is client
- * initiated, we do nothing
- */
-
static void bc_destroy(struct rpc_xprt *xprt)
{
dprintk("RPC: bc_destroy xprt %p\n", xprt);
@@ -3022,6 +3032,7 @@ static const struct rpc_xprt_ops xs_local_ops = {
.buf_free = rpc_free,
.prepare_request = xs_stream_prepare_request,
.send_request = xs_local_send_request,
+ .abort_send_request = xs_stream_abort_send_request,
.wait_for_reply_request = xprt_wait_for_reply_request_def,
.close = xs_close,
.destroy = xs_destroy,
@@ -3069,6 +3080,7 @@ static const struct rpc_xprt_ops xs_tcp_ops = {
.buf_free = rpc_free,
.prepare_request = xs_stream_prepare_request,
.send_request = xs_tcp_send_request,
+ .abort_send_request = xs_stream_abort_send_request,
.wait_for_reply_request = xprt_wait_for_reply_request_def,
.close = xs_tcp_shutdown,
.destroy = xs_destroy,