/*
 * linux/net/sunrpc/xprtsock.c
 *
 * Client-side transport implementation for sockets.
 *
 * TCP callback races fixes (C) 1998 Red Hat
 * TCP send fixes (C) 1998 Red Hat
 * TCP NFS related read + write fixes
 *  (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
 *
 * Rewrite of large parts of the code in order to stabilize TCP stuff.
 * Fix behaviour when socket buffer is full.
 *  (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no>
 *
 * IP socket transport implementation, (C) 2005 Chuck Lever <cel@netapp.com>
 *
 * IPv6 support contributed by Gilles Quillard, Bull Open Source, 2005.
 *   <gilles.quillard@bull.net>
 */

#include <linux/types.h>
#include <linux/string.h>
#include <linux/slab.h>
#include <linux/module.h>
#include <linux/capability.h>
#include <linux/pagemap.h>
#include <linux/errno.h>
#include <linux/socket.h>
#include <linux/in.h>
#include <linux/net.h>
#include <linux/mm.h>
#include <linux/un.h>
#include <linux/udp.h>
#include <linux/tcp.h>
#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/addr.h>
#include <linux/sunrpc/sched.h>
#include <linux/sunrpc/svcsock.h>
#include <linux/sunrpc/xprtsock.h>
#include <linux/file.h>
#ifdef CONFIG_SUNRPC_BACKCHANNEL
#include <linux/sunrpc/bc_xprt.h>
#endif

#include <net/sock.h>
#include <net/checksum.h>
#include <net/udp.h>
#include <net/tcp.h>

#include <trace/events/sunrpc.h>

#include "sunrpc.h"

static void xs_close(struct rpc_xprt *xprt);

/*
 * xprtsock tunables
 */
static unsigned int xprt_udp_slot_table_entries = RPC_DEF_SLOT_TABLE;
static unsigned int xprt_tcp_slot_table_entries = RPC_MIN_SLOT_TABLE;
static unsigned int xprt_max_tcp_slot_table_entries = RPC_MAX_SLOT_TABLE;

static unsigned int xprt_min_resvport = RPC_DEF_MIN_RESVPORT;
static unsigned int xprt_max_resvport = RPC_DEF_MAX_RESVPORT;

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)

#define XS_TCP_LINGER_TO	(15U * HZ)
static unsigned int xs_tcp_fin_timeout __read_mostly = XS_TCP_LINGER_TO;

/*
 * We can register our own files under /proc/sys/sunrpc by
 * calling register_sysctl_table() again.  The files in that
 * directory become the union of all files registered there.
 *
 * We simply need to make sure that we don't collide with
 * someone else's file names!
 */

static unsigned int min_slot_table_size = RPC_MIN_SLOT_TABLE;
static unsigned int max_slot_table_size = RPC_MAX_SLOT_TABLE;
static unsigned int max_tcp_slot_table_limit = RPC_MAX_SLOT_TABLE_LIMIT;
static unsigned int xprt_min_resvport_limit = RPC_MIN_RESVPORT;
static unsigned int xprt_max_resvport_limit = RPC_MAX_RESVPORT;

static struct ctl_table_header *sunrpc_table_header;

/*
 * FIXME: changing the UDP slot table size should also resize the UDP
 *        socket buffers for existing UDP transports
 */
static struct ctl_table xs_tunables_table[] = {
	{
		.procname	= "udp_slot_table_entries",
		.data		= &xprt_udp_slot_table_entries,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &min_slot_table_size,
		.extra2		= &max_slot_table_size
	},
	{
		.procname	= "tcp_slot_table_entries",
		.data		= &xprt_tcp_slot_table_entries,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &min_slot_table_size,
		.extra2		= &max_slot_table_size
	},
	{
		.procname	= "tcp_max_slot_table_entries",
		.data		= &xprt_max_tcp_slot_table_entries,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &min_slot_table_size,
		.extra2		= &max_tcp_slot_table_limit
	},
	{
		.procname	= "min_resvport",
		.data		= &xprt_min_resvport,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &xprt_min_resvport_limit,
		.extra2		= &xprt_max_resvport_limit
	},
	{
		.procname	= "max_resvport",
		.data		= &xprt_max_resvport,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &xprt_min_resvport_limit,
		.extra2		= &xprt_max_resvport_limit
	},
	{
		.procname	= "tcp_fin_timeout",
		.data		= &xs_tcp_fin_timeout,
		.maxlen		= sizeof(xs_tcp_fin_timeout),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_jiffies,
	},
	{ },
};

static struct ctl_table sunrpc_table[] = {
	{
		.procname	= "sunrpc",
		.mode		= 0555,
		.child		= xs_tunables_table
	},
	{ },
};
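
/*
 * For reference, a sketch of how a table like the one above is hooked
 * up; the actual registration lives in this file's init path, outside
 * this excerpt:
 *
 *	if (!sunrpc_table_header)
 *		sunrpc_table_header = register_sysctl_table(sunrpc_table);
 *
 * with a matching unregister_sysctl_table(sunrpc_table_header) on
 * module unload.
 */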

#endif

/*
 * Wait duration for a reply from the RPC portmapper.
 */
#define XS_BIND_TO		(60U * HZ)

/*
 * Delay if a UDP socket connect error occurs.  This is most likely some
 * kind of resource problem on the local host.
 */
#define XS_UDP_REEST_TO		(2U * HZ)

/*
 * The reestablish timeout allows clients to delay for a bit before attempting
 * to reconnect to a server that just dropped our connection.
 *
 * We implement an exponential backoff when trying to reestablish a TCP
 * transport connection with the server.  Some servers like to drop a TCP
 * connection when they are overworked, so we start with a short timeout and
 * increase over time if the server is down or not responding.
 */
#define XS_TCP_INIT_REEST_TO	(3U * HZ)
#define XS_TCP_MAX_REEST_TO	(5U * 60 * HZ)
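
/*
 * Example: with the defaults above, reconnect attempts back off as
 * 3s, 6s, 12s, 24s, ..., the timeout doubling each round until it is
 * capped at 5 minutes.
 */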

/*
 * TCP idle timeout; client drops the transport socket if it is idle
 * for this long.  Note that we also timeout UDP sockets to prevent
 * holding port numbers when there is no RPC traffic.
 */
#define XS_IDLE_DISC_TO		(5U * 60 * HZ)

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# undef  RPC_DEBUG_DATA
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

#ifdef RPC_DEBUG_DATA
static void xs_pktdump(char *msg, u32 *packet, unsigned int count)
{
	u8 *buf = (u8 *) packet;
	int j;

	dprintk("RPC:       %s\n", msg);
	for (j = 0; j < count && j < 128; j += 4) {
		if (!(j & 31)) {
			if (j)
				dprintk("\n");
			dprintk("0x%04x ", j);
		}
		dprintk("%02x%02x%02x%02x ",
			buf[j], buf[j+1], buf[j+2], buf[j+3]);
	}
	dprintk("\n");
}
#else
static inline void xs_pktdump(char *msg, u32 *packet, unsigned int count)
{
	/* NOP */
}
#endif

static inline struct rpc_xprt *xprt_from_sock(struct sock *sk)
{
	return (struct rpc_xprt *) sk->sk_user_data;
}

static inline struct sockaddr *xs_addr(struct rpc_xprt *xprt)
{
	return (struct sockaddr *) &xprt->addr;
}

static inline struct sockaddr_un *xs_addr_un(struct rpc_xprt *xprt)
{
	return (struct sockaddr_un *) &xprt->addr;
}

static inline struct sockaddr_in *xs_addr_in(struct rpc_xprt *xprt)
{
	return (struct sockaddr_in *) &xprt->addr;
}

static inline struct sockaddr_in6 *xs_addr_in6(struct rpc_xprt *xprt)
{
	return (struct sockaddr_in6 *) &xprt->addr;
}

static void xs_format_common_peer_addresses(struct rpc_xprt *xprt)
{
	struct sockaddr *sap = xs_addr(xprt);
	struct sockaddr_in6 *sin6;
	struct sockaddr_in *sin;
	struct sockaddr_un *sun;
	char buf[128];

	switch (sap->sa_family) {
	case AF_LOCAL:
		sun = xs_addr_un(xprt);
		strlcpy(buf, sun->sun_path, sizeof(buf));
		xprt->address_strings[RPC_DISPLAY_ADDR] =
						kstrdup(buf, GFP_KERNEL);
		break;
	case AF_INET:
		(void)rpc_ntop(sap, buf, sizeof(buf));
		xprt->address_strings[RPC_DISPLAY_ADDR] =
						kstrdup(buf, GFP_KERNEL);
		sin = xs_addr_in(xprt);
		snprintf(buf, sizeof(buf), "%08x", ntohl(sin->sin_addr.s_addr));
		break;
	case AF_INET6:
		(void)rpc_ntop(sap, buf, sizeof(buf));
		xprt->address_strings[RPC_DISPLAY_ADDR] =
						kstrdup(buf, GFP_KERNEL);
		sin6 = xs_addr_in6(xprt);
		snprintf(buf, sizeof(buf), "%pi6", &sin6->sin6_addr);
		break;
	default:
		BUG();
	}

	xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = kstrdup(buf, GFP_KERNEL);
}

static void xs_format_common_peer_ports(struct rpc_xprt *xprt)
{
	struct sockaddr *sap = xs_addr(xprt);
	char buf[128];

	snprintf(buf, sizeof(buf), "%u", rpc_get_port(sap));
	xprt->address_strings[RPC_DISPLAY_PORT] = kstrdup(buf, GFP_KERNEL);

	snprintf(buf, sizeof(buf), "%4hx", rpc_get_port(sap));
	xprt->address_strings[RPC_DISPLAY_HEX_PORT] = kstrdup(buf, GFP_KERNEL);
}

static void xs_format_peer_addresses(struct rpc_xprt *xprt,
				     const char *protocol,
				     const char *netid)
{
	xprt->address_strings[RPC_DISPLAY_PROTO] = protocol;
	xprt->address_strings[RPC_DISPLAY_NETID] = netid;
	xs_format_common_peer_addresses(xprt);
	xs_format_common_peer_ports(xprt);
}

static void xs_update_peer_port(struct rpc_xprt *xprt)
{
	kfree(xprt->address_strings[RPC_DISPLAY_HEX_PORT]);
	kfree(xprt->address_strings[RPC_DISPLAY_PORT]);

	xs_format_common_peer_ports(xprt);
}

static void xs_free_peer_addresses(struct rpc_xprt *xprt)
{
	unsigned int i;

	for (i = 0; i < RPC_DISPLAY_MAX; i++)
		switch (i) {
		case RPC_DISPLAY_PROTO:
		case RPC_DISPLAY_NETID:
			continue;
		default:
			kfree(xprt->address_strings[i]);
		}
}

#define XS_SENDMSG_FLAGS	(MSG_DONTWAIT | MSG_NOSIGNAL)
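/* MSG_DONTWAIT keeps the RPC transmit paths from ever blocking inside
 * the socket layer; MSG_NOSIGNAL reports a dead stream as -EPIPE
 * instead of raising SIGPIPE. */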

static int xs_send_kvec(struct socket *sock, struct sockaddr *addr, int addrlen, struct kvec *vec, unsigned int base, int more)
{
	struct msghdr msg = {
		.msg_name	= addr,
		.msg_namelen	= addrlen,
		.msg_flags	= XS_SENDMSG_FLAGS | (more ? MSG_MORE : 0),
	};
	struct kvec iov = {
		.iov_base	= vec->iov_base + base,
		.iov_len	= vec->iov_len - base,
	};

	if (iov.iov_len != 0)
		return kernel_sendmsg(sock, &msg, &iov, 1, iov.iov_len);
	return kernel_sendmsg(sock, &msg, NULL, 0, 0);
}

static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned int base, int more, bool zerocopy, int *sent_p)
{
	ssize_t (*do_sendpage)(struct socket *sock, struct page *page,
			int offset, size_t size, int flags);
	struct page **ppage;
	unsigned int remainder;
	int err;

	remainder = xdr->page_len - base;
	base += xdr->page_base;
	ppage = xdr->pages + (base >> PAGE_SHIFT);
	base &= ~PAGE_MASK;
	do_sendpage = sock->ops->sendpage;
	if (!zerocopy)
		do_sendpage = sock_no_sendpage;
	for(;;) {
		unsigned int len = min_t(unsigned int, PAGE_SIZE - base, remainder);
		int flags = XS_SENDMSG_FLAGS;

		remainder -= len;
		if (more)
			flags |= MSG_MORE;
		if (remainder != 0)
			flags |= MSG_SENDPAGE_NOTLAST | MSG_MORE;
		err = do_sendpage(sock, *ppage, base, len, flags);
		if (remainder == 0 || err != len)
			break;
		*sent_p += err;
		ppage++;
		base = 0;
	}
	if (err > 0) {
		*sent_p += err;
		err = 0;
	}
	return err;
}

/**
 * xs_sendpages - write pages directly to a socket
 * @sock: socket to send on
 * @addr: UDP only -- address of destination
 * @addrlen: UDP only -- length of destination address
 * @xdr: buffer containing this request
 * @base: starting position in the buffer
 * @zerocopy: true if it is safe to use sendpage()
 * @sent_p: return the total number of bytes successfully queued for sending
 *
 */
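/*
 * An xdr_buf holds an RPC message as the logical concatenation of
 * head[0] (a kvec), a page array, and tail[0] (another kvec); @base
 * is a byte offset into that concatenation, which is why each piece
 * is peeled off in turn below.
 */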
static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base, bool zerocopy, int *sent_p)
{
	unsigned int remainder = xdr->len - base;
	int err = 0;
	int sent = 0;

	if (unlikely(!sock))
		return -ENOTSOCK;

	if (base != 0) {
		addr = NULL;
		addrlen = 0;
	}

	if (base < xdr->head[0].iov_len || addr != NULL) {
		unsigned int len = xdr->head[0].iov_len - base;
		remainder -= len;
		err = xs_send_kvec(sock, addr, addrlen, &xdr->head[0], base, remainder != 0);
		if (remainder == 0 || err != len)
			goto out;
		*sent_p += err;
		base = 0;
	} else
		base -= xdr->head[0].iov_len;

	if (base < xdr->page_len) {
		unsigned int len = xdr->page_len - base;
		remainder -= len;
		err = xs_send_pagedata(sock, xdr, base, remainder != 0, zerocopy, &sent);
		*sent_p += sent;
		if (remainder == 0 || sent != len)
			goto out;
		base = 0;
	} else
		base -= xdr->page_len;

	if (base >= xdr->tail[0].iov_len)
		return 0;
	err = xs_send_kvec(sock, NULL, 0, &xdr->tail[0], base, 0);
out:
	if (err > 0) {
		*sent_p += err;
		err = 0;
	}
	return err;
}

static void xs_nospace_callback(struct rpc_task *task)
{
	struct sock_xprt *transport = container_of(task->tk_rqstp->rq_xprt, struct sock_xprt, xprt);

	transport->inet->sk_write_pending--;
}

/**
 * xs_nospace - place task on wait queue if transmit was incomplete
 * @task: task to put to sleep
 *
 */
static int xs_nospace(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct sock *sk = transport->inet;
	int ret = -EAGAIN;

	dprintk("RPC: %5u xmit incomplete (%u left of %u)\n",
			task->tk_pid, req->rq_slen - req->rq_bytes_sent,
			req->rq_slen);

	/* Protect against races with write_space */
	spin_lock_bh(&xprt->transport_lock);

	/* Don't race with disconnect */
	if (xprt_connected(xprt)) {
		/* wait for more buffer space */
		sk->sk_write_pending++;
		xprt_wait_for_buffer_space(task, xs_nospace_callback);
	} else
		ret = -ENOTCONN;

	spin_unlock_bh(&xprt->transport_lock);

	/* Race breaker in case memory is freed before above code is called */
	sk->sk_write_space(sk);

	return ret;
}

/*
 * Construct a stream transport record marker in @buf.
 */
static inline void xs_encode_stream_record_marker(struct xdr_buf *buf)
{
	u32 reclen = buf->len - sizeof(rpc_fraghdr);
	rpc_fraghdr *base = buf->head[0].iov_base;
	*base = cpu_to_be32(RPC_LAST_STREAM_FRAGMENT | reclen);
}
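
/*
 * Example: for buf->len == 104 (the 4-byte marker followed by 100
 * bytes of RPC data), reclen is 100 (0x64), so the marker goes out as
 * 0x80000064: the high bit flags the last fragment, and the low 31
 * bits carry the fragment length (RFC 1831 record marking).
 */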

/**
 * xs_local_send_request - write an RPC request to an AF_LOCAL socket
 * @task: RPC task that manages the state of an RPC request
 *
 * Return values:
 *        0:	The request has been sent
 *   EAGAIN:	The socket was blocked, please call again later to
 *		complete the request
 * ENOTCONN:	Caller needs to invoke connect logic then call again
 *    other:	Some other error occurred, the request was not sent
 */
static int xs_local_send_request(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport =
				container_of(xprt, struct sock_xprt, xprt);
	struct xdr_buf *xdr = &req->rq_snd_buf;
	int status;
	int sent = 0;

	xs_encode_stream_record_marker(&req->rq_snd_buf);

	xs_pktdump("packet data:",
			req->rq_svec->iov_base, req->rq_svec->iov_len);

	status = xs_sendpages(transport->sock, NULL, 0, xdr, req->rq_bytes_sent,
			      true, &sent);
	dprintk("RPC:       %s(%u) = %d\n",
			__func__, xdr->len - req->rq_bytes_sent, status);

	if (status == -EAGAIN && sock_writeable(transport->inet))
		status = -ENOBUFS;

	if (likely(sent > 0) || status == 0) {
		req->rq_bytes_sent += sent;
		req->rq_xmit_bytes_sent += sent;
		if (likely(req->rq_bytes_sent >= req->rq_slen)) {
			req->rq_bytes_sent = 0;
			return 0;
		}
		status = -EAGAIN;
	}

	switch (status) {
	case -ENOBUFS:
		break;
	case -EAGAIN:
		status = xs_nospace(task);
		break;
	default:
		dprintk("RPC:       sendmsg returned unrecognized error %d\n",
			-status);
		/* fall through */
	case -EPIPE:
		xs_close(xprt);
		status = -ENOTCONN;
	}

	return status;
}

/**
 * xs_udp_send_request - write an RPC request to a UDP socket
 * @task: address of RPC task that manages the state of an RPC request
 *
 * Return values:
 *        0:	The request has been sent
 *   EAGAIN:	The socket was blocked, please call again later to
 *		complete the request
 * ENOTCONN:	Caller needs to invoke connect logic then call again
 *    other:	Some other error occurred, the request was not sent
 */
static int xs_udp_send_request(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct xdr_buf *xdr = &req->rq_snd_buf;
	int sent = 0;
	int status;

	xs_pktdump("packet data:",
				req->rq_svec->iov_base,
				req->rq_svec->iov_len);

	if (!xprt_bound(xprt))
		return -ENOTCONN;
	status = xs_sendpages(transport->sock, xs_addr(xprt), xprt->addrlen,
			      xdr, req->rq_bytes_sent, true, &sent);

	dprintk("RPC:       xs_udp_send_request(%u) = %d\n",
			xdr->len - req->rq_bytes_sent, status);

	/* firewall is blocking us, don't return -EAGAIN or we end up looping */
	if (status == -EPERM)
		goto process_status;

	if (status == -EAGAIN && sock_writeable(transport->inet))
		status = -ENOBUFS;

	if (sent > 0 || status == 0) {
		req->rq_xmit_bytes_sent += sent;
		if (sent >= req->rq_slen)
			return 0;
		/* Still some bytes left; set up for a retry later. */
		status = -EAGAIN;
	}

process_status:
	switch (status) {
	case -ENOTSOCK:
		status = -ENOTCONN;
		/* Should we call xs_close() here? */
		break;
	case -EAGAIN:
		status = xs_nospace(task);
		break;
	case -ENETUNREACH:
	case -ENOBUFS:
	case -EPIPE:
	case -ECONNREFUSED:
	case -EPERM:
		/* When the server has died, an ICMP port unreachable message
		 * prompts ECONNREFUSED. */
		break;
	default:
		dprintk("RPC:       sendmsg returned unrecognized error %d\n",
			-status);
	}

	return status;
}

/**
 * xs_tcp_send_request - write an RPC request to a TCP socket
 * @task: address of RPC task that manages the state of an RPC request
 *
 * Return values:
 *        0:	The request has been sent
 *   EAGAIN:	The socket was blocked, please call again later to
 *		complete the request
 * ENOTCONN:	Caller needs to invoke connect logic then call again
 *    other:	Some other error occurred, the request was not sent
 *
 * XXX: In the case of soft timeouts, should we eventually give up
 *	if sendmsg is not able to make progress?
 */
static int xs_tcp_send_request(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct xdr_buf *xdr = &req->rq_snd_buf;
	bool zerocopy = true;
	int status;
	int sent;

	xs_encode_stream_record_marker(&req->rq_snd_buf);

	xs_pktdump("packet data:",
				req->rq_svec->iov_base,
				req->rq_svec->iov_len);
	/* Don't use zero copy if this is a resend. If the RPC call
	 * completes while the socket holds a reference to the pages,
	 * then we may end up resending corrupted data.
	 */
	if (task->tk_flags & RPC_TASK_SENT)
		zerocopy = false;

	/* Continue transmitting the packet/record. We must be careful
	 * to cope with writespace callbacks arriving _after_ we have
	 * called sendmsg(). */
	while (1) {
		sent = 0;
		status = xs_sendpages(transport->sock, NULL, 0, xdr,
				      req->rq_bytes_sent, zerocopy, &sent);

		dprintk("RPC:       xs_tcp_send_request(%u) = %d\n",
				xdr->len - req->rq_bytes_sent, status);

		/* If we've sent the entire packet, immediately
		 * reset the count of bytes sent. */
		req->rq_bytes_sent += sent;
		req->rq_xmit_bytes_sent += sent;
		if (likely(req->rq_bytes_sent >= req->rq_slen)) {
			req->rq_bytes_sent = 0;
			return 0;
		}

		if (status < 0)
			break;
		if (sent == 0) {
			status = -EAGAIN;
			break;
		}
	}
	if (status == -EAGAIN && sk_stream_is_writeable(transport->inet))
		status = -ENOBUFS;

	switch (status) {
	case -ENOTSOCK:
		status = -ENOTCONN;
		/* Should we call xs_close() here? */
		break;
	case -EAGAIN:
		status = xs_nospace(task);
		break;
	case -ECONNRESET:
	case -ECONNREFUSED:
	case -ENOTCONN:
	case -EADDRINUSE:
	case -ENOBUFS:
	case -EPIPE:
		break;
	default:
		dprintk("RPC:       sendmsg returned unrecognized error %d\n",
			-status);
	}

	return status;
}

/**
 * xs_tcp_release_xprt - clean up after a tcp transmission
 * @xprt: transport
 * @task: rpc task
 *
 * This cleans up if an error causes us to abort the transmission of a request.
 * In this case, the socket may need to be reset in order to avoid confusing
 * the server.
 */
static void xs_tcp_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
{
	struct rpc_rqst *req;

	if (task != xprt->snd_task)
		return;
	if (task == NULL)
		goto out_release;
	req = task->tk_rqstp;
	if (req == NULL)
		goto out_release;
	if (req->rq_bytes_sent == 0)
		goto out_release;
	if (req->rq_bytes_sent == req->rq_snd_buf.len)
		goto out_release;
	set_bit(XPRT_CLOSE_WAIT, &xprt->state);
out_release:
	xprt_release_xprt(xprt, task);
}

static void xs_save_old_callbacks(struct sock_xprt *transport, struct sock *sk)
{
	transport->old_data_ready = sk->sk_data_ready;
	transport->old_state_change = sk->sk_state_change;
	transport->old_write_space = sk->sk_write_space;
	transport->old_error_report = sk->sk_error_report;
}

static void xs_restore_old_callbacks(struct sock_xprt *transport, struct sock *sk)
{
	sk->sk_data_ready = transport->old_data_ready;
	sk->sk_state_change = transport->old_state_change;
	sk->sk_write_space = transport->old_write_space;
	sk->sk_error_report = transport->old_error_report;
}

static void xs_sock_reset_connection_flags(struct rpc_xprt *xprt)
{
	smp_mb__before_atomic();
	clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
	clear_bit(XPRT_CLOSING, &xprt->state);
	smp_mb__after_atomic();
}

static void xs_sock_mark_closed(struct rpc_xprt *xprt)
{
	xs_sock_reset_connection_flags(xprt);
	/* Mark transport as closed and wake up all pending tasks */
	xprt_disconnect_done(xprt);
}

/**
 * xs_error_report - callback to handle TCP socket state errors
 * @sk: socket
 *
 * Note: we don't call sock_error() since there may be a rpc_task
 * using the socket, and so we don't want to clear sk->sk_err.
 */
static void xs_error_report(struct sock *sk)
{
	struct rpc_xprt *xprt;
	int err;

	read_lock_bh(&sk->sk_callback_lock);
	if (!(xprt = xprt_from_sock(sk)))
		goto out;

	err = -sk->sk_err;
	if (err == 0)
		goto out;
	/* Is this a reset event? */
	if (sk->sk_state == TCP_CLOSE)
		xs_sock_mark_closed(xprt);
	dprintk("RPC:       xs_error_report client %p, error=%d...\n",
			xprt, -err);
	trace_rpc_socket_error(xprt, sk->sk_socket, err);
	xprt_wake_pending_tasks(xprt, err);
 out:
	read_unlock_bh(&sk->sk_callback_lock);
}

static void xs_reset_transport(struct sock_xprt *transport)
{
	struct socket *sock = transport->sock;
	struct sock *sk = transport->inet;
	struct rpc_xprt *xprt = &transport->xprt;

	if (sk == NULL)
		return;

	if (atomic_read(&transport->xprt.swapper))
		sk_clear_memalloc(sk);

	kernel_sock_shutdown(sock, SHUT_RDWR);

	/* Tear everything down under recv_mutex and sk_callback_lock so
	 * neither the receive worker nor the socket callbacks can see a
	 * half-cleared transport. */
	mutex_lock(&transport->recv_mutex);
	write_lock_bh(&sk->sk_callback_lock);
	transport->inet = NULL;
	transport->sock = NULL;

	sk->sk_user_data = NULL;

	xs_restore_old_callbacks(transport, sk);
	xprt_clear_connected(xprt);
	write_unlock_bh(&sk->sk_callback_lock);
	xs_sock_reset_connection_flags(xprt);
	mutex_unlock(&transport->recv_mutex);

	trace_rpc_socket_close(xprt, sock);
	sock_release(sock);
}

/**
 * xs_close - close a socket
 * @xprt: transport
 *
 * This is used when all requests are complete; ie, no DRC state remains
 * on the server we want to save.
 *
 * The caller _must_ be holding XPRT_LOCKED in order to avoid issues with
 * xs_reset_transport() zeroing the socket from underneath a writer.
 */
static void xs_close(struct rpc_xprt *xprt)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);

	dprintk("RPC:       xs_close xprt %p\n", xprt);

	xs_reset_transport(transport);
	xprt->reestablish_timeout = 0;

	xprt_disconnect_done(xprt);
}

static void xs_inject_disconnect(struct rpc_xprt *xprt)
{
	dprintk("RPC:       injecting transport disconnect on xprt=%p\n",
		xprt);
	xprt_disconnect_done(xprt);
}

static void xs_xprt_free(struct rpc_xprt *xprt)
{
	xs_free_peer_addresses(xprt);
	xprt_free(xprt);
}

/**
 * xs_destroy - prepare to shutdown a transport
 * @xprt: doomed transport
 *
 */
static void xs_destroy(struct rpc_xprt *xprt)
{
	struct sock_xprt *transport = container_of(xprt,
			struct sock_xprt, xprt);
	dprintk("RPC:       xs_destroy xprt %p\n", xprt);

	cancel_delayed_work_sync(&transport->connect_worker);
	xs_close(xprt);
	cancel_work_sync(&transport->recv_worker);
	xs_xprt_free(xprt);
	module_put(THIS_MODULE);
}

static int xs_local_copy_to_xdr(struct xdr_buf *xdr, struct sk_buff *skb)
{
	struct xdr_skb_reader desc = {
		.skb		= skb,
		.offset		= sizeof(rpc_fraghdr),
		.count		= skb->len - sizeof(rpc_fraghdr),
	};

	if (xdr_partial_copy_from_skb(xdr, 0, &desc, xdr_skb_read_bits) < 0)
		return -1;
	if (desc.count)
		return -1;
	return 0;
}

/**
 * xs_local_data_read_skb - receive callback for AF_LOCAL sockets
 * @xprt: transport
 * @sk: socket
 * @skb: skbuff
 *
 * Currently this assumes we can read the whole reply in a single gulp.
 */
static void xs_local_data_read_skb(struct rpc_xprt *xprt,
		struct sock *sk,
		struct sk_buff *skb)
{
	struct rpc_task *task;
	struct rpc_rqst *rovr;
	int repsize, copied;
	u32 _xid;
	__be32 *xp;

	repsize = skb->len - sizeof(rpc_fraghdr);
	if (repsize < 4) {
		dprintk("RPC:       impossible RPC reply size %d\n", repsize);
		return;
	}

	/* Copy the XID from the skb... */
	xp = skb_header_pointer(skb, sizeof(rpc_fraghdr), sizeof(_xid), &_xid);
	if (xp == NULL)
		return;

	/* Look up and lock the request corresponding to the given XID */
	spin_lock_bh(&xprt->transport_lock);
	rovr = xprt_lookup_rqst(xprt, *xp);
	if (!rovr)
		goto out_unlock;
	task = rovr->rq_task;

	copied = rovr->rq_private_buf.buflen;
	if (copied > repsize)
		copied = repsize;

	if (xs_local_copy_to_xdr(&rovr->rq_private_buf, skb)) {
		dprintk("RPC:       sk_buff copy failed\n");
		goto out_unlock;
	}

	xprt_complete_rqst(task, copied);

 out_unlock:
	spin_unlock_bh(&xprt->transport_lock);
}

static void xs_local_data_receive(struct sock_xprt *transport)
{
	struct sk_buff *skb;
	struct sock *sk;
	int err;

	mutex_lock(&transport->recv_mutex);
	sk = transport->inet;
	if (sk == NULL)
		goto out;
	for (;;) {
		skb = skb_recv_datagram(sk, 0, 1, &err);
		if (skb == NULL)
			break;
		xs_local_data_read_skb(&transport->xprt, sk, skb);
		skb_free_datagram(sk, skb);
	}
out:
	mutex_unlock(&transport->recv_mutex);
}

static void xs_local_data_receive_workfn(struct work_struct *work)
{
	struct sock_xprt *transport =
		container_of(work, struct sock_xprt, recv_worker);
	xs_local_data_receive(transport);
}

/**
 * xs_udp_data_read_skb - receive callback for UDP sockets
 * @xprt: transport
 * @sk: socket
 * @skb: skbuff
 *
 */
static void xs_udp_data_read_skb(struct rpc_xprt *xprt,
		struct sock *sk,
		struct sk_buff *skb)
{