Multiprocessing

This post is about Python multiprocessing. In the scenario below, items are put into an in_queue, and multiple worker processes get items from it. When a worker has processed an item, it puts the result into the out_queue. All workers share the same in_queue and out_queue; multiprocessing.Queue is safe to use from multiple processes.

There is also a writer process that reads from the out_queue. As soon as it gets an item, it simply prints it out.

When a process tries to get an item from a Queue and no item becomes available within the timeout (set to 1 second here for demonstration), it waits a bit and tries again.
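The timeout behaviour can be looked at in isolation. The snippet below is only a minimal sketch: get_or_none is a hypothetical helper name, not something from the script, and it runs in a single process. It relies on the fact that a timed-out get() raises queue.Empty, both for multiprocessing.Queue and for the plain queue.Queue.

```python
import multiprocessing
import queue


def get_or_none(source_queue, timeout=1.0):
    """Hypothetical helper: return the next item, or None if the wait times out.

    Caveat: a real item that is itself None cannot be told apart from a timeout.
    """
    try:
        return source_queue.get(timeout=timeout)
    except queue.Empty:
        # Nothing arrived within `timeout` seconds; the caller decides
        # whether to sleep and try again.
        return None


if __name__ == '__main__':
    demo_queue = multiprocessing.Queue()
    print(get_or_none(demo_queue, timeout=0.5))   # queue is empty, so the wait times out
    demo_queue.put('something-0')
    print(get_or_none(demo_queue, timeout=1.0))   # the item put just above
```

The worker and writer in the script do the same thing inline: they catch queue.Empty, print a "waiting" message, sleep, and loop again.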

import multiprocessing
import os
import queue
import time

# Note: running this from an IDE like Spyder may not show the prints from the
# child processes. Running 'python script.py' from a command line shows them all.

def worker(in_queue, out_queue):
    print('**worker {0} started'.format(os.getpid()))
    while True:
        try:
            # wait up to 1 second for an item; raises queue.Empty on timeout
            item = in_queue.get(timeout=1)
            if item == 'DONE':
                print('**worker {0} quited'.format(os.getpid()))
                break
            else:
                out_queue.put('processed item: {0} '.format(item))
                print('**worker {0} processed {1} '.format(os.getpid(), item))
        except queue.Empty:
            print('**worker {0} waiting'.format(os.getpid()))
            time.sleep(1)
            continue

def writer(out_queue):
    print('--writer {0} started'.format(os.getpid()))
    while True:
        try:
            item = out_queue.get(timeout=1)
            if item == 'DONE':
                print('--writer {0} quited'.format(os.getpid()))
                break
            else:
                print('--writer {0} wrote {1} '.format(os.getpid(), item))
        except queue.Empty:
            print('--writer {0} waiting'.format(os.getpid()))
            time.sleep(1)
            continue

if __name__ == '__main__':
    in_queue = multiprocessing.Queue()
    out_queue = multiprocessing.Queue()
    workers = 2

    # create writer
    writer_process = multiprocessing.Process(target=writer, args=(out_queue,))
    writer_process.daemon = True
    writer_process.start()

    print('started')

    # create workers
    worker_processes = []
    for i in range(workers):
        p = multiprocessing.Process(target=worker, args=(in_queue, out_queue))
        p.daemon = True
        p.start()
        worker_processes.append(p)

    # send items to in_queue for the workers to process
    for i in range(20):
        in_queue.put('something-{0}'.format(i))
        time.sleep(1)

    # send one quit signal per worker
    for i in range(workers):
        in_queue.put('DONE')

    # wait until the workers finalize and quit
    for p in worker_processes:
        p.join()

    # signal the writer process to quit
    out_queue.put('DONE')

    # wait for the writer process to quit (join must be called, not just referenced)
    writer_process.join()

    # the joins above already wait for the workers and the writer;
    # this extra pause only leaves time for any remaining messages to print
    time.sleep(5)
    print('exited')

    

The result is something like this:

started
--writer 14420 started
**worker 17764 started
**worker 17764 processed something-0
--writer 14420 wrote processed item: something-0
**worker 20944 started
**worker 17764 processed something-1
--writer 14420 wrote processed item: something-1
**worker 20944 waiting
**worker 17764 waiting
--writer 14420 waiting
**worker 20944 processed something-2
--writer 14420 wrote processed item: something-2
**worker 20944 processed something-3
--writer 14420 wrote processed item: something-3
**worker 17764 waiting
**worker 20944 processed something-4
--writer 14420 wrote processed item: something-4
**worker 20944 waiting
--writer 14420 waiting
**worker 17764 processed something-5
**worker 17764 waiting
--writer 14420 wrote processed item: something-5
**worker 20944 processed something-6
--writer 14420 wrote processed item: something-6
**worker 20944 processed something-7
--writer 14420 wrote processed item: something-7
**worker 20944 processed something-8
--writer 14420 wrote processed item: something-8
**worker 20944 processed something-9
--writer 14420 wrote processed item: something-9
**worker 20944 processed something-10
--writer 14420 wrote processed item: something-10
--writer 14420 wrote processed item: something-11
**worker 20944 processed something-11
**worker 20944 processed something-12
--writer 14420 wrote processed item: something-12
**worker 20944 processed something-13
--writer 14420 wrote processed item: something-13
**worker 20944 processed something-14
--writer 14420 wrote processed item: something-14
**worker 20944 waiting
--writer 14420 waiting
**worker 17764 processed something-15
**worker 17764 processed something-16
--writer 14420 wrote processed item: something-15
--writer 14420 wrote processed item: something-16
**worker 17764 waiting
**worker 20944 waiting
--writer 14420 waiting
**worker 20944 processed something-17
**worker 17764 processed something-18
--writer 14420 wrote processed item: something-17
--writer 14420 wrote processed item: something-18
**worker 17764 waiting
--writer 14420 waiting
**worker 20944 waiting
**worker 20944 processed something-19
**worker 17764 quited
--writer 14420 wrote processed item: something-19
**worker 20944 quited
--writer 14420 quited
exited
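
The end of the output shows the shutdown order: each worker quits when it reads its 'DONE' marker, and the writer quits only after the main process puts 'DONE' into the out_queue, which it does once the workers have been joined. The stop-marker loop itself can be sketched on its own. This is a single-process illustration, and consume_until_done and SENTINEL are hypothetical names introduced here, not part of the script above.

```python
import multiprocessing
import queue

SENTINEL = 'DONE'  # stop marker; the same string the script uses


def consume_until_done(source_queue, timeout=1.0):
    """Hypothetical helper: collect items until the stop marker arrives."""
    received = []
    while True:
        try:
            item = source_queue.get(timeout=timeout)
        except queue.Empty:
            # Nothing available yet; keep waiting for more items or the marker.
            continue
        if item == SENTINEL:
            return received
        received.append(item)


if __name__ == '__main__':
    demo_queue = multiprocessing.Queue()
    for i in range(3):
        demo_queue.put('processed item: something-{0}'.format(i))
    # The marker goes in last, so everything queued before it is consumed first.
    demo_queue.put(SENTINEL)
    print(consume_until_done(demo_queue))
```

Because the marker is an ordinary queue item, a consumer stops only after it has read everything queued ahead of it. With several consumers on one queue, as with the workers here, one marker per consumer is needed, since each marker is read by only one process.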