Run Python Code In Parallel Using Multiprocessing

Multiprocessing allows a computer to use multiple CPU cores to run tasks/processes in parallel. This parallelization leads to significant speedups for tasks that involve a lot of computation. This article covers multiprocessing in Python; it starts by illustrating the basics with a simple sleep function and then finishes with a real-world image processing example. In one of our recent articles, we discussed using multithreading in Python to speed up programs; I recommend reading that before continuing.

Multiprocessing in Python 

Like the threading module, the multiprocessing module ships with the Python standard library. You can create processes either by constructing a Process object with a callable object or function, or by inheriting from the Process class and overriding its run() method. Let's create the dummy function we will use to illustrate the basics of multiprocessing in Python.

 import time

 def useless_function(sec=1):
     print(f'Sleeping for {sec} second(s)')
     time.sleep(sec)
     print('Done sleeping')

 start = time.perf_counter()
 useless_function()
 useless_function()
 end = time.perf_counter()
 print(f'Finished in {round(end-start, 2)} second(s)')
-----------------------------Output-----------------------------
 Sleeping for 1 second(s)
 Done sleeping
 Sleeping for 1 second(s)
 Done sleeping
 Finished in 2.02 second(s) 

Using the Process() Constructor

Running the function twice sequentially took roughly two seconds, as expected. Let's create two processes, run them in parallel, and see how that pans out.

 import multiprocessing

 start = time.perf_counter()
 process1 = multiprocessing.Process(target=useless_function)
 process2 = multiprocessing.Process(target=useless_function)
 process1.start()
 process2.start()
 end = time.perf_counter()
 print(f'Finished in {round(end-start, 2)} second(s)')
-----------------------------Output-----------------------------
 Finished in 0.02 second(s)
 Sleeping for 1 second(s)
 Sleeping for 1 second(s) 

Something seems wrong with the output. Granted, we forgot to wait for the processes to finish, but according to the output, the processes were started after the program finished execution. The output appears in this order because it takes some time to create the processes and get them running. This isn't the case for threads, which start almost instantly. Like threads, the join() method is used to wait for the processes to finish execution.

 start = time.perf_counter()
 # A Process object can only be started once, so create fresh ones
 process1 = multiprocessing.Process(target=useless_function)
 process2 = multiprocessing.Process(target=useless_function)
 process1.start()
 process2.start()
 process1.join()
 process2.join()
 end = time.perf_counter()
 print(f'Finished in {round(end-start, 2)} second(s)')
-----------------------------Output-----------------------------
 Sleeping for 1 second(s)
 Sleeping for 1 second(s)
 Done sleeping
 Done sleeping
 Finished in 1.04 second(s) 

Right now we're not getting that big of a speedup, but that's mainly because our function doesn't take much time to execute, and we're only running it twice. What if we want to run it ten times? If we ran it sequentially, it would take a little over ten seconds, because each call must finish before the next begins. However, if we run them in parallel across multiple processes, it should be considerably faster. Instead of manually creating ten processes, let's create and start them in a loop.

Unlike threads, arguments passed to processes must be serializable using pickle. Simply put, serialization means converting Python objects into a (binary) format that can be deconstructed and reconstructed in another Python process.
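A quick way to check whether an object can cross a process boundary is to round-trip it through pickle yourself. This is just a sketch with a made-up payload; any picklable object behaves the same way:

```python
import pickle

# Hypothetical payload; anything passed via args= must survive this round trip.
payload = {'name': 'worker-1', 'secs': [1, 2, 3]}

blob = pickle.dumps(payload)    # serialize the object to bytes
restored = pickle.loads(blob)   # reconstruct it, as a child process would
print(restored == payload)      # True
```

If `pickle.dumps` raises here, the object cannot be passed as a process argument on spawn-based platforms.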

 start = time.perf_counter()
 processes = []
 for _ in range(10):
     p = multiprocessing.Process(target=useless_function, args=[2])
     p.start()
     processes.append(p)

Now we can't call join() inside the same loop, because it would wait for each process to finish before looping around and creating the next one, which would be the same as running them sequentially.

 for p in processes:
     p.join()
 end = time.perf_counter()
 print(f'Finished in {round(end-start, 2)} second(s)')
-----------------------------Output-----------------------------
 Sleeping for 2 second(s)
 Sleeping for 2 second(s)
 Sleeping for 2 second(s)
 Sleeping for 2 second(s)
 Sleeping for 2 second(s)
 Sleeping for 2 second(s)
 Sleeping for 2 second(s)
 Sleeping for 2 second(s)
 Sleeping for 2 second(s)
 Sleeping for 2 second(s)
 Done sleeping
 Done sleeping
 Done sleeping
 Done sleeping
 Done sleeping
 Done sleeping
 Done sleeping
 Done sleeping
 Done sleeping
 Done sleeping
 Finished in 2.15 second(s) 

Even when running the function ten times, it finishes in about two seconds. This might seem strange given that my processor only has four cores, but since these processes spend their time sleeping rather than computing, the operating system can happily schedule more processes than there are cores.
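You can check how many cores your interpreter sees; a one-line sketch:

```python
import multiprocessing

# Logical core count reported by the OS (os.cpu_count() gives the same number).
cores = multiprocessing.cpu_count()
print(f'{cores} logical core(s) available')
```

For genuinely CPU-bound work, running many more processes than this number usually stops paying off.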

Creating a Custom Process Class

To create your own custom process class, you can inherit from the Process class and override its run() method.

  from multiprocessing import Process

  def countdown(name, delay, count):
      while count:
          time.sleep(delay)
          print(f'{name}, {time.ctime(time.time())}, {count}')
          count -= 1

  class newProcess(Process):
      def __init__(self, name, count):
          super().__init__()
          self.name = name
          self.count = count

      def run(self):
          print("Starting: " + self.name + "\n")
          countdown(self.name, 1, self.count)
          print("Exiting: " + self.name + "\n")

  t = newProcess("newProcess 1", 5)
  t.start()
  t.join()
  print("Done")
-----------------------------Output-----------------------------
 Starting: newProcess 1
 newProcess 1, Fri Apr 30 07:24:56 2021, 5
 newProcess 1, Fri Apr 30 07:24:57 2021, 4
 newProcess 1, Fri Apr 30 07:24:58 2021, 3
 newProcess 1, Fri Apr 30 07:24:59 2021, 2
 newProcess 1, Fri Apr 30 07:25:00 2021, 1
 Exiting: newProcess 1
 Done 

Using ProcessPoolExecutor

In addition to the multiprocessing library, there is another way of running processes. Python 3.2 introduced ProcessPoolExecutor. It is a more efficient way of managing multiple processes, and it also lets us switch to using multiple threads instead of processes with minimal changes. If we want to execute functions one at a time, we can use the submit() method. It schedules the target function for execution and returns a Future object. This Future object encapsulates the function's execution, lets us check whether it is still running or has finished, and lets us fetch the return value using result().

Let's redefine the dummy function so it has a return value, and use it to illustrate ProcessPoolExecutor.

 import concurrent.futures

 def useless_function(sec=1):
     print(f'Sleeping for {sec} second(s)')
     time.sleep(sec)
     print('Done sleeping')
     return sec

 start = time.perf_counter()
 with concurrent.futures.ProcessPoolExecutor() as executor:
     process1 = executor.submit(useless_function, 1)
     process2 = executor.submit(useless_function, 1)
     print(f'Return Value: {process1.result()}')
     print(f'Return Value: {process2.result()}')
 end = time.perf_counter()
 print(f'Finished in {round(end-start, 2)} second(s)')
-----------------------------Output-----------------------------
 Sleeping for 1 second(s)
 Sleeping for 1 second(s)
 Done sleeping
 Done sleeping
 Return Value: 1
 Return Value: 1
 Finished in 1.06 second(s) 

If we want to run this ten times, we would have to create two loops: one for creating the processes and another for fetching their results. A better way of doing this is the as_completed() function. as_completed() returns an iterator that we can loop over to get the results of the processes as they complete, i.e., in the order of their completion.

 start = time.perf_counter()
 with concurrent.futures.ProcessPoolExecutor() as executor:
     secs = [5, 4, 3, 2, 1]
     pool = [executor.submit(useless_function, i) for i in secs]
     for f in concurrent.futures.as_completed(pool):
         print(f'Return Value: {f.result()}')
 end = time.perf_counter()
 print(f'Finished in {round(end-start, 2)} second(s)')
-----------------------------Output-----------------------------
 Sleeping for 5 second(s)
 Sleeping for 4 second(s)
 Done sleeping
 Sleeping for 3 second(s)
 Return Value: 4
 Done sleeping
 Sleeping for 2 second(s)
 Return Value: 5
 Done sleeping
 Done sleeping
 Sleeping for 1 second(s)
 Return Value: 2
 Return Value: 3
 Done sleeping
 Return Value: 1
 Finished in 6.07 second(s) 

To avoid loops altogether, we can use the map() method. executor.map() is similar to the built-in map(); it runs the function for every item of the iterable we pass in, just using processes rather than doing it sequentially. And instead of returning Future objects, it returns an iterable containing the results. These results are in the order the processes were started, not the order they completed. Another thing to note is that if our function raises an exception, it won't be raised while the process is running; the exception is raised when its value is retrieved from the results iterator.

 start = time.perf_counter()
 with concurrent.futures.ProcessPoolExecutor() as executor:
     secs = [5, 4, 3, 2, 1]
     pool = executor.map(useless_function, secs)
     for res in pool:
         print(f'Return Value: {res}')
 end = time.perf_counter()
 print(f'Finished in {round(end-start, 2)} second(s)')
-----------------------------Output-----------------------------
 Sleeping for 5 second(s)
 Sleeping for 4 second(s)
 Done sleeping
 Sleeping for 3 second(s)
 Done sleeping
 Sleeping for 2 second(s)
 Return Value: 5
 Return Value: 4
 Done sleeping
 Done sleeping
 Sleeping for 1 second(s)
 Return Value: 3
 Return Value: 2
 Done sleeping
 Return Value: 1
 Finished in 6.06 second(s) 

Parallelized Image Augmentation 

To demonstrate multiprocessing in a somewhat realistic setting, we will continue with the images example used in the multithreading article and perform some image augmentation on the images we downloaded from Pexels. Although image augmentation is a computation-intensive task, it is not a perfect use case for multiprocessing, because it also involves a fair bit of I/O.

Running the Image Augmentation Function Sequentially 

 from PIL import Image, ImageFilter

 file_names = ['305821.jpg', '509922.jpg', '325812.jpg',
               '1252814.jpg', '1420709.jpg', '963486.jpg',
               '1557183.jpg', '3023211.jpg', '1031641.jpg',
               '439227.jpg', '696644.jpg', '911254.jpg',
               '1001990.jpg', '3518623.jpg', '916044.jpg']
 size = (1200, 1200)

 def augment_image(img_name):
     img = Image.open(img_name)
     img = img.filter(ImageFilter.GaussianBlur(15))
     img.thumbnail(size)
     img.save(f'augmented-{img_name}')
     print(f'{img_name} was augmented...')

 start = time.perf_counter()
 for f in file_names:
     augment_image(f)
 end = time.perf_counter()
 print(f'Finished in {round(end-start, 2)} seconds')
-----------------------------Output-----------------------------
 305821.jpg was augmented...
 509922.jpg was augmented...
 325812.jpg was augmented...
 1252814.jpg was augmented...
 1420709.jpg was augmented...
 963486.jpg was augmented...
 1557183.jpg was augmented...
 3023211.jpg was augmented...
 1031641.jpg was augmented...
 439227.jpg was augmented...
 696644.jpg was augmented...
 911254.jpg was augmented...
 1001990.jpg was augmented...
 3518623.jpg was augmented...
 916044.jpg was augmented...
 Finished in 20.66 seconds

Running the Function in Parallel Using Multiprocessing

 start = time.perf_counter()
 with concurrent.futures.ProcessPoolExecutor() as executor:
     executor.map(augment_image, file_names)
 end = time.perf_counter()
 print(f'Finished in {round(end-start, 2)} seconds')
-----------------------------Output-----------------------------
 509922.jpg was augmented...
 305821.jpg was augmented...
 325812.jpg was augmented...
 1420709.jpg was augmented...
 1252814.jpg was augmented...
 963486.jpg was augmented...
 1557183.jpg was augmented...
 3023211.jpg was augmented...
 1031641.jpg was augmented...
 696644.jpg was augmented...
 911254.jpg was augmented...
 1001990.jpg was augmented...
 439227.jpg was augmented...
 3518623.jpg was augmented...
 916044.jpg was augmented...
 Finished in 8.63 seconds 

Using multiprocessing allows the program to finish execution in almost one-third the time of sequential execution.

To learn more about the Python multiprocessing module, refer to the official documentation and the source code.
