|
#!/usr/bin/env ruby -w
|
|
# encoding: UTF-8
|
|
#
|
|
# = Project.rb -- The TaskJuggler III Project Management Software
|
|
#
|
|
# Copyright (c) 2006, 2007, 2008, 2009 by Chris Schlaeger <cs@kde.org>
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of version 2 of the GNU General Public License as
|
|
# published by the Free Software Foundation.
|
|
#
|
|
|
|
require 'thread'
|
|
require 'monitor'
|
|
|
|
class TaskJuggler
|
|
|
|
# The JobInfo class is just a storage container for some batch job related
|
|
# pieces of information. It contains things like a job id, the process id,
|
|
# the stdout data and the like.
|
|
class JobInfo

  # Read-only identity of the job: the numeric id, the code block to run and
  # the caller supplied tag.
  attr_reader :jobId, :block, :tag
  # Run-time state of the job: process id, exit status, both ends of the
  # stdout/stderr pipes (+P+ is the reading end, +C+ the writing end), the
  # collected output and the end-of-text flags.
  attr_accessor :pid, :retVal, :stdoutP, :stdoutC, :stdout, :stdoutEOT,
                :stderrP, :stderrC, :stderr, :stderrEOT

  # Create the record for a single job.
  #
  # +jobId+:: unique number used to identify the job
  # +block+:: the code to be run as an external process
  # +tag+::   anything the caller needs to recognise the job again
  def initialize(jobId, block, tag)
    @jobId = jobId
    @block = block
    @tag = tag

    # Process id and exit status are not known before the job has been
    # started and has terminated. Initialise them explicitly so that reading
    # them early yields nil without an "uninitialized" warning under -w.
    @pid = nil
    @retVal = nil

    # Pipe that carries the stdout data; created lazily by openPipes.
    @stdoutP = @stdoutC = nil
    # The collected stdout output. A private mutable copy is used because
    # the buffer is appended to in place.
    @stdout = ''.dup
    # Becomes true once the EOT character was seen on stdout. The variable
    # name must match the :stdoutEOT accessor.
    @stdoutEOT = false

    # Pipe that carries the stderr data; created lazily by openPipes.
    @stderrP = @stderrC = nil
    # The collected stderr output.
    @stderr = ''.dup
    # Becomes true once the EOT character was seen on stderr.
    @stderrEOT = false
  end

  # Create the two pipes. For each pipe the reading end is stored in the +P+
  # attribute and the writing end in the +C+ attribute.
  def openPipes
    @stdoutP, @stdoutC = IO.pipe
    @stderrP, @stderrC = IO.pipe
  end

