From 9d1b8ec4145b27570beb69c12c8977a173300fcb Mon Sep 17 00:00:00 2001
From: Eric Wong <e@80x24.org>
Date: Fri, 19 May 2017 07:20:58 +0000
Subject: [PATCH] speed up IO#close with many threads

Today, this change increases IO#close performance with many threads:

  Execution time (sec)
  name            trunk   after
  vm_thread_close 4.276   3.018

  Speedup ratio: compare with the result of `trunk' (greater is better)
  name            after
  vm_thread_close 1.417

This speedup comes because rb_notify_fd_close only scans threads
inside rb_thread_io_blocking_region, not all threads in the VM.

In the future, this type of data structure may allow us to notify
waiters of multiple FDs on a single thread (when using
Fibers).

* thread.c (struct waiting_fd): declare
  (rb_thread_io_blocking_region): use on-stack list waiter
  (rb_notify_fd_close): walk vm->waiting_fds instead
  (call_without_gvl): remove old field setting
* vm.c (th_init): ditto
* vm_core.h (typedef struct rb_vm_struct): add waiting_fds list
  (typedef struct rb_thread_struct): remove waiting_fd field
  (rb_vm_living_threads_init): initialize waiting_fds list

I am now kicking myself for not thinking about this 3 years ago
when I introduced ccan/list in [Feature #9632] to optimize this
same function :<
---
 thread.c  | 25 ++++++++++++++++++-------
 vm.c      |  1 -
 vm_core.h |  4 ++--
 3 files changed, 20 insertions(+), 10 deletions(-)

diff --git a/thread.c b/thread.c
index fd3db3648f..bb14ffb97b 100644
--- a/thread.c
+++ b/thread.c
@@ -101,6 +101,12 @@ static int rb_threadptr_pending_interrupt_empty_p(rb_thread_t *th);
 #define eTerminateSignal INT2FIX(1)
 static volatile int system_working = 1;
 
+struct waiting_fd {
+    struct list_node wfd_node; /* <=> vm.waiting_fds; linked only while rb_thread_io_blocking_region runs */
+    rb_thread_t *th; /* thread that registered this entry; interrupted by rb_notify_fd_close on an fd match */
+    int fd; /* descriptor passed to rb_thread_io_blocking_region; compared against the closing fd */
+};
+
 inline static void
 st_delete_wrap(st_table *table, st_data_t key)
 {
@@ -1316,7 +1322,6 @@ call_without_gvl(void *(*func)(void *), void *data1,
     rb_thread_t *th = GET_THREAD();
     int saved_errno = 0;
 
-    th->waiting_fd = -1;
     if (ubf == RUBY_UBF_IO || ubf == RUBY_UBF_PROCESS) {
 	ubf = ubf_select;
 	data2 = th;
@@ -1439,11 +1444,15 @@ VALUE
 rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)
 {
     volatile VALUE val = Qundef; /* shouldn't be used */
+    rb_vm_t *vm = GET_VM();
     rb_thread_t *th = GET_THREAD();
     volatile int saved_errno = 0;
     int state;
+    struct waiting_fd wfd;
 
-    th->waiting_fd = fd;
+    wfd.fd = fd;
+    wfd.th = th;
+    list_add(&vm->waiting_fds, &wfd.wfd_node);
 
     TH_PUSH_TAG(th);
     if ((state = EXEC_TAG()) == 0) {
@@ -1451,11 +1460,12 @@ rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)
 	    val = func(data1);
 	    saved_errno = errno;
 	}, ubf_select, th, FALSE);
+
     }
     TH_POP_TAG();
 
-    /* clear waiting_fd anytime */
-    th->waiting_fd = -1;
+    /* must be deleted before jump */
+    list_del(&wfd.wfd_node);
 
     if (state) {
 	TH_JUMP_TAG(th, state);
@@ -2196,12 +2206,13 @@ int
 rb_notify_fd_close(int fd)
 {
     rb_vm_t *vm = GET_THREAD()->vm;
-    rb_thread_t *th = 0;
+    struct waiting_fd *wfd = 0;
     int busy;
 
     busy = 0;
-    list_for_each(&vm->living_threads, th, vmlt_node) {
-	if (th->waiting_fd == fd) {
+    list_for_each(&vm->waiting_fds, wfd, wfd_node) {
+	if (wfd->fd == fd) {
+	    rb_thread_t *th = wfd->th;
 	    VALUE err = th->vm->special_exceptions[ruby_error_stream_closed];
 	    rb_threadptr_pending_interrupt_enque(th, err);
 	    rb_threadptr_interrupt(th);
diff --git a/vm.c b/vm.c
index 52d505ab7c..4810321d6f 100644
--- a/vm.c
+++ b/vm.c
@@ -2521,7 +2521,6 @@ th_init(rb_thread_t *th, VALUE self)
     th->status = THREAD_RUNNABLE;
     th->errinfo = Qnil;
     th->last_status = Qnil;
-    th->waiting_fd = -1;
     th->root_svar = Qfalse;
     th->local_storage_recursive_hash = Qnil;
     th->local_storage_recursive_hash_for_trace = Qnil;
diff --git a/vm_core.h b/vm_core.h
index 35b1748218..022243fb1d 100644
--- a/vm_core.h
+++ b/vm_core.h
@@ -490,6 +490,7 @@ typedef struct rb_vm_struct {
     struct rb_thread_struct *main_thread;
     struct rb_thread_struct *running_thread;
 
+    struct list_head waiting_fds; /* <=> struct waiting_fd */
     struct list_head living_threads;
     size_t living_thread_num;
     VALUE thgroup_default;
@@ -716,8 +717,6 @@ typedef struct rb_thread_struct {
     /* passing state */
     int state;
 
-    int waiting_fd;
-
     /* for rb_iterate */
     VALUE passed_block_handler;
 
@@ -1442,6 +1441,7 @@ void rb_thread_wakeup_timer_thread(void);
 static inline void
 rb_vm_living_threads_init(rb_vm_t *vm)
 {
+    list_head_init(&vm->waiting_fds);
     list_head_init(&vm->living_threads);
     vm->living_thread_num = 0;
 }
-- 
EW

