다중 프로세서 시스템에서 병렬 아동 프로세스를 스폰하는 방법은 무엇입니까?
-
23-08-2019 - |
문제
다른 Python 스크립트의 컨트롤러로 사용하려는 Python 스크립트가 있습니다. 64 개의 프로세서가있는 서버가 있으므로이 두 번째 Python 스크립트의 64 개의 자식 프로세스를 스폰하고 싶습니다. 어린이 스크립트를 다음과 같이합니다.
$ python create_graphs.py --name=NAME
이름은 XYZ, ABC, NYU 등과 같은 곳입니다.
부모 컨트롤러 스크립트에서 목록에서 이름 변수를 검색합니다.
my_list = [ 'XYZ', 'ABC', 'NYU' ]
제 질문은,이 과정을 어린이로서 벗어나는 가장 좋은 방법은 무엇입니까? 한 번에 어린이 수를 64로 제한하고 싶으므로 (아동 프로세스가 끝나는 경우) 상태를 추적해야하므로 전체 세대를 효율적으로 운영 할 수 있습니다.
하위 프로세스 패키지를 사용하는 것을 살펴 보았지만 한 번에 한 아이 만 낳았 기 때문에 거부했습니다. 마침내 멀티 프로세서 패키지를 찾았지만 전체 스레드 대 하위 프로세스 문서에 압도당하는 것을 인정합니다.
지금 내 스크립트가 사용됩니다 subprocess.call
한 번에 한 아이 만 스폰하고 다음과 같이 보입니다.
#!/path/to/python
import subprocess, multiprocessing, Queue
from multiprocessing import Process
my_list = [ 'XYZ', 'ABC', 'NYU' ]
if __name__ == '__main__':
processors = multiprocessing.cpu_count()
for i in range(len(my_list)):
if( i < processors ):
cmd = ["python", "/path/to/create_graphs.py", "--name="+ my_list[i]]
child = subprocess.call( cmd, shell=False )
한 번에 64 명의 아이들을 낳기를 정말로 원합니다. 다른 stackoverflow 질문에서 나는 사람들이 큐를 사용하는 것을 보았지만 그것이 성능 히트를 만드는 것처럼 보입니까?
해결책
당신이 찾고있는 것은 프로세스 풀 다중 프로세싱의 클래스.
import multiprocessing
import subprocess
def work(cmd):
return subprocess.call(cmd, shell=False)
if __name__ == '__main__':
count = multiprocessing.cpu_count()
pool = multiprocessing.Pool(processes=count)
print pool.map(work, ['ls'] * count)
그리고 여기에 쉽게 이해할 수 있도록 계산 예제가 있습니다. 다음은 N이 CPU 카운트 인 N 프로세스에서 100000 작업을 나눕니다. 프로세스 수로 전달하지 않습니다. 이로 인해 풀 클래스가 프로세스 수에 CPU_COUNT를 사용하게됩니다.참조)
import multiprocessing
import subprocess
def calculate(value):
return value * 10
if __name__ == '__main__':
pool = multiprocessing.Pool(None)
tasks = range(10000)
results = []
r = pool.map_async(calculate, tasks, callback=results.append)
r.wait() # Wait on the results
print results
다른 팁
Nadia와 Jim의 의견을 바탕으로 제가 제기 한 해결책은 다음과 같습니다. 그것이 가장 좋은 방법인지 확실하지 않지만 작동합니다. Matlab을 포함한 일부 타사 앱을 사용해야하기 때문에 원래의 자식 스크립트는 쉘 스크립트 여야합니다. 그래서 나는 그것을 파이썬에서 꺼내서 bash로 코딩해야했습니다.
import sys
import os
import multiprocessing
import subprocess
def work(staname):
print 'Processing station:',staname
print 'Parent process:', os.getppid()
print 'Process id:', os.getpid()
cmd = [ "/bin/bash" "/path/to/executable/create_graphs.sh","--name=%s" % (staname) ]
return subprocess.call(cmd, shell=False)
if __name__ == '__main__':
my_list = [ 'XYZ', 'ABC', 'NYU' ]
my_list.sort()
print my_list
# Get the number of processors available
num_processes = multiprocessing.cpu_count()
threads = []
len_stas = len(my_list)
print "+++ Number of stations to process: %s" % (len_stas)
# run until all the threads are done, and there is no data left
for list_item in my_list:
# if we aren't using all the processors AND there is still data left to
# compute, then spawn another thread
if( len(threads) < num_processes ):
p = multiprocessing.Process(target=work,args=[list_item])
p.start()
print p, p.is_alive()
threads.append(p)
else:
for thread in threads:
if not thread.is_alive():
threads.remove(thread)
이것은 합리적인 솔루션처럼 보입니까? Jim 's While Loop 형식을 사용하려고했지만 내 스크립트는 아무것도 반환하지 않았습니다. 왜 그런지 잘 모르겠습니다. 다음은 Jim 's'whend 'loop을'for 'loop을 대체하는 스크립트를 실행할 때의 출력입니다.
hostname{me}2% controller.py
['ABC', 'NYU', 'XYZ']
Number of processes: 64
+++ Number of stations to process: 3
hostname{me}3%
'for'루프로 실행하면 더 의미있는 것을 얻습니다.
hostname{me}6% controller.py
['ABC', 'NYU', 'XYZ']
Number of processes: 64
+++ Number of stations to process: 3
Processing station: ABC
Parent process: 1056
Process id: 1068
Processing station: NYU
Parent process: 1056
Process id: 1069
Processing station: XYZ
Parent process: 1056
Process id: 1071
hostname{me}7%
그래서 이것은 작동하고 행복합니다. 그러나 나는 여전히 내가 사용하는 'for'루프 대신 Jim 's'whind '스타일 루프를 사용할 수없는 이유를 알지 못합니다. 모든 도움에 감사드립니다. @ stackoverflow의 폭 넓은 지식에 깊은 인상을 받았습니다.
나는 확실히 사용할 것입니다 다중 프로세싱 하위 프로세스를 사용하여 내 솔루션을 굴리는 대신.
응용 프로그램에서 데이터를 얻으려고하지 않는 한 큐가 필요하지 않다고 생각합니다 (데이터를 원한다면 데이터베이스에 데이터베이스에 추가하는 것이 더 쉽다고 생각합니다).
그러나 크기를 위해 이것을 시도하십시오.
create_graphs.py 스크립트의 내용을 모두 "create_graphs"라는 함수에 넣으십시오.
import threading
from create_graphs import create_graphs
num_processes = 64
my_list = [ 'XYZ', 'ABC', 'NYU' ]
threads = []
# run until all the threads are done, and there is no data left
while threads or my_list:
# if we aren't using all the processors AND there is still data left to
# compute, then spawn another thread
if (len(threads) < num_processes) and my_list:
t = threading.Thread(target=create_graphs, args=[ my_list.pop() ])
t.setDaemon(True)
t.start()
threads.append(t)
# in the case that we have the maximum number of threads check if any of them
# are done. (also do this when we run out of data, until all the threads are done)
else:
for thread in threads:
if not thread.isAlive():
threads.remove(thread)
이것은 프로세서보다 스레드가 1 개 줄어든다는 것을 알고 있습니다. 이는 아마도 우수 할 것입니다. 이는 아마도 스레드, 디스크 I/O 및 컴퓨터에서 발생하는 다른 것들을 관리하는 프로세서를 남깁니다. 마지막 코어를 사용하기로 결정하면 하나를 추가하십시오.
편집하다: 나는 my_list의 목적을 잘못 해석했을 것이라고 생각합니다. 당신은 필요하지 않습니다 my_list
스레드를 전혀 추적하기 위해 threads
목록). 그러나 이것은 프로세스 입력을 공급하는 훌륭한 방법입니다.
목적 my_list
그리고 threads
my_list
기능에서 처리 해야하는 데이터를 보유합니다.
threads
현재 실행중인 스레드 목록 일뿐입니다
while 루프는 두 가지 작업을 수행하고 새 스레드를 시작하여 데이터를 처리하고 스레드가 실행되는지 확인하십시오.
따라서 (a) 처리 할 데이터가 더 많거나 (b) 실행이 끝나지 않은 스레드 .... 계속 실행하도록 프로그램을 원합니다. 두 목록이 모두 비워지면 평가할 것입니다 False
그리고 while 루프가 종료 될 것입니다