Multiprocessing
This post is about Python multiprocessing. In the scenario below, items are put into an in_queue, and multiple worker processes get items from it. Once an item is processed, the worker puts the result into the out_queue. All workers share the same in_queue and out_queue, which works because multiprocessing.Queue is process safe.
A separate writer process reads from the out_queue and simply prints each item as soon as it gets one.
When a process tries to get an item from a Queue and none is available within the timeout (set to 1 second here for demonstration), it waits a bit and tries again.
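The wait-and-retry step can be looked at in isolation. The sketch below is only an illustration, not part of the script: `get_or_none` is a hypothetical helper name, and it relies on the documented behaviour that `Queue.get(timeout=...)` raises `queue.Empty` when nothing arrives in time.

```python
import multiprocessing
import queue


def get_or_none(q, timeout=1):
    """Return the next item from q, or None if nothing arrives within timeout seconds."""
    try:
        return q.get(timeout=timeout)
    except queue.Empty:
        # multiprocessing.Queue raises the same queue.Empty exception as queue.Queue
        return None


if __name__ == '__main__':
    q = multiprocessing.Queue()
    print(get_or_none(q, timeout=0.1))  # nothing was put, so the get times out
    q.put('something-0')
    print(get_or_none(q, timeout=1))
```

Returning None only works as a "nothing yet" marker if None is never a real item, which is why the script catches `queue.Empty` directly in its loops. The full script follows.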
import os
import queue
import time
import multiprocessing

# Note: running this from an IDE such as Spyder may not show the prints from the
# child processes, because of how the IDE handles stdout.
# Running 'python script.py' from a command line shows all the prints.
def worker(in_queue, out_queue):
    print('**worker {0} started'.format(os.getpid()))
    while True:
        try:
            # wait up to 1 second for an item
            item = in_queue.get(timeout=1)
            if item == 'DONE':
                print('**worker {0} quited'.format(os.getpid()))
                break
            else:
                out_queue.put('processed item: {0} '.format(item))
                print('**worker {0} processed {1} '.format(os.getpid(), item))
        except queue.Empty:
            # nothing arrived within the timeout: wait a bit, then try again
            print('**worker {0} waiting'.format(os.getpid()))
            time.sleep(1)
            continue
def writer(out_queue):
    print('--writer {0} started'.format(os.getpid()))
    while True:
        try:
            item = out_queue.get(timeout=1)
            if item == 'DONE':
                print('--writer {0} quited'.format(os.getpid()))
                break
            else:
                print('--writer {0} wrote {1} '.format(os.getpid(), item))
        except queue.Empty:
            print('--writer {0} waiting'.format(os.getpid()))
            time.sleep(1)
            continue
if __name__ == '__main__':
    in_queue = multiprocessing.Queue()
    out_queue = multiprocessing.Queue()
    workers = 2

    # create the writer
    writer_process = multiprocessing.Process(target=writer, args=(out_queue,))
    writer_process.daemon = True
    writer_process.start()
    print('started')

    # create the workers
    worker_processes = []
    for i in range(workers):
        p = multiprocessing.Process(target=worker, args=(in_queue, out_queue))
        p.daemon = True
        p.start()
        worker_processes.append(p)

    # send items to in_queue for the workers to process
    for i in range(20):
        in_queue.put('something-{0}'.format(i))
        time.sleep(1)

    # send one quit signal per worker
    for i in range(workers):
        in_queue.put('DONE')

    # wait until the workers finish and quit
    for p in worker_processes:
        p.join()

    # signal the writer process to quit
    out_queue.put('DONE')

    # wait for the writer process to quit; join() blocks until the writer
    # has printed everything, so no extra sleep is needed before exiting
    writer_process.join()

    print('exited')
The result is something like this:
started
--writer 14420 started
**worker 17764 started
**worker 17764 processed something-0
--writer 14420 wrote processed item: something-0
**worker 20944 started
**worker 17764 processed something-1
--writer 14420 wrote processed item: something-1
**worker 20944 waiting
**worker 17764 waiting
--writer 14420 waiting
**worker 20944 processed something-2
--writer 14420 wrote processed item: something-2
**worker 20944 processed something-3
--writer 14420 wrote processed item: something-3
**worker 17764 waiting
**worker 20944 processed something-4
--writer 14420 wrote processed item: something-4
**worker 20944 waiting
--writer 14420 waiting
**worker 17764 processed something-5
**worker 17764 waiting
--writer 14420 wrote processed item: something-5
**worker 20944 processed something-6
--writer 14420 wrote processed item: something-6
**worker 20944 processed something-7
--writer 14420 wrote processed item: something-7
**worker 20944 processed something-8
--writer 14420 wrote processed item: something-8
**worker 20944 processed something-9
--writer 14420 wrote processed item: something-9
**worker 20944 processed something-10
--writer 14420 wrote processed item: something-10
--writer 14420 wrote processed item: something-11
**worker 20944 processed something-11
**worker 20944 processed something-12
--writer 14420 wrote processed item: something-12
**worker 20944 processed something-13
--writer 14420 wrote processed item: something-13
**worker 20944 processed something-14
--writer 14420 wrote processed item: something-14
**worker 20944 waiting
--writer 14420 waiting
**worker 17764 processed something-15
**worker 17764 processed something-16
--writer 14420 wrote processed item: something-15
--writer 14420 wrote processed item: something-16
**worker 17764 waiting
**worker 20944 waiting
--writer 14420 waiting
**worker 20944 processed something-17
**worker 17764 processed something-18
--writer 14420 wrote processed item: something-17
--writer 14420 wrote processed item: something-18
**worker 17764 waiting
--writer 14420 waiting
**worker 20944 waiting
**worker 20944 processed something-19
**worker 17764 quited
--writer 14420 wrote processed item: something-19
**worker 20944 quited
--writer 14420 quited
exited
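If the periodic "waiting" messages are not needed, the polling loop could be replaced by a blocking get() that stops at the quit signal. The sketch below is one possible variation, not what the script above does: `drain` and `SENTINEL` are hypothetical names, and it uses the built-in two-argument form of `iter()`, which keeps calling a function until it returns the given value.

```python
import multiprocessing

SENTINEL = 'DONE'  # same quit signal the script above uses


def drain(q, sentinel=SENTINEL):
    """Collect items from q until the sentinel arrives; get() blocks, so there is no polling."""
    results = []
    # iter(callable, sentinel) calls q.get() repeatedly and stops
    # as soon as the returned value equals the sentinel.
    for item in iter(q.get, sentinel):
        results.append(item)
    return results


if __name__ == '__main__':
    q = multiprocessing.Queue()
    for i in range(3):
        q.put('something-{0}'.format(i))
    q.put(SENTINEL)
    print(drain(q))  # ['something-0', 'something-1', 'something-2']
```

The trade-off is that a get() without a timeout blocks forever if the quit signal never arrives, so every consumer must be sent its own 'DONE', as the script does for its workers.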