
Bug #4844 » pool.rb

emakris (Ernie Makrkis), 06/07/2011 11:19 PM

 
# -*- encoding: utf-8 -*-
require 'resque'
require 'resque/pool/version'
require 'resque/pool/logging'
require 'resque/pool/pooled_worker'
require 'fcntl'
require 'yaml'
require 'time' # Time.parse (used in #stop_worker) lives in the 'time' stdlib

module Resque
  class Pool
    include Logging
    attr_reader :config
    attr_reader :workers

    # CONSTANTS {{{
    SIG_QUEUE_MAX_SIZE = 5
    DEFAULT_WORKER_INTERVAL = 5
    QUEUE_SIGS = [ :QUIT, :INT, :TERM, :USR1, :USR2, :CONT, :HUP, :WINCH, ]
    CHUNK_SIZE = (16 * 1024)
    # }}}

    def initialize(config)
      init_config(config)
      @workers = {}
      procline "(initialized)"
    end

    # Config: after_prefork {{{

    # The `after_prefork` hook will be run in workers if you are using the
    # preforking master worker to save memory. Use this hook to reload
    # database connections and so forth to ensure that they're not shared
    # among workers.
    #
    # Call with a block to set the hook.
    # Call with no arguments to return the hook.
    def self.after_prefork(&block)
      block ? (@after_prefork = block) : @after_prefork
    end

    # Set the after_prefork proc.
    def self.after_prefork=(after_prefork)
      @after_prefork = after_prefork
    end

    def call_after_prefork!
      self.class.after_prefork && self.class.after_prefork.call
    end
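
    # Illustrative sketch only (ActiveRecord is an assumption here, not part
    # of this file): an application would typically set the hook so each
    # forked worker re-establishes its own connections, e.g.
    #
    #   Resque::Pool.after_prefork do
    #     ActiveRecord::Base.establish_connection
    #   end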

    # }}}
    # Config: class methods to start up the pool using the default config {{{

    @config_files = ["resque-pool.yml", "config/resque-pool.yml"]
    class << self; attr_accessor :config_files; end
    def self.choose_config_file
      if ENV["RESQUE_POOL_CONFIG"]
        ENV["RESQUE_POOL_CONFIG"]
      else
        @config_files.detect { |f| File.exist?(f) }
      end
    end

    def self.run
      if GC.respond_to?(:copy_on_write_friendly=)
        GC.copy_on_write_friendly = true
      end
      Resque::Pool.new(choose_config_file).start.join
    end
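
    # Minimal boot sketch (how this is wired into a rake task or executable
    # is outside this file):
    #
    #   require 'resque/pool'
    #   Resque::Pool.run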

    # }}}
    # Config: load config and config file {{{

    def init_config(config)
      unless config
        raise ArgumentError,
          "No configuration found. Please setup config/resque-pool.yml"
      end
      if config.kind_of? String
        @config_file = config.to_s
      else
        @config = config.dup
      end
      load_config
    end

    def load_config
      @config_file and @config = YAML.load_file(@config_file)
      environment and @config[environment] and config.merge!(@config[environment])
      config.delete_if {|key, value| value.is_a? Hash }
    end
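
    # For reference, the config file consumed above maps queue lists to
    # worker counts, optionally nested under an environment name whose
    # values override the defaults (any leftover nested hashes are then
    # discarded). A hypothetical config/resque-pool.yml:
    #
    #   foo: 1
    #   foo,bar: 2
    #   production:
    #     foo,bar: 4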

    def environment
      if defined? Rails
        Rails.env
      else
        ENV['RACK_ENV'] || ENV['RAILS_ENV'] || ENV['RESQUE_ENV']
      end
    end

    # }}}

    # Sig handlers and self pipe management {{{

    def self_pipe; @self_pipe ||= [] end
    def sig_queue; @sig_queue ||= [] end

    def init_self_pipe!
      self_pipe.each { |io| io.close rescue nil }
      self_pipe.replace(IO.pipe)
      self_pipe.each { |io| io.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) }
    end

    def init_sig_handlers!
      QUEUE_SIGS.each { |sig| trap_deferred(sig) }
      trap(:CHLD)     { |_| awaken_master }
    end

    def awaken_master
      begin
        self_pipe.last.write_nonblock('.') # wakeup master process from select
      rescue Errno::EAGAIN, Errno::EINTR
        # pipe is full, master should wake up anyways
        retry
      end
    end

    # defer a signal for later processing in #join (master process)
    def trap_deferred(signal)
      trap(signal) do |sig_nr|
        if sig_queue.size < SIG_QUEUE_MAX_SIZE
          sig_queue << signal
          awaken_master
        else
          log "ignoring SIG#{signal}, queue=#{sig_queue.inspect}"
        end
      end
    end

    def reset_sig_handlers!
      QUEUE_SIGS.each {|sig| trap(sig, "DEFAULT") }
    end

    def handle_sig_queue!
      case signal = sig_queue.shift
      when :USR1, :USR2, :CONT
        log "#{signal}: sending to all workers"
        signal_all_workers(signal)
      when :HUP
        log "HUP: reload config file"
        load_config
        maintain_worker_count
      when :WINCH
        log "WINCH: gracefully stopping all workers"
        @config = {}
        maintain_worker_count
      when :QUIT
        log "QUIT: graceful shutdown, waiting for children"
        signal_all_workers(:QUIT)
        reap_all_workers(0) # will hang until all workers are shutdown
        :break
      when :INT
        log "INT: immediate shutdown (graceful worker shutdown)"
        signal_all_workers(:QUIT)
        :break
      when :TERM
        log "TERM: immediate shutdown (and immediate worker shutdown)"
        signal_all_workers(:TERM)
        :break
      end
    end
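
    # The running master is therefore controlled entirely via signals; for
    # example (PID is hypothetical), reload the config and resize the pool:
    #
    #   kill -HUP 12345
    #
    # or shut the whole pool down gracefully:
    #
    #   kill -QUIT 12345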

    # }}}
    # start, join, and master sleep {{{

    def start
      procline("(starting)")
      init_self_pipe!
      init_sig_handlers!
      maintain_worker_count
      procline("(started)")
      log "**** started master at PID: #{Process.pid}"
      log "**** Pool contains PIDs: #{all_pids.inspect}"
      self
    end

    def join
      loop do
        reap_all_workers
        break if handle_sig_queue! == :break
        if sig_queue.empty?
          master_sleep
          monitor_memory_usage
          maintain_worker_count
        end
        procline("managing #{all_pids.inspect}")
      end
      procline("(shutting down)")
      #stop # gracefully shutdown all workers on our way out
      log "**** master complete"
      #unlink_pid_safe(pid) if pid
    end

    def master_sleep
      begin
        ready = IO.select([self_pipe.first], nil, nil, 1) or return
        ready.first && ready.first.first or return
        loop { self_pipe.first.read_nonblock(CHUNK_SIZE) }
      rescue Errno::EAGAIN, Errno::EINTR
      end
    end

    # }}}
    # worker process management {{{

    def reap_all_workers(waitpid_flags=Process::WNOHANG)
      begin
        loop do
          wpid, status = Process.waitpid2(-1, waitpid_flags)
          wpid or break
          worker = delete_worker(wpid)
          # TODO: close any file descriptors connected to worker, if any
          log "** reaped #{status.inspect}, worker=#{worker.queues.join(",")}"
        end
      rescue Errno::EINTR
        retry
      rescue Errno::ECHILD
      end
    end

    def delete_worker(pid)
      worker = nil
      workers.detect do |queues, pid_to_worker|
        worker = pid_to_worker.delete(pid)
      end
      worker
    end

    def all_pids
      workers.map {|q,workers| workers.keys }.flatten
    end

    def signal_all_workers(signal)
      all_pids.each do |pid|
        Process.kill signal, pid
      end
    end

    def memory_usage(pid)
      smaps_filename = "/proc/#{pid}/smaps"

      # Grab actual memory usage from proc in MB
      begin
        mem_usage = `
          if [ -f #{smaps_filename} ];
            then
              grep Private_Dirty #{smaps_filename} | awk '{s+=$2} END {printf("%d", s/1000)}'
            else echo "0"
          fi
        `.to_i
      rescue Errno::EINTR
        retry
      end
    end

    def process_exists?(pid)
      begin
        ps_line = `ps -p #{pid} --no-header`
      rescue Errno::EINTR
        retry
      end
      !ps_line.nil? && ps_line.strip != ''
    end

    def hard_kill_workers
      @term_workers ||= []
      # look for workers that didn't terminate
      @term_workers.delete_if {|pid| !process_exists?(pid)}
      # send the rest a -9
      @term_workers.each {|pid| `kill -9 #{pid}`}
    end

    def add_killed_worker(pid)
      @term_workers ||= []
      @term_workers << pid if pid
    end
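
    # Once a minute, check each worker's private dirty memory (plus that of
    # the per-job child it may have forked): above 200MB the worker is asked
    # to finish gracefully (QUIT); above 250MB it is sent TERM and queued for
    # a later hard kill via hard_kill_workers.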

    def monitor_memory_usage
      # only check every minute
      if @last_mem_check.nil? || @last_mem_check < Time.now - 60
        hard_kill_workers

        all_pids.each do |pid|

          total_usage = memory_usage(pid)
          child_pid = find_child_pid(pid)

          total_usage += memory_usage(child_pid) if child_pid

          if total_usage > 250
            log "Terminating worker #{pid} for using #{total_usage}MB memory"
            stop_worker(pid)
          elsif total_usage > 200
            log "Gracefully shutting down worker #{pid} for using #{total_usage}MB memory"
            stop_worker(pid, :QUIT)
          end

        end

        @last_mem_check = Time.now
      end
    end

    def hostname
      begin
        @hostname ||= `hostname`.strip
      rescue Errno::EINTR
        retry
      end
    end

    def stop_worker(pid, signal=:TERM)
      begin
        worker = Resque.working.find do |w|
          host, worker_pid, queues = w.id.split(':')
          w if worker_pid.to_i == pid.to_i && host == hostname
        end
        if worker
          encoded_job = worker.job
          verb = signal == :QUIT ? 'Graceful' : 'Forcing'
          total_time = Time.now - Time.parse(encoded_job['run_at']) rescue 0
          log "#{verb} shutdown while processing: #{encoded_job} -- ran for #{'%.2f' % total_time}s"
        end

        Process.kill signal, pid
        if signal == :TERM
          add_killed_worker(pid)
          add_killed_worker(find_child_pid(pid))
        end
      rescue Errno::EINTR
        retry
      end
    end
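
    # A resque worker forks a child to run each job, so the memory checks and
    # kills above also have to account for that child; this helper looks it
    # up by parent pid.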

    def find_child_pid(parent_pid)
      begin
        p = `ps --ppid #{parent_pid} -o pid --no-header`.to_i
        p == 0 ? nil : p
      rescue Errno::EINTR
        retry
      end
    end

    def orphaned_worker_count
      if @last_orphaned_check.nil? || @last_orphaned_check < Time.now - 60
        if @orphaned_pids.nil?
          printf_line = '%d %d\n'
          begin
            pids_with_parents = `ps -Af | grep resque | grep -v grep | grep -v resque-web | grep -v master | awk '{printf("%d %d\\n", $2, $3)}'`.split("\n")
          rescue Errno::EINTR
            retry
          end
          pids = pids_with_parents.collect {|x| x.split[0].to_i}
          parents = pids_with_parents.collect {|x| x.split[1].to_i}
          pids.delete_if {|x| parents.include?(x)}
          pids.delete_if {|x| all_pids.include?(x)}
          @orphaned_pids = pids
        elsif @orphaned_pids.size > 0
          @orphaned_pids.delete_if do |pid|
            begin
              ps_out = `ps --no-heading p #{pid}`
              ps_out.nil? || ps_out.strip == ''
            rescue Errno::EINTR
              retry
            end
          end
        end
        @last_orphaned_check = Time.now
        log "Current orphaned pids: #{@orphaned_pids}" if @orphaned_pids.size > 0
      end
      @orphaned_pids.size
    end

    # }}}
    # ???: maintain_worker_count, all_known_queues {{{

    def maintain_worker_count
      orphaned_offset = orphaned_worker_count / all_known_queues.size
      all_known_queues.each do |queues|
        delta = worker_delta_for(queues) - orphaned_offset
        spawn_missing_workers_for(queues, delta) if delta > 0
        quit_excess_workers_for(queues, delta)   if delta < 0
      end
    end

    def all_known_queues
      config.keys | workers.keys
    end

    # }}}
    # methods that operate on a single grouping of queues {{{
    # perhaps this means a class is waiting to be extracted

    def spawn_missing_workers_for(queues, delta)
      delta.times { spawn_worker!(queues) } if delta > 0
    end

    def quit_excess_workers_for(queues, delta)
      if delta < 0
        queue_pids = pids_for(queues)
        if queue_pids.size >= delta.abs
          queue_pids[0...delta.abs].each {|pid| Process.kill("QUIT", pid)}
        else
          queue_pids.each {|pid| Process.kill("QUIT", pid)}
        end
      end
    end

    def worker_delta_for(queues)
      config.fetch(queues, 0) - workers.fetch(queues, []).size
    end

    def pids_for(queues)
      workers[queues].keys
    end

    def spawn_worker!(queues)
      worker = create_worker(queues)
      pid = fork do
        log "*** Starting worker #{worker}"
        call_after_prefork!
        reset_sig_handlers!
        #self_pipe.each {|io| io.close }
        begin
          worker.work(ENV['INTERVAL'] || DEFAULT_WORKER_INTERVAL) # interval, will block
        rescue Errno::EINTR
          log "Caught interrupted system call Errno::EINTR. Retrying."
          retry
        end
      end
      workers[queues] ||= {}
      workers[queues][pid] = worker
    end

    def create_worker(queues)
      queues = queues.to_s.split(',')
      worker = PooledWorker.new(*queues)
      worker.verbose = ENV['LOGGING'] || ENV['VERBOSE']
      worker.very_verbose = ENV['VVERBOSE']
      worker
    end
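
    # For example, a config key of "foo,bar" ends up here as
    # create_worker("foo,bar"), producing a single PooledWorker that pulls
    # jobs from both the foo and bar queues.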

    # }}}

  end
end