|  | # -*- encoding: utf-8 -*-
 | 
  
require 'fcntl'
require 'time'
require 'yaml'

require 'resque'
require 'resque/pool/version'
require 'resque/pool/logging'
require 'resque/pool/pooled_worker'
 | 
  
    |  | 
 | 
  
    |  | module Resque
 | 
  
    |  |   class Pool
 | 
  
    |  |     include Logging
 | 
  
    |  |     attr_reader :config
 | 
  
    |  |     attr_reader :workers
 | 
  
    |  | 
 | 
  
    |  |     # CONSTANTS {{{
 | 
  
    |  |     SIG_QUEUE_MAX_SIZE = 5
 | 
  
    |  |     DEFAULT_WORKER_INTERVAL = 5
 | 
  
    |  |     QUEUE_SIGS = [ :QUIT, :INT, :TERM, :USR1, :USR2, :CONT, :HUP, :WINCH, ]
 | 
  
    |  |     CHUNK_SIZE=(16 * 1024)
 | 
  
    |  |     # }}}
 | 
  
    |  | 
 | 
  
    |  |     def initialize(config)
 | 
  
    |  |       init_config(config)
 | 
  
    |  |       @workers = {}
 | 
  
    |  |       procline "(initialized)"
 | 
  
    |  |     end
 | 
  
    |  | 
 | 
  
    |  |     # Config: after_prefork {{{
 | 
  
    |  | 
 | 
  
    |  |     # The `after_prefork` hook will be run in workers if you are using the
 | 
  
    |  |     # preforking master worker to save memory. Use this hook to reload
 | 
  
    |  |     # database connections and so forth to ensure that they're not shared
 | 
  
    |  |     # among workers.
 | 
  
    |  |     #
 | 
  
    |  |     # Call with a block to set the hook.
 | 
  
    |  |     # Call with no arguments to return the hook.
 | 
  
    |  |     def self.after_prefork(&block)
 | 
  
    |  |       block ? (@after_prefork = block) : @after_prefork
 | 
  
    |  |     end
 | 
  
    |  | 
 | 
  
    |  |     # Set the after_prefork proc.
 | 
  
    |  |     def self.after_prefork=(after_prefork)
 | 
  
    |  |       @after_prefork = after_prefork
 | 
  
    |  |     end
 | 
  
    |  | 
 | 
  
    |  |     def call_after_prefork!
 | 
  
    |  |       self.class.after_prefork && self.class.after_prefork.call
 | 
  
    |  |     end
 | 
  
    |  | 
 | 
  
    |  |     # }}}
 | 
  
    |  |     # Config: class methods to start up the pool using the default config {{{
 | 
  
    |  | 
 | 
  
    |  |     @config_files = ["resque-pool.yml", "config/resque-pool.yml"]
 | 
  
    |  |     class << self; attr_accessor :config_files; end
 | 
  
    |  |     def self.choose_config_file
 | 
  
    |  |       if ENV["RESQUE_POOL_CONFIG"]
 | 
  
    |  |         ENV["RESQUE_POOL_CONFIG"]
 | 
  
    |  |       else
 | 
  
    |  |         @config_files.detect { |f| File.exist?(f) }
 | 
  
    |  |       end
 | 
  
    |  |     end
 | 
  
    |  | 
 | 
  
    |  |     def self.run
 | 
  
    |  |       if GC.respond_to?(:copy_on_write_friendly=)
 | 
  
    |  |         GC.copy_on_write_friendly = true
 | 
  
    |  |       end
 | 
  
    |  |       Resque::Pool.new(choose_config_file).start.join
 | 
  
    |  |     end
 | 
  
    |  | 
 | 
  
    |  |     # }}}
 | 
  
    |  |     # Config: load config and config file {{{
 | 
  
    |  | 
 | 
  
    |  |     def init_config(config)
 | 
  
    |  |       unless config
 | 
  
    |  |         raise ArgumentError,
 | 
  
    |  |           "No configuration found. Please setup config/resque-pool.yml"
 | 
  
    |  |       end
 | 
  
    |  |       if config.kind_of? String
 | 
  
    |  |         @config_file = config.to_s
 | 
  
    |  |       else
 | 
  
    |  |         @config = config.dup
 | 
  
    |  |       end
 | 
  
    |  |       load_config
 | 
  
    |  |     end
 | 
  
    |  | 
 | 
  
    |  |     def load_config
 | 
  
    |  |       @config_file and @config = YAML.load_file(@config_file)
 | 
  
    |  |       environment and @config[environment] and config.merge!(@config[environment])
 | 
  
    |  |       config.delete_if {|key, value| value.is_a? Hash }
 | 
  
    |  |     end
 | 
  
    |  | 
 | 
  
    |  |     def environment
 | 
  
    |  |       if defined? Rails
 | 
  
    |  |         Rails.env
 | 
  
    |  |       else
 | 
  
    |  |         ENV['RACK_ENV'] || ENV['RAILS_ENV'] || ENV['RESQUE_ENV']
 | 
  
    |  |       end
 | 
  
    |  |     end
 | 
  
    |  | 
 | 
  
    |  |     # }}}
 | 
  
    |  | 
 | 
  
    |  |     # Sig handlers and self pipe management {{{
 | 
  
    |  | 
 | 
  
    |  |     def self_pipe; @self_pipe ||= [] end
 | 
  
    |  |     def sig_queue; @sig_queue ||= [] end
 | 
  
    |  | 
 | 
  
    |  |     def init_self_pipe!
 | 
  
    |  |       self_pipe.each { |io| io.close rescue nil }
 | 
  
    |  |       self_pipe.replace(IO.pipe)
 | 
  
    |  |       self_pipe.each { |io| io.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) }
 | 
  
    |  |     end
 | 
  
    |  | 
 | 
  
    |  |     def init_sig_handlers!
 | 
  
    |  |       QUEUE_SIGS.each { |sig| trap_deferred(sig) }
 | 
  
    |  |       trap(:CHLD)     { |_| awaken_master }
 | 
  
    |  |     end
 | 
  
    |  | 
 | 
  
    |  |     def awaken_master
 | 
  
    |  |       begin
 | 
  
    |  |         self_pipe.last.write_nonblock('.') # wakeup master process from select
 | 
  
    |  |       rescue Errno::EAGAIN, Errno::EINTR
 | 
  
    |  |         # pipe is full, master should wake up anyways
 | 
  
    |  |         retry
 | 
  
    |  |       end
 | 
  
    |  |     end
 | 
  
    |  | 
 | 
  
    |  |     # defer a signal for later processing in #join (master process)
 | 
  
    |  |     def trap_deferred(signal)
 | 
  
    |  |       trap(signal) do |sig_nr|
 | 
  
    |  |         if sig_queue.size < SIG_QUEUE_MAX_SIZE
 | 
  
    |  |           sig_queue << signal
 | 
  
    |  |           awaken_master
 | 
  
    |  |         else
 | 
  
    |  |           log "ignoring SIG#{signal}, queue=#{sig_queue.inspect}"
 | 
  
    |  |         end
 | 
  
    |  |       end
 | 
  
    |  |     end
 | 
  
    |  | 
 | 
  
    |  |     def reset_sig_handlers!
 | 
  
    |  |       QUEUE_SIGS.each {|sig| trap(sig, "DEFAULT") }
 | 
  
    |  |     end
 | 
  
    |  | 
 | 
  
    |  |     def handle_sig_queue!
 | 
  
    |  |       case signal = sig_queue.shift
 | 
  
    |  |       when :USR1, :USR2, :CONT
 | 
  
    |  |         log "#{signal}: sending to all workers"
 | 
  
    |  |         signal_all_workers(signal)
 | 
  
    |  |       when :HUP
 | 
  
    |  |         log "HUP: reload config file"
 | 
  
    |  |         load_config
 | 
  
    |  |         maintain_worker_count
 | 
  
    |  |       when :WINCH
 | 
  
    |  |         log "WINCH: gracefully stopping all workers"
 | 
  
    |  |         @config = {}
 | 
  
    |  |         maintain_worker_count
 | 
  
    |  |       when :QUIT
 | 
  
    |  |         log "QUIT: graceful shutdown, waiting for children"
 | 
  
    |  |         signal_all_workers(:QUIT)
 | 
  
    |  |         reap_all_workers(0) # will hang until all workers are shutdown
 | 
  
    |  |         :break
 | 
  
    |  |       when :INT
 | 
  
    |  |         log "INT: immediate shutdown (graceful worker shutdown)"
 | 
  
    |  |         signal_all_workers(:QUIT)
 | 
  
    |  |         :break
 | 
  
    |  |       when :TERM
 | 
  
    |  |         log "TERM: immediate shutdown (and immediate worker shutdown)"
 | 
  
    |  |         signal_all_workers(:TERM)
 | 
  
    |  |         :break
 | 
  
    |  |       end
 | 
  
    |  |     end
 | 
  
    |  | 
 | 
  
    |  |     # }}}
 | 
  
    |  |     # start, join, and master sleep {{{
 | 
  
    |  | 
 | 
  
    |  |     def start
 | 
  
    |  |       procline("(starting)")
 | 
  
    |  |       init_self_pipe!
 | 
  
    |  |       init_sig_handlers!
 | 
  
    |  |       maintain_worker_count
 | 
  
    |  |       procline("(started)")
 | 
  
    |  |       log "**** started master at PID: #{Process.pid}"
 | 
  
    |  |       log "**** Pool contains PIDs: #{all_pids.inspect}"
 | 
  
    |  |       self
 | 
  
    |  |     end
 | 
  
    |  | 
 | 
  
    |  |     def join
 | 
  
    |  |       loop do
 | 
  
    |  |         reap_all_workers
 | 
  
    |  |         break if handle_sig_queue! == :break
 | 
  
    |  |         if sig_queue.empty?
 | 
  
    |  |           master_sleep
 | 
  
    |  |           monitor_memory_usage
 | 
  
    |  |           maintain_worker_count
 | 
  
    |  |         end
 | 
  
    |  |         procline("managing #{all_pids.inspect}")
 | 
  
    |  |       end
 | 
  
    |  |       procline("(shutting down)")
 | 
  
    |  |       #stop # gracefully shutdown all workers on our way out
 | 
  
    |  |       log "**** master complete"
 | 
  
    |  |       #unlink_pid_safe(pid) if pid
 | 
  
    |  |     end
 | 
  
    |  | 
 | 
  
    |  |     def master_sleep
 | 
  
    |  |       begin
 | 
  
    |  |         ready = IO.select([self_pipe.first], nil, nil, 1) or return
 | 
  
    |  |         ready.first && ready.first.first or return
 | 
  
    |  |         loop { self_pipe.first.read_nonblock(CHUNK_SIZE) }
 | 
  
    |  |       rescue Errno::EAGAIN, Errno::EINTR
 | 
  
    |  |       end
 | 
  
    |  |     end
 | 
  
    |  | 
 | 
  
    |  |     # }}}
 | 
  
    |  |     # worker process management {{{
 | 
  
    |  | 
 | 
  
    |  |     def reap_all_workers(waitpid_flags=Process::WNOHANG)
 | 
  
    |  |       begin
 | 
  
    |  |         loop do
 | 
  
    |  |           wpid, status = Process.waitpid2(-1, waitpid_flags)
 | 
  
    |  |           wpid or break
 | 
  
    |  |           worker = delete_worker(wpid)
 | 
  
    |  |           # TODO: close any file descriptors connected to worker, if any
 | 
  
    |  |           log "** reaped #{status.inspect}, worker=#{worker.queues.join(",")}"
 | 
  
    |  |         end
 | 
  
    |  |       rescue Errno::EINTR
 | 
  
    |  |         retry
 | 
  
    |  |       rescue Errno::ECHILD
 | 
  
    |  |       end
 | 
  
    |  |     end
 | 
  
    |  | 
 | 
  
    |  |     def delete_worker(pid)
 | 
  
    |  |       worker = nil
 | 
  
    |  |       workers.detect do |queues, pid_to_worker|
 | 
  
    |  |         worker = pid_to_worker.delete(pid)
 | 
  
    |  |       end
 | 
  
    |  |       worker
 | 
  
    |  |     end
 | 
  
    |  | 
 | 
  
    |  |     def all_pids
 | 
  
    |  |       workers.map {|q,workers| workers.keys }.flatten
 | 
  
    |  |     end
 | 
  
    |  | 
 | 
  
    |  |     def signal_all_workers(signal)
 | 
  
    |  |       all_pids.each do |pid|
 | 
  
    |  |         Process.kill signal, pid
 | 
  
    |  |       end
 | 
  
    |  |     end
 | 
  
    |  | 
 | 
  
    |  |     def memory_usage(pid)
 | 
  
    |  |       smaps_filename = "/proc/#{pid}/smaps"
 | 
  
    |  |           
 | 
  
    |  |       #Grab actual memory usage from proc in MB
 | 
  
    |  |       begin
 | 
  
    |  |         mem_usage = `
 | 
  
    |  |           if [ -f #{smaps_filename} ];
 | 
  
    |  |             then
 | 
  
    |  |               grep Private_Dirty #{smaps_filename} | awk '{s+=$2} END {printf("%d", s/1000)}'
 | 
  
    |  |             else echo "0"
 | 
  
    |  |           fi
 | 
  
    |  |         `.to_i
 | 
  
    |  |         rescue Errno::EINTR
 | 
  
    |  |           retry
 | 
  
    |  |         end
 | 
  
    |  |     end
 | 
  
    |  |     
 | 
  
    |  |     def process_exists?(pid)
 | 
  
    |  |       begin
 | 
  
    |  |         ps_line = `ps -p #{pid} --no-header`
 | 
  
    |  |       rescue Errno::EINTR
 | 
  
    |  |         retry
 | 
  
    |  |       end
 | 
  
    |  |       !ps_line.nil? && ps_line.strip != ''
 | 
  
    |  |     end
 | 
  
    |  | 
 | 
  
    |  |     def hard_kill_workers
 | 
  
    |  |       @term_workers ||= []
 | 
  
    |  |       #look for workers that didn't terminate
 | 
  
    |  |       @term_workers.delete_if {|pid| !process_exists?(pid)}
 | 
  
    |  |       #send the rest a -9
 | 
  
    |  |       @term_workers.each {|pid| `kill -9 #{pid}`}
 | 
  
    |  |     end
 | 
  
    |  | 
 | 
  
    |  |     def add_killed_worker(pid)
 | 
  
    |  |       @term_workers ||= []
 | 
  
    |  |       @term_workers << pid if pid
 | 
  
    |  |     end
 | 
  
    |  | 
 | 
  
    |  |     def monitor_memory_usage
 | 
  
    |  |       #only check every minute
 | 
  
    |  |       if @last_mem_check.nil? || @last_mem_check < Time.now - 60
 | 
  
    |  |         hard_kill_workers
 | 
  
    |  | 
 | 
  
    |  |         all_pids.each do |pid|
 | 
  
    |  | 
 | 
  
    |  |           total_usage = memory_usage(pid)
 | 
  
    |  |           child_pid = find_child_pid(pid)
 | 
  
    |  |           
 | 
  
    |  |           total_usage += memory_usage(child_pid) if child_pid
 | 
  
    |  |           
 | 
  
    |  |           if total_usage > 250
 | 
  
    |  |             log "Terminating worker #{pid} for using #{total_usage}MB memory"
 | 
  
    |  |             stop_worker(pid)
 | 
  
    |  |           elsif total_usage > 200
 | 
  
    |  |             log "Gracefully shutting down worker #{pid} for using #{total_usage}MB memory"
 | 
  
    |  |             stop_worker(pid, :QUIT)
 | 
  
    |  |           end
 | 
  
    |  | 
 | 
  
    |  |         end
 | 
  
    |  | 
 | 
  
    |  |         @last_mem_check = Time.now
 | 
  
    |  |       end
 | 
  
    |  |     end
 | 
  
    |  | 
 | 
  
    |  |     def hostname
 | 
  
    |  |       begin
 | 
  
    |  |         @hostname ||= `hostname`.strip
 | 
  
    |  |       rescue Errno::EINTR
 | 
  
    |  |         retry
 | 
  
    |  |       end
 | 
  
    |  |     end
 | 
  
    |  | 
 | 
  
    |  |     def stop_worker(pid, signal=:TERM)
 | 
  
    |  |       begin
 | 
  
    |  |         worker = Resque.working.find do |w|
 | 
  
    |  |           host, worker_pid, queues = w.id.split(':')
 | 
  
    |  |           w if worker_pid.to_i == pid.to_i && host == hostname
 | 
  
    |  |         end
 | 
  
    |  |         if worker
 | 
  
    |  |           encoded_job = worker.job
 | 
  
    |  |           verb = signal == :QUIT ? 'Graceful' : 'Forcing'
 | 
  
    |  |           total_time = Time.now - Time.parse(encoded_job['run_at']) rescue 0
 | 
  
    |  |           log "#{verb} shutdown while processing: #{encoded_job} -- ran for #{'%.2f' % total_time}s"
 | 
  
    |  |         end
 | 
  
    |  | 
 | 
  
    |  |         Process.kill signal, pid
 | 
  
    |  |         if signal == :TERM
 | 
  
    |  |           add_killed_worker(pid)
 | 
  
    |  |           add_killed_worker(find_child_pid(pid))
 | 
  
    |  |         end
 | 
  
    |  |       rescue Errno::EINTR
 | 
  
    |  |         retry
 | 
  
    |  |       end
 | 
  
    |  |     end
 | 
  
    |  | 
 | 
  
    |  |     def find_child_pid(parent_pid)
 | 
  
    |  |       begin
 | 
  
    |  |         p = `ps --ppid #{parent_pid} -o pid --no-header`.to_i
 | 
  
    |  |         p == 0 ? nil : p
 | 
  
    |  |       rescue Errno::EINTR
 | 
  
    |  |         retry
 | 
  
    |  |       end
 | 
  
    |  |     end
 | 
  
    |  | 
 | 
  
    |  |     def orphaned_worker_count
 | 
  
    |  |       if @last_orphaned_check.nil? || @last_orphaned_check < Time.now - 60
 | 
  
    |  |         if @orphaned_pids.nil?
 | 
  
    |  |             printf_line = '%d %d\n'
 | 
  
    |  |           begin
 | 
  
    |  |             pids_with_parents = `ps -Af | grep resque | grep -v grep | grep -v resque-web | grep -v master | awk '{printf("%d %d\\n", $2, $3)}'`.split("\n")
 | 
  
    |  |           rescue Errno::EINTR
 | 
  
    |  |             retry
 | 
  
    |  |           end
 | 
  
    |  |           pids = pids_with_parents.collect {|x| x.split[0].to_i}
 | 
  
    |  |           parents = pids_with_parents.collect {|x| x.split[1].to_i}
 | 
  
    |  |           pids.delete_if {|x| parents.include?(x)}
 | 
  
    |  |           pids.delete_if {|x| all_pids.include?(x)}
 | 
  
    |  |           @orphaned_pids = pids
 | 
  
    |  |         elsif @orphaned_pids.size > 0
 | 
  
    |  |           @orphaned_pids.delete_if do |pid|
 | 
  
    |  |             begin
 | 
  
    |  |               ps_out = `ps --no-heading p #{pid}`
 | 
  
    |  |               ps_out.nil? || ps_out.strip == ''
 | 
  
    |  |             rescue Errno::EINTR
 | 
  
    |  |               retry
 | 
  
    |  |             end
 | 
  
    |  |           end
 | 
  
    |  |         end
 | 
  
    |  |         @last_orphaned_check = Time.now
 | 
  
    |  |         log "Current orphaned pids: #{@orphaned_pids}" if @orphaned_pids.size > 0
 | 
  
    |  |       end
 | 
  
    |  |       @orphaned_pids.size
 | 
  
    |  |     end
 | 
  
    |  | 
 | 
  
    |  |     # }}}
 | 
  
    |  |     # ???: maintain_worker_count, all_known_queues {{{
 | 
  
    |  | 
 | 
  
    |  |     def maintain_worker_count
 | 
  
    |  |       orphaned_offset = orphaned_worker_count / all_known_queues.size
 | 
  
    |  |       all_known_queues.each do |queues|
 | 
  
    |  |         delta = worker_delta_for(queues) - orphaned_offset
 | 
  
    |  |         spawn_missing_workers_for(queues, delta) if delta > 0
 | 
  
    |  |         quit_excess_workers_for(queues, delta)   if delta < 0
 | 
  
    |  |       end
 | 
  
    |  |     end
 | 
  
    |  | 
 | 
  
    |  |     def all_known_queues
 | 
  
    |  |       config.keys | workers.keys
 | 
  
    |  |     end
 | 
  
    |  | 
 | 
  
    |  |     # }}}
 | 
  
    |  |     # methods that operate on a single grouping of queues {{{
 | 
  
    |  |     # perhaps this means a class is waiting to be extracted
 | 
  
    |  | 
 | 
  
    |  |     def spawn_missing_workers_for(queues, delta)
 | 
  
    |  |       delta.times { spawn_worker!(queues) } if delta > 0
 | 
  
    |  |     end
 | 
  
    |  | 
 | 
  
    |  |     def quit_excess_workers_for(queues, delta)
 | 
  
    |  |       if delta < 0
 | 
  
    |  |         queue_pids = pids_for(queues)
 | 
  
    |  |         if queue_pids.size >= delta.abs
 | 
  
    |  |           queue_pids[0...delta.abs].each {|pid| Process.kill("QUIT", pid)}
 | 
  
    |  |         else
 | 
  
    |  |           queue_pids.each {|pid| Process.kill("QUIT", pid)}
 | 
  
    |  |         end
 | 
  
    |  |       end
 | 
  
    |  |     end
 | 
  
    |  | 
 | 
  
    |  |     def worker_delta_for(queues)
 | 
  
    |  |       config.fetch(queues, 0) - workers.fetch(queues, []).size
 | 
  
    |  |     end
 | 
  
    |  | 
 | 
  
    |  |     def pids_for(queues)
 | 
  
    |  |       workers[queues].keys
 | 
  
    |  |     end
 | 
  
    |  | 
 | 
  
    |  |     def spawn_worker!(queues)
 | 
  
    |  |       worker = create_worker(queues)
 | 
  
    |  |       pid = fork do
 | 
  
    |  |         log "*** Starting worker #{worker}"
 | 
  
    |  |         call_after_prefork!
 | 
  
    |  |         reset_sig_handlers!
 | 
  
    |  |         #self_pipe.each {|io| io.close }
 | 
  
    |  |         begin
 | 
  
    |  |           worker.work(ENV['INTERVAL'] || DEFAULT_WORKER_INTERVAL) # interval, will block
 | 
  
    |  |         rescue Errno::EINTR
 | 
  
    |  |           log "Caught interrupted system call Errno::EINTR. Retrying."
 | 
  
    |  |           retry
 | 
  
    |  |         end
 | 
  
    |  |       end
 | 
  
    |  |       workers[queues] ||= {}
 | 
  
    |  |       workers[queues][pid] = worker
 | 
  
    |  |     end
 | 
  
    |  | 
 | 
  
    |  |     def create_worker(queues)
 | 
  
    |  |       queues = queues.to_s.split(',')
 | 
  
    |  |       worker = PooledWorker.new(*queues)
 | 
  
    |  |       worker.verbose = ENV['LOGGING'] || ENV['VERBOSE']
 | 
  
    |  |       worker.very_verbose = ENV['VVERBOSE']
 | 
  
    |  |       worker
 | 
  
    |  |     end
 | 
  
    |  | 
 | 
  
    |  |     # }}}
 | 
  
    |  | 
 | 
  
    |  |   end
 | 
  
    |  | end
 |