Feature #13618 » 0001-auto-fiber-schedule-for-rb_wait_for_single_fd-and-rb.patch

common.mk
---------
thread.$(OBJEXT): {$(VPATH)}intern.h
thread.$(OBJEXT): {$(VPATH)}internal.h
thread.$(OBJEXT): {$(VPATH)}io.h
thread.$(OBJEXT): {$(VPATH)}iom.h
thread.$(OBJEXT): {$(VPATH)}iom_internal.h
thread.$(OBJEXT): {$(VPATH)}iom_common.h
thread.$(OBJEXT): {$(VPATH)}iom_epoll.h
thread.$(OBJEXT): {$(VPATH)}iom_kqueue.h
thread.$(OBJEXT): {$(VPATH)}iom_pingable_common.h
thread.$(OBJEXT): {$(VPATH)}iom_select.h
thread.$(OBJEXT): {$(VPATH)}method.h
thread.$(OBJEXT): {$(VPATH)}missing.h
thread.$(OBJEXT): {$(VPATH)}node.h

configure.in
------------
AC_CHECK_HEADERS(pwd.h)
AC_CHECK_HEADERS(setjmpex.h)
AC_CHECK_HEADERS(sys/attr.h)
AC_CHECK_HEADERS(sys/epoll.h)
AC_CHECK_HEADERS(sys/event.h)
AC_CHECK_HEADERS(sys/fcntl.h)
AC_CHECK_HEADERS(sys/file.h)
AC_CHECK_HEADERS(sys/id.h)
...
AC_CHECK_FUNCS(dup)
AC_CHECK_FUNCS(dup3)
AC_CHECK_FUNCS(eaccess)
AC_CHECK_FUNCS(epoll_create)
AC_CHECK_FUNCS(epoll_create1)
AC_CHECK_FUNCS(epoll_ctl)
AC_CHECK_FUNCS(epoll_wait)
AC_CHECK_FUNCS(endgrent)
AC_CHECK_FUNCS(fchmod)
AC_CHECK_FUNCS(fchown)
...
AC_CHECK_FUNCS(ioctl)
AC_CHECK_FUNCS(isfinite)
AC_CHECK_FUNCS(issetugid)
AC_CHECK_FUNCS(kevent)
AC_CHECK_FUNCS(killpg)
AC_CHECK_FUNCS(kqueue)
AC_CHECK_FUNCS(lchmod)
AC_CHECK_FUNCS(lchown)
AC_CHECK_FUNCS(link)
...
AS_IF([test x$with_valgrind != xno],
    [AC_CHECK_HEADERS(valgrind/memcheck.h)])

AC_DEFINE_UNQUOTED(IOM_SELECT, 0)
AC_DEFINE_UNQUOTED(IOM_KQUEUE, 1)
AC_DEFINE_UNQUOTED(IOM_EPOLL, 2)

iom_default=select
AS_CASE([$ac_cv_func_kqueue:$ac_cv_func_kevent:$ac_cv_header_sys_event_h],
    [yes:yes:yes], [iom_default=kqueue],
    [*],
    [AS_CASE(
        [$ac_cv_func_epoll_wait:$ac_cv_func_epoll_create:$ac_cv_header_sys_epoll_h],
        [yes:yes:yes], [iom_default=epoll])]
)
AC_ARG_WITH(iom,
    AS_HELP_STRING([--with-iom=XXXXX],
        [I/O manager (select|kqueue|epoll)]),
    [with_iom="$withval"], [with_iom="$iom_default"])
AS_CASE(["$with_iom"],
    [select], [AC_DEFINE_UNQUOTED(RUBYVM_IOM, IOM_SELECT)],
    [kqueue], [AC_DEFINE_UNQUOTED(RUBYVM_IOM, IOM_KQUEUE)],
    [epoll], [AC_DEFINE_UNQUOTED(RUBYVM_IOM, IOM_EPOLL)],
    [AC_MSG_ERROR(unknown I/O manager: $with_iom)])

dln_a_out_works=no
if test "$ac_cv_header_a_out_h" = yes; then
  if test "$with_dln_a_out" = yes || test "$rb_cv_dlopen" = unknown; then
...
config_summary "man page type" "$MANTYPE"
config_summary "search path" "$search_path"
config_summary "static-linked-ext" ${EXTSTATIC:+"yes"}
config_summary "I/O manager" ${with_iom}
echo ""
echo "---"

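Taken together, the configure.in hunk picks a default backend (kqueue where kevent and sys/event.h are available, otherwise epoll, otherwise select), lets the builder override it with --with-iom, and burns the choice into the RUBYVM_IOM macro. A minimal sketch of how that macro could drive compile-time dispatch is below; the patch's actual dispatch file is not part of this excerpt, so the #include layout is only an assumption based on the headers listed in common.mk.

    /* sketch only: compile-time backend selection via the RUBYVM_IOM macro
     * defined by configure; the real dispatch source is not shown in this
     * excerpt, and the include layout here is an assumption */
    #include "ruby/config.h"

    #if RUBYVM_IOM == IOM_KQUEUE
    #  include "iom_kqueue.h"
    #elif RUBYVM_IOM == IOM_EPOLL
    #  include "iom_epoll.h"
    #else
    #  include "iom_select.h"
    #endif
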
cont.c
------
#include "vm_core.h"
#include "gc.h"
#include "eval_intern.h"
#include "iom.h"

/* FIBER_USE_NATIVE enables Fiber performance improvement using system
 * dependent method such as make/setcontext on POSIX system or
...
 * You shouldn't mix "transfer" and "resume".
 */
    int transferred;
    unsigned int auto_fiber:1;

#if FIBER_USE_NATIVE
#ifdef _WIN32
...
    fiber_switch(return_fiber(), 1, &value, 0);
}

int
rb_fiber_resumable_p(const rb_thread_t *th, const rb_fiber_t *fib)
{
    return th->root_fiber != fib && !fib->prev;
}

static void
fiber_check_resume(const rb_fiber_t *fib)
{
    if (fib->prev != 0 || fib->cont.type == ROOT_FIBER_CONTEXT) {
        rb_raise(rb_eFiberError, "double resume");
    }
    if (fib->transferred != 0) {
        rb_raise(rb_eFiberError, "cannot resume transferred Fiber");
    }
}

VALUE
rb_fiber_resume(VALUE fibval, int argc, const VALUE *argv)
{
    rb_fiber_t *fib;
    GetFiberPtr(fibval, fib);

    fiber_check_resume(fib);
    return fiber_switch(fib, argc, argv, 1);
}
...
    return rb_fiber_current();
}

/* Returns true if auto-fiber is enabled for current fiber */
int
rb_fiber_auto_sched_p(const rb_thread_t *th)
{
    const rb_fiber_t *cur = th->fiber;

    return (cur && cur->auto_fiber && th->root_fiber != cur);
}

/*
 * Enable auto-scheduling for the Fiber and resume it
 */
static VALUE
rb_fiber_auto_start(int argc, VALUE *argv, VALUE self)
{
    rb_thread_t *th = GET_THREAD();
    rb_fiber_t *fib;
    GetFiberPtr(self, fib);

    if (th->root_fiber == fib) {
        rb_raise(rb_eFiberError, "Root fiber cannot #start");
    }
    if (fib->auto_fiber) {
        rb_raise(rb_eFiberError, "Fiber already started");
    }
    fib->auto_fiber = 1;
    fiber_check_resume(fib);
    return fiber_switch(fib, argc, argv, 1);
}

rb_thread_t *
rb_fiber_owner_thread(VALUE self)
{
    rb_fiber_t *fib;
    rb_thread_t *th;

    GetFiberPtr(self, fib);
    GetThreadPtr(fib->cont.saved_thread.self, th);
    return th;
}

static void
fiber_auto_join(rb_fiber_t *fib, double *timeout)
{
    rb_thread_t *th = GET_THREAD();
    rb_fiber_t *cur = fiber_current();

    if (cur == fib) {
        rb_raise(rb_eFiberError, "Target fiber must not be current fiber");
    }
    if (th->root_fiber == fib) {
        rb_raise(rb_eFiberError, "Target fiber must not be root fiber");
    }
    if (fib->cont.saved_thread.self != th->self) {
        rb_raise(rb_eFiberError, "Target fiber not owned by current thread");
    }
    if (!fib->auto_fiber) {
        rb_raise(rb_eFiberError, "Target fiber is not an auto-fiber");
    }
    while (fib->status != TERMINATED && (timeout == 0 || *timeout >= 0.0)) {
        rb_iom_schedule(th, timeout);
    }
}

static VALUE
rb_fiber_auto_join(int argc, VALUE *argv, VALUE self)
{
    rb_fiber_t *fib;
    double timeout, *t;
    VALUE limit;

    GetFiberPtr(self, fib);
    rb_scan_args(argc, argv, "01", &limit);

    if (NIL_P(limit)) {
        t = 0;
    } else {
        timeout = rb_num2dbl(limit);
        t = &timeout;
    }
    fiber_auto_join(fib, t);
    return fib->status == TERMINATED ? fib->cont.self : Qnil;
}

static VALUE
rb_fiber_auto_value(VALUE self)
{
    rb_fiber_t *fib;
    GetFiberPtr(self, fib);

    fiber_auto_join(fib, 0);
    return fib->cont.value;
}

/*
 * Document-class: FiberError
...
    rb_define_singleton_method(rb_cFiber, "yield", rb_fiber_s_yield, -1);
    rb_define_method(rb_cFiber, "initialize", rb_fiber_init, 0);
    rb_define_method(rb_cFiber, "resume", rb_fiber_m_resume, -1);
    rb_define_method(rb_cFiber, "start", rb_fiber_auto_start, -1);
    rb_define_method(rb_cFiber, "join", rb_fiber_auto_join, -1);
    rb_define_method(rb_cFiber, "value", rb_fiber_auto_value, 0);
}

RUBY_SYMBOL_EXPORT_BEGIN

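The three new methods registered above are the user-visible surface of the feature: Fiber#start marks a fiber as auto-scheduled and resumes it, Fiber#join waits (with an optional timeout in seconds) for it to terminate, and Fiber#value joins and then returns the block's result. The sketch below is illustrative only and not part of the patch; it drives those methods from C via the public rb_funcall API for a fiber object created elsewhere (e.g. with rb_fiber_new), and the 1.5 second timeout is an arbitrary example value.

    /* Illustrative sketch, not patch code: exercise the new
     * Fiber#start/#join/#value methods from C for a fiber +fib+
     * created elsewhere (e.g. with rb_fiber_new()). */
    #include "ruby.h"

    static VALUE
    auto_fiber_demo(VALUE fib)
    {
        /* enable auto-scheduling and resume the fiber */
        rb_funcall(fib, rb_intern("start"), 0);

        /* wait up to 1.5s (arbitrary example); nil means still running */
        if (NIL_P(rb_funcall(fib, rb_intern("join"), 1, DBL2NUM(1.5)))) {
            return Qnil;
        }
        /* fiber terminated: fetch the block's return value */
        return rb_funcall(fib, rb_intern("value"), 0);
    }
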
include/ruby/io.h
-----------------
/* #define FMODE_UNIX 0x00200000 */
/* #define FMODE_INET 0x00400000 */
/* #define FMODE_INET6 0x00800000 */
/* #define FMODE_IOM_PRIVATE1 0x01000000 */ /* OS-dependent */
/* #define FMODE_IOM_PRIVATE2 0x02000000 */ /* OS-dependent */

#define GetOpenFile(obj,fp) rb_io_check_closed((fp) = RFILE(rb_io_taint_check(obj))->fptr)

iom.h
-----
/*
 * iom -> I/O Manager for RubyVM (auto-Fiber-aware)
 *
 * On platforms with epoll or kqueue, this should be ready for multicore;
 * even if the rest of the RubyVM is not.
 *
 * Some inspiration taken from Mio in GHC:
 * http://haskell.cs.yale.edu/wp-content/uploads/2013/08/hask035-voellmy.pdf
 */
#ifndef RUBY_IOM_H
#define RUBY_IOM_H
#include "ruby.h"
#include "ruby/io.h"
#include "ruby/intern.h"
#include "vm_core.h"

typedef struct rb_iom_struct rb_iom_t;

/* WARNING: unstable API, only for Ruby internal use */

/*
 * Note: the first "rb_thread_t *" is a placeholder and may be replaced
 * with "rb_execution_context_t *" in the future.
 */

/*
 * All functions with "wait" in their name take an optional double *
 * +timeout+ argument specifying the timeout in seconds.  If NULL, it can
 * wait forever until the event happens (or the fiber is explicitly resumed).
 *
 * (maybe) TODO: If non-NULL, the timeout will be updated to the
 * remaining time upon returning.  Not sure if useful, could just be
 * a waste of cycles; so not implemented, yet.
 */

/*
 * Relinquish calling fiber while waiting for +events+ on the given
 * +rb_io_t+
 *
 * Multiple native threads can enter this function at the same time.
 *
 * Events are RB_WAITFD_IN, RB_WAITFD_OUT, RB_WAITFD_PRI
 *
 * Returns a mask of events.
 */
int rb_iom_waitio(rb_thread_t *, rb_io_t *, int events, double *timeout);

/*
 * Identical to rb_iom_waitio, but takes a pointer to an integer file
 * descriptor, instead of rb_io_t.  Use rb_iom_waitio when possible,
 * since it allows us to optimize epoll (and perhaps avoid kqueue
 * portability bugs across different *BSDs).
 */
int rb_iom_waitfd(rb_thread_t *, int *fdp, int events, double *timeout);

/*
 * Relinquish calling fiber to wait for the given PID to change status.
 * Multiple native threads can enter this function at the same time.
 * If timeout is negative, wait forever.
 */
rb_pid_t rb_iom_waitpid(rb_thread_t *,
                        rb_pid_t, int *status, int options, double *timeout);

/*
 * Relinquish calling fiber for at least the duration of given timeout
 * in seconds.  If timeout is negative, wait forever (until explicitly
 * resumed).
 * Multiple native threads can enter this function at the same time.
 */
void rb_iom_sleep(rb_thread_t *, double *timeout);

/* callback for SIGCHLD, needed to implement rb_iom_waitpid */
void rb_iom_sigchld(rb_vm_t *);

/*
 * there is no public create function, creation is lazy to avoid incurring
 * overhead for small scripts which do not need fibers, we only need this
 * at VM destruction
 */
void rb_iom_destroy(rb_vm_t *);

/*
 * schedule
 */
void rb_iom_schedule(rb_thread_t *th, double *timeout);

/* cont.c */
int rb_fiber_auto_sched_p(const rb_thread_t *);
rb_thread_t *rb_fiber_owner_thread(VALUE);

#endif /* RUBY_IOM_H */

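This header is the whole contract between the scheduler and its callers: per the patch title, rb_wait_for_single_fd and rb_waitpid are wired into these functions when the caller is an auto-fiber, but those hunks are not part of this excerpt. The following is therefore only a hedged sketch of what such a caller could look like; the fallback helper name is hypothetical.

    /* Sketch only (the real thread.c hunk is not shown in this excerpt):
     * dispatch to the iom layer when the current fiber is auto-scheduled,
     * otherwise fall back to the pre-existing blocking path. */
    static int
    wait_for_single_fd_sketch(int fd, int events, struct timeval *tv)
    {
        rb_thread_t *th = GET_THREAD();

        if (rb_fiber_auto_sched_p(th)) {
            double timeout, *t = NULL;

            if (tv) { /* convert the caller's timeval into seconds */
                timeout = (double)tv->tv_sec + (double)tv->tv_usec * 1e-6;
                t = &timeout;
            }
            /* relinquishes this fiber until +events+ are ready or timeout */
            return rb_iom_waitfd(th, &fd, events, t);
        }
        return blocking_wait_for_single_fd(fd, events, tv); /* hypothetical helper */
    }
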
iom_common.h
------------
/* included by iom_(epoll|select|kqueue).h */

/* we lazily create this, small scripts may never need iom */
static rb_iom_t *
rb_iom_new(rb_thread_t *th)
{
    rb_iom_t *iom = ALLOC(rb_iom_t);
    rb_iom_init(iom);
    return iom;
}

static rb_iom_t *
rb_iom_get(rb_thread_t *th)
{
    VM_ASSERT(th && th->vm);
    if (!th->vm->iom) {
        th->vm->iom = rb_iom_new(th);
    }
    return th->vm->iom;
}

/* check for expired timers */
static void
rb_iom_timer_check(const rb_thread_t *th)
{
    rb_iom_t *iom = th->vm->iom;
    if (iom) {
        struct rb_iom_timer *t = 0, *next = 0;
        double now = timeofday();

        list_for_each_safe(&iom->timers, t, next, n.tnode) {
            if (t->expires_at <= now) {
                struct rb_iom_waiter *w = rb_iom_waiter_of(t);
                VALUE fibval = rb_iom_timer_fibval(t);

                if (w) {
                    list_del_init(&w->wnode);
                }
                list_del_init(&t->n.tnode);

                /* non-auto-fibers may set timer in rb_iom_schedule */
                if (fibval != Qfalse) {
                    rb_thread_t *owner = rb_fiber_owner_thread(fibval);
                    list_add_tail(&owner->afrunq, &t->n.rnode);
                }
            }
            return; /* done, timers is a sorted list */
        }
    }
}

/* insert a new +timer+ into +timers+, maintain sort order by expires_at */
static void
rb_iom_timer_add(rb_thread_t *th, struct rb_iom_timer *add,
                 const double *timeout, int flags)
{
    add->_fibval = flags & IOM_FIB ? rb_fiber_current() : Qfalse;
    add->_fibval |= flags & IOM_WAIT ? 0 : IOM_FIBMASK;
    rb_iom_timer_check(th);

    if (timeout) {
        rb_iom_t *iom = rb_iom_get(th);
        struct rb_iom_timer *i = 0;
        add->expires_at = timeofday() + *timeout;

        /*
         * search backwards: assume typical projects have multiple objects
         * sharing the same timeout values, so new timers will expire later
         * than existing timers
         */
        list_for_each_rev(&iom->timers, i, n.tnode) {
            if (add->expires_at > i->expires_at) {
                list_add_after(&iom->timers, &i->n.tnode, &add->n.tnode);
                return;
            }
        }
        list_add(&iom->timers, &add->n.tnode);
    }
    else {
        /* not active, just allow list_del to function */
        list_node_init(&add->n.tnode);
    }
}

/* max == -1 : wake all */
static void
rb_iom_blockers_notify(rb_iom_t *iom, int max)
{
    struct rb_iom_blocker *b = 0, *next = 0;

    list_for_each_safe(&iom->blockers, b, next, bnode) {
        list_del_init(&b->bnode);
        ubf_select(b->th);
        if (--max == 0) {
            break;
        }
    }
}

/*
 * TODO: consider EVFILT_PROC for kqueue and netlink+epoll on Linux;
 * see the "god" RubyGem for usage examples.
 * However, I doubt rb_waitpid scalability will be a problem and
 * the simplicity of a single implementation for all is appealing.
 */
#ifdef HAVE_SYS_TYPES_H
# include <sys/types.h>
#endif
#ifdef HAVE_SYS_WAIT_H
# include <sys/wait.h>
#endif

#if defined(WNOHANG) && WNOHANG != 0 && \
    (defined(HAVE_WAITPID) || defined(HAVE_WAIT4))
static VALUE
iom_schedule_pid(VALUE ptr)
{
    struct rb_iom_pid_waiter *pw = (struct rb_iom_pid_waiter *)ptr;
    rb_thread_t *th = pw->th;

    rb_fiber_auto_do_yield_p(th);
    RUBY_VM_CHECK_INTS_BLOCKING(th);
    return rb_fiber_yield(0, 0);
}

rb_pid_t
rb_iom_waitpid(rb_thread_t *th, rb_pid_t pid, int *status, int options,
               double *timeout)
{
    struct rb_iom_pid_waiter pw;

    pw.options = options;
    VM_ASSERT((options & WNOHANG) == 0 &&
              "WNOHANG should be handled in rb_waitpid");

    /*
     * unlike rb_iom_waitfd, we typically call *waitpid before
     * trying with a non-blocking operation
     */
    pw.pid = rb_waitpid(pid, &pw.status, pw.options | WNOHANG);

    if (pw.pid == 0) {
        rb_iom_t *iom = rb_iom_get(th);

        pw.th = th;
        pw.pid = pid;
        rb_iom_timer_add(th, &pw.w.timer, timeout, IOM_FIB|IOM_WAIT);

        /* LIFO, to match Linux wait4() blocking behavior */
        list_add(&iom->pids, &pw.w.wnode);
        rb_ensure(iom_schedule_pid, (VALUE)&pw,
                  rb_iom_waiter_done, (VALUE)&pw.w);
        if (pw.pid == -1) {
            errno = pw.errnum;
        }
    }
    if (status) {
        *status = pw.status;
    }
    if (pw.pid > 0) {
        rb_last_status_set(pw.status, pw.pid);
    }
    return pw.pid;
}

void
rb_iom_sigchld(rb_vm_t *vm)
{
    rb_iom_t *iom = vm->iom;
    if (iom) {
        struct rb_iom_pid_waiter *pw = 0, *next = 0;
        size_t nr = 0;

        list_for_each_safe(&iom->pids, pw, next, w.wnode) {
            pid_t r = rb_waitpid(pw->pid, &pw->status, pw->options | WNOHANG);

            if (r == 0) {
                continue;
            }
            if (r == -1) {
                pw->errnum = errno;
            }
            nr++;
            pw->pid = r;
            rb_iom_waiter_ready(&pw->w);
        }
        if (nr) {
            rb_iom_blockers_notify(iom, -1);
        }
    }
}
#else
rb_pid_t
rb_iom_waitpid(rb_thread_t *th, rb_pid_t pid, int *status, int options,
               double *timeout)
{
    rb_bug("Should not get here, WNOHANG not implemented");
}
#endif /* defined(WNOHANG) && (defined(HAVE_WAITPID) || defined(HAVE_WAIT4)) */

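rb_iom_waitpid deliberately leaves WNOHANG to the ordinary rb_waitpid path (see the VM_ASSERT above) and only parks the calling fiber when the child has not exited yet. The corresponding process.c/thread.c hook is not shown in this excerpt; the sketch below is a hypothetical caller illustrating that split, with an invented helper name for the non-fiber path.

    /* Hypothetical caller sketch (not from the patch): blocking waitpid()
     * requests from an auto-fiber go through the iom; WNOHANG requests and
     * ordinary threads keep using the existing blocking path. */
    static rb_pid_t
    waitpid_dispatch_sketch(rb_pid_t pid, int *status, int options)
    {
        rb_thread_t *th = GET_THREAD();

        if (!(options & WNOHANG) && rb_fiber_auto_sched_p(th)) {
            return rb_iom_waitpid(th, pid, status, options, NULL); /* wait forever */
        }
        return do_waitpid_blocking(pid, status, options); /* hypothetical helper */
    }
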
iom_epoll.h
-----------
/*
 * Linux-only epoll-based implementation of I/O Manager for RubyVM
 *
 * Notes:
 *
 * TODO: epoll_wait only has millisecond resolution; if we need higher
 * resolution we can use timerfd or ppoll on the epoll_fd itself.
 *
 * Inside the Linux kernel, select/poll/ppoll/epoll_wait all use the
 * same notification callback (struct file_operations)->poll.
 * Unlike with kqueue across different *BSDs, we do not need to worry
 * about inconsistencies between these syscalls.
 *
 * See also notes in iom_kqueue.h
 */
#include "iom_internal.h"
#include <sys/epoll.h>
#include <math.h> /* round() */

#define FMODE_IOM_ADDED FMODE_IOM_PRIVATE1

/* allocated on heap (rb_vm_t.iom) */
struct rb_iom_struct {
    /*
     * Everything here is protected by GVL at this time,
     * URCU lists (LGPL-2.1+) may be used in the future
     */

    /* we NEVER need to scan epws, only insert + delete + empty check */
    struct list_head epws; /* -epw.w.wnode, order agnostic */

    struct list_head timers; /* -rb_iom_timer.n.tnode, sort by expire_at */
    struct list_head pids; /* -rb_iom_pid_waiter.w.wnode, LIFO order */
    struct rb_iom_fdmap fdmap; /* maps each FD to multiple epw */

    int epoll_fd;
    int maxevents; /* auto-increases */
    struct list_head blockers; /* -rb_iom_blocker.bnode */
};

/*
 * Not using rb_iom_fd_waiter here, since we never need to reread the
 * FD in this implementation.
 * Allocated on stack
 */
struct epw {
    struct rb_iom_waiter w;
    union {
        struct list_node fdnode;
        struct {
            rb_thread_t *th;
            struct rb_iom_fd *fdh;
        } pre_ctl;
    } as;
    int fd; /* no need for "int *", here, we never reread */
    short events; /* requested events, like poll(2) */
    short revents; /* returned events, like poll(2) */
    int *flags; /* &fptr->mode */
};

static void
increase_maxevents(rb_iom_t *iom, int retries)
{
    /* 1024 is the RUBY_ALLOCV_LIMIT on such systems */
    const int max_alloca = 1024 / sizeof(struct epoll_event);
    const int max = max_alloca * 2;

    if (retries) {
        iom->maxevents *= retries;
        if (iom->maxevents > max || iom->maxevents <= 0) {
            iom->maxevents = max;
        }
    }
}

static int
double2msec(double sec)
{
    /*
     * clamp timeout to workaround a Linux <= 2.6.37 bug,
     * see epoll_wait(2) manpage
     */
    const int max_msec = 35 * 60 * 1000; /* floor(35.79 minutes) */

    if (sec < 0) {
        return -1;
    }
    else {
        double msec = round(sec * 1000);

        if (msec < (double)max_msec) {
            int ret = (int)msec;
            return ret < 0 ? 0 : ret;
        }
        return max_msec;
    }
}

/* we can avoid branches when mapping RB_WAIT_* bits to EPOLL* bits */
STATIC_ASSERT(epbits_matches_waitfd_bits,
              RB_WAITFD_IN == EPOLLIN && RB_WAITFD_OUT == EPOLLOUT &&
              RB_WAITFD_PRI == EPOLLPRI);

/* what goes into epoll_ctl... */
static int
rb_events2ep(int events)
{
    return EPOLLONESHOT | events;
}

/* ...what comes out of epoll_wait */
static short
rb_ep2revents(int revents)
{
    return (short)(revents & (EPOLLIN|EPOLLOUT|EPOLLPRI));
}

/* lazily create epoll FD, since not everybody waits on I/O */
static int
iom_epfd(rb_iom_t *iom)
{
    if (iom->epoll_fd < 0) {
#if defined(EPOLL_CLOEXEC) && defined(HAVE_EPOLL_CREATE1)
        iom->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
        if (iom->epoll_fd < 0) {
            int err = errno;
            if (rb_gc_for_fd(err)) {
                iom->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
                if (iom->epoll_fd < 0) {
                    rb_sys_fail("epoll_create1");
                }
            }
            else if (err != ENOSYS) {
                rb_syserr_fail(err, "epoll_create1");
            }
            else { /* libc >= kernel || build-env > run-env */
#endif /* HAVE_EPOLL_CREATE1 */
                iom->epoll_fd = epoll_create(1);
                if (iom->epoll_fd < 0) {
                    if (rb_gc_for_fd(errno)) {
                        iom->epoll_fd = epoll_create(1);
                    }
                }
                if (iom->epoll_fd < 0) {
                    rb_sys_fail("epoll_create");
                }
                rb_maygvl_fd_fix_cloexec(iom->epoll_fd);
#if defined(EPOLL_CLOEXEC) && defined(HAVE_EPOLL_CREATE1)
            }
        }
#endif /* HAVE_EPOLL_CREATE1 */
        rb_update_max_fd(iom->epoll_fd);
    }
    return iom->epoll_fd;
}

static void
rb_iom_init(rb_iom_t *iom)
{
    list_head_init(&iom->timers);
    list_head_init(&iom->epws);
    list_head_init(&iom->pids);
    list_head_init(&iom->blockers);
    iom->maxevents = 8;
    iom->epoll_fd = -1;
    rb_iom_fdmap_init(&iom->fdmap);
}

static void
check_epoll_wait(rb_thread_t *th, int nr, struct epoll_event *ev)
{
    if (nr >= 0) {
        int i;

        for (i = 0; i < nr; i++) {
            struct rb_iom_fd *fdh = ev[i].data.ptr;
            struct epw *epw = 0, *next = 0;
            short revents = rb_ep2revents(ev[i].events);

            /*
             * Typical list size is 1; only multiple fibers waiting
             * on the same FD increases fdh list size
             */
            list_for_each_safe(&fdh->fdhead, epw, next, as.fdnode) {
                epw->revents = epw->events & revents;
                list_del_init(&epw->as.fdnode);
                rb_iom_waiter_ready(&epw->w);
            }
        }

        /* notify the waiter thread in case we enqueued fibers for them */
        if (nr > 0) {
            rb_iom_blockers_notify(th->vm->iom, -1);
        }
    }
    else {
        int err = errno;
        if (err != EINTR) {
            rb_syserr_fail(err, "epoll_wait");
        }
    }
    rb_iom_timer_check(th);
    RUBY_VM_CHECK_INTS_BLOCKING(th);
}

/* perform a non-blocking epoll_wait while holding GVL */
static void
ping_events(rb_thread_t *th)
{
    rb_iom_t *iom = th->vm->iom;
    int epfd = iom ? iom->epoll_fd : -1;

    if (epfd >= 0) {
        VALUE v;
        int nr;
        int maxevents = iom->maxevents;
        struct epoll_event *ev = ALLOCV_N(struct epoll_event, v, maxevents);
        int retries = 0;

        do {
            nr = epoll_wait(epfd, ev, maxevents, 0);
            check_epoll_wait(th, nr, ev);
        } while (nr == maxevents && ++retries);

        if (v) {
            ALLOCV_END(v);
        }
        increase_maxevents(iom, retries);
    }
}

/* for iom_pingable_common.h */
static void
rb_iom_do_wait(rb_thread_t *th, rb_iom_t *iom)
{
    int maxevents = iom->maxevents;
    int nr = maxevents;
    double timeout;

    RUBY_VM_CHECK_INTS_BLOCKING(th);
    timeout = rb_iom_next_timeout(&iom->timers);

    if (timeout != 0 && (!list_empty(&iom->epws) || !list_empty(&iom->pids))) {
        VALUE v;
        int epfd = iom_epfd(th->vm->iom); /* may raise */
        struct epoll_event *ev = ALLOCV_N(struct epoll_event, v, maxevents);
        int msec = double2msec(timeout);
        struct rb_iom_blocker cur;

        VM_ASSERT(epfd >= 0);
        cur.th = th;
        list_add_tail(&iom->blockers, &cur.bnode);
        BLOCKING_REGION({
            nr = epoll_wait(epfd, ev, maxevents, msec);
        }, ubf_select, th, FALSE);
        list_del(&cur.bnode);
        check_epoll_wait(th, nr, ev);

        if (v) {
            ALLOCV_END(v);
        }
    }
    if (nr == maxevents) { /* || timeout == 0 */
        ping_events(th);
    }
}

static void
epoll_ctl_or_raise(rb_thread_t *th, struct epw *epw)
{
    int e;
    int epfd;
    struct epoll_event ev;

    /* we cannot raise until list_add: */
    {
        struct rb_iom_fd *fdh = epw->as.pre_ctl.fdh;

        ev.data.ptr = fdh;
        ev.events = rb_events2ep(epw->events);

        /*
         * merge events from other threads/fibers waiting on the same
         * [ descriptor (int fd), description (struct file *) ] tuplet
         */
        if (!list_empty(&fdh->fdhead)) { /* uncommon, I hope... */
            struct epw *cur;
            list_for_each(&fdh->fdhead, cur, as.fdnode) {
                ev.events |= rb_events2ep(cur->events);
            }
        }
        list_add(&fdh->fdhead, &epw->as.fdnode);
    }
    epfd = iom_epfd(th->vm->iom); /* may raise */

    /* we want to track if an FD is already being watched ourselves */
    if (epw->flags) {
        if (*epw->flags & FMODE_IOM_ADDED) { /* ideal situation */
            e = epoll_ctl(epfd, EPOLL_CTL_MOD, epw->fd, &ev);
        }
        else {
            e = epoll_ctl(epfd, EPOLL_CTL_ADD, epw->fd, &ev);
            if (e == 0) {
                *epw->flags |= FMODE_IOM_ADDED;
            }
            else if (e < 0 && errno == EEXIST) {
                /*
                 * possible EEXIST if several fptrs point to the same FD:
                 *     f1 = Fiber.start { io1.read(1) }
                 *     io2 = IO.for_fd(io1.fileno)
                 *     f2 = Fiber.start { io2.read(1) }
                 */
                *epw->flags |= FMODE_IOM_ADDED;
                e = epoll_ctl(epfd, EPOLL_CTL_MOD, epw->fd, &ev);
            }
        }
    }
    else { /* don't know if added or not, fall back to add on ENOENT */
        e = epoll_ctl(epfd, EPOLL_CTL_MOD, epw->fd, &ev);
        if (e < 0 && errno == ENOENT) {
            e = epoll_ctl(epfd, EPOLL_CTL_ADD, epw->fd, &ev);
        }
    }
    if (e < 0) {
        rb_sys_fail("epoll_ctl");
    }
}

static VALUE
epmod_yield(VALUE ptr)
{
    /* we must have no possibility of raising until list_add: */
    struct epw *epw = (struct epw *)ptr;
    rb_thread_t *th = epw->as.pre_ctl.th;

    epoll_ctl_or_raise(th, epw);
    ping_events(th);
    (void)rb_fiber_auto_do_yield_p(th);
    return rb_fiber_yield(0, 0);
}

static VALUE
epw_done(VALUE ptr)
{
    struct epw *epw = (struct epw *)ptr;
    list_del(&epw->as.fdnode);
    return rb_iom_waiter_done((VALUE)&epw->w);
}

static int
iom_waitfd(rb_thread_t *th, int fd, int *flags, int events, double *timeout)
{
    rb_iom_t *iom = rb_iom_get(th);
    struct epw epw;

    /* unlike kqueue or select, we never need to reread fd */
    epw.fd = fd;
    if (epw.fd < 0) { /* TODO: behave like poll(2) and sleep? */
        return 0;
    }

    /* may raise on OOM: */
    epw.as.pre_ctl.fdh = rb_iom_fd_get(&iom->fdmap, epw.fd);
    epw.as.pre_ctl.th = th;
    epw.flags = flags;

    /*
     * if we did not have GVL, revents may be set immediately
     * upon epoll_ctl by another thread running epoll_wait,
     * so we must initialize it before epoll_ctl:
     */
    epw.revents = 0;
    epw.events = (short)events;

    list_add(&iom->epws, &epw.w.wnode);
    rb_iom_timer_add(th, &epw.w.timer, timeout, IOM_FIB|IOM_WAIT);
    rb_ensure(epmod_yield, (VALUE)&epw, epw_done, (VALUE)&epw);

    return (int)epw.revents; /* may be zero if timed out */
}

int
rb_iom_waitio(rb_thread_t *th, rb_io_t *fptr, int events, double *timeout)
{
    return iom_waitfd(th, fptr->fd, &fptr->mode, events, timeout);
}

int
rb_iom_waitfd(rb_thread_t *th, int *fdp, int events, double *timeout)
{
    return iom_waitfd(th, *fdp, 0, events, timeout);
}

void
rb_iom_destroy(rb_vm_t *vm)
{
    rb_iom_t *iom = vm->iom;
    vm->iom = 0;
    if (iom) {
        /*
         * it's possible; but crazy to share epoll FDs across processes
         * (kqueue has a rather unique close-on-fork behavior)
         */
        if (iom->epoll_fd >= 0) {
            close(iom->epoll_fd);
        }
        rb_iom_fdmap_destroy(&iom->fdmap);
        xfree(iom);
    }
}

/* used by thread.c::rb_thread_atfork */
static void
rb_iom_atfork_child(rb_thread_t *th)
{
    rb_iom_destroy(th->vm);
}

/* used by thread_pthread.c */
static int
rb_iom_reserved_fd(int fd)
{
    rb_iom_t *iom = GET_VM()->iom;

    return iom && fd == iom->epoll_fd;
}

#include "iom_pingable_common.h"
#include "iom_common.h"

iom_internal.h
--------------
#ifndef RB_IOM_COMMON_H
#define RB_IOM_COMMON_H

#include "internal.h"
#include "iom.h"

/* cont.c */
void rb_fiber_auto_enqueue(VALUE fibval);

#define FMODE_IOM_PRIVATE1 0x01000000
#define FMODE_IOM_PRIVATE2 0x02000000

#define IOM_FIBMASK ((VALUE)0x1)
#define IOM_FIB (0x2)
#define IOM_WAIT (0x1) /* container_of(..., struct rb_iom_waiter, timer) */

/*
 * fdmap is a singleton.
 *
 * It makes zero sense to have multiple fdmaps per-process; even less so
 * than multiple ioms.  The file descriptor table in POSIX is per-process;
 * and POSIX requires syscalls to allocate the lowest available FD.
 * This is also why we use an array instead of a hash table, as there will
 * be no holes for big processes.
 *
 * If contention becomes a problem, we can pad (struct rb_iom_fd) to
 * 64-bytes for cache alignment.
 *
 * Currently we use fdmap to deal with FD aliasing with epoll
 * and kqueue interfaces.  FD aliasing happens when multiple
 * Fibers wait on the same FD; but epoll/kqueue APIs only allow
 * registering a single data pointer per FD.
 *
 * In the future, we may implement rb_notify_fd_close using fdmap.
 */

/* linear growth based on power-of-two */
#define RB_IOM_FD_PER_HEAP 64

/* on-heap and persistent for process lifetime; keep as small as possible */
struct rb_iom_fd {
    struct list_head fdhead; /* -kev.(rfdnode|wfdnode), epw.fdnode */
};

/* singleton (per-rb_iom_t, or per process, if we ever need > 1 iom) */
struct rb_iom_fdmap {
    struct rb_iom_fd **map;
    unsigned int heaps;
    int max_fd;
};

/* allocated on stack */
/* Every auto-yielded fiber has this on stack */
struct rb_iom_timer {
    union {
        struct list_node rnode; /* <=> rb_thread_t.afrunq */
        struct list_node tnode; /* <=> rb_iom_struct.timers */
    } n;
    double expires_at; /* absolute monotonic time */
    VALUE _fibval;
};

/* common waiter struct for waiting fds and pids */
struct rb_iom_waiter {
    struct rb_iom_timer timer;
    struct list_node wnode; /* <=> rb_iom_struct.(fds|pids) */
};

struct rb_iom_fd_waiter {
    struct rb_iom_waiter w; /* w.wnode - iom->fds */
    int *fdp; /* (ideally), a pointer to fptr->fd to detect closed FDs */
    short events; /* requested events, like poll(2) */
    short revents; /* returned events, like poll(2) */
};

struct rb_iom_pid_waiter {
    struct rb_iom_waiter w; /* w.wnode - iom->pids */
    rb_thread_t *th;
    /* pid, status, options same as waitpid(2) */
    rb_pid_t pid;
    int status;
    int options;
    int errnum;
};

/* threads sleeping in select, epoll_wait or kevent w/o GVL; on stack */
struct rb_iom_blocker {
    rb_thread_t *th;
    struct list_node bnode; /* -iom->blockers */
};

#if (RUBYVM_IOM == IOM_KQUEUE || RUBYVM_IOM == IOM_EPOLL)
/* TODO: IOM_SELECT may use this for rb_notify_fd_close */
static struct rb_iom_fd *
iom_fdhead_aref(struct rb_iom_fdmap *fdmap, int fd)
{
    VM_ASSERT(fd >= 0);
    return &fdmap->map[fd / RB_IOM_FD_PER_HEAP][fd % RB_IOM_FD_PER_HEAP];
}

static struct rb_iom_fd *
rb_iom_fd_get(struct rb_iom_fdmap *fdmap, int fd)
{
    if (fd >= fdmap->max_fd) {
        struct rb_iom_fd *base, *h;
        unsigned n = fdmap->heaps + 1;
        unsigned i;

        fdmap->map = xrealloc2(fdmap->map, n, sizeof(struct rb_iom_fd *));
        base = h = ALLOC_N(struct rb_iom_fd, RB_IOM_FD_PER_HEAP);
        for (i = 0; i < RB_IOM_FD_PER_HEAP; i++) {
            list_head_init(&h->fdhead);
            h++;
        }
        fdmap->map[fdmap->heaps] = base;
        fdmap->max_fd += RB_IOM_FD_PER_HEAP;
    }
    return iom_fdhead_aref(fdmap, fd);
}

static void
rb_iom_fdmap_init(struct rb_iom_fdmap *fdmap)
{
    fdmap->max_fd = 0;
    fdmap->heaps = 0;
    fdmap->map = 0;
}

static void
rb_iom_fdmap_destroy(struct rb_iom_fdmap *fdmap)
{
    unsigned n;

    for (n = 0; n < fdmap->heaps; n++) {
        xfree(fdmap->map[n]);
    }
    xfree(fdmap->map);
    rb_iom_fdmap_init(fdmap);
}
#endif /* (RUBYVM_IOM == IOM_KQUEUE || RUBYVM_IOM == IOM_EPOLL) */

static VALUE
rb_iom_timer_fibval(const struct rb_iom_timer *t)
{
    return t->_fibval & ~IOM_FIBMASK;
}

static struct rb_iom_waiter *
rb_iom_waiter_of(struct rb_iom_timer *t)
{
    if (t->_fibval & IOM_FIBMASK) {
        return 0;
    }
    return container_of(t, struct rb_iom_waiter, timer);
}

static double
rb_iom_next_timeout(struct list_head *timers)
{
    struct rb_iom_timer *t = list_top(timers, struct rb_iom_timer, n.tnode);

    if (t) {
        double diff = t->expires_at - timeofday();
        return diff <= 0.0 ? 0 : diff;
    }
    else {
        return -1;
    }
}

static void rb_iom_timer_check(const rb_thread_t *);
static void rb_iom_timer_add(rb_thread_t *, struct rb_iom_timer *,
                             const double *timeout, int flags);

static VALUE
rb_iom_timer_done(VALUE ptr)
{
    struct rb_iom_timer *t = (struct rb_iom_timer *)ptr;
    list_del(&t->n.tnode);
    return Qfalse;
}

static void
rb_iom_waiter_ready(struct rb_iom_waiter *w)
{
    VALUE fibval = rb_iom_timer_fibval(&w->timer);

    list_del_init(&w->wnode);
    list_del_init(&w->timer.n.tnode);
    if (fibval != Qfalse) {
        rb_thread_t *owner = rb_fiber_owner_thread(fibval);
        list_add_tail(&owner->afrunq, &w->timer.n.rnode);
    }
}

static VALUE
rb_iom_waiter_done(VALUE ptr)
{
    struct rb_iom_waiter *w = (struct rb_iom_waiter *)ptr;
    list_del(&w->timer.n.tnode);
    list_del(&w->wnode);
    return Qfalse;
}

/* cont.c */
int rb_fiber_resumable_p(const rb_thread_t *, const rb_fiber_t *);

/*
 * resume all "ready" fibers belonging to a given thread;
 * stop when a fiber has not yielded, yet.
 */
static int
rb_fiber_auto_do_yield_p(rb_thread_t *th)
{
    rb_fiber_t *current_auto = rb_fiber_auto_sched_p(th) ? th->fiber : 0;
    struct rb_iom_timer *t = 0, *next = 0;
    LIST_HEAD(tmp);

    /*
     * do not infinite loop as new fibers get added to
     * th->afrunq; only work off a temporary list:
     */
    list_append_list(&tmp, &th->afrunq);
    list_for_each_safe(&tmp, t, next, n.rnode) {
        VALUE fibval = rb_iom_timer_fibval(t);
        rb_fiber_t *fib = RTYPEDDATA_DATA(fibval);

        if (fib == current_auto || !rb_fiber_resumable_p(th, fib)) {
            /* tell the caller to yield */
            list_prepend_list(&th->afrunq, &tmp);
            return 1;
        }
        rb_fiber_resume(fibval, 0, 0);
    }
    return 0;
}

/* XXX: is this necessary? */
void
rb_iom_mark_runq(const rb_thread_t *th)
{
    struct rb_iom_timer *t = 0;

    list_for_each(&th->afrunq, t, n.rnode) {
        rb_gc_mark(rb_iom_timer_fibval(t));
    }
}

static rb_iom_t *rb_iom_get(rb_thread_t *);
static void rb_iom_blockers_notify(rb_iom_t *, int max);

#endif /* RB_IOM_COMMON_H */

iom_kqueue.h
------------
/*
 * kqueue-based implementation of I/O Manager for RubyVM on *BSD
 *
 * The kevent API has an advantage over epoll_ctl+epoll_wait since
 * it can simultaneously add filters and check for events with one
 * syscall.  It also allows EV_ADD to be used idempotently for
 * enabling filters, whereas epoll_ctl requires separate ADD and
 * MOD operations.
 *
 * These are advantages in the common case...
 *
 * The epoll API has advantages in more esoteric cases:
 *
 * epoll has the advantage over kqueue when watching for multiple
 * events (POLLIN|POLLOUT|POLLPRI) (which is rare).  We also have
 * to install two kevent filters to watch POLLIN|POLLOUT simultaneously.
 * See udata_set/udata_get functions below for more on this.
 *
 * Also, kevent does not support POLLPRI directly; we need to use
 * select() (or perhaps poll() on some platforms) with a zero
 * timeout to check for POLLPRI after EVFILT_READ returns.
 *
 * Finally, several *BSDs implement kqueue; and the quality of each
 * implementation may vary.  Anecdotally, *BSDs are not known to even
 * support poll() consistently across different types of files.
 * We will need to be selective and careful about injecting them into
 * kevent().
 */
#include "iom_internal.h"

/* LIST_HEAD (via ccan/list) conflicts with sys/queue.h (via sys/event.h) */
#undef LIST_HEAD
#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>

/* We need to use EVFILT_READ to watch RB_WAITFD_PRI */
#define WAITFD_READ (RB_WAITFD_IN|RB_WAITFD_PRI)

/* allocated on heap (rb_vm_t.iom) */
struct rb_iom_struct {
    /*
     * Everything here is protected by GVL at this time,
     * URCU lists (LGPL-2.1+) may be used in the future
     */

    /* we NEVER need to scan kevs, only insert + delete + empty check */
    struct list_head kevs; /* -kev.fdw.w.wnode, order agnostic */

    struct list_head timers; /* -rb_iom_timer.n.tnode, sort by expire_at */
    struct list_head pids; /* -rb_iom_pid_waiter.w.wnode, LIFO order */
    struct rb_iom_fdmap rfdmap; /* holds fdh for EVFILT_READ */
    struct rb_iom_fdmap wfdmap; /* holds fdh for EVFILT_WRITE */

    int kqueue_fd;
    int nevents; /* auto-increases */
    struct list_head blockers; /* -rb_iom_blocker.bnode */
};

/* allocated on stack */
struct kev {
    /* fdw.w.wnode is overloaded for checking RB_WAITFD_PRI (see check_pri) */
    struct rb_iom_fd_waiter fdw;
    rb_thread_t *th;

    /*
     * both rfdnode and wfdnode are overloaded for deleting paired
     * filters when watching both EVFILT_READ and EVFILT_WRITE on
     * a single FD
     */
    struct list_node rfdnode; /* -(ev.udata==fdh)->fdhead (EVFILT_READ) */
    struct list_node wfdnode; /* -(ev.udata==fdh)->fdhead (EVFILT_WRITE) */
};

/*
 * like our epoll implementation, we "ping" using kevent with zero-timeout
 * and can do so on any thread.
 */
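The header comment's main claim is that one kevent() call can both register new filters and harvest pending events, whereas epoll needs an epoll_ctl per FD followed by epoll_wait. A minimal standalone sketch of that combined changelist/eventlist usage follows; it is generic kqueue code for illustration only, not an excerpt from iom_kqueue.h (the rest of which is truncated in this listing).

    /* Generic illustration of the "one syscall" point above, not patch code:
     * register an EVFILT_READ filter for +fd+ and reap any already-ready
     * events in the same kevent() call, using a zero timeout like the
     * "ping" described in the comment. */
    #include <sys/types.h>
    #include <sys/event.h>
    #include <sys/time.h>

    static int
    kevent_ping_sketch(int kqfd, int fd)
    {
        struct kevent chg;                    /* changelist: filter to (re)arm */
        struct kevent out[8];                 /* eventlist: ready events, if any */
        const struct timespec zero = { 0, 0 };

        EV_SET(&chg, fd, EVFILT_READ, EV_ADD | EV_ONESHOT, 0, 0, NULL);

        /* adds the filter and returns ready events in a single syscall */
        return kevent(kqfd, &chg, 1, out, 8, &zero);
    }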