Project

General

Profile

Bug #4683 » copy_stream_interrupt_handling.patch

akr (Akira Tanaka), 05/29/2011 07:29 PM

View differences:

io.c (working copy)
#include "ruby/ruby.h"
#include "ruby/io.h"
#include "vm_core.h"
#include "dln.h"
#include <ctype.h>
#include <errno.h>
......
VALUE th;
};
static void *
exec_interrupts(void *arg)
{
rb_thread_t *th = arg;
rb_threadptr_execute_interrupts(th);
return NULL;
}
/*
* returns TRUE if the preceding system call was interrupted
* so we can continue. If the thread was interrupted, we
* reacquire the GVL to execute interrupts before continuing.
*/
static int
maygvl_copy_stream_wait_read(struct copy_stream_struct *stp)
maygvl_copy_stream_continue_p(int has_gvl, struct copy_stream_struct *stp)
{
switch (errno) {
case EINTR:
#if defined(ERESTART)
case ERESTART:
#endif
if (rb_thread_interrupted(stp->th))
if (has_gvl)
rb_threadptr_execute_interrupts((rb_thread_t *)stp->th);
else
rb_thread_call_with_gvl(exec_interrupts, (void *)stp->th);
return TRUE;
}
return FALSE;
}
static int
maygvl_select(int has_gvl, int n, rb_fdset_t *rfds, rb_fdset_t *wfds, rb_fdset_t *efds, struct timeval *timeout)
{
if (has_gvl)
return rb_thread_fd_select(n, rfds, wfds, efds, timeout);
else
return rb_fd_select(n, rfds, wfds, efds, timeout);
}
static int
maygvl_copy_stream_wait_read(int has_gvl, struct copy_stream_struct *stp)
{
int ret;
rb_fd_zero(&stp->fds);
rb_fd_set(stp->src_fd, &stp->fds);
ret = rb_fd_select(rb_fd_max(&stp->fds), &stp->fds, NULL, NULL, NULL);
do {
rb_fd_zero(&stp->fds);
rb_fd_set(stp->src_fd, &stp->fds);
ret = maygvl_select(has_gvl, rb_fd_max(&stp->fds), &stp->fds, NULL, NULL, NULL);
} while (ret == -1 && maygvl_copy_stream_continue_p(has_gvl, stp));
if (ret == -1) {
stp->syserr = "select";
stp->error_no = errno;
......
nogvl_copy_stream_wait_write(struct copy_stream_struct *stp)
{
int ret;
rb_fd_zero(&stp->fds);
rb_fd_set(stp->dst_fd, &stp->fds);
ret = rb_fd_select(rb_fd_max(&stp->fds), NULL, &stp->fds, NULL, NULL);
do {
rb_fd_zero(&stp->fds);
rb_fd_set(stp->dst_fd, &stp->fds);
ret = rb_fd_select(rb_fd_max(&stp->fds), NULL, &stp->fds, NULL, NULL);
} while (ret == -1 && maygvl_copy_stream_continue_p(0, stp));
if (ret == -1) {
stp->syserr = "select";
stp->error_no = errno;
......
#ifdef USE_SENDFILE
static int
maygvl_copy_stream_wait_readwrite(struct copy_stream_struct *stp)
maygvl_copy_stream_wait_readwrite(int has_gvl, struct copy_stream_struct *stp)
{
int ret;
rb_fd_zero(&stp->fds);
rb_fd_set(stp->src_fd, &stp->fds);
rb_fd_set(stp->dst_fd, &stp->fds);
ret = rb_fd_select(rb_fd_max(&stp->fds), &stp->fds, NULL, NULL, NULL);
ret = maygvl_select(has_gvl, rb_fd_max(&stp->fds), &stp->fds, NULL, NULL, NULL);
if (ret == -1) {
stp->syserr = "select";
stp->error_no = errno;
......
}
}
if (ss == -1) {
if (maygvl_copy_stream_continue_p(0, stp))
goto retry_sendfile;
switch (errno) {
case EINVAL:
#ifdef ENOSYS
......
#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
case EWOULDBLOCK:
#endif
if (maygvl_copy_stream_wait_readwrite(stp) == -1)
return -1;
if (rb_thread_interrupted(stp->th))
if (maygvl_copy_stream_wait_readwrite(0, stp) == -1)
return -1;
goto retry_sendfile;
}
......
#endif
static ssize_t
maygvl_copy_stream_read(struct copy_stream_struct *stp, char *buf, size_t len, off_t offset)
maygvl_read(int has_gvl, int fd, void *buf, size_t count)
{
if (has_gvl)
return rb_read_internal(fd, buf, count);
else
return read(fd, buf, count);
}
static ssize_t
maygvl_copy_stream_read(int has_gvl, struct copy_stream_struct *stp, char *buf, size_t len, off_t offset)
{
ssize_t ss;
retry_read:
if (offset == (off_t)-1)
ss = read(stp->src_fd, buf, len);
if (offset == (off_t)-1) {
ss = maygvl_read(has_gvl, stp->src_fd, buf, len);
}
else {
#ifdef HAVE_PREAD
ss = pread(stp->src_fd, buf, len, offset);
......
return 0;
}
if (ss == -1) {
if (maygvl_copy_stream_continue_p(has_gvl, stp))
goto retry_read;
switch (errno) {
case EAGAIN:
#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
case EWOULDBLOCK:
#endif
if (maygvl_copy_stream_wait_read(stp) == -1)
if (maygvl_copy_stream_wait_read(has_gvl, stp) == -1)
return -1;
goto retry_read;
#ifdef ENOSYS
......
while (len) {
ss = write(stp->dst_fd, buf+off, len);
if (ss == -1) {
if (maygvl_copy_stream_continue_p(0, stp))
continue;
if (errno == EAGAIN || errno == EWOULDBLOCK) {
if (nogvl_copy_stream_wait_write(stp) == -1)
return -1;
......
len = sizeof(buf);
}
if (use_pread) {
ss = maygvl_copy_stream_read(stp, buf, len, src_offset);
ss = maygvl_copy_stream_read(0, stp, buf, len, src_offset);
if (0 < ss)
src_offset += ss;
}
else {
ss = maygvl_copy_stream_read(stp, buf, len, (off_t)-1);
ss = maygvl_copy_stream_read(0, stp, buf, len, (off_t)-1);
}
if (ss <= 0) /* EOF or error */
return;
......
if (!use_eof)
copy_length -= ss;
if (rb_thread_interrupted(stp->th))
return;
}
}
......
ssize_t ss;
rb_thread_wait_fd(stp->src_fd);
rb_str_resize(buf, buflen);
ss = maygvl_copy_stream_read(stp, RSTRING_PTR(buf), l, off);
ss = maygvl_copy_stream_read(1, stp, RSTRING_PTR(buf), l, off);
if (ss == -1)
return Qnil;
if (ss == 0)
test/ruby/test_io.rb (working copy)
}
end
def trapping_usr1
@usr1_rcvd = 0
trap(:USR1) { @usr1_rcvd += 1 }
yield
ensure
trap(:USR1, "DEFAULT")
end
def test_pipe
r, w = IO.pipe
assert_instance_of(IO, r)
......
result = t.value
assert_equal(megacontent, result)
}
with_socketpair {|s1, s2|
begin
s1.nonblock = true
rescue Errno::EBADF
skip "nonblocking IO for pipe is not implemented"
end
trapping_usr1 do
nr = 10
pid = fork do
s1.close
IO.select([s2])
Process.kill(:USR1, Process.ppid)
s2.read
end
s2.close
nr.times do
assert_equal megacontent.bytesize, IO.copy_stream("megasrc", s1)
end
assert_equal(1, @usr1_rcvd)
s1.close
_, status = Process.waitpid2(pid)
assert status.success?, status.inspect
end
}
end
}
end
(3-3/4)