Project

General

Profile

Feature #8788 » 0001-thread_pthread-use-eventfd-under-Linux-for-timer-thr.patch

use eventfd on Linux - normalperson (Eric Wong), 08/17/2013 06:40 AM

View differences:

configure.in
AC_CHECK_FUNCS(dup3)
AC_CHECK_FUNCS(eaccess)
AC_CHECK_FUNCS(endgrent)
AC_CHECK_FUNCS(eventfd)
AC_CHECK_FUNCS(fchmod)
AC_CHECK_FUNCS(fchown)
AC_CHECK_FUNCS(fcntl)
thread_pthread.c
#define TIME_QUANTUM_USEC (100 * 1000)
#if USE_SLEEPY_TIMER_THREAD
# ifdef HAVE_EVENTFD
# include <sys/eventfd.h>
static int timer_thread_efd = -1;
static int timer_thread_efd_low = -1; /* low priority */
static const uint64_t tt_write_buf = 1;
static uint64_t tt_read_buf[1]; /* value ignored */
# define TT_FDPTR (&timer_thread_efd)
# define TT_FDPTR_LOW (&timer_thread_efd_low)
# define TT_READFD timer_thread_efd
# define TT_READFD_LOW timer_thread_efd_low
# define TT_WRITEFD timer_thread_efd
# define TT_WRITEFD_LOW timer_thread_efd_low
# if defined(EFD_CLOEXEC) && defined(EFD_NONBLOCK)
# define TT_EFD_FLAGS (EFD_CLOEXEC|EFD_NONBLOCK)
# else
# define TT_EFD_FLAGS (0)
# endif
int
rb_reserved_fd_p(int fd)
{
return (fd == timer_thread_efd || fd == timer_thread_efd_low);
}
# else /* use self-pipe */
static int timer_thread_pipe[2] = {-1, -1};
static int timer_thread_pipe_low[2] = {-1, -1}; /* low priority */
static const char tt_write_buf = '!';
/* buffer can be shared because no one refers to them. */
static char tt_read_buf[1024];
# define TT_FDPTR (timer_thread_pipe)
# define TT_FDPTR_LOW (timer_thread_pipe_low)
# define TT_READFD (timer_thread_pipe[0])
# define TT_READFD_LOW (timer_thread_pipe_low[0])
# define TT_WRITEFD (timer_thread_pipe[1])
# define TT_WRITEFD_LOW (timer_thread_pipe_low[1])
int
rb_reserved_fd_p(int fd)
{
if (fd == timer_thread_pipe[0] ||
fd == timer_thread_pipe[1] ||
fd == timer_thread_pipe_low[0] ||
fd == timer_thread_pipe_low[1]) {
return 1;
}
return 0;
}
# endif /* !HAVE_EVENTFD */
static int timer_thread_pipe_owner_process;
/* only use signal-safe system calls here */
......
/* already opened */
if (timer_thread_pipe_owner_process == getpid()) {
const char *buff = "!";
retry:
if ((result = write(fd, buff, 1)) <= 0) {
if ((result = write(fd, &tt_write_buf, sizeof(tt_write_buf))) <= 0) {
switch (errno) {
case EINTR: goto retry;
case EAGAIN:
......
void
rb_thread_wakeup_timer_thread(void)
{
rb_thread_wakeup_timer_thread_fd(timer_thread_pipe[1]);
rb_thread_wakeup_timer_thread_fd(TT_WRITEFD);
}
static void
rb_thread_wakeup_timer_thread_low(void)
{
rb_thread_wakeup_timer_thread_fd(timer_thread_pipe_low[1]);
rb_thread_wakeup_timer_thread_fd(TT_WRITEFD_LOW);
}
/* VM-dependent API is not available for this function */
static void
consume_communication_pipe(int fd)
consume_communication_fd(int fd)
{
#define CCP_READ_BUFF_SIZE 1024
/* buffer can be shared because no one refers to them. */
static char buff[CCP_READ_BUFF_SIZE];
ssize_t result;
while (1) {
result = read(fd, buff, sizeof(buff));
result = read(fd, tt_read_buf, sizeof(tt_read_buf));
if (result == 0) {
return;
}
......
case EAGAIN:
return;
default:
rb_async_bug_errno("consume_communication_pipe: read\n", errno);
rb_async_bug_errno("consume_communication_fd: read\n", errno);
}
}
}
}
#ifdef HAVE_EVENTFD
static void
close_communication_efd(int *efd)
{
if (close(*efd) < 0)
rb_bug_errno("native_stop_timer_thread - close(ttp[0])", errno);
*efd = -1;
}
#else /* !HAVE_EVENTFD */
static void
close_communication_pipe(int pipes[2])
{
......
}
pipes[0] = pipes[1] = -1;
}
#endif /* !HAVE_EVENTFD */
#ifdef HAVE_EVENTFD
/* Linux <= 2.6.27 compatibility, emulate EFD_CLOEXEC|EFD_NONBLOCK */
static int
eventfd_compat(int fd)
{
int flags;
flags = fcntl(fd, F_GETFD);
if (flags == -1)
rb_bug_errno("eventfd_compat: failed F_GETFD on eventfd", errno);
if (fcntl(fd, F_SETFD, flags | FD_CLOEXEC) < 0)
rb_bug_errno("eventfd_compat: failed F_SETFD on eventfd", errno);
flags = fcntl(fd, F_GETFL);
if (flags == -1)
rb_bug_errno("eventfd_compat: failed F_GETFL on eventfd", errno);
if (fcntl(fd, F_SETFL | O_NONBLOCK) < 0)
rb_bug_errno("eventfd_compat: failed F_SETFL on eventfd", errno);
return fd;
}
static void
setup_communication_fds_internal(int *efd)
{
if (*efd != -1) {
/* close eventfd of parent process */
close_communication_efd(efd);
}
*efd = eventfd(0, TT_EFD_FLAGS);
if (*efd < 0) {
if (errno == EINVAL) {
/* new glibc headers, old kernel */
*efd = eventfd(0, 0);
if (*efd < 0)
rb_sys_fail("eventfd");
eventfd_compat(*efd);
} else {
rb_sys_fail("eventfd");
}
} else {
if (TT_EFD_FLAGS == 0)
eventfd_compat(*efd);
}
rb_update_max_fd(*efd);
}
#else /* !HAVE_EVENTFD */
static void
set_nonblock(int fd)
{
......
}
static void
setup_communication_pipe_internal(int pipes[2])
setup_communication_fds_internal(int pipes[2])
{
int err;
......
err = rb_cloexec_pipe(pipes);
if (err != 0) {
rb_bug_errno("setup_communication_pipe: Failed to create communication pipe for timer thread", errno);
rb_bug_errno("setup_communication_fds: Failed to create communication pipe for timer thread", errno);
}
rb_update_max_fd(pipes[0]);
rb_update_max_fd(pipes[1]);
set_nonblock(pipes[0]);
set_nonblock(pipes[1]);
}
#endif /* !HAVE_EVENTFD */
/* communication pipe with timer thread and signal handler */
static void
setup_communication_pipe(void)
setup_communication_fds(void)
{
if (timer_thread_pipe_owner_process == getpid()) {
/* already set up. */
return;
}
setup_communication_pipe_internal(timer_thread_pipe);
setup_communication_pipe_internal(timer_thread_pipe_low);
setup_communication_fds_internal(TT_FDPTR);
setup_communication_fds_internal(TT_FDPTR_LOW);
/* validate pipe on this process */
timer_thread_pipe_owner_process = getpid();
......
int need_polling;
struct pollfd pollfds[2];
pollfds[0].fd = timer_thread_pipe[0];
pollfds[0].fd = TT_READFD;
pollfds[0].events = POLLIN;
pollfds[1].fd = timer_thread_pipe_low[0];
pollfds[1].fd = TT_READFD_LOW;
pollfds[1].events = POLLIN;
need_polling = check_signal_thread_list();
......
/* maybe timeout */
}
else if (result > 0) {
consume_communication_pipe(timer_thread_pipe[0]);
consume_communication_pipe(timer_thread_pipe_low[0]);
consume_communication_fd(TT_READFD);
consume_communication_fd(TT_READFD_LOW);
}
else { /* result < 0 */
switch (errno) {
......
native_cond_timedwait(&timer_thread_cond, &timer_thread_lock, &ts);
}
int
rb_reserved_fd_p(int fd)
{
return 0;
}
#endif /* USE_SLEEPY_TIMER_THREAD */
static void *
......
#endif
#if USE_SLEEPY_TIMER_THREAD
setup_communication_pipe();
setup_communication_fds();
#endif /* USE_SLEEPY_TIMER_THREAD */
/* create timer thread */
......
}
#endif
int
rb_reserved_fd_p(int fd)
{
#if USE_SLEEPY_TIMER_THREAD
if (fd == timer_thread_pipe[0] ||
fd == timer_thread_pipe[1] ||
fd == timer_thread_pipe_low[0] ||
fd == timer_thread_pipe_low[1]) {
return 1;
}
else {
return 0;
}
#else
return 0;
#endif
}
rb_nativethread_id_t
rb_nativethread_self(void)
{
(1-1/3)