/*
 * linux/net/sunrpc/xprtsock.c
 *
 * Client-side transport implementation for sockets.
 *
 * TCP callback races fixes (C) 1998 Red Hat
 * TCP send fixes (C) 1998 Red Hat
 * TCP NFS related read + write fixes
 *  (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
 *
 * Rewrite of large parts of the code in order to stabilize TCP stuff.
 * Fix behaviour when socket buffer is full.
 *  (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no>
 *
 * IP socket transport implementation, (C) 2005 Chuck Lever <cel@netapp.com>
 *
 * IPv6 support contributed by Gilles Quillard, Bull Open Source, 2005.
 *   <gilles.quillard@bull.net>
 */

#include <linux/types.h>
#include <linux/string.h>
#include <linux/slab.h>
#include <linux/module.h>
#include <linux/capability.h>
#include <linux/pagemap.h>
#include <linux/errno.h>
#include <linux/socket.h>
#include <linux/in.h>
#include <linux/net.h>
#include <linux/mm.h>
#include <linux/un.h>
#include <linux/udp.h>
#include <linux/tcp.h>
#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/addr.h>
#include <linux/sunrpc/sched.h>
#include <linux/sunrpc/svcsock.h>
#include <linux/sunrpc/xprtsock.h>
#include <linux/file.h>
#ifdef CONFIG_SUNRPC_BACKCHANNEL
#include <linux/sunrpc/bc_xprt.h>
#endif

#include <net/sock.h>
#include <net/checksum.h>
#include <net/udp.h>
#include <net/tcp.h>

#include "sunrpc.h"

static void xs_close(struct rpc_xprt *xprt);

/*
 * xprtsock tunables
 */
static unsigned int xprt_udp_slot_table_entries = RPC_DEF_SLOT_TABLE;
static unsigned int xprt_tcp_slot_table_entries = RPC_MIN_SLOT_TABLE;
static unsigned int xprt_max_tcp_slot_table_entries = RPC_MAX_SLOT_TABLE;

static unsigned int xprt_min_resvport = RPC_DEF_MIN_RESVPORT;
static unsigned int xprt_max_resvport = RPC_DEF_MAX_RESVPORT;

#define XS_TCP_LINGER_TO	(15U * HZ)
static unsigned int xs_tcp_fin_timeout __read_mostly = XS_TCP_LINGER_TO;
/*
 * We can register our own files under /proc/sys/sunrpc by
 * calling register_sysctl_table() again.  The files in that
 * directory become the union of all files registered there.
 *
 * We simply need to make sure that we don't collide with
 * someone else's file names!
 */

#ifdef RPC_DEBUG

static unsigned int min_slot_table_size = RPC_MIN_SLOT_TABLE;
static unsigned int max_slot_table_size = RPC_MAX_SLOT_TABLE;
static unsigned int max_tcp_slot_table_limit = RPC_MAX_SLOT_TABLE_LIMIT;
static unsigned int xprt_min_resvport_limit = RPC_MIN_RESVPORT;
static unsigned int xprt_max_resvport_limit = RPC_MAX_RESVPORT;

static struct ctl_table_header *sunrpc_table_header;

/*
 * FIXME: changing the UDP slot table size should also resize the UDP
 *        socket buffers for existing UDP transports
 */
static ctl_table xs_tunables_table[] = {
	{
		.procname	= "udp_slot_table_entries",
		.data		= &xprt_udp_slot_table_entries,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &min_slot_table_size,
		.extra2		= &max_slot_table_size
	},
	{
		.procname	= "tcp_slot_table_entries",
		.data		= &xprt_tcp_slot_table_entries,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &min_slot_table_size,
		.extra2		= &max_slot_table_size
	},
	{
		.procname	= "tcp_max_slot_table_entries",
		.data		= &xprt_max_tcp_slot_table_entries,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &min_slot_table_size,
		.extra2		= &max_tcp_slot_table_limit
	},
	{
		.procname	= "min_resvport",
		.data		= &xprt_min_resvport,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &xprt_min_resvport_limit,
		.extra2		= &xprt_max_resvport_limit
	},
	{
		.procname	= "max_resvport",
		.data		= &xprt_max_resvport,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &xprt_min_resvport_limit,
		.extra2		= &xprt_max_resvport_limit
	},
	{
		.procname	= "tcp_fin_timeout",
		.data		= &xs_tcp_fin_timeout,
		.maxlen		= sizeof(xs_tcp_fin_timeout),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_jiffies,
	},
	{ },
};

static ctl_table sunrpc_table[] = {
	{
		.procname	= "sunrpc",
		.mode		= 0555,
		.child		= xs_tunables_table
	},
	{ },
};

#endif

/*
 * Wait duration for a reply from the RPC portmapper.
 */
#define XS_BIND_TO		(60U * HZ)

/*
 * Delay if a UDP socket connect error occurs.  This is most likely some
 * kind of resource problem on the local host.
 */
#define XS_UDP_REEST_TO		(2U * HZ)

/*
 * The reestablish timeout allows clients to delay for a bit before attempting
 * to reconnect to a server that just dropped our connection.
 *
 * We implement an exponential backoff when trying to reestablish a TCP
 * transport connection with the server.  Some servers like to drop a TCP
 * connection when they are overworked, so we start with a short timeout and
 * increase over time if the server is down or not responding.
 */
#define XS_TCP_INIT_REEST_TO	(3U * HZ)
#define XS_TCP_MAX_REEST_TO	(5U * 60 * HZ)

/*
 * TCP idle timeout; client drops the transport socket if it is idle
 * for this long.  Note that we also timeout UDP sockets to prevent
 * holding port numbers when there is no RPC traffic.
 */
#define XS_IDLE_DISC_TO		(5U * 60 * HZ)

#ifdef RPC_DEBUG
# undef  RPC_DEBUG_DATA
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

#ifdef RPC_DEBUG_DATA
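/* Dump the first bytes of an RPC packet in hex (debug builds only). */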
static void xs_pktdump(char *msg, u32 *packet, unsigned int count)
{
	u8 *buf = (u8 *) packet;
	int j;

	dprintk("RPC:       %s\n", msg);
	for (j = 0; j < count && j < 128; j += 4) {
		if (!(j & 31)) {
			if (j)
				dprintk("\n");
			dprintk("0x%04x ", j);
		}
		dprintk("%02x%02x%02x%02x ",
			buf[j], buf[j+1], buf[j+2], buf[j+3]);
	}
	dprintk("\n");
}
#else
static inline void xs_pktdump(char *msg, u32 *packet, unsigned int count)
{
	/* NOP */
}
#endif

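/*
 * Per-transport private state: the underlying socket, the parser state
 * used while receiving a TCP reply, and the saved sk_* callbacks that
 * are restored when the socket is torn down.
 */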
struct sock_xprt {
	struct rpc_xprt		xprt;

	/*
	 * Network layer
	 */
	struct socket *		sock;
	struct sock *		inet;

	/*
	 * State of TCP reply receive
	 */
	__be32			tcp_fraghdr,
				tcp_xid,
				tcp_calldir;

	u32			tcp_offset,
				tcp_reclen;

	unsigned long		tcp_copied,
				tcp_flags;

	/*
	 * Connection of transports
	 */
	struct delayed_work	connect_worker;
	struct sockaddr_storage	srcaddr;
	unsigned short		srcport;

	/*
	 * UDP socket buffer size parameters
	 */
	size_t			rcvsize,
				sndsize;

	/*
	 * Saved socket callback addresses
	 */
	void			(*old_data_ready)(struct sock *, int);
	void			(*old_state_change)(struct sock *);
	void			(*old_write_space)(struct sock *);
};

/*
 * TCP receive state flags
 */
#define TCP_RCV_LAST_FRAG	(1UL << 0)
#define TCP_RCV_COPY_FRAGHDR	(1UL << 1)
#define TCP_RCV_COPY_XID	(1UL << 2)
#define TCP_RCV_COPY_DATA	(1UL << 3)
#define TCP_RCV_READ_CALLDIR	(1UL << 4)
#define TCP_RCV_COPY_CALLDIR	(1UL << 5)

/*
 * TCP RPC flags
 */
#define TCP_RPC_REPLY		(1UL << 6)

static inline struct sockaddr *xs_addr(struct rpc_xprt *xprt)
{
	return (struct sockaddr *) &xprt->addr;
}

static inline struct sockaddr_un *xs_addr_un(struct rpc_xprt *xprt)
{
	return (struct sockaddr_un *) &xprt->addr;
}

static inline struct sockaddr_in *xs_addr_in(struct rpc_xprt *xprt)
{
	return (struct sockaddr_in *) &xprt->addr;
}

static inline struct sockaddr_in6 *xs_addr_in6(struct rpc_xprt *xprt)
{
	return (struct sockaddr_in6 *) &xprt->addr;
}

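/*
 * Format the destination address as printable strings (presentation and
 * hex forms) and stash them in xprt->address_strings[].
 */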
static void xs_format_common_peer_addresses(struct rpc_xprt *xprt)
{
	struct sockaddr *sap = xs_addr(xprt);
	struct sockaddr_in6 *sin6;
	struct sockaddr_in *sin;
	struct sockaddr_un *sun;
	char buf[128];

	switch (sap->sa_family) {
	case AF_LOCAL:
		sun = xs_addr_un(xprt);
		strlcpy(buf, sun->sun_path, sizeof(buf));
		xprt->address_strings[RPC_DISPLAY_ADDR] =
						kstrdup(buf, GFP_KERNEL);
		break;
	case AF_INET:
		(void)rpc_ntop(sap, buf, sizeof(buf));
		xprt->address_strings[RPC_DISPLAY_ADDR] =
						kstrdup(buf, GFP_KERNEL);
		sin = xs_addr_in(xprt);
		snprintf(buf, sizeof(buf), "%08x", ntohl(sin->sin_addr.s_addr));
		break;
	case AF_INET6:
		(void)rpc_ntop(sap, buf, sizeof(buf));
		xprt->address_strings[RPC_DISPLAY_ADDR] =
						kstrdup(buf, GFP_KERNEL);
		sin6 = xs_addr_in6(xprt);
		snprintf(buf, sizeof(buf), "%pi6", &sin6->sin6_addr);
		break;
	default:
		BUG();
	}

	xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = kstrdup(buf, GFP_KERNEL);
}

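/* Format the destination port as decimal and hex strings. */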
static void xs_format_common_peer_ports(struct rpc_xprt *xprt)
{
	struct sockaddr *sap = xs_addr(xprt);
	char buf[128];

	snprintf(buf, sizeof(buf), "%u", rpc_get_port(sap));
	xprt->address_strings[RPC_DISPLAY_PORT] = kstrdup(buf, GFP_KERNEL);

	snprintf(buf, sizeof(buf), "%4hx", rpc_get_port(sap));
	xprt->address_strings[RPC_DISPLAY_HEX_PORT] = kstrdup(buf, GFP_KERNEL);
}

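/* Record the protocol and netid, then fill in all displayable address strings. */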
static void xs_format_peer_addresses(struct rpc_xprt *xprt,
				     const char *protocol,
				     const char *netid)
{
	xprt->address_strings[RPC_DISPLAY_PROTO] = protocol;
	xprt->address_strings[RPC_DISPLAY_NETID] = netid;
	xs_format_common_peer_addresses(xprt);
	xs_format_common_peer_ports(xprt);
}

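/* Regenerate the port strings after the destination port has changed. */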
static void xs_update_peer_port(struct rpc_xprt *xprt)
{
	kfree(xprt->address_strings[RPC_DISPLAY_HEX_PORT]);
	kfree(xprt->address_strings[RPC_DISPLAY_PORT]);

	xs_format_common_peer_ports(xprt);
}

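/*
 * Free the formatted address strings.  The PROTO and NETID entries are
 * static strings and must not be freed.
 */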
static void xs_free_peer_addresses(struct rpc_xprt *xprt)
{
	unsigned int i;

	for (i = 0; i < RPC_DISPLAY_MAX; i++)
		switch (i) {
		case RPC_DISPLAY_PROTO:
		case RPC_DISPLAY_NETID:
			continue;
		default:
			kfree(xprt->address_strings[i]);
		}
}

#define XS_SENDMSG_FLAGS	(MSG_DONTWAIT | MSG_NOSIGNAL)

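/*
 * Send one kvec on @sock, optionally preceded by a destination address
 * (UDP) and flagged with MSG_MORE when further data will follow.
 */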
static int xs_send_kvec(struct socket *sock, struct sockaddr *addr, int addrlen, struct kvec *vec, unsigned int base, int more)
{
	struct msghdr msg = {
		.msg_name	= addr,
		.msg_namelen	= addrlen,
		.msg_flags	= XS_SENDMSG_FLAGS | (more ? MSG_MORE : 0),
	};
	struct kvec iov = {
		.iov_base	= vec->iov_base + base,
		.iov_len	= vec->iov_len - base,
	};

	if (iov.iov_len != 0)
		return kernel_sendmsg(sock, &msg, &iov, 1, iov.iov_len);
	return kernel_sendmsg(sock, &msg, NULL, 0, 0);
}

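/*
 * Send the page portion of an xdr_buf via ->sendpage(), stopping on a
 * short or failed transmission and returning the number of bytes sent.
 */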
static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned int base, int more)
{
	struct page **ppage;
	unsigned int remainder;
	int err, sent = 0;

	remainder = xdr->page_len - base;
	base += xdr->page_base;
	ppage = xdr->pages + (base >> PAGE_SHIFT);
	base &= ~PAGE_MASK;
	for(;;) {
		unsigned int len = min_t(unsigned int, PAGE_SIZE - base, remainder);
		int flags = XS_SENDMSG_FLAGS;

		remainder -= len;
		if (remainder != 0 || more)
			flags |= MSG_MORE;
		err = sock->ops->sendpage(sock, *ppage, base, len, flags);
		if (remainder == 0 || err != len)
			break;
		sent += err;
		ppage++;
		base = 0;
	}
	if (sent == 0)
		return err;
	if (err > 0)
		sent += err;
	return sent;
}

/**
 * xs_sendpages - write pages directly to a socket
 * @sock: socket to send on
 * @addr: UDP only -- address of destination
 * @addrlen: UDP only -- length of destination address
 * @xdr: buffer containing this request
 * @base: starting position in the buffer
 *
 */
static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base)
{
	unsigned int remainder = xdr->len - base;
	int err, sent = 0;

	if (unlikely(!sock))
		return -ENOTSOCK;

	clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags);
	if (base != 0) {
		addr = NULL;
		addrlen = 0;
	}

	if (base < xdr->head[0].iov_len || addr != NULL) {
		unsigned int len = xdr->head[0].iov_len - base;
		remainder -= len;
		err = xs_send_kvec(sock, addr, addrlen, &xdr->head[0], base, remainder != 0);
		if (remainder == 0 || err != len)
			goto out;
		sent += err;
		base = 0;
	} else
		base -= xdr->head[0].iov_len;

	if (base < xdr->page_len) {
		unsigned int len = xdr->page_len - base;
		remainder -= len;
		err = xs_send_pagedata(sock, xdr, base, remainder != 0);
		if (remainder == 0 || err != len)
			goto out;
		sent += err;
		base = 0;
	} else
		base -= xdr->page_len;

	if (base >= xdr->tail[0].iov_len)
		return sent;
	err = xs_send_kvec(sock, NULL, 0, &xdr->tail[0], base, 0);
out:
	if (sent == 0)
		return err;
	if (err > 0)
		sent += err;
	return sent;
}

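/*
 * Undo the write-space accounting set up in xs_nospace() once the
 * waiting task has been woken.
 */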
static void xs_nospace_callback(struct rpc_task *task)
{
	struct sock_xprt *transport = container_of(task->tk_rqstp->rq_xprt, struct sock_xprt, xprt);

	transport->inet->sk_write_pending--;
	clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
}

/**
 * xs_nospace - place task on wait queue if transmit was incomplete
 * @task: task to put to sleep
 *
 */
static int xs_nospace(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	int ret = -EAGAIN;

	dprintk("RPC: %5u xmit incomplete (%u left of %u)\n",
			task->tk_pid, req->rq_slen - req->rq_bytes_sent,
			req->rq_slen);

	/* Protect against races with write_space */
	spin_lock_bh(&xprt->transport_lock);

	/* Don't race with disconnect */
	if (xprt_connected(xprt)) {
		if (test_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags)) {
			/*
			 * Notify TCP that we're limited by the application
			 * window size
			 */
			set_bit(SOCK_NOSPACE, &transport->sock->flags);
			transport->inet->sk_write_pending++;
			/* ...and wait for more buffer space */
			xprt_wait_for_buffer_space(task, xs_nospace_callback);
		}
	} else {
		clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
		ret = -ENOTCONN;
	}

	spin_unlock_bh(&xprt->transport_lock);
	return ret;
}

/*
 * Construct a stream transport record marker in @buf.
 */
static inline void xs_encode_stream_record_marker(struct xdr_buf *buf)
{
	u32 reclen = buf->len - sizeof(rpc_fraghdr);
	rpc_fraghdr *base = buf->head[0].iov_base;
	*base = cpu_to_be32(RPC_LAST_STREAM_FRAGMENT | reclen);
}

/**
 * xs_local_send_request - write an RPC request to an AF_LOCAL socket
 * @task: RPC task that manages the state of an RPC request
 *
 * Return values:
 *        0:	The request has been sent
 *   EAGAIN:	The socket was blocked, please call again later to
 *		complete the request
 * ENOTCONN:	Caller needs to invoke connect logic then call again
 *    other:	Some other error occurred, the request was not sent
 */
static int xs_local_send_request(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport =
				container_of(xprt, struct sock_xprt, xprt);
	struct xdr_buf *xdr = &req->rq_snd_buf;
	int status;

	xs_encode_stream_record_marker(&req->rq_snd_buf);

	xs_pktdump("packet data:",
			req->rq_svec->iov_base, req->rq_svec->iov_len);

	status = xs_sendpages(transport->sock, NULL, 0,
						xdr, req->rq_bytes_sent);
	dprintk("RPC:       %s(%u) = %d\n",
			__func__, xdr->len - req->rq_bytes_sent, status);
	if (likely(status >= 0)) {
		req->rq_bytes_sent += status;
		req->rq_xmit_bytes_sent += status;
		if (likely(req->rq_bytes_sent >= req->rq_slen)) {
			req->rq_bytes_sent = 0;
			return 0;
		}
		status = -EAGAIN;
	}

	switch (status) {
	case -EAGAIN:
		status = xs_nospace(task);
		break;
	default:
		dprintk("RPC:       sendmsg returned unrecognized error %d\n",
			-status);
	case -EPIPE:
		xs_close(xprt);
		status = -ENOTCONN;
	}

	return status;
}

/**
 * xs_udp_send_request - write an RPC request to a UDP socket
 * @task: address of RPC task that manages the state of an RPC request
 *
 * Return values:
 *        0:	The request has been sent
 *   EAGAIN:	The socket was blocked, please call again later to
 *		complete the request
 * ENOTCONN:	Caller needs to invoke connect logic then call again
 *    other:	Some other error occurred, the request was not sent
 */
static int xs_udp_send_request(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct xdr_buf *xdr = &req->rq_snd_buf;
	int status;

	xs_pktdump("packet data:",
				req->rq_svec->iov_base,
				req->rq_svec->iov_len);

	if (!xprt_bound(xprt))
		return -ENOTCONN;
	status = xs_sendpages(transport->sock,
			      xs_addr(xprt),
			      xprt->addrlen, xdr,
			      req->rq_bytes_sent);

	dprintk("RPC:       xs_udp_send_request(%u) = %d\n",
			xdr->len - req->rq_bytes_sent, status);

	if (status >= 0) {
		req->rq_xmit_bytes_sent += status;
		if (status >= req->rq_slen)
			return 0;
		/* Still some bytes left; set up for a retry later. */
		status = -EAGAIN;
	}

	switch (status) {
	case -ENOTSOCK:
		status = -ENOTCONN;
		/* Should we call xs_close() here? */
		break;
	case -EAGAIN:
		status = xs_nospace(task);
		break;
	default:
		dprintk("RPC:       sendmsg returned unrecognized error %d\n",
			-status);
	case -ENETUNREACH:
	case -EPIPE:
	case -ECONNREFUSED:
		/* When the server has died, an ICMP port unreachable message
		 * prompts ECONNREFUSED. */
		clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
	}

	return status;
}

/**
 * xs_tcp_shutdown - gracefully shut down a TCP socket
 * @xprt: transport
 *
 * Initiates a graceful shutdown of the TCP socket by calling the
 * equivalent of shutdown(SHUT_WR);
 */
static void xs_tcp_shutdown(struct rpc_xprt *xprt)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct socket *sock = transport->sock;

	if (sock != NULL)
		kernel_sock_shutdown(sock, SHUT_WR);
}

/**
 * xs_tcp_send_request - write an RPC request to a TCP socket
 * @task: address of RPC task that manages the state of an RPC request
 *
 * Return values:
 *        0:	The request has been sent
 *   EAGAIN:	The socket was blocked, please call again later to
 *		complete the request
 * ENOTCONN:	Caller needs to invoke connect logic then call again
 *    other:	Some other error occurred, the request was not sent
 *
 * XXX: In the case of soft timeouts, should we eventually give up
 *	if sendmsg is not able to make progress?
 */
static int xs_tcp_send_request(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct xdr_buf *xdr = &req->rq_snd_buf;
	int status;

	xs_encode_stream_record_marker(&req->rq_snd_buf);

	xs_pktdump("packet data:",
				req->rq_svec->iov_base,
				req->rq_svec->iov_len);

	/* Continue transmitting the packet/record. We must be careful
	 * to cope with writespace callbacks arriving _after_ we have
	 * called sendmsg(). */
	while (1) {
		status = xs_sendpages(transport->sock,
					NULL, 0, xdr, req->rq_bytes_sent);

		dprintk("RPC:       xs_tcp_send_request(%u) = %d\n",
				xdr->len - req->rq_bytes_sent, status);

		if (unlikely(status < 0))
			break;

		/* If we've sent the entire packet, immediately
		 * reset the count of bytes sent. */
		req->rq_bytes_sent += status;
		req->rq_xmit_bytes_sent += status;
		if (likely(req->rq_bytes_sent >= req->rq_slen)) {
			req->rq_bytes_sent = 0;
			return 0;
		}

		if (status != 0)
			continue;
		status = -EAGAIN;
		break;
	}

	switch (status) {
	case -ENOTSOCK:
		status = -ENOTCONN;
		/* Should we call xs_close() here? */
		break;
	case -EAGAIN:
		status = xs_nospace(task);
		break;
	default:
		dprintk("RPC:       sendmsg returned unrecognized error %d\n",
			-status);
	case -ECONNRESET:
		xs_tcp_shutdown(xprt);
	case -ECONNREFUSED:
	case -ENOTCONN:
	case -EPIPE:
		clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
	}
	return status;
}

/**
 * xs_tcp_release_xprt - clean up after a tcp transmission
 * @xprt: transport
 * @task: rpc task
 *
 * This cleans up if an error causes us to abort the transmission of a request.
 * In this case, the socket may need to be reset in order to avoid confusing
 * the server.
 */
static void xs_tcp_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
{
	struct rpc_rqst *req;

	if (task != xprt->snd_task)
		return;
	if (task == NULL)
		goto out_release;
	req = task->tk_rqstp;
	if (req == NULL)
		goto out_release;
	if (req->rq_bytes_sent == 0)
		goto out_release;
	if (req->rq_bytes_sent == req->rq_snd_buf.len)
		goto out_release;
	set_bit(XPRT_CLOSE_WAIT, &xprt->state);
out_release:
	xprt_release_xprt(xprt, task);
}

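/*
 * Save and restore the struct sock callbacks that this transport
 * overrides while it owns the socket.
 */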
static void xs_save_old_callbacks(struct sock_xprt *transport, struct sock *sk)
{
	transport->old_data_ready = sk->sk_data_ready;
	transport->old_state_change = sk->sk_state_change;
	transport->old_write_space = sk->sk_write_space;
}

static void xs_restore_old_callbacks(struct sock_xprt *transport, struct sock *sk)
{
	sk->sk_data_ready = transport->old_data_ready;
	sk->sk_state_change = transport->old_state_change;
	sk->sk_write_space = transport->old_write_space;
}

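/*
 * Detach the transport from its socket: restore the saved callbacks,
 * clear the back-pointers, and release the socket.
 */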
static void xs_reset_transport(struct sock_xprt *transport)
{
	struct socket *sock = transport->sock;
	struct sock *sk = transport->inet;

	if (sk == NULL)
		return;

	transport->srcport = 0;

	write_lock_bh(&sk->sk_callback_lock);
	transport->inet = NULL;
	transport->sock = NULL;

	sk->sk_user_data = NULL;

	xs_restore_old_callbacks(transport, sk);
	write_unlock_bh(&sk->sk_callback_lock);

	sk->sk_no_check = 0;

	sock_release(sock);
}

/**
 * xs_close - close a socket
 * @xprt: transport
 *
 * This is used when all requests are complete; ie, no DRC state remains
 * on the server we want to save.
 *
 * The caller _must_ be holding XPRT_LOCKED in order to avoid issues with
 * xs_reset_transport() zeroing the socket from underneath a writer.
 */
static void xs_close(struct rpc_xprt *xprt)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);

	dprintk("RPC:       xs_close xprt %p\n", xprt);

	xs_reset_transport(transport);
	xprt->reestablish_timeout = 0;

	smp_mb__before_clear_bit();
	clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
	clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
	clear_bit(XPRT_CLOSING, &xprt->state);
	smp_mb__after_clear_bit();
	xprt_disconnect_done(xprt);
}

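/*
 * Close a TCP transport: tear the socket down immediately if a full
 * close was requested, otherwise begin a graceful shutdown.
 */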
static void xs_tcp_close(struct rpc_xprt *xprt)
{
	if (test_and_clear_bit(XPRT_CONNECTION_CLOSE, &xprt->state))
		xs_close(xprt);
	else
		xs_tcp_shutdown(xprt);
}

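/*
 * Tear down an AF_LOCAL transport: close the socket, free the address
 * strings and the xprt itself, and drop the module reference.
 */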
static void xs_local_destroy(struct rpc_xprt *xprt)
{
	xs_close(xprt);
	xs_free_peer_addresses(xprt);
	xprt_free(xprt);
	module_put(THIS_MODULE);
}

/**
 * xs_destroy - prepare to shutdown a transport
 * @xprt: doomed transport
 *
 */
static void xs_destroy(struct rpc_xprt *xprt)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);

	dprintk("RPC:       xs_destroy xprt %p\n", xprt);

	cancel_delayed_work_sync(&transport->connect_worker);

	xs_local_destroy(xprt);
}

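/* Retrieve the rpc_xprt stashed in sk->sk_user_data. */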
static inline struct rpc_xprt *xprt_from_sock(struct sock *sk)
{
	return (struct rpc_xprt *) sk->sk_user_data;
}

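/*
 * Copy an AF_LOCAL reply out of the skb into the request's receive
 * buffer, skipping the record marker.
 */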
static int xs_local_copy_to_xdr(struct xdr_buf *xdr, struct sk_buff *skb)
{
	struct xdr_skb_reader desc = {
		.skb		= skb,
		.offset		= sizeof(rpc_fraghdr),
		.count		= skb->len - sizeof(rpc_fraghdr),
	};

	if (xdr_partial_copy_from_skb(xdr, 0, &desc, xdr_skb_read_bits) < 0)
		return -1;
	if (desc.count)
		return -1;
	return 0;
}

/**
 * xs_local_data_ready - "data ready" callback for AF_LOCAL sockets
 * @sk: socket with data to read
 * @len: how much data to read
 *
 * Currently this assumes we can read the whole reply in a single gulp.
 */
static void xs_local_data_ready(struct sock *sk, int len)
{
	struct rpc_task *task;
	struct rpc_xprt *xprt;
	struct rpc_rqst *rovr;
	struct sk_buff *skb;
	int err, repsize, copied;
	u32 _xid;
	__be32 *xp;

	read_lock_bh(&sk->sk_callback_lock);
	dprintk("RPC:       %s...\n", __func__);
	xprt = xprt_from_sock(sk);
	if (xprt == NULL)
		goto out;

	skb = skb_recv_datagram(sk, 0, 1, &err);
	if (skb == NULL)
		goto out;

	repsize = skb->len - sizeof(rpc_fraghdr);
	if (repsize < 4) {
		dprintk("RPC:       impossible RPC reply size %d\n", repsize);
		goto dropit;
	}

	/* Copy the XID from the skb... */
	xp = skb_header_pointer(skb, sizeof(rpc_fraghdr), sizeof(_xid), &_xid);
	if (xp == NULL)
		goto dropit;

	/* Look up and lock the request corresponding to the given XID */
	spin_lock(&xprt->transport_lock);
	rovr = xprt_lookup_rqst(xprt, *xp);
	if (!rovr)
		goto out_unlock;
	task = rovr->rq_task;

	copied = rovr->rq_private_buf.buflen;
	if (copied > repsize)
		copied = repsize;

	if (xs_local_copy_to_xdr(&rovr->rq_private_buf, skb)) {
		dprintk("RPC:       sk_buff copy failed\n");
		goto out_unlock;
	}

	xprt_complete_rqst(task, copied);

 out_unlock:
	spin_unlock(&xprt->transport_lock);
 dropit:
	skb_free_datagram(sk, skb);
 out:
	read_unlock_bh(&sk->sk_callback_lock);
}

/**
 * xs_udp_data_ready - "data ready" callback for UDP sockets
 * @sk: socket with data to read
 * @len: how much data to read
 *
 */
static void xs_udp_data_ready(struct sock *sk, int len)
{
	struct rpc_task *task;
	struct rpc_xprt *xprt;
	struct rpc_rqst *rovr;
	struct sk_buff *skb;
	int err, repsize, copied;
	u32 _xid;
	__be32 *xp;

	read_lock_bh(&sk->sk_callback_lock);
	dprintk("RPC:       xs_udp_data_ready...\n");
	if (!(xprt = xprt_from_sock(sk)))
		goto out;

	if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
		goto out;

	repsize = skb->len - sizeof(struct udphdr);
	if (repsize < 4) {
		dprintk("RPC:       impossible RPC reply size %d!\n", repsize);
		goto dropit;
	}

	/* Copy the XID from the skb... */
	xp = skb_header_pointer(skb, sizeof(struct udphdr),
				sizeof(_xid), &_xid);
	if (xp == NULL)
		goto dropit;

	/* Look up and lock the request corresponding to the given XID */
	spin_lock(&xprt->transport_lock);
	rovr = xprt_lookup_rqst(xprt, *xp);
	if (!rovr)
		goto out_unlock;
	task = rovr->rq_task;

	if ((copied = rovr->rq_private_buf.buflen) > repsize)
		copied = repsize;

	/* Suck it into the iovec, verify checksum if not done by hw. */
	if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb)) {
		UDPX_INC_STATS_BH(sk, UDP_MIB_INERRORS);
		goto out_unlock;
	}

	UDPX_INC_STATS_BH(sk, UDP_MIB_INDATAGRAMS);

	xprt_adjust_cwnd(xprt, task, copied);
	xprt_complete_rqst(task, copied);

 out_unlock:
	spin_unlock(&xprt->transport_lock);
 dropit:
	skb_free_datagram(sk, skb);
 out:
	read_unlock_bh(&sk->sk_callback_lock);
}

/*
 * Helper function to force a TCP close if the server is sending
 * junk and/or it has put us in CLOSE_WAIT
 */
static void xs_tcp_force_close(struct rpc_xprt *xprt)
{
	set_bit(XPRT_CONNECTION_CLOSE, &xprt->state);
	xprt_force_disconnect(xprt);
}

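/*
 * Read the 4-byte record marker from the stream and note whether this
 * is the last fragment of the record.
 */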
static inline void xs_tcp_read_fraghdr(struct rpc_xprt *xprt, struct xdr_skb_reader *desc)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	size_t len, used;
	char *p;

	p = ((char *) &transport->tcp_fraghdr) + transport->tcp_offset;
	len = sizeof(transport->tcp_fraghdr) - transport->tcp_offset;
	used = xdr_skb_read_bits(desc, p, len);
	transport->tcp_offset += used;
	if (used != len)
		return;

	transport->tcp_reclen = ntohl(transport->tcp_fraghdr);
	if (transport->tcp_reclen & RPC_LAST_STREAM_FRAGMENT)
		transport->tcp_flags |= TCP_RCV_LAST_FRAG;
	else
		transport->tcp_flags &= ~TCP_RCV_LAST_FRAG;
	transport->tcp_reclen &= RPC_FRAGMENT_SIZE_MASK;

	transport->tcp_flags &= ~TCP_RCV_COPY_FRAGHDR;
	transport->tcp_offset = 0;

	/* Sanity check of the record length */
	if (unlikely(transport->tcp_reclen < 8)) {
		dprintk("RPC:       invalid TCP record fragment length\n");
		xs_tcp_force_close(xprt);
		return;
	}
	dprintk("RPC:       reading TCP record fragment of length %d\n",
			transport->tcp_reclen);
}

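/*
 * Once the current fragment is consumed, arm the parser to read the
 * next record marker (and, after the last fragment, the next XID).
 */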
static void xs_tcp_check_fraghdr(struct sock_xprt *transport)
{
	if (transport->tcp_offset == transport->tcp_reclen) {
		transport->tcp_flags |= TCP_RCV_COPY_FRAGHDR;
		transport->tcp_offset = 0;
		if (transport->tcp_flags & TCP_RCV_LAST_FRAG) {
			transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
			transport->tcp_flags |= TCP_RCV_COPY_XID;
			transport->tcp_copied = 0;
		}
	}
}

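/* Read the XID of the next reply (or backchannel call) from the stream. */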
static inline void xs_tcp_read_xid(struct sock_xprt *transport, struct xdr_skb_reader *desc)
{
	size_t len, used;
	char *p;

	len = sizeof(transport->tcp_xid) - transport->tcp_offset;
	dprintk("RPC:       reading XID (%Zu bytes)\n", len);
	p = ((char *) &transport->tcp_xid) + transport->tcp_offset;
	used = xdr_skb_read_bits(desc, p, len);
	transport->tcp_offset += used;
	if (used != len)
		return;
	transport->tcp_flags &= ~TCP_RCV_COPY_XID;
	transport->tcp_flags |= TCP_RCV_READ_CALLDIR;
	transport->tcp_copied = 4;
	dprintk("RPC:       reading %s XID %08x\n",
			(transport->tcp_flags & TCP_RPC_REPLY) ? "reply for"
							      : "request with",
			ntohl(transport->tcp_xid));
	xs_tcp_check_fraghdr(transport);
}

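/*
 * Read the call-direction word that follows the XID so replies can be
 * told apart from backchannel calls.
 */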
static inline void xs_tcp_read_calldir(struct sock_xprt *transport,
				       struct xdr_skb_reader *desc)
{
	size_t len, used;
	u32 offset;
	char *p;

	/*
	 * We want transport->tcp_offset to be 8 at the end of this routine
	 * (4 bytes for the xid and 4 bytes for the call/reply flag).
	 * When this function is called for the first time,
	 * transport->tcp_offset is 4 (after having already read the xid).
	 */
	offset = transport->tcp_offset - sizeof(transport->tcp_xid);
	len = sizeof(transport->tcp_calldir) - offset;
	dprintk("RPC:       reading CALL/REPLY flag (%Zu bytes)\n", len);
	p = ((char *) &transport->tcp_calldir) + offset;
	used = xdr_skb_read_bits(desc, p, len);
	transport->tcp_offset += used;
	if (used != len)
		return;
	transport->tcp_flags &= ~TCP_RCV_READ_CALLDIR;
	/*
	 * We don't yet have the XDR buffer, so we will write the calldir
	 * out after we get the buffer from the 'struct rpc_rqst'
	 */
	switch (ntohl(transport->tcp_calldir)) {
	case RPC_REPLY:
		transport->tcp_flags |= TCP_RCV_COPY_CALLDIR;
		transport->tcp_flags |= TCP_RCV_COPY_DATA;
		transport->tcp_flags |= TCP_RPC_REPLY;
		break;
	case RPC_CALL:
		transport->tcp_flags |= TCP_RCV_COPY_CALLDIR;
		transport->tcp_flags |= TCP_RCV_COPY_DATA;
		transport->tcp_flags &= ~TCP_RPC_REPLY;
		break;
	default:
		dprintk("RPC:       invalid request message type\n");
		xs_tcp_force_close(&transport->xprt);
	}
	xs_tcp_check_fraghdr(transport);
}

static inline void xs_tcp_read_common(struct rpc_xprt *xprt,
				     struct xdr_skb_reader *desc,
				     struct rpc_rqst *req)
{
	struct sock_xprt *transport =
				container_of(xprt, struct sock_xprt, xprt);