end
|
|
|
|
# The BatchProcessor class can be used to run code blocks of the program as
|
|
# a separate process. Multiple pieces of code can be submitted to be
|
|
# executed in parallel. The number of CPU cores to use is limited at object
|
|
# creation time. The submitted jobs will be queued and scheduled to the
|
|
# given number of CPUs. The usage model is simple. Create a BatchProcessor
|
|
# object. Use BatchProcessor#queue to submit all the jobs and then use
|
|
# BatchProcessor#wait to wait for completion and to process the results.
|
|
class BatchProcessor

  # Create a BatchProcessor object. +maxCpuCores+ limits the number of
  # simultaneously spawned processes.
  def initialize(maxCpuCores)
    @maxCpuCores = maxCpuCores
    # Jobs submitted by calling queue() are put in the @toRunQueue. The
    # pusher thread picks them up and forks them off into another process.
    @toRunQueue = Queue.new
    # A hash that maps the PID of each running job to its JobInfo object.
    @runningJobs = {}
    # Jobs whose process has terminated but whose output has not been
    # received completely yet.
    @spoolingJobs = []
    # Completed jobs that wait() still has to hand over to the caller.
    @toDropQueue = Queue.new

    # Guards @runningJobs, @spoolingJobs, the job counters and the pipe
    # tables below.
    @lock = Monitor.new
    # We count the submitted and completed jobs. The @jobsIn counter also
    # doubles as a unique job ID.
    @jobsIn = @jobsOut = 0
    # All the IO objects we receive child output from.
    @pipes = []
    # Maps each of those IO objects to the JobInfo it belongs to.
    @pipeToJob = {}

    # Set to true to ask the helper threads to terminate.
    @terminate = false
    # Sleep time of the threads when no data is pending. This value must be
    # large enough to allow for a context switch between the sending
    # (forked-off) process and this process. If it's too large, throughput
    # will suffer.
    @timeout = 0.02

    # The helper threads are created by the first queue() call of a run.
    @pusher = @popper = @grabber = nil

    # NOTE: this is a process-wide setting. An exception in any thread will
    # abort the whole program.
    Thread.abort_on_exception = true
  end

  # Add a new job to the job queue. +tag+ is some data that the caller can use
  # to identify the job upon completion. +block+ is a Ruby code block to be
  # executed in a separate process; its return value is used as the exit
  # status of that process.
  #
  # Raises a RuntimeError when called after wait() has started to deliver
  # results and an ArgumentError when no block is given.
  def queue(tag = nil, &block)
    raise 'You cannot call queue() while wait() is running!' if @jobsOut > 0
    # A job without code could never report its completion; reject it before
    # any thread or process is created for it.
    raise ArgumentError, 'queue() requires a block' if block.nil?

    # If this is the first queued job for this run, we have to start the
    # helper threads.
    if @jobsIn == 0
      # Forks off a process for each JobInfo found in the @toRunQueue.
      @pusher = Thread.new { pusher }
      # Waits for terminated children and picks up their exit status.
      @popper = Thread.new { popper }
      # Collects the $stdout and $stderr data of each child process.
      @grabber = Thread.new { grabber }
    end

    # The current counter value is the ID of the new job.
    job = JobInfo.new(@jobsIn, block, tag)
    @lock.synchronize { @jobsIn += 1 }
    @toRunQueue.push(job)
  end

  # Wait for all jobs to complete. The code block is called once per job with
  # the JobInfo object so the caller can pick up the results. Without a block
  # the results are dropped. Returns nil. After wait() has returned, the
  # object can be reused for another run.
  def wait
    # Without queued jobs the helper threads were never started, so there is
    # nothing to wait for and nothing to join.
    return if @lock.synchronize { @jobsIn } == 0

    # When we have received as many jobs in the @toDropQueue as we have
    # started then we're done.
    until @lock.synchronize { @jobsIn == @jobsOut }
      if @toDropQueue.empty?
        sleep(@timeout)
        next
      end

      # We have completed jobs. Hand each of them to the caller.
      until @toDropQueue.empty?
        job = @toDropQueue.pop
        @lock.synchronize { @jobsOut += 1 }
        yield(job) if block_given?
      end
    end

    # Signal the threads to stop and wait for them to finish.
    @terminate = true
    [ @pusher, @popper, @grabber ].each { |thread| thread.join }

    # Reset some variables so we can reuse the object for further job runs.
    @jobsIn = @jobsOut = 0
    @terminate = false

    # Make sure all data structures are empty and clean.
    check
  end

  private

  # This function runs in a separate thread to pop JobInfo items from the
  # @toRunQueue and create child processes for them.
  def pusher
    until @terminate
      if @toRunQueue.empty? || @runningJobs.length >= @maxCpuCores
        # We have no jobs in the @toRunQueue or all CPU cores in use already.
        sleep(@timeout)
        next
      end

      job = @toRunQueue.pop

      # The pipe tables are read by the grabber thread and modified by
      # cleanPipes while @lock is held, so they must only be extended under
      # the same lock.
      @lock.synchronize do
        job.openPipes
        # Register the receiving ends and map them to this JobInfo object.
        [ job.stdoutP, job.stderrP ].each do |pipe|
          @pipes << pipe
          @pipeToJob[pipe] = job
        end

        pid = fork { runChild(job) }
        job.pid = pid
        # Save the process ID in the PID to JobInfo hash.
        @runningJobs[pid] = job
      end
    end
  end

  # The body of the forked child process. It connects $stdout and $stderr to
  # the pipes of +job+, runs the job's code block and exits with the return
  # value of the block as process exit status. This method does not return.
  def runChild(job)
    $stdout.reopen(job.stdoutC)
    job.stdoutC.close
    $stderr.reopen(job.stderrC)
    job.stderrC.close

    # Exit status used when the block does not complete normally.
    retVal = 1
    begin
      retVal = job.block.call
    rescue StandardError => e
      # Report the failure through the stderr pipe while it is still open.
      $stderr.puts("#{e.class}: #{e.message}", *Array(e.backtrace))
    ensure
      # Always send the EOT character to mark the end of the text. The
      # parent waits for it on both pipes; without it a block that raises or
      # calls exit would keep wait() spinning forever.
      $stdout.putc 4
      $stdout.close
      $stderr.putc 4
      $stderr.close
    end
    exit retVal
  end

  # This function runs in a separate thread to wait for completed jobs. It
  # waits for the process completion and stores the result in the
  # corresponding JobInfo object. Raises a RuntimeError when a process
  # terminates that was not started by the pusher.
  def popper
    until @terminate
      if @runningJobs.empty?
        # No pending jobs, wait a bit.
        sleep(@timeout)
        next
      end

      # Wait for the next job to complete.
      pid, status = Process.wait2
      @lock.synchronize do
        # Take the JobInfo object of that process out of @runningJobs.
        job = @runningJobs.delete(pid)
        raise "Unknown pid #{pid}" if job.nil?

        # wait2 hands out a fresh Process::Status object for every call, so
        # it can be stored as it is.
        job.retVal = status
        if status.signaled?
          # Aborted jobs will probably not send an EOT. So we fast-track
          # them to the @toDropQueue.
          cleanPipes(job)
          @toDropQueue.push(job)
        else
          # Let the grabber decide when all output has been received.
          @spoolingJobs << job
        end
      end
    end
  end

  # This function runs in a separate thread to pick up the $stdout and
  # $stderr outputs of the child processes. It stores them in the JobInfo
  # object that corresponds to each child process.
  def grabber
    until @terminate
      # Wait for output in any of the pipes or a timeout. To make sure that
      # we get all output, we remain in the loop until the select() call
      # times out.
      loop do
        ready = @lock.synchronize do
          res = select(@pipes, nil, @pipes, @timeout)
          # Take one character from every pipe that has data pending.
          res[0].each { |pipe| grabChar(pipe) } if res
          res
        end
        next if ready

        sleep(@timeout)
        break
      end

      # Move all jobs that have terminated and delivered the end of text on
      # both stdout and stderr to the @toDropQueue.
      @lock.synchronize do
        finished, @spoolingJobs = @spoolingJobs.partition do |job|
          job.stdoutEOT && job.stderrEOT
        end
        finished.each do |job|
          cleanPipes(job)
          @toDropQueue.push(job)
        end
      end
    end
  end

  # Read a single character from +pipe+ and either append it to the output
  # buffer of the job that owns the pipe or, if it is the EOT character,
  # mark that output as complete. Must be called with @lock held.
  def grabChar(pipe)
    job = @pipeToJob[pipe]
    c = pipe.getc
    if pipe == job.stdoutP
      if c == ?\004
        job.stdoutEOT = true
      else
        job.stdout << c
      end
    elsif c == ?\004
      job.stderrEOT = true
    else
      job.stderr << c
    end
  end

  # Remove the pipes of +job+ from the housekeeping tables, close both ends
  # of them and clear the references in the JobInfo object.
  def cleanPipes(job)
    [ job.stdoutP, job.stderrP ].each do |pipe|
      @pipes.delete(pipe)
      @pipeToJob.delete(pipe)
    end
    [ job.stdoutC, job.stdoutP, job.stderrC, job.stderrP ].each do |pipe|
      pipe.close
    end
    job.stdoutC = job.stderrC = nil
    job.stdoutP = job.stderrP = nil
  end

  # Sanity check that is run at the end of wait(). Raises a RuntimeError if
  # any of the housekeeping structures still holds an entry.
  def check
    raise "toRunQueue not empty!" unless @toRunQueue.empty?
    raise "runningJobs list not empty!" unless @runningJobs.empty?
    raise "spoolingJobs list not empty!" unless @spoolingJobs.empty?
    raise "toDropQueue not empty!" unless @toDropQueue.empty?

    raise "pipe list not empty!" unless @pipes.empty?
    raise "pipe map not empty!" unless @pipeToJob.empty?
  end

end
|
|
|
|
end
|