import errno
import multiprocessing
import os
import re
import sys
import time

from .processes import ForkedProcess
from .remoteproxy import ClosedError


class CanceledError(Exception):
    """Raised when the progress dialog is canceled during a processing operation."""
    pass

class Parallelize(object):
    """
    Class for ultra-simple inline parallelization on multi-core CPUs
    
    Example::
    
        ## Here is the serial (single-process) task:
        
        tasks = [1, 2, 4, 8]
        results = []
        for task in tasks:
            result = processTask(task)
            results.append(result)
        print(results)
        
        
        ## Here is the parallelized version:
        
        tasks = [1, 2, 4, 8]
        results = []
        with Parallelize(tasks, workers=4, results=results) as tasker:
            for task in tasker:
                result = processTask(task)
                tasker.results.append(result)
        print(results)
        
        
    The only major caveat is that *result* in the example above must be picklable,
    since it is automatically sent via pipe back to the parent process.
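
    A progress dialog can be displayed while the tasks run by passing the
    *progressDialog* argument, either as a label string or as a dict of
    arguments for ProgressDialog::
    
        with Parallelize(tasks, results=results, progressDialog='processing..') as tasker:
            ...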
    """

    def __init__(self, tasks=None, workers=None, block=True, progressDialog=None, randomReseed=True, **kwds):
        """
        ===============  ===================================================================
        **Arguments:**
        tasks            list of objects to be processed (Parallelize will determine how to 
                         distribute the tasks). If unspecified, then each worker will receive
                         a single task with a unique id number.
        workers          number of worker processes or None to use number of CPUs in the 
                         system
        progressDialog   optional dict of arguments for ProgressDialog
                         to update while tasks are processed
        randomReseed     If True, each forked process will reseed its random number generator
                         to ensure independent results. Works with the built-in random
                         and numpy.random.
        kwds             objects to be shared by proxy with child processes (they will 
                         appear as attributes of the tasker)
        ===============  ===================================================================
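
        For example, a list passed as a keyword argument is shared with each
        worker by proxy, so workers can send values back to the parent
        (``log`` is an arbitrary name chosen for illustration)::
        
            log = []
            with Parallelize(tasks, log=log) as tasker:
                for task in tasker:
                    tasker.log.append(task)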
        """
        
        ## Generate progress dialog.
        ## Note: forked child processes must never interact with the progress dialog.
        self.showProgress = False
        if progressDialog is not None:
            self.showProgress = True
            if isinstance(progressDialog, str):
                progressDialog = {'labelText': progressDialog}
            from ..widgets.ProgressDialog import ProgressDialog
            self.progressDlg = ProgressDialog(**progressDialog)
        
        if workers is None:
            workers = self.suggestedWorkerCount()
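        ## Parallel processing requires os.fork(); on platforms without it
        ## (such as Windows), fall back to a single worker running serially.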
        if not hasattr(os, 'fork'):
            workers = 1
        self.workers = workers
        if tasks is None:
            tasks = range(workers)
        self.tasks = list(tasks)
        self.reseed = randomReseed
        self.kwds = kwds.copy()
        self.kwds['_taskStarted'] = self._taskStarted
        
    def __enter__(self):
        self.proc = None
        if self.workers == 1: 
            return self.runSerial()
        else:
            return self.runParallel()
    
    def __exit__(self, *exc_info):
        
        if self.proc is not None:  ## this is a forked worker process
            exceptOccurred = exc_info[0] is not None  ## an exception occurred during processing
                
            try:
                if exceptOccurred:
                    sys.excepthook(*exc_info)
            finally:
                #print os.getpid(), 'exit'
                os._exit(1 if exceptOccurred else 0)
                
        else:  ## parent
            if self.showProgress:
                try:
                    self.progressDlg.__exit__(None, None, None)
                except Exception:
                    pass

    def runSerial(self):
        if self.showProgress:
            self.progressDlg.__enter__()
            self.progressDlg.setMaximum(len(self.tasks))
        self.progress = {os.getpid(): []}
        return Tasker(self, None, self.tasks, self.kwds)

    
    def runParallel(self):
        self.childs = []
        
        ## break up tasks into one set per worker
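        ## (round-robin; e.g. 10 tasks across 3 workers yields chunks of 4, 3, and 3 tasks)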
        workers = self.workers
        chunks = [[] for _ in range(workers)]
        for i, task in enumerate(self.tasks):
            chunks[i % workers].append(task)
        
        ## fork and assign tasks to each worker
        for i in range(workers):
            proc = ForkedProcess(target=None, preProxy=self.kwds, randomReseed=self.reseed)
            if not proc.isParent:
                self.proc = proc
                return Tasker(self, proc, chunks[i], proc.forkedProxies)
            else:
                self.childs.append(proc)
        
        ## Keep track of the progress of each worker independently.
        self.progress = {ch.childPid: [] for ch in self.childs}
        ## for each child process, self.progress[pid] is a list
        ## of task indexes. The last index is the task currently being
        ## processed; all others are finished.
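        ## For example, {1234: [0, 3], 1235: [1, 4]} means that worker 1234
        ## has finished task 0 and is working on task 3, while worker 1235
        ## has finished task 1 and is working on task 4.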
            
            
        try:
            if self.showProgress:
                self.progressDlg.__enter__()
                self.progressDlg.setMaximum(len(self.tasks))
            ## process events from workers until all have exited.
                
            activeChilds = self.childs[:]
            self.exitCodes = []
            pollInterval = 0.01
            while len(activeChilds) > 0:
                waitingChildren = 0
                rem = []
                for ch in activeChilds:
                    try:
                        n = ch.processRequests()
                        if n > 0:
                            waitingChildren += 1
                    except ClosedError:
                        #print ch.childPid, 'process finished'
                        rem.append(ch)
                        if self.showProgress:
                            self.progressDlg += 1
                #print "remove:", [ch.childPid for ch in rem]
                for ch in rem:
                    activeChilds.remove(ch)
                    while True:
                        try:
                            pid, exitcode = os.waitpid(ch.childPid, 0)
                            self.exitCodes.append(exitcode)
                            break
                        except OSError as ex:
                            if ex.errno == errno.EINTR:  ## interrupted system call; just try again
                                continue
                            else:
                                raise
                    
                    #print [ch.childPid for ch in activeChilds]
                    
                if self.showProgress and self.progressDlg.wasCanceled():
                    for ch in activeChilds:
                        ch.kill()
                    raise CanceledError()
                    
                ## adjust polling interval; prefer to get exactly 1 event per poll cycle.
                if waitingChildren > 1:
                    pollInterval *= 0.7
                elif waitingChildren == 0:
                    pollInterval /= 0.7
                pollInterval = max(min(pollInterval, 0.5), 0.0005)  ## but keep it within reasonable limits
                
                time.sleep(pollInterval)
        finally:
            if self.showProgress:
                self.progressDlg.__exit__(None, None, None)
            for ch in self.childs:
                ch.join()
        if len(self.exitCodes) < len(self.childs):
            raise Exception("Parallelizer started %d processes but only received exit codes from %d." % (len(self.childs), len(self.exitCodes)))
        for code in self.exitCodes:
            if code != 0:
                raise Exception("Error occurred in parallel-executed subprocess (console output may have more information).")
        return []  ## no tasks for parent process.
    
    
    @staticmethod
    def suggestedWorkerCount():
        if 'linux' in sys.platform:
            ## On linux we can do a little better than cpu_count():
            ## count each physical core only once, since hyperthreaded cores
            ## provide little extra benefit for this kind of workload.
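            ## /proc/cpuinfo describes each logical CPU with stanzas like:
            ##     physical id : 0
            ##     cpu cores   : 4
            ## Summing 'cpu cores' over the unique physical ids gives the
            ## number of physical cores in the system.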
            try:
                cores = {}
                pid = None
                with open('/proc/cpuinfo') as fd:
                    for line in fd:
                        m = re.match(r'physical id\s+:\s+(\d+)', line)
                        if m is not None:
                            pid = m.groups()[0]
                        m = re.match(r'cpu cores\s+:\s+(\d+)', line)
                        if m is not None:
                            cores[pid] = int(m.groups()[0])
                return sum(cores.values())
            except Exception:  ## fall back if /proc/cpuinfo is missing or unparseable
                return multiprocessing.cpu_count()
                
        else:
            return multiprocessing.cpu_count()
        
    def _taskStarted(self, pid, i, **kwds):
        ## called remotely by tasker to indicate it has started working on task i
        #print pid, 'reported starting task', i
        if self.showProgress:
            if len(self.progress[pid]) > 0:
                self.progressDlg += 1
            if pid == os.getpid():  ## single-worker process
                if self.progressDlg.wasCanceled():
                    raise CanceledError()
        self.progress[pid].append(i)
    
    
class Tasker(object):
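    """
    Iterable object returned by entering a Parallelize block. Iterating over
    a Tasker yields this worker's share of the tasks, and keyword objects
    passed to Parallelize appear as attributes of the tasker, shared with the
    parent process by proxy.
    """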
    def __init__(self, parallelizer, process, tasks, kwds):
        self.proc = process
        self.par = parallelizer
        self.tasks = tasks
        for k, v in kwds.items():
            setattr(self, k, v)
        
    def __iter__(self):
        ## Note: this could be improved so that tasks are retrieved from the parent process one at a time.
        for i, task in enumerate(self.tasks):
            self.index = i
            #print os.getpid(), 'starting task', i
            self._taskStarted(os.getpid(), i, _callSync='off')
            yield task
        if self.proc is not None:
            #print os.getpid(), 'no more tasks'
            self.proc.close()
    
    def process(self):
        """
        Process requests from parent.
        Usually it is not necessary to call this unless you would like to 
        receive messages (such as exit requests) during an iteration.
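        
        For example, a long-running task might poll for messages between
        steps (``doWork`` here is a hypothetical stand-in for one unit of
        the task body)::
        
            for task in tasker:
                for step in range(100):
                    doWork(task, step)   ## hypothetical unit of work
                    tasker.process()     ## handle any pending messages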
        """
        if self.proc is not None:
            self.proc.processRequests()
    
    def numWorkers(self):
        """
        Return the number of parallel workers
        """
        return self.par.workers
    