$B$J$+$@$G$9!#(B http://rryu.sakura.ne.jp/nisenise-fuhito/2008/04/22/950.html $B$H(B $B$+(B($B8+Mn$H$7$F$^$7$?$,(B)[ruby-list:43356]$B$K$"$k!"@\B3$,@Z$l$?%=%1%C(B $B%H$K=q$-9~$b$&$H$9$k$H!"FsEYL\0J9_(Bselect$B$NBT$A>uBV$+$iH4$1$J$/$J(B $B$kLdBj$N%Q%C%A$G$9!#(Brb_io_wait_{read,writ}able()$B$N8_49@-$,$J$/$J$C(B $B$F$7$^$&$N$,5$$K$J$k$N$G!"L>A0$rJQ$($?$[$&$,$$$$$+$b$7$l$^$;$s$,!"(B $B$I$&$$$&$N$,$$$$$G$9$+$M$'!#(B Index: include/ruby/io.h =================================================================== --- include/ruby/io.h (revision 16258) +++ include/ruby/io.h (working copy) @@ -61,4 +61,5 @@ typedef struct rb_io_t { #define FMODE_TTY 16 #define FMODE_DUPLEX 32 +#define FMODE_BROKENPIPE 256 #define FMODE_WSPLIT 0x200 #define FMODE_WSPLIT_INITIALIZED 0x400 @@ -107,6 +108,6 @@ void rb_io_synchronized(rb_io_t*); void rb_io_check_initialized(rb_io_t*); void rb_io_check_closed(rb_io_t*); -int rb_io_wait_readable(int); -int rb_io_wait_writable(int); +int rb_io_wait_readable(rb_io_t*); +int rb_io_wait_writable(rb_io_t*); void rb_io_set_nonblock(rb_io_t *fptr); Index: io.c =================================================================== --- io.c (revision 16258) +++ io.c (working copy) @@ -419,4 +419,10 @@ rb_io_check_writable(rb_io_t *fptr) rb_raise(rb_eIOError, "not opened for writing"); } +#ifdef __linux__ + if (fptr->mode & FMODE_BROKENPIPE) { + errno = EPIPE; + rb_sys_fail(0); + } +#endif if (fptr->rbuf_len) { io_unread(fptr); @@ -590,5 +596,5 @@ io_fflush(rb_io_t *fptr) errno = EAGAIN; } - if (rb_io_wait_writable(fptr->fd)) { + if (rb_io_wait_writable(fptr)) { rb_io_check_closed(fptr); goto retry; @@ -608,7 +614,8 @@ wait_readable(VALUE p) int -rb_io_wait_readable(int f) +rb_io_wait_readable(rb_io_t *fptr) { rb_fdset_t rfds; + int f = fptr->fd; switch (errno) { @@ -650,7 +657,8 @@ wait_writable(VALUE p) int -rb_io_wait_writable(int f) +rb_io_wait_writable(rb_io_t *fptr) { rb_fdset_t wfds; + int f = fptr->fd; switch (errno) { @@ -676,4 +684,7 @@ 
rb_io_wait_writable(int f) return Qtrue; + case EPIPE: + fptr->mode |= FMODE_BROKENPIPE; + default: return Qfalse; @@ -752,5 +763,5 @@ io_fwrite(VALUE str, rb_io_t *fptr) errno = EAGAIN; } - if (rb_io_wait_writable(fptr->fd)) { + if (rb_io_wait_writable(fptr)) { rb_io_check_closed(fptr); if (offset < RSTRING_LEN(str)) @@ -1038,5 +1049,5 @@ io_fillbuf(rb_io_t *fptr) } if (r < 0) { - if (rb_io_wait_readable(fptr->fd)) + if (rb_io_wait_readable(fptr)) goto retry; rb_sys_fail(fptr->path); @@ -1480,5 +1491,5 @@ io_getpartial(int argc, VALUE *argv, VAL } if (n < 0) { - if (!nonblock && rb_io_wait_readable(fptr->fd)) + if (!nonblock && rb_io_wait_readable(fptr)) goto again; rb_sys_fail(fptr->path); Index: ext/socket/socket.c =================================================================== --- ext/socket/socket.c (revision 16258) +++ ext/socket/socket.c (working copy) @@ -525,5 +525,5 @@ bsock_send(int argc, VALUE *argv, VALUE } if (n < 0) { - if (rb_io_wait_writable(fd)) { + if (rb_io_wait_writable(fptr)) { goto retry; } @@ -1809,5 +1809,5 @@ udp_send(int argc, VALUE *argv, VALUE so return INT2FIX(n); } - if (rb_io_wait_writable(fptr->fd)) { + if (rb_io_wait_writable(fptr)) { goto retry; } Index: ext/openssl/ossl_ssl.c =================================================================== --- ext/openssl/ossl_ssl.c (revision 16258) +++ ext/openssl/ossl_ssl.c (working copy) @@ -951,8 +951,8 @@ ossl_start_ssl(VALUE self, int (*func)() switch((ret2 = ssl_get_error(ssl, ret))){ case SSL_ERROR_WANT_WRITE: - rb_io_wait_writable(FPTR_TO_FD(fptr)); + rb_io_wait_writable(fptr); continue; case SSL_ERROR_WANT_READ: - rb_io_wait_readable(FPTR_TO_FD(fptr)); + rb_io_wait_readable(fptr); continue; case SSL_ERROR_SYSCALL: @@ -1033,8 +1033,8 @@ ossl_ssl_read(int argc, VALUE *argv, VAL rb_eof_error(); case SSL_ERROR_WANT_WRITE: - rb_io_wait_writable(FPTR_TO_FD(fptr)); + rb_io_wait_writable(fptr); continue; case SSL_ERROR_WANT_READ: - rb_io_wait_readable(FPTR_TO_FD(fptr)); + 
rb_io_wait_readable(fptr); continue; case SSL_ERROR_SYSCALL: @@ -1081,8 +1081,8 @@ ossl_ssl_write(VALUE self, VALUE str) goto end; case SSL_ERROR_WANT_WRITE: - rb_io_wait_writable(FPTR_TO_FD(fptr)); + rb_io_wait_writable(fptr); continue; case SSL_ERROR_WANT_READ: - rb_io_wait_readable(FPTR_TO_FD(fptr)); + rb_io_wait_readable(fptr); continue; case SSL_ERROR_SYSCALL:
on 01.05.2008 19:02
on 01.05.2008 21:36
まつもと ゆきひろです
In message "Re: [ruby-dev:34567] write to broken pipe on Linux"
on Fri, 2 May 2008 02:01:59 +0900, Nobuyoshi Nakada
<nobu@ruby-lang.org> writes:
|http://rryu.sakura.ne.jp/nisenise-fuhito/2008/04/22/950.html $B$H(B
|$B$+(B($B8+Mn$H$7$F$^$7$?$,(B)[ruby-list:43356]$B$K$"$k!"@\B3$,@Z$l$?%=%1%C(B
|$B%H$K=q$-9~$b$&$H$9$k$H!"FsEYL\0J9_(Bselect$B$NBT$A>uBV$+$iH4$1$J$/$J(B
|$B$kLdBj$N%Q%C%A$G$9!#(Brb_io_wait_{read,writ}able()$B$N8_49@-$,$J$/$J$C(B
|$B$F$7$^$&$N$,5$$K$J$k$N$G!"L>A0$rJQ$($?$[$&$,$$$$$+$b$7$l$^$;$s$,!"(B
|$B$I$&$$$&$N$,$$$$$G$9$+$M$'!#(B
$BL>A0$b5$$K$J$k$N$G$9$,!"$3$N8=>]$,(Blinux$BFCM-$N$b$N$+$I$&$+$b5$(B
$B$K$J$j$^$9!#$$$C$=!"(BEPIPE$B$r<u$1$?$i(Bclose$B$H$+(Bshutdown$B$H$+8F$s(B
$B$8$c$($P$$$$$s$8$c$J$$$G$9$+$M!#$=$l$@$H(Blinux$B8GM-$+$I$&$+9M$((B
$B$J$/$F$b$$$$$7!"0z?tJQ$($J$/$F$$$$$7!#(B
on 02.05.2008 07:57
なかだです。
At Fri, 2 May 2008 04:35:52 +0900,
Yukihiro Matsumoto wrote in [ruby-dev:34568]:
> なくてもいいし、引数変えなくていいし。
$B>e5-%j%s%/@h$N>pJs$K$h$k$H!">/$J$/$H$b(BWindows Vista$B$H(BMac OS
X
10.5$B$G$OH/@8$7$J$$$h$&$G$9!#(B
shutdown$B$G$O!"(BEPIPE$B$NBe$o$j$K(BENOTCONN$B$,H/@8$9$k$h$&$K$J$k$@$1$G(B
$B$=$N<!$N=q$-9~$_$G$d$O$j%V%m%C%/$7$F$7$^$&$h$&$G$9!#(Bclose$B$K$9$k(B
$B$H(BEBADF$B$K$O$J$k$N$G$9$,!"(Beof?$B$d(Bclose$B$G$bF1$8$/(BEBADF$B$,H/@8$7$F$7(B
$B$^$&$h$&$K$J$j$^$9!#B>$N%9%l%C%I$NM-L5$K$h$C$FF0:n$,JQ$o$C$F$7$^(B
$B$&$N$b$I$&$+$H$$$&5$$b$7$^$9$,!#(B
$B$H$$$&$+!"(B1.9$B$G$O$9$G$K(Bblocking
region$B$r;H$($P(Bselect$B$OI,MW$J$$$O(B
$B$:$G$9!#(B
Index: io.c
===================================================================
--- io.c (revision 16261)
+++ io.c (working copy)
@@ -187,5 +187,4 @@ static int max_file_descriptor = NOFILE;
#define READ_CHECK(fptr) do {\
if (!READ_DATA_PENDING(fptr)) {\
- rb_thread_wait_fd((fptr)->fd);\
rb_io_check_closed(fptr);\
}\
@@ -513,18 +512,19 @@ struct io_internal_struct {
void *buf;
size_t capa;
- int is_read;
};
static VALUE
-internal_io_func(void *ptr)
+internal_read_func(void *ptr)
{
struct io_internal_struct *iis = (struct io_internal_struct*)ptr;
- if (iis->is_read) {
return read(iis->fd, iis->buf, iis->capa);
}
- else {
+
+static VALUE
+internal_write_func(void *ptr)
+{
+ struct io_internal_struct *iis = (struct io_internal_struct*)ptr;
return write(iis->fd, iis->buf, iis->capa);
}
-}
static int
@@ -535,7 +535,6 @@ rb_read_internal(int fd, void *buf, size
iis.buf = buf;
iis.capa = count;
- iis.is_read = 1;
- return rb_thread_blocking_region(internal_io_func, &iis,
RB_UBF_DFL, 0);
+ return rb_thread_blocking_region(internal_read_func, &iis,
RB_UBF_DFL, 0);
}
@@ -547,7 +546,6 @@ rb_write_internal(int fd, void *buf, siz
iis.buf = buf;
iis.capa = count;
- iis.is_read = 0;
- return rb_thread_blocking_region(internal_io_func, &iis,
RB_UBF_DFL, 0);
+ return rb_thread_blocking_region(internal_write_func, &iis,
RB_UBF_DFL, 0);
}
@@ -617,5 +615,4 @@ rb_io_wait_readable(int f)
case ERESTART:
#endif
- rb_thread_wait_fd(f);
return Qtrue;
@@ -659,5 +656,4 @@ rb_io_wait_writable(int f)
case ERESTART:
#endif
- rb_thread_fd_writable(f);
return Qtrue;
@@ -731,9 +727,5 @@ io_fwrite(VALUE str, rb_io_t *fptr)
if (n == 0)
return len;
- /* avoid context switch between "a" and "\n" in STDERR.puts
"a".
- [ruby-dev:25080] */
- if (fptr->stdio_file != stderr && !rb_thread_fd_writable(fptr->fd)) {
rb_io_check_closed(fptr);
- }
retry:
l = n;
@@ -1303,5 +1295,4 @@ io_fread(VALUE str, long offset, rb_io_t
if ((n -= c) <= 0) break;
}
- rb_thread_wait_fd(fptr->fd);
rb_io_check_closed(fptr);
if (io_fillbuf(fptr) < 0) {
@@ -1474,9 +1465,6 @@ io_getpartial(int argc, VALUE *argv, VAL
if (nonblock) {
rb_io_set_nonblock(fptr);
- n = rb_read_internal(fptr->fd, RSTRING_PTR(str), len);
}
- else {
n = rb_read_internal(fptr->fd, RSTRING_PTR(str), len);
- }
if (n < 0) {
if (!nonblock && rb_io_wait_readable(fptr->fd))
@@ -1776,5 +1764,4 @@ appendline(rb_io_t *fptr, int delim, VAL
}
}
- rb_thread_wait_fd(fptr->fd);
rb_io_check_closed(fptr);
if (io_fillbuf(fptr) < 0) {
@@ -1813,5 +1800,4 @@ swallow(rb_io_t *fptr, int term)
rb_sys_fail(fptr->path);
}
- rb_thread_wait_fd(fptr->fd);
rb_io_check_closed(fptr);
} while (io_fillbuf(fptr) == 0);
@@ -1853,5 +1839,4 @@ rb_io_getline_fast(rb_io_t *fptr)
if (e) break;
}
- rb_thread_wait_fd(fptr->fd);
rb_io_check_closed(fptr);
if (io_fillbuf(fptr) < 0) {
@@ -2419,17 +2404,9 @@ rb_io_getc(VALUE io)
return io_getc(fptr, enc);
}
+
int
rb_getc(FILE *f)
{
- int c;
-
- if (!STDIO_READ_DATA_PENDING(f)) {
- rb_thread_wait_fd(fileno(f));
- }
- TRAP_BEG;
- c = getc(f);
- TRAP_END;
-
- return c;
+ return rb_thread_blocking_region((rb_blocking_function_t*)fgetc, f,
RB_UBF_DFL, 0);
}
@@ -3024,10 +3001,6 @@ rb_io_syswrite(VALUE io, VALUE str)
rb_warn("syswrite for buffered IO");
}
- if (!rb_thread_fd_writable(fptr->fd)) {
rb_io_check_closed(fptr);
- }
- TRAP_BEG;
- n = write(fptr->fd, RSTRING_PTR(str), RSTRING_LEN(str));
- TRAP_END;
+ n = rb_write_internal(fptr->fd, RSTRING_PTR(str),
RSTRING_LEN(str));
if (n == -1) rb_sys_fail(fptr->path);
@@ -3080,5 +3053,4 @@ rb_io_sysread(int argc, VALUE *argv, VAL
n = fptr->fd;
- rb_thread_wait_fd(fptr->fd);
rb_io_check_closed(fptr);
if (RSTRING_LEN(str) != ilen) {
@@ -6632,5 +6604,4 @@ copy_stream_fallback_body(VALUE arg)
else {
ssize_t ss;
- rb_thread_wait_fd(stp->src_fd);
rb_str_resize(buf, buflen);
ss = copy_stream_read(stp, RSTRING_PTR(buf), l, off);
Index: ext/socket/socket.c
===================================================================
--- ext/socket/socket.c (revision 16261)
+++ ext/socket/socket.c (working copy)
@@ -103,4 +103,6 @@ int Rconnect();
#endif
+#define BLOCKING_REGION(func, arg)
(long)rb_thread_blocking_region((func), (arg), RB_UBF_DFL, 0)
+
#define INET_CLIENT 0
#define INET_SERVER 1
@@ -496,35 +498,58 @@ bsock_getpeername(VALUE sock)
}
+struct send_arg {
+ int fd, flags;
+ VALUE mesg;
+ struct sockaddr *to;
+ socklen_t tolen;
+};
+
+static VALUE
+sendto_blocking(void *data)
+{
+ struct send_arg *arg = data;
+ VALUE mesg = arg->mesg;
+ return (VALUE)sendto(arg->fd, RSTRING_PTR(mesg), RSTRING_LEN(mesg),
+ arg->flags, arg->to, arg->tolen);
+}
+
+static VALUE
+send_blocking(void *data)
+{
+ struct send_arg *arg = data;
+ VALUE mesg = arg->mesg;
+ return (VALUE)send(arg->fd, RSTRING_PTR(mesg), RSTRING_LEN(mesg),
+ arg->flags);
+}
+
static VALUE
bsock_send(int argc, VALUE *argv, VALUE sock)
{
- VALUE mesg, to;
- VALUE flags;
+ struct send_arg arg;
+ VALUE flags, to;
rb_io_t *fptr;
- int fd, n;
+ int n;
+ rb_blocking_function_t *func;
rb_secure(4);
- rb_scan_args(argc, argv, "21", &mesg, &flags, &to);
+ rb_scan_args(argc, argv, "21", &arg.mesg, &flags, &to);
- StringValue(mesg);
- if (!NIL_P(to)) StringValue(to);
- GetOpenFile(sock, fptr);
- fd = fptr->fd;
- rb_thread_fd_writable(fd);
- retry:
+ StringValue(arg.mesg);
if (!NIL_P(to)) {
- TRAP_BEG;
- n = sendto(fd, RSTRING_PTR(mesg), RSTRING_LEN(mesg), NUM2INT(flags),
- (struct sockaddr*)RSTRING_PTR(to), RSTRING_LEN(to));
- TRAP_END;
+ StringValue(to);
+ to = rb_str_new4(to);
+ arg.to = (struct sockaddr *)RSTRING_PTR(to);
+ arg.tolen = RSTRING_LEN(to);
+ func = sendto_blocking;
}
else {
- TRAP_BEG;
- n = send(fd, RSTRING_PTR(mesg), RSTRING_LEN(mesg), NUM2INT(flags));
- TRAP_END;
+ func = send_blocking;
}
- if (n < 0) {
- if (rb_io_wait_writable(fd)) {
- goto retry;
+ GetOpenFile(sock, fptr);
+ arg.fd = fptr->fd;
+ arg.flags = NUM2INT(flags);
+ while ((n = (int)BLOCKING_REGION(func, &arg)) < 0) {
+ if (rb_io_wait_writable(arg.fd)) {
+ continue;
}
rb_sys_fail("send(2)");
@@ -570,20 +595,33 @@ enum sock_recv_type {
};
+struct recvfrom_arg {
+ int fd, flags;
+ VALUE str;
+ socklen_t alen;
+ char buf[1024];
+};
+
+static VALUE
+recvfrom_blocking(void *data)
+{
+ struct recvfrom_arg *arg = data;
+ return (VALUE)recvfrom(arg->fd, RSTRING_PTR(arg->str),
RSTRING_LEN(arg->str),
+ arg->flags, (struct sockaddr*)arg->buf, &arg->alen);
+}
+
static VALUE
s_recvfrom(VALUE sock, int argc, VALUE *argv, enum sock_recv_type from)
{
rb_io_t *fptr;
- VALUE str;
- char buf[1024];
- socklen_t alen = sizeof buf;
+ VALUE str, klass;
+ struct recvfrom_arg arg;
VALUE len, flg;
long buflen;
long slen;
- int fd, flags;
rb_scan_args(argc, argv, "11", &len, &flg);
- if (flg == Qnil) flags = 0;
- else flags = NUM2INT(flg);
+ if (flg == Qnil) arg.flags = 0;
+ else arg.flags = NUM2INT(flg);
buflen = NUM2INT(len);
@@ -592,24 +630,19 @@ s_recvfrom(VALUE sock, int argc, VALUE *
rb_raise(rb_eIOError, "recv for buffered IO");
}
- fd = fptr->fd;
-
- str = rb_tainted_str_new(0, buflen);
+ arg.fd = fptr->fd;
+ arg.alen = sizeof(arg.buf);
- retry:
- rb_thread_wait_fd(fd);
- rb_io_check_closed(fptr);
- if (RSTRING_LEN(str) != buflen) {
+ arg.str = str = rb_tainted_str_new(0, buflen);
+ klass = RBASIC(str)->klass;
+ RBASIC(str)->klass = 0;
+
+ while (rb_io_check_closed(fptr),
+ (slen = BLOCKING_REGION(recvfrom_blocking, &arg)) < 0) {
+ if (RBASIC(str)->klass || RSTRING_LEN(str) != buflen) {
rb_raise(rb_eRuntimeError, "buffer string modified");
}
- TRAP_BEG;
- slen = recvfrom(fd, RSTRING_PTR(str), buflen, flags, (struct
sockaddr*)buf, &alen);
- TRAP_END;
-
- if (slen < 0) {
- if (rb_io_wait_readable(fd)) {
- goto retry;
- }
- rb_sys_fail("recvfrom(2)");
}
+
+ RBASIC(str)->klass = klass;
if (slen < RSTRING_LEN(str)) {
rb_str_set_len(str, slen);
@@ -618,13 +651,13 @@ s_recvfrom(VALUE sock, int argc, VALUE *
switch (from) {
case RECV_RECV:
- return (VALUE)str;
+ return str;
case RECV_IP:
#if 0
- if (alen != sizeof(struct sockaddr_in)) {
+ if (arg.alen != sizeof(struct sockaddr_in)) {
rb_raise(rb_eTypeError, "sockaddr size differs - should not
happen");
}
#endif
- if (alen && alen != sizeof(buf)) /* OSX doesn't return a from result
for connection-oriented sockets */
- return rb_assoc_new(str, ipaddr((struct sockaddr*)buf, fptr->mode
& FMODE_NOREVLOOKUP));
+ if (arg.alen && arg.alen != sizeof(arg.buf)) /* OSX doesn't return a
from result for connection-oriented sockets */
+ return rb_assoc_new(str, ipaddr((struct sockaddr*)arg.buf,
fptr->mode & FMODE_NOREVLOOKUP));
else
return rb_assoc_new(str, Qnil);
@@ -632,8 +665,8 @@ s_recvfrom(VALUE sock, int argc, VALUE *
#ifdef HAVE_SYS_UN_H
case RECV_UNIX:
- return rb_assoc_new(str, unixaddr((struct sockaddr_un*)buf,
alen));
+ return rb_assoc_new(str, unixaddr((struct sockaddr_un*)arg.buf,
arg.alen));
#endif
case RECV_SOCKET:
- return rb_assoc_new(str, rb_str_new(buf, alen));
+ return rb_assoc_new(str, rb_str_new(arg.buf, arg.alen));
default:
rb_bug("s_recvfrom called with bad value");
@@ -924,5 +957,6 @@ sock_addrinfo(VALUE host, VALUE port, in
if (r->ai_socktype == SOCK_DGRAM) {
r->ai_protocol = IPPROTO_UDP;
- } else if (r->ai_socktype == SOCK_STREAM) {
+ }
+ else if (r->ai_socktype == SOCK_STREAM) {
r->ai_protocol = IPPROTO_TCP;
}
@@ -1096,9 +1130,32 @@ wait_connectable(int fd)
#endif
+struct connect_arg {
+ int fd;
+ const struct sockaddr *sockaddr;
+ socklen_t len;
+};
+
+static VALUE
+connect_blocking(void *data)
+{
+ struct connect_arg *arg = data;
+ return (VALUE)connect(arg->fd, arg->sockaddr, arg->len);
+}
+
+#if defined(SOCKS) && !defined(SOCKS5)
+static VALUE
+socks_connect_blocking(void *data)
+{
+ struct connect_arg *arg = data;
+ return (VALUE)Rconnect(arg->fd, arg->sockaddr, arg->len);
+}
+#endif
+
static int
-ruby_connect(int fd, struct sockaddr *sockaddr, int len, int socks)
+ruby_connect(int fd, const struct sockaddr *sockaddr, int len, int
socks)
{
int status;
- int mode;
+ rb_blocking_function_t *func = connect_blocking;
+ struct connect_arg arg;
#if WAIT_IN_PROGRESS > 0
int wait_in_progress = -1;
@@ -1107,36 +1164,12 @@ ruby_connect(int fd, struct sockaddr *so
#endif
-#if defined(HAVE_FCNTL)
-# if defined(F_GETFL)
- mode = fcntl(fd, F_GETFL, 0);
-# else
- mode = 0;
-# endif
-
-#ifdef O_NDELAY
-# define NONBLOCKING O_NDELAY
-#else
-#ifdef O_NBIO
-# define NONBLOCKING O_NBIO
-#else
-# define NONBLOCKING O_NONBLOCK
-#endif
-#endif
-#ifdef SOCKS5
- if (!socks)
-#endif
- fcntl(fd, F_SETFL, mode|NONBLOCKING);
-#endif /* HAVE_FCNTL */
-
- for (;;) {
+ arg.fd = fd;
+ arg.sockaddr = sockaddr;
+ arg.len = len;
#if defined(SOCKS) && !defined(SOCKS5)
- if (socks) {
- status = Rconnect(fd, sockaddr, len);
- }
- else
+ if (socks) func = socks_connect_blocking;
#endif
- {
- status = connect(fd, sockaddr, len);
- }
+ for (;;) {
+ status = (int)BLOCKING_REGION(func, &arg);
if (status < 0) {
switch (errno) {
@@ -1199,7 +1232,4 @@ ruby_connect(int fd, struct sockaddr *so
}
}
-#ifdef HAVE_FCNTL
- fcntl(fd, F_SETFL, mode);
-#endif
return status;
}
@@ -1240,5 +1270,5 @@ init_inetsock_internal(struct inetsock_a
struct addrinfo *res;
int fd, status = 0;
- char *syscall;
+ char *syscall = 0;
arg->remote.res = sock_addrinfo(arg->remote.host, arg->remote.serv,
SOCK_STREAM,
@@ -1296,7 +1326,4 @@ init_inetsock_internal(struct inetsock_a
arg->fd = -1;
- if (type == INET_SERVER)
- listen(fd, 5);
-
/* create new instance */
return init_sock(arg->sock, fd);
@@ -1450,10 +1477,17 @@ static VALUE
tcp_svr_init(int argc, VALUE *argv, VALUE sock)
{
- VALUE arg1, arg2;
+ VALUE arg1, arg2, blog;
+ rb_io_t *fptr;
+ int backlog = 5;
- if (rb_scan_args(argc, argv, "11", &arg1, &arg2) == 2)
- return init_inetsock(sock, arg1, arg2, Qnil, Qnil, INET_SERVER);
- else
- return init_inetsock(sock, Qnil, arg1, Qnil, Qnil, INET_SERVER);
+ if (rb_scan_args(argc, argv, "11:backlog", &arg1, &arg2, &blog) !=
2)
+ arg2 = arg1, arg1 = Qnil;
+ if (!NIL_P(blog)) backlog = NUM2INT(blog);
+ sock = init_inetsock(sock, arg1, arg2, Qnil, Qnil, INET_SERVER);
+ GetOpenFile(sock, fptr);
+ if (listen(fptr->fd, backlog)) {
+ rb_sys_fail("listen");
+ }
+ return sock;
}
@@ -1491,4 +1525,17 @@ s_accept_nonblock(VALUE klass, rb_io_t *
}
+struct accept_arg {
+ int fd;
+ struct sockaddr *sockaddr;
+ socklen_t *len;
+};
+
+static VALUE
+accept_blocking(void *data)
+{
+ struct accept_arg *arg = data;
+ return (VALUE)accept(arg->fd, arg->sockaddr, arg->len);
+}
+
static VALUE
s_accept(VALUE klass, int fd, struct sockaddr *sockaddr, socklen_t
*len)
@@ -1496,15 +1543,12 @@ s_accept(VALUE klass, int fd, struct soc
int fd2;
int retry = 0;
+ struct accept_arg arg;
rb_secure(3);
+ arg.fd = fd;
+ arg.sockaddr = sockaddr;
+ arg.len = len;
retry:
- rb_thread_wait_fd(fd);
-#if defined(_nec_ews)
- fd2 = accept(fd, sockaddr, len);
-#else
- TRAP_BEG;
- fd2 = accept(fd, sockaddr, len);
- TRAP_END;
-#endif
+ fd2 = (int)BLOCKING_REGION(accept_blocking, &arg);
if (fd2 < 0) {
switch (errno) {
@@ -1787,8 +1831,9 @@ static VALUE
udp_send(int argc, VALUE *argv, VALUE sock)
{
- VALUE mesg, flags, host, port;
+ VALUE flags, host, port;
rb_io_t *fptr;
int n;
struct addrinfo *res0, *res;
+ struct send_arg arg;
if (argc == 2 || argc == 3) {
@@ -1796,13 +1841,16 @@ udp_send(int argc, VALUE *argv, VALUE so
}
rb_secure(4);
- rb_scan_args(argc, argv, "4", &mesg, &flags, &host, &port);
+ rb_scan_args(argc, argv, "4", &arg.mesg, &flags, &host, &port);
- StringValue(mesg);
+ StringValue(arg.mesg);
res0 = sock_addrinfo(host, port, SOCK_DGRAM, 0);
GetOpenFile(sock, fptr);
+ arg.fd = fptr->fd;
+ arg.flags = NUM2INT(flags);
for (res = res0; res; res = res->ai_next) {
retry:
- n = sendto(fptr->fd, RSTRING_PTR(mesg), RSTRING_LEN(mesg),
NUM2INT(flags),
- res->ai_addr, res->ai_addrlen);
+ arg.to = res->ai_addr;
+ arg.tolen = res->ai_addrlen;
+ n = (int)BLOCKING_REGION(sendto_blocking, &arg);
if (n >= 0) {
freeaddrinfo(res0);
@@ -1920,4 +1968,16 @@ unix_recvfrom(int argc, VALUE *argv, VAL
#endif
+struct iomsg_arg {
+ int fd;
+ struct msghdr msg;
+};
+
+static VALUE
+sendmsg_blocking(void *data)
+{
+ struct iomsg_arg *arg = data;
+ return sendmsg(arg->fd, &arg->msg, 0);
+}
+
static VALUE
unix_send_io(VALUE sock, VALUE val)
@@ -1926,5 +1986,5 @@ unix_send_io(VALUE sock, VALUE val)
int fd;
rb_io_t *fptr;
- struct msghdr msg;
+ struct iomsg_arg arg;
struct iovec vec[1];
char buf[1];
@@ -1951,6 +2011,6 @@ unix_send_io(VALUE sock, VALUE val)
GetOpenFile(sock, fptr);
- msg.msg_name = NULL;
- msg.msg_namelen = 0;
+ arg.msg.msg_name = NULL;
+ arg.msg.msg_namelen = 0;
/* Linux and Solaris doesn't work if msg_iov is NULL. */
@@ -1958,11 +2018,11 @@ unix_send_io(VALUE sock, VALUE val)
vec[0].iov_base = buf;
vec[0].iov_len = 1;
- msg.msg_iov = vec;
- msg.msg_iovlen = 1;
+ arg.msg.msg_iov = vec;
+ arg.msg.msg_iovlen = 1;
#if FD_PASSING_BY_MSG_CONTROL
- msg.msg_control = (caddr_t)&cmsg;
- msg.msg_controllen = CMSG_LEN(sizeof(int));
- msg.msg_flags = 0;
+ arg.msg.msg_control = (caddr_t)&cmsg;
+ arg.msg.msg_controllen = CMSG_LEN(sizeof(int));
+ arg.msg.msg_flags = 0;
MEMZERO((char*)&cmsg, char, sizeof(cmsg));
cmsg.hdr.cmsg_len = CMSG_LEN(sizeof(int));
@@ -1971,9 +2031,10 @@ unix_send_io(VALUE sock, VALUE val)
*(int *)CMSG_DATA(&cmsg.hdr) = fd;
#else
- msg.msg_accrights = (caddr_t)&fd;
- msg.msg_accrightslen = sizeof(fd);
+ arg.msg.msg_accrights = (caddr_t)&fd;
+ arg.msg.msg_accrightslen = sizeof(fd);
#endif
- if (sendmsg(fptr->fd, &msg, 0) == -1)
+ arg.fd = fptr->fd;
+ if ((int)BLOCKING_REGION(sendmsg_blocking, &arg) == -1)
rb_sys_fail("sendmsg(2)");
@@ -1986,4 +2047,11 @@ unix_send_io(VALUE sock, VALUE val)
static VALUE
+recvmsg_blocking(void *data)
+{
+ struct iomsg_arg *arg = data;
+ return recvmsg(arg->fd, &arg->msg, 0);
+}
+
+static VALUE
unix_recv_io(int argc, VALUE *argv, VALUE sock)
{
@@ -1991,5 +2059,5 @@ unix_recv_io(int argc, VALUE *argv, VALU
VALUE klass, mode;
rb_io_t *fptr;
- struct msghdr msg;
+ struct iomsg_arg arg;
struct iovec vec[2];
char buf[1];
@@ -2011,18 +2079,16 @@ unix_recv_io(int argc, VALUE *argv, VALU
GetOpenFile(sock, fptr);
- rb_io_wait_readable(fptr->fd);
-
- msg.msg_name = NULL;
- msg.msg_namelen = 0;
+ arg.msg.msg_name = NULL;
+ arg.msg.msg_namelen = 0;
vec[0].iov_base = buf;
vec[0].iov_len = sizeof(buf);
- msg.msg_iov = vec;
- msg.msg_iovlen = 1;
+ arg.msg.msg_iov = vec;
+ arg.msg.msg_iovlen = 1;
#if FD_PASSING_BY_MSG_CONTROL
- msg.msg_control = (caddr_t)&cmsg;
- msg.msg_controllen = CMSG_SPACE(sizeof(int));
- msg.msg_flags = 0;
+ arg.msg.msg_control = (caddr_t)&cmsg;
+ arg.msg.msg_controllen = CMSG_SPACE(sizeof(int));
+ arg.msg.msg_flags = 0;
cmsg.hdr.cmsg_len = CMSG_LEN(sizeof(int));
cmsg.hdr.cmsg_level = SOL_SOCKET;
@@ -2030,17 +2096,18 @@ unix_recv_io(int argc, VALUE *argv, VALU
*(int *)CMSG_DATA(&cmsg.hdr) = -1;
#else
- msg.msg_accrights = (caddr_t)&fd;
- msg.msg_accrightslen = sizeof(fd);
+ arg.msg.msg_accrights = (caddr_t)&fd;
+ arg.msg.msg_accrightslen = sizeof(fd);
fd = -1;
#endif
- if (recvmsg(fptr->fd, &msg, 0) == -1)
+ arg.fd = fptr->fd;
+ if ((int)BLOCKING_REGION(recvmsg_blocking, &arg) == -1)
rb_sys_fail("recvmsg(2)");
#if FD_PASSING_BY_MSG_CONTROL
- if (msg.msg_controllen != CMSG_SPACE(sizeof(int))) {
+ if (arg.msg.msg_controllen != CMSG_SPACE(sizeof(int))) {
rb_raise(rb_eSocket,
"file descriptor was not passed (msg_controllen=%d, %d
expected)",
- msg.msg_controllen, CMSG_SPACE(sizeof(int)));
+ arg.msg.msg_controllen, CMSG_SPACE(sizeof(int)));
}
if (cmsg.hdr.cmsg_len != CMSG_LEN(sizeof(int))) {
@@ -2060,8 +2127,8 @@ unix_recv_io(int argc, VALUE *argv, VALU
}
#else
- if (msg.msg_accrightslen != sizeof(fd)) {
+ if (arg.msg.msg_accrightslen != sizeof(fd)) {
rb_raise(rb_eSocket,
"file descriptor was not passed (accrightslen) : %d != %d",
- msg.msg_accrightslen, sizeof(fd));
+ arg.msg.msg_accrightslen, sizeof(fd));
}
#endif
--- $BKM$NA0$K(BBug$B$O$J$$!#(B
--- $BKM$N8e$m$K(BBug$B$O$G$-$k!#(B
$BCfED(B $B?-1Y(B
on 02.05.2008 09:28
まつもと ゆきひろです
In message "Re: [ruby-dev:34572] Re: write to broken pipe on Linux"
on Fri, 2 May 2008 14:57:23 +0900, Nobuyoshi Nakada
<nobu@ruby-lang.org> writes:
|> $BL>A0$b5$$K$J$k$N$G$9$,!"$3$N8=>]$,(Blinux$BFCM-$N$b$N$+$I$&$+$b5$(B
|> $B$K$J$j$^$9!#$$$C$=!"(BEPIPE$B$r<u$1$?$i(Bclose$B$H$+(Bshutdown$B$H$+8F$s(B
|> $B$8$c$($P$$$$$s$8$c$J$$$G$9$+$M!#$=$l$@$H(Blinux$B8GM-$+$I$&$+9M$((B
|> $B$J$/$F$b$$$$$7!"0z?tJQ$($J$/$F$$$$$7!#(B
|
|$B>e5-%j%s%/@h$N>pJs$K$h$k$H!">/$J$/$H$b(BWindows Vista$B$H(BMac OS X
|10.5$B$G$OH/@8$7$J$$$h$&$G$9!#(B
$B$^$"!"(BWindows$B$O$H$b$+$/!"(BOS
X$B0J30$N(BUNIX$B7O(BOS$B$GH/@8$7$J$$$+$I(B
$B$&$+$O$+$J$j5$$K$J$j$^$9!#(BOS
X$B$,Bg>fIW$J$i(BBSD$B7O$OBg>fIW$G$"(B
$B$k2DG=@-$O9b$$$G$7$g$&$,!"(BAIX$B$H$+(BHP-UX$B$H$+M=A[$7$,$?$$$N$b$"(B
$B$k$7!#(B
EPIPE$B$d(BENOTCONN$B$,H/@8$7$?(Bfd$B$r(Bselect$B$KEO$9$H$U$?$?$S$=$l$i$,H/(B
$B@8$9$k$Y$-$G$"$k$H(B($B$?$H$($P(B)POSIX$BE*$KL@<($5$l$F$$$k$N$G$"$l$P!"(B
linux$B8GM-$N%P%0$HG'Dj$7$F!"EvLL!">lEv$?$jE*$JBP1~$r$9$k$H$+$b(B
$B$"$j$($k$H$O;W$&$N$G$9$,!#(B
|shutdown$B$G$O!"(BEPIPE$B$NBe$o$j$K(BENOTCONN$B$,H/@8$9$k$h$&$K$J$k$@$1$G(B
|$B$=$N<!$N=q$-9~$_$G$d$O$j%V%m%C%/$7$F$7$^$&$h$&$G$9!#(Bclose$B$K$9$k(B
|$B$H(BEBADF$B$K$O$J$k$N$G$9$,!"(Beof?$B$d(Bclose$B$G$bF1$8$/(BEBADF$B$,H/@8$7$F$7(B
|$B$^$&$h$&$K$J$j$^$9!#B>$N%9%l%C%I$NM-L5$K$h$C$FF0:n$,JQ$o$C$F$7$^(B
|$B$&$N$b$I$&$+$H$$$&5$$b$7$^$9$,!#(B
$B$O$8$a$KN)$AJV$C$F9M$($k$H!"FI$_=P$=$&$H$7$?;~$K(BEPIPE$B$,JV$C(B
$B$F$-$F$7$^$C$?(BIO$B$KBP$7$F$I$N$h$&$KBP=h$9$Y$-$J$s$G$7$g$&$M!#(B
BROKENPIPE$B%U%i%0$rN)$F$k!"$H$$$&$N$b$"$s$^$j@5$7$$BP=h$N$h$&(B
$B$K$O;W$($J$$$s$G$9$,!#$d$O$j6/@)E*$K(Bclose$B$7$FNc30$+$J$"!#(B
|$B$H$$$&$+!"(B1.9$B$G$O$9$G$K(Bblocking region$B$r;H$($P(Bselect$B$OI,MW$J$$$O(B
|$B$:$G$9!#(B
1.9ではその方向で対応しましょうか。
まつもと ゆきひろ /:|)
on 03.05.2008 02:25
$B$J$+$@$G$9!#(B At Fri, 2 May 2008 16:27:45 +0900, Yukihiro Matsumoto wrote in [ruby-dev:34574]: > EPIPE$B$d(BENOTCONN$B$,H/@8$7$?(Bfd$B$r(Bselect$B$KEO$9$H$U$?$?$S$=$l$i$,H/(B > $B@8$9$k$Y$-$G$"$k$H(B($B$?$H$($P(B)POSIX$BE*$KL@<($5$l$F$$$k$N$G$"$l$P!"(B > linux$B8GM-$N%P%0$HG'Dj$7$F!"EvLL!">lEv$?$jE*$JBP1~$r$9$k$H$+$b(B > $B$"$j$($k$H$O;W$&$N$G$9$,!#(B $B$=$3$^$G$O5,Dj$5$l$F$$$J$$$h$&$J5$$,$7$^$9!#(B > |shutdown$B$G$O!"(BEPIPE$B$NBe$o$j$K(BENOTCONN$B$,H/@8$9$k$h$&$K$J$k$@$1$G(B > |$B$=$N<!$N=q$-9~$_$G$d$O$j%V%m%C%/$7$F$7$^$&$h$&$G$9!#(Bclose$B$K$9$k(B > |$B$H(BEBADF$B$K$O$J$k$N$G$9$,!"(Beof?$B$d(Bclose$B$G$bF1$8$/(BEBADF$B$,H/@8$7$F$7(B > |$B$^$&$h$&$K$J$j$^$9!#B>$N%9%l%C%I$NM-L5$K$h$C$FF0:n$,JQ$o$C$F$7$^(B > |$B$&$N$b$I$&$+$H$$$&5$$b$7$^$9$,!#(B > > $B$O$8$a$KN)$AJV$C$F9M$($k$H!"FI$_=P$=$&$H$7$?;~$K(BEPIPE$B$,JV$C(B > $B$F$-$F$7$^$C$?(BIO$B$KBP$7$F$I$N$h$&$KBP=h$9$Y$-$J$s$G$7$g$&$M!#(B > BROKENPIPE$B%U%i%0$rN)$F$k!"$H$$$&$N$b$"$s$^$j@5$7$$BP=h$N$h$&(B > $B$K$O;W$($J$$$s$G$9$,!#$d$O$j6/@)E*$K(Bclose$B$7$FNc30$+$J$"!#(B $B$G$9$+$M$'!#(B > |$B$H$$$&$+!"(B1.9$B$G$O$9$G$K(Bblocking region$B$r;H$($P(Bselect$B$OI,MW$J$$$O(B > |$B$:$G$9!#(B > > 1.9$B$G$O$=$NJ}8~$GBP1~$7$^$7$g$&$+!#(B 1.8$B$,LdBj$G$9$M!#$H$j$"$($:;n$7$F$_$?$s$G$9$,!"(Bdeadlock$B$K$J$C$F(B $B$7$^$$$^$7$?!#(B # $BK!;v$GIT:_$N$?$a!"(B5/5$B$^$G(BIP unreachable$B!#(B Index: eval.c =================================================================== --- eval.c (revision 16276) +++ eval.c (working copy) @@ -10930,4 +10930,11 @@ rb_thread_schedule() #endif FOREACH_THREAD_FROM(curr, th) { + if ((th->wait_for&WAIT_FD) && FD_ISSET(th->fd, &readfds)) { + FD_CLR(th->fd, &readfds); + th->status = THREAD_RUNNABLE; + th->select_value = n; + n = max; + found = 1; + } if (th->wait_for & WAIT_SELECT) { int v = 0; @@ -10937,6 +10944,8 @@ rb_thread_schedule() v |= find_bad_fds(&exceptfds, &th->exceptfds, th->fd); if (v) { + th->status = THREAD_RUNNABLE; th->select_value = n; n = max; + found = 1; } } @@ -11019,5 +11028,5 @@ 
rb_thread_schedule() th->thread, thread_status_name(th->status)); if (th->wait_for & WAIT_FD) warn_printf("F(%d)", th->fd); - if (th->wait_for & WAIT_SELECT) warn_printf("S"); + if (th->wait_for & WAIT_SELECT) warn_printf("S(%d)", th->fd); if (th->wait_for & WAIT_TIME) warn_printf("T(%f)", th->delay); if (th->wait_for & WAIT_JOIN) Index: io.c =================================================================== --- io.c (revision 16276) +++ io.c (working copy) @@ -351,4 +351,5 @@ io_fflush(f, fptr) int n; + rb_io_check_closed(fptr); if (!rb_thread_fd_writable(fileno(f))) { rb_io_check_closed(fptr); @@ -388,4 +389,8 @@ rb_io_wait_readable(f) return Qtrue; + case EPIPE: + close(f); + return Qfalse; + default: return Qfalse; @@ -416,4 +421,8 @@ rb_io_wait_writable(f) return Qtrue; + case EPIPE: + close(f); + return Qfalse; + default: return Qfalse; @@ -1724,5 +1733,5 @@ rb_io_getline(rs, io) while ((c = appendline(fptr, newline, &str)) != EOF && (c != newline || RSTRING(str)->len < rslen || - (rspara || rscheck(rsptr,rslen,rs), 0) || + ((rspara || rscheck(rsptr,rslen,rs)) && 0) || memcmp(RSTRING(str)->ptr+RSTRING(str)->len-rslen,rsptr,rslen)));
on 08.05.2008 18:02
まつもと ゆきひろです
In message "Re: [ruby-dev:34578] Re: write to broken pipe on Linux"
on Sat, 3 May 2008 09:24:58 +0900, Nobuyoshi Nakada
<nobu@ruby-lang.org> writes:
|> $B$O$8$a$KN)$AJV$C$F9M$($k$H!"FI$_=P$=$&$H$7$?;~$K(BEPIPE$B$,JV$C(B
|> $B$F$-$F$7$^$C$?(BIO$B$KBP$7$F$I$N$h$&$KBP=h$9$Y$-$J$s$G$7$g$&$M!#(B
|> BROKENPIPE$B%U%i%0$rN)$F$k!"$H$$$&$N$b$"$s$^$j@5$7$$BP=h$N$h$&(B
|> $B$K$O;W$($J$$$s$G$9$,!#$d$O$j6/@)E*$K(Bclose$B$7$FNc30$+$J$"!#(B
|1.8$B$,LdBj$G$9$M!#$H$j$"$($:;n$7$F$_$?$s$G$9$,!"(Bdeadlock$B$K$J$C$F(B
|$B$7$^$$$^$7$?!#(B
$B%Q%C%A$O(Bdeadlock$B$K$J$C$?$b$N$G$7$g$&$+!#J}8~@-$H$7$F$O(BEPIPE
$B$G6/@)(Bclose+$BNc30$G$$$$$H;W$&$s$G$9$,!#(B
on 08.05.2008 18:12
In article <E1Ju8YS-0002z1-7k@x61.netlab.jp>, Yukihiro Matsumoto <matz@ruby-lang.org> writes: > $B%Q%C%A$O(Bdeadlock$B$K$J$C$?$b$N$G$7$g$&$+!#J}8~@-$H$7$F$O(BEPIPE > $B$G6/@)(Bclose+$BNc30$G$$$$$H;W$&$s$G$9$,!#(B $B%=%1%C%H$G$O$^$:$$$s$8$c$J$$$G$9$+!#(B % ruby -rsocket -e ' s1, s2 = UNIXSocket.pair s1.shutdown(Socket::SHUT_WR) begin s1.write "a" rescue p $! end s2.write "a" p s1.read(1) ' #<Errno::EPIPE: Broken pipe> "a" EPIPE $B$N$"$H$G$b!"5UJ}8~$NDL?.$O2DG=$G!"(Bclose $B$5$l$F$7$^$&$H(B $BDL?.$,=PMh$J$/$J$j$^$9!#(B
on 08.05.2008 18:15
まつもと ゆきひろです
In message "Re: [ruby-dev:34619] Re: write to broken pipe on Linux"
on Fri, 9 May 2008 01:09:44 +0900, Tanaka Akira <akr@fsij.org>
writes:
|> $B%Q%C%A$O(Bdeadlock$B$K$J$C$?$b$N$G$7$g$&$+!#J}8~@-$H$7$F$O(BEPIPE
|> $B$G6/@)(Bclose+$BNc30$G$$$$$H;W$&$s$G$9$,!#(B
|
|ソケットではまずいんじゃないですか。
|EPIPE のあとでも、逆方向の通信は可能で、close されてしまうと
|通信が出来なくなります。
$B$=$&$+$"!#$7$+$b(BEPIPE$B$,5/$-$=$&$J$N$O<g$K%=%1%C%H$@$C$?$j$7(B
$B$^$9$7$M$(!#$d$O$j%U%i%0$G>pJs$rJ];}$9$k$7$+$J$$$N$+$J!#(B