Bug #4683 » copy_stream_interrupt_handling.patch
io.c (working copy) | ||
---|---|---|
#include "ruby/ruby.h"
|
||
#include "ruby/io.h"
|
||
#include "vm_core.h"
|
||
#include "dln.h"
|
||
#include <ctype.h>
|
||
#include <errno.h>
|
||
... | ... | |
VALUE th;
|
||
};
|
||
static void *
|
||
exec_interrupts(void *arg)
|
||
{
|
||
rb_thread_t *th = arg;
|
||
rb_threadptr_execute_interrupts(th);
|
||
return NULL;
|
||
}
|
||
/*
|
||
* returns TRUE if the preceding system call was interrupted
|
||
* so we can continue. If the thread was interrupted, we
|
||
* reacquire the GVL to execute interrupts before continuing.
|
||
*/
|
||
static int
|
||
maygvl_copy_stream_wait_read(struct copy_stream_struct *stp)
|
||
maygvl_copy_stream_continue_p(int has_gvl, struct copy_stream_struct *stp)
|
||
{
|
||
switch (errno) {
|
||
case EINTR:
|
||
#if defined(ERESTART)
|
||
case ERESTART:
|
||
#endif
|
||
if (rb_thread_interrupted(stp->th))
|
||
if (has_gvl)
|
||
rb_threadptr_execute_interrupts((rb_thread_t *)stp->th);
|
||
else
|
||
rb_thread_call_with_gvl(exec_interrupts, (void *)stp->th);
|
||
return TRUE;
|
||
}
|
||
return FALSE;
|
||
}
|
||
static int
|
||
maygvl_select(int has_gvl, int n, rb_fdset_t *rfds, rb_fdset_t *wfds, rb_fdset_t *efds, struct timeval *timeout)
|
||
{
|
||
if (has_gvl)
|
||
return rb_thread_fd_select(n, rfds, wfds, efds, timeout);
|
||
else
|
||
return rb_fd_select(n, rfds, wfds, efds, timeout);
|
||
}
|
||
static int
|
||
maygvl_copy_stream_wait_read(int has_gvl, struct copy_stream_struct *stp)
|
||
{
|
||
int ret;
|
||
rb_fd_zero(&stp->fds);
|
||
rb_fd_set(stp->src_fd, &stp->fds);
|
||
ret = rb_fd_select(rb_fd_max(&stp->fds), &stp->fds, NULL, NULL, NULL);
|
||
do {
|
||
rb_fd_zero(&stp->fds);
|
||
rb_fd_set(stp->src_fd, &stp->fds);
|
||
ret = maygvl_select(has_gvl, rb_fd_max(&stp->fds), &stp->fds, NULL, NULL, NULL);
|
||
} while (ret == -1 && maygvl_copy_stream_continue_p(has_gvl, stp));
|
||
if (ret == -1) {
|
||
stp->syserr = "select";
|
||
stp->error_no = errno;
|
||
... | ... | |
nogvl_copy_stream_wait_write(struct copy_stream_struct *stp)
|
||
{
|
||
int ret;
|
||
rb_fd_zero(&stp->fds);
|
||
rb_fd_set(stp->dst_fd, &stp->fds);
|
||
ret = rb_fd_select(rb_fd_max(&stp->fds), NULL, &stp->fds, NULL, NULL);
|
||
do {
|
||
rb_fd_zero(&stp->fds);
|
||
rb_fd_set(stp->dst_fd, &stp->fds);
|
||
ret = rb_fd_select(rb_fd_max(&stp->fds), NULL, &stp->fds, NULL, NULL);
|
||
} while (ret == -1 && maygvl_copy_stream_continue_p(0, stp));
|
||
if (ret == -1) {
|
||
stp->syserr = "select";
|
||
stp->error_no = errno;
|
||
... | ... | |
#ifdef USE_SENDFILE
|
||
static int
|
||
maygvl_copy_stream_wait_readwrite(struct copy_stream_struct *stp)
|
||
maygvl_copy_stream_wait_readwrite(int has_gvl, struct copy_stream_struct *stp)
|
||
{
|
||
int ret;
|
||
rb_fd_zero(&stp->fds);
|
||
rb_fd_set(stp->src_fd, &stp->fds);
|
||
rb_fd_set(stp->dst_fd, &stp->fds);
|
||
ret = rb_fd_select(rb_fd_max(&stp->fds), &stp->fds, NULL, NULL, NULL);
|
||
ret = maygvl_select(has_gvl, rb_fd_max(&stp->fds), &stp->fds, NULL, NULL, NULL);
|
||
if (ret == -1) {
|
||
stp->syserr = "select";
|
||
stp->error_no = errno;
|
||
... | ... | |
}
|
||
}
|
||
if (ss == -1) {
|
||
if (maygvl_copy_stream_continue_p(0, stp))
|
||
goto retry_sendfile;
|
||
switch (errno) {
|
||
case EINVAL:
|
||
#ifdef ENOSYS
|
||
... | ... | |
#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
|
||
case EWOULDBLOCK:
|
||
#endif
|
||
if (maygvl_copy_stream_wait_readwrite(stp) == -1)
|
||
return -1;
|
||
if (rb_thread_interrupted(stp->th))
|
||
if (maygvl_copy_stream_wait_readwrite(0, stp) == -1)
|
||
return -1;
|
||
goto retry_sendfile;
|
||
}
|
||
... | ... | |
#endif
|
||
/*
 * Reads up to +count+ bytes from +fd+ into +buf+, choosing the
 * implementation by GVL state: rb_read_internal() when has_gvl is
 * non-zero, plain read() otherwise.  Returns the callee's result.
 */
static ssize_t
maygvl_read(int has_gvl, int fd, void *buf, size_t count)
{
    if (!has_gvl) {
        return read(fd, buf, count);
    }
    return rb_read_internal(fd, buf, count);
}
|
||
static ssize_t
|
||
maygvl_copy_stream_read(int has_gvl, struct copy_stream_struct *stp, char *buf, size_t len, off_t offset)
|
||
{
|
||
ssize_t ss;
|
||
retry_read:
|
||
if (offset == (off_t)-1)
|
||
ss = read(stp->src_fd, buf, len);
|
||
if (offset == (off_t)-1) {
|
||
ss = maygvl_read(has_gvl, stp->src_fd, buf, len);
|
||
}
|
||
else {
|
||
#ifdef HAVE_PREAD
|
||
ss = pread(stp->src_fd, buf, len, offset);
|
||
... | ... | |
return 0;
|
||
}
|
||
if (ss == -1) {
|
||
if (maygvl_copy_stream_continue_p(has_gvl, stp))
|
||
goto retry_read;
|
||
switch (errno) {
|
||
case EAGAIN:
|
||
#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
|
||
case EWOULDBLOCK:
|
||
#endif
|
||
if (maygvl_copy_stream_wait_read(stp) == -1)
|
||
if (maygvl_copy_stream_wait_read(has_gvl, stp) == -1)
|
||
return -1;
|
||
goto retry_read;
|
||
#ifdef ENOSYS
|
||
... | ... | |
while (len) {
|
||
ss = write(stp->dst_fd, buf+off, len);
|
||
if (ss == -1) {
|
||
if (maygvl_copy_stream_continue_p(0, stp))
|
||
continue;
|
||
if (errno == EAGAIN || errno == EWOULDBLOCK) {
|
||
if (nogvl_copy_stream_wait_write(stp) == -1)
|
||
return -1;
|
||
... | ... | |
len = sizeof(buf);
|
||
}
|
||
if (use_pread) {
|
||
ss = maygvl_copy_stream_read(stp, buf, len, src_offset);
|
||
ss = maygvl_copy_stream_read(0, stp, buf, len, src_offset);
|
||
if (0 < ss)
|
||
src_offset += ss;
|
||
}
|
||
else {
|
||
ss = maygvl_copy_stream_read(stp, buf, len, (off_t)-1);
|
||
ss = maygvl_copy_stream_read(0, stp, buf, len, (off_t)-1);
|
||
}
|
||
if (ss <= 0) /* EOF or error */
|
||
return;
|
||
... | ... | |
if (!use_eof)
|
||
copy_length -= ss;
|
||
if (rb_thread_interrupted(stp->th))
|
||
return;
|
||
}
|
||
}
|
||
... | ... | |
ssize_t ss;
|
||
rb_thread_wait_fd(stp->src_fd);
|
||
rb_str_resize(buf, buflen);
|
||
ss = maygvl_copy_stream_read(stp, RSTRING_PTR(buf), l, off);
|
||
ss = maygvl_copy_stream_read(1, stp, RSTRING_PTR(buf), l, off);
|
||
if (ss == -1)
|
||
return Qnil;
|
||
if (ss == 0)
|
test/ruby/test_io.rb (working copy) | ||
---|---|---|
}
|
||
end
|
||
# Runs the given block with a SIGUSR1 handler that counts deliveries in
# @usr1_rcvd (reset to 0 on entry).
#
# On exit the handler that was in place before the call is reinstalled, so
# a USR1 handler set up by the caller is not clobbered.  "DEFAULT" is used
# when no previous handler is known.
def trapping_usr1
  @usr1_rcvd = 0
  previous = trap(:USR1) { @usr1_rcvd += 1 }
  yield
ensure
  trap(:USR1, previous || "DEFAULT")
end
|
||
def test_pipe
|
||
r, w = IO.pipe
|
||
assert_instance_of(IO, r)
|
||
... | ... | |
result = t.value
|
||
assert_equal(megacontent, result)
|
||
}
|
||
with_socketpair {|s1, s2|
|
||
begin
|
||
s1.nonblock = true
|
||
rescue Errno::EBADF
|
||
skip "nonblocking IO for pipe is not implemented"
|
||
end
|
||
trapping_usr1 do
|
||
nr = 10
|
||
pid = fork do
|
||
s1.close
|
||
IO.select([s2])
|
||
Process.kill(:USR1, Process.ppid)
|
||
s2.read
|
||
end
|
||
s2.close
|
||
nr.times do
|
||
assert_equal megacontent.bytesize, IO.copy_stream("megasrc", s1)
|
||
end
|
||
assert_equal(1, @usr1_rcvd)
|
||
s1.close
|
||
_, status = Process.waitpid2(pid)
|
||
assert status.success?, status.inspect
|
||
end
|
||
}
|
||
end
|
||
}
|
||
end
|