파이썬에서 멀티프로세싱 큐를 사용하는 방법은 무엇입니까?
멀티프로세싱 큐가 파이썬에서 어떻게 작동하는지, 어떻게 구현하는지 이해하는 데 많은 어려움을 겪고 있습니다.공유 파일의 데이터에 액세스하는 두 개의 파이썬 모듈이 있다고 가정해 보겠습니다. 이 두 모듈을 라이터 및 리더라고 합니다.제 계획은 독자와 작성자 모두가 두 개의 별도의 멀티프로세싱 큐에 요청을 넣은 다음 세 번째 프로세스가 이러한 요청을 루프로 팝하여 실행하도록 하는 것입니다.
저의 주된 문제는 멀티프로세싱을 구현하는 방법을 정말 모른다는 것입니다.큐가 정확합니다. 각 프로세스는 별도의 큐이기 때문에 개체를 인스턴스화할 수 없습니다. 모든 프로세스가 공유 큐(또는 이 경우 큐)와 관련이 있는지 어떻게 확인합니까?
간단한 요약
CY2023년 현재, 이 답변에 설명된 기술은 상당히 구식입니다.요즘은 대신에 사용합니다.multiprocessing
아래...
원답
저의 주된 문제는 멀티프로세싱을 구현하는 방법을 정말 모른다는 것입니다.큐가 정확합니다. 각 프로세스는 별도의 큐이기 때문에 개체를 인스턴스화할 수 없습니다. 모든 프로세스가 공유 큐(또는 이 경우 큐)와 관련이 있는지 어떻게 확인합니까?
이것은 판독기와 작성기가 단일 대기열을 공유하는 간단한 예입니다.작성자는 정수 묶음을 판독기로 보냅니다. 작성자의 숫자가 부족하면 'DONE'을 전송하여 판독기가 판독 루프를 벗어날 수 있도록 합니다.
원하는 만큼의 판독기 프로세스를 생성할 수 있습니다.
from multiprocessing import Process, Queue
import time
import sys
def reader_proc(queue):
"""Read from the queue; this spawns as a separate Process"""
while True:
msg = queue.get() # Read from the queue and do nothing
if msg == "DONE":
break
def writer(count, num_of_reader_procs, queue):
"""Write integers into the queue. A reader_proc() will read them from the queue"""
for ii in range(0, count):
queue.put(ii) # Put 'count' numbers into queue
### Tell all readers to stop...
for ii in range(0, num_of_reader_procs):
queue.put("DONE")
def start_reader_procs(qq, num_of_reader_procs):
"""Start the reader processes and return all in a list to the caller"""
all_reader_procs = list()
for ii in range(0, num_of_reader_procs):
### reader_p() reads from qq as a separate process...
### you can spawn as many reader_p() as you like
### however, there is usually a point of diminishing returns
reader_p = Process(target=reader_proc, args=((qq),))
reader_p.daemon = True
reader_p.start() # Launch reader_p() as another proc
all_reader_procs.append(reader_p)
return all_reader_procs
if __name__ == "__main__":
num_of_reader_procs = 2
qq = Queue() # writer() writes to qq from _this_ process
for count in [10**4, 10**5, 10**6]:
assert 0 < num_of_reader_procs < 4
all_reader_procs = start_reader_procs(qq, num_of_reader_procs)
writer(count, len(all_reader_procs), qq) # Queue stuff to all reader_p()
print("All reader processes are pulling numbers from the queue...")
_start = time.time()
for idx, a_reader_proc in enumerate(all_reader_procs):
print(" Waiting for reader_p.join() index %s" % idx)
a_reader_proc.join() # Wait for a_reader_proc() to finish
print(" reader_p() idx:%s is done" % idx)
print(
"Sending {0} integers through Queue() took {1} seconds".format(
count, (time.time() - _start)
)
)
print("")
의 아주 간단한 사용법은 다음과 같습니다.multiprocessing.Queue
그리고.multiprocessing.Process
호출자가 "이벤트"와 인수를 별도의 프로세스로 전송하여 프로세스의 "do_" 메서드에 이벤트를 디스패치할 수 있습니다.(파이썬 3.4+)
import multiprocessing as mp
import collections
Msg = collections.namedtuple('Msg', ['event', 'args'])
class BaseProcess(mp.Process):
"""A process backed by an internal queue for simple one-way message passing.
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.queue = mp.Queue()
def send(self, event, *args):
"""Puts the event and args as a `Msg` on the queue
"""
msg = Msg(event, args)
self.queue.put(msg)
def dispatch(self, msg):
event, args = msg
handler = getattr(self, "do_%s" % event, None)
if not handler:
raise NotImplementedError("Process has no handler for [%s]" % event)
handler(*args)
def run(self):
while True:
msg = self.queue.get()
self.dispatch(msg)
용도:
class MyProcess(BaseProcess):
def do_helloworld(self, arg1, arg2):
print(arg1, arg2)
if __name__ == "__main__":
process = MyProcess()
process.start()
process.send('helloworld', 'hello', 'world')
그send
부모 과정에서 발생합니다.do_*
자녀 과정에서 발생합니다.
실행 루프를 방해하고 하위 프로세스를 종료할 수 있는 예외 처리를 생략했습니다.재정의하여 사용자 지정할 수도 있습니다.run
차단이나 다른 것들을 통제할 수 있습니다.
이것은 단일 작업자 프로세스가 있는 상황에서만 유용하지만, 이 질문에 대한 적절한 대답은 객체 지향적인 일반적인 시나리오를 보여주는 것이라고 생각합니다.
저는 스택 오버플로와 웹 전반에 걸친 여러 답변을 살펴보면서 대형 판다 데이터 프레임을 전달하기 위한 대기열을 사용하여 다중 처리를 수행하는 방법을 설정하려고 했습니다.제가 보기에 모든 답은 이러한 계산을 설정할 때 분명히 발견될 수많은 에지 사례에 대한 고려 없이 동일한 종류의 솔루션을 반복하는 것처럼 보였습니다.문제는 동시에 많은 것들이 놀고 있다는 것입니다.작업 수, 작업자 수, 각 작업 기간 및 작업 실행 중 가능한 예외입니다.이 모든 것이 동기화를 어렵게 만들고 대부분의 대답은 동기화를 수행할 수 있는 방법을 다루지 않습니다.이것은 몇 시간 동안 빈둥빈둥 놀고 난 후의 저의 견해입니다. 바라건대 이것이 대부분의 사람들이 유용하다고 느낄 만큼 일반적이기를 바랍니다.
코딩 예제 앞에 몇 가지 생각이 있습니다. 때부터queue.Empty
또는queue.qsize()
제어에 할 수 없음, 제어에 대해 신뢰할 수 없음
while True:
try:
task = pending_queue.get_nowait()
except queue.Empty:
break
가짜입니다.이렇게 하면 몇 밀리초 후에 대기열에 다른 작업이 나타나더라도 작업자가 사망합니다.작업자가 복구되지 않고 잠시 후 임의로 대기열이 비어 있는 것을 발견하면 모든 작업자가 사라집니다.최종 결과는 모든 작업이 완료되지 않은 상태에서 기본 다중 처리 기능(프로세스에 조인()이 있는 기능)이 반환됩니다.좋습니다. 수천 개의 작업이 있고 몇 개가 누락된 경우 디버깅을 수행하십시오.
다른 문제는 센티넬 값의 사용입니다.많은 사람들이 대기열 끝에 플래그를 표시하기 위해 대기열에 센티널 값을 추가할 것을 제안했습니다.하지만 정확히 누구에게 표시하는 것?N개의 작업자가 있는 경우 N이 주고 받을 수 있는 코어 수라고 가정하면 단일 센티널 값이 대기열 끝에 한 작업자에게만 플래그를 지정합니다.남은 일이 없을 때 다른 직원들은 모두 앉아서 더 많은 일을 기다리고 있을 것입니다.제가 본 대표적인 예는
while True:
task = pending_queue.get()
if task == SOME_SENTINEL_VALUE:
break
작업자 한 명은 보초 값을 받고 나머지는 무기한 대기합니다.제가 만난 어떤 게시물도 모든 직원이 받을 수 있도록 적어도 직원이 있는 횟수만큼 대기열에 보초 값을 제출해야 한다고 언급하지 않았습니다.
다른 문제는 작업 실행 중 예외 처리입니다.다시 한 번 이러한 것들을 포착하고 관리해야 합니다.게다가, 만약 당신이 가지고 있다면.completed_tasks
대기열 작업이 완료되었는지 결정하기 전에 대기열에 있는 항목 수를 독립적으로 계산해야 합니다.또한 대기열 크기에 의존하면 실패할 수 있으며 예상치 못한 결과가 반환됩니다.
아래의 예에서,par_proc()
함수는 명명된 인수 및 값과 함께 이러한 작업을 실행해야 하는 함수를 포함한 작업 목록을 수신합니다.
import multiprocessing as mp
import dill as pickle
import queue
import time
import psutil
SENTINEL = None
def do_work(tasks_pending, tasks_completed):
# Get the current worker's name
worker_name = mp.current_process().name
while True:
try:
task = tasks_pending.get_nowait()
except queue.Empty:
print(worker_name + ' found an empty queue. Sleeping for a while before checking again...')
time.sleep(0.01)
else:
try:
if task == SENTINEL:
print(worker_name + ' no more work left to be done. Exiting...')
break
print(worker_name + ' received some work... ')
time_start = time.perf_counter()
work_func = pickle.loads(task['func'])
result = work_func(**task['task'])
tasks_completed.put({work_func.__name__: result})
time_end = time.perf_counter() - time_start
print(worker_name + ' done in {} seconds'.format(round(time_end, 5)))
except Exception as e:
print(worker_name + ' task failed. ' + str(e))
tasks_completed.put({work_func.__name__: None})
def par_proc(job_list, num_cpus=None):
# Get the number of cores
if not num_cpus:
num_cpus = psutil.cpu_count(logical=False)
print('* Parallel processing')
print('* Running on {} cores'.format(num_cpus))
# Set-up the queues for sending and receiving data to/from the workers
tasks_pending = mp.Queue()
tasks_completed = mp.Queue()
# Gather processes and results here
processes = []
results = []
# Count tasks
num_tasks = 0
# Add the tasks to the queue
for job in job_list:
for task in job['tasks']:
expanded_job = {}
num_tasks = num_tasks + 1
expanded_job.update({'func': pickle.dumps(job['func'])})
expanded_job.update({'task': task})
tasks_pending.put(expanded_job)
# Use as many workers as there are cores (usually chokes the system so better use less)
num_workers = num_cpus
# We need as many sentinels as there are worker processes so that ALL processes exit when there is no more
# work left to be done.
for c in range(num_workers):
tasks_pending.put(SENTINEL)
print('* Number of tasks: {}'.format(num_tasks))
# Set-up and start the workers
for c in range(num_workers):
p = mp.Process(target=do_work, args=(tasks_pending, tasks_completed))
p.name = 'worker' + str(c)
processes.append(p)
p.start()
# Gather the results
completed_tasks_counter = 0
while completed_tasks_counter < num_tasks:
results.append(tasks_completed.get())
completed_tasks_counter = completed_tasks_counter + 1
for p in processes:
p.join()
return results
그리고 여기 위의 코드를 실행하기 위한 테스트가 있습니다.
def test_parallel_processing():
def heavy_duty1(arg1, arg2, arg3):
return arg1 + arg2 + arg3
def heavy_duty2(arg1, arg2, arg3):
return arg1 * arg2 * arg3
task_list = [
{'func': heavy_duty1, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]},
{'func': heavy_duty2, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]},
]
results = par_proc(task_list)
job1 = sum([y for x in results if 'heavy_duty1' in x.keys() for y in list(x.values())])
job2 = sum([y for x in results if 'heavy_duty2' in x.keys() for y in list(x.values())])
assert job1 == 15
assert job2 == 21
그리고 약간의 예외가 있는 또 다른 것.
def test_parallel_processing_exceptions():
def heavy_duty1_raises(arg1, arg2, arg3):
raise ValueError('Exception raised')
return arg1 + arg2 + arg3
def heavy_duty2(arg1, arg2, arg3):
return arg1 * arg2 * arg3
task_list = [
{'func': heavy_duty1_raises, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]},
{'func': heavy_duty2, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]},
]
results = par_proc(task_list)
job1 = sum([y for x in results if 'heavy_duty1' in x.keys() for y in list(x.values())])
job2 = sum([y for x in results if 'heavy_duty2' in x.keys() for y in list(x.values())])
assert not job1
assert job2 == 21
그것이 도움이 되길 바랍니다.
"에.from queue import Queue
라는 모듈이 없습니다.queue
,대신multiprocessing
사용해야 합니다.따라서, "처럼 보여야 합니다.from multiprocessing import Queue
"
두 개의 독립 실행형 프로그램 사이의 대기열을 통해 메시지를 전달하는 것을 보여주는 간단하고 일반적인 예를 방금 만들었습니다.OP의 질문에 직접적으로 답하지는 않지만 개념을 충분히 분명하게 나타내야 합니다.
서버:
multiprocessing-queue-manager-server.py
import asyncio
import concurrent.futures
import multiprocessing
import multiprocessing.managers
import queue
import sys
import threading
from typing import Any, AnyStr, Dict, Union
class QueueManager(multiprocessing.managers.BaseManager):
def get_queue(self, ident: Union[AnyStr, int, type(None)] = None) -> multiprocessing.Queue:
pass
def get_queue(ident: Union[AnyStr, int, type(None)] = None) -> multiprocessing.Queue:
global q
if not ident in q:
q[ident] = multiprocessing.Queue()
return q[ident]
q: Dict[Union[AnyStr, int, type(None)], multiprocessing.Queue] = dict()
delattr(QueueManager, 'get_queue')
def init_queue_manager_server():
if not hasattr(QueueManager, 'get_queue'):
QueueManager.register('get_queue', get_queue)
def serve(no: int, term_ev: threading.Event):
manager: QueueManager
with QueueManager(authkey=QueueManager.__name__.encode()) as manager:
print(f"Server address {no}: {manager.address}")
while not term_ev.is_set():
try:
item: Any = manager.get_queue().get(timeout=0.1)
print(f"Client {no}: {item} from {manager.address}")
except queue.Empty:
continue
async def main(n: int):
init_queue_manager_server()
term_ev: threading.Event = threading.Event()
executor: concurrent.futures.ThreadPoolExecutor = concurrent.futures.ThreadPoolExecutor()
i: int
for i in range(n):
asyncio.ensure_future(asyncio.get_running_loop().run_in_executor(executor, serve, i, term_ev))
# Gracefully shut down
try:
await asyncio.get_running_loop().create_future()
except asyncio.CancelledError:
term_ev.set()
executor.shutdown()
raise
if __name__ == '__main__':
asyncio.run(main(int(sys.argv[1])))
고객:
multiprocessing-queue-manager-client.py
import multiprocessing
import multiprocessing.managers
import os
import sys
from typing import AnyStr, Union
class QueueManager(multiprocessing.managers.BaseManager):
def get_queue(self, ident: Union[AnyStr, int, type(None)] = None) -> multiprocessing.Queue:
pass
delattr(QueueManager, 'get_queue')
def init_queue_manager_client():
if not hasattr(QueueManager, 'get_queue'):
QueueManager.register('get_queue')
def main():
init_queue_manager_client()
manager: QueueManager = QueueManager(sys.argv[1], authkey=QueueManager.__name__.encode())
manager.connect()
message = f"A message from {os.getpid()}"
print(f"Message to send: {message}")
manager.get_queue().put(message)
if __name__ == '__main__':
main()
사용.
서버:
$ python3 multiprocessing-queue-manager-server.py N
N
생성할 서버 수를 나타내는 정수입니다.다음 중 하나를 복사합니다.<server-address-N>
서버에 의해 출력되고 그것을 각의 첫 번째 인수로 만듭니다.multiprocessing-queue-manager-client.py
.
고객:
python3 multiprocessing-queue-manager-client.py <server-address-1>
결과
서버:
Client 1: <item> from <server-address-1>
요지: https://gist.github.com/89062d639e40110c61c2f88018a8b0e5
UPD: 여기에 패키지를 만들었습니다.
서버:
import ipcq
with ipcq.QueueManagerServer(address=ipcq.Address.AUTO, authkey=ipcq.AuthKey.AUTO) as server:
server.get_queue().get()
고객:
import ipcq
client = ipcq.QueueManagerClient(address=ipcq.Address.AUTO, authkey=ipcq.AuthKey.AUTO)
client.get_queue().put('a message')
우리는 이것의 두 가지 버전을 구현했습니다. 하나는 다양한 유형의 콜을 실행하여 우리의 삶을 훨씬 더 쉽게 만들 수 있는 단순한 멀티 스레드 풀이고, 다른 하나는 프로세스를 사용하는 콜과 요구사항 측면에서 덜 유연하고 추가적인 딜링 호출입니다.
frozen_pool을 true로 설정하면 finish_pool_queue가 두 클래스에서 호출될 때까지 실행이 중지됩니다.
스레드 버전:
'''
Created on Nov 4, 2019
@author: Kevin
'''
from threading import Lock, Thread
from Queue import Queue
import traceback
from helium.loaders.loader_retailers import print_info
from time import sleep
import signal
import os
class ThreadPool(object):
def __init__(self, queue_threads, *args, **kwargs):
self.frozen_pool = kwargs.get('frozen_pool', False)
self.print_queue = kwargs.get('print_queue', True)
self.pool_results = []
self.lock = Lock()
self.queue_threads = queue_threads
self.queue = Queue()
self.threads = []
for i in range(self.queue_threads):
t = Thread(target=self.make_pool_call)
t.daemon = True
t.start()
self.threads.append(t)
def make_pool_call(self):
while True:
if self.frozen_pool:
#print '--> Queue is frozen'
sleep(1)
continue
item = self.queue.get()
if item is None:
break
call = item.get('call', None)
args = item.get('args', [])
kwargs = item.get('kwargs', {})
keep_results = item.get('keep_results', False)
try:
result = call(*args, **kwargs)
if keep_results:
self.lock.acquire()
self.pool_results.append((item, result))
self.lock.release()
except Exception as e:
self.lock.acquire()
print e
traceback.print_exc()
self.lock.release()
os.kill(os.getpid(), signal.SIGUSR1)
self.queue.task_done()
def finish_pool_queue(self):
self.frozen_pool = False
while self.queue.unfinished_tasks > 0:
if self.print_queue:
print_info('--> Thread pool... %s' % self.queue.unfinished_tasks)
sleep(5)
self.queue.join()
for i in range(self.queue_threads):
self.queue.put(None)
for t in self.threads:
t.join()
del self.threads[:]
def get_pool_results(self):
return self.pool_results
def clear_pool_results(self):
del self.pool_results[:]
프로세스 버전:
'''
Created on Nov 4, 2019
@author: Kevin
'''
import traceback
from helium.loaders.loader_retailers import print_info
from time import sleep
import signal
import os
from multiprocessing import Queue, Process, Value, Array, JoinableQueue, Lock,\
RawArray, Manager
from dill import dill
import ctypes
from helium.misc.utils import ignore_exception
from mem_top import mem_top
import gc
class ProcessPool(object):
def __init__(self, queue_processes, *args, **kwargs):
self.frozen_pool = Value(ctypes.c_bool, kwargs.get('frozen_pool', False))
self.print_queue = kwargs.get('print_queue', True)
self.manager = Manager()
self.pool_results = self.manager.list()
self.queue_processes = queue_processes
self.queue = JoinableQueue()
self.processes = []
for i in range(self.queue_processes):
p = Process(target=self.make_pool_call)
p.start()
self.processes.append(p)
print 'Processes', self.queue_processes
def make_pool_call(self):
while True:
if self.frozen_pool.value:
sleep(1)
continue
item_pickled = self.queue.get()
if item_pickled is None:
#print '--> Ending'
self.queue.task_done()
break
item = dill.loads(item_pickled)
call = item.get('call', None)
args = item.get('args', [])
kwargs = item.get('kwargs', {})
keep_results = item.get('keep_results', False)
try:
result = call(*args, **kwargs)
if keep_results:
self.pool_results.append(dill.dumps((item, result)))
else:
del call, args, kwargs, keep_results, item, result
except Exception as e:
print e
traceback.print_exc()
os.kill(os.getpid(), signal.SIGUSR1)
self.queue.task_done()
def finish_pool_queue(self, callable=None):
self.frozen_pool.value = False
while self.queue._unfinished_tasks.get_value() > 0:
if self.print_queue:
print_info('--> Process pool... %s' % (self.queue._unfinished_tasks.get_value()))
if callable:
callable()
sleep(5)
for i in range(self.queue_processes):
self.queue.put(None)
self.queue.join()
self.queue.close()
for p in self.processes:
with ignore_exception: p.join(10)
with ignore_exception: p.terminate()
with ignore_exception: del self.processes[:]
def get_pool_results(self):
return self.pool_results
def clear_pool_results(self):
del self.pool_results[:]
def test(eg): print 'EG', eg
다음 중 하나로 통화:
tp = ThreadPool(queue_threads=2)
tp.queue.put({'call': test, 'args': [random.randint(0, 100)]})
tp.finish_pool_queue()
또는
pp = ProcessPool(queue_processes=2)
pp.queue.put(dill.dumps({'call': test, 'args': [random.randint(0, 100)]}))
pp.queue.put(dill.dumps({'call': test, 'args': [random.randint(0, 100)]}))
pp.finish_pool_queue()
다중 생산자 및 다중 소비자의 예를 확인했습니다.다른 경우, 단일/다중 생산자, 단일/다중 소비자를 포함하도록 쉽게 수정할 수 있어야 합니다.
from multiprocessing import Process, JoinableQueue
import time
import os
q = JoinableQueue()
def producer():
for item in range(30):
time.sleep(2)
q.put(item)
pid = os.getpid()
print(f'producer {pid} done')
def worker():
while True:
item = q.get()
pid = os.getpid()
print(f'pid {pid} Working on {item}')
print(f'pid {pid} Finished {item}')
q.task_done()
for i in range(5):
p = Process(target=worker, daemon=True).start()
# send thirty task requests to the worker
producers = []
for i in range(2):
p = Process(target=producer)
producers.append(p)
p.start()
# make sure producers done
for p in producers:
p.join()
# block until all workers are done
q.join()
print('All work completed')
설명:
- 이 예에서는 두 명의 생산자와 다섯 명의 소비자가 있습니다.
- JoinableQueue는 큐에 저장된 모든 요소가 처리되도록 하는 데 사용됩니다.'task_done'은 작업자가 요소가 완료되었음을 알리기 위한 것입니다.'q.deven'은 완료된 것으로 표시된 모든 요소를 기다립니다.
- #2에서는 모든 작업자의 대기에 참여할 필요가 없습니다.
- 그러나 모든 생산자가 요소를 대기열에 저장할 때까지 대기에 참여하는 것이 중요합니다.그렇지 않으면 즉시 종료를 프로그래밍합니다.
언급URL : https://stackoverflow.com/questions/11515944/how-to-use-multiprocessing-queue-in-python
'programing' 카테고리의 다른 글
파이썬 람다 식에 여러 개의 문을 가질 수 있습니까? (0) | 2023.07.19 |
---|---|
Python에서 SVG를 PNG로 변환 (0) | 2023.07.19 |
파이썬에서 열거형에 대한 일반적인 관행은 무엇입니까? (0) | 2023.07.19 |
이미지를 PIL에서 개방형 CV 형식으로 변환 (0) | 2023.07.19 |
컬러 리소스에서 컬러 인트를 얻으려면 어떻게 해야 합니까? (0) | 2023.07.19 |