Question

J'ai un pipeline Python complexe (dont le code je ne peux pas changer), appelant plusieurs autres scripts et autres exécutables.Le fait est qu’il faut beaucoup de temps pour gérer plus de 8 000 annuaires et effectuer des analyses scientifiques.J'ai donc écrit un simple wrapper (peut-être pas le plus efficace, mais semble fonctionner) en utilisant le module multitraitement.

from os import path, listdir, mkdir, system
from os.path import join as osjoin, exists, isfile
from GffTools import Gene, Element, Transcript
from GffTools import read as gread, write as gwrite, sort as gsort
from re import match
from multiprocessing import JoinableQueue, Process
from sys import argv, exit

# some absolute paths
inbase = "/.../abfgp_in"
outbase = "/.../abfgp_out"
abfgp_cmd = "python /.../abfgp-2.rev/abfgp.py"
refGff = "/.../B0510_manual_reindexed_noSeq.gff"

# the Queue
Q = JoinableQueue()
i = 0

# define number of processes
try: num_p = int(argv[1])
except ValueError: exit("Wrong CPU argument")

# This is the function calling the abfgp.py script, which in its turn calls alot of third party software
def abfgp(id_, pid):
    out = osjoin(outbase, id_)
    if not exists(out): mkdir(out)

    # logfile
    log = osjoin(outbase, "log_process_%s" %(pid))
    try:
        # call the script
        system("%s --dna %s --multifasta %s --target %s -o %s -q >>%s" %(abfgp_cmd, osjoin(inbase, id_, id_ +".dna.fa"), osjoin(inbase, id_, "informants.mfa"), id_, out, log))
    except:
        print "ABFGP FAILED"
        return

# parse the output
def extractGff(id_):
   # code not relevant


# function called by multiple processes, using the Queue
def run(Q, pid):
    while not Q.empty():
        try:
            d = Q.get()             
            print "%s\t=>>\t%s" %(str(i-Q.qsize()), d)          
            abfgp(d, pid)
            Q.task_done()
        except KeyboardInterrupt:
            exit("Interrupted Child")

# list of directories
genedirs = [d for d in listdir(inbase)]
genes = gread(refGff)
for d in genedirs:
    i += 1
    indir = osjoin(inbase, d)
    outdir = osjoin(outbase, d)
    Q.put(d)

# this loop creates the multiple processes
procs = []
for pid in range(num_p):
    try:
        p = Process(target=run, args=(Q, pid+1))
        p.daemon = True
        procs.append(p) 
        p.start()
    except KeyboardInterrupt:
        print "Aborting start of child processes"
        for x in procs:
            x.terminate()
        exit("Interrupted")     

try:
    for p in procs:
        p.join()
except:
    print "Terminating child processes"
    for x in procs:
        x.terminate()
    exit("Interrupted")

print "Parsing output..."
for d in genedirs: extractGff(d)

Maintenant, le problème est que abfgp.py utilise la fonction os.chdir, ce qui semble perturber le traitement parallèle.Je reçois beaucoup d'erreurs, indiquant que certains fichiers/répertoires (d'entrée/sortie) sont introuvables en lecture/écriture.Même si j'appelle le script via os.system(), à partir duquel je pensais que la création de processus séparés empêcherait cela.

Comment puis-je contourner ces interférences chdir ?

Modifier:Je pourrais changer os.system() en subprocess.Popen(cwd="...") avec le bon répertoire.J'espère que cela fait une différence.

Merci.

Était-ce utile?

La solution

Modifier 2

Ne pas utiliser os.system() utiliser subprocess.call()

system("%s --dna %s --multifasta %s --target %s -o %s -q >>%s" %(abfgp_cmd, osjoin(inbase, id_, id_ +".dna.fa"), osjoin(inbase, id_, "informants.mfa"), id_, out, log))

se traduirait par

subprocess.call((abfgp_cmd, '--dna', osjoin(inbase, id_, id_ +".dna.fa"), '--multifasta', osjoin(inbase, id_, "informants.mfa"), '--target', id_, '-o', out, '-q')) # without log.

Modifier 1Je pense que le problème est que le multitraitement utilise les noms de modules pour sérialiser les fonctions et les classes.

Cela signifie que si vous le faites import module où se trouve le module ./module.py et tu fais quelque chose comme os.chdir('./dir') maintenant tu devrais from .. import module.

Les processus enfants héritent du dossier du processus parent.Cela peut poser un problème.

Solutions

  1. Assurez-vous que tous les modules sont importés (dans les processus enfants) et après cela, vous changez de répertoire
  2. insérez l'original os.getcwd() à sys.path pour activer l'importation à partir du répertoire d'origine.Cela doit être fait avant qu'une fonction ne soit appelée depuis le répertoire local.
  3. placez toutes les fonctions que vous utilisez dans un répertoire qui peut toujours être importé.Le site-packages pourrait être un tel répertoire.Ensuite, vous pouvez faire quelque chose comme import module module.main() pour commencer ce que vous faites.
  4. C'est un hack que je fais parce que je sais comment fonctionne le cornichon.Utilisez-le uniquement si d'autres tentatives échouent.Le script imprime :

    serialized # the function runD is serialized
    string executed # before the function is loaded the code is executed
    loaded # now the function run is deserialized
    run # run is called
    

    Dans votre cas, vous feriez quelque chose comme ceci :

    runD = evalBeforeDeserialize('__import__("sys").path.append({})'.format(repr(os.getcwd())), run)
    p = Process(target=runD, args=(Q, pid+1))
    

    Voici le script :

    # functions that you need
    
    class R(object):
        def __init__(self, call, *args):
    
            self.ret = (call, args)
        def __reduce__(self):
            return self.ret
        def __call__(self, *args, **kw):
            raise NotImplementedError('this should never be called')
    
    class evalBeforeDeserialize(object):
        def __init__(self, string, function):
            self.function = function
            self.string = string
        def __reduce__(self):
            return R(getattr, tuple, '__getitem__'), \
                     ((R(eval, self.string), self.function), -1)
    
    # code to show how it works        
    
    def printing():
        print('string executed')
    
    def run():
        print('run')
    
    runD = evalBeforeDeserialize('__import__("__main__").printing()', run)
    
    import pickle
    
    s = pickle.dumps(runD)
    print('serialized')
    run2 = pickle.loads(s)
    print('loaded')
    run2()
    

Veuillez nous signaler si cela ne fonctionne pas.

Autres conseils

Vous pouvez déterminer quelle instance du os bibliothèque utilisée par le programme inaltérable ;puis créez une version personnalisée de chdir dans cette bibliothèque qui fait ce dont vous avez besoin : empêcher le changement de répertoire, le consigner, peu importe.Si le comportement personnalisé doit être réservé à un seul programme, vous pouvez utiliser l'option inspect module pour identifier l'appelant et adapter le comportement d'une manière spécifique à cet appelant.

Vos options sont limitées si vous ne pouvez vraiment pas modifier le programme existant ;mais si vous avez la possibilité de modifier les bibliothèques importées, quelque chose comme ceci pourrait être le moyen le moins invasif d'éviter le comportement indésirable.

Les mises en garde habituelles s'appliquent lors de la modification d'une bibliothèque standard.

Licencié sous: CC-BY-SA avec attribution
Non affilié à StackOverflow
scroll top